邮箱

    简介

    Akka 的邮箱中保存着发给 Actor 的信息。通常,每个 Actor 都有自己的邮箱,但也有例外,如使用BalancingPool,则所有路由器(routees)将共享一个邮箱实例。

    通过让某个 Actor 实现参数化接口RequiresMessageQueue,可以为某个 Actor 类型指定某种类型的消息队列。下面是一个例子:

    1. import akka.dispatch.BoundedMessageQueueSemantics;
    2. import akka.dispatch.RequiresMessageQueue;
    3. public class MyBoundedActor extends MyActor
    4. implements RequiresMessageQueue<BoundedMessageQueueSemantics> {}

    RequiresMessageQueue接口的类型参数需要映射到配置中的邮箱,如下所示:

    1. bounded-mailbox {
    2. mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
    3. mailbox-capacity = 1000
    4. }
    5. akka.actor.mailbox.requirements {
    6. "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
    7. }

    现在,每次创建MyBoundedActor类型的 Actor 时,它都会尝试获取一个有界邮箱。如果 Actor 在部署中配置了不同的邮箱,可以直接配置,也可以通过具有指定邮箱类型的调度器(dispatcher)配置,那么这将覆盖此映射。

    • 注释:接口中的所需类型为 Actor 创建的邮箱中的队列类型,如果队列未实现所需类型,则 Actor 创建将失败。

    指定调度器的消息队列类型

    调度器还可能需要运行在其上的 Actor 使用的邮箱类型。例如,BalancingDispatcher需要一个消息队列,该队列对于多个并发使用者是线程安全的。这需要对调度器进行配置,如下所示:

    1. my-dispatcher {
    2. mailbox-requirement = org.example.MyInterface
    3. }

    给定的需求命名一个类或接口,然后确保该类或接口是消息队列实现的父类型。如果发生冲突,例如,如果 Actor 需要不满足此要求的邮箱类型,则 Actor 创建将失败。

    创建 Actor 时,ActorRefProvider首先确定执行它的调度器。然后确定邮箱如下:

    1. 如果 Actor 的部署配置节(section)包含mailbox键,那么它将命名一个描述要使用的邮箱类型的配置节。
    2. 如果 Actor 的Props包含邮箱选择(mailbox selection),即对其调用了withMailbox,则该属性将命名一个描述要使用的邮箱类型的配置节。请注意,这需要绝对配置路径,例如myapp.special-mailbox,并且不嵌套在akka命名空间中。
    3. 如果调度器的配置节包含mailbox-type键,则将使用相同的节来配置邮箱类型。
    4. 如果 Actor 需要如上所述的邮箱类型,则将使用该要求(requirement)的映射来确定要使用的邮箱类型;如果失败,则尝试使用调度器的要求(如果有)。
    5. 如果调度器需要如上所述的邮箱类型,那么将使用该要求的映射来确定要使用的邮箱类型。
    6. 将使用默认邮箱akka.actor.default-mailbox

    默认邮箱

    如果未按上述说明指定邮箱,则使用默认邮箱。默认情况下,它是一个无边界的邮箱,由java.util.concurrent.ConcurrentLinkedQueue支持。

    SingleConsumerOnlyUnboundedMailbox是一个效率更高的邮箱,它可以用作默认邮箱,但不能与BalancingDispatcher一起使用。

    1. akka.actor.default-mailbox {
    2. mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
    3. }

    每个邮箱类型都由一个扩展MailboxType并接受两个构造函数参数的类实现:ActorSystem.Settings对象和Config部分。后者是通过从 Actor 系统的配置中获取命名的配置节、用邮箱类型的配置路径覆盖其id键并添加回退(fall-back )到默认邮箱配置节来计算的。

    内置邮箱实现

    Akka 附带了许多邮箱实现:

    • UnboundedMailbox(默认)
      • 默认邮箱
      • java.util.concurrent.ConcurrentLinkedQueue支持
      • 是否阻塞:No
      • 是否有界:No
      • 配置名称:unboundedakka.dispatch.UnboundedMailbox
    • SingleConsumerOnlyUnboundedMailbox,此队列可能比默认队列快,也可能不比默认队列快,具体取决于你的用例,请确保正确地进行基准测试!
      • 由多个生产商单个使用者队列支持,不能与BalancingDispatcher一起使用
      • 是否阻塞:No
      • 是否有界:No
      • 配置名称:akka.dispatch.SingleConsumerOnlyUnboundedMailbox
    • NonBlockingBoundedMailbox
      • 由一个非常高效的”多生产者,单消费者“队列支持
      • 是否阻塞:No(将溢出的消息丢弃为deadLetters
      • 是否有界:Yes
      • 配置名称:akka.dispatch.NonBlockingBoundedMailbox
    • UnboundedControlAwareMailbox
      • 传递以更高优先级扩展akka.dispatch.ControlMessage的消息
      • 由两个java.util.concurrent.ConcurrentLinkedQueue支持
      • 是否阻塞:No
      • 是否有界:No
      • 配置名称:akka.dispatch.UnboundedControlAwareMailbox
    • UnboundedPriorityMailbox
      • java.util.concurrent.PriorityBlockingQueue支持
      • 等优先级邮件的传递顺序未定义,与UnboundedStablePriorityMailbox相反
      • 是否阻塞:No
      • 是否有界:No
      • 配置名称:akka.dispatch.UnboundedPriorityMailbox
    • UnboundedStablePriorityMailbox
    • 由包装在akka.util.PriorityQueueStabilizer中的java.util.concurrent.PriorityBlockingQueue提供支持
    • 对于优先级相同的消息保留FIFO顺序,与UnboundedPriorityMailbox相反
    • 是否阻塞:No
    • 是否有界:No
    • 配置名称:akka.dispatch.UnboundedStablePriorityMailbox

    其他有界邮箱实现,如果达到容量并配置了非零mailbox-push-timeout-time超时时间,则会阻止发件人。特别地,以下邮箱只能与零mailbox-push-timeout-time一起使用。

    • BoundedMailbox
      • java.util.concurrent.LinkedBlockingQueue支持
      • 是否阻塞:如果与非零mailbox-push-timeout-time一起使用,则为Yes,否则为NO
      • 是否有界:Yes
      • 配置名称:boundedakka.dispatch.BoundedMailbox
    • BoundedPriorityMailbox
      • 由包装在akka.util.BoundedBlockingQueue中的java.util.PriorityQueue提供支持
      • 优先级相同的邮件的传递顺序未定义,与BoundedStablePriorityMailbox相反
      • 是否阻塞:如果与非零mailbox-push-timeout-time一起使用,则为Yes,否则为NO
      • 是否有界:Yes
      • 配置名称:akka.dispatch.BoundedPriorityMailbox
    • BoundedStablePriorityMailbox
      • 由包装在akka.util.PriorityQueueStabilizerakka.util.BoundedBlockingQueue中的提供支持
      • 对于优先级相同的消息保留FIFO顺序,与BoundedPriorityMailbox相反
      • 是否阻塞:如果与非零mailbox-push-timeout-time一起使用,则为Yes,否则为NO
      • 是否有界:Yes
      • 配置名称:akka.dispatch.BoundedStablePriorityMailbox
    • BoundedControlAwareMailbox
      • 由两个java.util.concurrent.ConcurrentLinkedQueue支持,如果达到容量,则在排队时阻塞
      • 是否阻塞:如果与非零mailbox-push-timeout-time一起使用,则为Yes,否则为NO
      • 是否有界:Yes
      • 配置名称:akka.dispatch.BoundedControlAwareMailbox

    PriorityMailbox

    如何创建PriorityMailbox:

    1. static class MyPrioMailbox extends UnboundedStablePriorityMailbox {
    2. // needed for reflective instantiation
    3. public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
    4. // Create a new PriorityGenerator, lower prio means more important
    5. super(
    6. new PriorityGenerator() {
    7. @Override
    8. public int gen(Object message) {
    9. if (message.equals("highpriority"))
    10. return 0; // 'highpriority messages should be treated first if possible
    11. else if (message.equals("lowpriority"))
    12. return 2; // 'lowpriority messages should be treated last if possible
    13. else if (message.equals(PoisonPill.getInstance()))
    14. return 3; // PoisonPill when no other left
    15. else return 1; // By default they go between high and low prio
    16. }
    17. });
    18. }
    19. }

    然后将其添加到配置中:

    下面是一个关于如何使用它的示例:

    1. class Demo extends AbstractActor {
    2. LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    3. {
    4. for (Object msg :
    5. new Object[] {
    6. "lowpriority",
    7. "lowpriority",
    8. "highpriority",
    9. "pigdog",
    10. "pigdog2",
    11. "pigdog3",
    12. "highpriority",
    13. PoisonPill.getInstance()
    14. }) {
    15. getSelf().tell(msg, getSelf());
    16. }
    17. }
    18. @Override
    19. public Receive createReceive() {
    20. return receiveBuilder()
    21. .matchAny(
    22. message -> {
    23. log.info(message.toString());
    24. })
    25. .build();
    26. }
    27. }
    28. // We create a new Actor that just prints out what it processes
    29. ActorRef myActor =
    30. system.actorOf(Props.create(Demo.class, this).withDispatcher("prio-dispatcher"));
    31. /*
    32. Logs:
    33. 'highpriority
    34. 'highpriority
    35. 'pigdog
    36. 'pigdog2
    37. 'pigdog3
    38. 'lowpriority
    39. 'lowpriority
    40. */

    也可以这样直接配置邮箱类型(这是顶级配置项):

    1. prio-mailbox {
    2. mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
    3. //Other mailbox configuration goes here
    4. }
    5. akka.actor.deployment {
    6. /priomailboxactor {
    7. mailbox = prio-mailbox
    8. }
    9. }

    然后从这样的部署中使用它:

    1. ActorRef myActor = system.actorOf(Props.create(MyActor.class), "priomailboxactor");

    或者这样的代码:

    1. ActorRef myActor = system.actorOf(Props.create(MyActor.class).withMailbox("prio-mailbox"));

    可以这样配置:

    1. control-aware-dispatcher {
    2. mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
    3. //Other dispatcher configuration goes here
    4. }

    控制消息需要扩展ControlMessage特性:

    下面是一个关于如何使用它的示例:

    1. class Demo extends AbstractActor {
    2. LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    3. {
    4. for (Object msg :
    5. new Object[] {"foo", "bar", new MyControlMessage(), PoisonPill.getInstance()}) {
    6. }
    7. }
    8. @Override
    9. public Receive createReceive() {
    10. return receiveBuilder()
    11. .matchAny(
    12. message -> {
    13. log.info(message.toString());
    14. })
    15. .build();
    16. }
    17. }
    18. // We create a new Actor that just prints out what it processes
    19. ActorRef myActor =
    20. system.actorOf(Props.create(Demo.class, this).withDispatcher("control-aware-dispatcher"));
    21. /*
    22. Logs:
    23. 'MyControlMessage
    24. 'foo
    25. 'bar
    26. */

    创建自己的邮箱类型

    示例如下:

    1. // Marker interface used for mailbox requirements mapping
    2. public interface MyUnboundedMessageQueueSemantics {}
    1. import akka.actor.ActorRef;
    2. import akka.actor.ActorSystem;
    3. import akka.dispatch.Envelope;
    4. import akka.dispatch.MailboxType;
    5. import akka.dispatch.MessageQueue;
    6. import akka.dispatch.ProducesMessageQueue;
    7. import com.typesafe.config.Config;
    8. import java.util.concurrent.ConcurrentLinkedQueue;
    9. import java.util.Queue;
    10. import scala.Option;
    11. public class MyUnboundedMailbox
    12. implements MailboxType, ProducesMessageQueue<MyUnboundedMailbox.MyMessageQueue> {
    13. // This is the MessageQueue implementation
    14. public static class MyMessageQueue implements MessageQueue, MyUnboundedMessageQueueSemantics {
    15. private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
    16. // these must be implemented; queue used as example
    17. public void enqueue(ActorRef receiver, Envelope handle) {
    18. queue.offer(handle);
    19. }
    20. public Envelope dequeue() {
    21. return queue.poll();
    22. }
    23. public int numberOfMessages() {
    24. return queue.size();
    25. }
    26. public boolean hasMessages() {
    27. return !queue.isEmpty();
    28. }
    29. public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
    30. for (Envelope handle : queue) {
    31. deadLetters.enqueue(owner, handle);
    32. }
    33. }
    34. }
    35. // This constructor signature must exist, it will be called by Akka
    36. public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) {
    37. // put your initialization code here
    38. }
    39. // The create method is called to create the MessageQueue
    40. public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    41. return new MyMessageQueue();
    42. }
    43. }

    然后,将MailboxType的 FQCN 指定为调度器配置或邮箱配置中mailbox-type的值。

    • 注释:请确保包含一个采用akka.actor.ActorSystem.Settingscom.typesafe.config.Config参数的构造函数,因为此构造函数是通过反射调用来构造邮箱类型的。作为第二个参数传入的配置是配置中描述使用此邮箱类型的调度器或邮箱设置的部分;邮箱类型将为使用它的每个调度器或邮箱设置实例化一次。

    你还可以使用邮箱作为调度器的要求(requirement),如下所示:

    1. custom-dispatcher {
    2. mailbox-requirement =
    3. "jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
    4. }
    5. akka.actor.mailbox.requirements {
    6. "jdocs.dispatcher.MyUnboundedMessageQueueSemantics" =
    7. custom-dispatcher-mailbox
    8. }
    9. custom-dispatcher-mailbox {
    10. mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
    11. }

    或者像这样定义 Actor 类的要求:

    1. static class MySpecialActor extends AbstractActor
    2. implements RequiresMessageQueue<MyUnboundedMessageQueueSemantics> {
    3. // ...

    为了使system.actorOf既同步又不阻塞,同时保持返回类型ActorRef(以及返回的ref完全起作用的语义),对这种情况进行了特殊处理。在幕后,构建了一种空的 Actor 引用,将其发送给系统的守护者 Actor,该 Actor 实际上创建了 Actor 及其上下文,并将其放入引用中。在这之前,发送到的消息将在本地排队,只有在交换真正的填充之后,它们才会被传输到真正的邮箱中。因此,

    可能会失败;你必须留出一段时间通过并重试检查TestKit.awaitCond