事件总线

    • 注释:请注意,EventBus不保留已发布消息的发送者。如果你需要原始发件人的引用,则必须在消息中提供。

    此机制在 Akka 内的不同地方使用,例如「事件流」。实现可以使用下面介绍的特定构建基块。

    事件总线(event bus)必须定义以下三个类型参数:

    • Event是在该总线上发布的所有事件的类型
    • Subscriber是允许在该事件总线上注册的订阅者类型
    • Classifier定义用于选择用于调度事件的订阅者的分类器

    下面的特性在这些类型中仍然是通用的,但是需要为任何具体的实现定义它们。

    这里介绍的分类器(classifiers)是 Akka 发行版的一部分,但是如果你没有找到完美的匹配,那么滚动你自己的分类器并不困难,请检查「」上现有分类器的实现情况。

    最简单的分类就是从每个事件中提取一个任意的分类器,并为每个可能的分类器维护一组订阅者。特征LookupClassification仍然是通用的,因为它抽象了如何比较订阅者以及如何准确地分类。

    以下示例说明了需要实现的必要方法:

    1. import akka.event.japi.LookupEventBus;
    2. static class MsgEnvelope {
    3. public final String topic;
    4. public final Object payload;
    5. public MsgEnvelope(String topic, Object payload) {
    6. this.topic = topic;
    7. this.payload = payload;
    8. }
    9. }
    10. /**
    11. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope equals the String
    12. * specified when subscribing.
    13. */
    14. static class LookupBusImpl extends LookupEventBus<MsgEnvelope, ActorRef, String> {
    15. // is used for extracting the classifier from the incoming events
    16. @Override
    17. public String classify(MsgEnvelope event) {
    18. return event.topic;
    19. }
    20. // will be invoked for each event for all subscribers which registered themselves
    21. // for the event’s classifier
    22. @Override
    23. public void publish(MsgEnvelope event, ActorRef subscriber) {
    24. subscriber.tell(event.payload, ActorRef.noSender());
    25. }
    26. // must define a full order over the subscribers, expressed as expected from
    27. // `java.lang.Comparable.compare`
    28. @Override
    29. public int compareSubscribers(ActorRef a, ActorRef b) {
    30. return a.compareTo(b);
    31. }
    32. // determines the initial size of the index data structure
    33. // used internally (i.e. the expected number of different classifiers)
    34. @Override
    35. public int mapSize() {
    36. return 128;
    37. }
    38. }

    此实现的测试可能如下所示:

    1. LookupBusImpl lookupBus = new LookupBusImpl();
    2. lookupBus.subscribe(getTestActor(), "greetings");
    3. lookupBus.publish(new MsgEnvelope("time", System.currentTimeMillis()));
    4. lookupBus.publish(new MsgEnvelope("greetings", "hello"));
    5. expectMsgEquals("hello");

    如果不存在特定事件的订阅者,则此分类器是有效的。

    子通道分类

    如果分类器形成了一个层次结构,并且希望不仅可以在叶节点上订阅,那么这个分类可能就是正确的分类。这种分类是为分类器只是事件的 JVM 类而开发的,订阅者可能对订阅某个类的所有子类感兴趣,但它可以与任何分类器层次结构一起使用。

    以下示例说明了需要实现的必要方法:

    1. import akka.event.japi.SubchannelEventBus;
    2. static class StartsWithSubclassification implements Subclassification<String> {
    3. @Override
    4. public boolean isEqual(String x, String y) {
    5. return x.equals(y);
    6. }
    7. @Override
    8. public boolean isSubclass(String x, String y) {
    9. }
    10. }
    11. /**
    12. * Publishes the payload of the MsgEnvelope when the topic of the MsgEnvelope starts with the
    13. * String specified when subscribing.
    14. */
    15. // Subclassification is an object providing `isEqual` and `isSubclass`
    16. // to be consumed by the other methods of this classifier
    17. @Override
    18. public Subclassification<String> subclassification() {
    19. return new StartsWithSubclassification();
    20. }
    21. // is used for extracting the classifier from the incoming events
    22. @Override
    23. public String classify(MsgEnvelope event) {
    24. return event.topic;
    25. }
    26. // will be invoked for each event for all subscribers which registered themselves
    27. // for the event’s classifier
    28. @Override
    29. public void publish(MsgEnvelope event, ActorRef subscriber) {
    30. subscriber.tell(event.payload, ActorRef.noSender());
    31. }
    32. }
    1. SubchannelBusImpl subchannelBus = new SubchannelBusImpl();
    2. subchannelBus.subscribe(getTestActor(), "abc");
    3. subchannelBus.publish(new MsgEnvelope("xyzabc", "x"));
    4. subchannelBus.publish(new MsgEnvelope("bcdef", "b"));
    5. subchannelBus.publish(new MsgEnvelope("abc", "c"));
    6. expectMsgEquals("c");
    7. subchannelBus.publish(new MsgEnvelope("abcdef", "d"));
    8. expectMsgEquals("d");

    在没有为事件找到订阅者的情况下,该分类器也很有效,但它使用常规锁来同步内部分类器缓存,因此它不适合订阅以非常高的频率更改的情况(请记住,通过发送第一条消息“打开”分类器也必须重新检查所有以前的订阅)。

    前一个分类器是为严格分层的多分类器订阅而构建的,如果有重叠的分类器覆盖事件空间的各个部分而不形成分层结构,则此分类器非常有用。

    以下示例说明了需要实现的必要方法:

    此实现的测试可能如下所示:

    1. ScanningBusImpl scanningBus = new ScanningBusImpl();
    2. scanningBus.subscribe(getTestActor(), 3);
    3. scanningBus.publish("xyzabc");
    4. scanningBus.publish("ab");
    5. expectMsgEquals("ab");
    6. scanningBus.publish("abc");
    7. expectMsgEquals("abc");

    这个分类器总是需要一个与订阅数量成比例的时间,与实际匹配的数量无关。

    Actor 分类

    这个分类最初是专门为实现「DeathWatch」而开发的:订阅者和分类器都是ActorRef类型的。

    这种分类需要一个ActorSystem来执行与作为 Actor 的订阅者相关的簿记操作,而订阅者可以在不首先从EventBus取消订阅的情况下终止。ManagedActorClassification维护一个系统 Actor,自动处理取消订阅终止的 Actor。

    以下示例说明了需要实现的必要方法:

    1. import akka.event.japi.ManagedActorEventBus;
    2. static class Notification {
    3. public final ActorRef ref;
    4. public final int id;
    5. public Notification(ActorRef ref, int id) {
    6. this.ref = ref;
    7. this.id = id;
    8. }
    9. }
    10. static class ActorBusImpl extends ManagedActorEventBus<Notification> {
    11. // the ActorSystem will be used for book-keeping operations, such as subscribers terminating
    12. public ActorBusImpl(ActorSystem system) {
    13. super(system);
    14. }
    15. // is used for extracting the classifier from the incoming events
    16. @Override
    17. public ActorRef classify(Notification event) {
    18. return event.ref;
    19. }
    20. // determines the initial size of the index data structure
    21. // used internally (i.e. the expected number of different classifiers)
    22. @Override
    23. public int mapSize() {
    24. }
    25. }

    此实现的测试可能如下所示:

    1. ActorRef observer1 = new TestKit(system).getRef();
    2. ActorRef observer2 = new TestKit(system).getRef();
    3. TestKit probe1 = new TestKit(system);
    4. ActorRef subscriber1 = probe1.getRef();
    5. ActorRef subscriber2 = probe2.getRef();
    6. ActorBusImpl actorBus = new ActorBusImpl(system);
    7. actorBus.subscribe(subscriber1, observer1);
    8. actorBus.subscribe(subscriber2, observer1);
    9. actorBus.subscribe(subscriber2, observer2);
    10. Notification n1 = new Notification(observer1, 100);
    11. actorBus.publish(n1);
    12. probe1.expectMsgEquals(n1);
    13. probe2.expectMsgEquals(n1);
    14. Notification n2 = new Notification(observer2, 101);
    15. actorBus.publish(n2);
    16. probe2.expectMsgEquals(n2);
    17. probe1.expectNoMessage(Duration.ofMillis(500));

    这个分类器在事件类型中仍然是通用的,对于所有用例都是有效的。

    1. import akka.actor.ActorRef;
    2. import akka.actor.ActorSystem;

    可以这样订阅:

    1. final ActorSystem system = ActorSystem.create("DeadLetters");
    2. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
    3. system.getEventStream().subscribe(actor, DeadLetter.class);

    值得指出的是,由于在事件流中实现子通道分类的方式,可以订阅一组事件,方法是订阅它们的公共超类,如下例所示:

    1. interface AllKindsOfMusic {}
    2. class Jazz implements AllKindsOfMusic {
    3. public final String artist;
    4. public Jazz(String artist) {
    5. this.artist = artist;
    6. }
    7. }
    8. class Electronic implements AllKindsOfMusic {
    9. public final String artist;
    10. public Electronic(String artist) {
    11. this.artist = artist;
    12. }
    13. }
    14. static class Listener extends AbstractActor {
    15. @Override
    16. public Receive createReceive() {
    17. return receiveBuilder()
    18. .match(
    19. Jazz.class,
    20. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
    21. .match(
    22. Electronic.class,
    23. msg -> System.out.printf("%s is listening to: %s%n", getSelf().path().name(), msg))
    24. .build();
    25. }
    26. }
    27. final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));
    28. system.getEventStream().subscribe(actor, DeadLetter.class);
    29. final ActorRef jazzListener = system.actorOf(Props.create(Listener.class));
    30. final ActorRef musicListener = system.actorOf(Props.create(Listener.class));
    31. system.getEventStream().subscribe(jazzListener, Jazz.class);
    32. system.getEventStream().subscribe(musicListener, AllKindsOfMusic.class);
    33. // only musicListener gets this message, since it listens to *all* kinds of music:
    34. system.getEventStream().publish(new Electronic("Parov Stelar"));
    35. // jazzListener and musicListener will be notified about Jazz:
    36. system.getEventStream().publish(new Jazz("Sonny Rollins"));

    与 Actor 分类类似,EventStream将在订阅者终止时自动删除订阅者。

    • 注释:事件流是一个本地设施,这意味着它不会将事件分发到集群环境中的其他节点(除非你明确地向流订阅远程 Actor)。如果你需要在 Akka 集群中广播事件,而不明确地知道你的收件人(即获取他们的ActorRefs),你可能需要查看:「」。

    启动后,Actor 系统创建并订阅事件流的 Actor 以进行日志记录:这些是在application.conf中配置的处理程序:

    1. akka {
    2. loggers = ["akka.event.Logging$DefaultLogger"]
    3. }

    此处按完全限定类名列出的处理程序将订阅优先级高于或等于配置的日志级别的所有日志事件类,并且在运行时更改日志级别时,它们的订阅将保持同步:

      这意味着对于一个不会被记录的级别,日志事件通常根本不会被调度(除非已经完成了对相应事件类的手动订阅)。

      死信

      如「」所述,Actor 在其死亡后终止或发送时排队的消息将重新路由到死信邮箱,默认情况下,死信邮箱将发布用死信包装的消息。此包装包含已重定向信封的原始发件人、收件人和消息。

      一些内部消息(用死信抑制特性标记)不会像普通消息一样变成死信。这些是设计安全的,并且预期有时会到达一个终止的 Actor,因为它们不需要担心,所以它们被默认的死信记录机制抑制。

      但是,如果你发现自己需要调试这些低级抑制死信(low level suppressed dead letters),仍然可以明确订阅它们:

      或所有死信(包括被压制的):

      1. system.getEventStream().subscribe(actor, AllDeadLetters.class);

      英文原文链接Event Bus.