Flink RPC (Akka) source code analysis

2020-04-28  邵红晓

Note: Flink uses Akka for RPC between its internal components, for example between the JobManager, TaskManager and ResourceManager. Data transfer between operators uses Netty instead; Flink does not use Akka for data transport.

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");

        this.rpcServer = rpcService.startServer(this);
        // The main thread executor to be used to execute future callbacks in the main thread
        // of the executing rpc server
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

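Note: the main thread executor is used to execute future callbacks in the main thread of the executing RPC server.
All calls to the same Flink RpcEndpoint (actor) are executed serially, one at a time, in that endpoint's main thread, so the endpoint's state is free of concurrency problems without extra locking.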
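The single-main-thread guarantee can be illustrated without Akka. The following is a minimal sketch, assuming a JDK single-thread executor as a stand-in for the endpoint's main thread; SerialCounterEndpoint is a hypothetical class, not Flink's MainThreadExecutor (which hands work to the endpoint's actor instead).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical endpoint: every call is funneled through one single-threaded
// executor (the "main thread"), so the plain field below needs no lock.
final class SerialCounterEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private int count = 0; // confined to the main thread

    // Like an RPC method: callers on any thread get a future; the body runs in the main thread.
    CompletableFuture<Integer> increment() {
        return CompletableFuture.supplyAsync(() -> ++count, mainThread);
    }

    CompletableFuture<Integer> get() {
        return CompletableFuture.supplyAsync(() -> count, mainThread);
    }

    void stop() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

No lock or volatile is needed on count, because only the executor's one thread ever reads or writes it; this is the same reasoning that lets RpcEndpoint state stay unsynchronized.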

RpcServer
AkkaRpcService implements RpcService
AkkaRpcService#startServer starts the Akka actor:

        final Props akkaRpcActorProps;

        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            akkaRpcActorProps = Props.create(
                FencedAkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        } else {
            akkaRpcActorProps = Props.create(
                AkkaRpcActor.class,
                rpcEndpoint,
                terminationFuture,
                getVersion(),
                configuration.getMaximumFramesize());
        }
        ActorRef actorRef;
        synchronized (lock) {
            checkState(!stopped, "RpcService is stopped");
            // Note: create the Akka actor; actorSystem.actorOf creates it as a top-level actor (a child of the /user guardian)
            actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
            actors.put(actorRef, rpcEndpoint);
        }

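Depending on the endpoint type, startServer creates either a FencedAkkaRpcActor, which verifies the RPC fencing token (the RPC is only executed if the attached fencing token equals the endpoint's own token), or a plain AkkaRpcActor, an Akka RPC actor that receives and handles RPC messages without any token check.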
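A minimal sketch of the fencing check, assuming a UUID token and a Supplier as the RPC body; FencedEndpointSketch and its methods are hypothetical names, and Flink reports a mismatch with its own exception type rather than IllegalStateException.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical sketch of fencing: each RPC carries a token and is executed only
// if it equals the endpoint's current token (e.g. the current leader session).
final class FencedEndpointSketch {
    private volatile UUID fencingToken;

    FencedEndpointSketch(UUID initialToken) {
        this.fencingToken = initialToken;
    }

    // e.g. called when leadership changes, so RPCs from the old session get rejected
    void setFencingToken(UUID newToken) {
        this.fencingToken = newToken;
    }

    <T> T handleFencedRpc(UUID attachedToken, Supplier<T> rpc) {
        UUID expected = fencingToken;
        if (expected == null) {
            throw new IllegalStateException("Fencing token not set, rejecting RPC");
        }
        if (!Objects.equals(expected, attachedToken)) {
            throw new IllegalStateException(
                "Fencing token mismatch: expected " + expected + ", received " + attachedToken);
        }
        return rpc.get(); // token matches: run the RPC body
    }
}
```

The point of the token is that a stale caller (for example a former leader) keeps sending its old token, and those calls are rejected once the endpoint has moved on to a new one.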

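AkkaInvocationHandler implements InvocationHandler, the JDK dynamic-proxy handler behind the gateway proxies.
AkkaInvocationHandler#invoke() carries out the actual RPC call, in one of two ways:
1. Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()): asynchronous call with a return value (a future for the reply)
2. rpcEndpoint.tell(message, ActorRef.noSender()): fire-and-forget call with no return value, used when the gateway method returns void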
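The ask/tell split can be sketched with a plain JDK dynamic proxy. This is a hypothetical stand-in for AkkaInvocationHandler, assuming a single-threaded ExecutorService in place of the actor and CompletableFuture in place of the Akka future; GreeterGateway, GreeterImpl and SketchInvocationHandler are illustrative names. Void methods are told; everything else is asked, and a synchronous return type blocks on the reply up to a timeout.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical gateway, standing in for a Flink RpcGateway.
interface GreeterGateway {
    void poke(String who);                            // void return: tell
    CompletableFuture<String> greetAsync(String who); // future return: ask
    String greet(String who);                         // plain value: ask, then block
}

// Hypothetical endpoint implementation that the proxy forwards to.
final class GreeterImpl implements GreeterGateway {
    final List<String> poked = new ArrayList<>(); // only touched on the "actor" thread

    public void poke(String who) { poked.add(who); }

    public CompletableFuture<String> greetAsync(String who) {
        return CompletableFuture.completedFuture("hi " + who);
    }

    public String greet(String who) { return "hello " + who; }
}

// Stand-in for AkkaInvocationHandler; a single-threaded executor plays the actor's mailbox.
final class SketchInvocationHandler implements InvocationHandler {
    private final Object target;
    private final ExecutorService actor;
    private final long timeoutMillis;

    SketchInvocationHandler(Object target, ExecutorService actor, long timeoutMillis) {
        this.target = target;
        this.actor = actor;
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> returnType = method.getReturnType();
        if (returnType == Void.TYPE) {
            actor.execute(() -> call(method, args)); // tell: enqueue and return immediately
            return null;
        }
        // ask: enqueue the call and obtain a future for the reply
        CompletableFuture<Object> reply = CompletableFuture.supplyAsync(() -> call(method, args), actor);
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return reply.thenCompose(SketchInvocationHandler::asFuture); // caller gets a future
        }
        return reply.get(timeoutMillis, TimeUnit.MILLISECONDS); // synchronous method: block with timeout
    }

    private Object call(Method method, Object[] args) {
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private static CompletableFuture<Object> asFuture(Object o) {
        return (CompletableFuture<Object>) o;
    }

    @SuppressWarnings("unchecked")
    static <G> G connect(Class<G> gatewayClass, Object target, ExecutorService actor) {
        return (G) Proxy.newProxyInstance(gatewayClass.getClassLoader(),
            new Class<?>[] {gatewayClass}, new SketchInvocationHandler(target, actor, 10_000L));
    }
}
```

Callers only see the GreeterGateway interface, which mirrors how Flink code talks to a remote endpoint through a gateway without knowing about actors.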

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
            .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage) // handshake message
            .match(ControlMessages.class, this::handleControlMessage) // control (start/stop) messages
            .matchAny(this::handleMessage)
            .build();
    }
private void handleMessage(final Object message) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();
            try {
                handleRpcMessage(message);
            } finally {
                mainThreadValidator.exitMainThread();
            }
        } else {
            log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
                rpcEndpoint.getClass().getName(),
                message.getClass().getName());

            sendErrorIfSender(new AkkaRpcException(
                String.format("Discard message, because the rpc endpoint %s has not been started yet.", rpcEndpoint.getAddress())));
        }
    }
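The createReceive/handleMessage flow above boils down to type-based dispatch plus a state gate. Here is a minimal sketch with hypothetical message types (Handshake, Control) standing in for RemoteHandshakeMessage and ControlMessages; GatedActorSketch is illustrative, not Flink's AkkaRpcActor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for RemoteHandshakeMessage and ControlMessages.
final class Handshake { }

enum Control { START, STOP }

// Toy actor: the instanceof chain plays the role of ReceiveBuilder.match(...),
// where the first matching case wins and everything else falls through to "matchAny".
final class GatedActorSketch {
    private boolean running = false; // only read/written while handling a message
    final List<String> handled = new ArrayList<>();

    void receive(Object message, Consumer<String> replyToSender) {
        if (message instanceof Handshake) {
            replyToSender.accept("handshake accepted");
        } else if (message instanceof Control) {
            running = message == Control.START; // START/STOP switch the state
        } else if (running) {
            handled.add(String.valueOf(message)); // corresponds to handleRpcMessage
        } else {
            // like sendErrorIfSender: the endpoint is not started, so reject the RPC
            replyToSender.accept("Discard message, because the rpc endpoint has not been started yet.");
        }
    }
}
```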

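1. handleMessage is where messages are actually processed:
handleRpcInvocation((RpcInvocation) message); ->
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
The target method is looked up on the endpoint and called via Java reflection, the receiving counterpart of the dynamic proxy on the caller side.
2. mainThreadValidator.enterMainThread() records the current actor thread as the RpcEndpoint's main thread before the message is handled, and exitMainThread() clears it afterwards, so all message processing counts as running in the main thread.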
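Both steps can be sketched together. This assumes a hypothetical Invocation holder (method name, parameter types, arguments) and an AtomicReference&lt;Thread&gt; as the main-thread marker, modeled loosely on Flink's main-thread validation; Invocation, EchoEndpoint and ReflectiveDispatcher are illustrative names.

```java
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical invocation message: method name, parameter types and arguments.
final class Invocation {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] args;

    Invocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }
}

// Hypothetical endpoint with one RPC method that insists on running in the main thread.
class EchoEndpoint {
    final AtomicReference<Thread> currentMainThread = new AtomicReference<>();

    public String echo(String s) {
        validateRunsInMainThread();
        return "echo:" + s;
    }

    void validateRunsInMainThread() {
        if (currentMainThread.get() != Thread.currentThread()) {
            throw new IllegalStateException("RPC method called outside the main thread");
        }
    }
}

final class ReflectiveDispatcher {
    static Object handle(EchoEndpoint endpoint, Invocation inv) {
        // enterMainThread: claim the endpoint for this thread, fail on concurrent access
        if (!endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("concurrent access from " + endpoint.currentMainThread.get());
        }
        try {
            // look up the target method by name and parameter types, then call it reflectively
            Method m = endpoint.getClass().getMethod(inv.methodName, inv.parameterTypes);
            return m.invoke(endpoint, inv.args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        } finally {
            // exitMainThread: release the claim
            endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null);
        }
    }
}
```

Calling echo directly, outside the dispatcher, fails the check, which is the kind of mistake the main-thread validation is meant to surface.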

Summary:

How does Akka avoid multithreading concurrency and data-consistency problems?

Mailbox messages are processed by a tail-recursive call:
/**
   * Process the messages in the mailbox
   */
  @tailrec private final def processMailbox(
      left: Int = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long =
        if (dispatcher.isThroughputDeadlineTimeDefined)
          System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
        else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor.invoke(next)
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs)
      }
    }
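An iterative Java rendering of the same loop, as a hypothetical single-threaded sketch (MailboxSketch is illustrative, and a real mailbox uses a concurrent queue): the tail-recursive call becomes a while loop, and system-message processing and the interrupt check are left out. It shows why one actor's messages are handled one by one while the throughput limit still lets other actors share the dispatcher thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, single-threaded mailbox sketch. One run() corresponds to one
// scheduling of the mailbox on a dispatcher thread: it handles at most
// `throughput` messages (and stops early once the optional deadline passes),
// then returns so other actors sharing the thread pool get a turn.
final class MailboxSketch<M> {
    private final Queue<M> queue = new ArrayDeque<>();
    private final int throughput;
    private final long deadlineBudgetNanos; // <= 0 means "no throughput deadline"
    private final Consumer<M> actor;

    MailboxSketch(int throughput, long deadlineBudgetNanos, Consumer<M> actor) {
        this.throughput = throughput;
        this.deadlineBudgetNanos = deadlineBudgetNanos;
        this.actor = actor;
    }

    void enqueue(M message) {
        queue.add(message);
    }

    /** Iterative form of the tail-recursive processMailbox; returns messages handled. */
    int run() {
        int left = Math.max(throughput, 1);
        boolean hasDeadline = deadlineBudgetNanos > 0;
        long deadlineNs = hasDeadline ? System.nanoTime() + deadlineBudgetNanos : 0L;
        int handled = 0;
        M next;
        while ((next = queue.poll()) != null) {
            actor.accept(next); // messages of one actor are handled strictly one by one
            handled++;
            boolean beforeDeadline = !hasDeadline || System.nanoTime() - deadlineNs < 0;
            if (left > 1 && beforeDeadline) {
                left--; // same as processMailbox(left - 1, deadlineNs)
            } else {
                break; // throughput used up: yield the thread
            }
        }
        return handled;
    }
}
```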

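setAsScheduled checks whether a thread is already scheduled to run this mailbox. If not, it tries to set the status to Scheduled via CAS, retrying until either the update succeeds or the mailbox turns out to be already scheduled by another thread (or closed), in which case it returns false.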
@tailrec
  final def setAsScheduled(): Boolean = {  
    val s = currentStatus
    /*
     * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set.
     */
    if ((s & shouldScheduleMask) != Open) false
    else updateStatus(s, s | Scheduled) || setAsScheduled()
  }
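The same compare-and-set retry loop in Java, as a sketch using AtomicInteger; MailboxStatusSketch is hypothetical and its bit values only mirror Akka's Open/Closed/Scheduled constants for illustration. Because exactly one caller can flip Open to Scheduled, at most one dispatcher thread runs a given mailbox at a time, which is the mechanism behind serial message processing.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical status word; bit values mirror Akka's Mailbox constants
// (Open = 0, Closed = 1, Scheduled = 2, shouldScheduleMask = 3) but are illustrative.
final class MailboxStatusSketch {
    static final int OPEN = 0;
    static final int CLOSED = 1;
    static final int SCHEDULED = 2;
    static final int SHOULD_SCHEDULE_MASK = 3; // higher bits (suspend count) are ignored here

    private final AtomicInteger status = new AtomicInteger(OPEN);

    /** Returns true only for the one caller that flips Open to Scheduled. */
    boolean setAsScheduled() {
        while (true) {
            int s = status.get();
            if ((s & SHOULD_SCHEDULE_MASK) != OPEN) {
                return false; // closed, or another thread already scheduled it
            }
            if (status.compareAndSet(s, s | SCHEDULED)) {
                return true;
            }
            // CAS lost a race with another update: re-read and retry (Akka's @tailrec call)
        }
    }

    /** Clears the Scheduled bit after a processing run. */
    void setAsIdle() {
        status.updateAndGet(s -> s & ~SCHEDULED);
    }

    void close() {
        status.set(CLOSED);
    }
}
```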


Example code

package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import akka.actor.Terminated;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author shao.hongxiao
 */
public class RpcTest {
    private static final Time TIMEOUT = Time.seconds(10L);
    private static ActorSystem actorSystem = null;
    private static RpcService rpcService = null;

    // Define the communication protocol (gateway interfaces)
    public interface HelloGateway extends RpcGateway {
        String hello();
    }

    public interface HiGateway extends RpcGateway {
        String hi();
    }

    // Concrete endpoint implementations
    public static class HelloRpcEndpoint extends RpcEndpoint implements HelloGateway {
        protected HelloRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public String hello() {
            return "hello";
        }
    }

    public static class HiRpcEndpoint extends RpcEndpoint implements HiGateway {
        protected HiRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public String hi() {
            return "hi";
        }
    }

    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        // Create the RpcService, backed by the Akka implementation
        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterClass
    public static void teardown() throws Exception {

        final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
        final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());

        FutureUtils
            .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
            .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void test() throws Exception {
        HelloRpcEndpoint helloEndpoint = new HelloRpcEndpoint(rpcService);
        HiRpcEndpoint hiEndpoint = new HiRpcEndpoint(rpcService);

        helloEndpoint.start();
        // Get the endpoint's self gateway
        HelloGateway helloGateway = helloEndpoint.getSelfGateway(HelloGateway.class);
        String hello = helloGateway.hello();
        System.out.println(hello);

        hiEndpoint.start();
        // Obtain a gateway proxy via the endpoint's address
        HiGateway hiGateway = rpcService.connect(hiEndpoint.getAddress(),HiGateway.class).get();
        String hi = hiGateway.hi();
        System.out.println(hi);
    }
}

A standalone Akka example (Scala):

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import akka.pattern.Patterns

import scala.util.{Failure, Success}

trait Action{
  val message: String
  val time: Int
}
case class TurnOnLight(time: Int) extends Action {   // turn-on-the-light message
  val message = "Turn on the living room light"
}
case class BoilWater(time: Int) extends Action {   // boil-water message
  val message = "Boil a pot of water"
}
class RobotActor extends Actor {
  val log = Logging(context.system, this)
  def receive: Receive = { // the robot receives commands
    case t: TurnOnLight => log.info(s"${t.message} after ${t.time} hour")
    case b: BoilWater => log.info(s"${b.message} after ${b.time} hour")
    case s: String =>
      log.info(s"I can not handle this message: ${s}")
      // reply to the sender, so that an ask() caller receives a result instead of timing out
      sender() ! s"I can not handle this message: ${s}"
  }
}

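/**
  * https://scala.cool/tags/Akka/
  * https://cloud.tencent.com/developer/article/1460210
  *
  * Akka offers a higher-level abstraction over the concurrency model:
  * an asynchronous, non-blocking, high-performance, event-driven programming model
  * with lightweight event handling (on the order of millions of actors per GB of memory).
  * Actors on the JVM have the following characteristics:
  * each actor has its own mailbox;
  * an actor processes its messages serially;
  * messages sent to actors should be immutable.
  *
  * How Akka addresses multithreading problems
  *  1. Shared data and locks
  * The biggest problem concurrency causes is operating on shared data. We usually rely on locks to keep shared data consistent,
  * but locks bring problems of their own: choosing the lock granularity (method, code block, ...) and the lock type (read lock, write lock, ...).
  * These choices are crucial for a concurrent program, yet programmers new to concurrency often struggle with them, which adds complexity and is hard to get right.
  * Actors avoid these problems: since actors communicate only by passing messages, there is no shared data to worry about,
  * and since an actor processes its messages serially, its internal state cannot be corrupted by concurrent access. Writing concurrent programs with actors therefore greatly reduces coding complexity.
  */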
object Demo {
  def main(args: Array[String]): Unit = {
    val actorSystem = ActorSystem("flink")
    val robotActor = actorSystem.actorOf(Props(new RobotActor()), "robot") // create a robot
    // tell style (fire-and-forget)
    robotActor ! TurnOnLight(1) // send the robot a turn-on-the-light command
    robotActor ! BoilWater(2) // send the robot a boil-water command
    robotActor ! "who are you" // send the robot an arbitrary command
    import java.util.concurrent.TimeUnit

    import scala.concurrent.duration.Duration
    val t = Duration.create(5, TimeUnit.SECONDS)

    // ask style: asynchronous; the actor must reply within the timeout (5 seconds), otherwise the future fails
    val res = Patterns.ask(robotActor, "Hello,world", t)
    import scala.concurrent.ExecutionContext.Implicits.global
    res onComplete { r =>
      r match {
        case Success(result) => println(result)
        case Failure(e) => println("error: " + e.getMessage)
      }
      // terminate only after the ask has completed, otherwise the reply may never arrive
      actorSystem.terminate()
    }
  }
}

References
https://cloud.tencent.com/developer/article/1460210
https://likehui.top/2019/09/05/akka-%E6%A0%B8%E5%BF%83%E7%9F%A5%E8%AF%86%E6%A2%B3%E7%90%86/
https://scala.cool/tags/Akka/
