rintStream的用法
馬克-to-win:從學java第一天,我們就經常用到System.out.println(),實際上查閱文檔可知,System.out就是Sun 編的一個PrintStream的實例對象。PrintStream顧名思義,Sun編它,就是用來打印的,以各種各樣的格式,打印各種各樣的數據,(boolean,char,double,float)。下面的例子就介紹了println(int x),print(String)和print(char c)的用法。馬克- to-win:馬克 java社區:防盜版實名手機尾號: 73203。
例:1.2.1
import java.io.*;
public class TestMark_to_win {
public static void main(String args[]) throws Exception {
byte inp[]=new byte[3];
inp[0]=97;inp[1]=98;inp[2]=99;
for (int i=0; i < 3; i++) {
/*there is no such method as println(Byte x), only sun. have the
following public void println(int x) if you want to print out
"a", use System.out.println((char)inp[i]);*/
System.out.println(inp[i]);
}
for (int i=0; i < 3; i++) {
/*public void print(char c)Print a character.*/
System.out.println((char) inp[i]);
}
char c='z';
System.out.println(c);
String s="我們是good123";
System.out.println(s);
double d=3.14;
System.out.println(d);
}
}
結果是:
97
98
99
a
b
c
z
我們是good123
3.14
例:1.2.2
import java.io.*;
public class TestMark_to_win {
public static void main(String args[]) throws Exception {
String m="qi hello bye97我們";
FileOutputStream f2=new FileOutputStream("i:/4.txt");
PrintStream ps=new PrintStream(f2);
/*void println(String x) Print a String and then terminate the line.
*/
篇幅有限更多請見擴展鏈接:http://www.mark-to-win.com/tutorial/java_8_UsageOfPrintStream.html
文轉自 “美團點評技術博客” http://tech.meituan.com/stream-internals.html
上篇(基礎篇)主要介紹了Stream的基本概念和用法,本篇將深入剖析背后工作原理,重點是如何實現流式數據處理和back pressure機制。
目錄
本篇介紹stream是如何實現流式數據處理的。
數據生產和消耗的媒介
為什么使用流取數據
如何通過流取到數據
read
push方法
end事件
readable事件
doRead
howMuchToRead
數據的流式消耗
數據消耗模式
暫停模式
流動模式
背壓反饋機制
pipe
消耗驅動的數據生產
數據生產和消耗的媒介
為什么使用流取數據
下面是一個讀取文件內容的例子:
const fs=require('fs')
fs.readFile(file, function (err, body) { console.log(body) console.log(body.toString())
})
但如果文件內容較大,譬如在440M時,執行上述代碼的輸出為:
<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
throw new Error('toString failed');
^
Error: toString failed
at Buffer.toString (buffer.js:382:11)
報錯的原因是body
這個Buffer
對象的長度過大,導致toString
方法失敗。
可見,這種一次獲取全部內容的做法,不適合操作大文件。
可以考慮使用流來讀取文件內容。
const fs=require('fs')
fs.createReadStream(file).pipe(process.stdout)
fs.createReadStream
創建一個可讀流,連接了源頭(上游,文件)和消耗方(下游,標準輸出)。
執行上面代碼時,流會逐次調用fs.read
,將文件中的內容分批取出傳給下游。
在文件看來,它的內容被分塊地連續取走了。
在下游看來,它收到的是一個先后到達的數據序列。
如果不需要一次操作全部內容,它可以處理完一個數據便丟掉。
在流看來,任一時刻它都只存儲了文件中的一部分數據,只是內容在變化而已。
這種情況就像是用水管去取池子中的水。
每當用掉一點水,水管便會從池子中再取出一點。
無論水池有多大,都只存儲了與水管容積等量的水。
如何通過流取到數據
用Readable
創建對象readable
后,便得到了一個可讀流。
如果實現_read
方法,就將流連接到一個底層數據源。
流通過調用_read
向底層請求數據,底層再調用流的push
方法將需要的數據傳遞過來。
當readable
連接了數據源后,下游便可以調用readable.read(n)
向流請求數據,同時監聽readable
的data
事件來接收取到的數據。
這個流程可簡述為:
read
read
方法中的邏輯可用下圖表示,后面幾節將對該圖中各環節加以說明。
push方法
消耗方調用read(n)
促使流輸出數據,而流通過_read()
使底層調用push
方法將數據傳給流。
如果流在流動模式下(state.flowing
為true
)輸出數據,數據會自發地通過data
事件輸出,不需要消耗方反復調用read(n)
。
如果調用push
方法時緩存為空,則當前數據即為下一個需要的數據。
這個數據可能先添加到緩存中,也可能直接輸出。
執行read
方法時,在調用_read
后,如果從緩存中取到了數據,就以data
事件輸出。
所以,如果_read
異步調用push
時發現緩存為空,則意味著當前數據是下一個需要的數據,且不會被read
方法輸出,應當在push
方法中立即以data
事件輸出。
因此,上圖中“立即輸出”的條件是:
state.flowing && state.length===0 && !state.sync
end事件
由于流是分次向底層請求數據的,需要底層顯示地告訴流數據是否取完。
所以,當某次(執行_read()
)取數據時,調用了push(null)
,就意味著底層數據取完。
此時,流會設置state.ended
。
state.length
表示緩存中當前的數據量。
只有當state.length
為0
,且state.ended
為true
,才意味著所有的數據都被消耗了。
一旦在執行read(n)
時檢測到這個條件,便會觸發end
事件。
當然,這個事件只會觸發一次。
readable事件
在調用完_read()
后,read(n)
會試著從緩存中取數據。
如果_read()
是異步調用push
方法的,則此時緩存中的數據量不會增多,容易出現數據量不夠的現象。
如果read(n)
的返回值為null
,說明這次未能從緩存中取出所需量的數據。
此時,消耗方需要等待新的數據到達后再次嘗試調用read
方法。
在數據到達后,流是通過readable
事件來通知消耗方的。
在此種情況下,push
方法如果立即輸出數據,接收方直接監聽data
事件即可,否則數據被添加到緩存中,需要觸發readable
事件。
消耗方必須監聽這個事件,再調用read
方法取得數據。
doRead
流中維護了一個緩存,當緩存中的數據足夠多時,調用read()
不會引起_read()
的調用,即不需要向底層請求數據。
用doRead
來表示read(n)
是否需要向底層取數據,其邏輯為:
var doRead=state.needReadableif (state.length===0 || state.length - n < state.highWaterMark) {
doRead=true}if (state.ended || state.reading) {
doRead=false}if (doRead) {
state.reading=true
state.sync=true
if (state.length===0) {
state.needReadable=true
} this._read(state.highWaterMark)
state.sync=false}
state.reading
標志上次從底層取數據的操作是否已完成。
一旦push
方法被調用,就會設置為false
,表示此次_read()
結束。
state.highWaterMark
是給緩存大小設置的一個上限閾值。
如果取走n
個數據后,緩存中保有的數據不足這個量,便會從底層取一次數據。
howMuchToRead
調用read(n)
去取n
個數據時,m=howMuchToRead(n)
是將從緩存中實際獲取的數據量。
根據以下幾種情況賦值,一旦確定則立即返回:
state.length
為0,state.ended
為true
。
數據源已枯竭,且緩存為空,無數據可取,m
為0.
state.objectMode
為true
。
n
為0,則m
為0;
否則m
為1,將緩存的第一個元素輸出。
n
是數字。
若n <=0
,則m
為0;
若n > state.length
,表示緩存中數據量不夠。
此時如果還有數據可讀(state.ended
為false
),則m
為0,同時設置state.needReadable
,下次執行read()
時doRead
會為true
,將從底層再取數據。
如果已無數據可讀(state.ended
為true
),則m
為state.length
,將剩下的數據全部輸出。
若0 < n <=state.length
,則緩存中數據夠用,m
為n
。
其它情況。
state.flowing
為true
(流動模式),則m
為緩存中第一個元素(Buffer
)的長度,實則還是將第一個元素輸出;
否則m
為state.length
,將緩存讀空。
上面的規則中:
n
通常是undefined
或0
,即不指定讀取的字節數。
read(0)
不會有數據輸出,但從前面對doRead
的分析可以看出,是有可能從底層讀取數據的。
執行read()
時,由于流動模式下數據會不斷輸出,所以每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空。
objectMode
為true
時,m
為0
或1
。此時,一次push()
對應一次data
事件。
綜上所述:
可讀流是獲取底層數據的工具,消耗方通過調用read
方法向流請求數據,流再從緩存中將數據返回,或以data
事件輸出。
如果緩存中數據不夠,便會調用_read
方法去底層取數據。
該方法在拿到底層數據后,調用push
方法將數據交由流處理(立即輸出或存入緩存)。
可以結合readable
事件和read
方法來將數據全部消耗,這是暫停模式的消耗方法。
但更常見的是在流動模式下消耗數據,具體見后面的章節。
數據的流式消耗
所謂“流式數據”,是指按時間先后到達的數據序列。
數據消耗模式
可以在兩種模式下消耗可讀流中的數據:暫停模式(paused mode)和流動模式(flowing mode)。
流動模式下,數據會源源不斷地生產出來,形成“流動”現象。
監聽流的data
事件便可進入該模式。
暫停模式下,需要顯示地調用read()
,觸發data
事件。
可讀流對象readable
中有一個維護狀態的對象,readable._readableState
,這里簡稱為state
。
其中有一個標記,state.flowing
, 可用來判別流的模式。
它有三種可能值:
true
。流動模式。
false
。暫停模式。
null
。初始狀態。
調用readable.resume()
可使流進入流動模式,state.flowing
被設為true
。
調用readable.pause()
可使流進入暫停模式,state.flowing
被設為false
。
暫停模式
在初始狀態下,監聽data
事件,會使流進入流動模式。
但如果在暫停模式下,監聽data
事件并不會使它進入流動模式。
為了消耗流,需要顯示調用read()
方法。
const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
}// 進入暫停模式readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))var data=readable.read()while (data !==null) {
process.stdout.write('\nread: ' + data)
data=readable.read()
}
執行上面的腳本,輸出如下:
data: a
read: a
data: b
read: b
data: c
read: c
可見,在暫停模式下,調用一次read
方法便讀取一次數據。
執行read()
時,如果緩存中數據不夠,會調用_read()
去底層取。
_read
方法中可以同步或異步地調用push(data)
來將底層數據交給流處理。
在上面的例子中,由于是同步調用push
方法,數據會添加到緩存中。
read
方法在執行完_read
方法后,便從緩存中取數據,再返回,且以data
事件輸出。
如果改成異步調用push
方法,則由于_read()
執行完后,數據來不及放入緩存,
將出現read()
返回null
的現象。
見下面的示例:
const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () {
process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
})
}
readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))while (null !==readable.read()) ;
執行上述腳本,可以發現沒有任何數據輸出。
此時,需要使用readable
事件:
const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()
readable._read=function () {
process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())
} else { this.push(null)
}
})
}
readable.pause()
readable.on('data', data=> process.stdout.write('\ndata: ' + data))
readable.on('readable', function () { while (null !==readable.read()) ;;
})
輸出:
data: a
data: b
data: c
當read()
返回null
時,意味著當前緩存數據不夠,而且底層數據還沒加進來(異步調用push()
)。
此種情況下state.needReadable
會被設置為true
。
push
方法被調用時,由于是暫停模式,不會立即輸出數據,而是將數據放入緩存,并觸發一次readable
事件。
所以,一旦read
被調用,上面的例子中就會形成一個循環:readable
事件導致read
方法調用,read
方法又觸發readable
事件。
首次監聽readable
事件時,還會觸發一次read(0)
的調用,從而引起_read
和push
方法的調用,從而啟動循環。
總之,在暫停模式下需要使用readable
事件和read
方法來消耗流。
流動模式
流動模式使用起來更簡單一些。
一般創建流后,監聽data
事件,或者通過pipe
方法將數據導向另一個可寫流,即可進入流動模式開始消耗數據。
尤其是pipe
方法中還提供了back pressure機制,所以使用pipe
進入流動模式的情況非常普遍。
本節解釋data
事件如何能觸發流動模式。
先看一下Readable
是如何處理data
事件的監聽的:
Readable.prototype.on=function (ev, fn) { var res=Stream.prototype.on.call(this, ev, fn) if (ev==='data' && false !==this._readableState.flowing) { this.resume()
} // 處理readable事件的監聽
// 省略
return res
}
Stream
繼承自EventEmitter
,且是Readable
的父類。
從上面的邏輯可以看出,在將fn
加入事件隊列后,如果發現處于非暫停模式,則會調用this.resume()
,開始流動模式。
resume()
方法先將state.flowing
設為true
,
然后會在下一個tick中執行flow
,試圖將緩存讀空:
if (state.flowing) do { var chunk=stream.read()
} while (null !==chunk && state.flowing)
flow
中每次read()
都可能觸發push()
的調用,
而push()
中又可能觸發flow()
或read()
的調用,
這樣就形成了數據生生不息的流動。
其關系可簡述為:
下面再詳細看一下push()
的兩個分支:
if (state.flowing && state.length===0 && !state.sync) {
stream.emit('data', chunk)
stream.read(0)
} else {
state.length +=state.objectMode ? 1 : chunk.length
state.buffer.push(chunk) if (state.needReadable)
emitReadable(stream)
}
稱第一個分支為立即輸出。
在立即輸出的情況下,輸出數據后,執行read(0)
,進一步引起_read()
和push()
的調用,從而使數據源源不斷地輸出。
在非立即輸出的情況下,數據先被添加到緩存中。
此時有兩種情況:
state.length
為0。
這時,在調用_read()
前,state.needReadable
就會被設為true
。
因此,一定會調用emitReadable()
。
這個方法會在下一個tick中觸發readable
事件,同時再調用flow()
,從而形成流動。
state.length
不為0。
由于流動模式下,每次都是從緩存中取第一個元素,所以這時read()
返回值一定不為null
。
故flow()
中的循環還在繼續。
此外,從push()
的兩個分支可以看出來,如果state.flowing
設為false
,第一個分支便不會再進去,也就不會再調用read(0)
。
同時第二個分支中引發flow
的調用后,也不會再調用read()
,這就完全暫停了底層數據的讀取。
事實上,pause
方法就是這樣使流從流動模式轉換到暫停模式的。
背壓反饋機制
考慮下面的例子:
const fs=require('fs')
fs.createReadStream(file).on('data', doSomething)
監聽data
事件后文件中的內容便立即開始源源不斷地傳給doSomething()
。
如果doSomething
處理數據較慢,就需要緩存來不及處理的數據data
,占用大量內存。
理想的情況是下游消耗一個數據,上游才生產一個新數據,這樣整體的內存使用就能保持在一個水平。
Readable
提供pipe
方法,用來實現這個功能。
pipe
用pipe
方法連接上下游:
const fs=require('fs')
fs.createReadStream(file).pipe(writable)
writable
是一個可寫流Writable
對象,上游調用其write
方法將數據寫入其中。
writable
內部維護了一個寫隊列,當這個隊列長度達到某個閾值(state.highWaterMark
)時,
執行write()
時返回false
,否則返回true
。
于是上游可以根據write()
的返回值在流動模式和暫停模式間切換:
readable.on('data', function (data) { if (false===writable.write(data)) {
readable.pause()
}
})
writable.on('drain', function () {
readable.resume()
})
上面便是pipe
方法的核心邏輯。
當write()
返回false
時,調用readable.pause()
使上游進入暫停模式,不再觸發data
事件。
但是當writable
將緩存清空時,會觸發一個drain
事件,再調用readable.resume()
使上游進入流動模式,繼續觸發data
事件。
看一個例子:
const stream=require('stream')var c=0const readable=stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(()=> { var data=c < 6 ? String.fromCharCode(c + 65) : null
console.log('push', ++c, data) this.push(data)
})
}
})const writable=stream.Writable({
highWaterMark: 2,
write: function (chunk, enc, next) { console.log('write', chunk)
}
})
readable.pipe(writable)
輸出:
push 1 A
write <Buffer 41>
push 2 B
push 3 C
push 4 D
雖然上游一共有6個數據(ABCDEF
)可以生產,但實際只生產了4個(ABCD
)。
這是因為第一個數據(A
)遲遲未能寫完(未調用next()
),所以后面通過write
方法添加進來的數據便被緩存起來。
下游的緩存隊列到達2時,write
返回false
,上游切換至暫停模式。
此時下游保存了AB
。
由于Readable
總是緩存state.highWaterMark
這么多的數據,所以上游保存了CD
。
從而一共生產出來ABCD
四個數據。
下面使用tick-node將Readable
的debug信息按tick分組:
? NODE_DEBUG=stream tick-node pipe.js
STREAM 18930: pipe count=1 opts=undefined
STREAM 18930: resume
---------- TICK 1 ----------
STREAM 18930: resume read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
STREAM 18930: flow true
STREAM 18930: read undefined
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: reading or ended false
---------- TICK 2 ----------
push 1 A
STREAM 18930: ondata
write <Buffer 41>
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 3 ----------
push 2 B
STREAM 18930: ondata
STREAM 18930: call pause flowing=true
STREAM 18930: pause
STREAM 18930: read 0
STREAM 18930: need readable true
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 4 ----------
push 3 C
STREAM 18930: emitReadable false
STREAM 18930: emit readable
STREAM 18930: flow false
---------- TICK 5 ----------
STREAM 18930: maybeReadMore read 0
STREAM 18930: read 0
STREAM 18930: need readable false
STREAM 18930: length less than watermark true
STREAM 18930: do read
---------- TICK 6 ----------
push 4 D
---------- TICK 7 ----------
TICK 0: readable.resume()
TICK 1: readable
在流動模式下開始從底層讀取數據
TICK 2: A
被輸出,同時執行readable.read(0)
。
TICK 3: B
被輸出,同時執行readable.read(0)
。
writable.write('B')
返回false
。
執行readable.pause()
切換至暫停模式。
TICK 4: TICK 3中read(0)
引起push('C')
的調用,C
被加到readable
緩存中。
此時,writable
中有A
和B
,readable
中有C
。
這時已在暫停模式,但在readable.push('C')
結束前,發現緩存中只有1個數據,小于設定的highWaterMark
(2),故準備在下一個tick再讀一次數據。
TICK 5: 調用read(0)
從底層取數據。
TICK 6: push('D')
,D
被加到readable
緩存中。
此時,writable
中有A
和B
,readable
中有C
和D
。
readable
緩存中有2個數據,等于設定的highWaterMark
(2),不再從底層讀取數據。
可以認為,隨著下游緩存隊列的增加,上游寫數據時受到的阻力變大。
這種back pressure大到一定程度時上游便停止寫,等到back pressure降低時再繼續。
消耗驅動的數據生產
使用pipe()
時,數據的生產和消耗形成了一個閉環。
通過負反饋調節上游的數據生產節奏,事實上形成了一種所謂的拉式流(pull stream)。
用喝飲料來說明拉式流和普通流的區別的話,普通流就像是將杯子里的飲料往嘴里傾倒,動力來源于上游,數據是被推往下游的;拉式流則是用吸管去喝飲料,動力實際來源于下游,數據是被拉去下游的。
所以,使用拉式流時,是“按需生產”。
如果下游停止消耗,上游便會停止生產。
所有緩存的數據量便是兩者的閾值和。
當使用Transform
作為下游時,尤其需要注意消耗。
const stream=require('stream')var c=0const readable=stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(()=> { var data=c < 26 ? String.fromCharCode(c++ + 97) : null
console.log('push', data) this.push(data)
})
}
})const transform=stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) { console.log('transform', buf)
next(null, buf)
}
})
readable.pipe(transform)
以上代碼執行結果為:
push a
transform <Buffer 61>
push b
transform <Buffer 62>
push c
push d
push e
push f
可見,并沒有將26個字母全生產出來。
Transform
中有兩個緩存:可寫端的緩存和可讀端的緩存。
調用transform.write()
時,如果可讀端緩存未滿,數據會經過變換后加入到可讀端的緩存中。
當可讀端緩存到達閾值后,再調用transform.write()
則會將寫操作緩存到可寫端的緩存隊列。
當可寫端的緩存隊列也到達閾值時,transform.write()
返回false
,上游進入暫停模式,不再繼續transform.write()
。
所以,上面的transform
中實際存儲了4個數據,ab
在可讀端(經過了_transform
的處理),cd
在可寫端(還未經過_transform
處理)。
此時,由前面一節的分析可知,readable
將緩存ef
,之后便不再生產數據。
這三個緩存加起來的長度恰好為6,所以一共就生產了6個數據。
要想將26個數據全生產出來,有兩種做法。
第一種是消耗transform
中可讀端的緩存,以拉動上游的生產:
readable.pipe(transform).pipe(process.stdout)
第二種是,不要將數據存入可讀端中,這樣可讀端的緩存便會一直處于數據不足狀態,上游便會源源不斷地生產數據:
const transform=stream.Transform({
highWaterMark: 2,
transform: function (buf, enc, next) {
next()
}
})
參考文獻
GitHub,substack/browserify-handbook
GitHub,zoubin/streamify-your-node-program
閱讀更多技術類文章,請關注微信公眾號 “美團點評技術團隊”
文轉自 “美團點評技術團隊” http://tech.meituan.com/stream-in-action.html
背景
前面兩篇(基礎篇和進階篇)主要介紹流的基本用法和原理,本篇從應用的角度,介紹如何使用管道進行程序設計,主要內容包括:
管道的概念
Browserify的管道設計
Gulp的管道設計
兩種管道設計模式比較
實例
Pipeline
所謂“管道”,指的是通過a.pipe(b)
的形式連接起來的多個Stream對象的組合。
假如現在有兩個Transform
:bold
和red
,分別可將文本流中某些關鍵字加粗和飄紅。
可以按下面的方式對文本同時加粗和飄紅:
// source: 輸入流// dest: 輸出目的地source.pipe(bold).pipe(red).pipe(dest)
bold.pipe(red)
便可以看作一個管道,輸入流先后經過bold
和red
的變換再輸出。
但如果這種加粗且飄紅的功能的應用場景很廣,我們期望的使用方式是:
// source: 輸入流// dest: 輸出目的地// pipeline: 加粗且飄紅source.pipe(pipeline).pipe(dest)
此時,pipeline
封裝了bold.pipe(red)
,從邏輯上來講,也稱其為管道。
其實現可簡化為:
var pipeline=new Duplex()var streams=pipeline._streams=[bold, red]// 底層寫邏輯:將數據寫入管道的第一個Stream,即boldpipeline._write=function (buf, enc, next) {
streams[0].write(buf, enc, next)
}// 底層讀邏輯:從管道的最后一個Stream(即red)中讀取數據pipeline._read=function () { var buf var reads=0
var r=streams[streams.length - 1] // 將緩存讀空
while ((buf=r.read()) !==null) {
pipeline.push(buf)
reads++
} if (reads===0) { // 緩存本來為空,則等待新數據的到來
r.once('readable', function () {
pipeline._read()
})
}
}// 將各個Stream組合起來(此處等同于`bold.pipe(red)`)streams.reduce(function (r, next) {
r.pipe(next) return next
})
往pipeline
寫數據時,數據直接寫入bold
,再流向red
,最后從pipeline
讀數據時再從red
中讀出。
如果需要在中間新加一個underline
的Stream,可以:
pipeline._streams.splice(1, 0, underline)
bold.unpipe(red)
bold.pipe(underline).pipe(red)
如果要將red
替換成green
,可以:
// 刪除redpipeline._streams.pop()
bold.unpipe(red)// 添加greenpipeline._streams.push(green)
bold.pipe(green)
可見,這種管道的各個環節是可以修改的。
stream-splicer對上述邏輯進行了進一步封裝,提供splice
、push
、pop
等方法,使得pipeline
可以像數組那樣被修改:
var splicer=require('stream-splicer')var pipeline=splicer([bold, red])// 在中間添加underlinepipeline.splice(1, 0, underline)// 刪除redpipeline.pop()// 添加greenpipeline.push(green)
labeled-stream-splicer在此基礎上又添加了使用名字替代下標進行操作的功能:
var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'bold', bold, 'red', red,
])// 在`red`前添加underlinepipeline.splice('red', 0, underline)// 刪除`bold`pipeline.splice('bold', 1)
由于pipeline
本身與其各個環節一樣,也是一個Stream對象,因此可以嵌套:
var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'style', [ bold, red ], 'insert', [ comma ],
])
pipeline.get('style') // 取得管道:[bold, red]
.splice(1, 0, underline) // 添加underline
Browserify
Browserify的功能介紹可見substack/browserify-handbook,其核心邏輯的實現在于管道的設計:
var splicer=require('labeled-stream-splicer')var pipeline=splicer.obj([ // 記錄輸入管道的數據,重建管道時直接將記錄的數據寫入。
// 用于像watch時需要多次打包的情況
'record', [ this._recorder() ], // 依賴解析,預處理
'deps', [ this._mdeps ], // 處理JSON文件
'json', [ this._json() ], // 刪除文件前面的BOM
'unbom', [ this._unbom() ], // 刪除文件前面的`#!`行
'unshebang', [ this._unshebang() ], // 語法檢查
'syntax', [ this._syntax() ], // 排序,以確保打包結果的穩定性
'sort', [ depsSort(dopts) ], // 對擁有同樣內容的模塊去重
'dedupe', [ this._dedupe() ], // 將id從文件路徑轉換成數字,避免暴露系統路徑信息
'label', [ this._label(opts) ], // 為每個模塊觸發一次dep事件
'emit-deps', [ this._emitDeps() ], 'debug', [ this._debug(opts) ], // 將模塊打包
'pack', [ this._bpack ], // 更多自定義的處理
'wrap', [],
])
每個模塊用row
表示,定義如下:
{ // 模塊的唯一標識
id: id, // 模塊對應的文件路徑
file: '/path/to/file', // 模塊內容
source: '', // 模塊的依賴
deps: { // `require(expr)`
expr: id,
}
}
在wrap
階段前,所有的階段都處理這樣的對象流,且除pack
外,都輸出這樣的流。
有的補充row
中的一些信息,有的則對這些信息做一些變換,有的只是讀取和輸出。
一般row
中的source
、deps
內容都是在deps
階段解析出來的。
下面提供一個修改Browserify管道的函數。
var Transform=require('stream').Transform// 創建Transform對象function through(write, end) { return Transform({
transform: write,
flush: end,
})
}// `b`為Browserify實例// 該插件可打印出打包時間function log(b) { // watch時需要重新打包,整個pipeline會被重建,所以也要重新修改
b.on('reset', reset) // 修改當前pipeline
reset() function reset () { var time=null
var bytes=0
b.pipeline.get('record').on('end', function () { // 以record階段結束為起始時刻
time=Date.now()
}) // `wrap`是最后一個階段,在其后添加記錄結束時刻的Transform
b.pipeline.get('wrap').push(through(write, end)) function write (buf, enc, next) { // 累計大小
bytes +=buf.length this.push(buf)
next()
} function end () { // 打包時間
var delta=Date.now() - time
b.emit('time', delta)
b.emit('bytes', bytes)
b.emit('log', bytes + ' bytes written ('
+ (delta / 1000).toFixed(2) + ' seconds)'
) this.push(null)
}
}
}var fs=require('fs')var browserify=require('browserify')var b=browserify(opts)// 應用插件b.plugin(log)
b.bundle().pipe(fs.createWriteStream('bundle.js'))
事實上,這里的b.plugin(log)
就是直接執行了log(b)
。
在插件中,可以修改b.pipeline
中的任何一個環節。
因此,Browserify本身只保留了必要的功能,其它都由插件去實現,如watchify、factor-bundle等。
除了了上述的插件機制外,Browserify還有一套Transform機制,即通過b.transform(transform)
可以新增一些文件內容預處理的Transform。
預處理是發生在deps
階段的,當模塊文件內容被讀出來時,會經過這些Transform處理,然后才做依賴解析,如babelify、envify。
Gulp
Gulp的核心邏輯分成兩塊:任務調度與文件處理。
任務調度是基于orchestrator,而文件處理則是基于vinyl-fs。
類似于Browserify提供的模塊定義(用row
表示),vinyl-fs也提供了文件定義(vinyl對象)。
Browserify的管道處理的是row
流,Gulp管道處理vinyl流:
gulp.task('scripts', ['clean'], function() { // Minify and copy all JavaScript (except vendor scripts)
// with sourcemaps all the way down
return gulp.src(paths.scripts)
.pipe(sourcemaps.init())
.pipe(coffee())
.pipe(uglify())
.pipe(concat('all.min.js'))
.pipe(sourcemaps.write())
.pipe(gulp.dest('build/js'));
});
任務中創建的管道起始于gulp.src
,終止于gulp.dest
,中間有若干其它的Transform(插件)。
如果與Browserify的管道對比,可以發現Browserify是確定了一條具有完整功能的管道,而Gulp本身只提供了創建vinyl流和將vinyl流寫入磁盤的工具,管道中間經歷什么全由用戶決定。
這是因為任務中做什么,是沒有任何限制的,文件處理也只是常見的情況,并非一定要用gulp.src
與gulp.dest
。
兩種模式比較
Browserify與Gulp都借助管道的概念來實現插件機制。
Browserify定義了模塊的數據結構,提供了默認的管道以處理這樣的數據流,而插件可用來修改管道結構,以定制處理行為。
Gulp雖也定義了文件的數據結構,但只提供產生、消耗這種數據流的接口,完全由用戶通過插件去構造處理管道。
當明確具體的處理需求時,可以像Browserify那樣,構造一個基本的處理管道,以提供插件機制。
如果需要的是實現任意功能的管道,可以如Gulp那樣,只提供數據流的抽象。
實例
本節中實現一個針對Git倉庫自動生成changelog的工具,完整代碼見ezchangelog。
ezchangelog的輸入為git log
生成的文本流,輸出默認為markdown格式的文本流,但可以修改為任意的自定義格式。
輸入示意:
commit 9c5829ce45567bedccda9beb7f5de17574ea9437
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:42:35 2015 +0800
CHANGELOG
commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:41:37 2015 +0800
4.0.3
commit 87abe8e12374079f73fc85c432604642059806ae
Author: zoubin <zoubin04@gmail.com>
Date: Sat Nov 7 18:41:32 2015 +0800
fix readme
add more tests
輸出示意:
* [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme add more tests
其實需要的是這樣一個pipeline
:
source.pipe(pipeline).pipe(dest)
可以分為兩個階段:
parse:從輸入文本流中解析出commit信息
format: 將commit流變換為文本流
默認的情況下,要想得到示例中的markdown,需要解析出每個commit的sha1、日期、消息、是否為tag。
定義commit的格式如下:
{
commit: { // commit sha1
long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',
short: '3bf9055',
},
committer: { // commit date
date: new Date('Sat Nov 7 18:41:37 2015 +0800'),
}, // raw message lines
messages: ['', ' 4.0.3', ''], // raw headers before the messages
headers: [
['Author', 'zoubin <zoubin04@gmail.com>'],
['Date', 'Sat Nov 7 18:41:37 2015 +0800'],
], // the first non-empty message line
subject: '4.0.3', // other message lines
body: '', // git tag
tag: 'v4.0.3', // link to the commit. opts.baseUrl should be specified.
url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',
}
于是有:
var splicer=require('labeled-stream-splicer')
pipeline=splicer.obj([ 'parse', [ // 按行分隔
'split', split(), // 生成commit對象,解析出sha1和日期
'commit', commit(), // 解析出tag
'tag', tag(), // 解析出url
'url', url({ baseUrl: opts.baseUrl }),
], 'format', [ // 將commit組合成markdown文本
'markdownify', markdownify(),
],
])
至此,基本功能已經實現。
現在將其封裝并提供插件機制。
function Changelog(opts) {
opts=opts || {} this._options=opts // 創建pipeline
this.pipeline=splicer.obj([ 'parse', [ 'split', split(), 'commit', commit(), 'tag', tag(), 'url', url({ baseUrl: opts.baseUrl }),
], 'format', [ 'markdownify', markdownify(),
],
]) // 應用插件
;[].concat(opts.plugin).filter(Boolean).forEach(function (p) { this.plugin(p)
}, this)
}
Changelog.prototype.plugin=function (p, opts) { if (Array.isArray(p)) {
opts=p[1]
p=p[0]
} // 執行插件函數,修改pipeline
p(this, opts) return this}
上面的實現提供了兩種方式來應用插件。
一種是通過配置傳入,另一種是創建實例后再調用plugin
方法,本質一樣。
為了使用方便,還可以簡單封裝一下。
function changelog(opts) { return new Changelog(opts).pipeline
}
這樣,就可以如下方式使用:
source.pipe(changelog()).pipe(dest)
這個已經非常接近我們的預期了。
現在來開發一個插件,修改默認的渲染方式。
var through=require('through2')function customFormatter(c) { // c是`Changelog`實例
// 添加解析author的transform
c.pipeline.get('parse').push(through.obj(function (ci, enc, next) { // parse the author name from: 'zoubin <zoubin04@gmail.com>'
ci.committer.author=ci.headers[0][1].split(/\s+/)[0]
next(null, ci)
})) // 替換原有的渲染
c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) { var sha1=ci.commit.short
sha1='[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'
var date=ci.committer.date.toISOString().slice(0, 10)
next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')
}))
}
source
.pipe(changelog({
baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',
plugin: [customFormatter],
}))
.pipe(dest)
同樣的輸入,輸出將會是:
* [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin
可以看出,通過創建可修改的管道,ezchangelog保持了本身邏輯的單一性,同時又提供了強大的自定義空間。
參考文獻
GitHub,substack/browserify-handbook
GitHub,zoubin/streamify-your-node-program
查看更多技術類文章,請關注微信公眾號:美團點評技術團隊。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。