RPC
基本概念
RPC(Remote Procedure Call)遠(yuǎn)程過程調(diào)用,簡單的理解是一個(gè)節(jié)點(diǎn)請求另一個(gè)節(jié)點(diǎn)提供的服務(wù)
本地過程調(diào)用:如果需要將本地student對象的age+1,可以實(shí)現(xiàn)一個(gè)addAge()方法,將student對象傳入,對年齡進(jìn)行更新之后返回即可,本地方法調(diào)用的函數(shù)體通過函數(shù)指針來指定。
遠(yuǎn)程過程調(diào)用:上述操作的過程中,如果addAge()這個(gè)方法在服務(wù)端,執(zhí)行函數(shù)的函數(shù)體在遠(yuǎn)程機(jī)器上,如何告訴機(jī)器需要調(diào)用這個(gè)方法呢?
- 首先客戶端需要告訴服務(wù)器,需要調(diào)用的函數(shù),這里函數(shù)和進(jìn)程ID存在一個(gè)映射,客戶端遠(yuǎn)程調(diào)用時(shí),需要查一下函數(shù),找到對應(yīng)的ID,然后執(zhí)行函數(shù)的代碼。
- 客戶端需要把本地參數(shù)傳給遠(yuǎn)程函數(shù),本地調(diào)用的過程中,直接壓棧即可,但是在遠(yuǎn)程調(diào)用過程中不再同一個(gè)內(nèi)存里,無法直接傳遞函數(shù)的參數(shù),因此需要客戶端把參數(shù)轉(zhuǎn)換成字節(jié)流,傳給服務(wù)端,然后服務(wù)端將字節(jié)流轉(zhuǎn)換成自身能讀取的格式,是一個(gè)序列化和反序列化的過程。
- 備好了之后,如何進(jìn)行傳輸?網(wǎng)絡(luò)傳輸層需要把調(diào)用的ID和序列化后的參數(shù)傳給服務(wù)端,然后把計(jì)算好的結(jié)果序列化傳給客戶端,因此TCP層即可完成上述過程,gRPC中采用的是HTTP2協(xié)議。
總結(jié)一下上述過程:
// Client端
// Student student = Call(ServerAddr, addAge, student)
1. 將這個(gè)調(diào)用映射為Call ID。
2. 將Call ID,student(params)序列化,以二進(jìn)制形式打包
3. 把2中得到的數(shù)據(jù)包發(fā)送給ServerAddr,這需要使用網(wǎng)絡(luò)傳輸層
4. 等待服務(wù)器返回結(jié)果
5. 如果服務(wù)器調(diào)用成功,那么就將結(jié)果反序列化,并賦給student,年齡更新
// Server端
1. 在本地維護(hù)一個(gè)Call ID到函數(shù)指針的映射call_id_map,可以用Map<String, Method> callIdMap
2. 等待服務(wù)端請求
3. 得到一個(gè)請求后,將其數(shù)據(jù)包反序列化,得到Call ID
4. 通過在callIdMap中查找,得到相應(yīng)的函數(shù)指針
5. 將student(params)反序列化后,在本地調(diào)用addAge()函數(shù),得到結(jié)果
6. 將student結(jié)果序列化后通過網(wǎng)絡(luò)返回給Client
- 在微服務(wù)的設(shè)計(jì)中,一個(gè)服務(wù)A如果訪問另一個(gè)Module下的服務(wù)B,可以采用HTTP REST傳輸數(shù)據(jù),并在兩個(gè)服務(wù)之間進(jìn)行序列化和反序列化操作,服務(wù)B把執(zhí)行結(jié)果返回過來。
- 由于HTTP在應(yīng)用層中完成,整個(gè)通信的代價(jià)較高,遠(yuǎn)程過程調(diào)用中直接基于TCP進(jìn)行遠(yuǎn)程調(diào)用,數(shù)據(jù)傳輸在傳輸層TCP層完成,更適合對效率要求比較高的場景,RPC主要依賴于客戶端和服務(wù)端之間建立Socket鏈接進(jìn)行,底層實(shí)現(xiàn)比REST更復(fù)雜。
創(chuàng)建三個(gè)maven項(xiàng)目
服務(wù)者
消費(fèi)者
API
讓服務(wù)者和消費(fèi)者都依賴API
在消費(fèi)者創(chuàng)建ConsumerApp類
使用代理對象
具體代碼在ProxyUtils中
public class ConsumerApp {
public static void main(String[] args) {
//while死循環(huán)是為了測試調(diào)用提供者是否為隨機(jī)
while (true) {
try {
Thread.sleep(2000);
// 獲得代理對象
AddService addService = ProxyUtils.getProxy(AddService.class);
// 只要調(diào)用方法就會進(jìn)入代理對象invoke方法
int result = addService.add(15, 684);
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
API中創(chuàng)建Request,AddService,ProxyUtils,ZkUtils
創(chuàng)建Request(該類為傳輸對象,必須實(shí)現(xiàn)序列化)
public class Request implements Serializable{
/**
*
*/
private static final long serialVersionUID = 1L;
private String interfaceName;
private String methodName;
private Object[] args;
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
@Override
public String toString() {
return "Request [interfaceName=" + interfaceName + ", methodName=" + methodName + ", args="
+ Arrays.toString(args) + "]";
}
}
創(chuàng)建AddService
package com.chenlei.service;
public interface AddService {
public int add(Integer a, Integer b);
}
創(chuàng)建ProxyUtils(重點(diǎn))
public class ProxyUtils {
private static Random RDM = new Random();
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class<T> interfaces) {
T proxy = (T) Proxy.newProxyInstance(ProxyUtils.class.getClassLoader(), new Class<?>[] { interfaces },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if ("toString".equals(methodName)) {
return interfaces.getClass().getName() + "$Proxy";
}
if ("hashCode".equals(methodName)) {
return Object.class.hashCode();
}
if ("equals".equals(methodName)) {
return Object.class.equals(this);
}
// 消費(fèi)者發(fā)送過去
Request request = new Request();
request.setInterfaceName(interfaces.getName());
request.setMethodName(methodName);
request.setArgs(args);
// 找到interfaces下的所有節(jié)點(diǎn)
List<String> serverList = ZkUtils.discover(interfaces.getName());
String one = randomOne(serverList);// 拿到的結(jié)果為ip:port 如127.0.0.1:8888
String[] split = one.split(":");
String address = split[0];
Integer port = Integer.valueOf(split[1]);
Socket socket = null;
// 打開書出管道,發(fā)送請求
Object result = null;
OutputStream outputStream = null;
ObjectOutputStream objectOutputStream = null;
InputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
socket = new Socket(address, port);
outputStream = socket.getOutputStream();
objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(request);
inputStream = socket.getInputStream();
objectInputStream = new ObjectInputStream(inputStream);
result = objectInputStream.readObject();
System.out.println("本次調(diào)用的是======" + port);
} catch (Exception e) {
e.printStackTrace();
} finally {
closeResources(objectInputStream, inputStream, objectOutputStream, outputStream, socket);
}
return result;
}
});
return proxy;
}
/**
* 從節(jié)點(diǎn)中隨機(jī)找出一個(gè)
*
* @param serverList
* @return
*/
private static String randomOne(List<String> serverList) {
if (null == serverList || 0 == serverList.size()) {
return null;
}
int index = RDM.nextInt(serverList.size());
return serverList.get(index);
}
/**
* 關(guān)閉資源的方法
*/
public static void closeResources(Closeable... resources) {
for (Closeable resource : resources) {
if (null != resource) {
try {
resource.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
resource = null;
}
}
}
}
}
創(chuàng)建ZkUtils(zookeeper注冊和發(fā)現(xiàn),另加緩存解決臟讀)
在API項(xiàng)目中導(dǎo)入zkclient的依賴
public class ZkUtils {
private static final String ZK_URL = "自己的域名:2181";
private static ZkClient zkClient = null;
//創(chuàng)建zookeeper緩存
private static Map<String, List<String>> cache = new HashMap<String, List<String>>();
static {
zkClient = new ZkClient(ZK_URL, 10000, 10000);
}
/**
* 服務(wù)節(jié)點(diǎn)向zookeeper的注冊
*
* @param serverName
* @param serverPort
*/
public static void register(String serverName, String serverPort) {
if (null == serverName || "".equals(serverName)) {
throw new RuntimeException("服務(wù)名不能為空");
}
if (null == serverPort || "".equals(serverPort)) {
throw new RuntimeException("服務(wù)ip和端口不能為空");
}
if (!zkClient.exists("/" + serverName)) {
zkClient.createPersistent("/" + serverName);
}
if (!zkClient.exists("/" + serverName + "/" + serverPort)) {
zkClient.createEphemeral("/" + serverName + "/" + serverPort);
}
System.out.println("注冊一個(gè)服務(wù)節(jié)點(diǎn)為" + "/" + serverName + "/" + serverPort);
}
/**
* 向zookeeper發(fā)現(xiàn)服務(wù)節(jié)點(diǎn)
*
* @param serverName
* @return
*/
public static List<String> discover(String serverName) {
if (null == serverName || "".equals(serverName)) {
throw new RuntimeException("服務(wù)名不能為空");
}
// 先從緩存里找
if (cache.containsKey(serverName)) {
System.out.println("在緩存中找到" + serverName + "節(jié)點(diǎn)");
}
// 如果該節(jié)點(diǎn)在zookeeper中不存在,直接返回空
if (!zkClient.exists("/" + serverName)) {
return null;
}
zkClient.subscribeChildChanges("/" + serverName, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
// 一旦進(jìn)入此方法,證明有節(jié)點(diǎn)改變
cache.put(serverName, currentChilds);
System.out.println(serverName + "節(jié)點(diǎn)有變化-----" + "緩存完成更新");
}
});
return zkClient.getChildren("/" + serverName);
}
}
寫提供者代碼
創(chuàng)建AddServiceImpl
注意類名最好是AddService+Impl,并且類全路徑也要對應(yīng)com.chenlei.service.impl.AddServiceImpl,否則代碼需要調(diào)整
package com.chenlei.service.impl;
import com.chenlei.service.AddService;
public class AddServiceImpl implements AddService {
@Override
public int add(Integer a, Integer b) {
return a + b;
}
}
創(chuàng)建ProviderApp(重點(diǎn))
public class ProviderApp {
public static void main(String[] args) {
Integer port = 7777;
ServerSocket serverSocket = bind(port);
// 向zookeeper注冊
ZkUtils.register(AddService.class.getName(), "127.0.0.1" + ":" + port);
// 監(jiān)聽+處理請求
listener(serverSocket);
}
/**
* 監(jiān)聽和處理請求
*
* @param serverSocket
*/
private static void listener(ServerSocket serverSocket) {
//此處死循環(huán)是為了讓次提供者一直處于工作狀態(tài)
while (true) {
Socket socket = null;
InputStream inputStream = null;
ObjectInputStream objectInputStream = null;
OutputStream outputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
socket = serverSocket.accept();
inputStream = socket.getInputStream();
objectInputStream = new ObjectInputStream(inputStream);
Request request = (Request) objectInputStream.readObject();
Object answer = invoker(request);
outputStream = socket.getOutputStream();
objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(answer);
} catch (Exception e) {
e.printStackTrace();
} finally {
ProxyUtils.closeResources(objectOutputStream, outputStream, objectInputStream, inputStream, socket);
}
}
}
/**
* 處理請求返回結(jié)果
*
* @param request
* @return
*/
private static Object invoker(Request request) {
// 獲得從消費(fèi)者傳過來的信息
String interfaceName = request.getInterfaceName();
String methodName = request.getMethodName();
Object[] args = request.getArgs();
// 獲得對應(yīng)實(shí)現(xiàn)類全名
String className = getClassNameByInterfaceName(interfaceName);
Object answer = null;
try {
// 找到該類
Class<?> clazz = Class.forName(className);
// 創(chuàng)建一個(gè)對象
Object object = clazz.newInstance();
Class<?>[] argsType = new Class<?>[args.length];
if (null != args || 0 != args.length) {
for (int i = 0; i < args.length; i++) {
argsType[i] = args[i].getClass();
}
}
Method method = clazz.getMethod(methodName, argsType);
answer = method.invoke(object, args);
} catch (Exception e) {
e.printStackTrace();
}
return answer;
}
/**
* 通過請求者傳來的類信息,獲得對應(yīng)實(shí)現(xiàn)類的所有信息,并返回實(shí)現(xiàn)類的全名
*
* @param interfaceName
* @return
*/
private static String getClassNameByInterfaceName(String interfaceName) {
// 傳過來的接口名為com.chenlei.service.AddService
int index = interfaceName.lastIndexOf(".");
StringBuilder sb = new StringBuilder();
// com.chenlei.service
sb.append(interfaceName.subSequence(0, index));
// com.chenlei.service.impl.
sb.append(".impl.");
// com.chenlei.service.impl.AddService
sb.append(interfaceName.substring(index + 1)).append("Impl");
return sb.toString();
}
/**
* 綁定一個(gè)端口
*
* @param port
* @return
*/
private static ServerSocket bind(Integer port) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
return serverSocket;
}
/**
* 測試代碼
*/
//public static void main(String[] args) {
// String interfaceName = "com.chenlei.service.AddService";
// int index = interfaceName.lastIndexOf(".");
// StringBuilder sb = new StringBuilder();
// // com.chenlei.service
// sb.append(interfaceName.subSequence(0, index));
// // com.chenlei.service.impl.
// sb.append(".impl.");
// // com.chenlei.service.impl.AddService
// sb.append(interfaceName.substring(index + 1)).append("Impl");
// System.out.println(sb.toString());
//}
}
更改提供者端口,分別啟動三個(gè)提供者
再啟動消費(fèi)者,查看結(jié)果
其他錯(cuò)誤和注意事項(xiàng)
寫代碼思路