HTTP 是客戶端-服務器計算模型中的請求-響應協議。要開始交換,客戶端向服務器提交請求。為了完成交換,服務器向客戶端返回響應。服務器只能向一個客戶端發送響應 (發出請求的那個) 。在 HTTP 協議中,客戶端是消息交換的發起者。
有些場景需要由服務端主動推送消息給客戶端。實現這一點的方法之一是允許服務器在發布/訂閱計算模型中向客戶端推送消息。要開始交換,客戶端從服務器訂閱消息。在交換期間,服務器向許多訂閱的客戶端發送消息(一旦它們可用)。
服務器發送事件 (SSE) 是一種簡單的技術,用于為特定的 Web 應用程序實現服務器到客戶端的異步通信。
有多種技術允許客戶端從服務器接收有關異步更新的消息。它們可以分為兩類:客戶端拉取和服務器推送。
在客戶端拉取技術中,客戶端會定期向服務器請求更新。服務器可以使用更新或尚未更新的特殊響應進行響應。有兩種類型的客戶端拉取:短輪詢和長輪詢。
客戶端定期向服務器發送請求。如果服務器有更新,它會向客戶端發送響應并關閉連接。如果服務器沒有更新,它也會向客戶端發送一個響應并關閉連接。
客戶端向服務器發送請求。如果服務器有更新,它會向客戶端發送響應并關閉連接。如果服務器沒有更新,它會保持連接直到更新可用。當更新可用時,服務器向客戶端發送響應并關閉連接。如果更新在某個超時時間內不可用,服務器會向客戶端發送響應并關閉連接。
在服務器推送技術中,服務器在消息可用后立即主動向客戶端發送消息。其中,有兩種類型的服務器推送:SSE和 WebSocket。
SSE 是一種在基于瀏覽器的 Web 應用程序中僅從服務器向客戶端發送文本消息的技術。SSE基于 HTTP 協議中的持久連接, 具有由 W3C 標準化的網絡協議和 EventSource 客戶端接口,作為 HTML5 標準套件的一部分。
WebSocket 是一種在 Web 應用程序中實現同時、雙向、實時通信的技術。WebSocket 基于 HTTP 以外的協議(TCP),因此可能需要額外設置網絡基礎設施(代理服務器、NAT、防火墻等)。
客戶端通過Http協議請求,在握手階段升級為WebSocket協議。
要訂閱服務器事件,客戶端發出 GET 請求帶有指定的header:
GET /sse HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
服務器應該使用帶有標題的響應來確認訂閱:
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
訂閱后,服務端在消息可用時立即發送給客戶端。事件是采用 UTF-8 編碼的文本消息。事件之間由兩個換行符分隔\n\n。每個事件由一個或多個名稱:值字段組成,由單個換行符\n 分隔。
在數據字段中,服務器可以發送事件數據
data: The first event.
data: The second event.
服務器可以發送唯一的事件標識符(id字段)。如果連接中斷,客戶端會自動重新連接并發送最后接收到的帶有header的 Last-Event-ID 的事件 ID。
在事件字段中,服務器可以發送事件類型。服務器可以在同一個訂閱中發送不同類型的事件,也可以不發送任何類型的事件。
event: type1
data: An event of type1.
event: type2
data: An event of type2.
data: An event without any type.
在重試字段中,服務器可以發送超時(以毫秒為單位),之后客戶端應在連接中斷時自動重新連接。如果未指定此字段,則標準應為 3000 毫秒。
retry: 1000
如果一行以冒號字符 : 開頭,客戶端應該忽略它。這可用于從服務器發送評論或防止某些代理服務器因超時關閉連接。
: ping
要打開連接,應創建一個 EventSource 對象。
var eventSource = new EventSource('/sse);
盡管 SSE 旨在將事件從服務器發送到客戶端,但可以使用 GET 查詢參數將數據從客戶端傳遞到服務器。
var eventSource = new EventSource('/sse?event=type1);
...
eventSource.close();
eventSource = new EventSource('/sse?event=type1&event=type2);
...
要關閉連接,應調用方法 close()。
eventSource.close();
有表示連接狀態的 readyState 屬性:
要處理連接的建立,它應該訂閱 onopen 事件處理程序。
eventSource.onopen = function () {
console.log('connection is established');
};
為了處理連接狀態的一些異常或致命錯誤,它應該訂閱 onerrror 事件處理程序。
eventSource.onerror = function (event) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
客戶端接收消息并處理他們,可以使用onmessage方法
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
};
SSE可被大多數瀏覽器支持:
Spring Web MVC 框架 5.2.0 是基于 Servlet 3.1 API 且用線程池實現異步應用程序. 所以應用能夠被使用在 Servlet 3.1+ 的容器,比如:Tomcat 8.5 和 Jetty 9.3.
使用Spring MVC來發送事件:
示例:
@RestController
public class SseWebMvcController
private SseEmitter emitter;
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
return emitter;
}
// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");
emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
}
}
}
在這個例子中,服務器每秒發送一個持續時間短的周期性事件流 - 一個有限的詞流,直到詞完成。
示例:
@Controller
@RequestMapping("/sse/mvc")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getWords() {
SseEmitter emitter = new SseEmitter();
cachedThreadPool.execute(() -> {
try {
for (int i = 0; i < WORDS.length; i++) {
emitter.send(WORDS[i]);
TimeUnit.SECONDS.sleep(1);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}
運行效果:
客戶端示例(words.html):
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Server-Sent Events client example with EventSource</title>
</head>
<body>
<script>
if (window.EventSource == null) {
alert('The browser does not support Server-Sent Events');
} else {
var eventSource = new EventSource('/sse/mvc/words');
eventSource.onopen = function () {
console.log('connection is established');
};
eventSource.onerror = function (error) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
};
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
if (event.data.endsWith('.')) {
eventSource.close();
console.log('connection is closed');
}
};
}
</script>
</body>
</html>
運行效果:
在此示例中,服務器發送持久的周期性事件流 - 每秒可能無限的服務器性能信息流:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/sse/mvc")
public class LongEventController {
private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
private SseEmitter emitter;
@PostConstruct
public void init() {
scheduledThreadPool.scheduleAtFixedRate(() -> {
try {
if (emitter != null) {
emitter.send(UUID.randomUUID().toString());
}
} catch (IOException e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
@GetMapping(path = "/getEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter getEvents() {
emitter = new SseEmitter();
return emitter;
}
}
效果預覽(每秒輸出一次):
非周期性是指沒有固定的時間周期,可能由其他因素在任意時刻都可能觸發,下面示例通過spring event來模擬觸發因子。
@RestController
@RequestMapping("/sse/mvc")
public class EventController {
private SseEmitter emitter;
@Autowired
private ApplicationContext applicationContext;
/**
* 訂閱事件通道
* @return
*/
@GetMapping(path = "/event", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter event() {
emitter = new SseEmitter();
return emitter;
}
/**
* 模擬某一事件觸發動作
* @param eventType
*/
@GetMapping(path = "/trigger")
public void trigger(String eventType) {
applicationContext.publishEvent(new MyEvent(eventType));
}
/**
* 監聽動作,發送給客戶端數據
*/
@EventListener(classes = MyEvent.class)
public void triggerEvent(MyEvent event) throws IOException {
emitter.send(event);
}
}
效果:
模擬觸發動作:調用 http://localhost:8080/sse/mvc/trigger?eventType=customer
客戶端收到數據:
Spring Web Flux 框架 5.2.0 是基于 Reactive Streams API 且使用 event-loop 計算模型來實現異步java應用程序。此類應用程序可以在非阻塞 Web 服務器(例如 Netty 4.1 和 Undertow 1.4)和 Servlet 3.1+ 容器(例如 Tomcat 8.5 和 Jetty 9.3)上運行。
使用 Spring Web Flux 框架實現發送事件:
簡單示例:
@RestController
public class ExampleController
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
}
}
和上面spring mvc的示例一樣,也是每秒輸出數據,實現如下:
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getWords() {
return Flux
.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1);
}
效果:
對比spring mvc的實現,我們改為flux實現,如下:
@GetMapping(path = "/getEvents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getEvents() {
return Flux
.interval(Duration.ofSeconds(1))
.map(sequence -> UUID.randomUUID().toString());
}
效果和上面是一樣的,可以看出,reactive api是非常的簡潔。
探索ChatGPT的使用過程中,我們發現GPT采用了流式數據返回的方式。理論上,這種情況可以通過全雙工通信協議實現持久化連接,或者依賴于基于EventStream的事件流。然而,ChatGPT選擇了后者,也就是本文即將深入探討的SSE(Server-Sent Events)技術。
要理解這個選擇,我們需要關注ChatGPT的使用場景。作為一個基于深度學習的大型語言模型,ChatGPT需要處理大量的自然語言數據,這無疑需要大量的計算資源和時間。相較于普通的讀取數據庫操作,其響應速度自然會慢許多。
對于這種可能需要長時間等待響應的對話場景,ChatGPT采用了一種巧妙的策略:它會將已經計算出的數據“推送”給用戶,并利用SSE技術在計算過程中持續返回數據。這樣做可以避免用戶因等待時間過長而選擇關閉頁面。
SSE(Server-Sent Events)是一種Web技術,它允許服務器實時向客戶端推送數據。相比于傳統的輪詢和長輪詢機制,SSE提供了一種更高效且實時的數據推送方式。這種技術主要應用于構建實時應用,例如實時消息推送、股票行情更新等。
SSE是HTML5規范中的一個通信相關API,它主要包含兩個部分:服務端與瀏覽器端的通信協議(基于HTTP協議),以及瀏覽器端JavaScript可使用的EventSource對象。
SSE運行在HTTP協議之上,它允許服務器以事件流(Event Stream)的形式將數據發送給客戶端。客戶端通過建立持久化的HTTP連接,并監聽這個事件流,從而可以實時接收到服務器推送的數據。
SSE具有以下幾個主要特點:
WebSocket是一種Web技術,用于實現實時雙向通信,它與SSE(Server-Sent Events)在某些方面存在差異。以下是對兩者的比較:
選擇使用SSE還是WebSocket主要取決于具體的業務需求和場景。如果你只需要實現從服務器向客戶端的單向數據推送,并且希望保持操作簡便且兼容性好,那么SSE是一個理想的選擇。然而,如果你需要實現雙向通信,或者需要更高級的功能和控制,那么WebSocket可能會更適合你的需求。
以下是SSE(Server-Sent Events)的實現原理:
總結起來,SSE使用了基于文本和HTTP協議的簡單機制,使得服務器能夠實時地將數據推送到客戶端,而無需客戶端頻繁地發起新的請求。
以下是在使用SSE(Server-Sent Events)技術進行實時數據推送時需要注意的幾個關鍵點:
以上這些注意事項可以根據具體應用需求進行調整和優化。在實際應用中,確保服務器的穩定性、安全性和性能是非常重要的。同時,在處理SSE連接時,可以考慮適當的限流和安全控制措施,以防止濫用和惡意連接的出現。總的來說,使用SSE技術時需要全面考慮各個方面的因素,才能實現高效、穩定、安全的實時數據推送服務。
假設正在開發一個實時股票價格監控應用,需要將股票價格實時推送給客戶端。以下為Spring Boot中集成SSE技術實現的場景示例代碼。
首先,定義一個控制器來處理SSE請求和發送實時股票價格:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Random;
@RestController
public class StockController {
?
@GetMapping(value = "/stock-price", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamStockPrice() {
SseEmitter emitter = new SseEmitter();
// 模擬生成實時股票價格并推送給客戶端
Random random = new Random();
new Thread(() -> {
try {
while (true) {
// 生成隨機的股票價格
double price = 100 + random.nextDouble() * 10;
// 構造股票價格的消息
String message = String.format("%.2f", price);
// 發送消息給客戶端
emitter.send(SseEmitter.event().data(message));
// 休眠1秒鐘
Thread.sleep(1000);
}
} catch (Exception e) {
emitter.completeWithError(e);
}
}).start();
return emitter;
}
}
在上述代碼中,定義了一個streamStockPrice()方法,該方法使用@GetMapping注解將/stock-price路徑映射到該方法上,并指定produces = MediaType.TEXT_EVENT_STREAM_VALUE以表明該方法將產生SSE事件流。
在方法內部創建了一個SseEmitter對象作為事件發射器,并在一個單獨的線程中不斷生成隨機的股票價格,并將價格轉換為字符串形式發送給客戶端。
通過emitter.send()方法發送的數據會被封裝為SSE事件流的形式,客戶端可以通過監聽該事件流來實時接收股票價格。
在前端頁面中,創建一個簡單的HTML頁面來展示實時股票價格:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>實時股票價格監控</title>
</head>
<body>
<h1>實時股票價格</h1>
<div id="stock-price"></div>
?
<script>
const eventSource = new EventSource('/stock-price');
eventSource.onmessage = function (event) {
document.getElementById('stock-price').innerHTML = event.data;
};
</script>
</body>
</html>
上述代碼中,通過new EventSource('/stock-price')創建了一個EventSource對象,它與/stock-price路徑建立SSE連接。然后,通過eventSource.onmessage定義了接收消息的回調函數,在收到新消息時更新頁面上的股票價格。
通過以上代碼,可以在瀏覽器中打開該HTML頁面,它會建立與服務器的SSE連接,并實時接收并展示股票價格。這只是使用SSE實現實時數據推送的一個簡單示例。在實踐中,可以根據具體的業務需求和場景,進行更復雜和豐富的實現。
SSE(Server-Sent Events)是一種基于HTTP協議的輕量級實時通信技術,具備服務端推送、斷線重連和簡單輕量等優點。然而,它也存在一些限制,例如無法進行雙向通信、連接數受限以及僅支持GET請求等。
在Web應用程序中,SSE可以實現各種即時數據推送功能,如股票在線數據更新、日志推送、實時顯示聊天室人數等。
然而,需要注意的是,并非所有的實時推送場景都適合使用SSE。在需要處理高并發、高吞吐量和低延遲的場景下,WebSocket可能是更好的選擇。而對于那些需要輕量級推送解決方案的場景,SSE可能會更加適合。
因此,在選擇實時更新方案時,我們需要根據具體的需求和應用場景來做出決策。只有這樣,我們才能確保選擇的技術能夠最大程度地滿足我們的需求。
是 碼農小胖哥。天天有編程干貨分享。覺得寫的不錯。點個贊,轉發一下,關注一下。本文為個人原創文章,轉載請注明出處,非法轉載抄襲將追究其責任。
場景
今天項目經理交給我一個開發任務。如果有人在前臺下了訂單就給后臺倉庫管理一個發貨通知。也就是服務端觸發一個事件,推送消息到客戶端。
如果我用websocket來做還要搞個websocket服務器,而且還 有不少配置。websocket是全雙工通信,單向通信簡直是殺雞用牛刀。用輪詢吧,浪費服務器資源不說,還不一定實時,訂單處理慢了豈不是怠慢了客戶。有沒有別的選擇呢?當然有!
1.SSE推送技術
SSE全稱Server-sent Events,是HTML 5 規范的一個組成部分,具體去MDN網站查看相關文檔。該規范十分簡單,主要由兩個部分組成:第一個部分是服務器端與瀏覽器端之間的通訊協議,第二部分是在瀏覽器端可供 JavaScript 使用的 EventSource 對象。通訊協議是基于純文本的簡單協議。服務器響應的內容類型是“text/event-stream”。響應文本的內容可以看成是一個事件流,由不同的事件所組成。每個事件由類型和數據兩部分組成,同時每個事件可以有一個可選的標識符。不同事件的內容之間通過僅包含回車符和換行符的空行(“\r\n”)來分隔。每個事件的數據可能由多行組成。
如上圖所示,每個事件之間通過空行來分隔。每一行都是由鍵值對組成。如果鍵為空則表示該行為注釋,會在處理時被忽略。例如第10行。
第1行表示一個只包含數據的事件。會按照默認事件走(message事件)。第3-4行代表一個附帶eventID的事件。第6-8行代表一個自定義事件。第10-14行代表一個多行數據事件,多行數據由換行符鏈接
key定義有以下幾種:
SSE只適用于高級瀏覽器,但是注意IE不直接支持。IE上的XMLHttpRequest對象不支持獲取部分的響應內容,所以不支持。每次總有IE,怪不得快被淘汰了。
2. SSE VS Websocket
3. Spring Mvc中的SSE
Spring Mvc對SSE進行了支持。如果你要聲明一個SSE連接。只需要在你的控制器聲明一個如下接口:
必須必須返回SseEmitter對象,SseEmitter對象是Session級別的,如果你要點對點針對每個session要獨立存儲。如果你是廣播可以共用一個SseEmitter對象。按照SSE規范也必須聲明produces為"text/event-stream"。當你調用該接口的時候將建立起SSE連接。
你可以在另一個線程中調用SseEmitter的send方法向客戶端發送事件。你也可以在發送事件后調用complete方法來關閉SSE連接。
4.瀏覽器端的EventSource
由于SSE 是HTML5規范。所以對于APP端必須有HTML才能支持。并且IE如果要支持需要使用一些兼容開發包,比如polyfill庫。客戶端因為只接受事件所以開發比較簡單:
5.總結
今天介紹了SSE 服務端推送。和長輪訓、comet、websocket相比而言比較輕量級。在一些需要服務器實時推送規模不大的業務場景實現更簡單點。相信看了本文后你會很快入門。在實際開發中要根據業務對這幾種推送進行技術選型。沒有最好的只有最適合的。SSE對大多數開發者來說不夠熟悉。相關代碼碼云倉庫:
https://gitee.com/felord/sse-push
*請認真填寫需求信息,我們會在24小時內與您取得聯系。