延時消息(定時消息)指的在 分布式異步消息場景 下,生產端發送一條消息,希望在指定延時或者指定時間點被消費端消費到,而不是立刻被消費。
延時消息適用的業務場景非常的廣泛,在分布式系統環境下,延時消息的功能一般會在下沉到中間件層,通常是 MQ 中內置這個功能或者內聚成一個公共基礎服務。
本文旨在探討常見延時消息的實現方案以及方案設計的優缺點。
這里討論的外部存儲指的是在 MQ 本身自帶的存儲以外又引入的其他的存儲系統。
基于外部存儲的方案本質上都是一個套路,將 MQ 和 延時模塊 區分開來,延時消息模塊是一個獨立的服務/進程。延時消息先保留到其他存儲介質中,然后在消息到期時再投遞到 MQ。當然還有一些細節性的設計,比如消息進入的延時消息模塊時已經到期則直接投遞這類的邏輯,這里不展開討論。
下述方案不同的是,采用了不同的存儲系統。
基于關系型數據庫(如MySQL)延時消息表的方式來實現。
CREATE TABLE `delay_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`delivery_time` DATETIME NOT NULL COMMENT '投遞時間',
`payloads` blob COMMENT '消息內容',
PRIMARY KEY (`id`),
KEY `time_index` (`delivery_time`)
)
通過定時線程定時掃描到期的消息,然后進行投遞。定時線程的掃描間隔理論上就是你延時消息的最小時間精度。
優點:
缺點:
RocksDB 的方案其實就是在上述方案上選擇了比較合適的存儲介質。
RocksDB 在筆者之前的文章中有聊過,LSM 樹根更適合大量寫入的場景。滴滴開源的DDMQ中的延時消息模塊 Chronos 就是采用了這個方案。
DDMQ 這個項目簡單來說就是在 RocketMQ 外面加了一層統一的代理層,在這個代理層就可以做一些功能維度的擴展。延時消息的邏輯就是代理層實現了對延時消息的轉發,如果是延時消息,會先投遞到 RocketMQ 中 Chronos 專用的 topic 中。延時消息模塊 Chronos 消費得到延時消息轉出到 RocksDB,后面就是類似的邏輯了,定時掃描到期的消息,然后往 RocketMQ 中投遞。
這個方案老實說是一個比較重要的方案。因為基于 RocksDB 來實現的話,從數據可用性的角度考慮,你還需要自己去處理多副本的數據同步等邏輯。
優點:
缺點:
再來聊聊 Redis 的方案。下面放一個比較完善的方案。
本方案來源于: https://www.cnblogs.com/lylife/p/7881950.html
這個方案選用 Redis 存儲在我看來有以下幾點考慮,
但是這個方案其實也有需要斟酌的地方,上述方案通過創建多個 Delayed Queue 來滿足對于并發性能的要求,但這也帶來了多個 Delayed Queue 如何在多個節點情況下均勻分配,并且很可能出現到期消息并發重復處理的情況,是否要引入分布式鎖之類的并發控制設計?
在量不大的場景下,上述方案的架構其實可以蛻化成主從架構,只允許主節點來處理任務,從節點只做容災備份。實現難度更低更可控。
上述幾個方案中,都通過線程定時掃描的方案來獲取到期的消息。
定時線程的方案在消息量較少的時候,會浪費資源,在消息量非常多的時候,又會出現因為掃描間隔設置不合理導致延時時間不準確的問題。可以借助 JDK Timer 類中的思想,通過 wait-notify 來節省 CPU 資源。
獲取中最近的延時消息,然后wait(執行時間-當前時間),這樣就不需要浪費資源到達時間時會自動響應,如果有新的消息進入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個更小的消息,然后又wait,如此循環。
再來講講目前自帶延時消息功能的開源MQ,它們是如何實現的
RocketMQ 開源版本支持延時消息,但是只支持 18 個 Level 的延時,并不支持任意時間。只不過這個 Level 在 RocketMQ 中可以自定義的,所幸來說對普通業務算是夠用的。默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。
通俗地講,設定了延時 Level 的消息會被暫存在名為 SCHEDULE_TOPIC_XXXX 的topic中,并根據 level 存入特定的queue,queueId = delayTimeLevel – 1,**即一個queue只存相同延時的消息,保證具有相同發送延時的消息能夠順序消費。**broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
下面是整個實現方案的示意圖,紅色代表投遞延時消息,紫色代表定時調度到期的延時消息:
優點:
缺點:
Pulsar 支持“任意時間”的延時消息,但實現方式和 RocketMQ 不同。
通俗的講,Pulsar 的延時消息會直接進入到客戶端發送指定的 Topic 中,然后在堆外內存中創建一個基于時間的優先級隊列,來維護延時消息的索引信息。延時時間最短的會放在頭上,時間越長越靠后。在進行消費邏輯時候,再判斷是否有到期需要投遞的消息,如果有就從隊列里面拿出,根據延時消息的索引查詢到對應的消息進行消費。
如果節點崩潰,在這個 broker 節點上的 Topics 會轉移到其他可用的 broker 上,上面提到的這個優先級隊列也會被重建。
下面是 Pulsar 公眾號中對于 Pulsar 延時消息的示意圖。
乍一看會覺得這個方案其實非常簡單,還能支持任意時間的消息。但是這個方案有幾個比較大的問題
對于前面第一點和第二點的問題,社區也設計了解決方案,在隊列中加入時間分區,Broker 只加載當前較近的時間片的隊列到內存,其余時間片分區持久化磁盤,示例圖如下圖所示:
但是目前,這個方案并沒有對應的版本。可以在實際使用時,規定只能使用較小時間跨度的延時消息,來減少前兩點缺陷的影響。
至于第三個方案,估計是比較難解決的,需要在數據存儲層將延時消息和正常消息區分開來,單獨存儲延時消息。
QMQ提供任意時間的延時/定時消息,你可以指定消息在未來兩年內(可配置)任意時間內投遞。
把 QMQ 放到最后,是因為我覺得 QMQ 是目前開源 MQ 中延時消息設計最合理的。里面設計的核心簡單來說就是 多級時間輪 + 延時加載 + 延時消息單獨磁盤存儲 。
如果對時間輪不熟悉的可以閱讀筆者的這篇文章 從 Kafka 看時間輪算法設計
QMQ的延時/定時消息使用的是兩層 hash wheel 來實現的。第一層位于磁盤上,每個小時為一個刻度(默認為一個小時一個刻度,可以根據實際情況在配置里進行調整),每個刻度會生成一個日志文件(schedule log),因為QMQ支持兩年內的延時消息(默認支持兩年內,可以進行配置修改),則最多會生成 2 * 366 * 24 = 17568 個文件(如果需要支持的最大延時時間更短,則生成的文件更少)。 第二層在內存中,當消息的投遞時間即將到來的時候,會將這個小時的消息索引(索引包括消息在schedule log中的offset和size)從磁盤文件加載到內存中的hash wheel上,內存中的hash wheel則是以500ms為一個刻度 。
總結一下設計上的亮點:
本文匯總了目前業界常見的延時消息方案,并且討論了各個方案的優缺點。希望對讀者有所啟發。
原文 https://ricstudio.top/archives/delay-msg-designs
oxmail郵箱是不少網友都在使用的郵箱,不過,很多foxmail郵箱的人性化功能卻不為人知。比如說,foxmail郵箱的密送功能、定時發送郵件功能等。今天,小編就給大家分享一下關于foxmail郵箱定時發送郵件的設置方法。那么,foxmail郵箱的定時發送功能怎么開啟呢?一起來看看今天的foxmail郵箱使用方法就知道了!
1、首先我們將郵件寫好,點擊右上角菜單==定時發送;
2、出現定時發送設置選項,設置您需要定時發送郵件的發送時間;
3、設置好了之后,點擊發送郵件會自動到草稿箱里面,等待 定時的時間進行郵件發送操作;
4、點擊郵件,查看可以看到 設置好的定時發送郵件的信息如下:
avaScript 是一種異步的、事件驅動的語言,這在處理諸如網絡請求、文件操作或定時任務等操作時非常有用。傳統上,JavaScript 使用回調函數來處理異步操作,但這可能會導致所謂的“回調地獄”。ES6 引入了 Promises 來幫助解決這個問題,而 ES2017 則引入了 async/await,進一步簡化了異步編程。
Async/await 是基于 Promises 的,它允許我們以同步的方式編寫異步代碼,使得代碼更加清晰易懂。下面,我們將通過幾個例子來演示如何在實際中使用 async/await。
在這個例子中,我們將使用 async/await 來異步加載一張圖片,并將其顯示在頁面上。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:異步加載圖片</title>
</head>
<body>
<button id="loadImageButton">加載圖片</button>
<div id="imageContainer"></div>
<script>
async function loadImage(url) {
return new Promise((resolve, reject) => {
const image = new Image();
image.onload = () => resolve(image);
image.onerror = () => reject(new Error('圖片加載失敗'));
image.src = url;
});
}
document.getElementById('loadImageButton').addEventListener('click', async function() {
try {
const image = await loadImage('https://via.placeholder.com/150');
document.getElementById('imageContainer').appendChild(image);
} catch (error) {
console.error(error);
}
});
</script>
</body>
</html>
在這個例子中,loadImage 函數返回一個 Promise 對象,我們通過 async 關鍵字聲明了一個異步函數,并在事件監聽器中使用 await 關鍵字等待圖片加載完成。
在這個例子中,我們將使用 async/await 來異步獲取網絡數據,并在頁面上顯示。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:異步獲取數據</title>
</head>
<body>
<button id="fetchDataButton">獲取數據</button>
<pre id="dataContainer"></pre>
<script>
async function fetchData(url) {
const response = await fetch(url);
if (!response.ok) {
throw new Error('網絡請求失敗');
}
return response.json();
}
document.getElementById('fetchDataButton').addEventListener('click', async function() {
try {
const data = await fetchData('https://mock.apifox.com/m1/2209590-0-default/pet/findByStatus');
document.getElementById('dataContainer').textContent = JSON.stringify(data, null, 2);
} catch (error) {
console.error(error);
}
});
</script>
</body>
</html>
在這個例子中,fetchData 函數中使用 await 關鍵字等待 fetch API 的響應,并處理成功和失敗的情況。
Async/await 使得異步代碼的錯誤處理變得更加直觀。我們可以使用傳統的 try/catch 語句來捕獲和處理錯誤。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Async/Await 示例:錯誤處理</title>
</head>
<body>
<button id="errorHandlingButton">執行操作</button>
<div id="resultContainer"></div>
<script>
async function riskyOperation() {
throw new Error('出錯了!');
}
document.getElementById('errorHandlingButton').addEventListener('click', async function() {
try {
await riskyOperation();
document.getElementById('resultContainer').textContent = '操作成功';
} catch (error) {
document.getElementById('resultContainer').textContent = error.message;
}
});
</script>
</body>
</html>
在這個例子中,我們故意在 riskyOperation 函數中拋出一個錯誤,然后在事件監聽器中使用 try/catch 來捕獲這個錯誤。
Async/await 提供了一種更加直觀和簡潔的方式來處理 JavaScript 中的異步操作。通過這些例子,我們可以看到它如何幫助我們以更加同步的方式編寫異步代碼,同時保持代碼的可讀性和可維護性。隨著 JavaScript 語言的不斷發展,我們期待未來會有更多的新特性來進一步簡化異步編程。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。