整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          java中講講PrintStream的用法

          java中講講PrintStream的用法

          rintStream的用法
          馬克-to-win:從學java第一天,我們就經常用到System.out.println(),實際上查閱文檔可知,System.out就是Sun 編的一個PrintStream的實例對象。PrintStream顧名思義,Sun編它,就是用來打印的,以各種各樣的格式,打印各種各樣的數據,(boolean,char,double,float)。下面的例子就介紹了println(int x),print(String)和print(char c)的用法。馬克- to-win:馬克 java社區:防盜版實名手機尾號: 73203。

          例:1.2.1

          import java.io.*;
          public class TestMark_to_win {
          public static void main(String args[]) throws Exception {
          byte inp[]=new byte[3];
          inp[0]=97;inp[1]=98;inp[2]=99;
          for (int i=0; i < 3; i++) {
          /*there is no such method as println(Byte x), only sun. have the
          following public void println(int x) if you want to print out
          "a", use System.out.println((char)inp[i]);*/
          System.out.println(inp[i]);
          }

          for (int i=0; i < 3; i++) {
          /*public void print(char c)Print a character.*/
          System.out.println((char) inp[i]);
          }
          char c='z';
          System.out.println(c);
          String s="我們是good123";
          System.out.println(s);
          double d=3.14;
          System.out.println(d);
          }
          }

          結果是:
          97
          98
          99
          a
          b
          c
          z
          我們是good123
          3.14

          例:1.2.2

          import java.io.*;
          public class TestMark_to_win {
          public static void main(String args[]) throws Exception {
          String m="qi hello bye97我們";
          FileOutputStream f2=new FileOutputStream("i:/4.txt");
          PrintStream ps=new PrintStream(f2);
          /*void println(String x) Print a String and then terminate the line.
          */

          篇幅有限更多請見擴展鏈接:http://www.mark-to-win.com/tutorial/java_8_UsageOfPrintStream.html

          文轉自 “美團點評技術博客” http://tech.meituan.com/stream-internals.html

          上篇(基礎篇)主要介紹了Stream的基本概念和用法,本篇將深入剖析背后工作原理,重點是如何實現流式數據處理和back pressure機制。

          目錄

          本篇介紹stream是如何實現流式數據處理的。

          • 數據生產和消耗的媒介

            • 為什么使用流取數據

            • 如何通過流取到數據

            • read

            • push方法

            • end事件

            • readable事件

            • doRead

            • howMuchToRead

          • 數據的流式消耗

            • 數據消耗模式

            • 暫停模式

            • 流動模式

          • 背壓反饋機制

            • pipe

            • 消耗驅動的數據生產

          數據生產和消耗的媒介

          為什么使用流取數據

          下面是一個讀取文件內容的例子:

          const fs=require('fs')

          fs.readFile(file, function (err, body) { console.log(body) console.log(body.toString())

          })

          但如果文件內容較大,譬如在440M時,執行上述代碼的輸出為:

          <Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >

          buffer.js:382

          throw new Error('toString failed');

          ^

          Error: toString failed

          at Buffer.toString (buffer.js:382:11)

          報錯的原因是body這個Buffer對象的長度過大,導致toString方法失敗。

          可見,這種一次獲取全部內容的做法,不適合操作大文件。

          可以考慮使用流來讀取文件內容。

          const fs=require('fs')

          fs.createReadStream(file).pipe(process.stdout)

          fs.createReadStream創建一個可讀流,連接了源頭(上游,文件)和消耗方(下游,標準輸出)。

          執行上面代碼時,流會逐次調用fs.read,將文件中的內容分批取出傳給下游。

          在文件看來,它的內容被分塊地連續取走了。

          在下游看來,它收到的是一個先后到達的數據序列。

          如果不需要一次操作全部內容,它可以處理完一個數據便丟掉。

          在流看來,任一時刻它都只存儲了文件中的一部分數據,只是內容在變化而已。

          這種情況就像是用水管去取池子中的水。

          每當用掉一點水,水管便會從池子中再取出一點。

          無論水池有多大,都只存儲了與水管容積等量的水。

          如何通過流取到數據

          Readable創建對象readable后,便得到了一個可讀流。

          如果實現_read方法,就將流連接到一個底層數據源。

          流通過調用_read向底層請求數據,底層再調用流的push方法將需要的數據傳遞過來。

          readable連接了數據源后,下游便可以調用readable.read(n)向流請求數據,同時監聽readabledata事件來接收取到的數據。

          這個流程可簡述為:

          read

          read方法中的邏輯可用下圖表示,后面幾節將對該圖中各環節加以說明。

          push方法

          消耗方調用read(n)促使流輸出數據,而流通過_read()使底層調用push方法將數據傳給流。

          如果流在流動模式下(state.flowingtrue)輸出數據,數據會自發地通過data事件輸出,不需要消耗方反復調用read(n)

          如果調用push方法時緩存為空,則當前數據即為下一個需要的數據。

          這個數據可能先添加到緩存中,也可能直接輸出。

          執行read方法時,在調用_read后,如果從緩存中取到了數據,就以data事件輸出。

          所以,如果_read異步調用push時發現緩存為空,則意味著當前數據是下一個需要的數據,且不會被read方法輸出,應當在push方法中立即以data事件輸出。

          因此,上圖中“立即輸出”的條件是:

          state.flowing && state.length===0 && !state.sync

          end事件

          由于流是分次向底層請求數據的,需要底層顯示地告訴流數據是否取完。

          所以,當某次(執行_read())取數據時,調用了push(null),就意味著底層數據取完。

          此時,流會設置state.ended

          state.length表示緩存中當前的數據量。

          只有當state.length0,且state.endedtrue,才意味著所有的數據都被消耗了。

          一旦在執行read(n)時檢測到這個條件,便會觸發end事件。

          當然,這個事件只會觸發一次。

          readable事件

          在調用完_read()后,read(n)會試著從緩存中取數據。

          如果_read()是異步調用push方法的,則此時緩存中的數據量不會增多,容易出現數據量不夠的現象。

          如果read(n)的返回值為null,說明這次未能從緩存中取出所需量的數據。

          此時,消耗方需要等待新的數據到達后再次嘗試調用read方法。

          在數據到達后,流是通過readable事件來通知消耗方的。

          在此種情況下,push方法如果立即輸出數據,接收方直接監聽data事件即可,否則數據被添加到緩存中,需要觸發readable事件。

          消耗方必須監聽這個事件,再調用read方法取得數據。

          doRead

          流中維護了一個緩存,當緩存中的數據足夠多時,調用read()不會引起_read()的調用,即不需要向底層請求數據。

          doRead來表示read(n)是否需要向底層取數據,其邏輯為:

          var doRead=state.needReadableif (state.length===0 || state.length - n < state.highWaterMark) {

          doRead=true}if (state.ended || state.reading) {

          doRead=false}if (doRead) {

          state.reading=true

          state.sync=true

          if (state.length===0) {

          state.needReadable=true

          } this._read(state.highWaterMark)

          state.sync=false}

          state.reading標志上次從底層取數據的操作是否已完成。

          一旦push方法被調用,就會設置為false,表示此次_read()結束。

          state.highWaterMark是給緩存大小設置的一個上限閾值。

          如果取走n個數據后,緩存中保有的數據不足這個量,便會從底層取一次數據。

          howMuchToRead

          調用read(n)去取n個數據時,m=howMuchToRead(n)是將從緩存中實際獲取的數據量。

          根據以下幾種情況賦值,一旦確定則立即返回:

          • state.length為0,state.endedtrue

            數據源已枯竭,且緩存為空,無數據可取,m為0.

          • state.objectModetrue

            n為0,則m為0;

            否則m為1,將緩存的第一個元素輸出。

          • n是數字。

            n <=0,則m為0;

            n > state.length,表示緩存中數據量不夠。

            此時如果還有數據可讀(state.endedfalse),則m為0,同時設置state.needReadable,下次執行read()doRead會為true,將從底層再取數據。

            如果已無數據可讀(state.endedtrue),則mstate.length,將剩下的數據全部輸出。

            0 < n <=state.length,則緩存中數據夠用,mn

          • 其它情況。

            state.flowingtrue(流動模式),則m為緩存中第一個元素(Buffer)的長度,實則還是將第一個元素輸出;

            否則mstate.length,將緩存讀空。

          上面的規則中:

          • n通常是undefined0,即不指定讀取的字節數。

          • read(0)不會有數據輸出,但從前面對doRead的分析可以看出,是有可能從底層讀取數據的。

          • 執行read()時,由于流動模式下數據會不斷輸出,所以每次只輸出緩存中第一個元素輸出,而非流動模式則會將緩存讀空。

          • objectModetrue時,m01。此時,一次push()對應一次data事件。

          綜上所述:

          可讀流是獲取底層數據的工具,消耗方通過調用read方法向流請求數據,流再從緩存中將數據返回,或以data事件輸出。

          如果緩存中數據不夠,便會調用_read方法去底層取數據。

          該方法在拿到底層數據后,調用push方法將數據交由流處理(立即輸出或存入緩存)。

          可以結合readable事件和read方法來將數據全部消耗,這是暫停模式的消耗方法。

          但更常見的是在流動模式下消耗數據,具體見后面的章節。

          數據的流式消耗

          所謂“流式數據”,是指按時間先后到達的數據序列。

          數據消耗模式

          可以在兩種模式下消耗可讀流中的數據:暫停模式(paused mode)和流動模式(flowing mode)。

          流動模式下,數據會源源不斷地生產出來,形成“流動”現象。

          監聽流的data事件便可進入該模式。

          暫停模式下,需要顯示地調用read(),觸發data事件。

          可讀流對象readable中有一個維護狀態的對象,readable._readableState,這里簡稱為state

          其中有一個標記,state.flowing, 可用來判別流的模式。

          它有三種可能值:

          • true。流動模式。

          • false。暫停模式。

          • null。初始狀態。

          調用readable.resume()可使流進入流動模式,state.flowing被設為true

          調用readable.pause()可使流進入暫停模式,state.flowing被設為false

          暫停模式

          在初始狀態下,監聽data事件,會使流進入流動模式。

          但如果在暫停模式下,監聽data事件并不會使它進入流動模式。

          為了消耗流,需要顯示調用read()方法。

          const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()

          readable._read=function () { if (dataSource.length) { this.push(dataSource.shift())

          } else { this.push(null)

          }

          }// 進入暫停模式readable.pause()

          readable.on('data', data=> process.stdout.write('\ndata: ' + data))var data=readable.read()while (data !==null) {

          process.stdout.write('\nread: ' + data)

          data=readable.read()

          }

          執行上面的腳本,輸出如下:

          data: a

          read: a

          data: b

          read: b

          data: c

          read: c

          可見,在暫停模式下,調用一次read方法便讀取一次數據。

          執行read()時,如果緩存中數據不夠,會調用_read()去底層取。

          _read方法中可以同步或異步地調用push(data)來將底層數據交給流處理。

          在上面的例子中,由于是同步調用push方法,數據會添加到緩存中。

          read方法在執行完_read方法后,便從緩存中取數據,再返回,且以data事件輸出。

          如果改成異步調用push方法,則由于_read()執行完后,數據來不及放入緩存,

          將出現read()返回null的現象。

          見下面的示例:

          const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()

          readable._read=function () {

          process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())

          } else { this.push(null)

          }

          })

          }

          readable.pause()

          readable.on('data', data=> process.stdout.write('\ndata: ' + data))while (null !==readable.read()) ;

          執行上述腳本,可以發現沒有任何數據輸出。

          此時,需要使用readable事件:

          const Readable=require('stream').Readable// 底層數據const dataSource=['a', 'b', 'c']const readable=Readable()

          readable._read=function () {

          process.nextTick(()=> { if (dataSource.length) { this.push(dataSource.shift())

          } else { this.push(null)

          }

          })

          }

          readable.pause()

          readable.on('data', data=> process.stdout.write('\ndata: ' + data))

          readable.on('readable', function () { while (null !==readable.read()) ;;

          })

          輸出:

          data: a

          data: b

          data: c

          read()返回null時,意味著當前緩存數據不夠,而且底層數據還沒加進來(異步調用push())。

          此種情況下state.needReadable會被設置為true

          push方法被調用時,由于是暫停模式,不會立即輸出數據,而是將數據放入緩存,并觸發一次readable事件。

          所以,一旦read被調用,上面的例子中就會形成一個循環:readable事件導致read方法調用,read方法又觸發readable事件。

          首次監聽readable事件時,還會觸發一次read(0)的調用,從而引起_readpush方法的調用,從而啟動循環。

          總之,在暫停模式下需要使用readable事件和read方法來消耗流。

          流動模式

          流動模式使用起來更簡單一些。

          一般創建流后,監聽data事件,或者通過pipe方法將數據導向另一個可寫流,即可進入流動模式開始消耗數據。

          尤其是pipe方法中還提供了back pressure機制,所以使用pipe進入流動模式的情況非常普遍。

          本節解釋data事件如何能觸發流動模式。

          先看一下Readable是如何處理data事件的監聽的:

          Readable.prototype.on=function (ev, fn) { var res=Stream.prototype.on.call(this, ev, fn) if (ev==='data' && false !==this._readableState.flowing) { this.resume()

          } // 處理readable事件的監聽

          // 省略

          return res

          }

          Stream繼承自EventEmitter,且是Readable的父類。

          從上面的邏輯可以看出,在將fn加入事件隊列后,如果發現處于非暫停模式,則會調用this.resume(),開始流動模式。

          resume()方法先將state.flowing設為true

          然后會在下一個tick中執行flow,試圖將緩存讀空:

          if (state.flowing) do { var chunk=stream.read()

          } while (null !==chunk && state.flowing)

          flow中每次read()都可能觸發push()的調用,

          push()中又可能觸發flow()read()的調用,

          這樣就形成了數據生生不息的流動。

          其關系可簡述為:

          下面再詳細看一下push()的兩個分支:

          if (state.flowing && state.length===0 && !state.sync) {

          stream.emit('data', chunk)

          stream.read(0)

          } else {

          state.length +=state.objectMode ? 1 : chunk.length

          state.buffer.push(chunk) if (state.needReadable)

          emitReadable(stream)

          }

          稱第一個分支為立即輸出。

          在立即輸出的情況下,輸出數據后,執行read(0),進一步引起_read()push()的調用,從而使數據源源不斷地輸出。

          在非立即輸出的情況下,數據先被添加到緩存中。

          此時有兩種情況:

          • state.length為0。

            這時,在調用_read()前,state.needReadable就會被設為true

            因此,一定會調用emitReadable()

            這個方法會在下一個tick中觸發readable事件,同時再調用flow(),從而形成流動。

          • state.length不為0。

            由于流動模式下,每次都是從緩存中取第一個元素,所以這時read()返回值一定不為null

            flow()中的循環還在繼續。

          此外,從push()的兩個分支可以看出來,如果state.flowing設為false,第一個分支便不會再進去,也就不會再調用read(0)

          同時第二個分支中引發flow的調用后,也不會再調用read(),這就完全暫停了底層數據的讀取。

          事實上,pause方法就是這樣使流從流動模式轉換到暫停模式的。

          背壓反饋機制

          考慮下面的例子:

          const fs=require('fs')

          fs.createReadStream(file).on('data', doSomething)

          監聽data事件后文件中的內容便立即開始源源不斷地傳給doSomething()

          如果doSomething處理數據較慢,就需要緩存來不及處理的數據data,占用大量內存。

          理想的情況是下游消耗一個數據,上游才生產一個新數據,這樣整體的內存使用就能保持在一個水平。

          Readable提供pipe方法,用來實現這個功能。

          pipe

          pipe方法連接上下游:

          const fs=require('fs')

          fs.createReadStream(file).pipe(writable)

          writable是一個可寫流Writable對象,上游調用其write方法將數據寫入其中。

          writable內部維護了一個寫隊列,當這個隊列長度達到某個閾值(state.highWaterMark)時,

          執行write()時返回false,否則返回true

          于是上游可以根據write()的返回值在流動模式和暫停模式間切換:

          readable.on('data', function (data) { if (false===writable.write(data)) {

          readable.pause()

          }

          })

          writable.on('drain', function () {

          readable.resume()

          })

          上面便是pipe方法的核心邏輯。

          write()返回false時,調用readable.pause()使上游進入暫停模式,不再觸發data事件。

          但是當writable將緩存清空時,會觸發一個drain事件,再調用readable.resume()使上游進入流動模式,繼續觸發data事件。

          看一個例子:

          const stream=require('stream')var c=0const readable=stream.Readable({

          highWaterMark: 2,

          read: function () {

          process.nextTick(()=> { var data=c < 6 ? String.fromCharCode(c + 65) : null

          console.log('push', ++c, data) this.push(data)

          })

          }

          })const writable=stream.Writable({

          highWaterMark: 2,

          write: function (chunk, enc, next) { console.log('write', chunk)

          }

          })

          readable.pipe(writable)

          輸出:

          push 1 A

          write <Buffer 41>

          push 2 B

          push 3 C

          push 4 D

          雖然上游一共有6個數據(ABCDEF)可以生產,但實際只生產了4個(ABCD)。

          這是因為第一個數據(A)遲遲未能寫完(未調用next()),所以后面通過write方法添加進來的數據便被緩存起來。

          下游的緩存隊列到達2時,write返回false,上游切換至暫停模式。

          此時下游保存了AB

          由于Readable總是緩存state.highWaterMark這么多的數據,所以上游保存了CD

          從而一共生產出來ABCD四個數據。

          下面使用tick-node將Readable的debug信息按tick分組:

          ? NODE_DEBUG=stream tick-node pipe.js

          STREAM 18930: pipe count=1 opts=undefined

          STREAM 18930: resume

          ---------- TICK 1 ----------

          STREAM 18930: resume read 0

          STREAM 18930: read 0

          STREAM 18930: need readable false

          STREAM 18930: length less than watermark true

          STREAM 18930: do read

          STREAM 18930: flow true

          STREAM 18930: read undefined

          STREAM 18930: need readable true

          STREAM 18930: length less than watermark true

          STREAM 18930: reading or ended false

          ---------- TICK 2 ----------

          push 1 A

          STREAM 18930: ondata

          write <Buffer 41>

          STREAM 18930: read 0

          STREAM 18930: need readable true

          STREAM 18930: length less than watermark true

          STREAM 18930: do read

          ---------- TICK 3 ----------

          push 2 B

          STREAM 18930: ondata

          STREAM 18930: call pause flowing=true

          STREAM 18930: pause

          STREAM 18930: read 0

          STREAM 18930: need readable true

          STREAM 18930: length less than watermark true

          STREAM 18930: do read

          ---------- TICK 4 ----------

          push 3 C

          STREAM 18930: emitReadable false

          STREAM 18930: emit readable

          STREAM 18930: flow false

          ---------- TICK 5 ----------

          STREAM 18930: maybeReadMore read 0

          STREAM 18930: read 0

          STREAM 18930: need readable false

          STREAM 18930: length less than watermark true

          STREAM 18930: do read

          ---------- TICK 6 ----------

          push 4 D

          ---------- TICK 7 ----------

          • TICK 0: readable.resume()

          • TICK 1: readable在流動模式下開始從底層讀取數據

          • TICK 2: A被輸出,同時執行readable.read(0)

          • TICK 3: B被輸出,同時執行readable.read(0)

            writable.write('B')返回false

            執行readable.pause()切換至暫停模式。

          • TICK 4: TICK 3中read(0)引起push('C')的調用,C被加到readable緩存中。

            此時,writable中有ABreadable中有C

            這時已在暫停模式,但在readable.push('C')結束前,發現緩存中只有1個數據,小于設定的highWaterMark(2),故準備在下一個tick再讀一次數據。

          • TICK 5: 調用read(0)從底層取數據。

          • TICK 6: push('D')D被加到readable緩存中。

            此時,writable中有ABreadable中有CD

            readable緩存中有2個數據,等于設定的highWaterMark(2),不再從底層讀取數據。

          可以認為,隨著下游緩存隊列的增加,上游寫數據時受到的阻力變大。

          這種back pressure大到一定程度時上游便停止寫,等到back pressure降低時再繼續。

          消耗驅動的數據生產

          使用pipe()時,數據的生產和消耗形成了一個閉環。

          通過負反饋調節上游的數據生產節奏,事實上形成了一種所謂的拉式流(pull stream)。

          用喝飲料來說明拉式流和普通流的區別的話,普通流就像是將杯子里的飲料往嘴里傾倒,動力來源于上游,數據是被推往下游的;拉式流則是用吸管去喝飲料,動力實際來源于下游,數據是被拉去下游的。

          所以,使用拉式流時,是“按需生產”。

          如果下游停止消耗,上游便會停止生產。

          所有緩存的數據量便是兩者的閾值和。

          當使用Transform作為下游時,尤其需要注意消耗。

          const stream=require('stream')var c=0const readable=stream.Readable({

          highWaterMark: 2,

          read: function () {

          process.nextTick(()=> { var data=c < 26 ? String.fromCharCode(c++ + 97) : null

          console.log('push', data) this.push(data)

          })

          }

          })const transform=stream.Transform({

          highWaterMark: 2,

          transform: function (buf, enc, next) { console.log('transform', buf)

          next(null, buf)

          }

          })

          readable.pipe(transform)

          以上代碼執行結果為:

          push a

          transform <Buffer 61>

          push b

          transform <Buffer 62>

          push c

          push d

          push e

          push f

          可見,并沒有將26個字母全生產出來。

          Transform中有兩個緩存:可寫端的緩存和可讀端的緩存。

          調用transform.write()時,如果可讀端緩存未滿,數據會經過變換后加入到可讀端的緩存中。

          當可讀端緩存到達閾值后,再調用transform.write()則會將寫操作緩存到可寫端的緩存隊列。

          當可寫端的緩存隊列也到達閾值時,transform.write()返回false,上游進入暫停模式,不再繼續transform.write()

          所以,上面的transform中實際存儲了4個數據,ab在可讀端(經過了_transform的處理),cd在可寫端(還未經過_transform處理)。

          此時,由前面一節的分析可知,readable將緩存ef,之后便不再生產數據。

          這三個緩存加起來的長度恰好為6,所以一共就生產了6個數據。

          要想將26個數據全生產出來,有兩種做法。

          第一種是消耗transform中可讀端的緩存,以拉動上游的生產:

          readable.pipe(transform).pipe(process.stdout)

          第二種是,不要將數據存入可讀端中,這樣可讀端的緩存便會一直處于數據不足狀態,上游便會源源不斷地生產數據:

          const transform=stream.Transform({

          highWaterMark: 2,

          transform: function (buf, enc, next) {

          next()

          }

          })

          參考文獻

          • GitHub,substack/browserify-handbook

          • GitHub,zoubin/streamify-your-node-program

          閱讀更多技術類文章,請關注微信公眾號 “美團點評技術團隊”


          文轉自 “美團點評技術團隊” http://tech.meituan.com/stream-in-action.html

          背景

          前面兩篇(基礎篇和進階篇)主要介紹流的基本用法和原理,本篇從應用的角度,介紹如何使用管道進行程序設計,主要內容包括:

          1. 管道的概念

          2. Browserify的管道設計

          3. Gulp的管道設計

          4. 兩種管道設計模式比較

          5. 實例

          Pipeline

          所謂“管道”,指的是通過a.pipe(b)的形式連接起來的多個Stream對象的組合。

          假如現在有兩個Transformboldred,分別可將文本流中某些關鍵字加粗和飄紅。

          可以按下面的方式對文本同時加粗和飄紅:

          // source: 輸入流// dest: 輸出目的地source.pipe(bold).pipe(red).pipe(dest)

          bold.pipe(red)便可以看作一個管道,輸入流先后經過boldred的變換再輸出。

          但如果這種加粗且飄紅的功能的應用場景很廣,我們期望的使用方式是:

          // source: 輸入流// dest: 輸出目的地// pipeline: 加粗且飄紅source.pipe(pipeline).pipe(dest)

          此時,pipeline封裝了bold.pipe(red),從邏輯上來講,也稱其為管道。

          其實現可簡化為:

          var pipeline=new Duplex()var streams=pipeline._streams=[bold, red]// 底層寫邏輯:將數據寫入管道的第一個Stream,即boldpipeline._write=function (buf, enc, next) {

          streams[0].write(buf, enc, next)

          }// 底層讀邏輯:從管道的最后一個Stream(即red)中讀取數據pipeline._read=function () { var buf var reads=0

          var r=streams[streams.length - 1] // 將緩存讀空

          while ((buf=r.read()) !==null) {

          pipeline.push(buf)

          reads++

          } if (reads===0) { // 緩存本來為空,則等待新數據的到來

          r.once('readable', function () {

          pipeline._read()

          })

          }

          }// 將各個Stream組合起來(此處等同于`bold.pipe(red)`)streams.reduce(function (r, next) {

          r.pipe(next) return next

          })

          pipeline寫數據時,數據直接寫入bold,再流向red,最后從pipeline讀數據時再從red中讀出。

          如果需要在中間新加一個underline的Stream,可以:

          pipeline._streams.splice(1, 0, underline)

          bold.unpipe(red)

          bold.pipe(underline).pipe(red)

          如果要將red替換成green,可以:

          // 刪除redpipeline._streams.pop()

          bold.unpipe(red)// 添加greenpipeline._streams.push(green)

          bold.pipe(green)

          可見,這種管道的各個環節是可以修改的。

          stream-splicer對上述邏輯進行了進一步封裝,提供splicepushpop等方法,使得pipeline可以像數組那樣被修改:

          var splicer=require('stream-splicer')var pipeline=splicer([bold, red])// 在中間添加underlinepipeline.splice(1, 0, underline)// 刪除redpipeline.pop()// 添加greenpipeline.push(green)

          labeled-stream-splicer在此基礎上又添加了使用名字替代下標進行操作的功能:

          var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'bold', bold, 'red', red,

          ])// 在`red`前添加underlinepipeline.splice('red', 0, underline)// 刪除`bold`pipeline.splice('bold', 1)

          由于pipeline本身與其各個環節一樣,也是一個Stream對象,因此可以嵌套:

          var splicer=require('labeled-stream-splicer')var pipeline=splicer([ 'style', [ bold, red ], 'insert', [ comma ],

          ])

          pipeline.get('style') // 取得管道:[bold, red]

          .splice(1, 0, underline) // 添加underline

          Browserify

          Browserify的功能介紹可見substack/browserify-handbook,其核心邏輯的實現在于管道的設計:

          var splicer=require('labeled-stream-splicer')var pipeline=splicer.obj([ // 記錄輸入管道的數據,重建管道時直接將記錄的數據寫入。

          // 用于像watch時需要多次打包的情況

          'record', [ this._recorder() ], // 依賴解析,預處理

          'deps', [ this._mdeps ], // 處理JSON文件

          'json', [ this._json() ], // 刪除文件前面的BOM

          'unbom', [ this._unbom() ], // 刪除文件前面的`#!`行

          'unshebang', [ this._unshebang() ], // 語法檢查

          'syntax', [ this._syntax() ], // 排序,以確保打包結果的穩定性

          'sort', [ depsSort(dopts) ], // 對擁有同樣內容的模塊去重

          'dedupe', [ this._dedupe() ], // 將id從文件路徑轉換成數字,避免暴露系統路徑信息

          'label', [ this._label(opts) ], // 為每個模塊觸發一次dep事件

          'emit-deps', [ this._emitDeps() ], 'debug', [ this._debug(opts) ], // 將模塊打包

          'pack', [ this._bpack ], // 更多自定義的處理

          'wrap', [],

          ])

          每個模塊用row表示,定義如下:

          { // 模塊的唯一標識

          id: id, // 模塊對應的文件路徑

          file: '/path/to/file', // 模塊內容

          source: '', // 模塊的依賴

          deps: { // `require(expr)`

          expr: id,

          }

          }

          wrap階段前,所有的階段都處理這樣的對象流,且除pack外,都輸出這樣的流。

          有的補充row中的一些信息,有的則對這些信息做一些變換,有的只是讀取和輸出。

          一般row中的sourcedeps內容都是在deps階段解析出來的。

          下面提供一個修改Browserify管道的函數。

          var Transform=require('stream').Transform// 創建Transform對象function through(write, end) { return Transform({

          transform: write,

          flush: end,

          })

          }// `b`為Browserify實例// 該插件可打印出打包時間function log(b) { // watch時需要重新打包,整個pipeline會被重建,所以也要重新修改

          b.on('reset', reset) // 修改當前pipeline

          reset() function reset () { var time=null

          var bytes=0

          b.pipeline.get('record').on('end', function () { // 以record階段結束為起始時刻

          time=Date.now()

          }) // `wrap`是最后一個階段,在其后添加記錄結束時刻的Transform

          b.pipeline.get('wrap').push(through(write, end)) function write (buf, enc, next) { // 累計大小

          bytes +=buf.length this.push(buf)

          next()

          } function end () { // 打包時間

          var delta=Date.now() - time

          b.emit('time', delta)

          b.emit('bytes', bytes)

          b.emit('log', bytes + ' bytes written ('

          + (delta / 1000).toFixed(2) + ' seconds)'

          ) this.push(null)

          }

          }

          }var fs=require('fs')var browserify=require('browserify')var b=browserify(opts)// 應用插件b.plugin(log)

          b.bundle().pipe(fs.createWriteStream('bundle.js'))

          事實上,這里的b.plugin(log)就是直接執行了log(b)

          在插件中,可以修改b.pipeline中的任何一個環節。

          因此,Browserify本身只保留了必要的功能,其它都由插件去實現,如watchify、factor-bundle等。

          除了了上述的插件機制外,Browserify還有一套Transform機制,即通過b.transform(transform)可以新增一些文件內容預處理的Transform。

          預處理是發生在deps階段的,當模塊文件內容被讀出來時,會經過這些Transform處理,然后才做依賴解析,如babelify、envify。

          Gulp

          Gulp的核心邏輯分成兩塊:任務調度與文件處理。

          任務調度是基于orchestrator,而文件處理則是基于vinyl-fs。

          類似于Browserify提供的模塊定義(用row表示),vinyl-fs也提供了文件定義(vinyl對象)。

          Browserify的管道處理的是row流,Gulp管道處理vinyl流:

          gulp.task('scripts', ['clean'], function() { // Minify and copy all JavaScript (except vendor scripts)

          // with sourcemaps all the way down

          return gulp.src(paths.scripts)

          .pipe(sourcemaps.init())

          .pipe(coffee())

          .pipe(uglify())

          .pipe(concat('all.min.js'))

          .pipe(sourcemaps.write())

          .pipe(gulp.dest('build/js'));

          });

          任務中創建的管道起始于gulp.src,終止于gulp.dest,中間有若干其它的Transform(插件)。

          如果與Browserify的管道對比,可以發現Browserify是確定了一條具有完整功能的管道,而Gulp本身只提供了創建vinyl流和將vinyl流寫入磁盤的工具,管道中間經歷什么全由用戶決定。

          這是因為任務中做什么,是沒有任何限制的,文件處理也只是常見的情況,并非一定要用gulp.srcgulp.dest

          兩種模式比較

          Browserify與Gulp都借助管道的概念來實現插件機制。

          Browserify定義了模塊的數據結構,提供了默認的管道以處理這樣的數據流,而插件可用來修改管道結構,以定制處理行為。

          Gulp雖也定義了文件的數據結構,但只提供產生、消耗這種數據流的接口,完全由用戶通過插件去構造處理管道。

          當明確具體的處理需求時,可以像Browserify那樣,構造一個基本的處理管道,以提供插件機制。

          如果需要的是實現任意功能的管道,可以如Gulp那樣,只提供數據流的抽象。

          實例

          本節中實現一個針對Git倉庫自動生成changelog的工具,完整代碼見ezchangelog。

          ezchangelog的輸入為git log生成的文本流,輸出默認為markdown格式的文本流,但可以修改為任意的自定義格式。

          輸入示意:

          commit 9c5829ce45567bedccda9beb7f5de17574ea9437

          Author: zoubin <zoubin04@gmail.com>

          Date: Sat Nov 7 18:42:35 2015 +0800

          CHANGELOG

          commit 3bf9055b732cc23a9c14f295ff91f48aed5ef31a

          Author: zoubin <zoubin04@gmail.com>

          Date: Sat Nov 7 18:41:37 2015 +0800

          4.0.3

          commit 87abe8e12374079f73fc85c432604642059806ae

          Author: zoubin <zoubin04@gmail.com>

          Date: Sat Nov 7 18:41:32 2015 +0800

          fix readme

          add more tests

          輸出示意:

          * [[`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c)] CHANGELOG## [v4.0.3](https://github.com/zoubin/ezchangelog/commit/3bf9055) (2015-11-07)* [[`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e)] fix readme add more tests

          其實需要的是這樣一個pipeline

          source.pipe(pipeline).pipe(dest)

          可以分為兩個階段:

          • parse:從輸入文本流中解析出commit信息

          • format: 將commit流變換為文本流

          默認的情況下,要想得到示例中的markdown,需要解析出每個commit的sha1、日期、消息、是否為tag。

          定義commit的格式如下:

          {

          commit: { // commit sha1

          long: '3bf9055b732cc23a9c14f295ff91f48aed5ef31a',

          short: '3bf9055',

          },

          committer: { // commit date

          date: new Date('Sat Nov 7 18:41:37 2015 +0800'),

          }, // raw message lines

          messages: ['', ' 4.0.3', ''], // raw headers before the messages

          headers: [

          ['Author', 'zoubin <zoubin04@gmail.com>'],

          ['Date', 'Sat Nov 7 18:41:37 2015 +0800'],

          ], // the first non-empty message line

          subject: '4.0.3', // other message lines

          body: '', // git tag

          tag: 'v4.0.3', // link to the commit. opts.baseUrl should be specified.

          url: 'https://github.com/zoubin/ezchangelog/commit/3bf9055',

          }

          于是有:

          var splicer=require('labeled-stream-splicer')

          pipeline=splicer.obj([ 'parse', [ // 按行分隔

          'split', split(), // 生成commit對象,解析出sha1和日期

          'commit', commit(), // 解析出tag

          'tag', tag(), // 解析出url

          'url', url({ baseUrl: opts.baseUrl }),

          ], 'format', [ // 將commit組合成markdown文本

          'markdownify', markdownify(),

          ],

          ])

          至此,基本功能已經實現。

          現在將其封裝并提供插件機制。

          function Changelog(opts) {

          opts=opts || {} this._options=opts // 創建pipeline

          this.pipeline=splicer.obj([ 'parse', [ 'split', split(), 'commit', commit(), 'tag', tag(), 'url', url({ baseUrl: opts.baseUrl }),

          ], 'format', [ 'markdownify', markdownify(),

          ],

          ]) // 應用插件

          ;[].concat(opts.plugin).filter(Boolean).forEach(function (p) { this.plugin(p)

          }, this)

          }

          Changelog.prototype.plugin=function (p, opts) { if (Array.isArray(p)) {

          opts=p[1]

          p=p[0]

          } // 執行插件函數,修改pipeline

          p(this, opts) return this}

          上面的實現提供了兩種方式來應用插件。

          一種是通過配置傳入,另一種是創建實例后再調用plugin方法,本質一樣。

          為了使用方便,還可以簡單封裝一下。

          function changelog(opts) { return new Changelog(opts).pipeline

          }

          這樣,就可以如下方式使用:

          source.pipe(changelog()).pipe(dest)

          這個已經非常接近我們的預期了。

          現在來開發一個插件,修改默認的渲染方式。

          var through=require('through2')function customFormatter(c) { // c是`Changelog`實例

          // 添加解析author的transform

          c.pipeline.get('parse').push(through.obj(function (ci, enc, next) { // parse the author name from: 'zoubin <zoubin04@gmail.com>'

          ci.committer.author=ci.headers[0][1].split(/\s+/)[0]

          next(null, ci)

          })) // 替換原有的渲染

          c.pipeline.get('format').splice('markdownify', 1, through.obj(function (ci, enc, next) { var sha1=ci.commit.short

          sha1='[`' + sha1 + '`](' + c._options.baseUrl + sha1 + ')'

          var date=ci.committer.date.toISOString().slice(0, 10)

          next(null, '* ' + sha1 + ' ' + date + ' @' + ci.committer.author + '\n')

          }))

          }

          source

          .pipe(changelog({

          baseUrl: 'https://github.com/zoubin/ezchangelog/commit/',

          plugin: [customFormatter],

          }))

          .pipe(dest)

          同樣的輸入,輸出將會是:

          * [`9c5829c`](https://github.com/zoubin/ezchangelog/commit/9c5829c) 2015-11-07 @zoubin* [`3bf9055`](https://github.com/zoubin/ezchangelog/commit/3bf9055) 2015-11-07 @zoubin* [`87abe8e`](https://github.com/zoubin/ezchangelog/commit/87abe8e) 2015-11-07 @zoubin

          可以看出,通過創建可修改的管道,ezchangelog保持了本身邏輯的單一性,同時又提供了強大的自定義空間。

          參考文獻

          • GitHub,substack/browserify-handbook

          • GitHub,zoubin/streamify-your-node-program

          查看更多技術類文章,請關注微信公眾號:美團點評技術團隊。


          主站蜘蛛池模板: 一区二区三区四区精品视频 | 国产未成女一区二区三区| 一夲道无码人妻精品一区二区| 国产精品一区二区久久| 99久久精品国产高清一区二区| 精品国产乱子伦一区二区三区| 精品成人av一区二区三区| 韩国精品福利一区二区三区| 中文字幕无线码一区二区| 国产一区二区在线观看麻豆| 伊人久久一区二区三区无码| 中文字幕在线看视频一区二区三区| 综合久久一区二区三区| 合区精品久久久中文字幕一区 | 无码精品人妻一区二区三区免费| 精品国产一区二区三区无码| 乱码人妻一区二区三区| 国产在线精品一区二区在线观看 | 久久精品一区二区免费看| 免费一区二区视频| 国产免费私拍一区二区三区| 日韩精品国产一区| 日韩精品一区二区三区中文版| 亚洲熟女www一区二区三区| 亚洲乱码av中文一区二区| 国产一区二区三区影院| 国模私拍福利一区二区| 久久精品无码一区二区三区不卡| 亚洲AV日韩AV一区二区三曲| 精品国产一区二区三区www| 日本一区中文字幕日本一二三区视频| 少妇一晚三次一区二区三区| 成人精品一区久久久久| 亚洲性日韩精品国产一区二区| 精品久久久久中文字幕一区| 国偷自产av一区二区三区| 一区视频免费观看| 无码人妻精品一区二区三| 亚洲一区视频在线播放| 亚洲一区精品无码| 老熟女高潮一区二区三区|