RPC全稱Remote Procedure Call,即遠程過程調用,對于調用者無感知這是一個遠程調用功能。目前流行的開源RPC 框架有阿里的Dubbo、google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的設計主要參考的是阿里的Dubbo,這里Netty 基本上是作為架構的技術底層而存在的,主要完成高性能的網絡通信,從而實現高效的遠程調用。
Dubbo的架構與Spring
其實在之前的文章中《談談京東的服務框架》,探討過Dubbo的組成和架構。
另外使用Dubbo最方便的地方在于它可以和Spring非常方便的集成,Dubbo對于配置的優化也是隨著Spring一脈相承的,從最早的XML形式到后來的注解方式以及自動裝配,都是在不斷地簡化開發過程來提高開發效率。
Dubbo在Spring框架中的工作流程:
1、Spring的IOC容器啟動
2、把服務注冊到注冊中心(zookeeper軟件)中
3、消費者啟動時會把它需要用到的服務從注冊中心拉取下來
4、提供者的地址發生改變時,注冊中心會馬上通知消費者
5、根據注冊中心中的服務地址直接就可以調用提供者了,如果調用了提供者,就會把提供者的地址主動緩存起來
6、監控消費者調用提供者的次數
RPC實現的關鍵
1、序列化與反序列化
在遠程過程調用時,客戶端跟服務端是不同的進程,甚至有時候客戶端用JAVA,服務端用C++。這時候就需要客戶端把參數先轉成一個字節流,傳給服務端后,再把字節流轉成自己能讀取的格式,這個過程叫序列化和反序列化,同理,從服務端返回的值也需要序列化反序列化的過程。在序列化的時候,我們選擇Netty自身的對象序列化器。
2、數據網絡傳輸
解決了序列化的問題,那么剩下的就是如何把數據參數傳到生產者,網絡傳輸層需要把序列化后的參數字節流傳給服務端,然后再把序列化后的調用結果傳回客戶端,雖然大部分RPC框架都采用了TCP作為傳輸協議,其實UDP也可以作為傳輸協議的,基于TCP和UDP我們可以自定義任意規則的協議,加之我們要使用NIO通信方式作為高性能網絡服務的前提,于是Netty似乎更符合我們Java程序員的口味,Netty真香!
3、告訴注冊中心我要調誰
現在調用參數的序列化和網絡傳輸都已經具備,但是還有個問題,那就是消費者要調用誰的問題,一個函數或者方法,我們可以理解為一個服務,這些服務注冊在注冊中心上面,只有當消費者告訴注冊中心要調用誰,才可以進行遠程調用。所以不但要把將要調用的服務的參數傳過去,也要把要調用的服務信息傳過去。
簡易RPC框架的架構
Dubbo 核心模塊主要有四個:Registry 注冊中心、Provider 服務提供者、Consumer 服務消費者、Monitor監控,為了方便直接砍掉了監控模塊,同時把服務提供者模塊與注冊中心模塊寫在一起,通過實現自己的簡易IOC容器,完成對服務提供者的實例化。
關于使用Netty進行Socket編程的部分可以參考Netty的官網 或者我之前的博客《Netty編碼實戰與Channel生命周期》,在這里Netty的編碼技巧和方式不作為本文的重點。
RPC框架編碼實現
首先需要引入的依賴如下(Netty + Lombok):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.8</version>
</dependency>
1、Registry與Provider
目錄結構如下:
───src
└─main
├─java
│ └─edu
│ └─xpu
│ └─rpc
│ ├─api
│ │ IRpcCalc.java
│ │ IRpcHello.java
│ │
│ ├─core
│ │ InvokerMessage.java
│ │
│ ├─provider
│ │ RpcCalcProvider.java
│ │ RpcHelloProvider.java
│ │
│ └─registry
│ MyRegistryHandler.java
│ RpcRegistry.java
│
└─resources
───pom.xml
IRpcCalc.java與IRpcHello.java是兩個Service接口。IRpcCalc.java內容如下,完成模擬業務加、減、乘、除運算
public interface IRpcCalc {
// 加
int add(int a, int b);
// 減
int sub(int a, int b);
// 乘
int mul(int a, int b);
// 除
int div(int a, int b);
}
IRpcHello.java,測試服務是否可用:
public interface IRpcHello {
String hello(String name);
}
至此API 模塊就定義完成了,非常簡單的兩個接口。接下來,我們要確定傳輸規則,也就是傳輸協議,協議內容當然要自定義,才能體現出Netty 的優勢。
設計一個InvokerMessage類,里面包含了服務名稱、調用方法、參數列表、參數值,這就是我們自定義協議的協議包:
@Data
public class InvokerMessage implements Serializable {
private String className; // 服務名稱
private String methodName; // 調用哪個方法
private Class<?>[] params; // 參數列表
private Object[] values; // 參數值
}
通過定義這樣的協議類,就能知道我們需要調用哪個服務,服務中的哪個方法,方法需要傳遞的參數列表(參數類型+參數值),這些信息正確傳遞過去了才能拿到正確的調用返回值。
接下來創建這兩個服務的具體實現類,IRpcHello的實現類如下:
public class RpcHelloProvider implements IRpcHello {
public String hello(String name) {
return "Hello, " + name + "!";
}
}
IRpcCalc的實現類如下:
public class RpcCalcProvider implements IRpcCalc {
@Override
public int add(int a, int b) {
return a + b;
}
@Override
public int sub(int a, int b) {
return a - b;
}
@Override
public int mul(int a, int b) {
return a * b;
}
@Override
public int div(int a, int b) {
return a / b;
}
}
Registry 注冊中心主要功能就是負責將所有Provider的服務名稱和服務引用地址注冊到一個容器中(這里為了方便直接使用接口類名作為服務名稱,前提是假定我們每個服務只有一個實現類),并對外發布。Registry 應該要啟動一個對外的服務,很顯然應該作為服務端,并提供一個對外可以訪問的端口。先啟動一個Netty服務,創建RpcRegistry 類,RpcRegistry.java的具體代碼如下:
public class RpcRegistry {
private final int port;
public RpcRegistry(int port){
this.port = port;
}
public void start(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NIOServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 處理拆包、粘包的編解碼器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 處理序列化的編解碼器
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 自己的業務邏輯
pipeline.addLast(new MyRegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // 設置長連接
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
System.out.println("RPC Registry start listen at " + this.port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new RpcRegistry(8080).start();
}
}
接下來只需要實現我們自己的Handler即可,創建MyRegistryHandler.java,內容如下:
public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
// 在注冊中心注冊服務需要有容器存放
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();
// 類名的緩存位置
private static final List<String> classCache = new ArrayList<>();
// 約定,只要是寫在provider下所有的類都認為是一個可以對完提供服務的實現類
// edu.xpu.rpc.provider
public MyRegistryHandler(){
scanClass("edu.xpu.rpc.provider");
doRegister();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
// 客戶端傳過來的調用信息
InvokerMessage request = (InvokerMessage)msg;
// 先判斷有沒有這個服務
String serverClassName = request.getClassName();
if(registryMap.containsKey(serverClassName)){
// 獲取服務對象
Object clazz = registryMap.get(serverClassName);
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
result = method.invoke(clazz, request.getValues());
System.out.println("request=" + request);
System.out.println("result=" + result);
}
ctx.writeAndFlush(result);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
// 實現簡易IOC容器
// 掃描出包里面所有的Class
private void scanClass(String packageName){
ClassLoader classLoader = this.getClass().getClassLoader();
URL url = classLoader.getResource(packageName.replaceAll("\.", "/"));
File dir = new File(url.getFile());
File[] files = dir.listFiles();
for (File file: files){
if(file.isDirectory()){
scanClass(packageName + "." + file.getName());
}else{
// 拿出類名
String className = packageName + "." + file.getName().replace(".class", "").trim();
classCache.add(className);
}
}
}
// 把掃描到的Class實例化,放到Map中
// 注冊的服務名稱就叫做接口的名字 [約定優于配置]
private void doRegister(){
if(classCache.size() == 0) return;
for (String className: classCache){
try {
Class<?> clazz = Class.forName(className);
// 服務名稱
Class<?> anInterface = clazz.getInterfaces()[0];
registryMap.put(anInterface.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
在這里還通過反射實現了簡易的IOC容器,先遞歸掃描provider包底下的類,把這些類的對象作為服務對象放到IOC容器中進行管理,由于IOC是一個Map實現的,所以將類名作為服務名稱,也就是Key,服務對象作為Value。根據消費者傳過來的服務名稱,就可以找到對應的服務,到此,Registry和Provider已經全部寫完了。
2、consumer
目錄結構如下:
└─src
├─main
│ ├─java
│ │ └─edu
│ │ └─xpu
│ │ └─rpc
│ │ ├─api
│ │ │ IRpcCalc.java
│ │ │ IRpcHello.java
│ │ │
│ │ ├─consumer
│ │ │ │ RpcConsumer.java
│ │ │ │
│ │ │ └─proxy
│ │ │ RpcProxy.java
│ │ │ RpcProxyHandler.java
│ │ │
│ │ └─core
│ │ InvokerMessage.java
│ │
│ └─resources
└─test
└─java
└─ pom.xml
在看客戶端的實現之前,先梳理一下RPC流程。API 模塊中的接口只在服務端實現了。因此,客戶端調用API 中定義的某一個接口方法時,實際上是要發起一次網絡請求去調用服務端的某一個服務。而這個網絡請求首先被注冊中心接收,由注冊中心先確定需要調用的服務的位置,再將請求轉發至真實的服務實現,最終調用服務端代碼,將返回值通過網絡傳輸給客戶端。整個過程對于客戶端而言是完全無感知的,就像調用本地方法一樣,所以必定要對客戶端的API接口做代理,隱藏網絡請求的細節。
由上圖的流程圖可知,要讓用戶調用無感知,必須創建出代理類來完成網絡請求的操作。
RpcProxy.java如下:
public class RpcProxy {
public static <T> T create(Class<?> clazz) {
//clazz傳進來本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);
return result;
}
private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;
public MethodProxy(Class<?> clazz) {
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果傳進來是一個已實現的具體類
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
// 如果傳進來的是一個接口(核心)
} else {
return rpcInvoke(method, args);
}
return null;
}
// 實現接口的核心方法
public Object rpcInvoke(Method method, Object[] args) {
// 傳輸協議封裝
InvokerMessage invokerMessage = new InvokerMessage();
invokerMessage.setClassName(this.clazz.getName());
invokerMessage.setMethodName(method.getName());
invokerMessage.setValues(args);
invokerMessage.setParams(method.getParameterTypes());
final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定義協議編碼器
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//對象參數類型編碼器
pipeline.addLast("encoder", new ObjectEncoder());
//對象參數類型解碼器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler", consumerHandler);
}
});
ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
future.channel().writeAndFlush(invokerMessage).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}
我們通過傳進來的接口對象,獲得了要調用的服務名,服務方法名,參數類型列表,參數列表,這樣就把自定義的RPC協議包封裝好了,只需要把協議包發出去等待結果返回即可,所以為了接收返回值數據還需要自定義一個接收用的Handler,RpcProxyHandlerdiamante如下:
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object result;
public Object getResponse() {
return result;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}
這樣就算是完成了整個流程,下面開始測試一下吧,測試的RpcConsumer.java代碼如下:
public class RpcConsumer {
public static void main(String[] args) {
// 本機之間的正常調用
// IRpcHello iRpcHello = new RpcHelloProvider();
// iRpcHello.hello("Tom");
// 肯定是用動態代理來實現的
// 傳給它接口,返回一個接口的實例,偽代理
IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);
System.out.println(rpcHello.hello("ZouChangLin"));
int a = 10;
int b = 5;
IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);
System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));
System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b)));
System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));
System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));
}
}
3、效果測試
先開啟Registry,運行端口是8080:
開啟consumer開始調用
調用完成后可以看到調用結果正確,并且在Registry這邊也看到了日志:
可以發現,簡易RPC框架順利完工!
作者:zchanglin
鏈接:
https://juejin.cn/post/6948351262668636174
來源:掘金