整體duubo的服務消費原理
Dubbo 框架做服務消費也分為兩大部分 , 第一步通過持有遠程服務實例生成Invoker,這個Invoker 在客戶端是核心的遠程代理對象 。 第二步會把Invoker 通過動態代理轉換成實現用戶接口的動態代理引用 。
服務消費方引用服務的藍色初始化鏈,時序圖
引用入口:ReferenceBean 的getObject 方法,該方法定義在Spring 的FactoryBean 接口中,ReferenceBean 實現了這個方法。
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
// 檢測 ref 是否為空,為空則通過 init 方法創建
if (ref == null) {
// init 方法主要用于處理配置,以及調用 createProxy 生成代理類
init();
}
return ref;
}
Dubbo 提供了豐富的配置,用于調整和優化框架行為,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置的正確性。
private void init() {
// 創建代理類
ref = createProxy(map);
}
此方法代碼很長,主要完成的配置加載,檢查,以及創建引用的代理對象。這里要從createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創建代理對象的。但實際上并非如此,該方法還會調用其他方法構建以及合并Invoker 實例。具體細節如下。
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
...........
isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl)
// 本地引用略
if (isJvmRefer) {
} else {
// 點對點調用略
if (url != null && url.length() > 0) {
} else {
// 加載注冊中心 url
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY,
URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 參數到 url 中,并將 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY,
StringUtils.toQueryString(map)));
}
}
}
// 單個注冊中心或服務提供者(服務直連,下同)
if (urls.size() == 1) {
// 調用 RegistryProtocol 的 refer 構建 Invoker 實例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個注冊中心或多個服務提供者,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
// 通過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
// 根據 url 協議頭加載指定的 Protocol 實例,并調用實例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 如果注冊中心鏈接不為空,則將使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY,
AvailableCluster.NAME);
// 創建 StaticDirectory 實例,并由 Cluster 對多個 Invoker 進行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
//省略無關代碼...
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
上面代碼很多,不過邏輯比較清晰。
1、如果是本地調用,直接jvm 協議從內存中獲取實例
2、如果只有一個注冊中心,直接通過Protocol 自適應拓展類構建Invoker 實例接口
3、如果有多個注冊中心,此時先根據url 構建Invoker。然后再通過Cluster 合并多個Invoker,最后調用ProxyFactory 生成代理類
(1)創建客戶端
在服務消費方,Invoker 用于執行遠程調用。Invoker 是由Protocol 實現類構建而來。Protocol 實現類有很多,這里分析DubboProtocol
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 創建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url,
getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
上面方法看起來比較簡單,創建一個DubboInvoker。通過構造方法傳入遠程調用的client對象。默認情況下,Dubbo 使用NettyClient 進行通信。接下來,我們簡單看一下getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) {
// 是否共享連接
boolean service_share_connect = false;
// 獲取連接數,默認為0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,則共享連接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 獲取共享客戶端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客戶端
clients[i] = initClient(url);
}
}
return clients;
}
這里根據connections 數量決定是獲取共享客戶端還是創建新的客戶端實例,getSharedClient 方法中也會調用initClient 方法,因此下面我們一起看一下這個方法。
private ExchangeClient initClient(URL url) {
// 獲取客戶端類型,默認為 netty
String str = url.getParameter(Constants.CLIENT_KEY,
url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//省略無關代碼
ExchangeClient client;
try {
// 獲取 lazy 配置,并根據配置值決定創建的客戶端類型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 創建懶加載 ExchangeClient 實例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 創建普通 ExchangeClient 實例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
initClient 方法首先獲取用戶配置的客戶端類型,默認為netty。下面我們分析一下Exchangers 的connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws
RemotingException {
// 獲取 Exchanger 實例,默認為 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
如上,getExchanger 會通過SPI 加載HeaderExchangeClient 實例,這個方法比較簡單,大家自己看一下吧。接下來分析HeaderExchangeClient 的實現。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws
RemotingException {
// 這里包含了多個調用,分別如下:
// 1. 創建 HeaderExchangeHandler 對象
// 2. 創建 DecodeHandler 對象
// 3. 通過 Transporters 構建 Client 實例
// 4. 創建 HeaderExchangeClient 對象
return new HeaderExchangeClient(Transporters.connect(url, new
DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
這里的調用比較多,我們這里重點看一下Transporters 的connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws
RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 數量大于1,則創建一個 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取 Transporter 自適應拓展類,并調用 connect 方法生成 Client 實例
return getTransporter().connect(url, handler);
}
如上,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的Transporter 實現類。若用戶未配置客戶端類型,則默認加載NettyTransporter,并調用該類的connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException
{
// 創建 NettyClient 對象
return new NettyClient(url, listener);
}
(2)注冊
這里就已經創建好了NettyClient對象。關于DubboProtocol 的refer 方法就分析完了。接下來,繼續分析RegistryProtocol 的refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 參數值,并將其設置為協議頭
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY,
Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲取注冊中心實例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 將 url 查詢字符串轉為 Map
Map<String, String> qs =
StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 獲取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通過 SPI 加載 MergeableCluster 實例,并調用 doRefer 繼續執行服務引用邏輯
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 調用 doRefer 繼續執行服務引用邏輯
return doRefer(cluster, registry, type, url);
}
上面代碼首先為url 設置協議頭,然后根據url 參數加載注冊中心實例。然后獲取group 配置,根據group 配置決定doRefer 第一個參數的類型。這里的重點是doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T>
type, URL url) {
// 創建 RegistryDirectory 實例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設置注冊中心和協議
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>
(directory.getUrl().getParameters());
// 生成服務消費者鏈接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL,
parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注冊服務消費者,在 consumers 目錄下新節點
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY,
Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 訂閱 providers、configurators、routers 等節點數據
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一個注冊中心可能有多個服務提供者,因此這里需要將多個服務提供者合并為一個
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl,
directory);
return invoker;
}
如上,doRefer 方法創建一個RegistryDirectory 實例,然后生成服務者消費者鏈接,并向注冊中心進行注冊。注冊完畢后,緊接著訂閱providers、configurators、routers 等節點下的數據。完成訂閱后,RegistryDirectory 會收到這幾個節點下的子節點信息。由于一個服務可能部署在多臺服務器上,這樣就會在providers 產生多個節點,這個時候就需要Cluster 將多個服務節點合并為一個,并生成一個Invoker。
(3)創建代理對象
Invoker 創建完畢后,接下來要做的事情是為服務接口生成代理對象。有了代理對象,即可進行遠程調用。代理對象生成的入口方法為ProxyFactory 的getProxy,接下來進行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 調用重載方法
return getProxy(invoker, false);
}
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 獲取接口列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 設置服務接口類和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 加載接口類
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 為 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 創建新的 interfaces 數組
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 設置 GenericService.class 到數組中
interfaces[len] = GenericService.class;
}
// 調用重載方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代碼都是用來獲取interfaces 數組的,我們繼續往下看。getProxy(Invoker, Class<?>[]) 這個方法是一個抽象方法,下面我們到JavassistProxyFactory 類中看一下該方法的實現代碼。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 生成 Proxy 子類(Proxy 是抽象類)。并調用 Proxy 子類的 newInstance 方法創建Proxy 實例
return (T) Proxy.getProxy(interfaces).newInstance(new
InvokerInvocationHandler(invoker));
}
上面代碼并不多,首先是通過Proxy 的getProxy 方法獲取Proxy 子類,然后創建InvokerInvocationHandler 對象,并將該對象傳給newInstance 生成Proxy 實例。InvokerInvocationHandler 實現JDK 的InvocationHandler 接口,具體的用途是攔截接口類調用。下面以org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略EchoService 接口)。
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
好了,到這里代理類生成邏輯就分析完了。整個過程比較復雜,大家需要耐心看一下。
在之前的內容中,我們分析了消費者端服務發現與提供者端服務暴露的相關內容,同時也知道消費者端通過內置的負載均衡算法獲取合適的調用invoker進行遠程調用。接下來我們再研究下遠程調用過程即網絡通信。
網絡通信位于Remoting模塊:
網絡通信的問題:
dubbo內置,dubbo協議 ,rmi協議,hessian協議,http協議,webservice協議,thrift協議,rest協議,grpc協議,memcached協議,redis協議等10種通訊協議。各個協議特點如下
dubbo協議
Dubbo 缺省協議采用單一長連接和NIO 異步通訊,適合于小數據量大并發的服務調用,以及服務消費者機器數遠大于服務提供者機器數的情況。
rmi協議
RMI 協議采用JDK 標準的 java.rmi.* 實現,采用阻塞式短連接和JDK 標準序列化方式。
hessian協議
Hessian 協議用于集成Hessian 的服務,Hessian 底層采用Http 通訊,采用Servlet 暴露服務,
Dubbo 缺省內嵌Jetty 作為服務器實現。
Dubbo 的Hessian 協議可以和原生Hessian 服務互操作,即:提供者用Dubbo 的Hessian 協議暴露服務,消費者直接用標準Hessian 接口調用或者提供方用標準Hessian 暴露服務,消費方用Dubbo 的Hessian 協議調用。
http協議
基于HTTP 表單的遠程調用協議,采用Spring 的HttpInvoker 實現
webservice協議
基于WebService 的遠程調用協議,基于Apache CXF 實現](http://dubbo.apache.org/zh-cn/docs/user/references/protocol/webservice.html#fn2)。
可以和原生WebService 服務互操作,即:提供者用Dubbo 的WebService 協議暴露服務,消費者直接用標準WebService 接口調用,或者提供方用標準WebService 暴露服務,消費方用Dubbo 的WebService 協議調用。
thrift協議
當前dubbo 支持[1]的thrift 協議是對thrift 原生協議[2] 的擴展,在原生協議的基礎上添加了一些額外的頭信息,比如service name,magic number 等。
rest協議
基于標準的Java REST API——JAX-RS 2.0(Java API for RESTful Web Services的簡寫)實現的REST調用支持
grpc協議
Dubbo 自2.7.5 版本開始支持gRPC 協議,對于計劃使用HTTP/2 通信,或者想利用gRPC 帶來的Stream、反壓、Reactive 編程等能力的開發者來說, 都可以考慮啟用gRPC 協議。
為期望使用gRPC 協議的用戶帶來服務治理能力,方便接入Dubbo 體系用戶可以使用Dubbo 風格的,基于接口的編程風格來定義和使用遠程服務
memcached協議
基于memcached實現的RPC 協議
redis協議
基于Redis 實現的RPC 協議
序列化就是將對象轉成字節流,用于網絡傳輸,以及將字節流轉為對象,用于在收到字節流數據后還原成對象。序列化的優勢有很多,例如安全性更好、可跨平臺等。我們知道dubbo基于netty進行網絡通訊,在NettyClient.doOpen() 方法中可以看到Netty的相關類
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),
NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
然后去看NettyCodecAdapter 類最后進入ExchangeCodec類的encodeRequest方法,如下:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request
req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];
是的,就是Serialization接口,默認是Hessian2Serialization序列化接口。
Dubbo序列化支持java、compactedjava、nativejava、fastjson、dubbo、fst、hessian2、kryo,protostuff其中默認hessian2。其中java、compactedjava、nativejava屬于原生java的序列化。
dubbo序列化:阿里尚未開發成熟的高效java序列化實現,阿里不建議在生產環境使用它。
hessian2序列化:hessian是一種跨語言的高效二進制序列化方式。但這里實際不是原生的hessian2序列化,而是阿里修改過的,它是dubbo RPC默認啟用的序列化方式。
json序列化:目前有兩種實現,一種是采用的阿里的fastjson庫,另一種是采用dubbo中自己實現的簡單json庫,但其實現都不是特別成熟,而且json這種文本序列化性能一般不如上面兩種二進制序列化。
java序列化:主要是采用JDK自帶的Java序列化實現,性能很不理想。
(1)Dubbo中的數據格式
解決socket中數據粘包拆包問題,一般有三種方式
定長協議(數據包長度一致)
定長的協議是指協議內容的長度是固定的,比如協議byte長度是50,當從網絡上讀取50個byte后,就進行decode解碼操作。定長協議在讀取或者寫入時,效率比較高,因為數據緩存的大小基本都確定了,就好比數組一樣,缺陷就是適應性不足,以RPC場景為例,很難估計出定長的長度是多少。
特殊結束符(數據尾:通過特殊的字符標識#)
相比定長協議,如果能夠定義一個特殊字符作為每個協議單元結束的標示,就能夠以變長的方式進行通信,從而在數據傳輸和高效之間取得平衡,比如用特殊字符 \n 。特殊結束符方式的問題是過于簡單的思考了協議傳輸的過程,對于一個協議單元必須要全部讀入才能夠進行處理,除此之外必須要防止用戶傳輸的數據不能同結束符相同,否則就會出現紊亂。
變長協議(協議頭+payload模式)
這種一般是自定義協議,會以定長加不定長的部分組成,其中定長的部分需要描述不定長的內容長度。dubbo就是使用這種形式的數據傳輸格式
Dubbo 數據包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。
偏移量(Bit) 字段 取值
0 ~ 7 魔數高位 0xda00
8 ~ 15 魔數低位 0xbb
16 數據包類型 0 - Response, 1 - Request
17 調用方式 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用
18 事件標 識 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包
19 ~23 序列化器編號 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~31 狀態 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 -BAD_REQUEST 50 - BAD_RESPONSE ......
32 ~95 請求編號 共8字節,運行時生成
96 ~127 消息體長度 運行時計算
(2)消費端發送請求
/**
*proxy0#sayHello(String)
*—> InvokerInvocationHandler#invoke(Object, Method, Object[])
* —> MockClusterInvoker#invoke(Invocation)
* —> AbstractClusterInvoker#invoke(Invocation)
* —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>,LoadBalance)
* —> Filter#invoke(Invoker, Invocation) // 包含多個 Filter 調用
* —> ListenerInvokerWrapper#invoke(Invocation)
* —> AbstractInvoker#invoke(Invocation)
* —> DubboInvoker#doInvoke(Invocation)
* —> ReferenceCountExchangeClient#request(Object, int)
* —> HeaderExchangeClient#request(Object, int)
* —> HeaderExchangeChannel#request(Object, int)
* —> AbstractPeer#send(Object)
* —> AbstractClient#send(Object, boolean)
* —> NettyChannel#send(Object, boolean)
* —> NioClientSocketChannel#write(Object)
*/
dubbo消費方,自動生成代碼對象如下
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
private InvocationHandler handler;
public String sayHello(String string) {
// 將參數存儲到 Object 數組中
Object[] arrobject = new Object[]{string};
// 調用 InvocationHandler 實現類的 invoke 方法得到調用結果
Object object = this.handler.invoke(this, methods[0], arrobject);
// 返回調用結果
return (String)object;
}
}
InvokerInvocationHandler 中的invoker 成員變量類型為MockClusterInvoker,MockClusterInvoker內部封裝了服務降級邏輯。下面簡單看一下:
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 獲取 mock 配置值
String value =
directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY,
Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
// 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法,
// 比如 FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
// force:xxx 直接執行 mock 邏輯,不發起遠程調用
result = doMockInvoke(invocation, null);
} else {
// fail:xxx 表示消費方對調用服務失敗后,再執行 mock 邏輯,不拋出異常
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
// 調用失敗,執行 mock 邏輯
result = doMockInvoke(invocation, e);
}
}
return result;
}
考慮到前文已經詳細分析過FailoverClusterInvoker,因此本節略過FailoverClusterInvoker,直接分析DubboInvoker。
public abstract class AbstractInvoker<T> implements Invoker<T> {
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service ...");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 設置 Invoker
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
// 設置 attachment
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments =
RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
// 添加 contextAttachments 到 RpcInvocation#attachment 變量中
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(),
Constants.ASYNC_KEY, false)) {
// 設置異步信息到 RpcInvocation#attachment 中
invocation.setAttachment(Constants.ASYNC_KEY,
Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 抽象方法,由子類實現
return doInvoke(invocation);
} catch (InvocationTargetException e) {
// ...
} catch (RpcException e) {
// ...
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
// 省略其他方法
}
上面的代碼來自AbstractInvoker 類,其中大部分代碼用于添加信息到RpcInvocation#attachment 變量中,添加完畢后,調用doInvoke 執行后續的調用。doInvoke 是一個抽象方法,需要由子類實現,下面到DubboInvoker 中看一下。
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
//將目標方法以及版本號作為參數放入到Invocation中
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
//獲得客戶端連接
ExchangeClient currentClient; //初始化invoker的時候,構建的一個遠程通信連接
if (clients.length == 1) { //默認
currentClient = clients[0];
} else {
//通過取模獲得其中一個連接
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//表示當前的方法是否存在返回值
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY,
DEFAULT_TIMEOUT);
//isOneway 為 true,表示“單向”通信
if (isOneway) {//異步無返回值
boolean isSent = getUrl().getMethodParameter(methodName,
Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else { //存在返回值
//是否采用異步
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture =
currentClient.request(inv, timeout);
responseFuture.whenComplete((obj, t) -> {
if (t != null) {
asyncRpcResult.completeExceptionally(t);
} else {
asyncRpcResult.complete((AppResponse) obj);
}
});
RpcContext.getContext().setFuture(new
FutureAdapter(asyncRpcResult));
return asyncRpcResult;
}
}
//省略無關代碼
}
最終進入到HeaderExchangeChannel#request方法,拼裝Request并將請求發送出去
public CompletableFuture<Object> request(Object request, int timeout) throws
RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed
tosend request " + request + ", cause: The channel " + this + " is closed!");
}
// 創建請求對象
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
//NettyClient
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
請求編碼如何做的?
在netty啟動時,我們設置了編解碼器,其中通過ExchangeCodec完成編解碼工作如下:
public class ExchangeCodec extends TelnetCodec {
// 消息頭長度
protected static final int HEADER_LENGTH = 16;
// 魔數內容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger =
LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws
IOException {
if (msg instanceof Request) {
// 對 Request 對象進行編碼
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 對 Response 對象進行編碼,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request
req) throws IOException {
Serialization serialization = getSerialization(channel);
// 創建消息頭字節數組,長度為 16
byte[] header = new byte[HEADER_LENGTH];
// 設置魔數
Bytes.short2bytes(MAGIC, header);
// 設置數據包類型(Request/Response)和序列化器編號
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 設置通信方式(單向/雙向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 設置事件標識
if (req.isEvent()) { header[2] |= FLAG_EVENT;
}
// 設置請求編號,8個字節,從第4個字節開始設置
Bytes.long2bytes(req.getId(), header, 4);
// 獲取 buffer 當前的寫位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,為消息頭預留 16 個字節的空間
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 創建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 對事件數據進行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 對請求數據進行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 獲取寫入的字節數,也就是消息體長度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 將消息體長度寫入到消息頭中
Bytes.int2bytes(len, header, 12);
// 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
buffer.writerIndex(savedWriteIndex);
// 從 savedWriteIndex 下標處寫入消息頭
buffer.writeBytes(header);
// 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}
以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到header 數組中。然后對Request 對象的data 字段執行序列化操作,序列化后的數據最終會存儲到ChannelBuffer 中。序列化操作執行完后,可得到數據序列化后的長度len,緊接著將len 寫入到header 指定位置處。最后再將消息頭字節數組header 寫入到ChannelBuffer 中,整個編碼過程就結束了。本節的最后,我們再來看一下Request 對象的data 字段序列化過程,也就是encodeRequestData 方法的邏輯,如下:
public class DubboCodec extends ExchangeCodec implements Codec2 {
protected void encodeRequestData(Channel channel, ObjectOutput out, Object
data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
// 依次序列化 dubbo version、path、version
out.writeUTF(version);
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
// 序列化調用方法名
out.writeUTF(inv.getMethodName());
// 將參數類型轉換為字符串,并進行序列化
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
// 對運行時參數進行序列化
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
// 序列化 attachments
out.writeObject(inv.getAttachments());
}
}
至此,關于服務消費方發送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。
(3)提供方接受請求
請求如何解碼?
這里直接分析請求數據的解碼邏輯,忽略中間過程,如下:
public class ExchangeCodec extends TelnetCodec {
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws
IOException {
int readable = buffer.readableBytes();
// 創建消息頭字節數組
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 讀取消息頭數據
buffer.readBytes(header);
// 調用重載方法進行后續解碼工作
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable,
byte[] header) throws IOException {
// 檢查魔數是否相等
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length +
i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通過 telnet 命令行發送的數據包不包含消息頭,所以這里
// 調用 TelnetCodec 的 decode 方法對數據包進行解碼
return super.decode(channel, buffer, readable, header);
}
// 檢測可讀數據量是否少于消息頭長度,若小于則立即返回
DecodeResult.NEED_MORE_INPUT
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 從消息頭中獲取消息體長度
int len = Bytes.bytes2int(header, 12);
// 檢測消息體長度是否超出限制,超出則拋出異常
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 檢測可讀的字節數是否小于實際的字節數
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 繼續進行解碼工作
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
上面方法通過檢測消息頭中的魔數是否與規定的魔數相等,提前攔截掉非常規數據包,比如通過telnet命令行發出的數據包。接著再對消息體長度,以及可讀字節數進行檢測。最后調用decodeBody 方法進行后續的解碼工作,ExchangeCodec 中實現了decodeBody 方法,但因其子類DubboCodec 覆寫了該方法,所以在運行時DubboCodec 中的decodeBody 方法會被調用。下面我們來看一下該方法的代碼。
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header)
throws IOException {
// 獲取消息頭中的第三個字節,并通過邏輯與運算得到序列化器編號byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(),
proto);
// 獲取調用編號
long id = Bytes.bytes2long(header, 4);
// 通過邏輯與運算得到調用類型,0 - Response,1 - Request
if ((flag & FLAG_REQUEST) == 0) {
// 對響應結果進行解碼,得到 Response 對象。這個非本節內容,后面再分析
// ...
} else {
// 創建 Request 對象
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
// 通過邏輯與運算得到通信方式,并設置到 Request 對象中
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 通過位運算檢測數據包是否為事件類型
if ((flag & FLAG_EVENT) != 0) {
// 設置心跳事件到 Request 對象中
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
// 對心跳包進行解碼,該方法已被標注為廢棄
data = decodeHeartbeatData(channel, deserialize(s,
channel.getUrl(), is));
} else if (req.isEvent()) {
// 對事件數據進行解碼
data = decodeEventData(channel, deserialize(s,
channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
// 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is,
proto);
// 在當前線程,也就是 IO 線程上進行后續的解碼工作。此工作完成后,
可將
// 調用方法名、attachment、以及調用參數解析出來
inv.decode();
} else {
// 僅創建 DecodeableRpcInvocation 對象,但不在當前線程上執行解
碼邏輯
inv = new DecodeableRpcInvocation(channel, req,
new
UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
// 設置 data 到 Request 對象中
req.setData(data);
} catch (Throwable t) {
// 若解碼過程中出現異常,則將 broken 字段設為 true,
// 并將異常對象設置到 Reqeust 對象中
req.setBroken(true);
req.setData(t);
}
return req;
}
}
}
如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到Request 中。隨后會調用DecodeableRpcInvocation 的decode 方法進行后續的解碼工作。此工作完成后,可將調用方法名、attachment、以及調用參數解析出來。
調用服務
解碼器將數據包解析成Request 對象后,NettyHandler 的messageReceived 方法緊接著會收到這個對象,并將這個對象繼續向下傳遞。整個調用棧如下:
NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
—> AbstractPeer#received(Channel, Object)
—> MultiMessageHandler#received(Channel, Object)
—> HeartbeatHandler#received(Channel, Object)
—> AllChannelHandler#received(Channel, Object)
—> ExecutorService#execute(Runnable) // 由線程池執行后續的調用邏輯=
這里我們直接分析調用棧中的分析第一個和最后一個調用方法邏輯。如下:
考慮到篇幅,以及很多中間調用的邏輯并非十分重要,所以這里就不對調用棧中的每個方法都進行分析了。這里我們直接分析最后一個調用方法邏輯。如下:
public class ChannelEventRunnable implements Runnable {
private final ChannelHandler handler;
private final Channel channel;
private final ChannelState state;
private final Throwable exception;
private final Object message;
@Override
public void run() {
// 檢測通道狀態,對于請求或響應消息,此時 state = RECEIVED
if (state == ChannelState.RECEIVED) {
try {
// 將 channel 和 message 傳給 ChannelHandler 對象,進行后續的調用
handler.received(channel, message);
} catch (Exception e) {
logger.warn("... operation error, channel is ... message is
...");
}
}
// 其他消息類型通過 switch 進行處理
else {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("... operation error, channel is ...");
}
break;
case DISCONNECTED:
// ...
case SENT:
// ...
case CAUGHT:
// ...
default:
logger.warn("unknown state: " + state + ", message is " +
message);
}
}
}
}
如上,請求和響應消息出現頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷ChannelEventRunnable 僅是一個中轉站,它的run 方法中并不包含具體的調用邏輯,僅用于將參數傳給其他ChannelHandler 對象進行處理,該對象類型為DecodeHandler
public class DecodeHandler extends AbstractChannelHandlerDelegate {
public DecodeHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void received(Channel channel, Object message) throws
RemotingException {
if (message instanceof Decodeable) {
// 對 Decodeable 接口實現類對象進行解碼
decode(message);
}
if (message instanceof Request) {
// 對 Request 的 data 字段進行解碼
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 對 Request 的 result 字段進行解碼
decode(((Response) message).getResult());
}
// 執行后續邏輯
handler.received(channel, message);
}
private void decode(Object message) {
// Decodeable 接口目前有兩個實現類,
// 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
if (message != null && message instanceof Decodeable) {
try {
// 執行解碼邏輯
((Decodeable) message).decode();
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Call Decodeable.decode failed: " + e.getMessage(),
e);
}
}
}
}
}
DecodeHandler 主要是包含了一些解碼邏輯,完全解碼后的Request 對象會繼續向后傳遞
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws
RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 獲取 Invoker 實例
Invoker<?> invoker = getInvoker(channel, inv);
if
(Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INV
OKE))) {
// 回調相關,忽略
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 通過 Invoker 調用具體的服務
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: ...");
}
// 忽略其他方法
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws
RemotingException {
// 忽略回調和本地存根相關邏輯
// ...
int port = channel.getLocalAddress().getPort();
// 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比
如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path,
inv.getAttachments().get(Constants.VERSION_KEY),
inv.getAttachments().get(Constants.GROUP_KEY));
// 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
// 服務導出過程中會將 <serviceKey, DubboExporter> 映射關系存儲到 exporterMap
集合中
DubboExporter<?> exporter = (DubboExporter<?>)
exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service
...");
// 獲取 Invoker 對象,并返回
return exporter.getInvoker();
}
// 忽略其他方法
}
在之前課程中介紹過,服務全部暴露完成之后保存到exporterMap中。這里就是通過serviceKey獲取exporter之后獲取Invoker,并通過Invoker 的invoke 方法調用服務邏輯
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
try {
// 調用 doInvoke 執行后續的調用,并將調用結果封裝到 RpcResult 中,并
return new RpcResult(doInvoke(proxy, invocation.getMethodName(),
invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method ...");
}
}
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[]
parameterTypes, Object[] arguments) throws Throwable;
}
如上,doInvoke 是一個抽象方法,這個需要由具體的Invoker 實例實現。Invoker 實例是在運行時通過JavassistProxyFactory 創建的,創建邏輯如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
// 省略其他方法
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper =
Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ?
proxy.getClass() : type);
// 創建匿名類對象
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調用 invokeMethod 方法進行后續的調用
return wrapper.invokeMethod(proxy, methodName, parameterTypes,
arguments);
}
};
}
}
Wrapper 是一個抽象類,其中invokeMethod 是一個抽象方法。Dubbo 會在運行時通過Javassist 框架為Wrapper 生成實現類,并實現invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以DemoServiceImpl 為例,Javassist 為其生成的代理類如下。
/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
// 省略其他方法
public Object invokeMethod(Object object, String string, Class[] arrclass,
Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 類型轉換
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根據方法名調用指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found
method \"").append(string).append("\" in class
com.alibaba.dubbo.demo.DemoService.").toString());
}
}
到這里,整個服務調用過程就分析完了。最后把調用過程貼出來,如下:
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
(4)提供方返回調用結果
服務提供方調用指定服務后,會將調用結果封裝到Response 對象中,并將該對象返回給服務消費方。服務提供方也是通過NettyChannel 的send 方法將Response 對象返回,這里就不在重復分析了。
(5)消費方接收調用結果
服務消費方在收到響應數據后,首先要做的事情是對響應數據進行解碼,得到Response 對象。然后再將該對象傳遞給下一個入站處理器,這個入站處理器就是NettyHandler。接下來NettyHandler 會將這個對象繼續向下傳遞,最后AllChannelHandler 的received 方法會收到這個對象,并將這個對象派發到線程池中。這個過程和服務提供方接收請求的過程是一樣的,因此這里就不重復分析了
至此整個dubbo的核心流程原理及其源碼,我們就分析完畢了,整體流程思路不復雜,但是細節很多,要先理解其思想,還是得多花時間再仔細擼一遍。
原文鏈接:https://www.cnblogs.com/whgk/p/14591580.html
1.1 JSON介紹
json簡單說就是javascript中的對象和數組,所以這兩種結構就是對象和數組兩種結構,通過這兩種結構可以表示各種復雜的結構。
1. 對象:對象在js中表示為{ }括起來的內容,數據結構為 { key:value, key:value, ... }的鍵值對的結構,在面向對象的語言中,key為對象的屬性,value為對應的屬性值,所以很容易理解,取值方法為 對象.key 獲取屬性值,這個屬性值的類型可以是數字、字符串、數組、對象這幾種。
2.數組:數組在js中是中括號[ ]括起來的內容,數據結構為 ["Python", "javascript", "C++", ...],取值方式和所有語言中一樣,使用索引獲取,字段值的類型可以是 數字、字符串、數組、對象幾種。
1.2 JSON模塊的四個功能
json模塊提供了四個功能:dumps、dump、loads、load,用于字符串 和 python數據類型間進行轉換。
1. json.loads()
json.loads()實現把Json格式字符串解碼轉換成Python對象 從json到python的類型轉化對照如下:
JSON
Python數據類型
Json格式字符串解碼轉換成Python對象
例子:
import json
# 數據
js_list = '[1, 2, 3, 4]'
js_dict = '{"name": "聽海", "age": "年年18"}'
# 數據轉換前的類型
print(type(js_list))
print(type(js_dict))
# json.loads()實現把Json格式字符串解碼轉換成Python對象
List = json.loads(js_list)
Dict = json.loads(js_dict) # json數據自動按Unicode存儲
print('--------------------------')
# 轉換后的顯示
print(List)
print(Dict)
#數據轉換后的類型
print(type(List))
print(type(Dict))
運行結果:
<class 'str'>
<class 'str'>
--------------------------
[1, 2, 3, 4]
{'age': '年年18', 'name': '聽海'}
<class 'list'>
<class 'dict'>
2. json.dumps()
json.dumps()實現把python類型轉化為json字符串,返回一個str對象 把一個Python對象編碼轉換成Json字符串,從python原始類型向json類型的轉化對照如下:
Python數據類型
JSON
python類型轉化為json字符串
例子:
import json
listStr = [1, 2, 3, 4]
tupleStr = ("python3", "selenium3", "appium", "java")
dictStr = {"name": "聽海", "age": "年年18"}
# 轉換前的格式
print(type(listStr))
print(type(tupleStr))
print(type(dictStr))
# 把 python類型轉化為json字符串,返回一個str對象。
js_strList=json.dumps(listStr)
js_tupleStr = json.dumps(tupleStr)
# json.dumps() 序列化時默認使用的ascii編碼,添加參數 ensure_ascii=False 禁用ascii編碼,按utf-8編碼。
js_dictStr=json.dumps(dictStr,ensure_ascii=False)
print("---------------")
# 打印轉換后數據顯示。
print(js_strList)
print(js_tupleStr)
print(js_dictStr)
# 轉換后的格式
print(type(js_strList))
print(type(js_tupleStr))
print(type(js_dictStr))
運行結果:
<class 'list'>
<class 'tuple'>
<class 'dict'>
---------------
[1, 2, 3, 4]
["python3", "selenium3", "appium", "java"]
{"name": "聽海", "age": "年年18"}
<class 'str'>
<class 'str'>
<class 'str'>
注意:json.dumps() 序列化時默認使用的ascii編碼,添加參數 ensure_ascii=False 禁用ascii編碼,按utf-8編碼顯示。
3. json.dump()
將Python內置類型序列化為json對象后寫入文件。
例子:
import json
listStr = [{"a1": "1"}, {"b1": "2"}]
json.dump(listStr, open("listStr.json","w"), ensure_ascii=False)
dictStr = {"a2": "3", "b2": "4"}
json.dump(dictStr, open("dictStr.json","w"), ensure_ascii=False)
運行結果:
會在當前目錄生成 listStr.json 文件和 dictStr.json 2個文件。
4. json.load()。
讀取文件中json形式的字符串元素 轉化成python類型。
例子:
接口文檔中一個請求報文示例,我們就把這個示例中的json形式的報文轉換成python類型。
1.在當前目錄新建一個名為“接口請求報文.json”文件。
代碼實現:
import json
js_t_py = json.load(open("./接口請求報文.json",encoding="utf-8"))
print(js_t_py)
print(type(js_t_py))
運行結果:
1.3 JsonPath
JsonPath 是一種信息抽取類庫,是從JSON文檔中抽取指定信息的工具,提供多種語言實現版本,包括:Javascript, Python, PHP 和 Java。
JsonPath 對于 JSON 來說,相當于 XPATH 對于 XML。
下載地址:https://pypi.python.org/pypi/jsonpath
官方使用說明文檔:http://goessner.net/articles/JsonPath
14.3.1 JsonPath的安裝
安裝方法一:點擊Download URL鏈接下載jsonpath,解壓之后執行python setup.py install。
安裝方法二:使用 pip install jsonpath 命令直接安裝。
14.3.2 JsonPath 官方示例
{ "store": {
"book": [
{ "category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{ "category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{ "category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{ "category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
}
}
14.3.3 JsonPath與XPath語法對比
Json結構清晰,可讀性高,復雜度低,非常容易匹配,下表中對應了XPath的用法。
示例語法對比。
1.4 案例
案例:
以拉勾網城市JSON文件 http://www.lagou.com/lbs/getAllCitySearchLabels.json 為例,獲取所有城市。
代碼實現:
import jsonpath
url = 'http://www.lagou.com/lbs/getAllCitySearchLabels.json'
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36"}
r = requests.get(url,headers=headers)
html = r.text
# 把json格式字符串轉換成python對象
jsonobj = json.loads(html)
# 從根節點開始,匹配name節點
citylist = jsonpath.jsonpath(jsonobj,'$..name')
print(citylist)
print(type(citylist))
#新建一個city.json文件,并設置編碼格式為utf-8
fp = open('city.json','w',encoding="utf-8")
content = json.dumps(citylist, ensure_ascii=False)
print(content)
fp.write(content)
fp.close()
運行結果:
注意事項:
json.loads() 是把 Json格式字符串解碼轉換成Python對象,如果在json.loads的時候出錯,要注意被解碼的Json字符的編碼。
如果傳入的字符串的編碼不是UTF-8的話,需要指定字符編碼的參數 encoding
dataDict = json.loads(jsonStrGBK);
dataJsonStr是JSON字符串,假設其編碼本身是非UTF-8的話而是GBK 的,那么上述代碼會導致出錯,改為對應的:
dataDict = json.loads(jsonStrGBK, encoding="GBK");
如果 dataJsonStr通過encoding指定了合適的編碼,但是其中又包含了其他編碼的字符,則需要先去將dataJsonStr轉換為Unicode,然后再指定編碼格式調用json.loads()
dataJsonStrUni = dataJsonStr.decode("GB2312");
dataDict = json.loads(dataJsonStrUni, encoding="GB2312");
這個依賴很重要,我們將圍繞fastjson中的JSONObject這個類來談轉換
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.15</version> </dependency>
String json = "{\"abc\":\"1\",\"hahah\":\"2\"}"; JSONObject jsonObject = JSONObject.parseObject(content); 一句話就能解決,非常便捷。 想要取出值,可以對`jsonObject`進行操作: jsonObject.getString("abc"); 結果為:`1`
List<String> list = new ArrayList<String>(); list.add("username"); list.add("age"); list.add("sex"); JSONArray array = new JSONArray(); array.add(list);
Map<String, String> map = new HashMap<String, String>(); map.put("abc", "abc"); map.put("def", "efg"); JSONArray array_test = new JSONArray(); array_test.add(map); JSONObject jsonObject = JSONObject.fromObject(map);
特別注意:從JSONObject中取值,碰到了數字為key的時候,如
{ "userAnswer": { "28568": { "28552": { "qId": "28552", "order": "1", "userScore": { "score": 100 }, "answer": { "28554": "28554" }, "qScore": "100.0", "qtype": "SingleChoice", "sId": "28568" } } }, "paperType": "1", "paperOid": "28567", "instanceId": 30823, "remainingTime": -1, "examOid": "28570" } 獲取“userAnswer”的value,再轉成JSON,可仿照如下形式: JSONObject userJson = JSONObject.parseObject(jsonObject.getString("userAnswer")); 但是想獲取key"28568"就沒這么容易了。直接像上述的寫法,會報錯。 我們瀏覽fastjson中的源碼,總結下,應該如下寫: JSONObject question = (JSONObject) JSONObject.parseObject(section.getString("28568"), Object.class);
整體代碼:
*請認真填寫需求信息,我們會在24小時內與您取得聯系。