日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

1. 前言

對于Flink中各個組件(JobMaster、TaskManager、Dispatcher等),其底層RPC框架基于Akka實現,本文著重分析Flink中的Rpc框架實現機制及梳理其通信流程。

 

2. Akka介紹

由于Flink底層Rpc是基于Akka實現,我們先了解下Akka的基本使用。

 

Akka是一個開發并發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的并發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。Actor模型的強大來自于異步。它也可以顯式等待響應,這使得可以執行同步操作。但是,強烈不建議同步消息,因為它們限制了系統的伸縮性。每個actor有一個郵箱(mailbox),它收到的消息存儲在里面。另外,每一個actor維護自身單獨的狀態。一個Actors網絡如下所示:

Java架構-還不了解Flink底層RPC使用的框架和原理?那就認真看完

 

每個actor是一個單一的線程,它不斷地從其郵箱中poll(拉取)消息,并且連續不斷地處理。對于已經處理過的消息的結果,actor可以改變它自身的內部狀態或者發送一個新消息或者孵化一個新的actor。盡管單個的actor是自然有序的,但一個包含若干個actor的系統卻是高度并發的并且極具擴展性的。因為那些處理線程是所有actor之間共享的。這也是我們為什么不該在actor線程里調用可能導致阻塞的“調用”。因為這樣的調用可能會阻塞該線程使得他們無法替其他actor處理消息。

 

2.1. 創建Akka系統

Akka系統的核心ActorSystem和Actor,若需構建一個Akka系統,首先需要創建ActorSystem,創建完ActorSystem后,可通過其創建Actor(注意:Akka不允許直接new一個Actor,只能通過 Akka 提供的某些 API 才能創建或查找 Actor,一般會通過 ActorSystem#actorOf和ActorContext#actorOf來創建 Actor),另外,我們只能通過ActorRef(Actor的引用, 其對原生的 Actor 實例做了良好的封裝,外界不能隨意修改其內部狀態)來與Actor進行通信。如下代碼展示了如何配置一個Akka系統。

// 1. 構建ActorSystem
// 使用缺省配置
ActorSystem system = ActorSystem.create("sys");
// 也可顯示指定Appsys配置
// ActorSystem system1 = ActorSystem.create("helloakka", ConfigFactory.load("appsys"));
// 2. 構建Actor,獲取該Actor的引用,即ActorRef
ActorRef helloActor = system.actorOf(Props.create(HelloActor.class), "helloActor");
// 3. 給helloActor發送消息
helloActor.tell("hello helloActor", ActorRef.noSender());
// 4. 關閉ActorSystem
system.terminate();

在Akka中,創建的每個Actor都有自己的路徑,該路徑遵循 ActorSystem 的層級結構,大致如下:

 

本地:akka://sys/user/helloActor
遠程:akka.tcp://sys@l27.0.0.1:2020/user/remoteActor

其中本地路徑含義如下:

 

  • sys,創建的ActorSystem的名字;
  • user,通過ActorSystem#actorOf和ActorContext#actorOf 方法創建的 Actor 都屬于/user下,與/user對應的是/system, 其是系統層面創建的,與系統整體行為有關,在開發階段并不需要對其過多關注
  • helloActor,我們創建的HelloActor

其中遠程部分路徑含義如下:

  • akka.tcp,遠程通信方式為tcp;
  • sys@127.0.0.1:2020,ActorSystem名字及遠程主機ip和端口號。

 

2.2. 根據path獲取Actor

若提供了Actor的路徑,可以通過路徑獲取到ActorRef,然后與之通信,代碼如下所示:

ActorSystem system = ActorSystem.create("sys");
ActorSelection as = system.actorSelection("/path/to/actor");
Timeout timeout = new Timeout(Duration.create(2, "seconds"));
Future<ActorRef> fu = as.resolveOne(timeout);
fu.onSuccess(new OnSuccess<ActorRef>() {
 @Override
public void onSuccess(ActorRef actor) {
 System.out.println("actor:" + actor);
 actor.tell("hello actor", ActorRef.noSender());
 }
}, system.dispatcher());
fu.onFailure(new OnFailure() {
 @Override
public void onFailure(Throwable failure) {
 System.out.println("failure:" + failure);
 }
}, system.dispatcher());

由上面可知,若需要與遠端Actor通信,路徑中必須提供ip:port。

 

2.3. 與Actor通信

2.3.1. tell方式

當使用tell方式時,表示僅僅使用異步方式給某個Actor發送消息,無需等待Actor的響應結果,并且也不會阻塞后續代碼的運行,如:

helloActor.tell("hello helloActor", ActorRef.noSender());

其中:第一個參數為消息,它可以是任何可序列化的數據或對象,第二個參數表示發送者,通常來講是另外一個 Actor 的引用, ActorRef.noSender()表示無發送者((實際上是一個 叫做deadLetters的Actor)。

 

2.3.2. ask方式

當我們需要從Actor獲取響應結果時,可使用ask方法,ask方法會將返回結果包裝在scala.concurrent.Future中,然后通過異步回調獲取返回結果。如調用方:

// 異步發送消息給Actor,并獲取響應結果
Future<Object> fu = Patterns.ask(printerActor, "hello helloActor", timeout);
fu.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, String success) throws Throwable {
if (failure != null) {
 System.out.println("failure is " + failure);
 } else {
 System.out.println("success is " + success);
 }
 }
}, system.dispatcher());

HelloActor處理消息方法的代碼大致如下:

private void handleMessage(Object object) {
if (object instanceof String) {
String str = (String) object;
 log.info("[HelloActor] message is {}, sender is {}", str, getSender().path().toString());
// 給發送者發送消息
 getSender().tell(str, getSelf());
 }
 }

上面主要介紹了Akka中的ActorSystem、Actor,及與Actor的通信;Flink借此構建了其底層通信系統。

 

3. RPC類圖結構

下圖展示了Flink中RPC框架中涉及的主要類。

Java架構-還不了解Flink底層RPC使用的框架和原理?那就認真看完

 

3.1. RpcGateway

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,后面與TaskExecutor通信時,必須讓其提供對應地址,從類繼承圖可以看到基本上所有組件都實現了RpcGateway接口,其代碼如下:

public interface RpcGateway {
/**
 * Returns the fully qualified address under which the associated rpc endpoint is reachable.
 *
 * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
 */
String getAddress();
/**
 * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
 *
 * @return Fully qualified hostname under which the associated rpc endpoint is reachable
 */
String getHostname();
}

3.2. RpcEndpoint

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,其構造函數如下:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
// 保存rpcService和endpointId
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 通過RpcService啟動RpcServer
this.rpcServer = rpcService.startServer(this);
// 主線程執行器,所有調用在主線程中串行執行
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

在RpcEndpoint中還定義了一些方法如runAsync(Runnable)、callAsync(Callable, Time)方法來執行Rpc調用,值得注意的是在Flink的設計中,對于同一個Endpoint,所有的調用都運行在主線程,因此不會有并發問題,當啟動RpcEndpoint/進行Rpc調用時,其會委托RcpServer進行處理。

 

3.3. RpcService

Rpc服務的接口,其主要作用如下:

 

根據提供的RpcEndpoint來啟動RpcServer(Actor);

根據提供的地址連接到RpcServer,并返回一個RpcGateway;

延遲/立刻調度Runnable、Callable;

停止RpcServer(Actor)或自身服務;

在Flink中其實現類為AkkaRpcService。

 

3.3.1. AkkaRpcService

AkkaRpcService中封裝了ActorSystem,并保存了ActorRef到RpcEndpoint的映射關系,在構造RpcEndpoint時會啟動指定rpcEndpoint上的RpcServer,其會根據Endpoint類型(FencedRpcEndpoint或其他)來創建不同的Actor(FencedAkkaRpcActor或AkkaRpcActor),并將RpcEndpoint和Actor對應的ActorRef保存起來,然后使用動態代理創建RpcServer,具體代碼如下:

public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
 checkNotNull(rpcEndpoint, "rpc endpoint");
 CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
// 根據RpcEndpoint類型創建不同類型的Props
if (rpcEndpoint instanceof FencedRpcEndpoint) {
 akkaRpcActorProps = Props.create(
 FencedAkkaRpcActor.class,
 rpcEndpoint,
 terminationFuture,
 getVersion(),
 configuration.getMaximumFramesize());
 } else {
 akkaRpcActorProps = Props.create(
 AkkaRpcActor.class,
 rpcEndpoint,
 terminationFuture,
 getVersion(),
 configuration.getMaximumFramesize());
 }
 ActorRef actorRef;
// 同步塊,創建Actor,并獲取對應的ActorRef
synchronized (lock) {
 checkState(!stopped, "RpcService is stopped");
 actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
 actors.put(actorRef, rpcEndpoint);
 }
 LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
// 獲取Actor的路徑
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
 Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
 hostname = "localhost";
 } else {
 hostname = host.get();
 }
// 解析該RpcEndpoint實現的所有RpcGateway接口
 Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
 
// 額外添加RpcServer和AkkaBasedEnpoint類
 implementedRpcGateways.add(RpcServer.class);
 implementedRpcGateways.add(AkkaBasedEndpoint.class);
final InvocationHandler akkaInvocationHandler;
// 根據不同類型動態創建代理對象
if (rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
 akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
 akkaAddress,
 hostname,
 actorRef,
 configuration.getTimeout(),
 configuration.getMaximumFramesize(),
 terminationFuture,
 ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
 implementedRpcGateways.add(FencedMainThreadExecutable.class);
 } else {
 akkaInvocationHandler = new AkkaInvocationHandler(
 akkaAddress,
 hostname,
 actorRef,
 configuration.getTimeout(),
 configuration.getMaximumFramesize(),
 terminationFuture);
 }
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
 ClassLoader classLoader = getClass().getClassLoader();
// 生成RpcServer對象,而后對該server的調用都會進入Handler的invoke方法處理,handler實現了多個接口的方法
@SuppressWarnings("unchecked")
 RpcServer server = (RpcServer) Proxy.newProxyInstance(
 classLoader,
 implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
 akkaInvocationHandler);
return server;
 }

當啟動RpcServer后,即創建了相應的Actor(注意此時Actor的處于停止狀態)和動態代理對象,需要調用RpcEndpoint#start啟動啟動Actor,此時啟動RpcEndpoint流程如下(以非FencedRpcEndpoint為例):

  • 調用RpcEndpoint#start;
  • 委托給RpcServer#start;
  • 調用動態代理的AkkaInvocationHandler#invoke;發現調用的是StartStoppable#start方法,則直接進行本地方法調用;invoke方法的代碼如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
 Class<?> declaringClass = method.getDeclaringClass();
 Object result;
// 先匹配指定類型(handler已實現接口的方法),若匹配成功則直接進行本地方法調用;若匹配為FencedRpcGateway類型,則拋出異常(應該在FencedAkkaInvocationHandler中處理);其他則進行Rpc調用
if (declaringClass.equals(AkkaBasedEndpoint.class) ||
 declaringClass.equals(Object.class) ||
 declaringClass.equals(RpcGateway.class) ||
 declaringClass.equals(StartStoppable.class) ||
 declaringClass.equals(MainThreadExecutable.class) ||
 declaringClass.equals(RpcServer.class)) {
 result = method.invoke(this, args);
 } else if (declaringClass.equals(FencedRpcGateway.class)) {
throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
 method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
"retrieve a properly FencedRpcGateway.");
 } else {
 result = invokeRpc(method, args);
 }
return result;
 }
  • 調用AkkaInvocationHandler#start;
  • 通過ActorRef#tell給對應的Actor發送消息rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());;
  • 調用AkkaRpcActor#handleControlMessage處理控制類型消息;
  • 在主線程中將自身狀態變更為Started狀態;

經過上述步驟就完成了Actor的啟動過程,Actor啟動后便可與Acto通信讓其執行代碼(如runSync/callSync等)和處理Rpc請求了。下面分別介紹處理執行代碼和處理Rpc請求;

3.3.1.1. 執行代碼

與Actor通信,通過調用runSync/callSync等方法其直接執行代碼。

下面以scheduleRunAsync方法為例分析請求Actor執行代碼流程,方法代碼如下:
public void scheduleRunAsync(Runnable runnable, long delayMillis) {
 checkNotNull(runnable, "runnable");
 checkArgument(delayMillis >= 0, "delay must be zero or greater");
// 判斷是否為本地Actor
if (isLocal) {
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
// 向Actor發送消息runnable
 tell(new RunAsync(runnable, atTimeNanos));
 } else {
// 拋出異常,不支持遠程發送Runnable消息
throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
 rpcEndpoint.path() + ". This is not supported.");
 }
 }

AkkaInvocationHandler#invoke -> AkkaInvocation#scheduleRunAsync;

 

AkkaRpcActor#handleMessage->AkkaRpcActor#handleRpcMessage,其中handleRpcMessage方法如下:

protected void handleRpcMessage(Object message) {
// 根據消息類型不同進行不同的處理
if (message instanceof RunAsync) {
 handleRunAsync((RunAsync) message);
 } else if (message instanceof CallAsync) {
 handleCallAsync((CallAsync) message);
 } else if (message instanceof RpcInvocation) {
 handleRpcInvocation((RpcInvocation) message);
 } else {
 log.warn(
"Received message of unknown type {} with value {}. Dropping this message!",
 message.getClass().getName(),
 message);
 sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +
" of type " + message.getClass().getSimpleName() + '.'));
 }
 }

AkkaRpcActor#handleRunAsync,其代碼如下:

private void handleRunAsync(RunAsync runAsync) {
// 獲取延遲調度時間
final long timeToRun = runAsync.getTimeNanos();
final long delayNanos;
// 若為0或已經到了調度時間,則立刻進行調度
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {
// run immediately
try {
 runAsync.getRunnable().run();
 } catch (Throwable t) {
 log.error("Caught exception while executing runnable in main thread.", t);
 ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 }
 }
else {
// schedule for later. send a new message after the delay, which will then be immediately executed
// 計算出延遲時間
 FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
// 重新封裝消息
 RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
final Object envelopedSelfMessage = envelopeSelfMessage(message);
// 等待指定延遲時間后給自己再發送一個消息
 getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage,
 getContext().dispatcher(), ActorRef.noSender());
 }
 }

注意:當還未到調度時間時,該Actor會延遲一段時間后再次給自己發送消息;

3.3.1.2. 處理Rpc請求

當調用非AkkaInvocationHandler實現的方法時,則進行Rpc請求。

下面分析處理Rpc調用的流程。

AkkaInvocationHandler#invokeRpc,其方法如下:
private Object invokeRpc(Method method, Object[] args) throws Exception {
// 獲取方法相應的信息
String methodName = method.getName();
 Class<?>[] parameterTypes = method.getParameterTypes();
 Annotation[][] parameterAnnotations = method.getParameterAnnotations();
 Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
// 創建RpcInvocationMessage(可分為LocalRpcInvocation/RemoteRpcInvocation)
 final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
 Class<?> returnType = method.getReturnType();
 final Object result;
// 無返回,則使用tell方法
if (Objects.equals(returnType, Void.TYPE)) {
 tell(rpcInvocation);
 result = null;
 } else {
// execute an asynchronous call
// 有返回,則使用ask方法
 CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
 CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
// 調用返回后進行反序列化
if (o instanceof SerializedValue) {
try {
return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
 } catch (IOException | ClassNotFoundException e) {
throw new CompletionException(
new RpcException("Could not deserialize the serialized payload of RPC method : "
 + methodName, e));
 }
 } else {
// 直接返回
return o;
 }
 });
// 若返回類型為CompletableFuture則直接賦值
if (Objects.equals(returnType, CompletableFuture.class)) {
 result = completableFuture;
 } else {
try {
// 從CompletableFuture獲取
 result = completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
 } catch (ExecutionException ee) {
throw new RpcException("Failure while obtaining synchronous RPC result.", ExceptionUtils.stripExecutionException(ee));
 }
 }
 }
return result;
 }
  • AkkaRpcActor#handleRpcInvocation,其代碼如下:
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
 Method rpcMethod = null;
try {
// 獲取方法的信息
 String methodName = rpcInvocation.getMethodName();
 Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
// 在RpcEndpoint中找指定方法
 rpcMethod = lookupRpcMethod(methodName, parameterTypes);
 } catch (ClassNotFoundException e) {
 log.error("Could not load method arguments.", e);
// 異常處理
 RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
 getSender().tell(new Status.Failure(rpcException), getSelf());
 } catch (IOException e) {
 log.error("Could not deserialize rpc invocation message.", e);
// 異常處理
 RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
 getSender().tell(new Status.Failure(rpcException), getSelf());
 } catch (final NoSuchMethodException e) {
 log.error("Could not find rpc method for rpc invocation.", e);
// 異常處理
 RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
 getSender().tell(new Status.Failure(rpcException), getSelf());
 }
if (rpcMethod != null) {
try {
// this supports declaration of anonymous classes
 rpcMethod.setAccessible(true);
// 返回類型為空則直接進行invoke
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
 rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 }
else {
final Object result;
try {
 result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 }
catch (InvocationTargetException e) {
 log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);
// tell the sender about the failure
 getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
return;
 }
final String methodName = rpcMethod.getName();
// 方法返回類型為CompletableFuture
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
// 發送結果(使用Patterns發送結果給調用者,并會進行序列化并驗證結果大小)
 sendAsyncResponse(responseFuture, methodName);
 } else {
// 類型非CompletableFuture,發送結果(使用Patterns發送結果給調用者,并會進行序列化并驗證結果大小)
 sendSyncResponse(result, methodName);
 }
 }
 } catch (Throwable e) {
 log.error("Error while executing remote procedure call {}.", rpcMethod, e);
// tell the sender about the failure
 getSender().tell(new Status.Failure(e), getSelf());
 }
 }
 }
  • 將結果返回給調用者AkkaInvocationHandler#ask;

經過上述步驟就完成Rpc(本地/遠程)調用,可以看到底層也是通過Akka提供的tell/ask方法進行通信;經過上述步驟就完成Rpc(本地/遠程)調用,可以看到底層也是通過Akka提供的tell/ask方法進行通信;

4. 總結

RPC框架是Flink任務運行的基礎,Flink整個RPC框架基于Akka實現,并對Akka中的ActorSystem、Actor進行了封裝和使用,文章主要分析了Flink底層RPC通信框架的實現和相關流程,Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。RpcEndpoint定義了一個Actor的路徑;RpcService提供了啟動RpcServer、執行代碼體等方法;RpcServer/AkkaInvocationHandler提供了與Actor通信的接口;AkkaRpcActor為Flink封裝的Actor。

分享到:
標簽:架構 Java
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定