Akka之Actor

Akka之Actor

马草原 636 2021-08-14

Akka之Actor

image-1692776122623

简介

Akka是使用Scala语言开发的,因为Scala也是运行在JVM之上的语言,所以我们也可以在Java中使用Akka。

Akka是使用Actor模型,其粒度比线程更小,但是却更容易去开发并发的程序。除此之外,Akka还提供了一套容错机制,允许在Actor出现异常的时候进行一些回复或者重置操作,akka除了能在单机上构建高并发程序之外,还能在网络中构建分布式程序,并提供位置透明的Actor定位服务。

Actor模型

在Akka中,消息在actor之间进行传递和处理,以此驱动任务的执行,不同于常见的OOP需要调用某个对象的方法才能做某事。

Actor的方式更像是问答的:

比如老师问同学1+1等于?然后同学听到了,回答说是2. actor之间的通信方式就如同这样。

Actor也拥有线程安全和轻量级的特点:

  • 线程安全: actor运行于线程池之上,单个actor总是线程安全的,其内部的邮箱保证只有一个消息处理完之后,才会发送下一个消息。并且本身在处理接收到的消息时是串行的
  • 轻量级: 大型应用中,可能同时运行着成千上万个Actor,在Akka中,每个Actor只占用300字节左右。即使单机内存不够用了,也可以方便的切换成分布式模式。

Actor的层级

Akka中的actor一直都会属于某个Parent,一般我们使用如下方式创建Actor:

getContext().actorOf()

这种方式会在已经存在的actor下面创建一个子actor,当前的actor就是新创建actor的父亲。
那么谁是第一个actor呢?
一般所有的actor都有一个共同的父亲,新的actor也可以通过如下方式创建新的actor实例

system.actorOf()

如果我们创建了一个名叫:someActor的actor,那么它的引用路径就是:/user/someActor
image-1692776291748

实际上在我们创建actor之前,akka就已经默认的创建了3个actors,他们用于监管接下来新创建的子actor。

  • Root guardian:整个ActorSystem的根,它是所有actor的父亲,并且在系统终止的时候,它也是最后一个被停止的。
  • /user 这个是我们最常见到的,所有的通过Actorsystem.actorOf()方法创建的Actor都属于该分支下,这个是我们能手动创建的最高级别Actor。 其他通过ActorContext.actorOf() 方法创建的Actor都是其子级。
  • /system 系统层面创建的,主要与系统的整体行为有关,在开发阶段不需要对其有过多的关注。

另外需要注意的是尽量保证每个应用程序内部只需要一个ActorSystem对象

Actor的层级关系Demo:

public class ActorHierarchyDemo {
    public static void main(String[] args) throws java.io.IOException {
        ActorSystem system = ActorSystem.create("macaoyuan");

        ActorRef firstRef = system.actorOf(PrintMyActorRefActor.props(), "first-actor");
        System.out.println("First: " + firstRef);
        firstRef.tell("print", ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
            System.in.read();
        } finally {
            system.terminate();
        }
    }

    static class PrintMyActorRefActor extends AbstractActor {
        static Props props() {
            return Props.create(PrintMyActorRefActor.class, PrintMyActorRefActor::new);
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "print",
                            p -> {
                                ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
                                System.out.println("Second: " + secondRef);
                            })
                    .build();
        }
    }
}

运行结果:

First: Actor[akka://macaoyuan/user/first-actor#99771086]
>>> Press ENTER to exit <<<
Second: Actor[akka://macaoyuan/user/first-actor/second-actor#-811498937]
  • akka是协议的前缀,两个路径都是akka的协议
  • testSystem是我们创建的ActorSystem名称,我们也可以去指定其他的名称
  • 因为second-actor是通过first-actor创建的,所以在路径上是它的孩子
  • 最后的那些数字标识符不重要,一般我们可以忽略掉

Actor的生命周期

每个actor都会经历“生老病死”的阶段, 在特定的阶段,我们要做一些正确的事情。Actor暴露了一些生命周期管理的API,但是最常用的是preStart与postStop

  • preStart():在actor启动但是还没有处理第一条消息之前运行
  • postStop():在actor停止之前,这个时候已经没有任何的消息可以处理了

想要去停止一个actor,最好在actor内部去使用

getContext().stop(getSelf())

而不要在另外一个actor去停止其他的actor:getContext().stop(actorRef),这样的做法可能会导致一定的风险,一般我们是给要停止的actor发送一个停止消息。

Demo:

public class ActorLifecycleDemo {
    static class StartStopActor1 extends AbstractActor {
        static Props props() {
            return Props.create(StartStopActor1.class, StartStopActor1::new);
        }

        @Override
        public void preStart() {
            System.out.println("first started");
            getContext().actorOf(StartStopActor2.props(), "second");
        }

        @Override
        public void postStop() {
            System.out.println("first stopped");
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "stop",
                            s -> {
                                getContext().stop(getSelf());
                            })
                    .build();
        }
    }

    static class StartStopActor2 extends AbstractActor {

        static Props props() {
            return Props.create(StartStopActor2.class, StartStopActor2::new);
        }

        @Override
        public void preStart() {
            System.out.println("second started");
        }

        @Override
        public void postStop() {
            System.out.println("second stopped");
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder().build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("macaoyuan");
        ActorRef first = system.actorOf(StartStopActor1.props(), "first");
        first.tell("stop", ActorRef.noSender());
    }
}

运行结果:

first started
second started
second stopped
first stopped

当我们想要停止first actor的时候,它会首先去停止它的子actor。

错误处理

父子actor通过它们的生命周期连接在一起,当某个actor出现错误的时候,它会被临时的挂起,当失败传递到其父亲actor的时候,父亲actor会判断要怎么处理这个错误信息。从这个角度来看得话,父亲actor是作为子actor的监管,默认的监管策略是停止然后重启子Actor,当然我们也可以自定义自己的监管策略。

默认的监管策略:

public class SupervisorDemo {

    // 监管的
    static class SupervisingActor extends AbstractActor {
        static Props props() {
            return Props.create(SupervisingActor.class, SupervisingActor::new);
        }

        ActorRef child = getContext().actorOf(SupervisedActor.props(), "supervised-actor");

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "failChild",
                            f -> {
                                child.tell("fail", getSelf());
                            })
                    .build();
        }
    }

    // 被监管的
    static class SupervisedActor extends AbstractActor {
        static Props props() {
            return Props.create(SupervisedActor.class, SupervisedActor::new);
        }

        @Override
        public void preStart() {
            System.out.println("supervised actor started");
        }

        @Override
        public void postStop() {
            System.out.println("supervised actor stopped");
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(
                            "fail",
                            f -> {
                                System.out.println("supervised actor fails now");
                                throw new Exception("I failed!");
                            })
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("macaoyuan");
        ActorRef supervisingActor = system.actorOf(SupervisingActor.props(), "supervising-actor");
        supervisingActor.tell("failChild", ActorRef.noSender());
    }
}

运行结果:

/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/bin/java -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=54806:/Applications/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk-1.8.jdk/Contents/Home/jre/lib/rt.jar:/Users/mcy/IdeaProjects/Test/target/test-classes:/Users/mcy/IdeaProjects/Test/target/classes:/Users/mcy/Documents/devTools/mavenRepository/javax/enterprise/cdi-api/2.0.SP1/cdi-api-2.0.SP1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/el/javax.el-api/3.0.0/javax.el-api-3.0.0.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/interceptor/javax.interceptor-api/1.2/javax.interceptor-api-1.2.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar:/Users/mcy/Documents/devTools/mavenRepository/javax/servlet/javax.servlet-api/4.0.1/javax.servlet-api-4.0.1.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/jupiter/junit-jupiter-api/5.9.2/junit-jupiter-api-5.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/platform/junit-platform-commons/1.9.2/junit-platform-commons-1.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/apiguardian/apiguardian-api/1.1.2/apiguardian-api-1.1.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/jupiter/junit-jupiter-engine/5.9.2/junit-jupiter-engine-5.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/junit/platform/junit-platform-engine/1.9.2/junit-platform-engine-1.9.2.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-cluster_2.11/2.5.9/akka-cluster_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-remote_2.11/2.5.9/akka-remote_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-actor_2.11/2.5.9/akka-actor_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/config/1.3.2/config-1.3.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/modules/scala-java8-compat_2.11/0.7.0/scala-java8-compat_2.11-0.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-stream_2.11/2.5.9/akka-stream_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/ssl-config-core_2.11/0.2.2/ssl-config-core_2.11-0.2.2.jar:/Users/mcy/Documents/devTools/mavenRepository/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.4/scala-parser-combinators_2.11-1.0.4.jar:/Users/mcy/Documents/devTools/mavenRepository/com/typesafe/akka/akka-protobuf_2.11/2.5.9/akka-protobuf_2.11-2.5.9.jar:/Users/mcy/Documents/devTools/mavenRepository/io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar:/Users/mcy/Documents/devTools/mavenRepository/io/aeron/aeron-driver/1.7.0/aeron-driver-1.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/io/aeron/aeron-client/1.7.0/aeron-client-1.7.0.jar:/Users/mcy/Documents/devTools/mavenRepository/org/agrona/agrona/0.9.12/agrona-0.9.12.jar SupervisorDemo
supervised actor started
supervised actor fails now
supervised actor stopped
supervised actor started
[ERROR] [08/23/2023 15:56:01.417] [macaoyuan-akka.actor.default-dispatcher-2] [akka://macaoyuan/user/supervising-actor/supervised-actor] I failed!
java.lang.Exception: I failed!
	at SupervisorDemo$SupervisedActor.lambda$createReceive$0(SupervisorDemo.java:51)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
	at akka.actor.ActorCell.invoke(ActorCell.scala:496)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

虽然supervised失败抛出异常,但是程序又重新的启动了起来了