作者:DavidDing 來源:https://zhuanlan.zhihu.com/p/56135195
、前言
最近公司在預(yù)研設(shè)備app端與服務(wù)端的交互方案,主要方案有:
雖然上面的一些成熟方案肯定更利于上生產(chǎn)環(huán)境,但它們通訊基礎(chǔ)也都是socket長連接,所以本人主要是預(yù)研了一下socket長連接的交互,寫了個簡單demo,采用了BIO的多線程方案,集成了springboot,實現(xiàn)了自定義簡單協(xié)議,心跳機制,socket客戶端身份強制驗證,socket客戶端斷線獲知等功能,并暴露了一些接口,可通過接口簡單實現(xiàn)客戶端與服務(wù)端的socket交互。
Github源碼:
https://github.com/DavidDingXu/springboot-socket-demo
二、IO通訊模型
1. IO通訊模型簡介
IO通訊模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路復(fù)用IO以及異步IO。
該部分內(nèi)容總結(jié)自專欄文章:
https://blog.csdn.net/yinwenjie/column/info/sys-communication/3
1.1 阻塞式同步IO
BIO就是:blocking IO。最容易理解、最容易實現(xiàn)的IO工作方式,應(yīng)用程序向操作系統(tǒng)請求網(wǎng)絡(luò)IO操作,這時應(yīng)用程序會一直等待;另一方面,操作系統(tǒng)收到請求后,也會等待,直到網(wǎng)絡(luò)上有數(shù)據(jù)傳到監(jiān)聽端口;操作系統(tǒng)在收集數(shù)據(jù)后,會把數(shù)據(jù)發(fā)送給應(yīng)用程序;最后應(yīng)用程序受到數(shù)據(jù),并解除等待狀態(tài)。
BIO通訊示意圖
1.2 非阻塞式同步IO
這種模式下,應(yīng)用程序的線程不再一直等待操作系統(tǒng)的IO狀態(tài),而是在等待一段時間后,就解除阻塞。如果沒有得到想要的結(jié)果,則再次進行相同的操作。這樣的工作方式,暴增了應(yīng)用程序的線程可以不會一直阻塞,而是可以進行一些其他工作。
非阻塞式IO示意圖
1.3 多路復(fù)用IO(阻塞+非阻塞)
多路復(fù)用IO示意圖
目前流程的多路復(fù)用IO實現(xiàn)主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:
1.4 異步IO
異步IO則是采用“訂閱-通知”模式:即應(yīng)用程序向操作系統(tǒng)注冊IO監(jiān)聽,然后繼續(xù)做自己的事情。當(dāng)操作系統(tǒng)發(fā)生IO事件,并且準(zhǔn)備好數(shù)據(jù)后,在主動通知應(yīng)用程序,觸發(fā)相應(yīng)的函數(shù)。
異步IO示意圖
和同步IO一樣,異步IO也是由操作系統(tǒng)進行支持的。微軟的windows系統(tǒng)提供了一種異步IO技術(shù):IOCP(I/O Completion Port,I/O完成端口);
Linux下由于沒有這種異步IO技術(shù),所以使用的是epoll(上文介紹過的一種多路復(fù)用IO技術(shù)的實現(xiàn))對異步IO進行模擬。
2. Java對IO模型的支持
由于是要實現(xiàn)socket長連接的demo,主要關(guān)注其一些實現(xiàn)注意點及方案,所以本demo采用了BIO的多線程方案,該方案代碼比較簡單、直觀,引入了多線程技術(shù)后,IO的處理吞吐量也大大提高了。下面是BIO多線程方案server端的簡單實現(xiàn):
public static void main(String[] args) throws Exception{ ServerSocket serverSocket=new ServerSocket(83); try { while(true) { Socket socket=null; socket=serverSocket.accept(); //這邊獲得socket連接后開啟一個線程監(jiān)聽處理數(shù)據(jù) SocketServerThread socketServerThread=new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { log.error("Socket accept failed. Exception:{}", e.getMessage()); } finally { if(serverSocket !=null) { serverSocket.close(); } } } } @slf4j class SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket=socket; } @Override public void run() { InputStream in=null; OutputStream out=null; try { in=socket.getInputStream(); out=socket.getOutputStream(); Integer sourcePort=socket.getPort(); int maxLen=2048; byte[] contextBytes=new byte[maxLen]; int realLen; StringBuffer message=new StringBuffer(); BIORead:while(true) { try { while((realLen=in.read(contextBytes, 0, maxLen)) !=-1) { message.append(new String(contextBytes , 0 , realLen)); /* * 我們假設(shè)讀取到“over”關(guān)鍵字, * 表示客戶端的所有信息在經(jīng)過若干次傳送后,完成 * */ if(message.indexOf("over") !=-1) { break BIORead; } } } //下面打印信息 log.info("服務(wù)器(收到來自于端口:" + sourcePort + "的信息:" + message); //下面開始發(fā)送信息 out.write("回發(fā)響應(yīng)信息!".getBytes()); //關(guān)閉 out.close(); in.close(); this.socket.close(); } catch(Exception e) { log.error("Socket read failed. Exception:{}", e.getMessage()); } } }
三、注意點及實現(xiàn)方案
1. TCP粘包/拆包
1.1 問題說明
假設(shè)客戶端分別發(fā)送了兩個數(shù)據(jù)包D1和D2給服務(wù)端,由于服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,故可能存在以下4種情況。 1. 服務(wù)端分兩次讀取到了兩個獨立的數(shù)據(jù)包,分別是D1和D2,沒有粘包和拆包; 2. 服務(wù)端一次接收到了兩個數(shù)據(jù)包,D1和D2粘合在一起,被稱為TCP粘包; 3. 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了完整的D1包和D2包的部分內(nèi)容,第二次讀取到了D2包的剩余內(nèi)容,這被稱為TCP拆包; 4. 服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了D1包的部分內(nèi)容D1_1,第二次讀取到了D1包的剩余內(nèi)容D1_2和D2包的整包。如果此時服務(wù)端TCP接收滑窗非常小,而數(shù)據(jù)包D1和D2比較大,很有可能會發(fā)生第五種可能,即服務(wù)端分多次才能將D1和D2包接收完全,期間發(fā)生多次拆包。
1.2 解決思路
由于底層的TCP無法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的,這個問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計來解決,根據(jù)業(yè)界的主流協(xié)議的解決方案,可以歸納如下: 1. 消息定長,例如每個報文的大小為固定長度200字節(jié),如果不夠,空位補空格; 2. 在包尾增加回車換行符進行分割,例如FTP協(xié)議; 3. 將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設(shè)計思路為消息頭的第一個字段使用int32來表示消息的總長度; 4. 更復(fù)雜的應(yīng)用層協(xié)議。
1.3 demo方案
作為socket長連接的demo,使用了上述的解決思路2,即在包尾增加回車換行符進行數(shù)據(jù)的分割,同時整體數(shù)據(jù)使用約定的Json體進行作為消息的傳輸格式。
使用換行符進行數(shù)據(jù)分割,可如下進行數(shù)據(jù)的單行讀取:
BufferedReader reader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String message; while ((message=reader.readLine()) !=null) { //.... }
可如下進行數(shù)據(jù)的單行寫入:
PrintWriter writer=new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); writer.println(message);
Json消息格式如下:
(1) 服務(wù)端接收消息實體類
@Data public class ServerReceiveDto implements Serializable { private static final long serialVersionUID=6600253865619639317L; /** * 功能碼 0 心跳 1 登陸 2 登出 3 發(fā)送消息 */ private Integer functionCode; /** * 用戶id */ private String userId; /** * 這邊假設(shè)是string的消息體 */ private String message; }
(2) 服務(wù)端發(fā)送消息實體類
@Data public class ServerSendDto implements Serializable { private static final long serialVersionUID=-7453297551797390215L; /** * 狀態(tài)碼 20000 成功,否則有errorMessage */ private Integer statusCode; private String message; /** * 功能碼 */ private Integer functionCode; /** * 錯誤消息 */ private String errorMessage; }
(3) 客戶端發(fā)送消息實體類
@Data public class ClientSendDto implements Serializable { private static final long serialVersionUID=97085384412852967L; /** * 功能碼 0 心跳 1 登陸 2 登出 3 發(fā)送消息 */ private Integer functionCode; /** * 用戶id */ private String userId; /** * 這邊假設(shè)是string的消息體 */ private String message; }
2. 客戶端或服務(wù)端掉線檢測功能
2.1 實現(xiàn)思路
通過自定義心跳包來實現(xiàn)掉線檢測功能,具體思路如下:
客戶端連接上服務(wù)端后,在服務(wù)端會維護一個在線客戶端列表。客戶端每隔一段時間,向服務(wù)端發(fā)送一個心跳包,服務(wù)端受收到包以后,會更新客戶端最近一次在線時間。一旦服務(wù)端超過規(guī)定時間沒有接收到客戶端發(fā)來的包,則視為掉線。
2.2 代碼實現(xiàn)
維護一個客戶端map,其中key代表用戶的唯一id(用戶唯一id的身份驗證下面會說明),value代表用戶對應(yīng)的一個實體
/** * 存儲當(dāng)前由用戶信息活躍的的socket線程 */ private ConcurrentMap<String, Connection> existSocketMap=new ConcurrentHashMap<>();
其中Connection對象包含的信息如下:
@Slf4j @Data public class Connection { /** * 當(dāng)前的socket連接實例 */ private Socket socket; /** * 當(dāng)前連接線程 */ private ConnectionThread connectionThread; /** * 當(dāng)前連接是否登陸 */ private boolean isLogin; /** * 存儲當(dāng)前的user信息 */ private String userId; /** * 創(chuàng)建時間 */ private Date createTime; /** * 最后一次更新時間,用于判斷心跳 */ private Date lastOnTime; }
主要關(guān)注其中的lastOnTime字段,每次服務(wù)端接收到標(biāo)識是心跳數(shù)據(jù),會更新當(dāng)前的lastOnTime字段,代碼如下:
if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) { //心跳類型 connection.setLastOnTime(new Date()); //發(fā)送同樣的心跳數(shù)據(jù)給客戶端 ServerSendDto dto=new ServerSendDto(); dto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); connection.println(JSONObject.toJSONString(dto)); }
額外會有一個監(jiān)測進程,以一定頻率來監(jiān)測上述維護的map中的每一個Connection對象,如果當(dāng)前時間與lastOnTime的時間間隔超過自定義的長度,則自動將其對應(yīng)的socket連接關(guān)閉,代碼如下:
Date now=new Date(); Date lastOnTime=connectionThread.getConnection().getLastOnTime(); long heartDuration=now.getTime() - lastOnTime.getTime(); if (heartDuration > SocketConstant.HEART_RATE) { //心跳超時,關(guān)閉當(dāng)前線程 log.error("心跳超時"); connectionThread.stopRunning(); }
在上面代碼中,服務(wù)端收到標(biāo)識是心跳數(shù)據(jù)的時候,除了更新該socket對應(yīng)的lastOnTime,還會同樣同樣心跳類型的數(shù)據(jù)給客戶端,客戶端收到標(biāo)識是心跳數(shù)據(jù)的時候也會更新自己的lastOnTime字段,同時也有一個心跳監(jiān)測線程在監(jiān)測當(dāng)前的socket連接心跳是否超時
3. 客戶端身份獲知、強制身份驗證
3.1 實現(xiàn)思路
通過代碼socket=serverSocket.accept()獲得的一個socket連接我們僅僅只能知道其客戶端的ip以及端口號,并不能獲知這個socket連接對應(yīng)的到底是哪一個客戶端,因此必須得先獲得客戶端的身份并且驗證通過其身份才能讓其正常連接。
具體的實現(xiàn)思路是:
自定義一個登陸處理接口,當(dāng)server端受到標(biāo)識是用戶登陸的時候(此時會攜帶用戶信息或者token,此處簡化為用戶id),調(diào)用用戶的登陸驗證,驗證通過的話則將該socket連接與用戶信息綁定,設(shè)置其為已登錄,并且封裝對應(yīng)的對象放入前面提的客戶端map中,由此可獲得具體用戶對應(yīng)的哪一個socket連接。
為了實現(xiàn)socket連接的強制驗證,在監(jiān)測線程中,也會判斷當(dāng)前用戶多長時間內(nèi)沒有實現(xiàn)登錄態(tài),若超時則認為該socket連接為非法連接,主動關(guān)閉該socket連接。
3.2 代碼實現(xiàn)
自定義登陸處理接口,這邊簡單以userId來判斷是否允許登陸:
public interface LoginHandler { /** * client登陸的處理函數(shù) * * @param userId 用戶id * * @return 是否驗證通過 */ boolean canLogin(String userId); }
收到客戶端發(fā)來的數(shù)據(jù)時候的處理:
if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) { //登陸,身份驗證 String userId=receiveDto.getUserId(); if (socketServer.getLoginHandler().canLogin(userId)) { //設(shè)置用戶對象已登錄狀態(tài) connection.setLogin(true); connection.setUserId(userId); if (socketServer.getExistSocketMap().containsKey(userId)) { //存在已登錄的用戶,發(fā)送登出指令并主動關(guān)閉該socket Connection existConnection=socketServer.getExistSocketMap().get(userId); ServerSendDto dto=new ServerSendDto(); dto.setStatusCode(999); dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue()); dto.setErrorMessage("force logout"); existConnection.println(JSONObject.toJSONString(dto)); existConnection.getConnectionThread().stopRunning(); log.error("用戶被客戶端重入踢出,userId:{}", userId); } //添加到已登錄map中 socketServer.getExistSocketMap().put(userId, connection); }
監(jiān)測線程判斷用戶是否完成身份驗證:
if (!connectionThread.getConnection().isLogin()) { //還沒有用戶登陸成功 Date createTime=connectionThread.getConnection().getCreateTime(); long loginDuration=now.getTime() - createTime.getTime(); if (loginDuration > SocketConstant.LOGIN_DELAY) { //身份驗證超時 log.error("身份驗證超時"); connectionThread.stopRunning(); } }
4. socket異常處理與垃圾線程回收
4.1 實現(xiàn)思路
socket在讀取數(shù)據(jù)或者發(fā)送數(shù)據(jù)的時候會出現(xiàn)各種異常,比如客戶端的socket已斷開連接(正常斷開或物理連接斷開等),但是服務(wù)端還在發(fā)送數(shù)據(jù)或者還在接受數(shù)據(jù)的過程中,此時socket會拋出相關(guān)異常,對于該異常的處理需要將自身的socket連接關(guān)閉,避免資源的浪費,同時由于是多線程方案,還需將該socket對應(yīng)的線程正常清理。
4.2 代碼實現(xiàn)
下面以server端發(fā)送數(shù)據(jù)為例,該代碼中加入了重試機制:
public void println(String message) { int count=0; PrintWriter writer; do { try { writer=new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); writer.println(message); break; } catch (IOException e) { count++; if (count >=RETRY_COUNT) { //重試多次失敗,說明client端socket異常 this.connectionThread.stopRunning(); } } try { Thread.sleep(2 * 1000); } catch (InterruptedException e1) { log.error("Connection.println.IOException interrupt,userId:{}", userId); } } while (count < 3); }
上述調(diào)用的this.connectionThread.stopRunning()代碼如下:
public void stopRunning() { //設(shè)置線程對象狀態(tài),便于線程清理 isRunning=false; try { //異常情況需要將該socket資源釋放 socket.close(); } catch (IOException e) { log.error("ConnectionThread.stopRunning failed.exception:{}", e); } }
上述代碼中設(shè)置了線程對象的狀態(tài),下述代碼在監(jiān)測線程中執(zhí)行,將沒有運行的線程給清理掉
/** * 存儲只要有socket處理的線程 */ private List<ConnectionThread> existConnectionThreadList=Collections.synchronizedList(new ArrayList<>()); /** * 中間list,用于遍歷的時候刪除 */ private List<ConnectionThread> noConnectionThreadList=Collections.synchronizedList(new ArrayList<>()); //... //刪除list中沒有用的thread引用 existConnectionThreadList.forEach(connectionThread -> { if (!connectionThread.isRunning()) { noConnectionThreadList.add(connectionThread); } }); noConnectionThreadList.forEach(connectionThread -> { existConnectionThreadList.remove(connectionThread); if (connectionThread.getConnection().isLogin()) { //說明用戶已經(jīng)身份驗證成功了,需要刪除map this.existSocketMap.remove(connectionThread.getConnection().getUserId()); } }); noConnectionThreadList.clear();
四、項目結(jié)構(gòu)
由于使用了springboot框架來實現(xiàn)該demo,所以項目結(jié)構(gòu)如下:
整體項目結(jié)構(gòu)圖
socket工具包目錄如下:
socket工具包目錄
pom文件主要添加了springboot的相關(guān)依賴,以及json工具和lombok工具等,依賴如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> </dependencies>
自己寫的socket工具包的使用方式如下:
@Configuration @Slf4j public class SocketServerConfig { @Bean public SocketServer socketServer() { SocketServer socketServer=new SocketServer(60000); socketServer.setLoginHandler(userId -> { log.info("處理socket用戶身份驗證,userId:{}", userId); //用戶名中包含了dingxu則允許登陸 return userId.contains("dingxu"); }); socketServer.setMessageHandler((connection, receiveDto) -> log .info("處理socket消息,userId:{},receiveDto:{}", connection.getUserId(), JSONObject.toJSONString(receiveDto))); socketServer.start(); return socketServer; } }
該demo中主要提供了以下幾個接口進行測試:
具體的postman文件也放已在項目中,具體可點此鏈接獲得
demo中還提供了一個簡單壓測函數(shù),如下:
@Slf4j public class SocketClientTest { public static void main(String[] args) { ExecutorService clientService=Executors.newCachedThreadPool(); String userId="dingxu"; for (int i=0; i < 1000; i++) { int index=i; clientService.execute(() -> { try { SocketClient client; client=new SocketClient(InetAddress.getByName("127.0.0.1"), 60000); //登陸 ClientSendDto dto=new ClientSendDto(); dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue()); dto.setUserId(userId + index); client.println(JSONObject.toJSONString(dto)); ScheduledExecutorService clientHeartExecutor=Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, "socket_client+heart_" + r.hashCode())); clientHeartExecutor.scheduleWithFixedDelay(() -> { try { ClientSendDto heartDto=new ClientSendDto(); heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); client.println(JSONObject.toJSONString(heartDto)); } catch (Exception e) { log.error("客戶端異常,userId:{},exception:{}", userId, e.getMessage()); client.close(); } }, 0, 5, TimeUnit.SECONDS); while (true){ } } catch (Exception e) { log.error(e.getMessage()); } }); } } }
源碼地址如下,僅供學(xué)習(xí)參考
github.com/DavidDingXu/springboot-socket-demo
五、參考
隆匯6月29日丨極光(JG.US)盤前漲逾9%,報1.19美元。近日,極光的核心產(chǎn)品極光推送(JPush)順利通過亞馬遜云科技的多項測試及審核,正式上線亞馬遜云科技Marketplace。當(dāng)前,亞馬遜云科技Marketplace上的用戶,通過其中國區(qū)網(wǎng)站,即可直接購買和體驗極光推送(JPush)。
本文源自格隆匯
*請認真填寫需求信息,我們會在24小時內(nèi)與您取得聯(lián)系。