大報文問題,在京東物流內較少出現,但每次出現往往是大事故,甚至導致上下游多個系統故障。大報文的背后,是不同商家業務體量不同,特別是B端業務的采購及銷售出庫單,一些頭部商家對京東系統支持業務復雜度及容量能力的要求越來越高。因此我們有必要把這個問題重視起來,從組織上根本上解決。
大報文問題,是指不同的系統通過網絡進行數據交互時payload size過大導致的系統可用性下降問題。
??對于大報文的產生方,過大的報文在序列化時消耗更多內存和CPU,在傳輸時(JSF/MQ)可能超過中間件的大小限制導致傳輸失敗;對于大報文的消費方,過大的報文在反序列化時會產生大對象,消耗更多的內存和CPU,容易觸發FullGC甚至OOM,而在處理過程中要遍歷的內容更多,造成響應變慢,如果涉及數據庫操作容易產生大事務、慢SQL,這些容易觸發超時,如果客戶端有重試機制,會進一步加重大報文消費方負載,嚴重時導致服務集群整體不可用。
此外,由于大報文與小報文是在一個接口上完成的,使用相同的UMP key,它會導致監控失真,報警閾值無效。如果日志記錄了原始報文,也可能磁盤打滿和響應變慢。
在京東物流技術體系內,具體表現為:
大報文場景 | 后果 |
MQ的producer發送了大的Message | 由于JMQ對消息大小的限制,導致producer發送失敗:消息未送達 |
MQ consumer反序列化Message并處理計算時產生大對象,頻繁FullGC,CPU使用率飆升 | |
JSF Consumer調用API時傳入大入參值 | 由于JSF Server對payload大小限制,導致服務端將報文拋棄:無法送達 |
JSF Provider響應變慢,產生大對象,頻繁FullGC,CPU使用率飆升,甚至OOM;請求處理超時 | |
JSF Provider返回值包含大對象 | 由于JSF Consumer對payload大小限制,導致consumer無法獲取響應 |
JSF Consumer產生大對象,頻繁FullGC,CPU使用率飆升,甚至OOM |
JMQ/JSF對payload大小的限制都屬于防御性保護措施,目前的值是科學的,它們都已經足夠大了。在緊急止血情況下可以調整配置參數來暫時提高payload大小限制,但長期看它會加重系統的風險,應該從設計入手避免超過payload大小限制。
根據JMQ的官方文檔,單條消息大小:JMQ4不要超過4M,JMQ2不要超過2M。
具體原理是發送消息時在生產端做主動校驗,如果消息大小超過閾值則拋出異常(代碼實現與官方文檔不一致):
class ClusterManager {
protected volatile int maxSize = 4194304; // 4MB
}
class MessageProducer implement Producer { // Producer接口的具體實現類
ClusterManager clusterManager;
// producer.send時做校驗
int checkMessages(List<Message> messages) {
int size = 0;
for (Message message : messages) {
size += message.getSize() // 壓縮后的大小
}
if (size > this.clusterManager.getMaxSize()) {
throw new IllegalArgumentException("the total bytes of message body must be less than " + this.clusterManager.getMaxSize());
}
}
}
經與JMQ團隊確認,JMQ消息大小的限制,以代碼實現為準(官方文檔不準確):
根據JSF官方文檔,JSF可以在server和consumer端分別設置payload size,默認都是8MB。
?? 需要注意,觸發provider報文長度限制時,JSF consumer(老版本)并不會立即失敗,而是依靠客戶端超時后才返回(感覺是JSF的缺陷)。具體原因:JSF依靠底層netty來實現報文長度限制,當provider從請求報文頭里取得本次請求payload size發現超過限定值時,不會繼續讀取報文體,而是拋出netty定義的TooLongFrameException,而該異常的處理依賴netty的ChannelHandler.exceptionCaught方法,JSF里沒有對TooLongFrameException做處理(吃掉異常),provider端不給consumer任何響應(請求被扔進黑洞),因此造成consumer一直等待響應直到超時,而這可能把consumer端的業務線程池拖死。
class LengthFieldBasedFrameDecoder { // 基于netty io.netty.handler.codec.LengthFieldBasedFrameDecoder的改動
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 從JSF協議的報文頭里獲取本次請求的payload size,此時還沒有讀取8MB的body
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength > maxFrameLength) { // maxFrameLength即8MB限制
throw new TooLongFrameException();
}
}
}
class ServerChannelHandler implements ChannelHandler {
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
if (cause instanceof IOException) {
// ...
} else if (cause instanceof RpcException) {
// 這里可以看到遇到這種異常,JSF是如何給consumer端響應的
ResponseMessage responseMessage = new ResponseMessage(); // 給consumer的響應
responseMessage.getMsgHeader().setMsgType(Constants.RESPONSE_MSG);
String causeMsg = cause.getMessage();
String channelInfo = BaseServerHandler.getKey(ctx.channel());
String causeMsg2 = "Remote Error Channel:" + channelInfo + " cause: " + causeMsg;
((RpcException) cause).setErrorMsg(causeMsg2);
responseMessage.setException(cause); // 異常傳遞給consumer
// socket.write回consumer
ChannelFuture channelFuture = ctx.writeAndFlush(responseMessage);
} else {
// TooLongFrameException會走到這里,它的繼承關系如下:
// TooLongFrameException -> DecoderException -> CodecException -> RuntimeException
// 異常被吃掉了,不給consumer響應
logger.warn("catch " + cause.getClass().getName() + " at {} : {}",
NetUtils.channelToString(channel.remoteAddress(), channel.localAddress()),
cause.getMessage());
}
}
}
經與JSF團隊確認,consumer端或provider端發出的消息過大(超過playload)時consumer端得不到正確的異常響應只提示請求超時的問題,已經在1.7.5版本修復:需要provider端升級。升級后,如果consumer端發送的消息過大,provider會立即響應RpcException。
??此外,在JSF舊版本下,consumer使用了默認的5秒超時,但consumer拋出超時異常總用時是48秒,這是為什么?
??這是因為consumer配置的timeout不包括序列化時間,這48秒是把8MB的報文序列化的耗時:
class JSFClientTransport {
// consumer同步調用provider
ResponseMessage send(BaseMessage msg, int timeout) {
MsgFuture<ResponseMessage> future = doSendAsyn(msg, timeout);
return future.get(timeout, TimeUnit.MILLISECONDS);
}
MsgFuture doSendAsyn(final BaseMessage msg, int timeout) {
final MsgFuture resultFuture = new MsgFuture(getChannel(), msg.getMsgHeader(), timeout);
Protocol protocol = ProtocolFactory.getProtocol(msg.getProtocolType(), msg.getMsgHeader().getCodecType());
byteBuf = protocol.encode(request, byteBuf); // 發送報文前的序列化
RequestMessage request = (RequestMessage) msg;
request.setMsg(byteBuf);
channel.writeAndFlush(request, channel.voidPromise()); // socket.write,異步IO
resultFuture.setSentTime(JSFContext.systemClock.now());
}
}
class MsgFuture implements java.util.concurrent.Future {
final long genTime = JSFContext.systemClock.now(); // new的時候就賦值了
volatile long sentTime;
// 拋出超時異常邏輯
ClientTimeoutException clientTimeoutException() {
Date now = new Date();
String errorMsg = "[JSF-22110]Waiting provider return response timeout . Start time: " + DateUtils.dateToMillisStr(new Date(genTime))
+ ", End time: " + DateUtils.dateToMillisStr(now)
+ ", Client elapsed: " + (sentTime - genTime) // 它包括:序列化時間,由于異步IO因此不包括socket.write時間
+ "ms, Server elapsed: " + (now.getTime() - sentTime);
return new ClientTimeoutException(errorMsg);
}
}
物流網關在nginx層通過client_max_body_size做了5MB限制。這意味著,JSF限制了8MB,但通過物流網關對外開放成HTTP JSON API時,調用者實際的限制是5MB。
max_allowed_packet,net_buffer_length等參數在底層控制TCP層的報文長度,京東物流體系內該值足夠大,研發不必關注。
研發需要關注的是字段長度的定義,主要是varchar的長度。MySQL通過sql_mode參數控制字段超過長度后的行為是字段截斷還是中斷事務。對于京東物流業務執行鏈路比較長的場景來講,同一個字段可能多處保存,例如訂單行里的skuName,就會在OFC/WMS等系統保存,sku_name varchar長度的不一致,特殊場景下可能造成上下游交互出現問題。
DUCC value 的長度默認限制為 4W 字符。
UMP Key的限制128。
JMQ的businessId長度限制100,Producer在發送是默認超時2秒,Producer發送失敗默認重試2次。
JMQ消費者拋出異常會導致重試(進入retry-db),首次重試10分鐘,如果重試還不成功會越來越慢推送直至過期。過期時間:JMQ2為3天,JMQ4為30天。
JSF如果不配置consumer timeout,則使用默認值:5秒。
Zookeeper ZNode限制長度 1MB。雖然可以通過jute.maxbuffer這個Java系統屬性修改,但強烈不建議。
原則上,所有依賴的中間件都要確認其限制約束,提升健壯性,避免邊界條件被觸發而產生出乎意料的錯誤。
導致京東物流線上事故的大報文問題中,絕大部分都屬于該類問題。而這又可以細分為兩種場景:
interface JsfAPI {
// 場景1:批量接口,對批量的大小無限制
void foo(List<Request> requests);
}
class Request {
// 場景2:對一個類內部的集合類字段大小無限制
// JMQ產生大報文,絕大部分屬于該場景
List<Item> items;
}
當數據量增大時,報文也會增大,造成幾MB到幾十MB的報文傳輸,系統為了處理這樣大數據量的報文,必然會產生大對象,并且這種對象會一直處于內存中,在數據保存處理時,會造成內存不能釋放,可能觸發頻繁FullGC,CPU使用率飆升。同時,處理集合數據,往往會有數據遍歷過程,如果無并發則時間復雜度是O(N),大的數據集必然帶來更慢的響應速度,而consumer端不會根據payload大小動態設置超時時間,它可能導致consumer端超時,超時可能帶來多次重試,進而加重服務端壓力。
例如:無印良品訂單sku品類過多,比如一個出庫單包含2萬個sku的極端情況。
例如:WMS出庫發貨后向ECLP回傳信息,之前都是通過一個JMQ Topic: eclp_delivery進行回傳,一份消息包含了(訂單主檔,箱明細,包裹明細)3部分信息。后來中石化場景下,一個訂單的包裹明細數量非常多,導致ECLP處理報文時CPU飆升,同時MQ Listener與對外服務共享CPU,導致接單功能可用率降低。后來,從源頭入手把一個訂單按照明細進行分頁式拆分(之前是整單回傳,之后是按明細分頁回傳),同時把eclp_delivery這一個topic拆分成3個topic:(訂單,箱明細,包裹明細),解決了大報文問題。
它指的是某一個字段(不是集合大小),由于沒加長度限制,在特定場景下傳入了遠超預期大小的數據而造成的故障。
ECLP的商品主數據有個下發商品的接口,有個字段skuName,接口沒有對該字段長度進行約束。系統一直平穩運行,直到有個商家下發了某一個商品,它的skuName達到了10KB(事后發現,商家是把該商品詳情頁的整個HTML通過skuName傳過來了),插入數據庫時超過了字段長度限制varchar(200),導致插入失敗,但由于沒有考慮到這種場景,返回了誤導的錯誤提示。展開來看,如果ECLP為skuName定義了MySQL Text類型字段,還會有更嚴重問題:ECLP接收下商品,下發給WMS,但WMS里的skuName是varchar(200),這個問題就只能人工處理了,甚至與商家溝通。
WMS6.0為了考慮多場景全滿足,在出庫單預留了擴展字段,在接單時技術BP自行決定寫入哪個擴展字段。京喜BP下發出庫單時在訂單明細維度傳入了handOverSlip(交接單,其實是團單信息,里面有多層明細嵌套),該字段其實是一個大JSON,單個長度10KB上下,接單環節沒問題。但組建集合單會把多個出庫單組建成一個集合單,共產生3000多個明細,僅handOverSlip就占30MB,造成組建集合單后下發(JSF調用)揀貨時遇到了JSF 8MB限制問題,下發失敗,單據卡在那里,現場生產無法繼續。
WMS6.0的用戶中心系統,為其他系統提供了發送咚咚通知的服務,具體實現是調用集團的咚咚發送接口:xxx生產系統 -> 用戶中心 -> 咚咚系統。鏈路上每一個環節都未對通知內容content字段長度做限制。一次xxx生產系統調用用戶中心傳入了超8MB的content字段,觸發了咚咚系統的JSF底層的報文限制,最終在用戶中心產生了ClientTimeoutException,它導致用戶中心的JSF業務線程池打滿;而由于用戶中心為所有業務生產系統服務,現場操作會依賴它,進而導致生產卡頓,現場多環節無法正常生產。
Amazon FBA的SP-API(Sell Partner API),對可能出現風險的字段都做了長度限制,例如:
String displayableOrderComment; // maxLength: 1000
String sellerSku; // maxLength: 50
String giftMessage; // maxLength: 512
String displayableComment; // maxLength: 250
ECLP主數據有個接口:導出所有warehouse list,調用方很多,訪問頻率不高,每次響應長度3MB。該接口在線上出現過多次事故(2019年)。這個接口顯然是不該存在的,但把它下線需要推動所有的調用方改動,這個周期很長阻力也很大。
最開始,直接查數據庫,出現事故后加入JimDB,再次出現事故后配置了JimDB的local cache,后又加入JSF限流等措施。
出現故障時,ECLP CPU飆升,導致服務超時,京東零售調用方配置的超時設置很短,這導致越來越多的請求打過來,加重了ECLP負擔。
這個問題與【1.2.3 查詢接口返回大量數據】看上去類似,但有很大不同:一個同步調用,返回的數據量相對少,另一個異步執行,返回數據量巨大。
WMS6.0的報表都有導出的需求,例如導出最近3個月的明細數據。貼近商家的OFC(如ECLP),也有類似需求,商家要求導出明細數據。系統執行過程大致是:根據用戶指定的條件異步執行SQL,把數據庫返回的數據集寫入Excel,并存放到blob storage(指定TTL),用戶在規定時間(TTL)內根據storage key去blob storage下載,完成整個導出過程。
這里的關鍵問題是如何查詢數據庫,而數據庫作為共享資源往往是整個系統的瓶頸(增加復本數量意味著成本上升),它變慢會拖垮整個系統。如何查詢數據庫,有8個可選項:
??導出問題的本質,是大范圍table scan,很難設計精細的復合索引。WMS6.0最初使用的是方案1,它會產生深分頁limit offset問題:越往后的頁面越慢,對數據庫的壓力越大。舉例:要導出100萬行記錄,每頁1萬,那么到50萬記錄時,每次分頁查詢相當于數據庫要掃描50萬+行記錄后拋棄絕大部分并返回1萬行,這還要繼續執行50次,此外分頁組件還要額外執行count語句以計算總行數。
如果每頁是1千呢?因此,數據庫的壓力被放大了,可以簡單理解為“全表掃描”了【50 + 100(count計算)=150】次,遠不如不分頁(不分頁還要解決OOM問題)。目前,WMS6.0改用了方案8,根本上解決了數據庫慢查詢問題。思路是不再盲目靜態分頁,而是根據時間條件切分成多個SQL,分別查詢,保證每個SQL返回數據量不大從而避免慢SQL。例如,某個倉要導出最近3個月的出庫單數據,那么把這1個date range拆分(explode)成N個date range,分別執行:
condition = DateRange(from = "2022-01-01 00:00:00", to = "2022-04-01 00:00:00") // 用戶指定的時間范圍:3個月
// sql = select * from ob_shipment_order where xxx and update_time between condition.from and condition.to
List<DateRange> chunks = explode(condition)
for (DateRange chunk : chunks) {
// 該chunk的時間范圍已經變成了1天,甚至是1小時,具體值是根據SQL執行計劃估算得來的:數據量越大則拆分越細
sql = select * from ob_shipment_order where xxx and update_time between chunk.from and chunk.to
mysql.query(sql)
}
鏈路上經過不同的系統,不同系統對payload size的約束不同,也可能產生問題,因為決定是否可以正常處理的是最小的那個,但鏈路長時相關方可能不知道,在異步場景下這個問題尤為明顯。
例如,aws的API Gateway與Lambda對payload size有不同的約束,最終用戶必須知道限制最嚴格的那一個環節。
??對于京東物流,JSF與JMQ的限制不同,理論上可能產生這樣的問題:JSF調用者發送8MB的請求,JSF提供者處理時采用同步轉異步機制,異步把該請求8MB發送MQ,它會導致MQ發送永遠無法成功,而JSF的調用方卻渾然不覺。
如果通過物流網關對外開放,網關nginx限制是5MB,而JSF是8MB,設計上沒問題(fail fast),但可能造成服務方承諾與調用者感知端到端的不一致。
JSF對provider(jsf:server)和consumer可以分別設置不同的報文大小限制,理論上也可能出現問題,但在京東物流尚未出現,可不必關注。
它發生在系統執行過程內部。典型場景是DAO層查詢數據庫返回大結果集,Redis大key問題等。這要根據具體中間件機制來識別,例如,MyBatis支持插件來識別DAO查詢出大結果集:
public class ListResultInterceptor implements org.apache.ibatis.plugin.Interceptor {
private static final int RESULTSET_SIZE_THRESHOLD = 10000;
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object result = invocation.proceed();
if (result != null && result instanceof List) {
int resultSetSize = ((List) result).size();
if (resultSetSize > RESULTSET_SIZE_THRESHOLD) {
// 報警
}
}
return result;
}
}
即,主動防御式自我保護,而不是依靠使用者的“自覺”:外部用戶不可信賴。
對于JSF,可以通過JSR303向API Consumer顯式傳遞約束,并且該約束可以通過框架對業務代碼無侵入地自動執行。對于MQ,由于生產者與消費者解耦,無法直接傳遞約束,只能靠主動監控、人工協調。
它的前提條件,是研發有能力去主動識別出大報文風險。
如果有前端,那么前端加約束,避免大報文傳遞給后端。
對于后端,鏈式的上下游關系中,上游要把好關。
這個原則并不是說下游不用關心大報文問題,恰恰相反,鏈路的每個環節都要關心,但Fail Fast可以降低整體的不必要的損耗成本,也可以緩解某個環節保護機制缺失帶來的人工介入和修數成本。
同一個業務字段在上下游傳遞時,字段長度約束要一致,否則可能會出現上游成功落庫下游無法落庫的情況。
解決大報文的根本思路是拆分報文:大 -> 小。
對應MQ來講,應該是Producer負責拆分大報文為小報文。
對于JSF來講,有兩種情況:
?consumer產生的大報文:應該provider加約束,強迫consumer端分頁拆分請求。參考AJAX機制
典型場景:揀貨下架調用庫存預占接口,一次性傳入1萬個sku
?provider產生的大報文:應該變成分頁返回結果
典型場景:一次性返回所有warehouse列表
?? 需要注意的是,拆分報文,會增加生產方和消費方的復雜度,尤其是消費方:冪等,集齊,(并發和異步調用時產生的)亂序,業務的原子性保證等。例如,一個出庫單明細行過多時,整單預占庫存(大報文) -> 按訂單明細分頁預占(小報文)。
揀貨下架按明細維度分頁調用庫存預占接口場景下,如果訂單不允許缺量:整單預占時,該訂單預占庫存的原子性(要么全成功預占,要么一個sku都不預占)是由庫存系統(provider)保證的;而在按訂單明細維度分頁預占時,原子性需要在揀貨系統(consumer)保證,即如果后面頁碼的預占失敗則需要把前面頁碼的預占釋放。這增加consumer端復雜度,但為了系統的性能和可用性,這是值得的。當然,也有另外一個可選方案,仍舊讓庫存保證原子性,但庫存接口需要增加類似(currentPage, totalPages)的參數,那樣就是庫存更復雜了。無論如何,都增加了整體復雜度。
適用場景:MQ,以及JSF返回大報文響應。
為了保持報文的完整性,也便于消費方實現冪等、集齊等邏輯,需要在報文里額外增加分頁信息:currentPage/totalPages。
class Payload {
List<Item> items;
int currentPage, totalPages;
}
void sendPayload(Payload payload) {
int currentPage = 1;
int totalPages = payload.getItems().size() / batchSize;
Lists.partition(payload.getItems, batchSize).forEach(subItems -> {
Payload subPayload = new Payload(subItems)
subPayload.setPageInfo(currentPage, totalPages)
producer.send(subPayload)
currentPage++;
});
}
在極端復雜場景下,也可以考慮分拆topic,但不推薦,因為它可能額外引入亂序問題。
MQ報文編解碼除了目前的JSON外,也可以考慮Protobuf等更高效格式。例如京東零售訂單快照orderver就由xml升級到了PB。
適用場景:MQ/JSF。
這種方案,也被稱為Claim Check Pattern。
把大的明細List,按照固定batch size轉存到JFS/OSS/JimKV/S3等外部blob storage,在報文里存放指針(blob地址)列表。
class BigPayload {
List<Item> items;
}
class SmallPayload {
List<String> itemBlobKeys;
}
void sendPayload(BigPayload bigPayload) {
SmallPayload smallPayload = new SmallPayload();
Lists.partition(bigPayload.getItems(), batchSize).forEach(subItems -> {
List<String> itemBlobKeys = blogStore.putObjects(subItems)
smallPayload.addItemBlobKeys(itemBlobKeys);
});
producer.send(JSON.encode(smallPayload);
}
目前上游系統(eclp、序列號、OMC等)、DTC、下游系統(各版本WMS)的信息傳遞使用了該辦法,共用一個JFS集群。
Side effects:1)引入額外依賴,而且消費方被迫引入依賴 2)需要Blob存儲的TTL機制或定期清理,否則加大存儲成本 3)為消費方帶來了不確定性,從blob拿回的數據可能超大,在反序列化和處理過程中有OOM/FullGC等風險(雖然一些json庫提供了底層的基于詞法token的Streaming Parsing API,但如果要讀取全部內容仍然耗費大量內存)
適用場景:大字段。
在確定用戶體驗可以接受的情況下,上層進行字段內容截斷(truncate)。及早截斷,不要依賴下層數據庫的截斷機制。
適用場景:JSF。
兩種場景:一種是批量接口,即入參是集合,另一種是入參對象里有集合字段。
class FooRequest {
@javax.validation.constraints.Size(min = 1, max = 200)
private List<Bar> barItems;
}
interface JsfAPI {
// 場景1:批量接口
void foo(@javax.validation.constraints.Size(min = 1, max = 200) List<FooRequest> requests)
// 場景2:請求對象里有集合字段
void bar(FooRequest request);
}
對于JSF Consumer,可以通過JSF異步調用,它相當于redis pipeline模式,也可以通過客戶端線程池并發調用方式實現分頁調用,二者耗時相同,推薦使用前者:1)代碼實現簡單 2)節省了額外線程池成本。
int maxJsfRetries = 3; // JSF async下的自動重試只能應用層自己做了
int retried = 0;
do {
List<ResponseFuture<Result<ObLocatingResultDto>>> futures = new LinkedList();
Lists.partition(voList, batchSize).forEach(subVoList -> {
ObLocatingOrderDto dto = mapper.INSTANCE.toDTO(subVoList);
locatingAppService.outboundOrderLocate(dto); // async JSF call
ResponseFuture<Result<ObLocatingResultDto>> future = RpcContext.getContext().getFuture();
futures.add(future);
});
for (ResponseFuture<Result<ObLocatingResultDto>> future : futures) {
try {
Result<ObLocatingResultDto> result = future.get();
} catch (RpcException jsfException) {
retried++;
} catch (Throwable e) {
// 額外的業務邏輯:與JSF并發同步調用相同的處理邏輯
}
}
} while (retried <= maxJsfRetries);
JSF異步調用時,jsf:consumer配置的retries無效,這是因為異步發送后如果出現網絡超時,只能由業務代碼通過future.get()才能拿到結果,JSF底層沒有機會進行自動重試。而同步調用時,JSF底層可以判斷出超時,它有機會根據配置進行自動重試。更多細節可以查看JSF的FailoverClient.doSendMsg方法。
適用場景:單向通知類請求,相當于AsyncAPI。
大的報文往往意味著更長的處理時長,JSF同步調用下consumer必須同步等待provider端的返回,這會同時占用consumer和provider雙方的線程池資源,極端情況下可能導致雙方線程池用盡。JSF下可能耗盡線程池,進而拖死被強依賴的上游,產生雪崩效應;而MQ下,只會消費積壓。
異步交互,使得上游對下游響應時間的依賴轉換為吞吐率的依賴。JMQ實現了消費者和生產者在時間和空間上的解耦,消息的消費者可以承受更大范圍的處理速度范圍。
根據sku編號查詢商品資料,往往伴隨著多個sku一起查詢的需求,如何設計接口?
有的這樣:
interface JsfAPI {
Result<SkuInfo> getSkuInfo(String sku);
Result<List<SkuInfo>> listSkuInfo(List<String> skus);
}
由于批量接口在技術上已經滿足了單個查詢的功能,有的團隊干脆去掉了單個查詢接口,造成使用者查詢單個sku時:
Result<SkuInfo> result = jsfAPI.listSkuInfo(Lists.newArrayList("EMG1800752592"));
應該這樣:
interface JsfAPI {
Result<SkuInfo> getSkuInfo(String sku);
}
interface JsfBulkAPI {
Result<List<SkuInfo>> listSkuInfo(List<String> skus);
}
JsfAPI與JsfBulkAPI把批量與單一接口進行分離后,可以分配到不同的線程池,盡可能互不干擾,這同理于Bulkhead Pattern。
單一接口 | 批量接口 |
處理關鍵業務,SLA要求更高 | 風險高,性能差 |
JSF可以通過jsf:server定義線程池,并為jsf:provider分配不同的server。
如果大報文實在無法拆分(例如,上游團隊不配合),為了降低極端請求對絕大部分正常請求的影響,可以采用大小報文分離的辦法。
對于JMQ,為了防止某一個大報文的消費長耗時或異常導致小報文的消費積壓,可以把大報文轉發到“慢隊列”進行消費。
此外,也要考慮如何緩解UMP監控失真問題。
??該值決定了MessageListener.onMessage入參messages的size。
interface MessageListener {
void onMessage(List<Message> messages) throws Exception;
}
JMQ Consumer的ACK是以批為單位的,例如設置為10,則10條消息里任意一條產生異常都會導致10條全部重新消費。大報文場景下,如果發現問題,可以把該值調整為1,避免大小報文相互影響。
大批量消費主要有兩個好處:1)壓縮效果好(JMQ在發現報文超過100B時就進行壓縮),TCP I/O性能高 2)降低獲取消息的等待耗時,因為它相當于prefetch(具體原理是LinkedBlockingDeque的capacity,如果拉取的消息數超過它,則IO阻塞以防止拉取新消息)。同時它也有兩大負面效應:1)ACK以批為單位,一個錯誤導致整批錯誤,整批重試 2)消息大小限制取決于整批所有消息大小,可能觸發大報文問題。
??對于京東物流絕大部分業務系統來講,這點提升與繁重的業務處理來比不值一提,例如:I/O節省了5ms,但單個消息處理需要200ms(因為要通過接口查詢,處理,然后寫庫),反倒是side effect成為主要矛盾。因此,絕大部分場景下該值應該設置為1。如果業務邏輯類似于集齊:把N個消息拿下來,本地緩沖暫不處理,等滿足條件了再merge并一次性處理,那么可以調整批量大小為非1。
JMQ Producer提供了批量發送方法:
interface Producer {
void send(List<Message> messages) throws JMQException;
}
我們的業務代碼也在使用,例如:
/**
* 發送分播結果消息
*/
public void send(List<CheckResultDto> checkResultDtos) {
List<Message> messageList = Lists.newArrayList();
for (CheckResultDto checkResultDto : checkResultDtos) {
String messageText = JmqMessage.createReportBody(checkResultDto.getUuid(), Lists.newArrayList(checkResultDto));
messageList.add(JmqMessage.create(topic, messageText, checkResultDto.getUuid(), checkResultDto.getWarehouseNo()));
}
producer.send(messageList);
}
這里要注意,分批發送時,1)發送的超時(默認2s)作用于整批消息,而不是單個消息 2)消息大小限制(4MB)作用于整批消息之和,因此批包含的消息越多越可能失敗。
尤其是AOP/Interceptor/Filter等統一處理的代碼,因為對報文的打印往往需要先json序列化。
if (logger.isInfoEnabled()) {
log.info(JsonUtil.toJson(request); // CPU intensive and disk I/O intensive(雖然日志是順序寫)
}
如果確實要記錄,也可以考慮采樣率方式記錄大報文日志。
開放API由于消費方多而且不確定性高,客觀上造成了“只有一次做對的機會”。
List size limit, property max length limit等,要在開放API的第一時間公布出去。如果開始不約束,后期加約束可能遭遇大的阻力和溝通成本。此外,遵循從嚴開始的規律,為自己爭取主動:你把限制放開,沒人找你岔,反之則阻力大。例如:order.items max size limit由100變成200,你可以放心地做;但由200變成100,你要征得現有使用者的全部確認。
例如,Amazon FBA的SP-API對集合的條數限制絕大部分是50。
無論采用哪種大報文問題解決辦法,識別出大報文場景是前提。
技術上,可以通過JSF Filter分析報文長度,把尚未觸發8MB但有潛在風險的自動識別出來。但JMQ無相關機制,業務系統要自行實現相關攔截機制。
provider端自動識別即可。
@Slf4j
public final class PayloadSizeFilter extends AbstractFilter {
private static final int PAYLOAD_SIZE_THRESHOLD = 4 << 20; // 4MB = 8MB(JSF限制) * 50%
private static final int BATCH_SIZE_THRESHOLD = 1000;
@Override
public ResponseMessage invoke(RequestMessage requestMessage) {
if (!RpcContext.getContext().isProviderSide()) {
// 只在provider端檢查大報文:它才是我們要保護的對象
return getNext().invoke(requestMessage);
}
// 自動識別潛在的大報文場景:針對報文大小
Integer payloadSize = requestMessage.getMsgHeader().getLength();
if (payloadSize != null && payloadSize > PAYLOAD_SIZE_THRESHOLD) {
// 這里使用最簡單的日志把潛在大報文暴露出來,各團隊可以做更細化的機制
// 由于logbook限制只有error level日志才能配置"關鍵字報警",這里使用log.error
// 如果不想自動報警,只是人工巡檢,可以log.warn
String methodName = requestMessage.getMethodName();
String className = requestMessage.getClassName();
log.error("Suspected BIG payload: {}.{}, {}>{}", className, methodName, payloadSize, PAYLOAD_SIZE_THRESHOLD);
}
// 自動識別潛在的大報文場景:報文字節小,但仍會導致處理慢,例如 List<String> orderNos,如果發來1萬個單號?
// 這里只能識別出入參是List的場景,對于字段類型是List的場景無效
Invocation invocation = requestMessage.getInvocationBody();
Class[] argClasses = invocation.getArgClasses();
Object[] args = invocation.getArgs();
for (int i = 0; i < argClasses.length; i++) {
Class argClass = argClasses[i];
if (Collection.class.isAssignableFrom(argClass)) {
// 入參類型是Collection
Collection collection = (Collection) args[i];
if (collection.size() > BATCH_SIZE_THRESHOLD) {
log.error("Too BIG Collection argument: {}>{}", collection.size(), BATCH_SIZE_THRESHOLD);
}
}
}
return getNext().invoke(requestMessage);
}
}
在consumer端加自動識別,如果發現,協同producer方確認風險判斷是否需要改造。
public interface BigPayloadTrait extends MessageListener {
int THRESHOLD_BIG_PAYLOAD = 2 << 20; // 2MB = 4MB(JMQ限制) * 50%
default boolean suspectedBigPayload(List<Message> messages) {
for (Message message : messages) {
if (message.getSize() > THRESHOLD_BIG_PAYLOAD) {
return true;
}
}
return false;
}
}
人工識別會有遺漏場景,關注監控全局指標,尤其是分析一些跳點,可能補充發現大報文場景。
有些大報文問題,可能暫時無法通過技術手段解決,例如,已經有商家接入的對外接口,開放時沒有對List size限制,加限制后需要商家配合修改做客戶端分頁,而商家不配合。這時候,可以采用大促期降級,限流,加開關,加強監控,設計應急預案,為此接口提供獨立的線程池來隔離正常請求等手段解決。
以第三方視角幫助識別出尚未識別的大報文場景,不要自己給自己搗亂。
推進大報文治理工作時,為了便于項目追蹤管理,可以采用如下流程。
這里也包括現有API/MQ上加字段場景。
設計和評審時,檢查:
?字段長度,在上下游上長度對齊
?JSF接口對List等集合類型加@Size顯式約束和校驗,對List性批量接口入參也加@Size
?MQ Producer確保不發出大報文
為所有JSF和MQ加入大報文預先監控機制(具體可參考【5.1 識別大報文場景】,根據是否改得動做相應的治理動作。
?作者:京東物流 高鵬
來源:京東云開發者社區 自猿其說Tech
、模塊訪問
使用:域名/入口文件/模塊/控制器/方法
2、自動生成模塊
在入口文件index.php中,定義常量define('BIND_MODULE','Admin');
3、comment目錄存放公共函數,function.php,thinkphp系統會自動加載,存放在模塊中則,在當前模塊使用,存放在項目目錄的中則在整個項目使用。
4、實例化類,實例化基礎類使用,M(控制器名字),實例化自定義類,D(控制器名稱或者表名)
5、i(大寫)用于接收提交的各種數據 I('接收數據類型',‘【默認值】’,‘【過濾函數名字】’,【額外數據】);例如:I('POST.',' ','int')
6、模板遍歷數據,<foreach name='datalists' item='vo' key=key>{$vo.title}</foreach>
或者<volist name='data' id='vo' key='key'> 循環體</volist>
其中key表示索引下標的值
7、U(‘網址地址’),用在前臺或后臺,格式化,網址信息
8、__PUBLIC__指向,根目錄下Public文件夾,存放css、js、images等文件
9、__APP__指向項目的路徑,__RUL__指向當前模塊
10、驗證碼類:
生成驗證碼
下面是最簡單的方式生成驗證碼:
$Verify = new \Think\Verify();
$Verify->entry();
實例化傳入參數:
$config = array(
'fontSize' => 30, // 驗證碼字體大小
'length' => 3, // 驗證碼位數
'useNoise' => false, // 關閉驗證碼雜點
);
$Verify = new \Think\Verify($config);
$Verify->entry();
或者采用動態設置的方式,如:
$Verify = new \Think\Verify();
$Verify->fontSize = 30;
$Verify->length = 3;
$Verify->useNoise = false;
$Verify->entry();
驗證碼檢測
可以用Think\Verify類的check方法檢測驗證碼的輸入是否正確,例如,下面是封裝的一個驗證碼檢測的函數:
// 檢測輸入的驗證碼是否正確,$code為用戶輸入的驗證碼字符串
function check_verify($code, $id = ''){
$verify = new \Think\Verify();
return $verify->check($code, $id);
}
11、模糊查詢,$map['字段名字']=array('like',"匹配符");例如$map['id']= array('like',"%3%");
12、全部刪除,$map['id'] = array('in',數組),代表要刪除的字段是否存在數組中
13.layer,彈出層,layer.msg(‘提示文字’,{time:2000},function());
14、分頁類:
計算數據總條數-》設置每頁顯示的條數-》實例化分頁類-》顯示頁腳-》limit查詢數據-》分配數據
15、關聯查詢
一對一關聯 :ONE_TO_ONE,包括HAS_ONE 和 BELONGS_TO
一對多關聯 :ONE_TO_MANY,包括HAS_MANY 和 BELONGS_TO
多對多關聯 :MANY_TO_MANY
模型類必須繼承Think\Model\RelationModel類,關聯定義的格式是:
namespace Home\Model;
use Think\Model\RelationModel;
class UserModel extends RelationModel{
protected $_link = array(
'關聯1' => array(
'關聯屬性1' => '定義',
'關聯屬性N' => '定義',
),
'關聯2' => array(
'關聯屬性1' => '定義',
'關聯屬性N' => '定義',
),
'關聯3' => HAS_ONE, // 快捷定義
...
);
}
16、插入數據add(),更新數據save(),刪除數據delete(),查詢一條數據find(),查詢多條數據select();
17、查詢獲取指定字段field(字段1,字段2),獲取指定字段getField(字段)
18、上傳類
實例化上傳類think/upload
19、設置session(‘session名字’,‘session值’)獲取session(session名字)
20布局繼承
第一步,先寫好comment。mthl,吧主體內容使用<blockl name="main">主體內容</block>
第二部在字模板中繼承,<extends name="父模板的路徑例如:comment,不需要寫后綴">
要顯示的替換的位置<blockl name="main">替換的內容</block>
注意name屬性的值,名字必須是一樣的
言
Hi,大家好,我是希留。
在項目的開發工程中,經常有導入導出數據的常見功能場景,Apache的POI是處理導入導出中最常用的,但是其原生的用法太復雜,很繁瑣,總是在Copy… ,無意間發現一款簡單粗暴的神器EasyPoi,EasyPoi也是基于POI的,在SpringBoot中也是做了很好的封裝,讓我們能夠在SpringBoot 快速地使用 EasyPoi 進行開發,很方便,而且支持多種格式的導入導出。
本篇文章就給大家介紹下EasyPoi。如果對你有幫助的話,還不忘點贊支持一下,感謝!文末附有源碼
目錄
一、EasyPoi簡介
EasyPoi功能如同名字easy,主打的功能就是容易,讓一個沒見接觸過poi的人員就可以方便的寫出Excel導出,Excel模板導出,Excel導入,Word模板導出,通過簡單的注解和模板語言(熟悉的表達式語法),完成以前復雜的寫法。
最新官方文檔:
http://doc.wupaas.com/docs/easypoi/easypoi-1c0u6ksp2r091
二、EasyPoi主要功能
三、EasyPoi注解介紹
easypoi起因就是Excel的導入導出,最初的模板是實體和Excel的對應,model–row,filed–col 這樣利用注解我們可以和容易做到excel到導入導出。
這個是必須使用的注解,如果需求簡單只使用這一個注解也是可以的,涵蓋了常用的Excel需求。
屬性 | 類型 | 默認值 | 功能 |
name | String | null | 列名 |
orderNum | String | “0” | 列的排序 |
replace | String[] | {} | 值的替換 {a_id,b_id} |
type | int | 1 | 導出類型 1 是文本 2 是圖片,3 是函數, |
exportFormat | String | “” | 導出的時間格式,以這個是否為空來判斷 |
importFormat | String | “” | 導入的時間格式,以這個是否為空來判斷是否需要格式化日期 |
format | String | “” | 時間格式,相當于同時設置了exportForm |
suffix | String | “” | 文字后綴,如% 90 變成90% |
isHyperlink | boolean | false | 超鏈接,如果是需要實現接口返回對象 |
isImportField | boolean | true | 校驗字段,看看這個字段是不是導入 |
一對多的集合注解,用以標記集合是否被數據以及集合的整體排序
屬性 | 類型 | 默認值 | 功能 |
id | String | null | 定義ID |
name | String | null | 定義集合列名, |
orderNum | int | 0 | 排序,支持name_id |
type | Class<?> | ArrayList.class | 導入時創建對象使用 |
四、開始使用
Maven 依賴:
<dependency>
<groupId>cn.afterturn</groupId>
<artifactId>easypoi-spring-boot-starter</artifactId>
<version>4.2.0</version>
</dependency>
首先定義需要導入的數據類型UserImportVO,并使用@Excel注解與excel列映射,導入的時候通常需要對導入的數據進行一些校驗
EasyPoi的校驗使用也很簡單,對象上加上通用的校驗規則,配置下需要校驗就可以了,校驗主要是JSR 303 規范,可結合Hibernate Validator使用
導入類對象實現IExcelModel、IExcelDataModel 接口,可獲取到錯誤校驗信息。
@Data
public class UserImportVO implements Serializable,IExcelModel, IExcelDataModel {
@NotBlank
@Excel(name = "姓名")
private String realName;
@Excel(name = "性別", replace = { "男_1", "女_2" })
private Integer sex;
@Excel(name = "出生日期", format = "yyyy-MM-dd")
private Date birthday;
@Length(min=1, max=11, message = "請填寫正確的手機號")
@Excel(name = "手機號碼")
private String phone;
@Excel(name = "固定電話")
private String tel;
@Email(message = "請填寫正確的郵箱地址")
@Excel(name = "郵箱")
private String email;
@Excel(name = "頭像地址")
private String avatar;
@Excel(name = "信息")
private String errorMsg;
private Integer rowNum;
@Override
public Integer getRowNum() {
return this.rowNum;
}
@Override
public void setRowNum(Integer i) {
this.rowNum = i;
}
@Override
public String getErrorMsg() {
return this.errorMsg;
}
@Override
public void setErrorMsg(String s) {
this.errorMsg = s;
}
}
在編寫controller層導入方法
@Autowired
private IUserService userService;
@PostMapping("/importExcel")
public String importExcel(@RequestParam("file") MultipartFile file) {
try {
String result = userService.importExcel(file);
return result;
} catch (Exception e) {
return "導入失敗";
}
}
具體的實現是在service層
@Autowired
private UserVerifyHandler userVerifyHandler;
@Override
public String importExcel(MultipartFile file) throws Exception{
ImportParams importParams = new ImportParams();
//表格標題行數,默認0
importParams.setTitleRows(1);
//是否需要校驗上傳的Excel
importParams.setNeedVerify(true);
//告訴easypoi我們自定義的驗證器
importParams.setVerifyHandler(userVerifyHandler);
ExcelImportResult<UserImportVO> result = ExcelImportUtil.importExcelMore(file.getInputStream(),UserImportVO.class,importParams);
if (!result.isVerifyFail() && !CollectionUtils.isEmpty(result.getList())) {
for (UserImportVO vo : result.getList()) {
log.info("從Excel導入數據到數據庫的詳細為 :{}", vo);
User user = new User();
BeanUtil.copyProperties(vo,user);
this.save(user);
}
} else {
for (UserImportVO vo : result.getFailList()) {
log.info("校驗失敗的詳細為 :{}", vo);
}
return "文檔校驗失敗";
}
return "導入成功";
}
2.4 ImportParams 參數
導入參數介紹下
屬性 | 類型 | 默認值 | 功能 |
titleRows | int | 0 | 表格標題行數,默認0 |
headRows | int | 1 | 表頭行數,默認1 |
startRows | int | 0 | 字段真正值和列標題之間 |
startSheetIndex | int | 0 | 開始讀取的sheet位置, |
needVerfiy | boolean | false | 是否需要校驗上傳的Excel |
needSave | boolean | false | 是否需要保存上傳的Excel |
saveUrl | String | “upload | 保存上傳的Excel目錄,默認 |
importFields | String[] | null | 導入時校驗數據模板,是不是正確的Excel |
verifyHanlder | IExcelVerifyHandler | null | 校驗處理接口,自定義校驗 |
dataHanlder | IExcelDataHandler | null | 數據處理接口,以此為主,replace,format都在這后面 |
通用的校驗滿足不了所有的校驗,例如還需要通過查詢數據庫,校驗數據的唯一性,此時需要自定義一個校驗規則,實現IExcelVerifyHandler接口。
@Component
public class UserVerifyHandler implements IExcelVerifyHandler<UserImportVO> {
@Autowired
private IUserService userService;
@Override
public ExcelVerifyHandlerResult verifyHandler(UserImportVO vo) {
ExcelVerifyHandlerResult result = new ExcelVerifyHandlerResult();
//假設我們要添加用戶,現在去數據庫查詢realName,如果存在則表示校驗不通過。
User user = userService.getOne(new LambdaQueryWrapper<User>().eq(User::getRealName,vo.getRealName()));
if (user!=null) {
result.setMsg("唯一校驗失敗");
result.setSuccess(false);
return result;
}
result.setSuccess(true);
return result;
}
}
1.此時,基本上的代碼就編寫完了,開始測試:
使用postman工具進行導入測試,先填充一些不符合規則的數據,可以看到控制臺輸出的校驗錯誤的信息。
圖1-1 測試導入的execl
圖1-2 postman工具請求接口返回
圖1-3 控制臺輸出
2.再填充一些符合規則的數據,可以看到導入成功,數據庫成功插入數據。
圖2-1 測試導入的execl
圖2-2 postman工具請求接口返回
圖2-3 數據庫數據
導出類不需要配置校驗規則,只需定義要導出的信息
@Data
public class UserExportVO implements Serializable {
@Excel(name = "姓名")
private String realName;
@Excel(name = "性別", replace = { "男_1", "女_2" }, suffix = "生")
private Integer sex;
@Excel(name = "出生日期", format = "yyyy-MM-dd")
private Date birthday;
@Excel(name = "手機號碼")
private String phone;
@Excel(name = "固定電話")
private String tel;
@Excel(name = "郵箱")
private String email;
@Excel(name = "頭像地址")
private String avatar;
}
編寫controller層導出方法
@GetMapping("/exportExcel")
public void export(HttpServletResponse response) {
//查詢要導出的數據
List<UserExportVO> users = userService.getUserExportList();
ExcelUtil.exportExcelX(users, "測試導出表", "sheet1", UserExportVO.class, "測試導出表.xlsx", response);
}
3.3 service層
編寫service層查詢需要導出的數據,把查詢出來的集合轉化成導出VO集合。
@Override
public List<UserExportVO> getUserExportList() {
List<User> users = this.list();
//users集合轉成export集合
List<UserExportVO> exportVOList = users.stream().map(user -> {
UserExportVO vo = new UserExportVO();
BeanUtils.copyProperties(user, vo);
return vo;
}).collect(Collectors.toList());
return exportVOList;
}
直接瀏覽器請求導出接口,成功導出。
圖3-1 請求導出接口
圖3-2 導出結果
結語
好了,以上就是今天要講的內容,本文僅僅簡單介紹了使用EasyPoi導入導出功能的使用,而EasyPoi還提供了模板的導出、圖片的導出、word的導出等等功能,感興趣的朋友,可查閱官方文檔進一步探究。
本次demo源碼:
Gitee地址:https://gitee.com/huoqstudy/java-sjzl-demo/tree/master/springboot-easypoi-demo
Github地址:https://github.com/277769738/java-sjzl-demo/tree/master/springboot-easypoi-demo
*請認真填寫需求信息,我們會在24小時內與您取得聯系。