Java高并发程序设计—Akka

2019-05-03  本文已影响0人  页川叶川

注:其一、本文章为作者读完《实战Java高并发程序设计》之后所总结的知识,其中涵盖了每一章节的精髓之处。其二、文章中一部分代码直接引自书中。

使用Akka构建高并发程序设计

1.1.新并发模型:Actor

在并发程序中,线程始终是并发程序的基本执行单元。但是在Akka中,你将会使用一个全新的执行单元—Actor,而不再需要线程的概念。

在Actor模型中,我们不在需要调用对象(例如Actor)的方法去执行一些业务逻辑代码,而是通过给Actor发送一条消息。Actor接收到消息之后,它会根据消息的内容做出一些行为,包括改变自身的状态(自发式进行)

1.2.Akka之HelloWorld

第一个Actor实现

public class Greeter extends UntypedActor {

    public static enum Msg{
        GERRT, DONE;                   //消息类型
    }

    @Override
    public void onReceive(Object msg){
        if(msg == Msg.GERRT){
            System.out.println("Hello World!");
            getSender().tell(Msg.DONE, getSelf());
        } else {
            unhandled(msg);
        }
    }

}

以上代码定义了一个欢迎者(Greeter)Actor,它继承自UntypedActor(Akka的核心成员)。

与Actor交流的另外一个Actor是HelloWorld,他的实现如下:

public class HelloWorld extends UntypedActor {

    ActorRef greeter;

    //Akka的回调方法,在Actor启动前由Akka调用完成一些初始化操作
    @Override
    public void preStart() {            
        //创建Greeter实例
        greeter = getContext().actorOf(Props.create(Greeter.class),"greeter");      
        System.out.println("Greeter Actor Path:" + greeter.path());
        //向Greeter发送GREET信息
        greeter.tell(Greeter.Msg.GERRT,getSelf());                 
    }

    //onReceive()函数为HelloWorld的消息处理函数。这里只处理了DONE消息,然后向Greeter发送了一条GREET信息
    //因此,Greeter会收到前后两条GREET消息,打印两次Hello World“”
    @Override
    public void onReceive(Object msg) throws Exception {           
        if(msg == Greeter.Msg.DONE) {
            greeter.tell(Greeter.Msg.GERRT, getSelf());
            getContext().stop(getSelf());
        } else {
            unhandled(msg);
        }
    }

}

主函数main():

public class HelloMainSimple {

    public static  void main(String[] args) {
        ActorSystem system = ActorSystem.create("Hello", ConfigFactory.load("samplehello.conf"));
        ActorRef a = system.actorOf(Props.create(HelloWorld.class),"helloworld");
        System.out.println("HelloWorld Actor Path:" + a.path());
    }

}
akka {
    loglevel = INFO
}

1.?.消息路由

public class WatchActor extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public Router router;
    {
        List<Routee> routees = new ArrayList<Routee>();
        for(int i=0; i<5; i++) {
            ActorRef worker = getContext().actorOf(Props.create(MyWorker.class),"worker_"+i);
            getContext().watch(worker);
            routees.add(new ActorRefRoutee(worker));
        }
        router = new Router(new RoundRobinRoutingLogic(),routees);
    }

    @Override
    public void onReceive(Object msg) {
        if(msg instanceof MyWorker.Msg) {
            router.route(msg, getSender());
        } else if (msg instanceof Terminated) {
            router = router.removeRoutee(((Terminated) msg).actor());
            System.out.println(((Terminated) msg).actor().path() + " is closed,routees=" + router.routees().size());
            if(router.routees().size() == 0) {
                System.out.println("Close system");
                RouteMain.flag.send(false);
                getContext().system().shutdown();
            }
        } else {
            unhandled(msg);
        }
    }

}
public class RouteMain {

    public static Agent<Boolean> flag = Agent.create(true, ExecutionContexts.global());

    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));
        ActorRef w = system.actorOf(Props.create(WatchActor.class), "watcher");
        int i = 1;
        while(flag.get()) {
            w.tell(MyWorker.Msg.CLOSE, ActorRef.noSender());
            i++;
            Thread.sleep(100);
        }
    }

}

文集推荐:

Java基础方法集1
Python基础知识完整版
Spring Boot学习笔记
Linux指令进阶
Java高并发编程
SpringMVC基础知识进阶
Mysql基础知识完整版
健康管理系统学习花絮(学习记录)
Node.js基础知识(随手笔记)
MongoDB基础知识
Dubbo学习笔记
Vue学习笔记(随手笔记)

声明:发表此文是出于传递更多信息之目的。若有来源标注错误或侵犯了您的合法权益,请作者持权属证明与本我们(QQ:981086665;邮箱:981086665@qq.com)联系联系,我们将及时更正、删除,谢谢。

上一篇下一篇

猜你喜欢

热点阅读