Roy van Kaathoven
Full Stack Developer

10 Oct

Akka Event Bus

I've been working on a new project for a while which is based on Scala, Play Framework and Akka. The goal of the project is to create a library which can communicate with the Steam Servers, and using the library write multiple bots which perform various actions, like starting trades with other Steam users and exchange items.

There can be any number of bots online and their events can be handled by a varying number of services. The services should not have a hard dependency on eachother or other bots, they all have to communicate using a EventBus.

What is a Event Bus?

The Event Bus Pattern is a publish-subscribe-style communication pattern where services or objects can subscribe to the event bus and listen to specific types of events. Other services can publish events to the Event Bus, which then will be propagated to subscribers which are interested in the specific event type.

This pattern is a flexible loosely coupled design which allows for decoupled components that are not aware of eachother.

When to use

There are a lot of use cases for an event bus, a few common are:

  • GUI's: In a traditional application, multiple sources of events can exist across the application. A menu or toolbar button, a mouse click, window resizing, or some other external data source could feasibly produce the same desired result in the application. Selecting a certain row in a JTable, for example, might result in a toolbar button being deactivated, a database record updated, and a new window spawned. A double click of a JTree item might invoke another long chain of events. The event bus helps solve this problem by simply allowing each component to subscribe to the event bus, and when an interesting action occurs (such as the row selection in a table), an event should be generated to the bus. Each subscriber to the event type will be notified and can react accordingly.
  • Service Oriented Architecture: In a SOA environment there can be a lot of services which perform various actions. The Orderservice will handle an order and notify that an order has succesfully been completed. The newsletter service listens and subscribes the customers to its newsletter, the reporting service adds the informations to its database for later BI, and the inventory registers that the items are substracted from the warehouse. This business logic can be designed in a loosely coupled way using the EventBus.

Implementation

In order to construct a valid EventBus you need to inherit the EventBus class and mix a Classification trait. There are 3 classifications distributed with Akka which you can used in your application.

  • Lookup
  • SubChannel
  • Scanning

The default eventbus provides the following methods which are used to interact with actors:

// subscribes the given subscriber to events with the given classifier
def subscribe(subscriber: Subscriber, classifier: Classifier): Boolean
// undoes a specific subscription
def unsubscribe(subscriber: Subscriber, classifier: Classifier): Boolean
// undoes all subscriptions for the given subscriber
def unsubscribe(subscriber: Subscriber): Unit
// // publishes an event, which first is classified according to the specific bus (see Classifiers) and then published to all subscribers for the obtained classifier
def publish(event: Event): Unit

In order to build a valid event bus you will need to implement the following types:

Event specifies the type of messages the Eventbus will accept Classifier specifies the type of the classifier Subscriber specifies the type of the subscribers

Examples

The 3 classifications that are already available suit most cases so lets start with explaining how they can and should be used, and which fits your use case.

Lookup classification

Lookup classification is the most basic and provides a simple string matcher. If the published string is exactly the same as a subscriber has subscribed to then it will receive a message.

case class MessageEvent(val channel: String, val message: Any)
case class Message(text: String)

class LookupEventBus extends ActorEventBus with LookupClassification {
  type Event = MessageEvent
  type Classifier = String

  protected def mapSize(): Int = 10

  protected def classify(event: Event): Classifier = {
    event.channel
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.message
  }
}

The example below shows a subcriber which listens to events send to the "/events" channel, it then publishes a new messages on the "/events" channel and the message is received and printed.

The second publish/subscribe shows exactly the same but on a more specific channel "/events/10". The message is only received by one subscriber because the Lookup classification does not support subchannels.

  object LookupClassificationApp extends App {

    val system = ActorSystem()
    val eventBus = new LookupEventBus

    val subscriber = system.actorOf(Props(new Actor {
      def receive = {
        case Message(text) => println(text)
      }
    }))

    eventBus.subscribe(subscriber, "/events")
    eventBus.publish(MessageEvent("/events", Message(text = "hello world!")))
    // Output: hello world!

    eventBus.subscribe(subscriber, "/events/10")
    eventBus.publish(MessageEvent("/events/10", Message(text = "hello world!")))
    // Output: hello world!
  }

Subclass classification

The subclass specification is a EventBus which supports a hierachy of channels or classifiers. This first example focuses on a simple string based approach which allows to listen to a top level channel and their leafs.

case class MessageEvent(channel: String, message: Any)
case class Message(text: String)

object SubChannelClassificationEventBus extends EventBus with SubchannelClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  protected def classify(event: Event): Classifier = event.channel

  protected def subclassification = new Subclassification[Classifier] {
    def isEqual(x: Classifier, y: Classifier) = x == y
    def isSubclass(x: Classifier, y: Classifier) = x.startsWith(y)
  }

  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.message
  }
}

The example below shows subcribers and publishers which become increasingly more specific, and that previous subscribers which are higher up the hierachy will catch the upcoming more specific publishers.

object ChannelClassification extends App {

  val system = ActorSystem()
  val eventBus = new SubChannelClassificationEventBus

  val subscriber = system.actorOf(Props(new Actor {
    def receive = {
      case Message(text) => println(text)
    }
  }))

  // Top level
  eventBus.subscribe(subscriber, "/")
  eventBus.publish(MessageEvent("/", Message(text = "hello world!")))
  // Output: hello world!

  // Only events
  eventBus.subscribe(subscriber, "/events")
  eventBus.publish(MessageEvent("/events", Message(text = "hello world!")))
  // Output: hello world!
  // Output: hello world!

  // Specific events
  eventBus.subscribe(subscriber, "/events/10")
  eventBus.publish(MessageEvent("/events/10", Message(text = "hello world!")))
  // Output: hello world!
  // Output: hello world!
  // Output: hello world!
}

Downfalls

The sender is not preserved when receiving a message which passed through a EventBus, to solve this you need to pass the ActorRef manually