整合營銷服務(wù)商

          電腦端+手機(jī)端+微信端=數(shù)據(jù)同步管理

          免費(fèi)咨詢熱線:

          Python3讀取文件指定行的三種方案

          • 技術(shù)背景
          • 行遍歷實(shí)現(xiàn)
          • linecache實(shí)現(xiàn)
          • 命令行sed獲取
          • 總結(jié)概要
          • 版權(quán)聲明

          技術(shù)背景

          考慮到深度學(xué)習(xí)領(lǐng)域中的數(shù)據(jù)規(guī)模一般都比較大,尤其是訓(xùn)練集,這個(gè)限制條件對應(yīng)到實(shí)際編程中就意味著,我們很有可能無法將整個(gè)數(shù)據(jù)文件的內(nèi)容全部都加載到內(nèi)存中。那么就需要一些特殊的處理方式,比如:創(chuàng)建內(nèi)存映射文件來替代原始文件被加載到內(nèi)存中、預(yù)處理數(shù)據(jù)后再加載內(nèi)存中以及單次只加載文件的片段。其中關(guān)于內(nèi)存映射技術(shù)的一些應(yīng)用,在前面的這2篇博客1和博客2中有所介紹,而本文將要介紹的是從文件中只讀取特定行的內(nèi)容的3種解決方案。

          行遍歷實(shí)現(xiàn)

          在python中如果要將一個(gè)文件完全加載到內(nèi)存中,通過file.readlines()即可,但是在文件占用較高時(shí),我們是無法完整的將文件加載到內(nèi)存中的,這時(shí)候就需要用到python的file.readline()進(jìn)行迭代式的逐行讀取:

          filename = 'hello.txt'
          
          with open(filename, 'r') as file:
              line = file.readline()
              counts = 1
              while line:
                  if counts >= 50000000:
                      break
                  line = file.readline()
                  counts += 1

          這里我們的實(shí)現(xiàn)方式是先用一個(gè)with語句打開一個(gè)文件,然后用readline()函數(shù)配合while循環(huán)逐行加載,最終通過一個(gè)序號(hào)標(biāo)記來結(jié)束循環(huán)遍歷,輸出文件第50000000行的內(nèi)容。該代碼的執(zhí)行效果如下:

          dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py 
          
          real    0m10.359s
          user    0m10.062s
          sys     0m0.296s

          linecache實(shí)現(xiàn)

          雖然在python的readline函數(shù)中并沒有實(shí)現(xiàn)讀取指定行內(nèi)容的方案,但是在另一個(gè)庫linecache中是實(shí)現(xiàn)了的,由于使用的方式較為簡單,這里直接放上代碼示例供參考:

          filename = 'hello.txt'
          
          import linecache
          text = linecache.getline(filename, 50000000)

          該代碼的執(zhí)行結(jié)果如下:

          dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py 
          
          real    0m11.904s
          user    0m5.672s
          sys     0m6.231s

          雖然在實(shí)現(xiàn)方式上簡化了許多,但是我們發(fā)現(xiàn)這個(gè)實(shí)現(xiàn)的用時(shí)超過了11s,還不如我們自己手動(dòng)實(shí)現(xiàn)的循環(huán)遍歷方案。因此如果是對于性能有一定要求的場景,是不建議采用這個(gè)方案的。

          命令行sed獲取

          我們知道用Linux系統(tǒng)本身自帶的sed指令也是可以獲取到文件指定行或者是指定行范圍的數(shù)據(jù)的,其執(zhí)行指令為:sed -n 50000000p filename即表示讀取文件的第50000000行的內(nèi)容。同時(shí)結(jié)合python的話,我們可以在python代碼中執(zhí)行系統(tǒng)指令并獲取輸出結(jié)果:

          filename = 'hello.txt'
          
          import os
          result = os.popen('sed -n {}p {}'.format(50000000, filename)).read()

          需要注意的是,如果直接運(yùn)行os.system()是沒有返回值的,只有os.popen()是有返回值的,并且需要在尾巴加上一個(gè)read()的選項(xiàng)。該代碼的執(zhí)行結(jié)果如下:

          dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py 
          
          real    0m2.532s
          user    0m0.032s
          sys     0m0.020s

          可以看到直接使用sed指令的執(zhí)行速度很快,但是用這種方法并不是一本萬利的,比如以下這個(gè)例子:

          filename = 'hello.txt'
          
          import os
          result = os.popen('sed -n {}p {}'.format(500, filename)).read()

          我們把讀取第50000000行內(nèi)容改為讀取第500行的內(nèi)容,再運(yùn)行一次程序:

          dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py 
          
          real    0m2.540s
          user    0m0.037s
          sys     0m0.013s

          然而我們發(fā)現(xiàn)這個(gè)速度并沒有因?yàn)橐x取的行數(shù)減少了而變少,而是幾乎保持不變的。

          總結(jié)概要

          本文通過4個(gè)測試案例分析了在python中讀取文件指定行內(nèi)容的方案,并得到了一些運(yùn)行耗時(shí)的數(shù)據(jù)。從需求上來說,如果是對于小規(guī)模的數(shù)據(jù),比如幾百行規(guī)模的數(shù)據(jù),建議使用readline循環(huán)遍歷來操作,速度也相當(dāng)不錯(cuò),或者是linecache中的函數(shù)實(shí)現(xiàn)也是可以的,甚至可以直接用readlines將整個(gè)文本內(nèi)容加載到內(nèi)存中。但是對于數(shù)據(jù)規(guī)模比較大的場景,比如超過了千萬行的級(jí)別,那么使用sed指令的方式對指定行內(nèi)容進(jìn)行讀取的方式,應(yīng)該是所有方式中最快速的。

          版權(quán)聲明

          本文首發(fā)鏈接為:https://www.cnblogs.com/dechinphy/p/lbl.html

          作者ID:DechinPhy

          更多原著文章請參考:https://www.cnblogs.com/dechinphy/

          打賞專用鏈接:https://www.cnblogs.com/dechinphy/gallery/image/379634.html

          騰訊云專欄同步:https://cloud.tencent.com/developer/column/91958

          頭條號(hào)同步:https://www.toutiao.com/c/user/token/MS4wLjABAAAA0XQA38KoijOGhh-GLoqYttlkilR7BRepm5pujqXuidY/?tab=all

          >微服務(wù)環(huán)境搭建


          本章中,我們將介紹旨在利用多核處理器的多線程 API 和構(gòu)造:

          • 并行 LINQ 或
          • 并行類
          • 構(gòu)造

          這些構(gòu)造統(tǒng)稱為(松散地)并行 (PFX)。并行類與任務(wù)并行性構(gòu)造一起稱為 (TPL)。

          在閱讀本章之前,您需要熟悉第 中的基礎(chǔ)知識(shí),尤其是鎖定、線程安全和 Task 類。

          注意

          .NET 提供了許多其他專用 API 來幫助進(jìn)行并行和異步編程:

          • System.Threading.Channels.Channel 是一個(gè)高性能異步生產(chǎn)者/使用者隊(duì)列,在 .NET Core 3 中引入。
          • (在System.Threading.Tasks.Dataflow命名空間中)是一個(gè)復(fù)雜的API,用于創(chuàng)建緩沖塊網(wǎng)絡(luò),這些并行執(zhí)行操作或數(shù)據(jù)轉(zhuǎn)換,類似于actor/agent編程。
          • 實(shí)現(xiàn)了 LINQ over IObservable(IAsyncEnumerable 的替代抽象),并且擅長組合異步流。反應(yīng)式擴(kuò)展包含在 NuGet 包中。

          為什么選擇PFX?

          在過去的 15 年里,CPU 制造商已經(jīng)從單核處理器轉(zhuǎn)向多核處理器。這對我們程序員來說是有問題的,因?yàn)閱尉€程代碼不會(huì)因?yàn)檫@些額外的內(nèi)核而自動(dòng)運(yùn)行得更快。

          對于大多數(shù)服務(wù)器應(yīng)用程序來說,使用多個(gè)內(nèi)核很容易,其中每個(gè)線程都可以獨(dú)立處理單獨(dú)的客戶端請求,但在桌面上則更加困難,因?yàn)樗ǔP枰@取計(jì)算密集型代碼并執(zhí)行以下操作:

          1. 將其小塊。
          2. 通過多線程并行執(zhí)行這些塊。
          3. 在結(jié)果可用時(shí),以線程安全和高性能的方式結(jié)果。

          盡管您可以使用經(jīng)典的多線程構(gòu)造完成所有這些操作,但它很尷尬,尤其是分區(qū)和整理的步驟。另一個(gè)問題是,當(dāng)許多線程同時(shí)處理相同的數(shù)據(jù)時(shí),通常的線程安全鎖定策略會(huì)導(dǎo)致大量爭用。

          PFX 庫專為幫助這些方案而設(shè)計(jì)。

          注意

          利用多核或多處理器的編程稱為。這是更廣泛的多線程概念的子集。

          PFX 概念

          在線程之間劃分工作有兩種策略:。

          當(dāng)必須對許多數(shù)據(jù)值執(zhí)行一組任務(wù)時(shí),我們可以通過讓每個(gè)線程對值子集執(zhí)行(相同的)任務(wù)集來并行化。這稱為數(shù)據(jù),因?yàn)槲覀冊诰€程之間對進(jìn)行分區(qū)。相反,使用任務(wù),我們對進(jìn)行分區(qū);換句話說,我們讓每個(gè)線程執(zhí)行不同的任務(wù)。

          通常,數(shù)據(jù)并行性更容易,并且可以更好地?cái)U(kuò)展到高度并行的硬件,因?yàn)樗梢詼p少或消除共享數(shù)據(jù)(從而減少爭用和線程安全問題)。此外,數(shù)據(jù)并行性利用了數(shù)據(jù)值通常比離散任務(wù)更多的事實(shí),從而增加了并行性的潛力。

          數(shù)據(jù)并行性也有利于結(jié)構(gòu)化并行,這意味著并行工作單元在程序中的同一位置開始和結(jié)束。相比之下,任務(wù)并行性往往是非結(jié)構(gòu)化的,這意味著并行工作單元可能在分散在程序中的位置開始和結(jié)束。結(jié)構(gòu)化并行性更簡單,更不容易出錯(cuò),允許您將分區(qū)和線程協(xié)調(diào)(甚至結(jié)果排序)的困難工作轉(zhuǎn)移到庫中。

          聚苯乙烯組件

          PFX 包含兩層功能,如圖 所示。較高層由兩個(gè) API 組成:PLINQ 和 Parallel 類。下層包含任務(wù)并行類,以及一組有助于并行編程活動(dòng)的附加構(gòu)造。



          PLINQ 提供了最豐富的功能:它自動(dòng)執(zhí)行并行化的所有步驟,包括將工作劃分為任務(wù)、在線程上執(zhí)行這些任務(wù)以及將結(jié)果整理到單個(gè)輸出序列中。它稱為聲明式,因?yàn)槟恍枰⑿谢墓ぷ鳎鷮⑵錁?gòu)造為 LINQ 查詢),并讓運(yùn)行時(shí)處理實(shí)現(xiàn)細(xì)節(jié)。相比之下,其他方法是的,因?yàn)槟枰@式編寫代碼來分區(qū)或整理。如以下概要所示,對于并行類,您必須自己整理結(jié)果;使用任務(wù)并行構(gòu)造,您還必須自己對工作進(jìn)行分區(qū):


          分區(qū)工作

          整理結(jié)果

          普林克

          是的

          是的

          并行類

          是的

          PFX 的任務(wù)并行性

          并發(fā)集合和旋轉(zhuǎn)基元可幫助您進(jìn)行較低級(jí)別的并行編程活動(dòng)。這些很重要,因?yàn)?PFX 不僅適用于當(dāng)今的硬件,還適用于具有更多內(nèi)核的未來幾代處理器。如果你想移動(dòng)一堆切碎的木頭,并且你有32名工人來完成這項(xiàng)工作,那么最大的挑戰(zhàn)是在工人互不妨礙的情況下移動(dòng)木材。這與在 32 個(gè)內(nèi)核之間劃分算法相同:如果使用普通鎖來保護(hù)公共資源,則由此產(chǎn)生的阻塞可能意味著這些內(nèi)核中只有一小部分實(shí)際上同時(shí)處于繁忙狀態(tài)。并發(fā)集合專門針對高并發(fā)訪問進(jìn)行了優(yōu)化,重點(diǎn)是最小化或消除阻塞。PLINQ 和 Parallel 類本身依賴于并發(fā)集合和旋轉(zhuǎn)基元來有效地管理工作。

          何時(shí)使用 PFX

          PFX 的主要用例是:利用多核處理器來加速計(jì)算密集型代碼。

          并行編程中的一個(gè)挑戰(zhàn)是阿姆達(dá)爾定律,該定律指出并行化的最大性能改進(jìn)由必須按順序執(zhí)行的代碼部分決定。例如,如果算法的執(zhí)行時(shí)間只有三分之二是可并行化的,則即使內(nèi)核數(shù)量無限,性能提升也永遠(yuǎn)不會(huì)超過三倍。

          PFX的其他用途

          并行編程結(jié)構(gòu)不僅可用于利用多核,還可用于其他方案:

          • 當(dāng)您需要線程安全的隊(duì)列、堆棧或字典時(shí),并發(fā)集合有時(shí)是合適的。
          • BlockingCollection 提供了一種實(shí)現(xiàn)生產(chǎn)者/使用者結(jié)構(gòu)的簡單方法,并且是并發(fā)性的好方法。
          • 任務(wù)是異步編程的基礎(chǔ),正如我們在第中看到的那樣。

          因此,在繼續(xù)之前,有必要驗(yàn)證瓶頸是否在可并行化代碼中。同樣值得考慮的是,計(jì)算密集型 - 優(yōu)化通常是最簡單、最有效的方法。但是,需要權(quán)衡的是,某些優(yōu)化技術(shù)可能會(huì)使并行化代碼變得更加困難。

          最簡單的收益來自所謂的問題 - 這是當(dāng)一個(gè)作業(yè)可以很容易地劃分為可以有效地自行執(zhí)行的任務(wù)時(shí)(結(jié)構(gòu)化并行性非常適合此類問題)。示例包括數(shù)學(xué)或密碼學(xué)中的許多圖像處理任務(wù)、光線追蹤和暴力破解方法。非尷尬并行問題的一個(gè)例子是實(shí)現(xiàn)快速排序算法的優(yōu)化版本 - 一個(gè)好的結(jié)果需要一些思考,并且可能需要非結(jié)構(gòu)化并行性。

          普林克

          PLINQ 會(huì)自動(dòng)并行處理本地 LINQ 查詢。PLINQ 的優(yōu)點(diǎn)是易于使用,因?yàn)樗鼘⒐ぷ鞣謪^(qū)和結(jié)果排序規(guī)則的負(fù)擔(dān)卸載到 .NET。

          若要使用 PLINQ,只需在輸入序列上調(diào)用 AsParallel(),然后像往常一樣繼續(xù) LINQ 查詢。以下查詢計(jì)算 3 到 100,000 之間的質(zhì)數(shù),充分利用目標(biāo)計(jì)算機(jī)上的所有內(nèi)核:

          // Calculate prime numbers using a simple (unoptimized) algorithm.
          
          IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
          
          var parallelQuery = 
            from n in numbers.AsParallel()
            where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
            select n;
          
          int[] primes = parallelQuery.ToArray();

          AsParallel 是 System.Linq.ParallelEnumerable 中的擴(kuò)展方法。它將輸入包裝在基于 ParallelQuery<TSource> 的序列中,這會(huì)導(dǎo)致隨后調(diào)用的 LINQ 查詢運(yùn)算符綁定到在 ParallelEnumerable 中定義的一組備用擴(kuò)展方法。它們提供了每個(gè)標(biāo)準(zhǔn)查詢運(yùn)算符的并行實(shí)現(xiàn)。本質(zhì)上,它們的工作原理是將輸入序列劃分為在不同線程上執(zhí)行的塊,將結(jié)果整理回單個(gè)輸出序列以供使用,如圖 所示。



          PLINQ 執(zhí)行模型

          調(diào)用 AsSequential() 將解開 ParallelQuery 序列,以便后續(xù)查詢運(yùn)算符綁定到標(biāo)準(zhǔn)查詢運(yùn)算符并按順序執(zhí)行。在調(diào)用具有副作用或不是線程安全的方法之前,這是必需的。

          對于接受兩個(gè)輸入序列(聯(lián)接、組聯(lián)、康卡特、并集、相交、except 和 Zip)的查詢運(yùn)算符,必須將 AsParallel() 應(yīng)用于兩個(gè)輸入序列(否則將引發(fā)異常)。但是,您不需要在查詢進(jìn)行時(shí)繼續(xù)將 AsParallel 應(yīng)用于查詢,因?yàn)?PLINQ 的查詢運(yùn)算符會(huì)輸出另一個(gè) ParallelQuery 序列。事實(shí)上,再次調(diào)用 AsParallel 會(huì)導(dǎo)致效率低下,因?yàn)樗鼤?huì)強(qiáng)制合并和重新分區(qū)查詢:

          mySequence.AsParallel()           // Wraps sequence in ParallelQuery<int>
                    .Where (n => n > 100)   // Outputs another ParallelQuery<int>
                    .AsParallel()           // Unnecessary - and inefficient!
                    .Select (n => n * n)

          并非所有查詢運(yùn)算符都可以有效地并行化。對于那些不能實(shí)現(xiàn)運(yùn)算符(請參閱 將按順序?qū)崿F(xiàn)運(yùn)算符。如果 PLINQ 懷疑并行化的開銷實(shí)際上會(huì)減慢特定查詢的速度,則 PLINQ 也可能按順序運(yùn)行。

          PLINQ 僅適用于本地集合:例如,它不適用于實(shí)體框架,因?yàn)樵谶@些情況下,LINQ 會(huì)轉(zhuǎn)換為 SQL,然后在數(shù)據(jù)庫服務(wù)器上執(zhí)行。但是, PLINQ 對從數(shù)據(jù)庫查詢獲取的結(jié)果集執(zhí)行其他本地查詢。

          注意

          如果 PLINQ 查詢引發(fā)異常,則會(huì)將其重新引發(fā)為 AggregateException,其 InnerExceptions 屬性包含真正的異常(或多個(gè)異常)。有關(guān)更多詳細(xì)信息,請參閱

          并行執(zhí)行彈道學(xué)

          與普通的 LINQ 查詢一樣,PLINQ 查詢是延遲計(jì)算的。這意味著只有在您開始使用結(jié)果時(shí)才會(huì)觸發(fā)執(zhí)行 — 通常通過 foreach 循環(huán)(盡管它也可以通過轉(zhuǎn)換運(yùn)算符(如 ToArray 或返回單個(gè)元素或值的運(yùn)算符)觸發(fā))。

          但是,在枚舉結(jié)果時(shí),執(zhí)行過程與普通順序查詢的執(zhí)行方式略有不同。順序查詢完全由使用者以“拉取”方式提供支持:輸入序列中的每個(gè)元素在使用者需要時(shí)準(zhǔn)確獲取。并行查詢通常使用獨(dú)立的線程從輸入序列中獲取元素,略于使用者需要它們的時(shí)間(更像新聞閱讀器的提詞器)。然后,它通過查詢鏈并行處理元素,將結(jié)果保存在一個(gè)小緩沖區(qū)中,以便它們?yōu)榘葱枋褂谜咦龊脺?zhǔn)備。如果使用者提前暫停或中斷枚舉,查詢處理器也會(huì)暫停或停止,以免浪費(fèi) CPU 時(shí)間或內(nèi)存。

          注意

          您可以通過在 AsParallel 之后調(diào)用 WithMergeOptions 來調(diào)整 PLINQ 的緩沖行為。自動(dòng)緩沖的默認(rèn)值通常可提供最佳的總體結(jié)果。NotBuffered 禁用緩沖區(qū),如果您希望盡快看到結(jié)果,它很有用;完全緩沖在將整個(gè)結(jié)果集呈現(xiàn)給使用者之前對其進(jìn)行緩存(OrderBy 和反向運(yùn)算符自然以這種方式工作,元素、聚合和轉(zhuǎn)換運(yùn)算符也是如此)。

          為什么 ASPARALLEL 不是默認(rèn)值?

          鑒于 AsParallel 透明地并行化 LINQ 查詢,問題就來了:為什么Microsoft不簡單地并行化標(biāo)準(zhǔn)查詢運(yùn)算符并將 PLINQ 設(shè)為默認(rèn)值?

          方法的原因有很多。首先,要使 PLINQ 有用,必須有合理數(shù)量的計(jì)算密集型工作才能將其分配給工作線程。大多數(shù) LINQ 到對象查詢的執(zhí)行速度都非常快;因此,不僅不需要并行化,而且分區(qū)、整理和協(xié)調(diào)額外線程的開銷實(shí)際上可能會(huì)減慢速度。

          此外:

          • 在元素排序方面,PLINQ 查詢的輸出(默認(rèn)情況下)可能與 LINQ 查詢不同(請參閱)。
          • PLINQ 將異常包裝在 AggregateException 中(以處理引發(fā)多個(gè)異常的可能性)。
          • 如果查詢調(diào)用線程不安全的方法,PLINQ 將給出不可靠的結(jié)果。

          最后,PLINQ 提供了相當(dāng)多的鉤子用于調(diào)整和調(diào)整。給標(biāo)準(zhǔn)的 LINQ to-Objects API 增加這些細(xì)微差別的負(fù)擔(dān)會(huì)增加分心。

          普林克和訂購

          并行化查詢運(yùn)算符的副作用是,在整理結(jié)果時(shí),結(jié)果的提交順序不一定相同(參見)。換句話說,LINQ 對序列的正常順序保留保證不再有效。

          如果需要順序保留,可以通過在 之后調(diào)用 AsOrdered() 來強(qiáng)制保留它:

          myCollection.AsParallel().AsOrdered()...

          調(diào)用 AsOrdered 會(huì)導(dǎo)致大量元素的性能下降,因?yàn)?PLINQ 必須跟蹤每個(gè)元素的原始位置。

          您可以通過調(diào)用 AsUnordered 來抵消查詢中稍后的 AsOrdered 的影響:這將引入一個(gè)“隨機(jī)隨機(jī)洗牌點(diǎn)”,它允許點(diǎn)開始更有效地執(zhí)行。因此,如果只想保留前兩個(gè)查詢運(yùn)算符的輸入序列順序,則可以執(zhí)行以下操作:

          inputSequence.AsParallel().AsOrdered()
            .QueryOperator1()
            .QueryOperator2()
            .AsUnordered()       // From here on, ordering doesn’t matter
            .QueryOperator3()
            ...

          AsOrdered 不是默認(rèn)值,因?yàn)閷τ诖蠖鄶?shù)查詢,原始輸入順序無關(guān)緊要。換句話說,如果 AsOrdered 是默認(rèn)值,則需要將 AsUnordered 應(yīng)用于大多數(shù)并行查詢以獲得最佳性能,這將很麻煩。

          普林克限制

          PLINQ 可以并行化的內(nèi)容存在實(shí)際限制。默認(rèn)情況下,以下查詢運(yùn)算符會(huì)阻止并行化,除非源元素位于其原始索引位置:

          Select 、SelectMany 和 ElementAt 的索引版本

          大多數(shù)查詢運(yùn)算符會(huì)更改元素的索引位置(包括刪除元素的運(yùn)算符,例如 Where )。這意味著,如果要使用前面的運(yùn)算符,它們通常需要位于查詢的開頭。

          以下查詢運(yùn)算符是可并行化的,但使用昂貴的分區(qū)策略,有時(shí)可能比順序處理慢:

          連接 、分組依據(jù) 、組連接 、非重復(fù) 、 并集 、 相交 和 除外

          聚合運(yùn)算符在其標(biāo)準(zhǔn)化身中的重載不可并行化 — PLINQ 提供了特殊的重載來處理此問題(請參閱)。

          所有其他運(yùn)算符都是可并行化的,盡管使用這些運(yùn)算符并不能保證查詢將被并行化。如果 PLINQ 懷疑并行化的開銷會(huì)減慢該特定查詢的速度,則 PLINQ 可能會(huì)按順序運(yùn)行查詢。您可以通過在 AsParallel() 之后調(diào)用以下內(nèi)容來覆蓋此行為并強(qiáng)制并行:

          .WithExecutionMode (ParallelExecutionMode.ForceParallelism)

          示例:并行拼寫檢查器

          假設(shè)我們要編寫一個(gè)拼寫檢查器,該拼寫檢查器利用所有可用內(nèi)核快速運(yùn)行非常大的文檔。通過將我們的算法表述為 LINQ 查詢,我們可以很容易地將其并行化。

          第一步是將英語單詞詞典下載到HashSet中,以便高效查找:

          if (!File.Exists ("WordLookup.txt")    // Contains about 150,000 words
            File.WriteAllText ("WordLookup.txt",
              await new HttpClient().GetStringAsync (
                "http://www.albahari.com/ispell/allwords.txt"));
          
          var wordLookup = new HashSet<string> (
            File.ReadAllLines ("WordLookup.txt"),
            StringComparer.InvariantCultureIgnoreCase);

          然后,我們使用單詞查找創(chuàng)建一個(gè)測試“文檔”,其中包含一百萬個(gè)隨機(jī)單詞的數(shù)組。構(gòu)建數(shù)組后,讓我們引入幾個(gè)拼寫錯(cuò)誤:

          var random = new Random();
          string[] wordList = wordLookup.ToArray();
          
          string[] wordsToTest = Enumerable.Range (0, 1000000)
            .Select (i => wordList [random.Next (0, wordList.Length)])
            .ToArray();
          
          wordsToTest [12345] = "woozsh";     // Introduce a couple
          wordsToTest [23456] = "wubsie";     // of spelling mistakes.

          現(xiàn)在我們可以通過測試單詞到測試來執(zhí)行并行拼寫檢查 wordLookup .PLINQ 使這變得非常簡單:

          var query = wordsToTest
            .AsParallel()
            .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
            .Where   (iword => !wordLookup.Contains (iword.Word))
            .OrderBy (iword => iword.Index);
          
          foreach (var mistake in query)
            Console.WriteLine (mistake.Word + " - index = " + mistake.Index);
          
          // OUTPUT:
          // woozsh - index = 12345
          // wubsie - index = 23456

          索引字是一個(gè)自定義結(jié)構(gòu),我們定義如下:

          struct IndexedWord { public string Word; public int Index; }

          謂詞中的 wordLookup.Contains 方法為查詢提供了一些“肉”,使其值得并行化。

          注意

          我們可以通過使用匿名類型而不是 IndexedWord 結(jié)構(gòu)來稍微簡化查詢。但是,這會(huì)降低性能,因?yàn)槟涿愋停悾虼耸且妙愋停?huì)產(chǎn)生基于堆的分配和后續(xù)垃圾回收的成本。

          對于順序查詢,這種差異可能不足以影響,但對于并行查詢,支持基于堆棧的分配可能非常有利。這是因?yàn)榛诙褩5姆峙涫歉叨瓤刹⑿谢模ㄒ驗(yàn)槊總€(gè)線程都有自己的堆棧),而所有線程必須競爭同一個(gè)堆 - 由單個(gè)內(nèi)存管理器和垃圾回收器管理。

          使用 ThreadLocal<T>

          讓我們通過并行創(chuàng)建隨機(jī)測試詞列表本身來擴(kuò)展我們的示例。我們將其構(gòu)造為 LINQ 查詢,因此應(yīng)該很容易。這是順序版本:

          string[] wordsToTest = Enumerable.Range (0, 1000000)
            .Select (i => wordList [random.Next (0, wordList.Length)])
            .ToArray();

          不幸的是,調(diào)用隨機(jī)。Next 不是線程安全的,因此它不像將 AsParallel() 插入查詢那么簡單。一個(gè)可能的解決方案是編寫一個(gè)鎖定隨機(jī)的函數(shù)。下一個(gè);但是,這將限制并發(fā)性。更好的選擇是使用 ThreadLocal<Random>(參見中的創(chuàng)建一個(gè)單獨(dú)的 Random 對象。然后,我們可以并行化查詢,如下所示:

          var localRandom = new ThreadLocal<Random>
           ( () => new Random (Guid.NewGuid().GetHashCode()) );
          
          string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
            .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
            .ToArray();

          在用于實(shí)例化隨機(jī)對象的工廠函數(shù)中,我們傳入 Guid 的哈希碼,以確保如果在短時(shí)間內(nèi)創(chuàng)建兩個(gè)隨機(jī)對象,它們將產(chǎn)生不同的隨機(jī)數(shù)序列。

          功能純度

          由于 PLINQ 在并行線程上運(yùn)行查詢,因此必須注意不要執(zhí)行線程不安全的操作。特別是,寫入,因此線程不安全:

          // The following query multiplies each element by its position.
          // Given an input of Enumerable.Range(0,999), it should output squares.
          int i = 0;
          var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;

          我們可以通過使用鎖使增量 i 線程安全,但問題仍然存在,i 不一定對應(yīng)于輸入元素的位置。將 AsOrdered 添加到查詢中并不能解決后一個(gè)問題,因?yàn)?AsOrdered 只能確保元素的輸出順序與按順序處理的順序一致,而實(shí)際上它并不按順序它們。

          正確的解決方案是重寫我們的查詢以使用 Select 的索引版本:

          何時(shí)使用 PLINQ

          在現(xiàn)有應(yīng)用程序中搜索 LINQ 查詢并嘗試并行化它們很誘人。這通常是無效的,因?yàn)?LINQ 顯然是最佳解決方案的大多數(shù)問題往往執(zhí)行得非常快,因此無法從并行化中受益。更好的方法是查找 CPU 密集型瓶頸,然后考慮是否可以將其表示為 LINQ 查詢。(這種重組的一個(gè)受歡迎的副作用是 LINQ 通常使代碼更小且更具可讀性。

          PLINQ 非常適合令人尷尬的并行問題。然而,對于成像來說,這可能是一個(gè)糟糕的選擇,因?yàn)閷?shù)百萬像素整理成一個(gè)輸出序列會(huì)產(chǎn)生瓶頸。相反,最好將像素直接寫入數(shù)組或非托管內(nèi)存塊,并使用并行類或任務(wù)并行性來管理多線程。(但是,使用 ForAll 可以擊敗結(jié)果排序規(guī)則 — 我們在中對此進(jìn)行了討論。如果圖像處理算法自然適合 LINQ,則這樣做是有意義的。

          var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);

          為了獲得最佳性能,從查詢運(yùn)算符調(diào)用的任何方法都應(yīng)該是線程安全的,因?yàn)樗粚懭胱侄位驅(qū)傩裕ǚ歉弊饔没颍H绻鼈兺ㄟ^鎖定是線程安全的,則查詢的并行性潛力將受到持續(xù)時(shí)間除以在該函數(shù)中花費(fèi)的總時(shí)間的限制。

          設(shè)置并行度

          默認(rèn)情況下,PLINQ 為正在使用的處理器選擇最佳并行度。您可以通過在 AsParallel 之后調(diào)用 WithDegreeOfParallelism 來覆蓋它:

          ...AsParallel().WithDegreeOfPallelism(4)...

          當(dāng)并行度增加到超出核心計(jì)數(shù)時(shí),一個(gè)例子是 I/O 密集型工作(例如,一次下載多個(gè)網(wǎng)頁)。然而,任務(wù)組合器和異步函數(shù)提供了一種同樣簡單和的解決方案(參見中的)。與任務(wù) s 不同,PLINQ 無法在不阻塞線程(和線程,使情況更糟)的情況下執(zhí)行 I/O 綁定工作。

          更改并行度

          在 PLINQ 查詢中只能調(diào)用一次 WithDegreeOfParallelism。如果需要再次調(diào)用它,則必須通過在查詢中再次調(diào)用 AsParallel() 來強(qiáng)制合并和重新分區(qū)查詢:

          "The Quick Brown Fox"
            .AsParallel().WithDegreeOfParallelism (2)
            .Where (c => !char.IsWhiteSpace (c))
            .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
            .Select (c => char.ToUpper (c))

          取消

          取消 PLINQ 查詢(在 foreach 循環(huán)中使用其結(jié)果)很容易:只需脫離 foreach ,查詢將自動(dòng)取消,因?yàn)槊杜e器是隱式釋放的。

          對于以轉(zhuǎn)換、元素或聚合運(yùn)算符終止的查詢,可以通過取消令牌從另一個(gè)線程它(請參閱中的)。若要插入令牌,請?jiān)谡{(diào)用 AsParallel 后調(diào)用 WithCancel,傳入 CancelTokenSource 對象的 Token 屬性。然后,另一個(gè)線程可以在令牌源上調(diào)用 Cancel,這會(huì)在查詢的使用者上引發(fā) OperationCanceledException:

          IEnumerable<int> million = Enumerable.Range (3, 1000000);
          
          var cancelSource = new CancellationTokenSource();
          
          var primeNumberQuery = 
            from n in million.AsParallel().WithCancellation (cancelSource.Token)
            where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
            select n;
          
          new Thread (() => {
                              Thread.Sleep (100);      // Cancel query after
                              cancelSource.Cancel();   // 100 milliseconds.
                            }
                     ).Start();
          try 
          {
            // Start query running:
            int[] primes = primeNumberQuery.ToArray();
            // We'll never get here because the other thread will cancel us.
          }
          catch (OperationCanceledException)
          {
            Console.WriteLine ("Query canceled");
          }

          取消后,PLINQ 會(huì)等待每個(gè)工作線程完成其當(dāng)前元素,然后再結(jié)束查詢。這意味著查詢調(diào)用的任何外部方法都將運(yùn)行完成。

          優(yōu)化普林克

          輸出側(cè)優(yōu)化

          PLINQ 的優(yōu)勢之一是它可以方便地將并行化工作的結(jié)果整理到單個(gè)輸出序列中。但是,有時(shí),您最終對該序列所做的只是在每個(gè)元素上運(yùn)行一次某個(gè)函數(shù):

          foreach (int n in parallelQuery)
            DoSomething (n);

          如果是這種情況,并且您不關(guān)心元素的處理順序,則可以使用 PLINQ 的 ForAll 方法提高效率。

          ForAll 方法對 ParallelQuery 的每個(gè)輸出元素運(yùn)行一個(gè)委托。它直接掛接到 PLINQ 的內(nèi)部,繞過整理和枚舉結(jié)果的步驟。這里有一個(gè)簡單的例子:

          "abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);

          顯示了該過程。

          注意

          整理和枚舉結(jié)果并不是一項(xiàng)成本高昂的操作,因此當(dāng)有大量快速執(zhí)行的輸入時(shí),F(xiàn)orAll 優(yōu)化會(huì)產(chǎn)生最大的收益。



          輸入端優(yōu)化

          PLINQ 有三種分區(qū)策略,用于將輸入元素分配給線程:

          策略

          元素分配

          相對性能

          區(qū)塊分區(qū)

          動(dòng)態(tài)

          平均

          范圍分區(qū)

          靜態(tài)的

          從差到優(yōu)

          哈希分區(qū)

          靜態(tài)的

          對于需要比較元素( GroupBy 、 Join 、 GroupJoin 、 相交 、 except 、 并集 和 非重復(fù) )的查詢運(yùn)算符,您別無選擇:PLINQ 始終使用。哈希分區(qū)的效率相對較低,因?yàn)樗仨氼A(yù)先計(jì)算每個(gè)元素的哈希代碼(以便可以在同一線程上處理具有相同哈希代碼的元素)。如果發(fā)現(xiàn)這太慢,唯一的選擇是調(diào)用 AsSequential 以禁用并行化。

          對于所有其他查詢運(yùn)算符,您可以選擇是使用范圍分區(qū)還是塊分區(qū)。默認(rèn)情況下:

          • 如果輸入序列是(如果它是一個(gè)數(shù)組或?qū)崿F(xiàn) IList<T> ),PLINQ 選擇。
          • 否則,PLINQ 將選擇。

          簡而言之,對于長序列,范圍分區(qū)更快,每個(gè)元素都需要相似的 CPU 時(shí)間來處理。否則,塊分區(qū)通常更快。

          要強(qiáng)制范圍分區(qū):

          • 如果查詢以 Enumerable.Range 開頭,則將該方法替換為 ParallelEnumerable.Range 。
          • 否則,只需在輸入序列上調(diào)用 ToList 或 ToArray(顯然,這本身會(huì)產(chǎn)生性能成本,您應(yīng)該考慮到這一點(diǎn))。

          注意

          ParallelEnumerable.Range 不僅僅是調(diào)用 Enumerable.Range( ... ) 的快捷方式。作為并行() .它通過激活范圍分區(qū)來更改查詢的性能。

          要強(qiáng)制塊分區(qū),請將輸入序列包裝在對 Partitioner.Create(在 System.Collection.Concurrent 中)的調(diào)用中,如下所示:

          int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
          var parallelQuery =
            Partitioner.Create (numbers, true).AsParallel()
            .Where (...)

          Partitioner.Create 的第二個(gè)參數(shù)表示您希望對查詢,這是您希望塊分區(qū)的另一種說法。

          塊分區(qū)的工作原理是讓每個(gè)工作線程定期從輸入序列中抓取小的元素“塊”進(jìn)行處理(參見)。PLINQ 首先分配非常小的塊(一次一個(gè)或兩個(gè)元素)。然后,它會(huì)隨著查詢的進(jìn)行增加塊大小:這可確保小序列有效地并行化,并且大序列不會(huì)導(dǎo)致過多的往返。如果一個(gè)工人碰巧得到了“簡單”的元素(這個(gè)過程很快),它最終會(huì)得到更多的塊。該系統(tǒng)使每個(gè)線程保持同樣繁忙(并且內(nèi)核“平衡”);唯一的缺點(diǎn)是從共享輸入序列中獲取元素需要同步(通常是獨(dú)占鎖),這可能會(huì)導(dǎo)致一些開銷和爭用。



          塊分區(qū)與范圍分區(qū)

          范圍分區(qū)繞過正常的輸入端枚舉,并為每個(gè)工作線程預(yù)分配相同數(shù)量的元素,從而避免對輸入序列進(jìn)行爭用。但是,如果某些線程碰巧獲得簡單的元素并提前完成,則它們將閑置,而其余線程將繼續(xù)工作。我們早期的素?cái)?shù)計(jì)算器在范圍分區(qū)方面可能表現(xiàn)不佳。當(dāng)范圍分區(qū)可以很好地工作時(shí),計(jì)算前 10 萬個(gè)整數(shù)的平方根之和的一個(gè)例子:

          ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))

          ParallelEnumerable.Range 返回一個(gè) ParallelQuery<T> ,因此您隨后不需要調(diào)用 AsParallel 。

          注意

          范圍分區(qū)不一定在塊中分配元素范圍,而是選擇“條帶化”策略。例如,如果有兩個(gè)工作器,則一個(gè)工作器可能處理奇數(shù)元素,而另一個(gè)工作器處理偶數(shù)元素。TakeWhile 運(yùn)算符幾乎肯定會(huì)觸發(fā)條帶化策略,以避免在序列的后期不必要地處理元素。

          優(yōu)化自定義聚合

          PLINQ 可以有效地并行化 Sum、平均值、最小值和最大值運(yùn)算符,無需額外干預(yù)。但是,聚合運(yùn)算符給 PLINQ 帶來了特殊的挑戰(zhàn)。如所述,聚合執(zhí)行自定義聚合。例如,下面對數(shù)字序列求和,模仿 Sum 運(yùn)算符:

          int[] numbers = { 1, 2, 3 };
          int sum = numbers.Aggregate (0, (total, n) => total + n);   // 6

          我們還在第 中看到,對于聚合,提供的委托必須是關(guān)聯(lián)和交換的。如果違反此規(guī)則,PLINQ 將給出不正確的結(jié)果,因?yàn)樗鼜妮斎胄蛄兄刑崛《鄠€(gè)種子,以便同時(shí)聚合序列的分區(qū)。

          顯式種子聚合似乎是 PLINQ 的安全選項(xiàng),但不幸的是,由于依賴于單個(gè)種子,這些聚合通常按順序執(zhí)行。為了緩解此問題,PLINQ 提供了另一個(gè)聚合重載,允許您指定多個(gè)種子,或者更確切地說,指定。對于每個(gè)線程,它執(zhí)行此函數(shù)以生成一個(gè)單獨(dú)的種子,該種子成為累加器,它在本地聚合元素。

          您還必須提供一個(gè)函數(shù)來指示如何組合本地和主累加器。最后,此聚合重載(有點(diǎn)無償)期望委托對結(jié)果執(zhí)行任何最終轉(zhuǎn)換(您可以通過之后自己對結(jié)果運(yùn)行某些函數(shù)來輕松實(shí)現(xiàn)此目的)。因此,以下是四個(gè)代表,按通過順序排列:

          種子工廠

          返回新的本地累加器

          更新累加器功能

          將元素聚合到本地累加器

          組合蓄能器功能

          將本地蓄能器與主蓄能器相結(jié)合

          結(jié)果選擇器

          對最終結(jié)果應(yīng)用任何最終轉(zhuǎn)換

          注意

          在簡單方案中,可以指定種子工廠。當(dāng)種子是要更改的引用類型時(shí),此策略將失敗,因?yàn)槊總€(gè)線程將共享相同的實(shí)例。

          舉一個(gè)非常簡單的例子,下面對數(shù)字?jǐn)?shù)組中的值求和:

          numbers.AsParallel().Aggregate (
           () => 0,                                      // seedFactory
            (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
            (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
            finalResult => finalResult)                  // resultSelector

          這個(gè)例子是人為的,因?yàn)槲覀兛梢允褂酶唵蔚姆椒ǎɡ缥床シN聚合,或者更好的 Sum 運(yùn)算符)同樣有效地獲得相同的答案。舉一個(gè)更現(xiàn)實(shí)的例子,假設(shè)我們要計(jì)算給定字符串中英文字母表中每個(gè)字母的頻率。一個(gè)簡單的順序解決方案可能如下所示:

          string text = "Let’s suppose this is a really long string";
          var letterFrequencies = new int[26];
          foreach (char c in text)
          {
            int index = char.ToUpper (c) - 'A';
            if (index >= 0 && index < 26) letterFrequencies [index]++;
          };

          注意

          輸入文本可能很長的一個(gè)例子是基因測序。然后,“字母表”將由字母,,和組成。

          為了并行化這一點(diǎn),我們可以將 foreach 語句替換為對 Parallel.ForEach 的調(diào)用(我們將在下一節(jié)中介紹),但這將讓我們處理共享陣列上的并發(fā)問題。鎖定訪問該陣列幾乎會(huì)扼殺并行化的潛力。

          聚合提供了一個(gè)整潔的解決方案。在這種情況下,累加器是一個(gè)數(shù)組,就像我們前面示例中的 letterFrequency 數(shù)組一樣。下面是使用聚合的順序版本:

          int[] result =
            text.Aggregate (
              new int[26],                // Create the "accumulator"
              (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
              {
                int index = char.ToUpper (c) - 'A';
                if (index >= 0 && index < 26) letterFrequencies [index]++;
                return letterFrequencies;
              });

          現(xiàn)在并行版本,使用 PLINQ 的特殊重載:

          int[] result =
            text.AsParallel().Aggregate (
             () => new int[26],             // Create a new local accumulator
          
              (localFrequencies, c) =>       // Aggregate into the local accumulator
              {
                int index = char.ToUpper (c) - 'A';
                if (index >= 0 && index < 26) localFrequencies [index]++;
                return localFrequencies;
              },
                                             // Aggregate local->main accumulator
              (mainFreq, localFreq) =>
                mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
          
              finalResult => finalResult     // Perform any final transformation
            );                               // on the end result.

          請注意,局部累積函數(shù) localFrequency 數(shù)組。這種執(zhí)行此優(yōu)化的能力非常重要,并且是合法的,因?yàn)?localFrequency 是每個(gè)線程的本地。

          平行類

          PFX 通過并行類中的三種靜態(tài)方法提供結(jié)構(gòu)化并行的基本形式:

          Parallel.Invoke

          并行執(zhí)行委托數(shù)組

          Parallel.For

          執(zhí)行 C# for 循環(huán)的并行等效項(xiàng)

          Parallel.ForEach

          執(zhí)行 C# foreach 循環(huán)的并行等效項(xiàng)

          所有三種方法都會(huì)阻止,直到所有工作完成。與 PLINQ 一樣,在發(fā)生未經(jīng)處理的異常后,剩余的工作線程將在當(dāng)前迭代后停止,并將異常(或多個(gè)異常)拋回調(diào)用方 — 包裝在 AggregateException 中(請參閱)。

          Parallel.Invoke

          Parallel.Invoke 并行執(zhí)行一組 Action 委托,然后等待它們完成。該方法的最簡單版本定義如下:

          public static void Invoke (params Action[] actions);

          與 PLINQ 一樣,并行 .* 方法針對計(jì)算密集型工作而非 I/O 密集型工作進(jìn)行了優(yōu)化。但是,一次下載兩個(gè)網(wǎng)頁提供了一種演示 Parallel.Invoke 的簡單方法:

          Parallel.Invoke (
           () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
           () => new WebClient().DownloadFile ("http://microsoft.com", "ms.html"));

          從表面上看,這似乎是創(chuàng)建和等待兩個(gè)線程綁定 Task 對象的便捷快捷方式。但有一個(gè)重要的區(qū)別:如果你傳入一個(gè)包含一百萬個(gè)委托的數(shù)組,Parallel.Invoke 仍然有效地工作。這是因?yàn)樗鼘⒋罅吭貫榉峙浣o少數(shù)基礎(chǔ)任務(wù)的批次,而不是為每個(gè)委托創(chuàng)建單獨(dú)的任務(wù)。

          與 Parallel 的所有方法一樣,在整理結(jié)果時(shí),您只能靠自己。這意味著您需要牢記線程安全性。例如,以下內(nèi)容是線程不安全的:

          var data = new List<string>();
          Parallel.Invoke (
           () => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
           () => data.Add (new WebClient().DownloadString ("http://www.far.com")));

          鎖定添加到列表可以解決此問題,但如果具有更大的快速執(zhí)行委托數(shù)組,則鎖定會(huì)產(chǎn)生瓶頸。更好的解決方案是使用線程安全集合,我們將在后面的部分中介紹 - 將是理想的選擇。

          Parallel.Invoke 也被重載以接受 ParallelOptions 對象:

          public static void Invoke (ParallelOptions options,
                                     params Action[] actions);

          使用 ParallelOptions ,您可以插入取消令牌、限制最大并發(fā)性以及指定自定義任務(wù)計(jì)劃程序。當(dāng)您執(zhí)行的任務(wù)(大約)多于核心時(shí),取消令牌是相關(guān)的:取消后,任何未啟動(dòng)的委托都將被放棄。但是,任何已經(jīng)執(zhí)行任務(wù)的代表將繼續(xù)完成。有關(guān)如何使用取消令牌的示例,請參閱

          Parallel.For 和 Parallel.ForEach

          Parallel.For 和 Parallel.ForEach 執(zhí)行等效的 C# for 和 foreach 循環(huán),但每次迭代并行執(zhí)行,而不是按順序執(zhí)行。以下是他們的(最簡單的)簽名:

          public static ParallelLoopResult For (
            int fromInclusive, int toExclusive, Action<int> body)
          
          public static ParallelLoopResult ForEach<TSource> (
            IEnumerable<TSource> source, Action<TSource> body)

          此順序 for 循環(huán)

          for (int i = 0; i < 100; i++)
            Foo (i);

          像這樣并行化

          Parallel.For (0, 100, i => Foo (i));

          或者更簡單地說:

          Parallel.For (0, 100, Foo);

          而這個(gè)順序的 foreach

          foreach (char c in "Hello, world")
            Foo (c);

          像這樣并行化:

          Parallel.ForEach ("Hello, world", Foo);

          舉一個(gè)實(shí)際的例子,如果我們導(dǎo)入 System.Security.Cryptography 命名空間,我們可以并行生成六個(gè)公鑰/私鑰對字符串,如下所示:

          var keyPairs = new string[6];
          
          Parallel.For (0, keyPairs.Length,
                        i => keyPairs[i] = RSA.Create().ToXmlString (true));

          與 Parallel.Invoke 一樣,我們可以為 Parallel.For 和 Parallel.ForEach 提供大量的工作項(xiàng),它們將被有效地劃分為幾個(gè)任務(wù)。

          注意

          后一個(gè)查詢也可以使用 PLINQ 完成:

          string[] keyPairs =
            ParallelEnumerable.Range (0, 6)
            .Select (i => RSA.Create().ToXmlString (true))
            .ToArray();

          外環(huán)與內(nèi)環(huán)

          Parallel.For 和 Parallel.ForEach 通常在外部循環(huán)而不是內(nèi)部循環(huán)上效果最好。這是因?yàn)閷τ谇罢撸鷮⑻峁└蟮墓ぷ鲏K來并行化,從而稀釋了管理開銷。通常不需要并行化內(nèi)部和外部循環(huán)。在以下示例中,我們通常需要 100 多個(gè)內(nèi)核才能從內(nèi)部并行化中受益:

          Parallel.For (0, 100, i =>
          {
            Parallel.For (0, 50, j => Foo (i, j));   // Sequential would be better
          });                                        // for the inner loop.

          Indexed Parallel.ForEach

          有時(shí),了解循環(huán)迭代索引很有用。使用順序 foreach ,很容易:

          int i = 0;
          foreach (char c in "Hello, world")
            Console.WriteLine (c.ToString() + i++);

          但是,在并行上下文中,遞增共享變量不是線程安全的。您必須改用以下版本的 ForEach :

          public static ParallelLoopResult ForEach<TSource> (
            IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)

          我們將忽略 ParallelLoopState(我們將在下一節(jié)中介紹)。現(xiàn)在,我們對 Action 的第三個(gè)類型參數(shù) long 感興趣,它表示循環(huán)索引:

          Parallel.ForEach ("Hello, world", (c, state, i) =>
          {
             Console.WriteLine (c.ToString() + i);
          });

          為了將其置于實(shí)際上下文中,讓我們重新審視一下我們使用 PLINQ 編寫的拼寫檢查器。以下代碼加載一個(gè)字典以及一個(gè)包含一百萬個(gè)單詞的數(shù)組進(jìn)行測試:

          if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
            new WebClient().DownloadFile (
              "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
          
          var wordLookup = new HashSet<string> (
            File.ReadAllLines ("WordLookup.txt"),
            StringComparer.InvariantCultureIgnoreCase);
          
          var random = new Random();
          string[] wordList = wordLookup.ToArray();
          
          string[] wordsToTest = Enumerable.Range (0, 1000000)
            .Select (i => wordList [random.Next (0, wordList.Length)])
            .ToArray();
          
          wordsToTest [12345] = "woozsh";     // Introduce a couple
          wordsToTest [23456] = "wubsie";     // of spelling mistakes.

          我們可以使用 Parallel.ForEach 的索引版本對我們的 wordsToTest 數(shù)組執(zhí)行拼寫檢查,如下所示:

          var misspellings = new ConcurrentBag<Tuple<int,string>>();
          
          Parallel.ForEach (wordsToTest, (word, state, i) =>
          {
            if (!wordLookup.Contains (word))
              misspellings.Add (Tuple.Create ((int) i, word));
          });

          請注意,我們必須將結(jié)果整理到線程安全集合中:與使用 PLINQ 相比,必須這樣做是缺點(diǎn)。與 PLINQ 相比,它的優(yōu)勢在于我們避免了應(yīng)用索引選擇查詢運(yùn)算符的成本,而索引 Select 查詢運(yùn)算符的效率低于索引的 ForEach。

          ParallelLoopState:盡早脫離循環(huán)

          由于并行 For 或 ForEach 中的循環(huán)主體是委托,因此不能使用 break 語句提前退出循環(huán)。相反,您必須在 ParallelLoopState 對象上調(diào)用 Break 或 Stop:

          public class ParallelLoopState
          {
            public void Break();
            public void Stop();
          
            public bool IsExceptional { get; }
            public bool IsStopped { get; }
            public long? LowestBreakIteration { get; }
            public bool ShouldExitCurrentIteration { get; }
          }

          獲取 ParallelLoopState 很容易:所有版本的 For 和 ForEach 都被重載以接受 Action<TSource,ParallelLoopState> 類型的循環(huán)體。所以,為了并行化這個(gè)

          foreach (char c in "Hello, world")
            if (c == ',')
              break;
            else
              Console.Write (c);

          這樣做:

          Parallel.ForEach ("Hello, world", (c, loopState) =>
          {
            if (c == ',')
              loopState.Break();
            else
              Console.Write (c);
          });
          
          // OUTPUT: Hlloe

          從輸出中可以看出,循環(huán)主體可以按隨機(jī)順序完成。除了這種差異之外,調(diào)用 Break 至少會(huì)產(chǎn)生與按順序執(zhí)行循環(huán)相同的元素:此示例將始終至少按某種 、、l、 和 。 相反,調(diào)用 Stop 而不是 Break 會(huì)強(qiáng)制所有線程在其當(dāng)前迭代后立即完成。在我們的示例中,如果另一個(gè)線程滯后,調(diào)用 Stop 可以為我們提供字母 、、l、 和 的子集。 當(dāng)您找到要查找的內(nèi)容時(shí),或者當(dāng)出現(xiàn)問題并且您不會(huì)查看結(jié)果時(shí),呼叫 Stop 非常有用。

          注意

          Parallel.For 和 Parallel.ForEach 方法返回一個(gè) ParallelLoopResult 對象,該對象公開名為 IsDone 和 LowestBreakIteration 的屬性。這些告訴您循環(huán)是否運(yùn)行完成;如果沒有,則指示循環(huán)在哪個(gè)周期中斷。

          如果 LowestBreakIteration 返回 null,則表示您在循環(huán)中調(diào)用了 Stop(而不是 Break)。

          如果循環(huán)體很長,則可能希望其他線程在方法主體中途中斷,以防早期中斷或停止。可以通過在代碼中的不同位置輪詢 ShouldExitCurrentIteration 屬性來執(zhí)行此操作;此屬性在停止后立即變?yōu)?true,或者在中斷后不久變?yōu)?true。

          注意

          ShouldExitCurrentIteration 在取消請求后也會(huì)變?yōu)?true,或者在循環(huán)中拋出異常。

          IsExceptional 可讓您知道另一個(gè)線程上是否發(fā)生了異常。任何未處理的異常都將導(dǎo)致循環(huán)在每個(gè)線程的當(dāng)前迭代后停止:若要避免這種情況,必須在代碼中顯式處理異常。

          使用局部值進(jìn)行優(yōu)化

          Parallel.For 和 Parallel.ForEach 都提供了一組重載,這些重載具有一個(gè)名為 TLocal 的泛型類型參數(shù)。這些重載旨在幫助您使用迭代密集型循環(huán)優(yōu)化數(shù)據(jù)排序規(guī)則。最簡單的是這樣的:

          public static ParallelLoopResult For <TLocal> (
            int fromInclusive,
            int toExclusive,
            Func <TLocal> localInit,
            Func <int, ParallelLoopState, TLocal, TLocal> body,
            Action <TLocal> localFinally);

          在實(shí)踐中很少需要這些方法,因?yàn)樗鼈兊哪繕?biāo)場景主要由 PLINQ 覆蓋(這是幸運(yùn)的,因?yàn)檫@些重載有點(diǎn)嚇人!

          從本質(zhì)上講,問題是這樣的:假設(shè)我們要對數(shù)字 1 到 10,000,000 的平方根求和。計(jì)算 10 萬個(gè)平方根很容易并行化,但對它們的值求和很麻煩,因?yàn)槲覀儽仨氭i定更新總數(shù):

          object locker = new object();
          double total = 0;
          Parallel.For (1, 10000000,
                        i => { lock (locker) total += Math.Sqrt (i); });

          并行化的收益被獲得 10 萬個(gè)鎖的成本以及由此產(chǎn)生的阻塞所抵消。

          然而,現(xiàn)實(shí)情況是,我們實(shí)際上不需要10萬把鎖。想象一下,一隊(duì)志愿者撿起大量垃圾。如果所有工人共用一個(gè)垃圾桶,那么旅行和爭用將使該過程效率極低。顯而易見的解決方案是讓每個(gè)工人都有一個(gè)私人或“本地”垃圾桶,偶爾會(huì)將其倒入主垃圾箱。

          For 和 ForEach 的 TLocal 版本正是以這種方式工作的。志愿者是內(nèi)部工作線程,本地垃圾桶。要使執(zhí)行此作業(yè),必須向其提供兩個(gè)指示的附加委托:

          1. 如何初始化新的本地值
          2. 如何將本地聚合與主值組合

          此外,而不是返回 void 的主體委托,它應(yīng)該返回本地值的新聚合。下面是重構(gòu)的示例:

          object locker = new object();
          double grandTotal = 0;
          
          Parallel.For (1, 10000000,
          
            () => 0.0,                        // Initialize the local value.
          
            (i, state, localTotal) =>         // Body delegate. Notice that it
               localTotal + Math.Sqrt (i),    // returns the new local total.
          
            localTotal =>                                    // Add the local value
              { lock (locker) grandTotal += localTotal; }    // to the master value.
          );

          我們?nèi)匀槐仨氭i定,但只能將局部值聚合到總計(jì)。這使得該過程大大提高了效率。

          注意

          如前所述,PLINQ 通常非常適合這些場景。我們的示例可以像這樣與 PLINQ 并行化:

          ParallelEnumerable.Range (1, 10000000)
                            .Sum (i => Math.Sqrt (i))

          (請注意,我們使用 ParallelEnumerable 來強(qiáng)制:在這種情況下,這可以提高性能,因?yàn)樗袛?shù)字的處理時(shí)間都相同。

          在更復(fù)雜的方案中,可以使用 LINQ 的聚合運(yùn)算符而不是 Sum 。如果您提供了本地種子工廠,則情況有點(diǎn)類似于使用 Parallel.For 提供本地值函數(shù)。

          任務(wù)并行性

          是使用 PFX 并行化的最低級(jí)別方法。用于在此級(jí)別工作的類在 System.Threading.Tasks 命名空間中定義,包括以下內(nèi)容:

          目的

          任務(wù)

          用于管理工作單元

          任務(wù)<TResult>

          用于管理具有返回值的工作單位

          任務(wù)工廠

          用于創(chuàng)建任務(wù)

          TaskFactory<TResult>

          用于創(chuàng)建具有相同返回類型的任務(wù)和延續(xù)

          任務(wù)計(jì)劃程序

          用于管理任務(wù)計(jì)劃

          任務(wù)完成源

          用于手動(dòng)控制任務(wù)的工作流

          我們在中介紹了任務(wù)的基礎(chǔ)知識(shí);在本節(jié)中,我們將介紹針對并行編程的任務(wù)的高級(jí)功能:

          • 調(diào)整任務(wù)的計(jì)劃
          • 當(dāng)一個(gè)任務(wù)從另一個(gè)任務(wù)啟動(dòng)時(shí)建立父/子關(guān)系
          • 延續(xù)的高級(jí)使用
          • 任務(wù)工廠

          注意

          任務(wù)并行庫允許您以最小的開銷創(chuàng)建數(shù)百(甚至數(shù)千)個(gè)任務(wù)。但是,如果要?jiǎng)?chuàng)建數(shù)百萬個(gè)任務(wù),則需要將這些任務(wù)劃分為更大的工作單元以保持效率。并行類和 PLINQ 會(huì)自動(dòng)執(zhí)行此操作。

          注意

          Visual Studio 提供了一個(gè)用于監(jiān)視任務(wù)(調(diào)試→窗口→并行任務(wù))的窗口。這等效于“線程”窗口,但用于任務(wù)。“并行堆棧”窗口還具有用于任務(wù)的特殊模式。

          創(chuàng)建和啟動(dòng)任務(wù)

          如所述,Task.Run創(chuàng)建并啟動(dòng)一個(gè)任務(wù)或任務(wù)<> 。此方法實(shí)際上是調(diào)用 Task.Factory.StartNew 的快捷方式,它通過額外的重載提供了更大的靈活性。

          指定狀態(tài)對象

          Task.Factory.StartNew 允許您指定傳遞給目標(biāo)然后,目標(biāo)方法的簽名必須包含單個(gè)對象類型參數(shù):

          var task = Task.Factory.StartNew (Greet, "Hello");
          task.Wait();  // Wait for task to complete.
          
          void Greet (object state) { Console.Write (state); }   // Hello

          這避免了執(zhí)行調(diào)用 Greet 的 lambda 表達(dá)式所需的閉包成本。這是一個(gè)微優(yōu)化,在實(shí)踐中很少需要,因此我們可以更好地利用對象,即為任務(wù)分配一個(gè)有意義的名稱。然后,我們可以使用 AsyncState 屬性來查詢其名稱:

          var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
          Console.WriteLine (task.AsyncState);   // Greeting
          task.Wait();
          
          void Greet (string message) { Console.Write (message); }

          注意

          Visual Studio 在“并行任務(wù)”窗口中顯示每個(gè)任務(wù)的 AsyncState,因此在此處使用有意義的名稱可以大大簡化調(diào)試。

          任務(wù)創(chuàng)建選項(xiàng)

          您可以通過在調(diào)用 StartNew(或?qū)嵗蝿?wù))時(shí)指定 TaskCreationOptions 枚舉來調(diào)整任務(wù)的執(zhí)行。TaskCreationOptions 是一個(gè)具有以下(可組合)值的標(biāo)志枚舉:

          LongRunning, PreferFairness, AttachedToParent

          LongRun 建議調(diào)度程序?qū)⒁粋€(gè)線程專用于任務(wù),正如我們中所述,這對于 I/O 綁定任務(wù)和長時(shí)間運(yùn)行的任務(wù)是有益的,否則這些任務(wù)可能會(huì)迫使短期運(yùn)行的任務(wù)在調(diào)度之前等待不合理的時(shí)間。

          PreferFairness 指示調(diào)度程序嘗試確保按啟動(dòng)順序調(diào)度任務(wù)。它通常可能會(huì)這樣做,因?yàn)樗褂帽镜毓ぷ鞲`取隊(duì)列在內(nèi)部優(yōu)化任務(wù)調(diào)度 — 這種優(yōu)化允許創(chuàng)建任務(wù),而不會(huì)產(chǎn)生單個(gè)工作隊(duì)列產(chǎn)生的爭用開銷。通過指定“附加到父項(xiàng)”來創(chuàng)建子任務(wù)。

          子任務(wù)

          當(dāng)一個(gè)任務(wù)啟動(dòng)另一個(gè)任務(wù)時(shí),您可以選擇建立父子:

          Task parent = Task.Factory.StartNew (() =>
          {
            Console.WriteLine ("I am a parent");
          
            Task.Factory.StartNew (() =>        // Detached task
            {
              Console.WriteLine ("I am detached");
            });
          
            Task.Factory.StartNew (() =>        // Child task
            {
              Console.WriteLine ("I am a child");
            }, TaskCreationOptions.AttachedToParent);
          });

          子任務(wù)的特殊之處在于,當(dāng)您等待任務(wù)完成時(shí),它也會(huì)等待任何子任務(wù)。此時(shí),任何子異常都會(huì)冒泡:

          TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
          var parent = Task.Factory.StartNew (() => 
          {
            Task.Factory.StartNew (() =>   // Child
            {
              Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
            }, atp);
          });
          
          // The following call throws a NullReferenceException (wrapped
          // in nested AggregateExceptions):
          parent.Wait();

          當(dāng)子任務(wù)是延續(xù)時(shí),這可能特別有用,您很快就會(huì)看到。

          等待多個(gè)任務(wù)

          我們在中看到,您可以通過調(diào)用其 Wait 方法或訪問其 Result 屬性(如果它是 Task<TResult> )來等待單個(gè)任務(wù)。您還可以通過靜態(tài)方法 Task.WaitAll(等待所有指定任務(wù)完成)和 Task.WaitAny(僅等待一個(gè)任務(wù)完成)一次等待多個(gè)任務(wù)。

          WaitAll 類似于依次等待每個(gè)任務(wù),但效率更高,因?yàn)樗ㄗ疃啵┲恍枰粋€(gè)上下文切換。此外,如果一個(gè)或多個(gè)任務(wù)引發(fā)未經(jīng)處理的異常,WaitAll 仍會(huì)等待每個(gè)任務(wù)。然后,它會(huì)重新拋出一個(gè) AggregateException,該異常累積每個(gè)錯(cuò)誤任務(wù)的異常(這是 AggregateException 真正有用的地方)。這相當(dāng)于這樣做:

          // Assume t1, t2 and t3 are tasks:
          var exceptions = new List<Exception>();
          try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
          try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
          try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
          if (exceptions.Count > 0) throw new AggregateException (exceptions);

          調(diào)用 WaitAny 等效于等待 ManualResetEventSlim,該任務(wù)在完成時(shí)由每個(gè)任務(wù)發(fā)出信號(hào)。

          除了超時(shí)之外,您還可以將傳遞給 Wait 方法:這允許您取消等待,。

          取消任務(wù)

          您可以選擇在啟動(dòng)任務(wù)時(shí)傳入取消令牌。然后,如果通過該令牌取消,任務(wù)本身將進(jìn)入“已取消”狀態(tài):

          var cts = new CancellationTokenSource();
          CancellationToken token = cts.Token;
          cts.CancelAfter (500);
          
          Task task = Task.Factory.StartNew (() => 
          {
            Thread.Sleep (1000);
            token.ThrowIfCancellationRequested();  // Check for cancellation request
          }, token);
          
          try { task.Wait(); }
          catch (AggregateException ex)
          {
            Console.WriteLine (ex.InnerException is TaskCanceledException);  // True
            Console.WriteLine (task.IsCanceled);                             // True
            Console.WriteLine (task.Status);                             // Canceled
          }

          TaskCanceledException 是 OperationCanceledException 的一個(gè)子類。如果要顯式拋出 OperationCanceledException(而不是調(diào)用令牌。ThrowIfCancelRequest),您必須將取消令牌傳遞到 OperationCanceledException 的構(gòu)造函數(shù)中。如果不這樣做,任務(wù)將不會(huì)以 TaskStatus.Canceled 狀態(tài)結(jié)束,也不會(huì)觸發(fā) OnlyOnCanceled 。

          如果任務(wù)在啟動(dòng)之前被取消,則不會(huì)計(jì)劃它 - 操作已取消異常將立即拋給任務(wù)。

          由于取消令牌由其他 API 識(shí)別,因此您可以將它們傳遞到其他構(gòu)造中,取消將無縫傳播:

          var cancelSource = new CancellationTokenSource();
          CancellationToken token = cancelSource.Token;
          
          Task task = Task.Factory.StartNew (() =>
          {
            // Pass our cancellation token into a PLINQ query:
            var query = someSequence.AsParallel().WithCancellation (token)...
            ... enumerate query ...
          });

          在此示例中調(diào)用 Cancel on cancelSource 將取消 PLINQ 查詢,該查詢將在任務(wù)正文上引發(fā) OperationCanceledException,然后取消任務(wù)。

          注意

          可以傳遞到“等待”和“取消和等待”等方法中的取消令牌允許您取消操作,而不是取消任務(wù)本身。

          延續(xù)

          方法在任務(wù)結(jié)束后立即執(zhí)行委托:

          Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
          Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

          一旦任務(wù) 1(任務(wù))完成、失敗或取消,任務(wù) 2()就會(huì)啟動(dòng)。(如果任務(wù) 1 在第二行代碼運(yùn)行之前已完成,則任務(wù) 2 將計(jì)劃立即執(zhí)行。傳遞給延續(xù)的 lambda 表達(dá)式的 ant 參數(shù)是對先驗(yàn)任務(wù)的引用。ContinueWith 本身會(huì)返回一個(gè)任務(wù),以便輕松添加進(jìn)一步的延續(xù)。

          默認(rèn)情況下,前置任務(wù)和延續(xù)任務(wù)可以在不同的線程上執(zhí)行。您可以通過指定 TaskContinuationOptions.ExecuteSync 在調(diào)用 ContinueWith 時(shí)強(qiáng)制它們在同一線程上執(zhí)行:這可以通過減少間接性來提高非常細(xì)粒度的延續(xù)的性能。

          延續(xù)和任務(wù)<任務(wù)>

          就像普通任務(wù)一樣,延續(xù)可以是 Task<TResult> 類型并返回?cái)?shù)據(jù)。在下面的示例中,我們使用一系列鏈?zhǔn)饺蝿?wù)計(jì)算 Math.Sqrt(8*2),然后寫出結(jié)果:

          Task.Factory.StartNew<int> (() => 8)
            .ContinueWith (ant => ant.Result * 2)
            .ContinueWith (ant => Math.Sqrt (ant.Result))
            .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

          為了簡單起見,我們的例子有些做作;在現(xiàn)實(shí)生活中,這些 lambda 表達(dá)式會(huì)調(diào)用計(jì)算密集型函數(shù)。

          延續(xù)和例外

          延續(xù)可以通過查詢前置任務(wù)的 Exception 屬性來了解前置任務(wù)是否出錯(cuò),或者只是通過調(diào)用 Result / Wait 并捕獲生成的 AggregateException。如果先前的錯(cuò)誤,而延續(xù)也不存在,則異常被視為,并且靜態(tài) TaskScheduler.UnobservedTaskException 事件在稍后對任務(wù)進(jìn)行垃圾回收時(shí)觸發(fā)。

          安全的模式是重新引發(fā)先前的異常。只要延續(xù)是 Wait edon,異常就會(huì)傳播并重新拋出到 Waiter:

          Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                                          .ContinueWith (ant =>
            {
              ant.Wait();
              // Continue processing...
            });
          
          continuation.Wait();    // Exception is now thrown back to caller.

          處理異常的另一種方法是為特殊與非異常結(jié)果指定不同的延續(xù)。這是通過任務(wù)延續(xù)選項(xiàng)完成的:

          Task task1 = Task.Factory.StartNew (() => { throw null; });
          
          Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                           TaskContinuationOptions.OnlyOnFaulted);
          
          Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                                        TaskContinuationOptions.NotOnFaulted);

          此模式在與子任務(wù)結(jié)合使用時(shí)特別有用,您很快就會(huì)看到。

          以下擴(kuò)展方法“吞噬”任務(wù)的未處理異常:

          public static void IgnoreExceptions (this Task task)
          {
            task.ContinueWith (t => { var ignore = t.Exception; },
              TaskContinuationOptions.OnlyOnFaulted);
          }

          (這可以通過添加代碼來記錄異常來改進(jìn)。以下是它的使用方式:

          Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();

          延續(xù)和子任務(wù)

          延續(xù)的一個(gè)強(qiáng)大功能是,它們僅在所有子任務(wù)完成時(shí)啟動(dòng)(參見)。此時(shí),子項(xiàng)引發(fā)的任何異常都將封送到延續(xù)中。

          在下面的示例中,我們啟動(dòng)三個(gè)子任務(wù),每個(gè)子任務(wù)拋出一個(gè) NullReferenceException 。然后,我們通過父級(jí)的延續(xù)一舉捕獲所有這些:

          TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
          Task.Factory.StartNew (() =>
          {
            Task.Factory.StartNew (() => { throw null; }, atp);
            Task.Factory.StartNew (() => { throw null; }, atp);
            Task.Factory.StartNew (() => { throw null; }, atp);
          })
          .ContinueWith (p => Console.WriteLine (p.Exception),
                              TaskContinuationOptions.OnlyOnFaulted);


          有條件的延續(xù)

          默認(rèn)情況下,無論前置任務(wù)完成、引發(fā)異常還是取消,都會(huì)無地安排延續(xù)。您可以通過 TaskContinuationOptions 枚舉中包含的一組(可組合的)標(biāo)志來更改此行為。以下是控制條件延續(xù)的三個(gè)核心標(biāo)志:

          NotOnRanToCompletion = 0x10000,
          NotOnFaulted = 0x20000,
          NotOnCanceled = 0x40000,

          這些標(biāo)志是減法的,因?yàn)槟鷳?yīng)用的越多,執(zhí)行延續(xù)的可能性就越小。為方便起見,還有以下預(yù)組合值:

          OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
          OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
          OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted

          (組合所有 Not* 標(biāo)志 [ NotOnRanToCompletion, NotOnFaulted , NotOnCanceled ] 是荒謬的,因?yàn)樗鼤?huì)導(dǎo)致延續(xù)總是。

          “RanToComplete”表示前置成功,沒有取消或未處理的異常。

          “錯(cuò)誤”表示在前置項(xiàng)上拋出未處理的異常。

          “已取消”是指以下兩種情況之一:

          • 前因已通過其取消令牌取消。換句話說,在前置項(xiàng)上拋出了一個(gè) OperationCanceledException,其 CancelToken 屬性與啟動(dòng)時(shí)傳遞給前置項(xiàng)的屬性匹配。
          • 前置項(xiàng)被隱式取消,不滿足條件延續(xù)謂詞。

          必須掌握的是,當(dāng)延續(xù)沒有通過這些標(biāo)志執(zhí)行時(shí),延續(xù)不會(huì)被遺忘或放棄 - 它會(huì)被取消。這意味著延續(xù)本身的任何延續(xù),除非您使用 NotOnCanceled 謂詞它們。例如,考慮一下:

          Task t1 = Task.Factory.StartNew (...);
          
          Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                                        TaskContinuationOptions.OnlyOnFaulted);
          
          Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));

          就目前而言,t3 將始終被調(diào)度 — 即使 t1 沒有引發(fā)異常(參見)。這是因?yàn)槿绻?t1 成功,故障任務(wù)將被取消,并且對 t3 沒有繼續(xù)限制,t3 將無條件執(zhí)行。


          如果我們希望 t3 僅在錯(cuò)誤實(shí)際運(yùn)行時(shí)執(zhí)行,我們必須這樣做:

          Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                                        TaskContinuationOptions.NotOnCanceled);

          (或者,我們可以指定 OnlyOnRanToCompletion;不同之處在于,如果在錯(cuò)誤中拋出異常,t3 將不會(huì)執(zhí)行。

          具有多個(gè)先行的延續(xù)

          可以使用 TaskFactory 類中的 ContinueWhenAll 和 ContinueWhenAny 方法,根據(jù)多個(gè)前因的完成情況來安排繼續(xù)執(zhí)行。然而,隨著(WhenAll和WhenAny)中討論的任務(wù)組合器的引入,這些方法變得多余。具體來說,給定以下任務(wù)

          var task1 = Task.Run (() => Console.Write ("X"));
          var task2 = Task.Run (() => Console.Write ("Y"));

          我們可以安排在兩者都完成后執(zhí)行的延續(xù),如下所示:

          var continuation = Task.Factory.ContinueWhenAll (
            new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

          下面是與 WhenAll 任務(wù)組合器相同的結(jié)果:

          var continuation = Task.WhenAll (task1, task2)
                                 .ContinueWith (ant => Console.WriteLine ("Done"));

          單個(gè)先驗(yàn)的多個(gè)延續(xù)

          對同一任務(wù)多次調(diào)用 ContinueWith 會(huì)在單個(gè)前因上創(chuàng)建多個(gè)延續(xù)。當(dāng)前置完成時(shí),所有延續(xù)將一起開始(除非您指定 TaskContinuationOptions.ExecuteSyncly ,在這種情況下,延續(xù)將按順序執(zhí)行)。

          以下內(nèi)容等待一秒鐘,然后寫入 XY 或 YX:

          var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
          t.ContinueWith (ant => Console.Write ("X"));
          t.ContinueWith (ant => Console.Write ("Y"));

          任務(wù)計(jì)劃程序

          任務(wù)計(jì)劃程序?qū)⑷蝿?wù)分配給線程,并由抽象的任務(wù)類表示。 .NET 提供了兩個(gè)具體的實(shí)現(xiàn):與 CLR 線程池協(xié)同工作的默認(rèn)計(jì)劃程序和。 后者(主要)旨在幫助您使用 WPF 和 Windows 窗體的線程模型,該模型要求用戶界面元素和控件只能從創(chuàng)建它們的線程訪問(請參見中的)。通過捕獲它,我們可以指示任務(wù)或延續(xù)在此上下文中執(zhí)行:

          // Suppose we are on a UI thread in a Windows Forms / WPF application:
          _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();

          假設(shè) Foo 是一個(gè)返回字符串的計(jì)算綁定方法,并且 lblResult 是一個(gè) WPF 或 Windows 窗體標(biāo)簽,那么我們可以在操作完成后安全地更新標(biāo)簽,如下所示:

          Task.Run (() => Foo())
            .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);

          當(dāng)然,C# 的異步函數(shù)更常用于這種事情。

          也可以編寫我們自己的任務(wù)調(diào)度程序(通過子類化 任務(wù)調(diào)度程序 ),盡管這只有在非常專業(yè)的情況下才會(huì)這樣做。對于自定義計(jì)劃,您更常使用 TaskCompletionSource。

          任務(wù)工廠

          當(dāng)您調(diào)用 Task.Factory 時(shí),您將在 Task 上調(diào)用返回默認(rèn) TaskFactory 對象的靜態(tài)屬性。任務(wù)工廠的目的是創(chuàng)建任務(wù);具體來說,有三種任務(wù):

          • “普通”任務(wù)(通過StartNew)
          • 具有多個(gè)先行的延續(xù)(通過 ContinueWhenAll 和 ContinueWhenAny )
          • 包裝已失效 APM 后的方法的任務(wù)(通過 FromAsync;請參閱中的)。

          創(chuàng)建任務(wù)的另一種方法是實(shí)例化任務(wù)并調(diào)用 Start 。但是,這允許您僅創(chuàng)建“普通”任務(wù),而不創(chuàng)建延續(xù)。

          創(chuàng)建自己的任務(wù)工廠

          TaskFactory 不是一個(gè)工廠:您實(shí)際上可以實(shí)例化類,當(dāng)您想要使用相同(非標(biāo)準(zhǔn))值為 TaskCreationOptions、TaskContinuationOptions 或 TaskScheduler 重復(fù)創(chuàng)建任務(wù)時(shí),這很有用。例如,如果我們想重復(fù)創(chuàng)建長時(shí)間運(yùn)行的任務(wù),我們可以創(chuàng)建一個(gè)自定義工廠,如下所示:

          var factory = new TaskFactory (
            TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
            TaskContinuationOptions.None);

          然后,只需在工廠調(diào)用 StartNew,即可創(chuàng)建任務(wù):

          Task task1 = factory.StartNew (Method1);
          Task task2 = factory.StartNew (Method2);
          ...

          自定義繼續(xù)選項(xiàng)在調(diào)用 ContinueWhenAll 和 ContinueWhenAny 時(shí)應(yīng)用。

          使用 AggregateException

          正如我們所看到的,PLINQ、并行類和任務(wù)會(huì)自動(dòng)將異常封送到使用者。若要了解為什么這是必不可少的,請考慮以下 LINQ 查詢,該查詢在第一次迭代時(shí)引發(fā) DivideByZeroException:

          try
          {
            var query = from i in Enumerable.Range (0, 1000000)
                        select 100 / i;
            ...
          }
          catch (DivideByZeroException)
          {
            ...
          }

          如果我們要求 PLINQ 并行化此查詢,并且它忽略了異常的處理,則可能會(huì)在上拋出 DivideByZeroException,從而繞過我們的 catch 塊并導(dǎo)致應(yīng)用程序死亡。

          因此,會(huì)自動(dòng)捕獲異常并將其重新拋出給調(diào)用方。但不幸的是,這并不像捕獲DivideByZeroException那么簡單。由于這些庫使用許多線程,因此實(shí)際上可以同時(shí)引發(fā)兩個(gè)或多個(gè)異常。因此,為了確保報(bào)告所有異常,異常將包裝在 AggregateException 容器中,該容器公開一個(gè)包含每個(gè)捕獲異常的 InnerExceptions 屬性:

          try
          {
            var query = from i in ParallelEnumerable.Range (0, 1000000)
                        select 100 / i;
            // Enumerate query
            ...
          }
          catch (AggregateException aex)
          {
            foreach (Exception ex in aex.InnerExceptions)
              Console.WriteLine (ex.Message);
          }

          注意

          PLINQ 和 Parallel 類在遇到第一個(gè)異常時(shí)都會(huì)結(jié)束查詢或循環(huán)執(zhí)行,方法是不處理任何其他元素或循環(huán)體。但是,在當(dāng)前周期完成之前,可能會(huì)引發(fā)更多異常。AggregateException 中的第一個(gè)異常在 InnerException 屬性中可見。

          展平和處理

          AggregateException 類提供了幾種方法來簡化異常處理:Flatten 和 Handle 。

          扁平 化

          AggregateExceptions 通常會(huì)包含其他 AggregateExceptions。發(fā)生這種情況的一個(gè)示例是子任務(wù)引發(fā)異常。您可以通過調(diào)用 Flatten 來消除任何級(jí)別的嵌套以簡化處理。此方法返回一個(gè)新的 AggregateException,其中包含內(nèi)部異常的簡單平面列表:

          catch (AggregateException aex)
          {
            foreach (Exception ex in aex.Flatten().InnerExceptions)
              myLogWriter.LogException (ex);
          }

          處理

          有時(shí),僅捕獲特定的異常類型并重新引發(fā)其他類型的異常類型很有用。AggregateException 上的 Handle 方法提供了執(zhí)行此操作的快捷方式。它接受一個(gè)異常謂詞,它在每個(gè)內(nèi)部上運(yùn)行:

          public void Handle (Func<Exception, bool> predicate)

          如果謂詞返回 true,則認(rèn)為該異常“已處理”。委托運(yùn)行每個(gè)異常后,將發(fā)生以下情況:

          • 如果所有異常都“處理”(委托返回 true),則不會(huì)重新引發(fā)異常。
          • 如果存在委托返回 false 的任何異常(“未處理”),則會(huì)構(gòu)建包含這些異常的新 AggregateException,并重新引發(fā)。

          例如,以下內(nèi)容最終會(huì)重新拋出另一個(gè)包含單個(gè) NullReferenceException 的 AggregateException:

          var parent = Task.Factory.StartNew (() => 
          {
            // We’ll throw 3 exceptions at once using 3 child tasks:
          
            int[] numbers = { 0 };
          
            var childFactory = new TaskFactory
             (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
          
            childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
            childFactory.StartNew (() => numbers [1]);      // Index out of range
            childFactory.StartNew (() => { throw null; });  // Null reference
          });
          
          try { parent.Wait(); }
          catch (AggregateException aex)
          {
            aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
            {
              if (ex is DivideByZeroException)
              {
                Console.WriteLine ("Divide by zero");
                return true;                           // This exception is "handled"
              }
              if (ex is IndexOutOfRangeException)
              {
                Console.WriteLine ("Index out of range");
                return true;                           // This exception is "handled"   
              }
              return false;    // All other exceptions will get rethrown
            });
          }

          并發(fā)集合

          .NET 在 System.Collections.Concurrent 中提供線程安全集合:

          并發(fā)收集

          非并發(fā)等效項(xiàng)

          ConcurrentStack<T>

          堆棧<T>

          ConcurrentQueue<T>

          隊(duì)列<T>

          ConcurrentBag<T>

          (無)

          并發(fā)詞典<啦,電視>

          詞典<啦,電視>

          并發(fā)集合針對高并發(fā)場景進(jìn)行了優(yōu)化;但是,每當(dāng)需要線程安全集合(作為鎖定普通集合的替代方法)時(shí),它們也很有用。不過,有一些注意事項(xiàng):

          • 傳統(tǒng)集合在除高并發(fā)方案之外的所有方案中都優(yōu)于并發(fā)集合。
          • 線程安全集合不能保證使用它的代碼是線程安全的(參見中的)。
          • 如果在另一個(gè)線程修改并發(fā)集合時(shí)枚舉該集合,則不會(huì)引發(fā)異常,而是會(huì)混合使用新。
          • 沒有 List<T 的并發(fā)版本> 。
          • 并發(fā)堆棧、隊(duì)列和包類在內(nèi)部通過鏈表實(shí)現(xiàn)。這使得它們的內(nèi)存效率低于非并發(fā)堆棧和隊(duì)列類,但更適合并發(fā)訪問,因?yàn)殒湵碛欣跓o鎖或低鎖實(shí)現(xiàn)。(這是因?yàn)閷⒐?jié)點(diǎn)插入鏈表只需要更新幾個(gè)引用,而將元素插入類似 List<T> 的結(jié)構(gòu)可能需要移動(dòng)數(shù)千個(gè)現(xiàn)有元素。

          換句話說,這些集合不僅僅是使用帶鎖的普通集合的快捷方式。為了演示,如果我們在線程上執(zhí)行以下代碼

          var d = new ConcurrentDictionary<int,int>();
          for (int i = 0; i < 1000000; i++) d[i] = 123;

          它的運(yùn)行速度比這慢三倍:

          var d = new Dictionary<int,int>();
          for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;

          (但是 ConcurrentDictionary 讀取速度很快,因?yàn)樽x取是。

          并發(fā)集合與傳統(tǒng)集合的不同之處在于,它們公開了執(zhí)行原子測試和操作操作的特殊方法,例如 TryPop 。這些方法中的大多數(shù)都是通過IProducerConsumerCollection<T>統(tǒng)一的。

          IProducerConsumerCollection<T>

          生產(chǎn)者/使用者集合是兩個(gè)主要用例的集合:

          • 添加元素(“生產(chǎn)”)
          • 在刪除元素時(shí)檢索元素(“消耗”)

          典型的示例是堆棧和隊(duì)列。生產(chǎn)者/使用者集合在并行編程中非常重要,因?yàn)樗鼈冇欣诟咝У臒o鎖實(shí)現(xiàn)。

          IProducerConsumerCollection<T> 接口表示線程安全的集合。以下類實(shí)現(xiàn)此接口:

          ConcurrentStack<T>
          ConcurrentQueue<T>
          ConcurrentBag<T>

          IProducerConsumerCollection<T>擴(kuò)展了ICollection,增加了以下方法:

          void CopyTo (T[] array, int index);
          T[] ToArray();
          bool TryAdd (T item);
          bool TryTake (out T item);

          TryAdd 和 TryTake 方法測試是否可以執(zhí)行添加/刪除操作;如果是這樣,他們將執(zhí)行添加/刪除。測試和操作以原子方式執(zhí)行,無需像在傳統(tǒng)集合周圍那樣鎖定:

          int result;
          lock (myStack) if (myStack.Count > 0) result = myStack.Pop();

          如果集合為空,則 TryTake 返回 false。TryAdd 始終成功,并在提供的三個(gè)實(shí)現(xiàn)中返回 true。但是,如果您編寫了自己的并發(fā)集合,該集合禁止重復(fù),則如果元素已存在,則會(huì)使 TryAdd 返回 false(例如,如果您編寫了并發(fā))。

          TryTake 刪除的特定元素由子類定義:

          • 對于堆棧,TryTake 會(huì)刪除最近添加的元素。
          • 對于隊(duì)列,TryTake 會(huì)刪除最近添加最少的元素。
          • 使用袋子,TryTake 可以最有效地去除任何元素。

          這三個(gè)具體的類大多顯式地實(shí)現(xiàn) TryTake 和 TryAdd 方法,通過更具體命名的公共方法(如 TryDequeue 和 TryPop)公開相同的功能。

          ConcurrentBag<T>

          ConcurrentBag<T> 存儲(chǔ)對象的集合(允許重復(fù))。ConcurrentBag<T> 適用于在調(diào)用 Take 或 TryTake 時(shí)您哪個(gè)元素的情況。

          ConcurrentBag<T> 相對于并發(fā)隊(duì)列或堆棧的好處是,當(dāng)同時(shí)被多個(gè)線程調(diào)用時(shí),包的 Add 用。相反,在隊(duì)列或堆棧上并行調(diào)用 Add 會(huì)導(dǎo)致爭用(盡管比鎖定集合要少得多)。在并發(fā)包上調(diào)用 Take 也非常有效——只要每個(gè)線程占用的元素不超過它添加的元素。

          在并發(fā)包中,每個(gè)線程都有自己的私有鏈表。元素被添加到屬于調(diào)用 Add 的線程的私有列表中,消除了。枚舉包時(shí),枚舉器遍歷每個(gè)線程的私有列表,依次生成其每個(gè)元素。

          當(dāng)您調(diào)用 Take 時(shí),包首先查看當(dāng)前線程的私有列表。如果至少有一個(gè)元素,1它可以輕松完成任務(wù),而不會(huì)爭用。但是,如果列表為空,則必須從另一個(gè)線程的私有列表中“竊取”元素,并引起爭用的可能性。

          因此,準(zhǔn)確地說,調(diào)用 Take 會(huì)為您提供最近在該線程上添加的元素;如果該線程上沒有元素,它將為您提供最近在另一個(gè)線程上添加的元素,這是隨機(jī)選擇的。

          當(dāng)集合上的并行操作主要包括添加元素時(shí),或者當(dāng)添加 s 和 Take s 在線程上平衡時(shí),并發(fā)袋是理想的選擇。我們之前在使用 Parallel.ForEach 實(shí)現(xiàn)并行拼寫檢查器時(shí)看到了前者的示例:

          var misspellings = new ConcurrentBag<Tuple<int,string>>();
          
          Parallel.ForEach (wordsToTest, (word, state, i) =>
          {
            if (!wordLookup.Contains (word))
              misspellings.Add (Tuple.Create ((int) i, word));
          });

          對于生產(chǎn)者/消費(fèi)者隊(duì)列來說,并發(fā)包將是一個(gè)糟糕的選擇,因?yàn)樵厥怯删€程添加和刪除的。

          阻止集合<T>

          如果在上一節(jié)中討論的任何生產(chǎn)者/使用者集合上調(diào)用 TryTake,ConcurrentStack<T>、ConcurrentQueue<T> 和 ConcurrentBag<T> ,并且集合為空,則該方法返回 false 。有時(shí),在這種情況下,到元素可用會(huì)更有用。

          PFX 的設(shè)計(jì)人員沒有使用此功能重載 TryTake 方法(在允許取消令牌和超時(shí)后會(huì)導(dǎo)致成員井噴),而是將此功能封裝到名為 BlockingCollection<T> 的包裝類中。阻塞集合包裝實(shí)現(xiàn) IProducerConsumerCollection<T> 的任何集合,并允許您從包裝的集合中獲取元素 — 如果沒有可用的元素,則阻止。

          阻止集合還允許您限制集合的總大小,如果超過該大小,則阻止。以這種方式限制的集合稱為。

          要使用 BlockingCollection<T> :

          1. 實(shí)例化類,可以選擇指定要包裝的 IProducerConsumerCollection<T> 以及集合的最大大小(綁定)。
          2. 調(diào)用 Add 或 TryAdd 以將元素添加到基礎(chǔ)集合。
          3. 調(diào)用 Take 或 TryTake 以從基礎(chǔ)中刪除(使用)元素。

          如果在不傳入集合的情況下調(diào)用構(gòu)造函數(shù),則該類將自動(dòng)實(shí)例化 ConcurrentQueue<T> 。生成和使用方法允許您指定取消令牌和超時(shí)。如果集合大小有限,則添加和 TryAdd 可能會(huì)阻止;當(dāng)集合為空時(shí),Take 和 TryTake 塊。

          使用元素的另一種方法是調(diào)用 GetConsumingEnumerable 。這將返回一個(gè)(可能)無限序列,該序列在元素可用時(shí)生成元素。您可以通過調(diào)用 CompleteAdd 來強(qiáng)制序列結(jié)束:此方法還可以防止其他元素排隊(duì)。

          BlockingCollection 還提供了稱為 AddToAny 和 TakeFromAny 的靜態(tài)方法,它們允許您在指定多個(gè)阻塞集合的同時(shí)添加或獲取元素。然后,能夠?yàn)檎埱筇峁┓?wù)的第一個(gè)集合將執(zhí)行該操作。

          編寫生產(chǎn)者/使用者隊(duì)列

          生產(chǎn)者/使用者隊(duì)列是一種有用的結(jié)構(gòu),無論是在并行編程還是常規(guī)并發(fā)方案中。以下是它的工作原理:

          • 設(shè)置一個(gè)隊(duì)列來描述工作項(xiàng)或工作的數(shù)據(jù)。
          • 當(dāng)任務(wù)需要執(zhí)行時(shí),它會(huì)排隊(duì),調(diào)用方繼續(xù)處理其他事情。
          • 一個(gè)或多個(gè)工作線程在后臺(tái)插入,選取并執(zhí)行排隊(duì)的項(xiàng)目。

          生產(chǎn)者/使用者隊(duì)列可讓您精確控制一次執(zhí)行多少工作線程,這不僅可用于限制 CPU 消耗,還可用于限制其他資源。例如,如果任務(wù)執(zhí)行密集型磁盤 I/O,則可以限制并發(fā)性以避免使操作系統(tǒng)和其他應(yīng)用程序匱乏。您還可以在隊(duì)列的整個(gè)生命周期內(nèi)動(dòng)態(tài)添加和刪除工作線程。CLR 的線程池本身是一種生產(chǎn)者/使用者隊(duì)列,針對短期運(yùn)行的計(jì)算綁定作業(yè)進(jìn)行了優(yōu)化。

          生產(chǎn)者/使用者隊(duì)列通常包含對其執(zhí)行(相同)任務(wù)的數(shù)據(jù)項(xiàng)。例如,數(shù)據(jù)項(xiàng)可能是文件名,任務(wù)可能是加密這些文件。但是,通過將項(xiàng)目設(shè)置為委托,您可以編寫一個(gè)更通用的生產(chǎn)者/使用者隊(duì)列,其中每個(gè)項(xiàng)目都可以執(zhí)行任何操作。

          ,我們展示了如何使用AutoResetEvent從頭開始編寫生產(chǎn)者/消費(fèi)者隊(duì)列(以及后來使用監(jiān)視器的等待和脈沖)。但是,從頭開始編寫生產(chǎn)者/消費(fèi)者是不必要的,因?yàn)榇蠖鄶?shù)功能都是由 BlockingCollection<T> 提供的。以下是使用它的方法:

          public class PCQueue : IDisposable
          {
            BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
          
            public PCQueue (int workerCount)
            {
              // Create and start a separate Task for each consumer:
              for (int i = 0; i < workerCount; i++)
                Task.Factory.StartNew (Consume);
            }
          
            public void Enqueue (Action action) { _taskQ.Add (action); }
          
            void Consume()
            {
              // This sequence that we’re enumerating will block when no elements
              // are available and will end when CompleteAdding is called.
          
              foreach (Action action in _taskQ.GetConsumingEnumerable())
                action();     // Perform task.
            }
          
            public void Dispose() { _taskQ.CompleteAdding(); }
          }

          因?yàn)槲覀儧]有將任何東西傳遞到 BlockingCollection 的構(gòu)造函數(shù)中,所以它會(huì)自動(dòng)實(shí)例化一個(gè)并發(fā)隊(duì)列。如果我們在ConcurrentStack中傳遞,我們最終會(huì)得到一個(gè)生產(chǎn)者/消費(fèi)者堆棧。

          使用任務(wù)

          我們剛剛編寫的生產(chǎn)者/使用者是不靈活的,因?yàn)槲覀儫o法在工作項(xiàng)排隊(duì)后跟蹤它們。如果我們能做到以下幾點(diǎn),那就太好了:

          • 了解工作項(xiàng)何時(shí)完成(并等待它)
          • 取消工作項(xiàng)
          • 優(yōu)雅地處理工作項(xiàng)引發(fā)的任何異常

          一個(gè)理想的解決方案是讓 Enqueue 方法返回一些對象,為我們提供剛才描述的功能。好消息是,已經(jīng)存在一個(gè)類來做到這一點(diǎn) - Task 類,我們可以使用 TaskCompletionSource 或通過直接實(shí)例化(創(chuàng)建未啟動(dòng)或任務(wù))來生成它:

          public class PCQueue : IDisposable
          {
            BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
          
            public PCQueue (int workerCount)
            {
              // Create and start a separate Task for each consumer:
              for (int i = 0; i < workerCount; i++)
                Task.Factory.StartNew (Consume);
            }
          
            public Task Enqueue (Action action, CancellationToken cancelToken
                                                      = default (CancellationToken))
            {
              var task = new Task (action, cancelToken);
              _taskQ.Add (task);
              return task;
            }
          
            public Task<TResult> Enqueue<TResult> (Func<TResult> func, 
                        CancellationToken cancelToken = default (CancellationToken))
            {
              var task = new Task<TResult> (func, cancelToken);
              _taskQ.Add (task);
              return task;
            }
            
            void Consume()
            {
              foreach (var task in _taskQ.GetConsumingEnumerable())
                try 
                {
                    if (!task.IsCanceled) task.RunSynchronously();
                } 
                catch (InvalidOperationException) { }  // Race condition
            }
          
            public void Dispose() { _taskQ.CompleteAdding(); }
          }

          在 排隊(duì) ,我們將創(chuàng)建但不啟動(dòng)的任務(wù)排隊(duì)并返回給調(diào)用方。

          在 消費(fèi) 中,我們在使用者的線程上同步運(yùn)行任務(wù)。我們捕獲一個(gè) InvalidOperationException 來處理在檢查任務(wù)是否已取消和運(yùn)行它之間取消任務(wù)的不太可能的事件。

          以下是我們?nèi)绾问褂么祟悾?/span>

          var pcQ = new PCQueue (2);    // Maximum concurrency of 2
          string result = await pcQ.Enqueue (() => "That was easy!");
          ...

          因此,我們擁有任務(wù)的所有好處(異常傳播、返回值和取消),同時(shí)完全控制調(diào)度。


          主站蜘蛛池模板: 亚洲一区二区三区影院 | 久久精品国产一区二区三区| 亚洲国产AV一区二区三区四区| 91在线精品亚洲一区二区| 久久精品免费一区二区| 一区一区三区产品乱码| 午夜无码一区二区三区在线观看 | 国产凸凹视频一区二区| 538国产精品一区二区在线| 久久精品国产一区二区三| 极品尤物一区二区三区| 视频一区二区中文字幕| 天堂资源中文最新版在线一区| 亚洲中文字幕一区精品自拍| 精品一区二区三区水蜜桃| 国产av一区二区三区日韩| 无码一区二区波多野结衣播放搜索 | 麻豆AV无码精品一区二区| 中文字幕精品一区二区2021年| 无码视频一区二区三区在线观看 | 国产精品 视频一区 二区三区| 中文字幕在线无码一区二区三区| 无码囯产精品一区二区免费| 国产丝袜一区二区三区在线观看 | 99久久精品日本一区二区免费| 福利片福利一区二区三区| 亚洲国产精品一区二区成人片国内 | 国产成人一区二区三区视频免费| 亚洲午夜福利AV一区二区无码| 亚洲综合av一区二区三区不卡 | 国精品无码一区二区三区在线| 亚洲AV日韩综合一区尤物| 制服中文字幕一区二区| 国产精品成人免费一区二区 | 黑巨人与欧美精品一区| 国产午夜福利精品一区二区三区| 无码人妻精品一区二区三区不卡 | 成人丝袜激情一区二区| 久久国产一区二区| 一区二区三区高清视频在线观看| 日韩在线一区二区|