Akka之TCP

Akka之TCP

马草原 527 2021-08-15

Akka之TCP

Akka TCP是AkkaIO模块下的内容,所有的AkkaIO API只能通过mangaer对象来访问。当我们使用Akka IO时,第一步要先获取合适的manager对象,
获取TcpManager:

ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();

manager是一个处理底层IO资源(selectors,channels)的actor,同时可以给任务实例化workers。

Tcp客户端

  1. 第一步是向TcpManager发送TcpMessage.conenct方法。
  2. 然后TcpManager会返回一个内部的actor代表连接对象,也可能返回CommandFailed 消息
  3. 新的actor对象,然后发送一个Connected 消息给最初发送TcpMessage.connect的actor。
  4. 想要激活连接,必须要发送一个TcpMessage.register给连接对象。如果不做这一步,这个连接就是不能用的
  5. 连接对象(connection actor),监听注册的handler,当两边的actor某个关闭的时候,关闭连接对象。然后清除这个连接的资源

image-1692777764800

Demo For Java

@Slf4j
public class AkkaTcpClientDemo extends AbstractActor {

    final InetSocketAddress remote;
    final ActorRef listener;

    public static Props props(InetSocketAddress remote, ActorRef listener) {
        return Props.create(AkkaTcpClientDemo.class, remote, listener);
    }

    public AkkaTcpClientDemo(InetSocketAddress remote, ActorRef listener) {
        this.remote = remote;
        this.listener = listener;
        // 连接
        final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
        tcp.tell(TcpMessage.connect(remote), getSelf());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Tcp.Bound.class, msg -> {
                    log.info("Bounded {} ,sender = {}", msg, getSender());
                })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            log.info("failed msg{},sender {}", msg, getSender());
                            listener.tell("failed ", getSelf());
                            getContext().stop(getSelf());
                        })
                .match(
                        Tcp.Connected.class,
                        msg -> {
                            log.info("Connected : msg {} sender{}", msg, getSender());
                            listener.tell(msg, getSelf());
                            getSender().tell(TcpMessage.register(getSelf()), getSelf());
                            getContext().become(connected(getSender()));
                            ActorRef conn = getSender();
                            for (int i = 0; i < 10; i++) {
                                conn.tell(TcpMessage.write(ByteString.fromString("Hello" + i)), getSelf());
                            }

                        })
                .build();
    }


    private Receive connected(final ActorRef connection) {
        return receiveBuilder()
                .match(
                        ByteString.class,
                        msg -> {
                            connection.tell(TcpMessage.write((ByteString) msg), getSelf());
                        })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            // OS kernel socket buffer was full
                        })
                .match(
                        Tcp.Received.class,
                        msg -> {
                            listener.tell(msg.data().utf8String(), getSelf());
                        })
                .matchEquals(
                        "close",
                        msg -> {
                            connection.tell(TcpMessage.close(), getSelf());
                        })
                .match(
                        Tcp.ConnectionClosed.class,
                        msg -> {
                            getContext().stop(getSelf());
                        })
                .match(Tcp.Register.class, msg -> {
                    log.info("register msg {}", msg);
                })
                .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
        // ActorRef manager = Tcp.get(system).manager();
        ActorRef listen = system.actorOf(Props.create(ServerHandler.class));
        system.actorOf(Props.create(AkkaTcpClientDemo.class, new InetSocketAddress("127.0.0.1", 1234), listen));
        system.actorOf(Props.create(AkkaTcpClientDemo.class, new InetSocketAddress("30.211.65.3", 3307), listen));
    }
}

TCP服务端

  1. 发送TcpMessage.bind方法给TcpManager,然后TcpManager会监听在指定的地址端口上
  2. 通过TcpMessage.bind方法指定的actor会收到Bound消息,代表服务端已经准备好接收请求了。Bound消息也会包含socket实际绑定的端口
  3. 当收到Connected消息之后,发送TcpMessage.Register消息激活当前的连接,在Register方法中也可以指定代理actor来处理后续收到的数据

Demo

@Slf4j
public class AkkaTcpServerDemo extends AbstractActor {
    final ActorRef manager;

    public AkkaTcpServerDemo(ActorRef manager) {
        this.manager = manager;
    }

    public static Props props(ActorRef manager) {
        return Props.create(AkkaTcpServerDemo.class, manager);
    }

    @Override
    public void preStart() throws Exception {
        final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
        tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 1234), 100), getSelf());
    }


    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        Tcp.Bound.class,
                        msg -> {
                            log.info("MaCaoYuan Bounded {}, sender {}", msg, getSender());
                            manager.tell(msg, getSelf());
                        })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            log.info("MaCaoYuan CommandFailed {}", msg);
                            getContext().stop(getSelf());
                        })
                .match(
                        Tcp.Connected.class,
                        conn -> {
                            log.info("Server Connected: {},sender {}", conn, getSender());
                            manager.tell(conn, getSelf());
                            final ActorRef handler =
                                    getContext().actorOf(Props.create(SimplisticHandler.class));
                            getSender().tell(TcpMessage.register(handler), getSelf());

                        })
                .match(Tcp.Register.class, msg -> {
                    log.info("注册消息:{}", msg);
                })
                .build();
    }


    static class SimplisticHandler extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(
                            Tcp.Received.class,
                            msg -> {
                                final ByteString data = msg.data();
                                log.info("收到数据:{}", data.utf8String());
                                getSender().tell(TcpMessage.write(data), getSelf());
                            })
                    .match(
                            Tcp.ConnectionClosed.class,
                            msg -> {
                                getContext().stop(getSelf());
                            })
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo", ConfigFactory.load("my"));
        ActorRef manager = Tcp.get(system).manager();
        ActorRef server = system.actorOf(Props.create(AkkaTcpServerDemo.class, manager));
    }
}

TCP客户端与服务端细节

TcpClient在激活连接之后,通过如下代码重新处理接收的消息:

getContext().become(connected(getSender()));

TcpServer在激活连接的时候指定代理Actor来处理后续的消息

final ActorRef handler =                            getContext().actorOf(Props.create(SimplisticHandler.class));
getSender().tell(TcpMessage.register(handler), getSelf());

关闭连接

想要关闭连接发送:TcpMessage.close, TcpMessage.confirmedClose 或者 TcpMessage.abort给conenctor actor即可。

  • TcpMessage.close,发送FIN消息,但是不等待对方的确认。还没有处理的写数据会被清空,如果成功关闭连接,监听者会收到Closed事件。
  • TcpMessage.confirmedClose,发送方会发送FIN信号,但是还可以收到数据,一直到对方也关闭连接。发送方后续的写数据会被清空,监听者会收到ConfirmedClosed事件
  • TcpMessage.abort,立刻关闭连接,发送RST 消息给对方连接,如果成功关闭连接,会收到Aborted消息

当对方关闭的时候,监听者会收到PeerClosed 事件,同时连接会自动关闭。如果想要支持办关闭的连接,在使用TcpMessage.register的时候设置keepOpenOnPeerClosed 参数为true,这个时候连接仍然保持打开,除非收到了上面三个的任意一个关闭消息。

当某一方出错的时候ErrorClosed 事件会发送给监听者,同时强制关闭连接。

所有的关闭连接事件都是ConnectionClosed 的子事件,所以监听这一个事件就可以处理所有关闭的事件。

TCP读写拥塞控制

Akka的连接actor没有内部的缓冲区,这代表着只有当数据在内核中满的时候才会发送出去,需要我们自己对Akka TCP的读写做一些拥塞控制。

对于对方连接回复写(back-pressuring writes)数据,有三种模式:

  • Ack-based:每次Write 命令都携带object数据,这个object不是Tcp.NoAck对象,然后在成功写入所有的数据到socket之后,这个object会返回给发送方。
  • NACK-based:在发送写命令后,如果上一次的写数据还没有完成就会返回CommandFailed 消息
  • NACK-based with write suspending: 和NACK-based很像,但在写入失败之后,如果没有收到TcpMessage.resumeWriting方法的话,就不会继续写入。然后回复一个WritingResumed 消息给连接actor。

对于读数据(back-pressuring reads)的控制,有两种模式:

  • Push-reading:连接actor发送给读数据的actor Received 事件,如果读actor发送TcpMessage.suspendReading方法给连接actor代表,它当前想要挂起接收新数据。只有再发送ResumeReading 事件发送后,代表读actor准备好接收新数据了。
  • Pull-reading:在接收到Received 事件之后,连接actor自动的挂起接收的数据。直到读actor发送ResumeReading 消息。

读数据是相对于 连接actor(connction actor)而言的。