侧边栏壁纸
博主头像
王一川博主等级

努力成为一个不会前端的全栈工程师

  • 累计撰写 70 篇文章
  • 累计创建 20 个标签
  • 累计收到 39 条评论

目 录CONTENT

文章目录

akka for java

王一川
2022-06-19 / 0 评论 / 5 点赞 / 2,113 阅读 / 3,530 字
温馨提示:
本文最后更新于 2022-06-19,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

目前火热的两大计算框架 Spark 和 Flink 底层的通讯原理使用的都是 akka(当前的 Spark 已经背叛了akka,转投 netty),而网上对 akka 的教程是在太少,推荐 github 上的 《Akka 中文指南》,这是为数不多的参考教程,但是它的入门案例属实劝退,因此再看了一部分后总结出我认为比较好入门的知识点组成了这篇文章,算是对《Akka 中文指南》食用前的开胃菜。如果您对它的Java快速入门很感兴趣,且能很好地理解它的代码,那么这篇文章你可以跳过去了。

Akka 是一个用 Scala 编写的库,用于在 JVM 平台上简化编写具有可容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用,其同时提供了Java 和 Scala 的开发接口。Akka 允许我们专注于满足业务需求,而不是编写初级代码。在 Akka 中,Actor 之间通信的唯一机制就是消息传递。Akka 对 Actor 模型的使用提供了一个抽象级别,使得编写正确的并发、并行和分布式系统更加容易。Actor 模型贯穿了整个 Akka 库,为我们提供了一致的理解和使用它们的方法。Akka 主要解决的问题是:可以轻松的写出高效稳定的并发程序,程序员不再过多的考虑线程、锁和资源竞争等细节。

一、Akka 必会概念

1.1 Actor 中 Actor 模型

image-20220619152233500

  1. Akka 处理并发的方式基于 Actor 模型,如上图
  2. 在基于 Actor 的系统里,所有的事物都是 Actor,就好像面向对象设计里面一切皆对象
  3. Actor 模型是作为一个并发模型,Actor 与 Actor 之间只能通过消息进行通信,如图信封
    1. Actor 向 Actor 发送消息时必须获取对象的引用即:ActorRef,就好像张三给李四打电话必须知道李四的电话号码一样
    2. Actor 向 Actor 发送的消息并不是直接给到 Actor 的,而是统一发送到对应 Actor 内部的 Mailbox(内部封装,用户不可见),由 Mailbox 转发给 Actor
    3. Mailbox 可以识别出每个消息的发送者
  4. 当一个 Actor 给另外一个 Actor 发消息,消息是有序的,多个 Actor 之间顺序不保证
  5. 发送消息的 Actor 可以等待消息的响应也可以异步处理
  6. ActorSystem 的职责是负责并管理其创建的 Actor,一个进程中的 ActorSystem 是单例的,Actor 可以有多个

1.2 Actor 引用

即 ActorRef,最重要的功能就是是通过 ActorRef 进行发消息,每个 Actor 可以通过getSelf()获取自身的引用,也可以在消息处理阶段通过 getSender() 获取发送消息的 Actor 引用

根据 ActorSystem 的配置,支持不同类型的 Actor 引用

  1. 纯本地 Actor 引用:未配置网络功能的 ActorSystem 使用。
  2. 远程 Actor 引用:支持网络功能的 ActorSystem 使用,其引用包含协议和远程寻址信息
  3. 特殊 Actor 引用:DeadLetterActorRef(死信)、EmptyLocalActorRef(查找不存在的本地引用是返回)

1.3 Actor 路径

类似 Zookeeper 的结构,ActorSystem 存在一个根目录,其名称为 /,下一级包括:

  1. /user:所有用户创建的 Actor 的顶级 Actor,即我们创建的 Actor 都在 /user 下
  2. /system:所有系统创建的 Actor 的顶级 Actor,如系统的日志监听器
  3. /deadletters:死信 Actor,即所有发送到已停止或不存在的 Actor 的消息都会重新路由这里
  4. /tmp:所有短期系统创建的 Actor 的守护者 Actor
  5. /remote:一个人工路径,其下面所有 Actor 的监督者都是远程 Actor 引用

二、Akka for Java

引入依赖 pom.xml

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>hello-akka-java</groupId>
    <artifactId>app</artifactId>
    <version>1.0</version>

    <properties>
        <akka.version>2.6.19</akka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
</project>

从上面的描述创建一个入门的 akka 程序基本步骤如下:

  1. 创建 ActorSystem
  2. 创建若干个 Actor
  3. Actor 之间发消息与接收消息的逻辑处理

2.1 ActorSystem

创建 ActorSystem 最简单的方式

ActorSystem system = ActorSystem.create("demo"); // 传一个系统名称

2.2 Actor

通过继承 AbstractActor 来获取,例如:创建一个 JobManager 和 TaskManager 的 Actor

JobManager

package tech.kpretty;

import akka.actor.AbstractActor;

/**
 * @author wjun
 * @date 2022/6/19 16:17
 * @email wjunjobs@outlook.com
 * @describe 
 */
public class JobManager extends AbstractActor {
    /**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
    @Override
    public Receive createReceive() {
        // 对所有的消息不做任何响应
        return receiveBuilder().build();
    }
}

TaskManager

package tech.kpretty;

import akka.actor.AbstractActor;

/**
 * @author wjun
 * @date 2022/6/19 16:17
 * @email wjunjobs@outlook.com
 * @describe
 */
public class TaskManager extends AbstractActor {
    /**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
    @Override
    public Receive createReceive() {
        // 对所有的消息不做任何响应
        return receiveBuilder().build();
    }
}

1.3 tell & receive

上面说过发送消息需要获取 ActorRef,通常有两种方式:创建的时候会返回 ActorRef、通过 ActorSystem 给定路劲搜索获取 ActorRef,即:一个是针对不存在的 Actor 需要创建,一个是针对已经存在的 Actor。

方式一:创建后返回 ActorRef

// ActorSystem
def actorOf(props: Props, name: String): ActorRef

需要一个 Props 配置类,用于指定创建 Actor 的选项,例如:

package tech.kpretty;

import akka.actor.ActorSystem;
import akka.actor.Props;

/**
 * @author wjun
 * @date 2022/6/19 16:20
 * @email wjunjobs@outlook.com
 * @describe 测试通信的入口
 */
public class ApplicationMaster {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        system.actorOf(Props.create(JobManager.class, () -> new JobManager()));
    }
}

注:这是一个非常危险的方式,破坏了 Actor 的封装性,因此建立使用下面的方式

每个 Actor 给出自己的静态 Props 创建方法,例如 JobManager

package tech.kpretty;

import akka.actor.AbstractActor;
import akka.actor.Props;

/**
 * @author wjun
 * @date 2022/6/19 16:17
 * @email wjunjobs@outlook.com
 * @describe
 */
public class JobManager extends AbstractActor {
    // 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数
    static Props props() {
        return Props.create(JobManager.class, JobManager::new);
    }

    /**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
    @Override
    public Receive createReceive() {
        // 对所有的消息不做任何响应
        return receiveBuilder().build();
    }
}

修改 ApplicationMaster

package tech.kpretty;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
 * @author wjun
 * @date 2022/6/19 16:20
 * @email wjunjobs@outlook.com
 * @describe 测试通信的入口
 */
public class ApplicationMaster {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");
        // 打印 actor path
        System.out.println(jobManager);
    }
}

下面编写 JobManager 接收消息的逻辑,例如:当接收到 init 时,打印 hello xxx,我是 xxx,分别打印发送者的名字和自己的名字,重写 createReceive 即可

/**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
@Override
public Receive createReceive() {
  // 对所有的消息不做任何响应
  return receiveBuilder()
    .matchEquals("init", message -> System.out.printf("hello %s,I'm %s%n", sender().path().name(), self().path().name()))
    .build();
}

通过 ActorRef.tell(message,ActorRef) 发送信息,其中 ActorRef 即为消息的发送者,我们这里是独立于 Akka 系统之外的角色来给 JobManager 发送消息,因此可以使用 ActorRef.noSender 表示没有 Actor

package tech.kpretty;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
 * @author wjun
 * @date 2022/6/19 16:20
 * @email wjunjobs@outlook.com
 * @describe 测试通信的入口
 */
public class ApplicationMaster {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");
        // 打印 actor path
        System.out.println(jobManager);
        // 发送消息
        jobManager.tell("init",ActorRef.noSender());
    }
}

结果如下:

image-20220619164925917

可以看出 ActorRef.noSender() 返回的是一个死信的 ActorRef

1.4 Actor 的生命周期方法

继承 AbstractActor 后可以重写:

  1. preStart:Actor 创建时自动异步启动
  2. postStop:getContext.stop(ActorRef)时调用
  3. preRestart:Actor 重启前调用,用于清理崩溃的数据
  4. postRestart:Actor 重启后调用,用于崩溃后的初始化,默认调用 preStart

三、模拟Flink心跳检测

需求是:启动 JobManager 后,发送 init 请求,JobManager 创建若干个 TaskManager,TaskManager每 5 秒向 JobManager 发送心跳检测,若 10 秒都没有接收到心跳则打印 xxx 已停止

3.1 封装消息类型

封装 init 消息携带创建 TaskManager 个数

package tech.kpretty;

/**
 * @author wjun
 * @date 2022/6/19 17:10
 * @email wjunjobs@outlook.com
 * @describe
 */
public class InitRequest {
    private final int taskManagerNumber;

    public InitRequest(int taskManagerNumber) {
        this.taskManagerNumber = taskManagerNumber;
    }

    public int getTaskManagerNumber() {
        return taskManagerNumber;
    }
}

封装 TaskManager 请求

package tech.kpretty;

/**
 * @author wjun
 * @date 2022/6/19 17:26
 * @email wjunjobs@outlook.com
 * @describe
 */
public class TaskManagerRequest {
    private final String type;

    private final long ts;

    public TaskManagerRequest(String type, long ts) {
        this.type = type;
        this.ts = ts;
    }

    public String getType() {
        return type;
    }

    public long getTs() {
        return ts;
    }
}

3.2 JobManager

处理 InitRequest 请求,创建 TaskManager

/**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
@Override
public Receive createReceive() {
  return receiveBuilder()
    .match(InitRequest.class, message -> {
      System.out.println("开始初始化...");
      for (int i = 0; i < message.getTaskManagerNumber(); i++) {
        getContext().actorOf(TaskManager.props(), "taskManager-" + i);
      }
    })
    .build();
}

当 TaskManager 创建完成后发送 init 请求,JobManager 将当前 TaskManage 创建的时间保存起来

处理心跳逻辑,当 JobManager 启动时创建一个 ConcurrentHashMap 用来保存 TaskManager 的心跳时间,再启动一个线程用于定时检测

private ConcurrentHashMap<ActorRef, Long> heartbeat;
private volatile boolean isRunning = false;

@Override
public void preStart() throws Exception {
  heartbeat = new ConcurrentHashMap<>();
  new Thread(() -> {
    while (true) {
      if (isRunning) checkHeartbeat();
      try {
        TimeUnit.SECONDS.sleep(10);
      } catch (InterruptedException e) {
        // no-op
      }
    }
  }).start();
}

private void checkHeartbeat() {
  long currentTimeMillis = System.currentTimeMillis();
  Enumeration<ActorRef> keys = heartbeat.keys();
  while (keys.hasMoreElements()) {
    ActorRef actorRef = keys.nextElement();
    if (currentTimeMillis - 5000 > heartbeat.get(actorRef)) {
      System.out.println(actorRef + "已经挂掉了,尝试关闭它");
      getContext().stop(actorRef);
    }
  }
}

最终 JobManager 代码如下

package tech.kpretty;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import scala.Option;

import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/6/19 16:17
 * @email wjunjobs@outlook.com
 * @describe
 */
public class JobManager extends AbstractActor {
    private ConcurrentHashMap<ActorRef, Long> heartbeat;

    private volatile boolean isRunning = false;

    @Override
    public void preStart() throws Exception {
        heartbeat = new ConcurrentHashMap<>();
        new Thread(() -> {
            while (true) {
                if (isRunning) checkHeartbeat();
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // no-op
                }
            }
        }).start();
    }

    private void checkHeartbeat() {
        long currentTimeMillis = System.currentTimeMillis();
        Enumeration<ActorRef> keys = heartbeat.keys();
        while (keys.hasMoreElements()) {
            ActorRef actorRef = keys.nextElement();
            if (currentTimeMillis - 5000 > heartbeat.get(actorRef)) {
                System.out.println(actorRef + "已经挂掉了,尝试关闭它");
                getContext().stop(actorRef);
            }
        }
    }

    // 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数
    static Props props() {
        return Props.create(JobManager.class, JobManager::new);
    }

    /**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
    @Override
    public Receive createReceive() {
        // 对所有的消息不做任何响应
        return receiveBuilder()
                .match(InitRequest.class, message -> {
                    System.out.println("开始初始化...");
                    for (int i = 0; i < message.getTaskManagerNumber(); i++) {
                        getContext().actorOf(TaskManager.props(), "taskManager-" + i);
                    }
                })
                .match(TaskManagerRequest.class, message -> {
                    if ("init".equals(message.getType())) {
                        heartbeat.put(sender(), message.getTs());
                        System.out.println("收到 " + sender() + " init 信息");
                        if (!isRunning) // 只要有一个 TaskManager 启动了 就开始进行心跳检测
                            isRunning = true;
                    } else if ("heartbeat".equals(message.getType())) {
                        heartbeat.put(sender(), message.getTs());
                        System.out.println("收到 " + sender() + " heartbeat 信息");
                    }

                })
                .matchEquals("stop", message -> {
                    isRunning = false;
                    getContext().stop(self());

                })
                .build();
    }
}

3.3 TaskManager

TaskManager 难点在于如何给 JobManager 发消息,即如何获取 JobManager 的 ActorRef,这时候需要用到获取 ActorRef 的第二种方式,对于已经存在的 Actor 可以使用 actorSelection 方法传入 Actor 的路径即可,因为 TaskManager 是 JobManager 创建的,因此它们的关系如下:

image-20220619181635725

因此 TaskManager 给 JobManager 发送消息代码如下:

getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("heartbeat", System.currentTimeMillis()), self());

TaskManager 完整代码如下:

package tech.kpretty;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;

import java.util.Enumeration;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/6/19 16:17
 * @email wjunjobs@outlook.com
 * @describe
 */
public class TaskManager extends AbstractActor {
    private volatile boolean isRunning = false;

    // 提供 Props 的静态方法,同时还可以传入一些参数,作为实例化 JobManager 的参数
    static Props props() {
        return Props.create(TaskManager.class, TaskManager::new);
    }

    @Override
    public void preStart() throws Exception {
        System.out.println("开始启动" + self());
        TimeUnit.SECONDS.sleep((int) (Math.random() * 5));
        // 告诉 JobManager 启动好了,汇报当前时间
        getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("init", System.currentTimeMillis()), self());
        System.out.println("启动完成" + self());
        isRunning = true;
        new Thread(() -> {
            while (true) {
                if (isRunning)
                    sendHeartbeat();
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    // no-op
                }
            }
        }).start();
    }

    @Override
    public void postStop() throws Exception {
        System.out.println(self() + "关闭中...");
    }

    private void sendHeartbeat() {
        // 告诉 JobManager 启动好了,汇报当前时间
        System.out.println(self() + " 发送心跳数据");
        getContext().actorSelection(self().path().parent()).tell(new TaskManagerRequest("heartbeat", System.currentTimeMillis()), self());
    }

    /**
     * 重写消息接收的方法
     *
     * @return Receive,封装了对不同消息的处理逻辑
     */
    @Override
    public Receive createReceive() {
        // 对所有的消息不做任何响应
        return receiveBuilder().matchEquals("fail", message -> isRunning = false).build();
    }
}

接下来就是主函数的编写,比如发送一些消息给 JobManager、TaskManager 等

package tech.kpretty;

import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;

import java.util.Random;
import java.util.Scanner;

/**
 * @author wjun
 * @date 2022/6/19 16:20
 * @email wjunjobs@outlook.com
 * @describe 测试通信的入口
 */
public class ApplicationMaster {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef jobManager = system.actorOf(JobManager.props(), "jobManager");
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.print(">> ");
            String op = scanner.nextLine();
            switch (op) {
                case "init":
                    jobManager.tell(new InitRequest(3), ActorRef.noSender());
                    break;
                case "fail":
                    ActorPath child = jobManager.path().child("taskManager-" + new Random().nextInt(3));
                    system.actorSelection(child).tell("fail", ActorRef.noSender());
                    break;
                case "stop":
                    jobManager.tell("stop", ActorRef.noSender());
                    System.exit(0);
            }
        }
    }
}

测试结果如下

image-20220619182012019

ps: 这个案例只是为了演示 akka 运行的基本原理,案例的心跳检测存在很多 bug,作为 akka 的入门案例是比较合适的,有了 akka 的基本使用经验再去看 flink 源码会有不错的收获

5

评论区