文鏈接: go-zero 的自適應熔斷器
上篇文章我們介紹了微服務的限流,詳細分析了計數器限流和令牌桶限流算法,這篇文章來說說熔斷。
熔斷和限流還不太一樣,限流是控制請求速率,只要還能承受,那么都會處理,但熔斷不是。
在一條調用鏈上,如果發現某個服務異常,比如響應超時。那么調用者為了避免過多請求導致資源消耗過大,最終引發系統雪崩,會直接返回錯誤,而不是瘋狂調用這個服務。
本篇文章會介紹主流熔斷器的工作原理,并且會借助 go-zero 源碼,分析 googleBreaker 是如何通過滑動窗口來統計流量,并且最終執行熔斷的。
這部分主要介紹兩種熔斷器的工作原理,分別是 Netflix 開源的 Hystrix,其也是 Spring Cloud 默認的熔斷組件,和 Google 的自適應的熔斷器。
Hystrix is no longer in active development, and is currently in maintenance mode.
注意,Hystrix 官方已經宣布不再積極開發了,目前處在維護模式。
Hystrix 官方推薦替代的開源組件:Resilience4j,還有阿里開源的 Sentinel 也是不錯的替代品。
Hystrix 采用了熔斷器模式,相當于電路中的保險絲,系統出現緊急問題,立刻禁止所有請求,已達到保護系統的作用。
系統需要維護三種狀態,分別是:
通過狀態的變更,可以有效防止系統雪崩的問題。同時,在半斷開狀態下,又可以讓系統進行自我修復。
googleBreaker 實現了一種自適應的熔斷模式,來看一下算法的計算公式,客戶端請求被拒絕的概率。
參數很少,也比較好理解:
通過分析公式,我們可以得到下面幾個結論,也就是產生熔斷的實際原理:
總的來說,googleBreaker 的實現方案更加優雅,而且參數也少,不用維護那么多的狀態。
go-zero 就是采用了 googleBreaker 的方案,下面就來分析代碼,看看到底是怎么實現的。
接口定義這部分我個人感覺還是挺不好理解的,看了好多遍才理清了它們之間的關系。
其實看代碼和看書是一樣的,書越看越薄,代碼會越看越短。剛開始看感覺代碼很長,隨著看懂的地方越來越多,明顯感覺代碼變短了。所以遇到不懂的代碼不要怕,反復看,總會看懂的。
首先來看一下 breaker 部分的 UML 圖,有了這張圖,很多地方看起來還是相對清晰的,下面來詳細分析。
這里用到了靜態代理模式,也可以說是接口裝飾器,接下來就看看到底是怎么定義的:
// core/breaker/breaker.go
internalThrottle interface {
allow() (internalPromise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
// core/breaker/googlebreaker.go
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
這個接口是最終實現熔斷方法的接口,由 googleBreaker 結構體實現。
// core/breaker/breaker.go
throttle interface {
allow() (Promise, error)
doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
type loggedThrottle struct {
name string
internalThrottle
errWin *errorWindow
}
func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
return loggedThrottle{
name: name,
internalThrottle: t,
errWin: new(errorWindow),
}
}
這個是實現了日志收集的結構體,首先它實現了 throttle 接口,然后它包含了一個字段 internalThrottle,相當于具體的熔斷方法是代理給 internalThrottle 來做的。
// core/breaker/breaker.go
func (lt loggedThrottle) allow() (Promise, error) {
promise, err :=lt.internalThrottle.allow()
return promiseWithReason{
promise: promise,
errWin: lt.errWin,
}, lt.logError(err)
}
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
accept :=acceptable(err)
if !accept && err !=nil {
lt.errWin.add(err.Error())
}
return accept
}))
}
所以當它執行相應方法時,都是直接調用 internalThrottle 接口的方法,然后再加上自己的邏輯。
這也就是代理所起到的作用,在不改變原方法的基礎上,擴展原方法的功能。
// core/breaker/breaker.go
circuitBreaker struct {
name string
throttle
}
// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
var b circuitBreaker
for _, opt :=range opts {
opt(&b)
}
if len(b.name)==0 {
b.name=stringx.Rand()
}
b.throttle=newLoggedThrottle(b.name, newGoogleBreaker())
return &b
}
最終的熔斷器又將功能代理給了 throttle。
這就是它們之間的關系,如果感覺有點亂的話,就反復看,看的次數多了,就清晰了。
上文介紹過了,loggedThrottle 是為了記錄日志而設計的代理層,這部分內容來分析一下是如何記錄日志的。
// core/breaker/breaker.go
type errorWindow struct {
// 記錄日志的數組
reasons [numHistoryReasons]string
// 索引
index int
// 數組元素數量,小于等于 numHistoryReasons
count int
lock sync.Mutex
}
func (ew *errorWindow) add(reason string) {
ew.lock.Lock()
// 記錄錯誤日志內容
ew.reasons[ew.index]=fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
// 對 numHistoryReasons 進行取余來得到數組索引
ew.index=(ew.index + 1) % numHistoryReasons
ew.count=mathx.MinInt(ew.count+1, numHistoryReasons)
ew.lock.Unlock()
}
func (ew *errorWindow) String() string {
var reasons []string
ew.lock.Lock()
// reverse order
for i :=ew.index - 1; i >=ew.index-ew.count; i-- {
reasons=append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
}
ew.lock.Unlock()
return strings.Join(reasons, "\n")
}
核心就是這里采用了一個環形數組,通過維護兩個字段來實現,分別是 index 和 count。
count 表示數組中元素的個數,最大值是數組的長度;index 是索引,每次 +1,然后對數組長度取余得到新索引。
我之前有一次面試就讓我設計一個環形數組,當時答的還不是很好,這次算是學會了。
一般來說,想要判斷是否需要觸發熔斷,那么首先要知道一段時間的請求數量,一段時間內的數量統計可以使用滑動窗口來實現。
首先看一下滑動窗口的定義:
// core/collection/rollingwindow.go
type RollingWindow struct {
lock sync.RWMutex
// 窗口大小
size int
// 窗口數據容器
win *window
// 時間間隔
interval time.Duration
// 游標,用于定位當前應該寫入哪個 bucket
offset int
// 匯總數據時,是否忽略當前正在寫入桶的數據
// 某些場景下因為當前正在寫入的桶數據并沒有經過完整的窗口時間間隔
// 可能導致當前桶的統計并不準確
ignoreCurrent bool
// 最后寫入桶的時間
// 用于計算下一次寫入數據間隔最后一次寫入數據的之間
// 經過了多少個時間間隔
lastTime time.Duration // start time of the last bucket
}
再來看一下 window 的結構:
type Bucket struct {
// 桶內值的和
Sum float64
// 桶內 add 次數
Count int64
}
func (b *Bucket) add(v float64) {
b.Sum +=v
b.Count++
}
func (b *Bucket) reset() {
b.Sum=0
b.Count=0
}
type window struct {
// 桶,一個桶就是一個時間間隔
buckets []*Bucket
// 窗口大小,也就是桶的數量
size int
}
有了這兩個結構之后,我們就可以畫出這個滑動窗口了,如圖所示。
現在來看一下向窗口中添加數據,是怎樣一個過程。
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
// 獲取當前寫入下標
rw.updateOffset()
// 向 bucket 中寫入數據
rw.win.add(rw.offset, v)
}
func (rw *RollingWindow) span() int {
// 計算距離 lastTime 經過了多少個時間間隔,也就是多少個桶
offset :=int(timex.Since(rw.lastTime) / rw.interval)
// 如果在窗口范圍內,返回實際值,否則返回窗口大小
if 0 <=offset && offset < rw.size {
return offset
}
return rw.size
}
func (rw *RollingWindow) updateOffset() {
// 經過了多少個時間間隔,也就是多少個桶
span :=rw.span()
// 還在同一單元時間內不需要更新
if span <=0 {
return
}
offset :=rw.offset
// reset expired buckets
// 這里是清除過期桶的數據
// 也是對數組大小進行取余的方式,類似上文介紹的環形數組
for i :=0; i < span; i++ {
rw.win.resetBucket((offset + i + 1) % rw.size)
}
// 更新游標
rw.offset=(offset + span) % rw.size
now :=timex.Now()
// align to interval time boundary
// 這里應該是一個時間的對齊,保持在桶內指向位置是一致的
rw.lastTime=now - (now-rw.lastTime)%rw.interval
}
// 向桶內添加數據
func (w *window) add(offset int, v float64) {
// 根據 offset 對數組大小取余得到索引,然后添加數據
w.buckets[offset%w.size].add(v)
}
// 重置桶數據
func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}
我畫了一張圖,來模擬整個滑動過程:
主要經歷 4 個步驟:
比如上圖,剛開始 offset 指向了 bucket[1],經過了兩個 span 之后,bucket[2] 和 bucket[3] 會被清空,同時,新的 offset 會指向 bucket[3],新添加的數據會寫入到 bucket[3]。
再來看看數據統計,也就是窗口內的有效數據量是多少。
// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()
var diff int
span :=rw.span()
// ignore current bucket, because of partial data
if span==0 && rw.ignoreCurrent {
diff=rw.size - 1
} else {
diff=rw.size - span
}
// 需要統計的 bucket 數量,窗口大小減去 span 數量
if diff > 0 {
// 獲取統計的起始位置,span 是已經被重置的 bucket
offset :=(rw.offset + span + 1) % rw.size
rw.win.reduce(offset, diff, fn)
}
}
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i :=0; i < count; i++ {
// 自定義統計函數
fn(w.buckets[(start+i)%w.size])
}
}
統計出窗口數據之后,就可以判斷是否需要熔斷了。
接下來就是執行熔斷了,主要就是看看自適應熔斷是如何實現的。
// core/breaker/googlebreaker.go
const (
// 250ms for bucket duration
window=time.Second * 10
buckets=40
k=1.5
protection=5
)
窗口的定義部分,整個窗口是 10s,然后分成 40 個 bucket,每個 bucket 就是 250ms。
// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
func (b *googleBreaker) accept() error {
// 獲取最近一段時間的統計數據
accepts, total :=b.history()
// 根據上文提到的算法來計算一個概率
weightedAccepts :=b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio :=math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
// 如果小于等于 0 直接通過,不熔斷
if dropRatio <=0 {
return nil
}
// 隨機產生 0.0-1.0 之間的隨機數與上面計算出來的熔斷概率相比較
// 如果隨機數比熔斷概率小則進行熔斷
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts +=int64(b.Sum)
total +=b.Count
})
return
}
以上就是自適應熔斷的邏輯,通過概率的比較來隨機淘汰掉部分請求,然后隨著服務恢復,淘汰的請求會逐漸變少,直至不淘汰。
func (b *googleBreaker) allow() (internalPromise, error) {
if err :=b.accept(); err !=nil {
return nil, err
}
// 返回一個 promise 異步回調對象,可由開發者自行決定是否上報結果到熔斷器
return googlePromise{
b: b,
}, nil
}
// req - 熔斷對象方法
// fallback - 自定義快速失敗函數,可對熔斷產生的err進行包裝后返回
// acceptable - 對本次未熔斷時執行請求的結果進行自定義的判定,比如可以針對http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err :=b.accept(); err !=nil {
// 熔斷中,如果有自定義的fallback則執行
if fallback !=nil {
return fallback(err)
}
return err
}
defer func() {
// 如果執行req()過程發生了panic,依然判定本次執行失敗上報至熔斷器
if e :=recover(); e !=nil {
b.markFailure()
panic(e)
}
}()
err :=req()
// 上報結果
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
熔斷器對外暴露兩種類型的方法:
1、簡單場景直接判斷對象是否被熔斷,執行請求后必須需手動上報執行結果至熔斷器。
func (b *googleBreaker) allow() (internalPromise, error)
2、復雜場景下支持自定義快速失敗,自定義判定請求是否成功的熔斷方法,自動上報執行結果至熔斷器。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
個人感覺,熔斷這部分代碼,相較于前幾篇文章,理解起來是更困難的。但其中的一些設計思想,和底層的實現原理也是非常值得學習的,希望這篇文章能夠對大家有幫助。
以上就是本文的全部內容,如果覺得還不錯的話歡迎點贊,轉發和關注,感謝支持。
參考文章:
推薦閱讀:
者 | 小馬
編輯 | CV君
報道 | 我愛計算機視覺(微信id:aicvml)
本文提出了一種簡單有效的魯棒目標檢測無監督自適應方法(SimROD)。為了克服域轉移(domain shift)和偽標簽噪聲(pseudo-label noise)等問題,本文的方法集成了域中心增強方法(domain-centric augmentation method) 、漸進的自標簽適應機制(gradual self-labeling adaptation procedure) 和教師指導的微調機制(teacher-guided fine-tuning mechanism) 。
使用本文的方法,目標域樣本可以用來調整目標檢測模型,而不改變模型結構或生成合成的數據。當應用于圖像損壞和高級跨域自適應基準數據集上測試時,本文的方法在多個域自適應基準數據集上優于之前的baseline方法。
SimROD: A Simple Adaptation Method for Robust Object Detection
論文地址:https://arxiv.org/abs/2107.13389
代碼地址:https://github.com/reactivetype/simrod
當測試集的數據分布和訓練集相似時,SOTA的目標檢測模型能夠達到比較高的精度。但是,當部署到新環境中時,比如天氣變化(如雨或霧)、光照條件變化或圖像損壞(如運動模糊),模型的精度就會大幅度下降。
這種失敗不利于自動駕駛等場景,在這些場景中,域轉移是常見且不可避免的。為了使它們在以可靠性為關鍵的應用程序中取得更高的性能,使檢測模型對域轉移具有更強的魯棒性是很重要的。
目前,已經提出了許多方法來克服域轉移的目標檢測。它們大致可以分為數據增強、域對齊、域映射和自標記技術。
數據增強 方法可以提高在一些固定的域位移集上的性能,但不能推廣到與增強樣本不相似的數據中。
域對齊 方法使用來自目標域的樣本來對齊網絡的中間特征。
域映射 方法使用圖像到圖像的轉換網絡(比如:GAN)將標記的源圖像轉換為看起來像未標記的目標域圖像的新圖像。
自標記 是一種不錯的方法,因為它利用了來自目標域的未標記的訓練樣本。
然而,在域轉移下生成準確的偽標簽是困難的;當偽標簽有噪聲時,使用目標域樣本進行自適應是無效的 。
在本文中,作者提出了一種簡單的魯棒對象檢測自適應方法(SimROD),利用域混合數據增強和教師指導下的逐步自適應策略來減輕域轉移的影響。SimROD主要有三個特點:
1)首先,它不需要目標域數據的Ground Truth標簽,而是利用未標記的樣本。
2)其次,它既不需要復雜的模型結構更改,也不需要生成模型來創建合成數據
3)第三,它與模型結構無關的,并不局限于基于區域的檢測器。
給定一個參數為的目標檢測的源模型M,該模型由源訓練數據集進行訓練,其中是一個圖像,每個標簽由目標類別和邊界框坐標組成。原始源數據D的輸入分布與目標測試集數據分布之間存在偏移的情況。即,而。
在無監督的域自適應設置中,可以目標域取出了一組未標記的圖像,在訓練中可以使用這部分數據。任務的目標是將模型參數更新到中,以在源測試集和目標測試集上都能實現良好的性能。為了有效地利用中的附加信息,需要解決兩個問題:
1)首先,目標訓練集沒有Ground Truth標簽。
2)其次,利用源模型為生成偽標簽會導致由域位移引起的有噪聲監督,阻礙了自適應過程。
本文提出了簡單的自適應方法SimROD,以實現魯棒的目標檢測模型。SimROD集成了一種教師指導的微調 、一種新的DomainMix增強方 法和一種逐步適應技術 。
本文方法的motivation是標簽噪聲會被域位移加劇 。因此,本文的方法旨在在目標域圖像上生成準確的偽標簽,并將來自源域和目標域的混合圖像一起使用,從而為模型的調整提供強有力的監督信號。由于學生目標模型可能不足以生成準確的偽標簽,作者首先使用可以生成高質量偽標簽的輔助教師模型,然后再用學生模型進行微調。整個算法的流程如上圖所示。大致可以分為幾步:
1)基于源數據,訓練一個比學生模型容量大的源教師模型,得到參數。源教師模型用于生成目標數據上的初始偽標簽。
2)利用逐步適應算法,將大型教師模型參數從逐步更改為。在這一步中,使用的是由DomainMix增強生成的混合圖像,而不是單獨的源數據集或者目標數據集的圖片。
3)使用自適應的教師模型參數來細化目標數據上的偽標簽。然后,使用這些偽標簽來微調學生模型。
這種方法的一個好處是,它可以使小模型和大模型同時適應域的轉移,因為即使在學生網絡很小時,它也能產生高質量的偽標簽。另一個優點是,教師和學生不需要共享相同的結構。因此,教師模型可以選擇的一個參數量大、計算量大的模型來提高精度,學生模型可以選擇一個輕量級的模型。
在本文中,作者提出了一種新的增強方法DomainMix。如上圖所示,它均勻地對來自源域和目標域的圖像進行采樣,并將這些圖像連同其(偽)標簽混合到一個新的圖像中。
上圖顯示了一個來自自然和藝術領域的DomainMix 圖像的示例。
DomainMix使用了許多簡單的想法來減輕域的轉移和標簽噪聲:
DomainMix的數據增強方法如上圖所示,對于一個Batch中的每一幅圖像,首先從源和目標數據中隨機抽取另外三張圖像,并混合這些圖像的隨機crop,在2×2的模板中創建一個新的域混合圖像。并將偽標簽和真實標簽都標注到混合的圖片中,目標的邊界框坐標是根據新的混合圖像中每個crop的相對位置計算的。此外,作者使用加權平衡采樣器從這兩個域中進行均勻采樣。
接下來,作者提出了一個逐步適應的方法來優化檢測模型的參數,該算法減輕了標簽噪聲的影響。由源模型生成的偽標簽可能在目標域圖像上有噪聲,直接微調模型所有的層會阻礙模型的適應。
作者提出了一種分階段性的方法。首先,凍結了所有的卷積層,在前w個epoch只適應BN層,在第一階段結束后,BN層的參數就被更新了。然后使用部分適應的模型來生成更精確的偽標簽,為了簡單起見,它被離線完成。在第二階段,所有的層都被解凍,然后使用精細的偽標簽進行微調。在這兩個階段,都使用由DomainMix增強生成的混合圖像樣本。算法的流程如下所示:
上表展示了Sim10K到Cityscapes上,本文方法和SOTA方法的對比。
上圖展示了與之前的baseline相比,SimROD將模型從Sim10K調整到Cityscapes的有效性。
上表顯示了KITTI到Cityscapes上,本文提出的SimROD在性能上優于各種baseline方法。
上表展示了VOC(真實圖片)到Watercolor(水彩畫)數據集上的性能對比。
上表顯示了Pascal-C、COCO-C和Cityscapes-C數據集上的Yolov5m模型的實驗結果。
上表顯示了Yolov5m模型在Pascal-C數據集上不同模塊消融的實驗結果。
上表展示了一些本文方法和其他方法的一些檢測結果的例子。
在本文中,作者提出了一種簡單而有效的無監督方法來適應域位移下的檢測模型。本文中的自標記框架采用了一種域中心的增強方法和教師指導的微調適應模型。基于現有的小模型和大模型,本文的方法在模型魯棒性方面取得了顯著的性能增益。
本文的方法不僅減輕了由于低級圖像損壞而引起的域位移的影響,而且在源域和目標域之間存在高級風格差異時,它也可以適應模型。
這篇解決的問題是如何讓生成的偽標簽更加好 ,因為如果直接用源模型生成偽標簽效果并不好,會存在噪聲。所以作者就把源模型生成的偽標簽定義為初始偽標簽,然后用一種數據增強的方式,將偽標簽和真實標簽都放在同一張圖片中,類似CutMix,對教師模型進行微調。
微調完了之后,再對目標數據集生成偽標簽,這樣的偽標簽相比于初始偽標簽的效果會更好。因此學生模型在這樣的偽標簽上進行微調效果也會更好。
擊上方藍字關注“小鄭搞碼事”,每天都能學到知識,搞懂一個問題!
關于用樣式來處理圖片自適應的問題,下面分析一下兩種法。
有一個做法,大家都很熟悉,圖片作為div的背景。然后,應用background-size和background-position這兩屬性,就能很方便地按比例來縮放。
在響應式的環境下展示圖片,平時一般都這么搞。
如下代碼:
主要注意background-size的取值,可以是固定值,也可以是百分比,更可以是cover,contain。
1、若取值cover
其定義就是:把背景圖像擴展至足夠大,以使背景圖像完全覆蓋背景區域。背景圖像的某些部分也許無法顯示在背景定位區域中。(圖片同比縮放、塞滿整個容器,而圖片多余的部分則被剪掉了)如下圖:
2、若取值contain
其定義就是:把圖像擴展至最大尺寸,以使其寬度和高度完全適應內容區域。(圖片同比縮放至圖片能完全顯示在容器中,多余空間留白),如下圖:
上面將圖片設置成背景的方法,不好的地方是你無法設置圖片的懶加載、圖片無法被搜索引擎或者其他類似的工具抓取到。然而,再現了方法二。
有一個屬性叫object-fit,直接可以讓圖片自適應布局。
先看一下它的兼容性吧
咋一看,也不很很差,最少移動端還是基本可以兼容的。這個屬性有幾個常用的取值:
1、fill(填充)
替換內容拉伸填滿整個content box, 不保證保持原有的比例。
2、contain(包含)
保持原有尺寸比例,效果圖與background-size:contain對應的。
3、cover(覆蓋)
寬度和高度至少有一個和容器一致。效果圖與background-size:cover對應的
4、none
保持原有尺寸比例。
我做了一張完整的圖,大家對比看一下:
最后總結:
出于性能和其它因素考慮,建議大家可以多關注和使用方法二,方法一是出現的比較早的方法,也是常用方式。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。