整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          輕松讀懂 Dubbo,手寫2W+文字 dubbo 原理解析(下篇)

          薦學習

          • 聽說你很懂源碼?Spring讀懂了?還有這20道源碼面試題接得住嗎?
          • 這是什么神仙面試寶典?半月看完25大專題,居然斬獲阿里P7offer
          • 牛掰!“基礎-中級-高級”Java程序員面試集結,看完獻出我的膝蓋

          01 服務發現

          1.1 服務發現流程

          整體duubo的服務消費原理

          Dubbo 框架做服務消費也分為兩大部分 , 第一步通過持有遠程服務實例生成Invoker,這個Invoker 在客戶端是核心的遠程代理對象 。 第二步會把Invoker 通過動態代理轉換成實現用戶接口的動態代理引用 。

          服務消費方引用服務的藍色初始化鏈,時序圖

          1.2 源碼分析應用

          引用入口:ReferenceBean 的getObject 方法,該方法定義在Spring 的FactoryBean 接口中,ReferenceBean 實現了這個方法。

          public Object getObject() throws Exception {
             return get();
          }
          public synchronized T get() {
             // 檢測 ref 是否為空,為空則通過 init 方法創建
             if (ref == null) {
                 // init 方法主要用于處理配置,以及調用 createProxy 生成代理類
                 init();
             }
             return ref;
          }
          

          Dubbo 提供了豐富的配置,用于調整和優化框架行為,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置的正確性。

          private void init() {
             // 創建代理類
             ref = createProxy(map);
          }

          此方法代碼很長,主要完成的配置加載,檢查,以及創建引用的代理對象。這里要從createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創建代理對象的。但實際上并非如此,該方法還會調用其他方法構建以及合并Invoker 實例。具體細節如下。

          private T createProxy(Map<String, String> map) {
             URL tmpUrl = new URL("temp", "localhost", 0, map);
          ...........
          isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl)
             // 本地引用略
             if (isJvmRefer) {
             } else {
                 // 點對點調用略
                 if (url != null && url.length() > 0) {
                     
                 } else {
                     // 加載注冊中心 url
                     List<URL> us = loadRegistries(false);
                     if (us != null && !us.isEmpty()) {
                         for (URL u : us) {
                             URL monitorUrl = loadMonitor(u);
                             if (monitorUrl != null) {
                                 map.put(Constants.MONITOR_KEY,
          URL.encode(monitorUrl.toFullString()));
                             }
                             // 添加 refer 參數到 url 中,并將 url 添加到 urls 中
                             urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,
          StringUtils.toQueryString(map)));
                         }
                     }
                 }
                 // 單個注冊中心或服務提供者(服務直連,下同)
                 if (urls.size() == 1) {
                     // 調用 RegistryProtocol 的 refer 構建 Invoker 實例
                     invoker = refprotocol.refer(interfaceClass, urls.get(0));
                 // 多個注冊中心或多個服務提供者,或者兩者混合
                 } else {
                     List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                     URL registryURL = null;
                     // 獲取所有的 Invoker
                     for (URL url : urls) {
                         // 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
                         // 根據 url 協議頭加載指定的 Protocol 實例,并調用實例的 refer 方法
                         invokers.add(refprotocol.refer(interfaceClass, url));
                         if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                             registryURL = url;
                         }
                     }
                     if (registryURL != null) {
                         // 如果注冊中心鏈接不為空,則將使用 AvailableCluster
                         URL u = registryURL.addParameter(Constants.CLUSTER_KEY,
          AvailableCluster.NAME);
                         // 創建 StaticDirectory 實例,并由 Cluster 對多個 Invoker 進行合并
                         invoker = cluster.join(new StaticDirectory(u, invokers));
                     } else {
                         invoker = cluster.join(new StaticDirectory(invokers));
                     }
                 }
             }
              //省略無關代碼...
              // 生成代理類
             return (T) proxyFactory.getProxy(invoker);
          }   

          上面代碼很多,不過邏輯比較清晰。
          1、如果是本地調用,直接jvm 協議從內存中獲取實例
          2、如果只有一個注冊中心,直接通過Protocol 自適應拓展類構建Invoker 實例接口
          3、如果有多個注冊中心,此時先根據url 構建Invoker。然后再通過Cluster 合并多個Invoker,最后調用ProxyFactory 生成代理類

          (1)創建客戶端

          在服務消費方,Invoker 用于執行遠程調用。Invoker 是由Protocol 實現類構建而來。Protocol 實現類有很多,這里分析DubboProtocol

          public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
             optimizeSerialization(url);
             // 創建 DubboInvoker
             DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url,
          getClients(url), invokers);
             invokers.add(invoker);
             return invoker;
          }
          

          上面方法看起來比較簡單,創建一個DubboInvoker。通過構造方法傳入遠程調用的client對象。默認情況下,Dubbo 使用NettyClient 進行通信。接下來,我們簡單看一下getClients 方法的邏輯。

          private ExchangeClient[] getClients(URL url) {
             // 是否共享連接
             boolean service_share_connect = false;
          // 獲取連接數,默認為0,表示未配置
             int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
             // 如果未配置 connections,則共享連接
             if (connections == 0) {
                 service_share_connect = true;
                 connections = 1;
             }
             ExchangeClient[] clients = new ExchangeClient[connections];
             for (int i = 0; i < clients.length; i++) {
                 if (service_share_connect) {
                     // 獲取共享客戶端
                     clients[i] = getSharedClient(url);
                 } else {
                     // 初始化新的客戶端
                     clients[i] = initClient(url);
                 }
             }
             return clients;
          }
          

          這里根據connections 數量決定是獲取共享客戶端還是創建新的客戶端實例,getSharedClient 方法中也會調用initClient 方法,因此下面我們一起看一下這個方法。

          private ExchangeClient initClient(URL url) {
             // 獲取客戶端類型,默認為 netty
             String str = url.getParameter(Constants.CLIENT_KEY,
          url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
             //省略無關代碼
             ExchangeClient client;
             try {
                 // 獲取 lazy 配置,并根據配置值決定創建的客戶端類型
                 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                     // 創建懶加載 ExchangeClient 實例
                     client = new LazyConnectExchangeClient(url, requestHandler);
                 } else {
                     // 創建普通 ExchangeClient 實例
                     client = Exchangers.connect(url, requestHandler);
                 }
             } catch (RemotingException e) {
                 throw new RpcException("Fail to create remoting client for service...");
             }
             return client;
          }
          

          initClient 方法首先獲取用戶配置的客戶端類型,默認為netty。下面我們分析一下Exchangers 的connect 方法。

          public static ExchangeClient connect(URL url, ExchangeHandler handler) throws
          RemotingException {
             // 獲取 Exchanger 實例,默認為 HeaderExchangeClient
             return getExchanger(url).connect(url, handler);
          }
          

          如上,getExchanger 會通過SPI 加載HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析HeaderExchangeClient 的實現。

          public ExchangeClient connect(URL url, ExchangeHandler handler) throws
          RemotingException {
             // 這里包含了多個調用,分別如下:
             // 1. 創建 HeaderExchangeHandler 對象
             // 2. 創建 DecodeHandler 對象
             // 3. 通過 Transporters 構建 Client 實例
             // 4. 創建 HeaderExchangeClient 對象
             return new HeaderExchangeClient(Transporters.connect(url, new
          DecodeHandler(new HeaderExchangeHandler(handler))), true);
          }
          

          這里的調用比較多,我們這里重點看一下Transporters 的connect 方法。如下:

          public static Client connect(URL url, ChannelHandler... handlers) throws
          RemotingException {
             if (url == null) {
                 throw new IllegalArgumentException("url == null");
                 }
             ChannelHandler handler;
             if (handlers == null || handlers.length == 0) {
                 handler = new ChannelHandlerAdapter();
             } else if (handlers.length == 1) {
                 handler = handlers[0];
             } else {
                 // 如果 handler 數量大于1,則創建一個 ChannelHandler 分發器
                 handler = new ChannelHandlerDispatcher(handlers);
             }
             
             // 獲取 Transporter 自適應拓展類,并調用 connect 方法生成 Client 實例
             return getTransporter().connect(url, handler);
          }
          

          如上,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的Transporter 實現類。若用戶未配置客戶端類型,則默認加載NettyTransporter,并調用該類的connect 方法。如下:

          public Client connect(URL url, ChannelHandler listener) throws RemotingException
          {
             // 創建 NettyClient 對象
             return new NettyClient(url, listener);
          }
          

          (2)注冊

          這里就已經創建好了NettyClient對象。關于DubboProtocol 的refer 方法就分析完了。接下來,繼續分析RegistryProtocol 的refer 方法邏輯。

          public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
             // 取 registry 參數值,并將其設置為協議頭
             url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY,
          Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
             // 獲取注冊中心實例
             Registry registry = registryFactory.getRegistry(url);
             if (RegistryService.class.equals(type)) {
                 return proxyFactory.getInvoker((T) registry, type, url);
             }
             // 將 url 查詢字符串轉為 Map
             Map<String, String> qs =
          StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
             // 獲取 group 配置
             String group = qs.get(Constants.GROUP_KEY);
             if (group != null && group.length() > 0) {
                 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                         || "*".equals(group)) {
                     // 通過 SPI 加載 MergeableCluster 實例,并調用 doRefer 繼續執行服務引用邏輯
                     return doRefer(getMergeableCluster(), registry, type, url);
                 }
             }
             
             // 調用 doRefer 繼續執行服務引用邏輯
             return doRefer(cluster, registry, type, url);
          }

          上面代碼首先為url 設置協議頭,然后根據url 參數加載注冊中心實例。然后獲取group 配置,根據group 配置決定doRefer 第一個參數的類型。這里的重點是doRefer 方法,如下:

          private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T>
          type, URL url) {
             // 創建 RegistryDirectory 實例
             RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
             // 設置注冊中心和協議
             directory.setRegistry(registry);
             directory.setProtocol(protocol);
             Map<String, String> parameters = new HashMap<String, String>
          (directory.getUrl().getParameters());
             // 生成服務消費者鏈接
             URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL,
          parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
             // 注冊服務消費者,在 consumers 目錄下新節點
             if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                     && url.getParameter(Constants.REGISTER_KEY, true)) {
                 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY,
          Constants.CONSUMERS_CATEGORY,
                         Constants.CHECK_KEY, String.valueOf(false)));
             }
             // 訂閱 providers、configurators、routers 等節點數據
             directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                     Constants.PROVIDERS_CATEGORY
                             + "," + Constants.CONFIGURATORS_CATEGORY
                             + "," + Constants.ROUTERS_CATEGORY));
             // 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合并為一個
             Invoker invoker = cluster.join(directory);
             ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl,
          directory);
             return invoker;
          }

          如上,doRefer 方法創建一個RegistryDirectory 實例,然后生成服務者消費者鏈接,并向注冊中心進行注冊。注冊完畢后,緊接著訂閱providers、configurators、routers 等節點下的數據。完成訂閱后,RegistryDirectory 會收到這幾個節點下的子節點信息。由于一個服務可能部署在多臺服務器上,這樣就會在providers 產生多個節點,這個時候就需要Cluster 將多個服務節點合并為一個,并生成一個Invoker。

          (3)創建代理對象

          Invoker 創建完畢后,接下來要做的事情是為服務接口生成代理對象。有了代理對象,即可進行遠程調用。代理對象生成的入口方法為ProxyFactory 的getProxy,接下來進行分析。

          public <T> T getProxy(Invoker<T> invoker) throws RpcException {
             // 調用重載方法
             return getProxy(invoker, false);
          }
          public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
             Class<?>[] interfaces = null;
             // 獲取接口列表
              String config = invoker.getUrl().getParameter("interfaces");
             if (config != null && config.length() > 0) {
                 // 切分接口列表
                 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
                 if (types != null && types.length > 0) {
                     interfaces = new Class<?>[types.length + 2];
                     // 設置服務接口類和 EchoService.class 到 interfaces 中
                     interfaces[0] = invoker.getInterface();
                     interfaces[1] = EchoService.class;
                     for (int i = 0; i < types.length; i++) {
                         // 加載接口類
                         interfaces[i + 1] = ReflectUtils.forName(types[i]);
                     }
                 }
             }
             if (interfaces == null) {
                 interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
             }
             // 為 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
             if (!invoker.getInterface().equals(GenericService.class) && generic) {
                 int len = interfaces.length;
                 Class<?>[] temp = interfaces;
                 // 創建新的 interfaces 數組
                 interfaces = new Class<?>[len + 1];
                 System.arraycopy(temp, 0, interfaces, 0, len);
                 // 設置 GenericService.class 到數組中
                 interfaces[len] = GenericService.class;
             }
             // 調用重載方法
             return getProxy(invoker, interfaces);
          }
          public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
          

          如上,上面大段代碼都是用來獲取interfaces 數組的,我們繼續往下看。getProxy(Invoker, Class<?>[]) 這個方法是一個抽象方法,下面我們到JavassistProxyFactory 類中看一下該方法的實現代碼。

          public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
             // 生成 Proxy 子類(Proxy 是抽象類)。并調用 Proxy 子類的 newInstance 方法創建Proxy 實例
             return (T) Proxy.getProxy(interfaces).newInstance(new
          InvokerInvocationHandler(invoker));
          }
          

          上面代碼并不多,首先是通過Proxy 的getProxy 方法獲取Proxy 子類,然后創建InvokerInvocationHandler 對象,并將該對象傳給newInstance 生成Proxy 實例。InvokerInvocationHandler 實現JDK 的InvocationHandler 接口,具體的用途是攔截接口類調用。下面以org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略EchoService 接口)。

          package org.apache.dubbo.common.bytecode;
          public class proxy0 implements org.apache.dubbo.demo.DemoService {
             public static java.lang.reflect.Method[] methods;
             private java.lang.reflect.InvocationHandler handler;
             public proxy0() {
             }
             public proxy0(java.lang.reflect.InvocationHandler arg0) {
                 handler = $1;
             }
             public java.lang.String sayHello(java.lang.String arg0) {
                 Object[] args = new Object[1];
                 args[0] = ($w) $1;
                 Object ret = handler.invoke(this, methods[0], args);
                 return (java.lang.String) ret;
             }
          }
          

          好了,到這里代理類生成邏輯就分析完了。整個過程比較復雜,大家需要耐心看一下。

          1.3 總結

          1. 從注冊中心發現引用服務:在有注冊中心,通過注冊中心發現提供者地址的情況下,ReferenceConfig 解析出的URL 格式為: registry://registryhost:/org.apache.registry.RegistryService?refer=URL.encode("conumerhost/com.foo.FooService?version=1.0.0") 。
          2. 通過URL 的registry://協議頭識別,就會調用RegistryProtocol#refer()方法
          3. 查詢提供者URL,如 dubbo://service-host/com.foo.FooService?version=1.0.0 ,來獲取注冊中心
          4. 創建一個RegistryDirectory 實例并設置注冊中心和協議
          5. 生成conusmer 連接,在consumer 目錄下創建節點,向注冊中心注冊
          6. 注冊完畢后,訂閱providers,configurators,routers 等節點的數據
          7. 通過URL 的 dubbo:// 協議頭識別,調用 DubboProtocol#refer() 方法,創建一個
            ExchangeClient 客戶端并返回DubboInvoker 實例
          8. 由于一個服務可能會部署在多臺服務器上,這樣就會在providers 產生多個節點,這樣也就會得到多個DubboInvoker 實例,就需要RegistryProtocol 調用Cluster 將多個服務提供者節點偽裝成一個節點,并返回一個Invoker
          9. Invoker 創建完畢后,調用ProxyFactory 為服務接口生成代理對象,返回提供者引用

          02 網絡通信

          在之前的內容中,我們分析了消費者端服務發現與提供者端服務暴露的相關內容,同時也知道消費者端通過內置的負載均衡算法獲取合適的調用invoker進行遠程調用。接下來我們再研究下遠程調用過程即網絡通信。

          網絡通信位于Remoting模塊:

          • Remoting 實現是Dubbo 協議的實現,如果你選擇RMI 協議,整個Remoting 都不會用上;
          • Remoting 內部再劃為 Transport 傳輸層 和 Exchange 信息交換層 ;
          • Transport 層只負責單向消息傳輸,是對Mina, Netty, Grizzly 的抽象,它也可以擴展UDP 傳輸;
          • Exchange 層是在傳輸層之上封裝了Request-Response 語義;

          網絡通信的問題:

          • 客戶端與服務端連通性問題
          • 粘包拆包問題
          • 異步多線程數據一致問題

          2.1 通信協議

          dubbo內置,dubbo協議 ,rmi協議,hessian協議,http協議,webservice協議,thrift協議,rest協議,grpc協議,memcached協議,redis協議等10種通訊協議。各個協議特點如下

          dubbo協議

          Dubbo 缺省協議采用單一長連接和NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者機器數遠大于服務提供者機器數的情況。

          • 缺省協議,使用基于mina 1.1.7 和hessian 3.2.1 的tbremoting 交互。
          • 連接個數:單連接
          • 連接方式:長連接
          • 傳輸協議:TCP
          • 傳輸方式:NIO 異步傳輸
          • 序列化:Hessian 二進制序列化
          • 適用范圍:傳入傳出參數數據包較小(建議小于100K),消費者比提供者個數多,單一消費者無法壓滿提供者,盡量不要用dubbo 協議傳輸大文件或超大字符串。
          • 適用場景:常規遠程服務方法調用

          rmi協議

          RMI 協議采用JDK 標準的 java.rmi.* 實現,采用阻塞式短連接和JDK 標準序列化方式。

          • 連接個數:多連接
          • 連接方式:短連接
          • 傳輸協議:TCP
          • 傳輸方式:同步傳輸
          • 序列化:Java 標準二進制序列化
          • 適用范圍:傳入傳出參數數據包大小混合,消費者與提供者個數差不多,可傳文件。
          • 適用場景:常規遠程服務方法調用,與原生RMI服務互操作

          hessian協議

          Hessian 協議用于集成Hessian 的服務,Hessian 底層采用Http 通訊,采用Servlet 暴露服務,

          Dubbo 缺省內嵌Jetty 作為服務器實現。

          Dubbo 的Hessian 協議可以和原生Hessian 服務互操作,即:提供者用Dubbo 的Hessian 協議暴露服務,消費者直接用標準Hessian 接口調用或者提供方用標準Hessian 暴露服務,消費方用Dubbo 的Hessian 協議調用。

          • 連接個數:多連接
          • 連接方式:短連接
          • 傳輸協議:HTTP
          • 傳輸方式:同步傳輸
          • 序列化:Hessian二進制序列化
          • 適用范圍:傳入傳出參數數據包較大,提供者比消費者個數多,提供者壓力較大,可傳文件。
          • 適用場景:頁面傳輸,文件傳輸,或與原生hessian服務互操作

          http協議

          基于HTTP 表單的遠程調用協議,采用Spring 的HttpInvoker 實現

          • 連接個數:多連接
          • 連接方式:短連接
          • 傳輸協議:HTTP
          • 傳輸方式:同步傳輸
          • 序列化:表單序列化
          • 適用范圍:傳入傳出參數數據包大小混合,提供者比消費者個數多,可用瀏覽器查看,可用表單或URL傳入參數,暫不支持傳文件。
          • 適用場景:需同時給應用程序和瀏覽器JS 使用的服務。

          webservice協議
          基于WebService 的遠程調用協議,基于Apache CXF 實現](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。

          可以和原生WebService 服務互操作,即:提供者用Dubbo 的WebService 協議暴露服務,消費者直接用標準WebService 接口調用,或者提供方用標準WebService 暴露服務,消費方用Dubbo 的WebService 協議調用。

          • 連接個數:多連接
          • 連接方式:短連接
          • 傳輸協議:HTTP
          • 傳輸方式:同步傳輸
          • 序列化:SOAP 文本序列化(http + xml)
          • 適用場景:系統集成,跨語言調用

          thrift協議

          當前dubbo 支持[1]的thrift 協議是對thrift 原生協議[2] 的擴展,在原生協議的基礎上添加了一些額外的頭信息,比如service name,magic number 等。

          rest協議

          基于標準的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現的REST調用支持

          grpc協議

          Dubbo 自2.7.5 版本開始支持gRPC 協議,對于計劃使用HTTP/2 通信,或者想利用gRPC 帶來的Stream、反壓、Reactive 編程等能力的開發者來說, 都可以考慮啟用gRPC 協議。

          為期望使用gRPC 協議的用戶帶來服務治理能力,方便接入Dubbo 體系用戶可以使用Dubbo 風格的,基于接口的編程風格來定義和使用遠程服務

          memcached協議

          基于memcached實現的RPC 協議

          redis協議

          基于Redis 實現的RPC 協議

          2.2 序列化

          序列化就是將對象轉成字節流,用于網絡傳輸,以及將字節流轉為對象,用于在收到字節流數據后還原成對象。序列化的優勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網絡通訊,在NettyClient.doOpen() 方法中可以看到Netty的相關類

          bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
             public ChannelPipeline getPipeline() {
                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),
          NettyClient.this);
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("decoder", adapter.getDecoder());
                 pipeline.addLast("encoder", adapter.getEncoder());
                 pipeline.addLast("handler", nettyHandler);
                 return pipeline;
             }
          });
          

          然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:

          protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request
          req) throws IOException {
                 Serialization serialization = getSerialization(channel);
                 // header.
                 byte[] header = new byte[HEADER_LENGTH];
          

          是的,就是Serialization接口,默認是Hessian2Serialization序列化接口。

          Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。

          dubbo序列化:阿里尚未開發成熟的高效java序列化實現,阿里不建議在生產環境使用它。
          hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認啟用的序列化方式。
          json序列化:目前有兩種實現,一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現的簡單json庫,但其實現都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。
          java序列化:主要是采用JDK自帶的Java序列化實現,性能很不理想。

          2.3 網絡通信

          (1)Dubbo中的數據格式

          解決socket中數據粘包拆包問題,一般有三種方式

          定長協議(數據包長度一致)

          定長的協議是指協議內容的長度是固定的,比如協議byte長度是50,當從網絡上讀取50個byte后,就進行decode解碼操作。定長協議在讀取或者寫入時,效率比較高,因為數據緩存的大小基本都確定了,就好比數組一樣,缺陷就是適應性不足,以RPC場景為例,很難估計出定長的長度是多少。

          特殊結束符(數據尾:通過特殊的字符標識#)

          相比定長協議,如果能夠定義一個特殊字符作為每個協議單元結束的標示,就能夠以變長的方式進行通信,從而在數據傳輸和高效之間取得平衡,比如用特殊字符 \n 。特殊結束符方式的問題是過于簡單的思考了協議傳輸的過程,對于一個協議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸的數據不能同結束符相同,否則就會出現紊亂。

          變長協議(協議頭+payload模式)

          這種一般是自定義協議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內容長度。dubbo就是使用這種形式的數據傳輸格式

          Dubbo 數據包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。

          偏移量(Bit) 字段 取值

          0 ~ 7 魔數高位 0xda00

          8 ~ 15 魔數低位 0xbb

          16 數據包類型 0 - Response, 1 - Request

          17 調用方式 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用

          18 事件標 識 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包

          19 ~23 序列化器編號 2 - Hessian2Serialization
          3 - JavaSerialization
          4 - CompactedJavaSerialization
          6 - FastJsonSerialization
          7 - NativeJavaSerialization
          8 - KryoSerialization
          9 - FstSerialization

          24 ~31 狀態 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 -BAD_REQUEST 50 - BAD_RESPONSE ......

          32 ~95 請求編號 共8字節,運行時生成

          96 ~127 消息體長度 運行時計算

          (2)消費端發送請求

          /**
          *proxy0#sayHello(String)
          *—> InvokerInvocationHandler#invoke(Object, Method, Object[])
          *   —> MockClusterInvoker#invoke(Invocation)
          *     —> AbstractClusterInvoker#invoke(Invocation)
          *       —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>,LoadBalance)
          *         —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
          *          —> ListenerInvokerWrapper#invoke(Invocation)
          *             —> AbstractInvoker#invoke(Invocation)
          *              —> DubboInvoker#doInvoke(Invocation)
          *                 —> ReferenceCountExchangeClient#request(Object, int)
          *                   —> HeaderExchangeClient#request(Object, int)
          *                     —> HeaderExchangeChannel#request(Object, int)
          *                       —> AbstractPeer#send(Object)
          *                         —> AbstractClient#send(Object, boolean)
          *                          —> NettyChannel#send(Object, boolean)
          *                             —> NioClientSocketChannel#write(Object)
          */

          dubbo消費方,自動生成代碼對象如下

          public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
             private InvocationHandler handler;
             public String sayHello(String string) {
                 // 將參數存儲到 Object 數組中
                 Object[] arrobject = new Object[]{string};
                 // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果
                 Object object = this.handler.invoke(this, methods[0], arrobject);
                 // 返回調用結果
                 return (String)object;
             }
          }

          InvokerInvocationHandler 中的invoker 成員變量類型為MockClusterInvoker,MockClusterInvoker內部封裝了服務降級邏輯。下面簡單看一下:

          public Result invoke(Invocation invocation) throws RpcException {
                 Result result = null;
          // 獲取 mock 配置值
                 String value =
          directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY,
          Boolean.FALSE.toString()).trim();
                 if (value.length() == 0 || value.equalsIgnoreCase("false")) {
                      // 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法,
                     // 比如 FailoverClusterInvoker
                     result = this.invoker.invoke(invocation);
                 } else if (value.startsWith("force")) {
                     // force:xxx 直接執行 mock 邏輯,不發起遠程調用
                     result = doMockInvoke(invocation, null);
                 } else {
                      // fail:xxx 表示消費方對調用服務失敗后,再執行 mock 邏輯,不拋出異常
                     try {
                         result = this.invoker.invoke(invocation);
                     } catch (RpcException e) {
                          // 調用失敗,執行 mock 邏輯
                         result = doMockInvoke(invocation, e);
                     }
                 }
                 return result;
             }

          考慮到前文已經詳細分析過FailoverClusterInvoker,因此本節略過FailoverClusterInvoker,直接分析DubboInvoker。

          public abstract class AbstractInvoker<T> implements Invoker<T> {
             
             public Result invoke(Invocation inv) throws RpcException {
                 if (destroyed.get()) {
                     throw new RpcException("Rpc invoker for service ...");
                      }
                 RpcInvocation invocation = (RpcInvocation) inv;
                 // 設置 Invoker
                 invocation.setInvoker(this);
                 if (attachment != null && attachment.size() > 0) {
                     // 設置 attachment
                     invocation.addAttachmentsIfAbsent(attachment);
                 }
                 Map<String, String> contextAttachments =
          RpcContext.getContext().getAttachments();
                 if (contextAttachments != null && contextAttachments.size() != 0) {
                     // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
                     invocation.addAttachments(contextAttachments);
                 }
                 if (getUrl().getMethodParameter(invocation.getMethodName(),
          Constants.ASYNC_KEY, false)) {
                     // 設置異步信息到 RpcInvocation#attachment 中
                     invocation.setAttachment(Constants.ASYNC_KEY,
          Boolean.TRUE.toString());
                 }
                 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
                 try {
                     // 抽象方法,由子類實現
                     return doInvoke(invocation);
                 } catch (InvocationTargetException e) {
                     // ...
                 } catch (RpcException e) {
                     // ...
                 } catch (Throwable e) {
                     return new RpcResult(e);
                 }
             }
             protected abstract Result doInvoke(Invocation invocation) throws Throwable;
             
             // 省略其他方法
          }
          

          上面的代碼來自AbstractInvoker 類,其中大部分代碼用于添加信息到RpcInvocation#attachment 變量中,添加完畢后,調用doInvoke 執行后續的調用。doInvoke 是一個抽象方法,需要由子類實現,下面到DubboInvoker 中看一下。

          @Override
             protected Result doInvoke(final Invocation invocation) throws Throwable {
                 RpcInvocation inv = (RpcInvocation) invocation;
                 final String methodName = RpcUtils.getMethodName(invocation);
                 //將目標方法以及版本號作為參數放入到Invocation中
                 inv.setAttachment(PATH_KEY, getUrl().getPath());
                 inv.setAttachment(VERSION_KEY, version);
                 //獲得客戶端連接
                 ExchangeClient currentClient; //初始化invoker的時候,構建的一個遠程通信連接
                 if (clients.length == 1) { //默認
                     currentClient = clients[0];
                 } else {
                     //通過取模獲得其中一個連接
                      currentClient = clients[index.getAndIncrement() % clients.length];
                 }
                 try {
                     //表示當前的方法是否存在返回值
                     boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                     int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY,
          DEFAULT_TIMEOUT);
                     //isOneway 為 true,表示“單向”通信
                     if (isOneway) {//異步無返回值
                         boolean isSent = getUrl().getMethodParameter(methodName,
          Constants.SENT_KEY, false);
                         currentClient.send(inv, isSent);
                         RpcContext.getContext().setFuture(null);
                         return AsyncRpcResult.newDefaultAsyncResult(invocation);
                     } else { //存在返回值
                         //是否采用異步
                         AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                         CompletableFuture<Object> responseFuture =
          currentClient.request(inv, timeout);
                         responseFuture.whenComplete((obj, t) -> {
                             if (t != null) {
                                 asyncRpcResult.completeExceptionally(t);
                             } else {
                                 asyncRpcResult.complete((AppResponse) obj);
                             }
                         });
                         RpcContext.getContext().setFuture(new
          FutureAdapter(asyncRpcResult));
                         return asyncRpcResult;
                     }
                 }
                 //省略無關代碼
             }

          最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發送出去

          public CompletableFuture<Object> request(Object request, int timeout) throws
          RemotingException {
                 if (closed) {
                     throw new RemotingException(this.getLocalAddress(), null, "Failed
          tosend request " + request + ", cause: The channel " + this + " is closed!");
                 }
                 // 創建請求對象
                 Request req = new Request();
                 req.setVersion(Version.getProtocolVersion());
                 req.setTwoWay(true);
                 req.setData(request);
                 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
                 try {
                     //NettyClient
                     channel.send(req);
                 } catch (RemotingException e) {
                     future.cancel();
                     throw e;
                 }
                 return future;
             }

          請求編碼如何做的?

          在netty啟動時,我們設置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:

          public class ExchangeCodec extends TelnetCodec {
             // 消息頭長度
             protected static final int HEADER_LENGTH = 16;
             // 魔數內容
             protected static final short MAGIC = (short) 0xdabb;
             protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
             protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
             protected static final byte FLAG_REQUEST = (byte) 0x80;
             protected static final byte FLAG_TWOWAY = (byte) 0x40;
             protected static final byte FLAG_EVENT = (byte) 0x20;
             protected static final int SERIALIZATION_MASK = 0x1f;
             private static final Logger logger =
          LoggerFactory.getLogger(ExchangeCodec.class);
             public Short getMagicCode() {
                 return MAGIC;
             }
             @Override
             public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws
          IOException {
                 if (msg instanceof Request) {
                     // 對 Request 對象進行編碼
                     encodeRequest(channel, buffer, (Request) msg);
                 } else if (msg instanceof Response) {
                     // 對 Response 對象進行編碼,后面分析
                     encodeResponse(channel, buffer, (Response) msg);
                 } else {
                     super.encode(channel, buffer, msg);
                 }
             }
             protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request
          req) throws IOException {
                 Serialization serialization = getSerialization(channel);
                 // 創建消息頭字節數組,長度為 16
                 byte[] header = new byte[HEADER_LENGTH];
                 // 設置魔數
                 Bytes.short2bytes(MAGIC, header);
                 // 設置數據包類型(Request/Response)和序列化器編號
                 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
                 // 設置通信方式(單向/雙向)
                 if (req.isTwoWay()) {
                     header[2] |= FLAG_TWOWAY;
                 }
                 
                 // 設置事件標識
                 if (req.isEvent()) { header[2] |= FLAG_EVENT;
                 }
                 // 設置請求編號,8個字節,從第4個字節開始設置
                 Bytes.long2bytes(req.getId(), header, 4);
                 // 獲取 buffer 當前的寫位置
                 int savedWriteIndex = buffer.writerIndex();
                 // 更新 writerIndex,為消息頭預留 16 個字節的空間
                 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
                 ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
                 // 創建序列化器,比如 Hessian2ObjectOutput
                 ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
                 if (req.isEvent()) {
                     // 對事件數據進行序列化操作
                     encodeEventData(channel, out, req.getData());
                 } else {
                     // 對請求數據進行序列化操作
                     encodeRequestData(channel, out, req.getData(), req.getVersion());
                 }
                 out.flushBuffer();
                 if (out instanceof Cleanable) {
                     ((Cleanable) out).cleanup();
                 }
                 bos.flush();
                 bos.close();
                 
                 // 獲取寫入的字節數,也就是消息體長度
                 int len = bos.writtenBytes();
                 checkPayload(channel, len);
                 // 將消息體長度寫入到消息頭中
                 Bytes.int2bytes(len, header, 12);
                 // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
                 buffer.writerIndex(savedWriteIndex);
                 // 從 savedWriteIndex 下標處寫入消息頭
                 buffer.writeBytes(header);
                 // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
                 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
             }
             
             // 省略其他方法
          }

          以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到header 數組中。然后對Request 對象的data 字段執行序列化操作,序列化后的數據最終會存儲到ChannelBuffer 中。序列化操作執行完后,可得到數據序列化后的長度len,緊接著將len 寫入到header 指定位置處。最后再將消息頭字節數組header 寫入到ChannelBuffer 中,整個編碼過程就結束了。本節的最后,我們再來看一下Request 對象的data 字段序列化過程,也就是encodeRequestData 方法的邏輯,如下:

          public class DubboCodec extends ExchangeCodec implements Codec2 {
             
          protected void encodeRequestData(Channel channel, ObjectOutput out, Object
          data, String version) throws IOException {
                 RpcInvocation inv = (RpcInvocation) data;
               // 依次序列化 dubbo version、path、version
                 out.writeUTF(version);
                 out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
                 out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
                 // 序列化調用方法名
                 out.writeUTF(inv.getMethodName());
                 // 將參數類型轉換為字符串,并進行序列化
                 out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
                 Object[] args = inv.getArguments();
                 if (args != null)
                     for (int i = 0; i < args.length; i++) {
                         // 對運行時參數進行序列化
                         out.writeObject(encodeInvocationArgument(channel, inv, i));
                     }
                 
                 // 序列化 attachments
                 out.writeObject(inv.getAttachments());
             }
          }

          至此,關于服務消費方發送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。

          (3)提供方接受請求

          請求如何解碼?

          這里直接分析請求數據的解碼邏輯,忽略中間過程,如下:

          public class ExchangeCodec extends TelnetCodec {
             
             @Override
             public Object decode(Channel channel, ChannelBuffer buffer) throws
          IOException {
                 int readable = buffer.readableBytes();
                 // 創建消息頭字節數組
                 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
                 // 讀取消息頭數據
                 buffer.readBytes(header);
                 // 調用重載方法進行后續解碼工作
                 return decode(channel, buffer, readable, header);
             }
             @Override
             protected Object decode(Channel channel, ChannelBuffer buffer, int readable,
          byte[] header) throws IOException {
                 // 檢查魔數是否相等
                 if (readable > 0 && header[0] != MAGIC_HIGH
                         || readable > 1 && header[1] != MAGIC_LOW) {
                     int length = header.length;
                     if (header.length < readable) {
                         header = Bytes.copyOf(header, readable);
                         buffer.readBytes(header, length, readable - length);
                     }
                     for (int i = 1; i < header.length - 1; i++) {
                         if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                              buffer.readerIndex(buffer.readerIndex() - header.length +
          i);
                             header = Bytes.copyOf(header, i);
                             break;
                         }
                     }
                     // 通過 telnet 命令行發送的數據包不包含消息頭,所以這里
                     // 調用 TelnetCodec 的 decode 方法對數據包進行解碼
                     return super.decode(channel, buffer, readable, header);
                 }
                 
                 // 檢測可讀數據量是否少于消息頭長度,若小于則立即返回
          DecodeResult.NEED_MORE_INPUT
                 if (readable < HEADER_LENGTH) {
                     return DecodeResult.NEED_MORE_INPUT;
                 }
                 // 從消息頭中獲取消息體長度
                 int len = Bytes.bytes2int(header, 12);
                 // 檢測消息體長度是否超出限制,超出則拋出異常
                 checkPayload(channel, len);
                 int tt = len + HEADER_LENGTH;
                 // 檢測可讀的字節數是否小于實際的字節數
                 if (readable < tt) {
                     return DecodeResult.NEED_MORE_INPUT;
                 }
                 
                 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
                 try {
                     // 繼續進行解碼工作
                     return decodeBody(channel, is, header);
                 } finally {
                     if (is.available() > 0) {
                         try {
                             StreamUtils.skipUnusedStream(is);
                         } catch (IOException e) {
                             logger.warn(e.getMessage(), e);
                         }
                     }
                 }
             }
          }
          

          上面方法通過檢測消息頭中的魔數是否與規定的魔數相等,提前攔截掉非常規數據包,比如通過telnet命令行發出的數據包。接著再對消息體長度,以及可讀字節數進行檢測。最后調用decodeBody 方法進行后續的解碼工作,ExchangeCodec 中實現了decodeBody 方法,但因其子類DubboCodec 覆寫了該方法,所以在運行時DubboCodec 中的decodeBody 方法會被調用。下面我們來看一下該方法的代碼。

          public class DubboCodec extends ExchangeCodec implements Codec2 {
             @Override
             protected Object decodeBody(Channel channel, InputStream is, byte[] header)
          throws IOException {
                 // 獲取消息頭中的第三個字節,并通過邏輯與運算得到序列化器編號byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
                 Serialization s = CodecSupport.getSerialization(channel.getUrl(),
          proto);
                 // 獲取調用編號
                 long id = Bytes.bytes2long(header, 4);
                 // 通過邏輯與運算得到調用類型,0 - Response,1 - Request
                 if ((flag & FLAG_REQUEST) == 0) {
                     // 對響應結果進行解碼,得到 Response 對象。這個非本節內容,后面再分析
                     // ...
                 } else {
                     // 創建 Request 對象
                     Request req = new Request(id);
                     req.setVersion(Version.getProtocolVersion());
                     // 通過邏輯與運算得到通信方式,并設置到 Request 對象中
                     req.setTwoWay((flag & FLAG_TWOWAY) != 0);
                     
                     // 通過位運算檢測數據包是否為事件類型
                     if ((flag & FLAG_EVENT) != 0) {
                         // 設置心跳事件到 Request 對象中
                         req.setEvent(Request.HEARTBEAT_EVENT);
                     }
                     try {
                         Object data;
                         if (req.isHeartbeat()) {
                             // 對心跳包進行解碼,該方法已被標注為廢棄
                             data = decodeHeartbeatData(channel, deserialize(s,
          channel.getUrl(), is));
                         } else if (req.isEvent()) {
                             // 對事件數據進行解碼
                             data = decodeEventData(channel, deserialize(s,
          channel.getUrl(), is));
                         } else {
                             DecodeableRpcInvocation inv;
                             // 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼
                             if (channel.getUrl().getParameter(
                                     Constants.DECODE_IN_IO_THREAD_KEY,
                                     Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                                 inv = new DecodeableRpcInvocation(channel, req, is,
          proto);
                                 // 在當前線程,也就是 IO 線程上進行后續的解碼工作。此工作完成后,
          可將
                                 // 調用方法名、attachment、以及調用參數解析出來
                                 inv.decode();
                             } else {
                                 // 僅創建 DecodeableRpcInvocation 對象,但不在當前線程上執行解
          碼邏輯
                                 inv = new DecodeableRpcInvocation(channel, req,
                                         new
          UnsafeByteArrayInputStream(readMessageData(is)), proto);
                             }
                             data = inv;
                         }
                         
                         // 設置 data 到 Request 對象中
                         req.setData(data);
                     } catch (Throwable t) {
                         // 若解碼過程中出現異常,則將 broken 字段設為 true,
                         // 并將異常對象設置到 Reqeust 對象中
                          req.setBroken(true);
                         req.setData(t);
                     }
                     return req;
                 }
             }
          }
          

          如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到Request 中。隨后會調用DecodeableRpcInvocation 的decode 方法進行后續的解碼工作。此工作完成后,可將調用方法名、attachment、以及調用參數解析出來。

          調用服務

          解碼器將數據包解析成Request 對象后,NettyHandler 的messageReceived 方法緊接著會收到這個對象,并將這個對象繼續向下傳遞。整個調用棧如下:

          NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
           —> AbstractPeer#received(Channel, Object)
             —> MultiMessageHandler#received(Channel, Object)
               —> HeartbeatHandler#received(Channel, Object)
                 —> AllChannelHandler#received(Channel, Object)
                   —> ExecutorService#execute(Runnable)    // 由線程池執行后續的調用邏輯=

          這里我們直接分析調用棧中的分析第一個和最后一個調用方法邏輯。如下:
          考慮到篇幅,以及很多中間調用的邏輯并非十分重要,所以這里就不對調用棧中的每個方法都進行分析了。這里我們直接分析最后一個調用方法邏輯。如下:

          public class ChannelEventRunnable implements Runnable {
             
             private final ChannelHandler handler;
             private final Channel channel;
             private final ChannelState state;
             private final Throwable exception;
             private final Object message;
             
             @Override
             public void run() {
                 // 檢測通道狀態,對于請求或響應消息,此時 state = RECEIVED
                 if (state == ChannelState.RECEIVED) {
                     try {
                         // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續的調用
                         handler.received(channel, message);
                     } catch (Exception e) {
                         logger.warn("... operation error, channel is ... message is
          ...");
                     }
                 }
                 
                 // 其他消息類型通過 switch 進行處理
                 else {
                     switch (state) {
                     case CONNECTED:
                         try {
                             handler.connected(channel);
                         } catch (Exception e) {
                             logger.warn("... operation error, channel is ...");
                         }
                         break;
                     case DISCONNECTED:
                         // ...
                     case SENT:
                         // ...
                     case CAUGHT:
                         // ...
                     default:
                         logger.warn("unknown state: " + state + ", message is " +
          message);
                     }
                 }
             }
          }

          如上,請求和響應消息出現頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷ChannelEventRunnable 僅是一個中轉站,它的run 方法中并不包含具體的調用邏輯,僅用于將參數傳給其他ChannelHandler 對象進行處理,該對象類型為DecodeHandler

          public class DecodeHandler extends AbstractChannelHandlerDelegate {
             public DecodeHandler(ChannelHandler handler) {
                 super(handler);
             }
             @Override
             public void received(Channel channel, Object message) throws
          RemotingException {
                 if (message instanceof Decodeable) {
                     // 對 Decodeable 接口實現類對象進行解碼
                     decode(message);
                 }
                 if (message instanceof Request) {
                     // 對 Request 的 data 字段進行解碼
                     decode(((Request) message).getData());
                 }
                 if (message instanceof Response) {
                     // 對 Request 的 result 字段進行解碼
                     decode(((Response) message).getResult());
                 }
                 // 執行后續邏輯
                 handler.received(channel, message);
             }
             private void decode(Object message) {
                 // Decodeable 接口目前有兩個實現類,
                 // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
                 if (message != null && message instanceof Decodeable) {
                     try {
                         // 執行解碼邏輯
                         ((Decodeable) message).decode();
                         } catch (Throwable e) {
                         if (log.isWarnEnabled()) {
                             log.warn("Call Decodeable.decode failed: " + e.getMessage(),
          e);
                         }
                     }
                 }
             }
          }

          DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的Request 對象會繼續向后傳遞

          public class DubboProtocol extends AbstractProtocol {
             public static final String NAME = "dubbo";
             
             private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
                 @Override
                 public Object reply(ExchangeChannel channel, Object message) throws
          RemotingException {
                     if (message instanceof Invocation) {
                         Invocation inv = (Invocation) message;
                         // 獲取 Invoker 實例
                         Invoker<?> invoker = getInvoker(channel, inv);
                         if
          (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INV
          OKE))) {
                             // 回調相關,忽略
                         }
                       
           RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                         // 通過 Invoker 調用具體的服務
                         return invoker.invoke(inv);
                     }
                     throw new RemotingException(channel, "Unsupported request: ...");
                 }
                 
                 // 忽略其他方法
             }
             
             Invoker<?> getInvoker(Channel channel, Invocation inv) throws
          RemotingException {
                 // 忽略回調和本地存根相關邏輯
                 // ...
                 
                 int port = channel.getLocalAddress().getPort();
                 
                 // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比
          如:
                 //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
                 String serviceKey = serviceKey(port, path,
          inv.getAttachments().get(Constants.VERSION_KEY),
          inv.getAttachments().get(Constants.GROUP_KEY));
                 // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
                 // 服務導出過程中會將 <serviceKey, DubboExporter> 映射關系存儲到 exporterMap
          集合中
                 DubboExporter<?> exporter = (DubboExporter<?>)
          exporterMap.get(serviceKey);
                 if (exporter == null)
                     throw new RemotingException(channel, "Not found exported service
          ...");
                 // 獲取 Invoker 對象,并返回
                 return exporter.getInvoker();
             }
             
             // 忽略其他方法
          }

          在之前課程中介紹過,服務全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過Invoker 的invoke 方法調用服務邏輯

          public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
             @Override
             public Result invoke(Invocation invocation) throws RpcException {
                 try {
                     // 調用 doInvoke 執行后續的調用,并將調用結果封裝到 RpcResult 中,并
                     return new RpcResult(doInvoke(proxy, invocation.getMethodName(),
          invocation.getParameterTypes(), invocation.getArguments()));
                 } catch (InvocationTargetException e) {
                     return new RpcResult(e.getTargetException());
                 } catch (Throwable e) {
                     throw new RpcException("Failed to invoke remote proxy method ...");
                 }
             }
             
             protected abstract Object doInvoke(T proxy, String methodName, Class<?>[]
          parameterTypes, Object[] arguments) throws Throwable;
          }

          如上,doInvoke 是一個抽象方法,這個需要由具體的Invoker 實例實現。Invoker 實例是在運行時通過JavassistProxyFactory 創建的,創建邏輯如下:

          public class JavassistProxyFactory extends AbstractProxyFactory {
             
             // 省略其他方法
             @Override
             public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
                 final Wrapper wrapper =
          Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ?
          proxy.getClass() : type);
                 // 創建匿名類對象
                 return new AbstractProxyInvoker<T>(proxy, type, url) {
                     @Override
                     protected Object doInvoke(T proxy, String methodName,
                                               Class<?>[] parameterTypes,
                                               Object[] arguments) throws Throwable {
                         // 調用 invokeMethod 方法進行后續的調用
                         return wrapper.invokeMethod(proxy, methodName, parameterTypes,
          arguments);
                     }
                 };
             }
          }

          Wrapper 是一個抽象類,其中invokeMethod 是一個抽象方法。Dubbo 會在運行時通過Javassist 框架為Wrapper 生成實現類,并實現invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

          /** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
          public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
             public static String[] pns;
             public static Map pts;
             public static String[] mns;
             public static String[] dmns;
             public static Class[] mts0;
             // 省略其他方法
             public Object invokeMethod(Object object, String string, Class[] arrclass,
          Object[] arrobject) throws InvocationTargetException {
                 DemoService demoService;
                 try {
                     // 類型轉換
                     demoService = (DemoService)object;
                 }
                 catch (Throwable throwable) {
                     throw new IllegalArgumentException(throwable);
                 }
                 try {
                     // 根據方法名調用指定的方法
                     if ("sayHello".equals(string) && arrclass.length == 1) {
                         return demoService.sayHello((String)arrobject[0]);
                     }
                 }
                 catch (Throwable throwable) {
                     throw new InvocationTargetException(throwable);
                 }
                 throw new NoSuchMethodException(new StringBuffer().append("Not found
          method \"").append(string).append("\" in class
          com.alibaba.dubbo.demo.DemoService.").toString());
             }
          }

          到這里,整個服務調用過程就分析完了。最后把調用過程貼出來,如下:

          ChannelEventRunnable#run()
           —> DecodeHandler#received(Channel, Object)
             —> HeaderExchangeHandler#received(Channel, Object)
               —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
                 —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
                   —> Filter#invoke(Invoker, Invocation)
                     —> AbstractProxyInvoker#invoke(Invocation)
                       —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                         —> DemoServiceImpl#sayHello(String)

          (4)提供方返回調用結果

          服務提供方調用指定服務后,會將調用結果封裝到Response 對象中,并將該對象返回給服務消費方。服務提供方也是通過NettyChannel 的send 方法將Response 對象返回,這里就不在重復分析了。

          (5)消費方接收調用結果

          服務消費方在收到響應數據后,首先要做的事情是對響應數據進行解碼,得到Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是NettyHandler。接下來NettyHandler 會將這個對象繼續向下傳遞,最后AllChannelHandler 的received 方法會收到這個對象,并將這個對象派發到線程池中。這個過程和服務提供方接收請求的過程是一樣的,因此這里就不重復分析了

          03 小結

          至此整個dubbo的核心流程原理及其源碼,我們就分析完畢了,整體流程思路不復雜,但是細節很多,要先理解其思想,還是得多花時間再仔細擼一遍。

          原文鏈接:https://www.cnblogs.com/whgk/p/14591580.html

          ython3下JSON和JsonPath

          1.1 JSON介紹

          json簡單說就是javascript中的對象和數組,所以這兩種結構就是對象和數組兩種結構,通過這兩種結構可以表示各種復雜的結構。

          1. 對象:對象在js中表示為{ }括起來的內容,數據結構為 { key:value, key:value, ... }的鍵值對的結構,在面向對象的語言中,key為對象的屬性,value為對應的屬性值,所以很容易理解,取值方法為 對象.key 獲取屬性值,這個屬性值的類型可以是數字、字符串、數組、對象這幾種。

          2.數組:數組在js中是中括號[ ]括起來的內容,數據結構為 ["Python", "javascript", "C++", ...],取值方式和所有語言中一樣,使用索引獲取,字段值的類型可以是 數字、字符串、數組、對象幾種。

          1.2 JSON模塊的四個功能

          json模塊提供了四個功能:dumps、dump、loads、load,用于字符串 和 python數據類型間進行轉換。

          1. json.loads()

          json.loads()實現把Json格式字符串解碼轉換成Python對象 從json到python的類型轉化對照如下:

          JSON

          Python數據類型

          Json格式字符串解碼轉換成Python對象

          例子:

          import json

          # 數據

          js_list = '[1, 2, 3, 4]'

          js_dict = '{"name": "聽海", "age": "年年18"}'

          # 數據轉換前的類型

          print(type(js_list))

          print(type(js_dict))

          # json.loads()實現把Json格式字符串解碼轉換成Python對象

          List = json.loads(js_list)

          Dict = json.loads(js_dict) # json數據自動按Unicode存儲

          print('--------------------------')

          # 轉換后的顯示

          print(List)

          print(Dict)

          #數據轉換后的類型

          print(type(List))

          print(type(Dict))

          運行結果:

          <class 'str'>

          <class 'str'>

          --------------------------

          [1, 2, 3, 4]

          {'age': '年年18', 'name': '聽海'}

          <class 'list'>

          <class 'dict'>

          2. json.dumps()

          json.dumps()實現把python類型轉化為json字符串,返回一個str對象 把一個Python對象編碼轉換成Json字符串,從python原始類型向json類型的轉化對照如下:

          Python數據類型

          JSON

          python類型轉化為json字符串

          例子:

          import json

          listStr = [1, 2, 3, 4]

          tupleStr = ("python3", "selenium3", "appium", "java")

          dictStr = {"name": "聽海", "age": "年年18"}

          # 轉換前的格式

          print(type(listStr))

          print(type(tupleStr))

          print(type(dictStr))

          # 把 python類型轉化為json字符串,返回一個str對象。

          js_strList=json.dumps(listStr)

          js_tupleStr = json.dumps(tupleStr)

          # json.dumps() 序列化時默認使用的ascii編碼,添加參數 ensure_ascii=False 禁用ascii編碼,按utf-8編碼。

          js_dictStr=json.dumps(dictStr,ensure_ascii=False)

          print("---------------")

          # 打印轉換后數據顯示。

          print(js_strList)

          print(js_tupleStr)

          print(js_dictStr)

          # 轉換后的格式

          print(type(js_strList))

          print(type(js_tupleStr))

          print(type(js_dictStr))

          運行結果:

          <class 'list'>

          <class 'tuple'>

          <class 'dict'>

          ---------------

          [1, 2, 3, 4]

          ["python3", "selenium3", "appium", "java"]

          {"name": "聽海", "age": "年年18"}

          <class 'str'>

          <class 'str'>

          <class 'str'>

          注意:json.dumps() 序列化時默認使用的ascii編碼,添加參數 ensure_ascii=False 禁用ascii編碼,按utf-8編碼顯示。

          3. json.dump()

          將Python內置類型序列化為json對象后寫入文件。

          例子:

          import json

          listStr = [{"a1": "1"}, {"b1": "2"}]

          json.dump(listStr, open("listStr.json","w"), ensure_ascii=False)

          dictStr = {"a2": "3", "b2": "4"}

          json.dump(dictStr, open("dictStr.json","w"), ensure_ascii=False)

          運行結果:

          會在當前目錄生成 listStr.json 文件和 dictStr.json 2個文件。

          4. json.load()。

          讀取文件中json形式的字符串元素 轉化成python類型。

          例子:

          接口文檔中一個請求報文示例,我們就把這個示例中的json形式的報文轉換成python類型。

          1.在當前目錄新建一個名為“接口請求報文.json”文件。

          代碼實現:

          import json

          js_t_py = json.load(open("./接口請求報文.json",encoding="utf-8"))

          print(js_t_py)

          print(type(js_t_py))

          運行結果:

          1.3 JsonPath

          JsonPath 是一種信息抽取類庫,是從JSON文檔中抽取指定信息的工具,提供多種語言實現版本,包括:Javascript, Python, PHP 和 Java。

          JsonPath 對于 JSON 來說,相當于 XPATH 對于 XML。

          下載地址:https://pypi.python.org/pypi/jsonpath

          官方使用說明文檔:http://goessner.net/articles/JsonPath

          14.3.1 JsonPath的安裝

          安裝方法一:點擊Download URL鏈接下載jsonpath,解壓之后執行python setup.py install。

          安裝方法二:使用 pip install jsonpath 命令直接安裝。

          14.3.2 JsonPath 官方示例

          { "store": {

          "book": [

          { "category": "reference",

          "author": "Nigel Rees",

          "title": "Sayings of the Century",

          "price": 8.95

          },

          { "category": "fiction",

          "author": "Evelyn Waugh",

          "title": "Sword of Honour",

          "price": 12.99

          },

          { "category": "fiction",

          "author": "Herman Melville",

          "title": "Moby Dick",

          "isbn": "0-553-21311-3",

          "price": 8.99

          },

          { "category": "fiction",

          "author": "J. R. R. Tolkien",

          "title": "The Lord of the Rings",

          "isbn": "0-395-19395-8",

          "price": 22.99

          }

          ],

          "bicycle": {

          "color": "red",

          "price": 19.95

          }

          }

          }

          14.3.3 JsonPath與XPath語法對比

          Json結構清晰,可讀性高,復雜度低,非常容易匹配,下表中對應了XPath的用法。

          示例語法對比。

          1.4 案例

          案例:

          以拉勾網城市JSON文件 http://www.lagou.com/lbs/getAllCitySearchLabels.json 為例,獲取所有城市。

          代碼實現:

          import jsonpath

          url = 'http://www.lagou.com/lbs/getAllCitySearchLabels.json'

          headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"}

          r = requests.get(url,headers=headers)

          html = r.text

          # 把json格式字符串轉換成python對象

          jsonobj = json.loads(html)

          # 從根節點開始,匹配name節點

          citylist = jsonpath.jsonpath(jsonobj,'$..name')

          print(citylist)

          print(type(citylist))

          #新建一個city.json文件,并設置編碼格式為utf-8

          fp = open('city.json','w',encoding="utf-8")

          content = json.dumps(citylist, ensure_ascii=False)

          print(content)

          fp.write(content)

          fp.close()

          運行結果:

          注意事項:

          json.loads() 是把 Json格式字符串解碼轉換成Python對象,如果在json.loads的時候出錯,要注意被解碼的Json字符的編碼。

          如果傳入的字符串的編碼不是UTF-8的話,需要指定字符編碼的參數 encoding

          dataDict = json.loads(jsonStrGBK);

          dataJsonStr是JSON字符串,假設其編碼本身是非UTF-8的話而是GBK 的,那么上述代碼會導致出錯,改為對應的:

          dataDict = json.loads(jsonStrGBK, encoding="GBK");

          如果 dataJsonStr通過encoding指定了合適的編碼,但是其中又包含了其他編碼的字符,則需要先去將dataJsonStr轉換為Unicode,然后再指定編碼格式調用json.loads()

          dataJsonStrUni = dataJsonStr.decode("GB2312");

          dataDict = json.loads(dataJsonStrUni, encoding="GB2312");

          tring轉成JSON

          這個依賴很重要,我們將圍繞fastjson中的JSONObject這個類來談轉換

           <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.15</version>
           </dependency>
          
          1. String轉成JSON
          String json = "{\"abc\":\"1\",\"hahah\":\"2\"}";
          JSONObject jsonObject = JSONObject.parseObject(content);
          一句話就能解決,非常便捷。
          想要取出值,可以對`jsonObject`進行操作:
          jsonObject.getString("abc");
          結果為:`1`
          
          1. 將String轉為list后轉為JSON
          List<String> list = new ArrayList<String>(); 
          list.add("username"); 
          list.add("age"); 
          list.add("sex"); 
          JSONArray array = new JSONArray(); 
          array.add(list); 
          
          1. 將String轉為map后轉為JSON
          Map<String, String> map = new HashMap<String, String>();
           map.put("abc", "abc");
          map.put("def", "efg");
          JSONArray array_test = new JSONArray();
          array_test.add(map);
           JSONObject jsonObject = JSONObject.fromObject(map);
          

          特別注意:從JSONObject中取值,碰到了數字為key的時候,如

          {
           "userAnswer": {
           "28568": {
           "28552": {
           "qId": "28552",
           "order": "1",
           "userScore": {
           "score": 100
           },
           "answer": {
           "28554": "28554"
           },
           "qScore": "100.0",
           "qtype": "SingleChoice",
           "sId": "28568"
           }
           }
           },
           "paperType": "1",
           "paperOid": "28567",
           "instanceId": 30823,
           "remainingTime": -1,
           "examOid": "28570"
          }
          獲取“userAnswer”的value,再轉成JSON,可仿照如下形式:
          JSONObject userJson = JSONObject.parseObject(jsonObject.getString("userAnswer"));
          但是想獲取key"28568"就沒這么容易了。直接像上述的寫法,會報錯。
          我們瀏覽fastjson中的源碼,總結下,應該如下寫:
          JSONObject question = (JSONObject) JSONObject.parseObject(section.getString("28568"), Object.class);
          

          整體代碼:


          主站蜘蛛池模板: 国产一区二区成人| 日本在线观看一区二区三区| 狠狠色婷婷久久一区二区| 韩国精品一区二区三区无码视频 | 伊人久久一区二区三区无码| 亚洲av无码一区二区三区网站| 中文字幕在线无码一区| AV无码精品一区二区三区宅噜噜| 日韩精品无码人妻一区二区三区| 一本岛一区在线观看不卡| 一区二区不卡视频在线观看| 福利一区二区视频| 亚洲第一区在线观看| 久夜色精品国产一区二区三区| 久久99精品免费一区二区| 在线精品亚洲一区二区三区| 亚洲AV综合色一区二区三区| 亚洲国产精品自在线一区二区| 在线播放一区二区| 国产成人午夜精品一区二区三区| 欧洲精品一区二区三区| 亚洲AV日韩精品一区二区三区| 中文字幕在线播放一区| 日韩精品乱码AV一区二区| 波多野结衣一区视频在线| 国产成人综合亚洲一区| 国产精久久一区二区三区| 久久精品国产一区| 亚洲一区二区三区国产精华液| 国产乱码精品一区二区三区四川| 亚洲AⅤ无码一区二区三区在线| 国产亚洲3p无码一区二区| 偷拍激情视频一区二区三区| 免费视频精品一区二区| 精品视频一区二区三区免费| 无码一区18禁3D| 国产精品视频一区二区三区不卡| 亚洲AV成人一区二区三区AV | 日韩精品一区二区三区中文版| 一区二区高清在线| 人妻天天爽夜夜爽一区二区|