流的定义
流是抽象化的概念,形象生动的描述了数据的流动、变化。
具体来说,在node中流是处理数据的抽象接口,继承了EventEmitter,通过这个接口我们能够控制流的开关,流动的方向等等。比较形象直观一点类似我们在linux上使用shell,通过管道,链接处理各个部分,下面是我写的一个命令,筛选出version并导出到文件中。流的分类
- Readable(可读流)
- Writable(可写流)
- Duplex(可读可写的流)
- Transform(在读写过程中可以修改和变化的Duplex流)
流按照功能大致划分为以上四类,具体应用的话有很多场景,如下图所示(来源:参考链接2)
下面我根据流的分类,列举一些demo应用实例Readable
可读流能接受各种数据源,例如控制台的输入,文件,字符串等等,就如介绍中所说是抽象接口,可以面向各种形式的输入,下面举几个例子。
文件流
require('fs').createReadStream('./1.txt',{ encoding: 'utf8'}).on('data',(data) => { console.log(data)})// 输出 hello jsdt
说明 为什么要用流来读取,直接用fs.readFile岂不是更方便吗,因为readFile是整体操作,会将文件全部读到内存中在做处理,这样的话文件如果很大,程序就会很卡,甚至报错。
标准输入流
process.stdin.setEncoding('utf8');process.stdin.on('data',(data) => { console.log('输出: '+ data)})node 运行code,然后输入 hello jsdt输出: hello jsdt
说明 这个做acm的时候会用到,或者平时自己写一些交互式应用的时候
普通数据流
let {Readable} = require('stream')let util = require('util')class Test extends Readable{ constructor(){ super() this.dataSource = 5 } _read(){ if(this.dataSource-->0){ this.push(this.dataSource+''); }else{ this.push(null); } }}let counter = new Test();counter.on('data',function(data){ console.log(data.toString())});输出:4321
说明 重写_read方法,自定义输入的逻辑,上面示例中是自己逻辑中产生的一个数据源。
Writable
文件流
let dataSource = 'hello jsdt',i = 0;(function(){ let ws = require('fs').createWriteStream('./1.txt',{ encoding: 'utf8' }) let flag = true; while(flag && i
说明 闭包自执行,通过流将数据写入到文件中,上面是输出结果。
自定义输出
let {Writable} = require('stream')let arr = []let ws = Writable({ write(chunk,encoding,cb){ arr.push(chunk) cb() }})for(let i = 1; i<= 3;i++){ ws.write(''+i,'utf8',()=>{})}process.nextTick(function () { console.log(arr.toString())})// 输出 1,2,3
说明 上面重写了流的write方法,可以自定义写逻辑
Duplex
require('net').createServer(socket => { socket.on('data',data => { console.log('client message ' + data); socket.write("server message " + 'hello client '); })}).listen(8080,() =>{})
说明 作为可写流一面socket可以向客户端发送信息,做为可读流一面可以监听data事件,收到客户端发送过信息
Transform
let t = require('stream').Transform({ transform(chunk,encoding,cb){ this.push(chunk.toString().toUpperCase()); cb(); }});process.stdin.pipe(t).pipe(process.stdout);// 输入abc// 输出ABC
说明 上面使用转换流,实现了terminal上小写输入,对应大写输出的功能
流中数据分类
- 二进制模式
- 对象模式
在创建流的时候可以指定配置,objectMode默认为false,设为true切换到对象模式。二进制即buffer模式,可读或可写流都会将数据会缓存数据在buffer中。
流的剖析
通过上面的介绍我们明确了流的定义,并按照功能对流进行了分类,下面我进行下剖析,总的来说流的各种形态间转化传输底层都是二进制,具体到使用形态上有buffer,string等等。
首先详细说下可读流,可读流有两种模式,默认为paused模式。
- flowing 按照初始化配置,自动读取数据,并通过观察者模式,直接将数据提供给订阅者
- paused 显式调用流的read方法读取数据
其中如果我们想切换到流动模式可以通过监听data事件的方式、或者调用stream.resume()、stream.pipe() 这些方法。
可读流源码分析
// 可读流入口,根据配置返回一个可读流fs.createReadStream = function(path, options) { return new ReadStream(path, options);};// 实现原理是ReadStream.prototype.__proto__ = Readable.prototype,可以继承Readable上的一些方法util.inherits(ReadStream, Readable);fs.ReadStream = ReadStream;function ReadStream(path, options) { // 非new方式调用,直接返回一个实例 if (!(this instanceof ReadStream)) return new ReadStream(path, options); options = copyObject(getOptions(options, {})); if (options.highWaterMark === undefined) // highWaterMark默认值为64k,设置了flow模式下缓冲区的大小 options.highWaterMark = 64 * 1024; Readable.call(this, options); handleError((this.path = getPathFromURL(path))); // 文件描述符,根据这个句柄找到文件 this.fd = options.fd === undefined ? null : options.fd; // flags打开文件要做的操作,默认为'r' this.flags = options.flags === undefined ? 'r' : options.flags; // 用于设置文件模式(权限和粘结位),仅限创建文件时。 this.mode = options.mode === undefined ? 0o666 : options.mode; // 开始读取位置 this.start = options.start; // 结束读取位置(!!!包括结束位置) this.end = options.end; /** * 如果 autoClose 为 false,则文件描述符不会被关闭,即使有错误。 * 需要程序负责关闭它,并且确保没有文件描述符泄漏。 * 如果 autoClose 被设置为 true(默认),则在 error 或 end 时,文件描述符会被自动关闭 */ this.autoClose = options.autoClose === undefined ? true : options.autoClose; this.pos = this.start; }// 适合传入句柄的情况,例如fd: 0,这样就不是文件,而是控制台输入的数据了 if (typeof this.fd !== 'number') this.open(); this.on('end', function() { if (this.autoClose) { this.destroy(); } });}// 打开文件,并触发open事件,只有打开了才能读取,所以在回调中触发open事件,看下步操作ReadStream.prototype.open = function() { var self = this; fs.open(this.path, this.flags, this.mode, function(er, fd) { self.fd = fd; self.emit('open', fd); // start the flow of data. self.read(); });};Readable.prototype.read = function(n) { // 当read(0)时,如果缓存中已有数据,则触发readable事件,相当于刷新下缓存。否则触发end事件if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) { if (state.length === 0 && state.ended) endReadable(this); else emitReadable(this); return null; } // 若可读流已经被传入了终止符(null),且缓冲中没有遗留数据,则结束这个可读流 if (n === 0 && state.ended) { if (state.length === 0) endReadable(this); return null; } // 若目前缓冲中的数据大小为空,或未超过设置的警戒线,则进行一次数据读取。 if (state.length === 0 || state.length - n < state.highWaterMark) { doRead = true; } if (state.ended || state.reading) { doRead = false; } else if (doRead) { state.reading = true; state.sync = true; this._read(state.highWaterMark); }}ReadStream.prototype._read = function(n) { if (typeof this.fd !== 'number') { // 防止重复绑定open事件,当文件打开且emit open事件,此时才会进行真正的读操作 return this.once('open', function() { this._read(n); }); } // 然后读数据的时候会计算实际读的数量 function howMuchToRead(n, state) { // 如果读的数量超过highWaterMark,则重新计算highWaterMark if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n); if (n <= state.length) return n; } // 经过上面一系列的准备工作,下面开始真正的读操作咯fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { if (bytesRead > 0) { this.bytesRead += bytesRead; } this.push(b); });};// 上面整个过程是paused的流程,其中flow模式又有所不同,如下所示// 如果监听了data事件,则会调用this.resume(),开始流动模式Readable.prototype.on = function(ev, fn) { const res = Stream.prototype.on.call(this, ev, fn); if (ev === 'data') { // Start flowing on next tick if stream isn't explicitly paused if (this._readableState.flowing !== false) this.resume(); } }// flow模式下 流内部自动触发data事件,循环读取数据function flow(stream) { const state = stream._readableState; debug('flow', state.flowing); while (state.flowing && stream.read() !== null);}// 然后触发 data事件,循环发射数据stream.emit('data', chunk);
总结 上面是可读流的源码分析,摘要了关键部分,下面在梳理一下,当通过ReadStream创建一个流的时候,默认会触发readable事件,进入暂停模式,此时内部维护的有一个缓冲区,在readable事件回调逻辑中进行read操作,首先会通过howMuchToRead方法计算实际读取的数量,如果现有数据小于highWaterMark,内部会进行this._read(state.highWaterMark)操作,其回调中会进行push操作,push在调用readableAddChunk将数据放到内部维护的缓存中,反之则从fromList中读取缓存中的数据,然后返回。而如果监听了data事件,代码中所示会调用this.resume(),将流状态设置为flowing模式,然后resume()->resume_()->flow()的调用顺序执行flow方法循环读取数据,触发data事件,完成数据的自动读取,然后发射给调用者,会不停的循环整个过程。上面比较值的注意一点的就是flow模式和paused模式区别,如果是flow模式在addChunk的时候,如下所示
function addChunk(stream, state, chunk, addToFront) { if (state.flowing && state.length === 0 && !state.sync) { stream.emit('data', chunk); stream.read(0); } }
会自动发射数据,不会走缓存,而paused模式会走一遍内部的缓存机制。
根据上面node源码的分析过程,下面图形化描述下整个流程。自己实现的一个
可写流源码分析
// 1:首先第一步根据createWriteStream传入参数进行初始化// 2:调用写操作Writable.prototype.write = function(chunk, encoding, cb) { if (state.ended) //在end继续写入会emit一个error事件 writeAfterEnd(this, cb); else if (validChunk(this, state, chunk, cb)) { //在校验数据chunk合法的情况下才会进行后续的写逻辑 state.pendingcb++; ret = writeOrBuffer(this, state, chunk, encoding, cb); }return ret;};function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); if (chunk instanceof Buffer) encoding = 'buffer'; var len = state.objectMode ? 1 : chunk.length; state.length += len;//实时更新缓冲区长度 var ret = state.length < state.highWaterMark;//判断缓存区是否超过水位线(highWaterMark,不传默认16k,源码_stream_writeable.js--40行)设置 if (!ret) state.needDrain = true; if (state.writing || state.corked) { //如果此时处于写状态,将新添加的数据放到缓冲池链表尾部 var last = state.lastBufferedRequest; state.lastBufferedRequest = new WriteReq(chunk, encoding, cb); if (last) { last.next = state.lastBufferedRequest; } else { state.bufferedRequest = state.lastBufferedRequest; } state.bufferedRequestCount += 1; } else { //写入数据 doWrite(stream, state, false, len, chunk, encoding, cb); }return ret;}function doWrite(stream, state, writev, len, chunk, encoding, cb) { if (writev) //一次写入多个数据块 stream._writev(chunk, state.onwrite); else //一次写入一个数据块 stream._write(chunk, encoding, state.onwrite); state.sync = false;}function onwrite(stream, er) { if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) { //清空缓冲池 ,不为空,则循环执行 _write() 写入单个数据块 clearBuffer(stream, state); } }}function clearBuffer(stream, state) { // 单个数据写入 while (entry) { var chunk = entry.chunk; var encoding = entry.encoding; var cb = entry.callback; var len = state.objectMode ? 1 : chunk.length; //开启数据写操作 doWrite(stream, state, false, len, chunk, encoding, cb); entry = entry.next; }}
总结 上面是可写流源码分析,摘要了关键流程,首先根据传入参数进行初始化配置,然后用户调用write方法进行写入,写入前会判断一下是否超过水位线,超过触发drain事件,返回false,注意一点此时仍可以进行写入,返回false只是告诉你,已经满了,后需要不要写入还是靠用户根据这个返回值来控制。如果没超过,在写之前会先判断是否处于写状态,是的话将数据放到缓存中,反之会进行doWrite <-->clearBuffer这样的循环操作,一直到数据缓存中数据消耗完为止。清理完了之后,后续调用write的返回值ret为false,从而继续写,一直循环前面描述的整个过程,直到数据源写完为止。总的来说,因为可写流内部只有一个状态,复杂度低于可读流,整个过程还是比较清晰的,不在图形化流程。
自己实现的一个
说明
node源码分析版本基于v8.9.4参考资料