錄
考慮到深度學(xué)習(xí)領(lǐng)域中的數(shù)據(jù)規(guī)模一般都比較大,尤其是訓(xùn)練集,這個(gè)限制條件對應(yīng)到實(shí)際編程中就意味著,我們很有可能無法將整個(gè)數(shù)據(jù)文件的內(nèi)容全部都加載到內(nèi)存中。那么就需要一些特殊的處理方式,比如:創(chuàng)建內(nèi)存映射文件來替代原始文件被加載到內(nèi)存中、預(yù)處理數(shù)據(jù)后再加載內(nèi)存中以及單次只加載文件的片段。其中關(guān)于內(nèi)存映射技術(shù)的一些應(yīng)用,在前面的這2篇博客1和博客2中有所介紹,而本文將要介紹的是從文件中只讀取特定行的內(nèi)容的3種解決方案。
在python中如果要將一個(gè)文件完全加載到內(nèi)存中,通過file.readlines()即可,但是在文件占用較高時(shí),我們是無法完整的將文件加載到內(nèi)存中的,這時(shí)候就需要用到python的file.readline()進(jìn)行迭代式的逐行讀取:
filename = 'hello.txt'
with open(filename, 'r') as file:
line = file.readline()
counts = 1
while line:
if counts >= 50000000:
break
line = file.readline()
counts += 1
這里我們的實(shí)現(xiàn)方式是先用一個(gè)with語句打開一個(gè)文件,然后用readline()函數(shù)配合while循環(huán)逐行加載,最終通過一個(gè)序號(hào)標(biāo)記來結(jié)束循環(huán)遍歷,輸出文件第50000000行的內(nèi)容。該代碼的執(zhí)行效果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m10.359s
user 0m10.062s
sys 0m0.296s
雖然在python的readline函數(shù)中并沒有實(shí)現(xiàn)讀取指定行內(nèi)容的方案,但是在另一個(gè)庫linecache中是實(shí)現(xiàn)了的,由于使用的方式較為簡單,這里直接放上代碼示例供參考:
filename = 'hello.txt'
import linecache
text = linecache.getline(filename, 50000000)
該代碼的執(zhí)行結(jié)果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m11.904s
user 0m5.672s
sys 0m6.231s
雖然在實(shí)現(xiàn)方式上簡化了許多,但是我們發(fā)現(xiàn)這個(gè)實(shí)現(xiàn)的用時(shí)超過了11s,還不如我們自己手動(dòng)實(shí)現(xiàn)的循環(huán)遍歷方案。因此如果是對于性能有一定要求的場景,是不建議采用這個(gè)方案的。
我們知道用Linux系統(tǒng)本身自帶的sed指令也是可以獲取到文件指定行或者是指定行范圍的數(shù)據(jù)的,其執(zhí)行指令為:sed -n 50000000p filename即表示讀取文件的第50000000行的內(nèi)容。同時(shí)結(jié)合python的話,我們可以在python代碼中執(zhí)行系統(tǒng)指令并獲取輸出結(jié)果:
filename = 'hello.txt'
import os
result = os.popen('sed -n {}p {}'.format(50000000, filename)).read()
需要注意的是,如果直接運(yùn)行os.system()是沒有返回值的,只有os.popen()是有返回值的,并且需要在尾巴加上一個(gè)read()的選項(xiàng)。該代碼的執(zhí)行結(jié)果如下:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m2.532s
user 0m0.032s
sys 0m0.020s
可以看到直接使用sed指令的執(zhí)行速度很快,但是用這種方法并不是一本萬利的,比如以下這個(gè)例子:
filename = 'hello.txt'
import os
result = os.popen('sed -n {}p {}'.format(500, filename)).read()
我們把讀取第50000000行內(nèi)容改為讀取第500行的內(nèi)容,再運(yùn)行一次程序:
dechin@ubuntu2004:~/projects/gitlab/dechin/$ time python3 get_line.py
real 0m2.540s
user 0m0.037s
sys 0m0.013s
然而我們發(fā)現(xiàn)這個(gè)速度并沒有因?yàn)橐x取的行數(shù)減少了而變少,而是幾乎保持不變的。
本文通過4個(gè)測試案例分析了在python中讀取文件指定行內(nèi)容的方案,并得到了一些運(yùn)行耗時(shí)的數(shù)據(jù)。從需求上來說,如果是對于小規(guī)模的數(shù)據(jù),比如幾百行規(guī)模的數(shù)據(jù),建議使用readline循環(huán)遍歷來操作,速度也相當(dāng)不錯(cuò),或者是linecache中的函數(shù)實(shí)現(xiàn)也是可以的,甚至可以直接用readlines將整個(gè)文本內(nèi)容加載到內(nèi)存中。但是對于數(shù)據(jù)規(guī)模比較大的場景,比如超過了千萬行的級(jí)別,那么使用sed指令的方式對指定行內(nèi)容進(jìn)行讀取的方式,應(yīng)該是所有方式中最快速的。
本文首發(fā)鏈接為:https://www.cnblogs.com/dechinphy/p/lbl.html
作者ID:DechinPhy
更多原著文章請參考:https://www.cnblogs.com/dechinphy/
打賞專用鏈接:https://www.cnblogs.com/dechinphy/gallery/image/379634.html
騰訊云專欄同步:https://cloud.tencent.com/developer/column/91958
頭條號(hào)同步:https://www.toutiao.com/c/user/token/MS4wLjABAAAA0XQA38KoijOGhh-GLoqYttlkilR7BRepm5pujqXuidY/?tab=all
>微服務(wù)環(huán)境搭建本章中,我們將介紹旨在利用多核處理器的多線程 API 和構(gòu)造:
這些構(gòu)造統(tǒng)稱為(松散地)并行 (PFX)。并行類與任務(wù)并行性構(gòu)造一起稱為 (TPL)。
在閱讀本章之前,您需要熟悉第 中的基礎(chǔ)知識(shí),尤其是鎖定、線程安全和 Task 類。
.NET 提供了許多其他專用 API 來幫助進(jìn)行并行和異步編程:
在過去的 15 年里,CPU 制造商已經(jīng)從單核處理器轉(zhuǎn)向多核處理器。這對我們程序員來說是有問題的,因?yàn)閱尉€程代碼不會(huì)因?yàn)檫@些額外的內(nèi)核而自動(dòng)運(yùn)行得更快。
對于大多數(shù)服務(wù)器應(yīng)用程序來說,使用多個(gè)內(nèi)核很容易,其中每個(gè)線程都可以獨(dú)立處理單獨(dú)的客戶端請求,但在桌面上則更加困難,因?yàn)樗ǔP枰@取計(jì)算密集型代碼并執(zhí)行以下操作:
盡管您可以使用經(jīng)典的多線程構(gòu)造完成所有這些操作,但它很尷尬,尤其是分區(qū)和整理的步驟。另一個(gè)問題是,當(dāng)許多線程同時(shí)處理相同的數(shù)據(jù)時(shí),通常的線程安全鎖定策略會(huì)導(dǎo)致大量爭用。
PFX 庫專為幫助這些方案而設(shè)計(jì)。
利用多核或多處理器的編程稱為。這是更廣泛的多線程概念的子集。
在線程之間劃分工作有兩種策略:。
當(dāng)必須對許多數(shù)據(jù)值執(zhí)行一組任務(wù)時(shí),我們可以通過讓每個(gè)線程對值子集執(zhí)行(相同的)任務(wù)集來并行化。這稱為數(shù)據(jù),因?yàn)槲覀冊诰€程之間對進(jìn)行分區(qū)。相反,使用任務(wù),我們對進(jìn)行分區(qū);換句話說,我們讓每個(gè)線程執(zhí)行不同的任務(wù)。
通常,數(shù)據(jù)并行性更容易,并且可以更好地?cái)U(kuò)展到高度并行的硬件,因?yàn)樗梢詼p少或消除共享數(shù)據(jù)(從而減少爭用和線程安全問題)。此外,數(shù)據(jù)并行性利用了數(shù)據(jù)值通常比離散任務(wù)更多的事實(shí),從而增加了并行性的潛力。
數(shù)據(jù)并行性也有利于結(jié)構(gòu)化并行,這意味著并行工作單元在程序中的同一位置開始和結(jié)束。相比之下,任務(wù)并行性往往是非結(jié)構(gòu)化的,這意味著并行工作單元可能在分散在程序中的位置開始和結(jié)束。結(jié)構(gòu)化并行性更簡單,更不容易出錯(cuò),允許您將分區(qū)和線程協(xié)調(diào)(甚至結(jié)果排序)的困難工作轉(zhuǎn)移到庫中。
PFX 包含兩層功能,如圖 所示。較高層由兩個(gè) API 組成:PLINQ 和 Parallel 類。下層包含任務(wù)并行類,以及一組有助于并行編程活動(dòng)的附加構(gòu)造。
PLINQ 提供了最豐富的功能:它自動(dòng)執(zhí)行并行化的所有步驟,包括將工作劃分為任務(wù)、在線程上執(zhí)行這些任務(wù)以及將結(jié)果整理到單個(gè)輸出序列中。它稱為聲明式,因?yàn)槟恍枰⑿谢墓ぷ鳎鷮⑵錁?gòu)造為 LINQ 查詢),并讓運(yùn)行時(shí)處理實(shí)現(xiàn)細(xì)節(jié)。相比之下,其他方法是的,因?yàn)槟枰@式編寫代碼來分區(qū)或整理。如以下概要所示,對于并行類,您必須自己整理結(jié)果;使用任務(wù)并行構(gòu)造,您還必須自己對工作進(jìn)行分區(qū):
分區(qū)工作 | 整理結(jié)果 | |
普林克 | 是的 | 是的 |
并行類 | 是的 | 不 |
PFX 的任務(wù)并行性 | 不 | 不 |
并發(fā)集合和旋轉(zhuǎn)基元可幫助您進(jìn)行較低級(jí)別的并行編程活動(dòng)。這些很重要,因?yàn)?PFX 不僅適用于當(dāng)今的硬件,還適用于具有更多內(nèi)核的未來幾代處理器。如果你想移動(dòng)一堆切碎的木頭,并且你有32名工人來完成這項(xiàng)工作,那么最大的挑戰(zhàn)是在工人互不妨礙的情況下移動(dòng)木材。這與在 32 個(gè)內(nèi)核之間劃分算法相同:如果使用普通鎖來保護(hù)公共資源,則由此產(chǎn)生的阻塞可能意味著這些內(nèi)核中只有一小部分實(shí)際上同時(shí)處于繁忙狀態(tài)。并發(fā)集合專門針對高并發(fā)訪問進(jìn)行了優(yōu)化,重點(diǎn)是最小化或消除阻塞。PLINQ 和 Parallel 類本身依賴于并發(fā)集合和旋轉(zhuǎn)基元來有效地管理工作。
PFX 的主要用例是:利用多核處理器來加速計(jì)算密集型代碼。
并行編程中的一個(gè)挑戰(zhàn)是阿姆達(dá)爾定律,該定律指出并行化的最大性能改進(jìn)由必須按順序執(zhí)行的代碼部分決定。例如,如果算法的執(zhí)行時(shí)間只有三分之二是可并行化的,則即使內(nèi)核數(shù)量無限,性能提升也永遠(yuǎn)不會(huì)超過三倍。
并行編程結(jié)構(gòu)不僅可用于利用多核,還可用于其他方案:
因此,在繼續(xù)之前,有必要驗(yàn)證瓶頸是否在可并行化代碼中。同樣值得考慮的是,計(jì)算密集型 - 優(yōu)化通常是最簡單、最有效的方法。但是,需要權(quán)衡的是,某些優(yōu)化技術(shù)可能會(huì)使并行化代碼變得更加困難。
最簡單的收益來自所謂的問題 - 這是當(dāng)一個(gè)作業(yè)可以很容易地劃分為可以有效地自行執(zhí)行的任務(wù)時(shí)(結(jié)構(gòu)化并行性非常適合此類問題)。示例包括數(shù)學(xué)或密碼學(xué)中的許多圖像處理任務(wù)、光線追蹤和暴力破解方法。非尷尬并行問題的一個(gè)例子是實(shí)現(xiàn)快速排序算法的優(yōu)化版本 - 一個(gè)好的結(jié)果需要一些思考,并且可能需要非結(jié)構(gòu)化并行性。
PLINQ 會(huì)自動(dòng)并行處理本地 LINQ 查詢。PLINQ 的優(yōu)點(diǎn)是易于使用,因?yàn)樗鼘⒐ぷ鞣謪^(qū)和結(jié)果排序規(guī)則的負(fù)擔(dān)卸載到 .NET。
若要使用 PLINQ,只需在輸入序列上調(diào)用 AsParallel(),然后像往常一樣繼續(xù) LINQ 查詢。以下查詢計(jì)算 3 到 100,000 之間的質(zhì)數(shù),充分利用目標(biāo)計(jì)算機(jī)上的所有內(nèi)核:
// Calculate prime numbers using a simple (unoptimized) algorithm.
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
var parallelQuery =
from n in numbers.AsParallel()
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
int[] primes = parallelQuery.ToArray();
AsParallel 是 System.Linq.ParallelEnumerable 中的擴(kuò)展方法。它將輸入包裝在基于 ParallelQuery<TSource> 的序列中,這會(huì)導(dǎo)致隨后調(diào)用的 LINQ 查詢運(yùn)算符綁定到在 ParallelEnumerable 中定義的一組備用擴(kuò)展方法。它們提供了每個(gè)標(biāo)準(zhǔn)查詢運(yùn)算符的并行實(shí)現(xiàn)。本質(zhì)上,它們的工作原理是將輸入序列劃分為在不同線程上執(zhí)行的塊,將結(jié)果整理回單個(gè)輸出序列以供使用,如圖 所示。
調(diào)用 AsSequential() 將解開 ParallelQuery 序列,以便后續(xù)查詢運(yùn)算符綁定到標(biāo)準(zhǔn)查詢運(yùn)算符并按順序執(zhí)行。在調(diào)用具有副作用或不是線程安全的方法之前,這是必需的。
對于接受兩個(gè)輸入序列(聯(lián)接、組聯(lián)、康卡特、并集、相交、except 和 Zip)的查詢運(yùn)算符,必須將 AsParallel() 應(yīng)用于兩個(gè)輸入序列(否則將引發(fā)異常)。但是,您不需要在查詢進(jìn)行時(shí)繼續(xù)將 AsParallel 應(yīng)用于查詢,因?yàn)?PLINQ 的查詢運(yùn)算符會(huì)輸出另一個(gè) ParallelQuery 序列。事實(shí)上,再次調(diào)用 AsParallel 會(huì)導(dǎo)致效率低下,因?yàn)樗鼤?huì)強(qiáng)制合并和重新分區(qū)查詢:
mySequence.AsParallel() // Wraps sequence in ParallelQuery<int>
.Where (n => n > 100) // Outputs another ParallelQuery<int>
.AsParallel() // Unnecessary - and inefficient!
.Select (n => n * n)
并非所有查詢運(yùn)算符都可以有效地并行化。對于那些不能實(shí)現(xiàn)運(yùn)算符(請參閱 將按順序?qū)崿F(xiàn)運(yùn)算符。如果 PLINQ 懷疑并行化的開銷實(shí)際上會(huì)減慢特定查詢的速度,則 PLINQ 也可能按順序運(yùn)行。
PLINQ 僅適用于本地集合:例如,它不適用于實(shí)體框架,因?yàn)樵谶@些情況下,LINQ 會(huì)轉(zhuǎn)換為 SQL,然后在數(shù)據(jù)庫服務(wù)器上執(zhí)行。但是, PLINQ 對從數(shù)據(jù)庫查詢獲取的結(jié)果集執(zhí)行其他本地查詢。
如果 PLINQ 查詢引發(fā)異常,則會(huì)將其重新引發(fā)為 AggregateException,其 InnerExceptions 屬性包含真正的異常(或多個(gè)異常)。有關(guān)更多詳細(xì)信息,請參閱
與普通的 LINQ 查詢一樣,PLINQ 查詢是延遲計(jì)算的。這意味著只有在您開始使用結(jié)果時(shí)才會(huì)觸發(fā)執(zhí)行 — 通常通過 foreach 循環(huán)(盡管它也可以通過轉(zhuǎn)換運(yùn)算符(如 ToArray 或返回單個(gè)元素或值的運(yùn)算符)觸發(fā))。
但是,在枚舉結(jié)果時(shí),執(zhí)行過程與普通順序查詢的執(zhí)行方式略有不同。順序查詢完全由使用者以“拉取”方式提供支持:輸入序列中的每個(gè)元素在使用者需要時(shí)準(zhǔn)確獲取。并行查詢通常使用獨(dú)立的線程從輸入序列中獲取元素,略于使用者需要它們的時(shí)間(更像新聞閱讀器的提詞器)。然后,它通過查詢鏈并行處理元素,將結(jié)果保存在一個(gè)小緩沖區(qū)中,以便它們?yōu)榘葱枋褂谜咦龊脺?zhǔn)備。如果使用者提前暫停或中斷枚舉,查詢處理器也會(huì)暫停或停止,以免浪費(fèi) CPU 時(shí)間或內(nèi)存。
您可以通過在 AsParallel 之后調(diào)用 WithMergeOptions 來調(diào)整 PLINQ 的緩沖行為。自動(dòng)緩沖的默認(rèn)值通常可提供最佳的總體結(jié)果。NotBuffered 禁用緩沖區(qū),如果您希望盡快看到結(jié)果,它很有用;完全緩沖在將整個(gè)結(jié)果集呈現(xiàn)給使用者之前對其進(jìn)行緩存(OrderBy 和反向運(yùn)算符自然以這種方式工作,元素、聚合和轉(zhuǎn)換運(yùn)算符也是如此)。
鑒于 AsParallel 透明地并行化 LINQ 查詢,問題就來了:為什么Microsoft不簡單地并行化標(biāo)準(zhǔn)查詢運(yùn)算符并將 PLINQ 設(shè)為默認(rèn)值?
方法的原因有很多。首先,要使 PLINQ 有用,必須有合理數(shù)量的計(jì)算密集型工作才能將其分配給工作線程。大多數(shù) LINQ 到對象查詢的執(zhí)行速度都非常快;因此,不僅不需要并行化,而且分區(qū)、整理和協(xié)調(diào)額外線程的開銷實(shí)際上可能會(huì)減慢速度。
此外:
最后,PLINQ 提供了相當(dāng)多的鉤子用于調(diào)整和調(diào)整。給標(biāo)準(zhǔn)的 LINQ to-Objects API 增加這些細(xì)微差別的負(fù)擔(dān)會(huì)增加分心。
并行化查詢運(yùn)算符的副作用是,在整理結(jié)果時(shí),結(jié)果的提交順序不一定相同(參見)。換句話說,LINQ 對序列的正常順序保留保證不再有效。
如果需要順序保留,可以通過在 之后調(diào)用 AsOrdered() 來強(qiáng)制保留它:
myCollection.AsParallel().AsOrdered()...
調(diào)用 AsOrdered 會(huì)導(dǎo)致大量元素的性能下降,因?yàn)?PLINQ 必須跟蹤每個(gè)元素的原始位置。
您可以通過調(diào)用 AsUnordered 來抵消查詢中稍后的 AsOrdered 的影響:這將引入一個(gè)“隨機(jī)隨機(jī)洗牌點(diǎn)”,它允許點(diǎn)開始更有效地執(zhí)行。因此,如果只想保留前兩個(gè)查詢運(yùn)算符的輸入序列順序,則可以執(zhí)行以下操作:
inputSequence.AsParallel().AsOrdered()
.QueryOperator1()
.QueryOperator2()
.AsUnordered() // From here on, ordering doesn’t matter
.QueryOperator3()
...
AsOrdered 不是默認(rèn)值,因?yàn)閷τ诖蠖鄶?shù)查詢,原始輸入順序無關(guān)緊要。換句話說,如果 AsOrdered 是默認(rèn)值,則需要將 AsUnordered 應(yīng)用于大多數(shù)并行查詢以獲得最佳性能,這將很麻煩。
PLINQ 可以并行化的內(nèi)容存在實(shí)際限制。默認(rèn)情況下,以下查詢運(yùn)算符會(huì)阻止并行化,除非源元素位于其原始索引位置:
Select 、SelectMany 和 ElementAt 的索引版本
大多數(shù)查詢運(yùn)算符會(huì)更改元素的索引位置(包括刪除元素的運(yùn)算符,例如 Where )。這意味著,如果要使用前面的運(yùn)算符,它們通常需要位于查詢的開頭。
以下查詢運(yùn)算符是可并行化的,但使用昂貴的分區(qū)策略,有時(shí)可能比順序處理慢:
連接 、分組依據(jù) 、組連接 、非重復(fù) 、 并集 、 相交 和 除外
聚合運(yùn)算符在其標(biāo)準(zhǔn)化身中的重載不可并行化 — PLINQ 提供了特殊的重載來處理此問題(請參閱)。
所有其他運(yùn)算符都是可并行化的,盡管使用這些運(yùn)算符并不能保證查詢將被并行化。如果 PLINQ 懷疑并行化的開銷會(huì)減慢該特定查詢的速度,則 PLINQ 可能會(huì)按順序運(yùn)行查詢。您可以通過在 AsParallel() 之后調(diào)用以下內(nèi)容來覆蓋此行為并強(qiáng)制并行:
.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
假設(shè)我們要編寫一個(gè)拼寫檢查器,該拼寫檢查器利用所有可用內(nèi)核快速運(yùn)行非常大的文檔。通過將我們的算法表述為 LINQ 查詢,我們可以很容易地將其并行化。
第一步是將英語單詞詞典下載到HashSet中,以便高效查找:
if (!File.Exists ("WordLookup.txt") // Contains about 150,000 words
File.WriteAllText ("WordLookup.txt",
await new HttpClient().GetStringAsync (
"http://www.albahari.com/ispell/allwords.txt"));
var wordLookup = new HashSet<string> (
File.ReadAllLines ("WordLookup.txt"),
StringComparer.InvariantCultureIgnoreCase);
然后,我們使用單詞查找創(chuàng)建一個(gè)測試“文檔”,其中包含一百萬個(gè)隨機(jī)單詞的數(shù)組。構(gòu)建數(shù)組后,讓我們引入幾個(gè)拼寫錯(cuò)誤:
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
現(xiàn)在我們可以通過測試單詞到測試來執(zhí)行并行拼寫檢查 wordLookup .PLINQ 使這變得非常簡單:
var query = wordsToTest
.AsParallel()
.Select ((word, index) => new IndexedWord { Word=word, Index=index })
.Where (iword => !wordLookup.Contains (iword.Word))
.OrderBy (iword => iword.Index);
foreach (var mistake in query)
Console.WriteLine (mistake.Word + " - index = " + mistake.Index);
// OUTPUT:
// woozsh - index = 12345
// wubsie - index = 23456
索引字是一個(gè)自定義結(jié)構(gòu),我們定義如下:
struct IndexedWord { public string Word; public int Index; }
謂詞中的 wordLookup.Contains 方法為查詢提供了一些“肉”,使其值得并行化。
我們可以通過使用匿名類型而不是 IndexedWord 結(jié)構(gòu)來稍微簡化查詢。但是,這會(huì)降低性能,因?yàn)槟涿愋停悾虼耸且妙愋停?huì)產(chǎn)生基于堆的分配和后續(xù)垃圾回收的成本。
對于順序查詢,這種差異可能不足以影響,但對于并行查詢,支持基于堆棧的分配可能非常有利。這是因?yàn)榛诙褩5姆峙涫歉叨瓤刹⑿谢模ㄒ驗(yàn)槊總€(gè)線程都有自己的堆棧),而所有線程必須競爭同一個(gè)堆 - 由單個(gè)內(nèi)存管理器和垃圾回收器管理。
讓我們通過并行創(chuàng)建隨機(jī)測試詞列表本身來擴(kuò)展我們的示例。我們將其構(gòu)造為 LINQ 查詢,因此應(yīng)該很容易。這是順序版本:
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
不幸的是,調(diào)用隨機(jī)。Next 不是線程安全的,因此它不像將 AsParallel() 插入查詢那么簡單。一個(gè)可能的解決方案是編寫一個(gè)鎖定隨機(jī)的函數(shù)。下一個(gè);但是,這將限制并發(fā)性。更好的選擇是使用 ThreadLocal<Random>(參見中的創(chuàng)建一個(gè)單獨(dú)的 Random 對象。然后,我們可以并行化查詢,如下所示:
var localRandom = new ThreadLocal<Random>
( () => new Random (Guid.NewGuid().GetHashCode()) );
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
.Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
.ToArray();
在用于實(shí)例化隨機(jī)對象的工廠函數(shù)中,我們傳入 Guid 的哈希碼,以確保如果在短時(shí)間內(nèi)創(chuàng)建兩個(gè)隨機(jī)對象,它們將產(chǎn)生不同的隨機(jī)數(shù)序列。
由于 PLINQ 在并行線程上運(yùn)行查詢,因此必須注意不要執(zhí)行線程不安全的操作。特別是,寫入,因此線程不安全:
// The following query multiplies each element by its position.
// Given an input of Enumerable.Range(0,999), it should output squares.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
我們可以通過使用鎖使增量 i 線程安全,但問題仍然存在,i 不一定對應(yīng)于輸入元素的位置。將 AsOrdered 添加到查詢中并不能解決后一個(gè)問題,因?yàn)?AsOrdered 只能確保元素的輸出順序與按順序處理的順序一致,而實(shí)際上它并不按順序它們。
正確的解決方案是重寫我們的查詢以使用 Select 的索引版本:
在現(xiàn)有應(yīng)用程序中搜索 LINQ 查詢并嘗試并行化它們很誘人。這通常是無效的,因?yàn)?LINQ 顯然是最佳解決方案的大多數(shù)問題往往執(zhí)行得非常快,因此無法從并行化中受益。更好的方法是查找 CPU 密集型瓶頸,然后考慮是否可以將其表示為 LINQ 查詢。(這種重組的一個(gè)受歡迎的副作用是 LINQ 通常使代碼更小且更具可讀性。
PLINQ 非常適合令人尷尬的并行問題。然而,對于成像來說,這可能是一個(gè)糟糕的選擇,因?yàn)閷?shù)百萬像素整理成一個(gè)輸出序列會(huì)產(chǎn)生瓶頸。相反,最好將像素直接寫入數(shù)組或非托管內(nèi)存塊,并使用并行類或任務(wù)并行性來管理多線程。(但是,使用 ForAll 可以擊敗結(jié)果排序規(guī)則 — 我們在中對此進(jìn)行了討論。如果圖像處理算法自然適合 LINQ,則這樣做是有意義的。
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
為了獲得最佳性能,從查詢運(yùn)算符調(diào)用的任何方法都應(yīng)該是線程安全的,因?yàn)樗粚懭胱侄位驅(qū)傩裕ǚ歉弊饔没颍H绻鼈兺ㄟ^鎖定是線程安全的,則查詢的并行性潛力將受到持續(xù)時(shí)間除以在該函數(shù)中花費(fèi)的總時(shí)間的限制。
默認(rèn)情況下,PLINQ 為正在使用的處理器選擇最佳并行度。您可以通過在 AsParallel 之后調(diào)用 WithDegreeOfParallelism 來覆蓋它:
...AsParallel().WithDegreeOfPallelism(4)...
當(dāng)并行度增加到超出核心計(jì)數(shù)時(shí),一個(gè)例子是 I/O 密集型工作(例如,一次下載多個(gè)網(wǎng)頁)。然而,任務(wù)組合器和異步函數(shù)提供了一種同樣簡單和的解決方案(參見中的)。與任務(wù) s 不同,PLINQ 無法在不阻塞線程(和線程,使情況更糟)的情況下執(zhí)行 I/O 綁定工作。
在 PLINQ 查詢中只能調(diào)用一次 WithDegreeOfParallelism。如果需要再次調(diào)用它,則必須通過在查詢中再次調(diào)用 AsParallel() 來強(qiáng)制合并和重新分區(qū)查詢:
"The Quick Brown Fox"
.AsParallel().WithDegreeOfParallelism (2)
.Where (c => !char.IsWhiteSpace (c))
.AsParallel().WithDegreeOfParallelism (3) // Forces Merge + Partition
.Select (c => char.ToUpper (c))
取消 PLINQ 查詢(在 foreach 循環(huán)中使用其結(jié)果)很容易:只需脫離 foreach ,查詢將自動(dòng)取消,因?yàn)槊杜e器是隱式釋放的。
對于以轉(zhuǎn)換、元素或聚合運(yùn)算符終止的查詢,可以通過取消令牌從另一個(gè)線程它(請參閱中的)。若要插入令牌,請?jiān)谡{(diào)用 AsParallel 后調(diào)用 WithCancel,傳入 CancelTokenSource 對象的 Token 屬性。然后,另一個(gè)線程可以在令牌源上調(diào)用 Cancel,這會(huì)在查詢的使用者上引發(fā) OperationCanceledException:
IEnumerable<int> million = Enumerable.Range (3, 1000000);
var cancelSource = new CancellationTokenSource();
var primeNumberQuery =
from n in million.AsParallel().WithCancellation (cancelSource.Token)
where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
select n;
new Thread (() => {
Thread.Sleep (100); // Cancel query after
cancelSource.Cancel(); // 100 milliseconds.
}
).Start();
try
{
// Start query running:
int[] primes = primeNumberQuery.ToArray();
// We'll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}
取消后,PLINQ 會(huì)等待每個(gè)工作線程完成其當(dāng)前元素,然后再結(jié)束查詢。這意味著查詢調(diào)用的任何外部方法都將運(yùn)行完成。
PLINQ 的優(yōu)勢之一是它可以方便地將并行化工作的結(jié)果整理到單個(gè)輸出序列中。但是,有時(shí),您最終對該序列所做的只是在每個(gè)元素上運(yùn)行一次某個(gè)函數(shù):
foreach (int n in parallelQuery)
DoSomething (n);
如果是這種情況,并且您不關(guān)心元素的處理順序,則可以使用 PLINQ 的 ForAll 方法提高效率。
ForAll 方法對 ParallelQuery 的每個(gè)輸出元素運(yùn)行一個(gè)委托。它直接掛接到 PLINQ 的內(nèi)部,繞過整理和枚舉結(jié)果的步驟。這里有一個(gè)簡單的例子:
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
顯示了該過程。
整理和枚舉結(jié)果并不是一項(xiàng)成本高昂的操作,因此當(dāng)有大量快速執(zhí)行的輸入時(shí),F(xiàn)orAll 優(yōu)化會(huì)產(chǎn)生最大的收益。
PLINQ 有三種分區(qū)策略,用于將輸入元素分配給線程:
策略 | 元素分配 | 相對性能 |
區(qū)塊分區(qū) | 動(dòng)態(tài) | 平均 |
范圍分區(qū) | 靜態(tài)的 | 從差到優(yōu) |
哈希分區(qū) | 靜態(tài)的 | 窮 |
對于需要比較元素( GroupBy 、 Join 、 GroupJoin 、 相交 、 except 、 并集 和 非重復(fù) )的查詢運(yùn)算符,您別無選擇:PLINQ 始終使用。哈希分區(qū)的效率相對較低,因?yàn)樗仨氼A(yù)先計(jì)算每個(gè)元素的哈希代碼(以便可以在同一線程上處理具有相同哈希代碼的元素)。如果發(fā)現(xiàn)這太慢,唯一的選擇是調(diào)用 AsSequential 以禁用并行化。
對于所有其他查詢運(yùn)算符,您可以選擇是使用范圍分區(qū)還是塊分區(qū)。默認(rèn)情況下:
簡而言之,對于長序列,范圍分區(qū)更快,每個(gè)元素都需要相似的 CPU 時(shí)間來處理。否則,塊分區(qū)通常更快。
要強(qiáng)制范圍分區(qū):
ParallelEnumerable.Range 不僅僅是調(diào)用 Enumerable.Range( ... ) 的快捷方式。作為并行() .它通過激活范圍分區(qū)來更改查詢的性能。
要強(qiáng)制塊分區(qū),請將輸入序列包裝在對 Partitioner.Create(在 System.Collection.Concurrent 中)的調(diào)用中,如下所示:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
Partitioner.Create (numbers, true).AsParallel()
.Where (...)
Partitioner.Create 的第二個(gè)參數(shù)表示您希望對查詢,這是您希望塊分區(qū)的另一種說法。
塊分區(qū)的工作原理是讓每個(gè)工作線程定期從輸入序列中抓取小的元素“塊”進(jìn)行處理(參見)。PLINQ 首先分配非常小的塊(一次一個(gè)或兩個(gè)元素)。然后,它會(huì)隨著查詢的進(jìn)行增加塊大小:這可確保小序列有效地并行化,并且大序列不會(huì)導(dǎo)致過多的往返。如果一個(gè)工人碰巧得到了“簡單”的元素(這個(gè)過程很快),它最終會(huì)得到更多的塊。該系統(tǒng)使每個(gè)線程保持同樣繁忙(并且內(nèi)核“平衡”);唯一的缺點(diǎn)是從共享輸入序列中獲取元素需要同步(通常是獨(dú)占鎖),這可能會(huì)導(dǎo)致一些開銷和爭用。
范圍分區(qū)繞過正常的輸入端枚舉,并為每個(gè)工作線程預(yù)分配相同數(shù)量的元素,從而避免對輸入序列進(jìn)行爭用。但是,如果某些線程碰巧獲得簡單的元素并提前完成,則它們將閑置,而其余線程將繼續(xù)工作。我們早期的素?cái)?shù)計(jì)算器在范圍分區(qū)方面可能表現(xiàn)不佳。當(dāng)范圍分區(qū)可以很好地工作時(shí),計(jì)算前 10 萬個(gè)整數(shù)的平方根之和的一個(gè)例子:
ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))
ParallelEnumerable.Range 返回一個(gè) ParallelQuery<T> ,因此您隨后不需要調(diào)用 AsParallel 。
范圍分區(qū)不一定在塊中分配元素范圍,而是選擇“條帶化”策略。例如,如果有兩個(gè)工作器,則一個(gè)工作器可能處理奇數(shù)元素,而另一個(gè)工作器處理偶數(shù)元素。TakeWhile 運(yùn)算符幾乎肯定會(huì)觸發(fā)條帶化策略,以避免在序列的后期不必要地處理元素。
PLINQ 可以有效地并行化 Sum、平均值、最小值和最大值運(yùn)算符,無需額外干預(yù)。但是,聚合運(yùn)算符給 PLINQ 帶來了特殊的挑戰(zhàn)。如所述,聚合執(zhí)行自定義聚合。例如,下面對數(shù)字序列求和,模仿 Sum 運(yùn)算符:
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate (0, (total, n) => total + n); // 6
我們還在第 中看到,對于聚合,提供的委托必須是關(guān)聯(lián)和交換的。如果違反此規(guī)則,PLINQ 將給出不正確的結(jié)果,因?yàn)樗鼜妮斎胄蛄兄刑崛《鄠€(gè)種子,以便同時(shí)聚合序列的分區(qū)。
顯式種子聚合似乎是 PLINQ 的安全選項(xiàng),但不幸的是,由于依賴于單個(gè)種子,這些聚合通常按順序執(zhí)行。為了緩解此問題,PLINQ 提供了另一個(gè)聚合重載,允許您指定多個(gè)種子,或者更確切地說,指定。對于每個(gè)線程,它執(zhí)行此函數(shù)以生成一個(gè)單獨(dú)的種子,該種子成為累加器,它在本地聚合元素。
您還必須提供一個(gè)函數(shù)來指示如何組合本地和主累加器。最后,此聚合重載(有點(diǎn)無償)期望委托對結(jié)果執(zhí)行任何最終轉(zhuǎn)換(您可以通過之后自己對結(jié)果運(yùn)行某些函數(shù)來輕松實(shí)現(xiàn)此目的)。因此,以下是四個(gè)代表,按通過順序排列:
種子工廠
返回新的本地累加器
更新累加器功能
將元素聚合到本地累加器
組合蓄能器功能
將本地蓄能器與主蓄能器相結(jié)合
結(jié)果選擇器
對最終結(jié)果應(yīng)用任何最終轉(zhuǎn)換
在簡單方案中,可以指定種子工廠。當(dāng)種子是要更改的引用類型時(shí),此策略將失敗,因?yàn)槊總€(gè)線程將共享相同的實(shí)例。
舉一個(gè)非常簡單的例子,下面對數(shù)字?jǐn)?shù)組中的值求和:
numbers.AsParallel().Aggregate (
() => 0, // seedFactory
(localTotal, n) => localTotal + n, // updateAccumulatorFunc
(mainTot, localTot) => mainTot + localTot, // combineAccumulatorFunc
finalResult => finalResult) // resultSelector
這個(gè)例子是人為的,因?yàn)槲覀兛梢允褂酶唵蔚姆椒ǎɡ缥床シN聚合,或者更好的 Sum 運(yùn)算符)同樣有效地獲得相同的答案。舉一個(gè)更現(xiàn)實(shí)的例子,假設(shè)我們要計(jì)算給定字符串中英文字母表中每個(gè)字母的頻率。一個(gè)簡單的順序解決方案可能如下所示:
string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
};
輸入文本可能很長的一個(gè)例子是基因測序。然后,“字母表”將由字母,,和組成。
為了并行化這一點(diǎn),我們可以將 foreach 語句替換為對 Parallel.ForEach 的調(diào)用(我們將在下一節(jié)中介紹),但這將讓我們處理共享陣列上的并發(fā)問題。鎖定訪問該陣列幾乎會(huì)扼殺并行化的潛力。
聚合提供了一個(gè)整潔的解決方案。在這種情況下,累加器是一個(gè)數(shù)組,就像我們前面示例中的 letterFrequency 數(shù)組一樣。下面是使用聚合的順序版本:
int[] result =
text.Aggregate (
new int[26], // Create the "accumulator"
(letterFrequencies, c) => // Aggregate a letter into the accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) letterFrequencies [index]++;
return letterFrequencies;
});
現(xiàn)在并行版本,使用 PLINQ 的特殊重載:
int[] result =
text.AsParallel().Aggregate (
() => new int[26], // Create a new local accumulator
(localFrequencies, c) => // Aggregate into the local accumulator
{
int index = char.ToUpper (c) - 'A';
if (index >= 0 && index < 26) localFrequencies [index]++;
return localFrequencies;
},
// Aggregate local->main accumulator
(mainFreq, localFreq) =>
mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
finalResult => finalResult // Perform any final transformation
); // on the end result.
請注意,局部累積函數(shù) localFrequency 數(shù)組。這種執(zhí)行此優(yōu)化的能力非常重要,并且是合法的,因?yàn)?localFrequency 是每個(gè)線程的本地。
PFX 通過并行類中的三種靜態(tài)方法提供結(jié)構(gòu)化并行的基本形式:
Parallel.Invoke
并行執(zhí)行委托數(shù)組
Parallel.For
執(zhí)行 C# for 循環(huán)的并行等效項(xiàng)
Parallel.ForEach
執(zhí)行 C# foreach 循環(huán)的并行等效項(xiàng)
所有三種方法都會(huì)阻止,直到所有工作完成。與 PLINQ 一樣,在發(fā)生未經(jīng)處理的異常后,剩余的工作線程將在當(dāng)前迭代后停止,并將異常(或多個(gè)異常)拋回調(diào)用方 — 包裝在 AggregateException 中(請參閱)。
Parallel.Invoke 并行執(zhí)行一組 Action 委托,然后等待它們完成。該方法的最簡單版本定義如下:
public static void Invoke (params Action[] actions);
與 PLINQ 一樣,并行 .* 方法針對計(jì)算密集型工作而非 I/O 密集型工作進(jìn)行了優(yōu)化。但是,一次下載兩個(gè)網(wǎng)頁提供了一種演示 Parallel.Invoke 的簡單方法:
Parallel.Invoke (
() => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
() => new WebClient().DownloadFile ("http://microsoft.com", "ms.html"));
從表面上看,這似乎是創(chuàng)建和等待兩個(gè)線程綁定 Task 對象的便捷快捷方式。但有一個(gè)重要的區(qū)別:如果你傳入一個(gè)包含一百萬個(gè)委托的數(shù)組,Parallel.Invoke 仍然有效地工作。這是因?yàn)樗鼘⒋罅吭貫榉峙浣o少數(shù)基礎(chǔ)任務(wù)的批次,而不是為每個(gè)委托創(chuàng)建單獨(dú)的任務(wù)。
與 Parallel 的所有方法一樣,在整理結(jié)果時(shí),您只能靠自己。這意味著您需要牢記線程安全性。例如,以下內(nèi)容是線程不安全的:
var data = new List<string>();
Parallel.Invoke (
() => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
() => data.Add (new WebClient().DownloadString ("http://www.far.com")));
鎖定添加到列表可以解決此問題,但如果具有更大的快速執(zhí)行委托數(shù)組,則鎖定會(huì)產(chǎn)生瓶頸。更好的解決方案是使用線程安全集合,我們將在后面的部分中介紹 - 將是理想的選擇。
Parallel.Invoke 也被重載以接受 ParallelOptions 對象:
public static void Invoke (ParallelOptions options,
params Action[] actions);
使用 ParallelOptions ,您可以插入取消令牌、限制最大并發(fā)性以及指定自定義任務(wù)計(jì)劃程序。當(dāng)您執(zhí)行的任務(wù)(大約)多于核心時(shí),取消令牌是相關(guān)的:取消后,任何未啟動(dòng)的委托都將被放棄。但是,任何已經(jīng)執(zhí)行任務(wù)的代表將繼續(xù)完成。有關(guān)如何使用取消令牌的示例,請參閱
Parallel.For 和 Parallel.ForEach 執(zhí)行等效的 C# for 和 foreach 循環(huán),但每次迭代并行執(zhí)行,而不是按順序執(zhí)行。以下是他們的(最簡單的)簽名:
public static ParallelLoopResult For (
int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource> (
IEnumerable<TSource> source, Action<TSource> body)
此順序 for 循環(huán)
for (int i = 0; i < 100; i++)
Foo (i);
像這樣并行化
Parallel.For (0, 100, i => Foo (i));
或者更簡單地說:
Parallel.For (0, 100, Foo);
而這個(gè)順序的 foreach
foreach (char c in "Hello, world")
Foo (c);
像這樣并行化:
Parallel.ForEach ("Hello, world", Foo);
舉一個(gè)實(shí)際的例子,如果我們導(dǎo)入 System.Security.Cryptography 命名空間,我們可以并行生成六個(gè)公鑰/私鑰對字符串,如下所示:
var keyPairs = new string[6];
Parallel.For (0, keyPairs.Length,
i => keyPairs[i] = RSA.Create().ToXmlString (true));
與 Parallel.Invoke 一樣,我們可以為 Parallel.For 和 Parallel.ForEach 提供大量的工作項(xiàng),它們將被有效地劃分為幾個(gè)任務(wù)。
后一個(gè)查詢也可以使用 PLINQ 完成:
string[] keyPairs =
ParallelEnumerable.Range (0, 6)
.Select (i => RSA.Create().ToXmlString (true))
.ToArray();
Parallel.For 和 Parallel.ForEach 通常在外部循環(huán)而不是內(nèi)部循環(huán)上效果最好。這是因?yàn)閷τ谇罢撸鷮⑻峁└蟮墓ぷ鲏K來并行化,從而稀釋了管理開銷。通常不需要并行化內(nèi)部和外部循環(huán)。在以下示例中,我們通常需要 100 多個(gè)內(nèi)核才能從內(nèi)部并行化中受益:
Parallel.For (0, 100, i =>
{
Parallel.For (0, 50, j => Foo (i, j)); // Sequential would be better
}); // for the inner loop.
有時(shí),了解循環(huán)迭代索引很有用。使用順序 foreach ,很容易:
int i = 0;
foreach (char c in "Hello, world")
Console.WriteLine (c.ToString() + i++);
但是,在并行上下文中,遞增共享變量不是線程安全的。您必須改用以下版本的 ForEach :
public static ParallelLoopResult ForEach<TSource> (
IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)
我們將忽略 ParallelLoopState(我們將在下一節(jié)中介紹)。現(xiàn)在,我們對 Action 的第三個(gè)類型參數(shù) long 感興趣,它表示循環(huán)索引:
Parallel.ForEach ("Hello, world", (c, state, i) =>
{
Console.WriteLine (c.ToString() + i);
});
為了將其置于實(shí)際上下文中,讓我們重新審視一下我們使用 PLINQ 編寫的拼寫檢查器。以下代碼加載一個(gè)字典以及一個(gè)包含一百萬個(gè)單詞的數(shù)組進(jìn)行測試:
if (!File.Exists ("WordLookup.txt")) // Contains about 150,000 words
new WebClient().DownloadFile (
"http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
var wordLookup = new HashSet<string> (
File.ReadAllLines ("WordLookup.txt"),
StringComparer.InvariantCultureIgnoreCase);
var random = new Random();
string[] wordList = wordLookup.ToArray();
string[] wordsToTest = Enumerable.Range (0, 1000000)
.Select (i => wordList [random.Next (0, wordList.Length)])
.ToArray();
wordsToTest [12345] = "woozsh"; // Introduce a couple
wordsToTest [23456] = "wubsie"; // of spelling mistakes.
我們可以使用 Parallel.ForEach 的索引版本對我們的 wordsToTest 數(shù)組執(zhí)行拼寫檢查,如下所示:
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
misspellings.Add (Tuple.Create ((int) i, word));
});
請注意,我們必須將結(jié)果整理到線程安全集合中:與使用 PLINQ 相比,必須這樣做是缺點(diǎn)。與 PLINQ 相比,它的優(yōu)勢在于我們避免了應(yīng)用索引選擇查詢運(yùn)算符的成本,而索引 Select 查詢運(yùn)算符的效率低于索引的 ForEach。
由于并行 For 或 ForEach 中的循環(huán)主體是委托,因此不能使用 break 語句提前退出循環(huán)。相反,您必須在 ParallelLoopState 對象上調(diào)用 Break 或 Stop:
public class ParallelLoopState
{
public void Break();
public void Stop();
public bool IsExceptional { get; }
public bool IsStopped { get; }
public long? LowestBreakIteration { get; }
public bool ShouldExitCurrentIteration { get; }
}
獲取 ParallelLoopState 很容易:所有版本的 For 和 ForEach 都被重載以接受 Action<TSource,ParallelLoopState> 類型的循環(huán)體。所以,為了并行化這個(gè)
foreach (char c in "Hello, world")
if (c == ',')
break;
else
Console.Write (c);
這樣做:
Parallel.ForEach ("Hello, world", (c, loopState) =>
{
if (c == ',')
loopState.Break();
else
Console.Write (c);
});
// OUTPUT: Hlloe
從輸出中可以看出,循環(huán)主體可以按隨機(jī)順序完成。除了這種差異之外,調(diào)用 Break 至少會(huì)產(chǎn)生與按順序執(zhí)行循環(huán)相同的元素:此示例將始終至少按某種 、、l、 和 。 相反,調(diào)用 Stop 而不是 Break 會(huì)強(qiáng)制所有線程在其當(dāng)前迭代后立即完成。在我們的示例中,如果另一個(gè)線程滯后,調(diào)用 Stop 可以為我們提供字母 、、l、 和 的子集。 當(dāng)您找到要查找的內(nèi)容時(shí),或者當(dāng)出現(xiàn)問題并且您不會(huì)查看結(jié)果時(shí),呼叫 Stop 非常有用。
Parallel.For 和 Parallel.ForEach 方法返回一個(gè) ParallelLoopResult 對象,該對象公開名為 IsDone 和 LowestBreakIteration 的屬性。這些告訴您循環(huán)是否運(yùn)行完成;如果沒有,則指示循環(huán)在哪個(gè)周期中斷。
如果 LowestBreakIteration 返回 null,則表示您在循環(huán)中調(diào)用了 Stop(而不是 Break)。
如果循環(huán)體很長,則可能希望其他線程在方法主體中途中斷,以防早期中斷或停止。可以通過在代碼中的不同位置輪詢 ShouldExitCurrentIteration 屬性來執(zhí)行此操作;此屬性在停止后立即變?yōu)?true,或者在中斷后不久變?yōu)?true。
ShouldExitCurrentIteration 在取消請求后也會(huì)變?yōu)?true,或者在循環(huán)中拋出異常。
IsExceptional 可讓您知道另一個(gè)線程上是否發(fā)生了異常。任何未處理的異常都將導(dǎo)致循環(huán)在每個(gè)線程的當(dāng)前迭代后停止:若要避免這種情況,必須在代碼中顯式處理異常。
Parallel.For 和 Parallel.ForEach 都提供了一組重載,這些重載具有一個(gè)名為 TLocal 的泛型類型參數(shù)。這些重載旨在幫助您使用迭代密集型循環(huán)優(yōu)化數(shù)據(jù)排序規(guī)則。最簡單的是這樣的:
public static ParallelLoopResult For <TLocal> (
int fromInclusive,
int toExclusive,
Func <TLocal> localInit,
Func <int, ParallelLoopState, TLocal, TLocal> body,
Action <TLocal> localFinally);
在實(shí)踐中很少需要這些方法,因?yàn)樗鼈兊哪繕?biāo)場景主要由 PLINQ 覆蓋(這是幸運(yùn)的,因?yàn)檫@些重載有點(diǎn)嚇人!
從本質(zhì)上講,問題是這樣的:假設(shè)我們要對數(shù)字 1 到 10,000,000 的平方根求和。計(jì)算 10 萬個(gè)平方根很容易并行化,但對它們的值求和很麻煩,因?yàn)槲覀儽仨氭i定更新總數(shù):
object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
i => { lock (locker) total += Math.Sqrt (i); });
并行化的收益被獲得 10 萬個(gè)鎖的成本以及由此產(chǎn)生的阻塞所抵消。
然而,現(xiàn)實(shí)情況是,我們實(shí)際上不需要10萬把鎖。想象一下,一隊(duì)志愿者撿起大量垃圾。如果所有工人共用一個(gè)垃圾桶,那么旅行和爭用將使該過程效率極低。顯而易見的解決方案是讓每個(gè)工人都有一個(gè)私人或“本地”垃圾桶,偶爾會(huì)將其倒入主垃圾箱。
For 和 ForEach 的 TLocal 版本正是以這種方式工作的。志愿者是內(nèi)部工作線程,本地垃圾桶。要使執(zhí)行此作業(yè),必須向其提供兩個(gè)指示的附加委托:
此外,而不是返回 void 的主體委托,它應(yīng)該返回本地值的新聚合。下面是重構(gòu)的示例:
object locker = new object();
double grandTotal = 0;
Parallel.For (1, 10000000,
() => 0.0, // Initialize the local value.
(i, state, localTotal) => // Body delegate. Notice that it
localTotal + Math.Sqrt (i), // returns the new local total.
localTotal => // Add the local value
{ lock (locker) grandTotal += localTotal; } // to the master value.
);
我們?nèi)匀槐仨氭i定,但只能將局部值聚合到總計(jì)。這使得該過程大大提高了效率。
如前所述,PLINQ 通常非常適合這些場景。我們的示例可以像這樣與 PLINQ 并行化:
ParallelEnumerable.Range (1, 10000000)
.Sum (i => Math.Sqrt (i))
(請注意,我們使用 ParallelEnumerable 來強(qiáng)制:在這種情況下,這可以提高性能,因?yàn)樗袛?shù)字的處理時(shí)間都相同。
在更復(fù)雜的方案中,可以使用 LINQ 的聚合運(yùn)算符而不是 Sum 。如果您提供了本地種子工廠,則情況有點(diǎn)類似于使用 Parallel.For 提供本地值函數(shù)。
是使用 PFX 并行化的最低級(jí)別方法。用于在此級(jí)別工作的類在 System.Threading.Tasks 命名空間中定義,包括以下內(nèi)容:
類 | 目的 |
任務(wù) | 用于管理工作單元 |
任務(wù)<TResult> | 用于管理具有返回值的工作單位 |
任務(wù)工廠 | 用于創(chuàng)建任務(wù) |
TaskFactory<TResult> | 用于創(chuàng)建具有相同返回類型的任務(wù)和延續(xù) |
任務(wù)計(jì)劃程序 | 用于管理任務(wù)計(jì)劃 |
任務(wù)完成源 | 用于手動(dòng)控制任務(wù)的工作流 |
我們在中介紹了任務(wù)的基礎(chǔ)知識(shí);在本節(jié)中,我們將介紹針對并行編程的任務(wù)的高級(jí)功能:
任務(wù)并行庫允許您以最小的開銷創(chuàng)建數(shù)百(甚至數(shù)千)個(gè)任務(wù)。但是,如果要?jiǎng)?chuàng)建數(shù)百萬個(gè)任務(wù),則需要將這些任務(wù)劃分為更大的工作單元以保持效率。并行類和 PLINQ 會(huì)自動(dòng)執(zhí)行此操作。
Visual Studio 提供了一個(gè)用于監(jiān)視任務(wù)(調(diào)試→窗口→并行任務(wù))的窗口。這等效于“線程”窗口,但用于任務(wù)。“并行堆棧”窗口還具有用于任務(wù)的特殊模式。
如所述,Task.Run創(chuàng)建并啟動(dòng)一個(gè)任務(wù)或任務(wù)<> 。此方法實(shí)際上是調(diào)用 Task.Factory.StartNew 的快捷方式,它通過額外的重載提供了更大的靈活性。
Task.Factory.StartNew 允許您指定傳遞給目標(biāo)然后,目標(biāo)方法的簽名必須包含單個(gè)對象類型參數(shù):
var task = Task.Factory.StartNew (Greet, "Hello");
task.Wait(); // Wait for task to complete.
void Greet (object state) { Console.Write (state); } // Hello
這避免了執(zhí)行調(diào)用 Greet 的 lambda 表達(dá)式所需的閉包成本。這是一個(gè)微優(yōu)化,在實(shí)踐中很少需要,因此我們可以更好地利用對象,即為任務(wù)分配一個(gè)有意義的名稱。然后,我們可以使用 AsyncState 屬性來查詢其名稱:
var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
Console.WriteLine (task.AsyncState); // Greeting
task.Wait();
void Greet (string message) { Console.Write (message); }
Visual Studio 在“并行任務(wù)”窗口中顯示每個(gè)任務(wù)的 AsyncState,因此在此處使用有意義的名稱可以大大簡化調(diào)試。
您可以通過在調(diào)用 StartNew(或?qū)嵗蝿?wù))時(shí)指定 TaskCreationOptions 枚舉來調(diào)整任務(wù)的執(zhí)行。TaskCreationOptions 是一個(gè)具有以下(可組合)值的標(biāo)志枚舉:
LongRunning, PreferFairness, AttachedToParent
LongRun 建議調(diào)度程序?qū)⒁粋€(gè)線程專用于任務(wù),正如我們中所述,這對于 I/O 綁定任務(wù)和長時(shí)間運(yùn)行的任務(wù)是有益的,否則這些任務(wù)可能會(huì)迫使短期運(yùn)行的任務(wù)在調(diào)度之前等待不合理的時(shí)間。
PreferFairness 指示調(diào)度程序嘗試確保按啟動(dòng)順序調(diào)度任務(wù)。它通常可能會(huì)這樣做,因?yàn)樗褂帽镜毓ぷ鞲`取隊(duì)列在內(nèi)部優(yōu)化任務(wù)調(diào)度 — 這種優(yōu)化允許創(chuàng)建任務(wù),而不會(huì)產(chǎn)生單個(gè)工作隊(duì)列產(chǎn)生的爭用開銷。通過指定“附加到父項(xiàng)”來創(chuàng)建子任務(wù)。
當(dāng)一個(gè)任務(wù)啟動(dòng)另一個(gè)任務(wù)時(shí),您可以選擇建立父子:
Task parent = Task.Factory.StartNew (() =>
{
Console.WriteLine ("I am a parent");
Task.Factory.StartNew (() => // Detached task
{
Console.WriteLine ("I am detached");
});
Task.Factory.StartNew (() => // Child task
{
Console.WriteLine ("I am a child");
}, TaskCreationOptions.AttachedToParent);
});
子任務(wù)的特殊之處在于,當(dāng)您等待任務(wù)完成時(shí),它也會(huì)等待任何子任務(wù)。此時(shí),任何子異常都會(huì)冒泡:
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => // Child
{
Task.Factory.StartNew (() => { throw null; }, atp); // Grandchild
}, atp);
});
// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();
當(dāng)子任務(wù)是延續(xù)時(shí),這可能特別有用,您很快就會(huì)看到。
我們在中看到,您可以通過調(diào)用其 Wait 方法或訪問其 Result 屬性(如果它是 Task<TResult> )來等待單個(gè)任務(wù)。您還可以通過靜態(tài)方法 Task.WaitAll(等待所有指定任務(wù)完成)和 Task.WaitAny(僅等待一個(gè)任務(wù)完成)一次等待多個(gè)任務(wù)。
WaitAll 類似于依次等待每個(gè)任務(wù),但效率更高,因?yàn)樗ㄗ疃啵┲恍枰粋€(gè)上下文切換。此外,如果一個(gè)或多個(gè)任務(wù)引發(fā)未經(jīng)處理的異常,WaitAll 仍會(huì)等待每個(gè)任務(wù)。然后,它會(huì)重新拋出一個(gè) AggregateException,該異常累積每個(gè)錯(cuò)誤任務(wù)的異常(這是 AggregateException 真正有用的地方)。這相當(dāng)于這樣做:
// Assume t1, t2 and t3 are tasks:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);
調(diào)用 WaitAny 等效于等待 ManualResetEventSlim,該任務(wù)在完成時(shí)由每個(gè)任務(wù)發(fā)出信號(hào)。
除了超時(shí)之外,您還可以將傳遞給 Wait 方法:這允許您取消等待,。
您可以選擇在啟動(dòng)任務(wù)時(shí)傳入取消令牌。然后,如果通過該令牌取消,任務(wù)本身將進(jìn)入“已取消”狀態(tài):
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter (500);
Task task = Task.Factory.StartNew (() =>
{
Thread.Sleep (1000);
token.ThrowIfCancellationRequested(); // Check for cancellation request
}, token);
try { task.Wait(); }
catch (AggregateException ex)
{
Console.WriteLine (ex.InnerException is TaskCanceledException); // True
Console.WriteLine (task.IsCanceled); // True
Console.WriteLine (task.Status); // Canceled
}
TaskCanceledException 是 OperationCanceledException 的一個(gè)子類。如果要顯式拋出 OperationCanceledException(而不是調(diào)用令牌。ThrowIfCancelRequest),您必須將取消令牌傳遞到 OperationCanceledException 的構(gòu)造函數(shù)中。如果不這樣做,任務(wù)將不會(huì)以 TaskStatus.Canceled 狀態(tài)結(jié)束,也不會(huì)觸發(fā) OnlyOnCanceled 。
如果任務(wù)在啟動(dòng)之前被取消,則不會(huì)計(jì)劃它 - 操作已取消異常將立即拋給任務(wù)。
由于取消令牌由其他 API 識(shí)別,因此您可以將它們傳遞到其他構(gòu)造中,取消將無縫傳播:
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
Task task = Task.Factory.StartNew (() =>
{
// Pass our cancellation token into a PLINQ query:
var query = someSequence.AsParallel().WithCancellation (token)...
... enumerate query ...
});
在此示例中調(diào)用 Cancel on cancelSource 將取消 PLINQ 查詢,該查詢將在任務(wù)正文上引發(fā) OperationCanceledException,然后取消任務(wù)。
可以傳遞到“等待”和“取消和等待”等方法中的取消令牌允許您取消操作,而不是取消任務(wù)本身。
方法在任務(wù)結(jié)束后立即執(zhí)行委托:
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
一旦任務(wù) 1(任務(wù))完成、失敗或取消,任務(wù) 2()就會(huì)啟動(dòng)。(如果任務(wù) 1 在第二行代碼運(yùn)行之前已完成,則任務(wù) 2 將計(jì)劃立即執(zhí)行。傳遞給延續(xù)的 lambda 表達(dá)式的 ant 參數(shù)是對先驗(yàn)任務(wù)的引用。ContinueWith 本身會(huì)返回一個(gè)任務(wù),以便輕松添加進(jìn)一步的延續(xù)。
默認(rèn)情況下,前置任務(wù)和延續(xù)任務(wù)可以在不同的線程上執(zhí)行。您可以通過指定 TaskContinuationOptions.ExecuteSync 在調(diào)用 ContinueWith 時(shí)強(qiáng)制它們在同一線程上執(zhí)行:這可以通過減少間接性來提高非常細(xì)粒度的延續(xù)的性能。
就像普通任務(wù)一樣,延續(xù)可以是 Task<TResult> 類型并返回?cái)?shù)據(jù)。在下面的示例中,我們使用一系列鏈?zhǔn)饺蝿?wù)計(jì)算 Math.Sqrt(8*2),然后寫出結(jié)果:
Task.Factory.StartNew<int> (() => 8)
.ContinueWith (ant => ant.Result * 2)
.ContinueWith (ant => Math.Sqrt (ant.Result))
.ContinueWith (ant => Console.WriteLine (ant.Result)); // 4
為了簡單起見,我們的例子有些做作;在現(xiàn)實(shí)生活中,這些 lambda 表達(dá)式會(huì)調(diào)用計(jì)算密集型函數(shù)。
延續(xù)可以通過查詢前置任務(wù)的 Exception 屬性來了解前置任務(wù)是否出錯(cuò),或者只是通過調(diào)用 Result / Wait 并捕獲生成的 AggregateException。如果先前的錯(cuò)誤,而延續(xù)也不存在,則異常被視為,并且靜態(tài) TaskScheduler.UnobservedTaskException 事件在稍后對任務(wù)進(jìn)行垃圾回收時(shí)觸發(fā)。
安全的模式是重新引發(fā)先前的異常。只要延續(xù)是 Wait edon,異常就會(huì)傳播并重新拋出到 Waiter:
Task continuation = Task.Factory.StartNew (() => { throw null; })
.ContinueWith (ant =>
{
ant.Wait();
// Continue processing...
});
continuation.Wait(); // Exception is now thrown back to caller.
處理異常的另一種方法是為特殊與非異常結(jié)果指定不同的延續(xù)。這是通過任務(wù)延續(xù)選項(xiàng)完成的:
Task task1 = Task.Factory.StartNew (() => { throw null; });
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
TaskContinuationOptions.NotOnFaulted);
此模式在與子任務(wù)結(jié)合使用時(shí)特別有用,您很快就會(huì)看到。
以下擴(kuò)展方法“吞噬”任務(wù)的未處理異常:
public static void IgnoreExceptions (this Task task)
{
task.ContinueWith (t => { var ignore = t.Exception; },
TaskContinuationOptions.OnlyOnFaulted);
}
(這可以通過添加代碼來記錄異常來改進(jìn)。以下是它的使用方式:
Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
延續(xù)的一個(gè)強(qiáng)大功能是,它們僅在所有子任務(wù)完成時(shí)啟動(dòng)(參見)。此時(shí),子項(xiàng)引發(fā)的任何異常都將封送到延續(xù)中。
在下面的示例中,我們啟動(dòng)三個(gè)子任務(wù),每個(gè)子任務(wù)拋出一個(gè) NullReferenceException 。然后,我們通過父級(jí)的延續(xù)一舉捕獲所有這些:
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
TaskContinuationOptions.OnlyOnFaulted);
默認(rèn)情況下,無論前置任務(wù)完成、引發(fā)異常還是取消,都會(huì)無地安排延續(xù)。您可以通過 TaskContinuationOptions 枚舉中包含的一組(可組合的)標(biāo)志來更改此行為。以下是控制條件延續(xù)的三個(gè)核心標(biāo)志:
NotOnRanToCompletion = 0x10000,
NotOnFaulted = 0x20000,
NotOnCanceled = 0x40000,
這些標(biāo)志是減法的,因?yàn)槟鷳?yīng)用的越多,執(zhí)行延續(xù)的可能性就越小。為方便起見,還有以下預(yù)組合值:
OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted
(組合所有 Not* 標(biāo)志 [ NotOnRanToCompletion, NotOnFaulted , NotOnCanceled ] 是荒謬的,因?yàn)樗鼤?huì)導(dǎo)致延續(xù)總是。
“RanToComplete”表示前置成功,沒有取消或未處理的異常。
“錯(cuò)誤”表示在前置項(xiàng)上拋出未處理的異常。
“已取消”是指以下兩種情況之一:
必須掌握的是,當(dāng)延續(xù)沒有通過這些標(biāo)志執(zhí)行時(shí),延續(xù)不會(huì)被遺忘或放棄 - 它會(huì)被取消。這意味著延續(xù)本身的任何延續(xù),除非您使用 NotOnCanceled 謂詞它們。例如,考慮一下:
Task t1 = Task.Factory.StartNew (...);
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
TaskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));
就目前而言,t3 將始終被調(diào)度 — 即使 t1 沒有引發(fā)異常(參見)。這是因?yàn)槿绻?t1 成功,故障任務(wù)將被取消,并且對 t3 沒有繼續(xù)限制,t3 將無條件執(zhí)行。
如果我們希望 t3 僅在錯(cuò)誤實(shí)際運(yùn)行時(shí)執(zhí)行,我們必須這樣做:
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
TaskContinuationOptions.NotOnCanceled);
(或者,我們可以指定 OnlyOnRanToCompletion;不同之處在于,如果在錯(cuò)誤中拋出異常,t3 將不會(huì)執(zhí)行。
可以使用 TaskFactory 類中的 ContinueWhenAll 和 ContinueWhenAny 方法,根據(jù)多個(gè)前因的完成情況來安排繼續(xù)執(zhí)行。然而,隨著(WhenAll和WhenAny)中討論的任務(wù)組合器的引入,這些方法變得多余。具體來說,給定以下任務(wù)
var task1 = Task.Run (() => Console.Write ("X"));
var task2 = Task.Run (() => Console.Write ("Y"));
我們可以安排在兩者都完成后執(zhí)行的延續(xù),如下所示:
var continuation = Task.Factory.ContinueWhenAll (
new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));
下面是與 WhenAll 任務(wù)組合器相同的結(jié)果:
var continuation = Task.WhenAll (task1, task2)
.ContinueWith (ant => Console.WriteLine ("Done"));
對同一任務(wù)多次調(diào)用 ContinueWith 會(huì)在單個(gè)前因上創(chuàng)建多個(gè)延續(xù)。當(dāng)前置完成時(shí),所有延續(xù)將一起開始(除非您指定 TaskContinuationOptions.ExecuteSyncly ,在這種情況下,延續(xù)將按順序執(zhí)行)。
以下內(nèi)容等待一秒鐘,然后寫入 XY 或 YX:
var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));
任務(wù)計(jì)劃程序?qū)⑷蝿?wù)分配給線程,并由抽象的任務(wù)類表示。 .NET 提供了兩個(gè)具體的實(shí)現(xiàn):與 CLR 線程池協(xié)同工作的默認(rèn)計(jì)劃程序和。 后者(主要)旨在幫助您使用 WPF 和 Windows 窗體的線程模型,該模型要求用戶界面元素和控件只能從創(chuàng)建它們的線程訪問(請參見中的)。通過捕獲它,我們可以指示任務(wù)或延續(xù)在此上下文中執(zhí)行:
// Suppose we are on a UI thread in a Windows Forms / WPF application:
_uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
假設(shè) Foo 是一個(gè)返回字符串的計(jì)算綁定方法,并且 lblResult 是一個(gè) WPF 或 Windows 窗體標(biāo)簽,那么我們可以在操作完成后安全地更新標(biāo)簽,如下所示:
Task.Run (() => Foo())
.ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
當(dāng)然,C# 的異步函數(shù)更常用于這種事情。
也可以編寫我們自己的任務(wù)調(diào)度程序(通過子類化 任務(wù)調(diào)度程序 ),盡管這只有在非常專業(yè)的情況下才會(huì)這樣做。對于自定義計(jì)劃,您更常使用 TaskCompletionSource。
當(dāng)您調(diào)用 Task.Factory 時(shí),您將在 Task 上調(diào)用返回默認(rèn) TaskFactory 對象的靜態(tài)屬性。任務(wù)工廠的目的是創(chuàng)建任務(wù);具體來說,有三種任務(wù):
創(chuàng)建任務(wù)的另一種方法是實(shí)例化任務(wù)并調(diào)用 Start 。但是,這允許您僅創(chuàng)建“普通”任務(wù),而不創(chuàng)建延續(xù)。
TaskFactory 不是一個(gè)工廠:您實(shí)際上可以實(shí)例化類,當(dāng)您想要使用相同(非標(biāo)準(zhǔn))值為 TaskCreationOptions、TaskContinuationOptions 或 TaskScheduler 重復(fù)創(chuàng)建任務(wù)時(shí),這很有用。例如,如果我們想重復(fù)創(chuàng)建長時(shí)間運(yùn)行的任務(wù),我們可以創(chuàng)建一個(gè)自定義工廠,如下所示:
var factory = new TaskFactory (
TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
TaskContinuationOptions.None);
然后,只需在工廠調(diào)用 StartNew,即可創(chuàng)建任務(wù):
Task task1 = factory.StartNew (Method1);
Task task2 = factory.StartNew (Method2);
...
自定義繼續(xù)選項(xiàng)在調(diào)用 ContinueWhenAll 和 ContinueWhenAny 時(shí)應(yīng)用。
正如我們所看到的,PLINQ、并行類和任務(wù)會(huì)自動(dòng)將異常封送到使用者。若要了解為什么這是必不可少的,請考慮以下 LINQ 查詢,該查詢在第一次迭代時(shí)引發(fā) DivideByZeroException:
try
{
var query = from i in Enumerable.Range (0, 1000000)
select 100 / i;
...
}
catch (DivideByZeroException)
{
...
}
如果我們要求 PLINQ 并行化此查詢,并且它忽略了異常的處理,則可能會(huì)在上拋出 DivideByZeroException,從而繞過我們的 catch 塊并導(dǎo)致應(yīng)用程序死亡。
因此,會(huì)自動(dòng)捕獲異常并將其重新拋出給調(diào)用方。但不幸的是,這并不像捕獲DivideByZeroException那么簡單。由于這些庫使用許多線程,因此實(shí)際上可以同時(shí)引發(fā)兩個(gè)或多個(gè)異常。因此,為了確保報(bào)告所有異常,異常將包裝在 AggregateException 容器中,該容器公開一個(gè)包含每個(gè)捕獲異常的 InnerExceptions 屬性:
try
{
var query = from i in ParallelEnumerable.Range (0, 1000000)
select 100 / i;
// Enumerate query
...
}
catch (AggregateException aex)
{
foreach (Exception ex in aex.InnerExceptions)
Console.WriteLine (ex.Message);
}
PLINQ 和 Parallel 類在遇到第一個(gè)異常時(shí)都會(huì)結(jié)束查詢或循環(huán)執(zhí)行,方法是不處理任何其他元素或循環(huán)體。但是,在當(dāng)前周期完成之前,可能會(huì)引發(fā)更多異常。AggregateException 中的第一個(gè)異常在 InnerException 屬性中可見。
AggregateException 類提供了幾種方法來簡化異常處理:Flatten 和 Handle 。
AggregateExceptions 通常會(huì)包含其他 AggregateExceptions。發(fā)生這種情況的一個(gè)示例是子任務(wù)引發(fā)異常。您可以通過調(diào)用 Flatten 來消除任何級(jí)別的嵌套以簡化處理。此方法返回一個(gè)新的 AggregateException,其中包含內(nèi)部異常的簡單平面列表:
catch (AggregateException aex)
{
foreach (Exception ex in aex.Flatten().InnerExceptions)
myLogWriter.LogException (ex);
}
有時(shí),僅捕獲特定的異常類型并重新引發(fā)其他類型的異常類型很有用。AggregateException 上的 Handle 方法提供了執(zhí)行此操作的快捷方式。它接受一個(gè)異常謂詞,它在每個(gè)內(nèi)部上運(yùn)行:
public void Handle (Func<Exception, bool> predicate)
如果謂詞返回 true,則認(rèn)為該異常“已處理”。委托運(yùn)行每個(gè)異常后,將發(fā)生以下情況:
例如,以下內(nèi)容最終會(huì)重新拋出另一個(gè)包含單個(gè) NullReferenceException 的 AggregateException:
var parent = Task.Factory.StartNew (() =>
{
// We’ll throw 3 exceptions at once using 3 child tasks:
int[] numbers = { 0 };
var childFactory = new TaskFactory
(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
childFactory.StartNew (() => 5 / numbers[0]); // Division by zero
childFactory.StartNew (() => numbers [1]); // Index out of range
childFactory.StartNew (() => { throw null; }); // Null reference
});
try { parent.Wait(); }
catch (AggregateException aex)
{
aex.Flatten().Handle (ex => // Note that we still need to call Flatten
{
if (ex is DivideByZeroException)
{
Console.WriteLine ("Divide by zero");
return true; // This exception is "handled"
}
if (ex is IndexOutOfRangeException)
{
Console.WriteLine ("Index out of range");
return true; // This exception is "handled"
}
return false; // All other exceptions will get rethrown
});
}
.NET 在 System.Collections.Concurrent 中提供線程安全集合:
并發(fā)收集 | 非并發(fā)等效項(xiàng) |
ConcurrentStack<T> | 堆棧<T> |
ConcurrentQueue<T> | 隊(duì)列<T> |
ConcurrentBag<T> | (無) |
并發(fā)詞典<啦,電視> | 詞典<啦,電視> |
并發(fā)集合針對高并發(fā)場景進(jìn)行了優(yōu)化;但是,每當(dāng)需要線程安全集合(作為鎖定普通集合的替代方法)時(shí),它們也很有用。不過,有一些注意事項(xiàng):
換句話說,這些集合不僅僅是使用帶鎖的普通集合的快捷方式。為了演示,如果我們在線程上執(zhí)行以下代碼
var d = new ConcurrentDictionary<int,int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;
它的運(yùn)行速度比這慢三倍:
var d = new Dictionary<int,int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;
(但是 ConcurrentDictionary 讀取速度很快,因?yàn)樽x取是。
并發(fā)集合與傳統(tǒng)集合的不同之處在于,它們公開了執(zhí)行原子測試和操作操作的特殊方法,例如 TryPop 。這些方法中的大多數(shù)都是通過IProducerConsumerCollection<T>統(tǒng)一的。
生產(chǎn)者/使用者集合是兩個(gè)主要用例的集合:
典型的示例是堆棧和隊(duì)列。生產(chǎn)者/使用者集合在并行編程中非常重要,因?yàn)樗鼈冇欣诟咝У臒o鎖實(shí)現(xiàn)。
IProducerConsumerCollection<T> 接口表示線程安全的集合。以下類實(shí)現(xiàn)此接口:
ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
IProducerConsumerCollection<T>擴(kuò)展了ICollection,增加了以下方法:
void CopyTo (T[] array, int index);
T[] ToArray();
bool TryAdd (T item);
bool TryTake (out T item);
TryAdd 和 TryTake 方法測試是否可以執(zhí)行添加/刪除操作;如果是這樣,他們將執(zhí)行添加/刪除。測試和操作以原子方式執(zhí)行,無需像在傳統(tǒng)集合周圍那樣鎖定:
int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();
如果集合為空,則 TryTake 返回 false。TryAdd 始終成功,并在提供的三個(gè)實(shí)現(xiàn)中返回 true。但是,如果您編寫了自己的并發(fā)集合,該集合禁止重復(fù),則如果元素已存在,則會(huì)使 TryAdd 返回 false(例如,如果您編寫了并發(fā))。
TryTake 刪除的特定元素由子類定義:
這三個(gè)具體的類大多顯式地實(shí)現(xiàn) TryTake 和 TryAdd 方法,通過更具體命名的公共方法(如 TryDequeue 和 TryPop)公開相同的功能。
ConcurrentBag<T> 存儲(chǔ)對象的集合(允許重復(fù))。ConcurrentBag<T> 適用于在調(diào)用 Take 或 TryTake 時(shí)您哪個(gè)元素的情況。
ConcurrentBag<T> 相對于并發(fā)隊(duì)列或堆棧的好處是,當(dāng)同時(shí)被多個(gè)線程調(diào)用時(shí),包的 Add 用。相反,在隊(duì)列或堆棧上并行調(diào)用 Add 會(huì)導(dǎo)致爭用(盡管比鎖定集合要少得多)。在并發(fā)包上調(diào)用 Take 也非常有效——只要每個(gè)線程占用的元素不超過它添加的元素。
在并發(fā)包中,每個(gè)線程都有自己的私有鏈表。元素被添加到屬于調(diào)用 Add 的線程的私有列表中,消除了。枚舉包時(shí),枚舉器遍歷每個(gè)線程的私有列表,依次生成其每個(gè)元素。
當(dāng)您調(diào)用 Take 時(shí),包首先查看當(dāng)前線程的私有列表。如果至少有一個(gè)元素,1它可以輕松完成任務(wù),而不會(huì)爭用。但是,如果列表為空,則必須從另一個(gè)線程的私有列表中“竊取”元素,并引起爭用的可能性。
因此,準(zhǔn)確地說,調(diào)用 Take 會(huì)為您提供最近在該線程上添加的元素;如果該線程上沒有元素,它將為您提供最近在另一個(gè)線程上添加的元素,這是隨機(jī)選擇的。
當(dāng)集合上的并行操作主要包括添加元素時(shí),或者當(dāng)添加 s 和 Take s 在線程上平衡時(shí),并發(fā)袋是理想的選擇。我們之前在使用 Parallel.ForEach 實(shí)現(xiàn)并行拼寫檢查器時(shí)看到了前者的示例:
var misspellings = new ConcurrentBag<Tuple<int,string>>();
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
if (!wordLookup.Contains (word))
misspellings.Add (Tuple.Create ((int) i, word));
});
對于生產(chǎn)者/消費(fèi)者隊(duì)列來說,并發(fā)包將是一個(gè)糟糕的選擇,因?yàn)樵厥怯删€程添加和刪除的。
如果在上一節(jié)中討論的任何生產(chǎn)者/使用者集合上調(diào)用 TryTake,ConcurrentStack<T>、ConcurrentQueue<T> 和 ConcurrentBag<T> ,并且集合為空,則該方法返回 false 。有時(shí),在這種情況下,到元素可用會(huì)更有用。
PFX 的設(shè)計(jì)人員沒有使用此功能重載 TryTake 方法(在允許取消令牌和超時(shí)后會(huì)導(dǎo)致成員井噴),而是將此功能封裝到名為 BlockingCollection<T> 的包裝類中。阻塞集合包裝實(shí)現(xiàn) IProducerConsumerCollection<T> 的任何集合,并允許您從包裝的集合中獲取元素 — 如果沒有可用的元素,則阻止。
阻止集合還允許您限制集合的總大小,如果超過該大小,則阻止。以這種方式限制的集合稱為。
要使用 BlockingCollection<T> :
如果在不傳入集合的情況下調(diào)用構(gòu)造函數(shù),則該類將自動(dòng)實(shí)例化 ConcurrentQueue<T> 。生成和使用方法允許您指定取消令牌和超時(shí)。如果集合大小有限,則添加和 TryAdd 可能會(huì)阻止;當(dāng)集合為空時(shí),Take 和 TryTake 塊。
使用元素的另一種方法是調(diào)用 GetConsumingEnumerable 。這將返回一個(gè)(可能)無限序列,該序列在元素可用時(shí)生成元素。您可以通過調(diào)用 CompleteAdd 來強(qiáng)制序列結(jié)束:此方法還可以防止其他元素排隊(duì)。
BlockingCollection 還提供了稱為 AddToAny 和 TakeFromAny 的靜態(tài)方法,它們允許您在指定多個(gè)阻塞集合的同時(shí)添加或獲取元素。然后,能夠?yàn)檎埱筇峁┓?wù)的第一個(gè)集合將執(zhí)行該操作。
生產(chǎn)者/使用者隊(duì)列是一種有用的結(jié)構(gòu),無論是在并行編程還是常規(guī)并發(fā)方案中。以下是它的工作原理:
生產(chǎn)者/使用者隊(duì)列可讓您精確控制一次執(zhí)行多少工作線程,這不僅可用于限制 CPU 消耗,還可用于限制其他資源。例如,如果任務(wù)執(zhí)行密集型磁盤 I/O,則可以限制并發(fā)性以避免使操作系統(tǒng)和其他應(yīng)用程序匱乏。您還可以在隊(duì)列的整個(gè)生命周期內(nèi)動(dòng)態(tài)添加和刪除工作線程。CLR 的線程池本身是一種生產(chǎn)者/使用者隊(duì)列,針對短期運(yùn)行的計(jì)算綁定作業(yè)進(jìn)行了優(yōu)化。
生產(chǎn)者/使用者隊(duì)列通常包含對其執(zhí)行(相同)任務(wù)的數(shù)據(jù)項(xiàng)。例如,數(shù)據(jù)項(xiàng)可能是文件名,任務(wù)可能是加密這些文件。但是,通過將項(xiàng)目設(shè)置為委托,您可以編寫一個(gè)更通用的生產(chǎn)者/使用者隊(duì)列,其中每個(gè)項(xiàng)目都可以執(zhí)行任何操作。
,我們展示了如何使用AutoResetEvent從頭開始編寫生產(chǎn)者/消費(fèi)者隊(duì)列(以及后來使用監(jiān)視器的等待和脈沖)。但是,從頭開始編寫生產(chǎn)者/消費(fèi)者是不必要的,因?yàn)榇蠖鄶?shù)功能都是由 BlockingCollection<T> 提供的。以下是使用它的方法:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Enqueue (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
因?yàn)槲覀儧]有將任何東西傳遞到 BlockingCollection 的構(gòu)造函數(shù)中,所以它會(huì)自動(dòng)實(shí)例化一個(gè)并發(fā)隊(duì)列。如果我們在ConcurrentStack中傳遞,我們最終會(huì)得到一個(gè)生產(chǎn)者/消費(fèi)者堆棧。
我們剛剛編寫的生產(chǎn)者/使用者是不靈活的,因?yàn)槲覀儫o法在工作項(xiàng)排隊(duì)后跟蹤它們。如果我們能做到以下幾點(diǎn),那就太好了:
一個(gè)理想的解決方案是讓 Enqueue 方法返回一些對象,為我們提供剛才描述的功能。好消息是,已經(jīng)存在一個(gè)類來做到這一點(diǎn) - Task 類,我們可以使用 TaskCompletionSource 或通過直接實(shí)例化(創(chuàng)建未啟動(dòng)或任務(wù))來生成它:
public class PCQueue : IDisposable
{
BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public Task Enqueue (Action action, CancellationToken cancelToken
= default (CancellationToken))
{
var task = new Task (action, cancelToken);
_taskQ.Add (task);
return task;
}
public Task<TResult> Enqueue<TResult> (Func<TResult> func,
CancellationToken cancelToken = default (CancellationToken))
{
var task = new Task<TResult> (func, cancelToken);
_taskQ.Add (task);
return task;
}
void Consume()
{
foreach (var task in _taskQ.GetConsumingEnumerable())
try
{
if (!task.IsCanceled) task.RunSynchronously();
}
catch (InvalidOperationException) { } // Race condition
}
public void Dispose() { _taskQ.CompleteAdding(); }
}
在 排隊(duì) ,我們將創(chuàng)建但不啟動(dòng)的任務(wù)排隊(duì)并返回給調(diào)用方。
在 消費(fèi) 中,我們在使用者的線程上同步運(yùn)行任務(wù)。我們捕獲一個(gè) InvalidOperationException 來處理在檢查任務(wù)是否已取消和運(yùn)行它之間取消任務(wù)的不太可能的事件。
以下是我們?nèi)绾问褂么祟悾?/span>
var pcQ = new PCQueue (2); // Maximum concurrency of 2
string result = await pcQ.Enqueue (() => "That was easy!");
...
因此,我們擁有任務(wù)的所有好處(異常傳播、返回值和取消),同時(shí)完全控制調(diào)度。
*請認(rèn)真填寫需求信息,我們會(huì)在24小時(shí)內(nèi)與您取得聯(lián)系。