Demystifying Akka Extensions

Akka ships with an elegant mechanism for adding features to an actor system: Akka Extensions.

Adding a custom extension requires implementing two basic components: Extension and ExtensionId. Akka itself provides several powerful extensions built this way, Cluster and ClusterSharding to name just two.

Extensions are loaded once per ActorSystem, either on demand or at ActorSystem creation time. By default, extensions are loaded on demand. To load an extension at ActorSystem creation time, you have to list it in the configuration file. We will come back to this later in the post.
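
To make the "once per ActorSystem, on demand" behaviour concrete, here is a minimal sketch. CounterExtension and CounterExtensionImpl are hypothetical names used only for illustration, and the sketch assumes akka-actor is on the classpath.

```scala
import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical extension, for illustration only: a counter shared by one ActorSystem.
class CounterExtensionImpl extends Extension {
  private val counter = new AtomicLong(0)
  def increment(): Long = counter.incrementAndGet()
}

object CounterExtension extends ExtensionId[CounterExtensionImpl] {
  // Akka calls this when the extension is first requested for a given ActorSystem.
  override def createExtension(system: ExtendedActorSystem): CounterExtensionImpl =
    new CounterExtensionImpl
}

object OnDemandLoadingDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("on-demand-demo")
    try {
      // Nothing has asked for the extension yet, so it is not registered.
      assert(!system.hasExtension(CounterExtension))

      // The first lookup creates and registers the extension.
      val first = CounterExtension(system)
      assert(system.hasExtension(CounterExtension))

      // Later lookups on the same ActorSystem return the very same instance.
      val second = CounterExtension(system)
      assert(first eq second)
      assert(first.increment() == 1L)
      assert(second.increment() == 2L)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Because both lookups return the same object, any state kept inside the extension is shared by everything running in that ActorSystem, which is why the counter here uses an AtomicLong.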

Sample extension

We are going to implement an extension that loads and stores the configuration for a sample application which uses Apache Kafka. This is a typical use case for Akka Extensions.

Let's define a simple configuration file:

application.name = "sample-application"

kafka {
  producer {
    bootstrap.servers = "localhost:9092"
    acks = "all"
    retries = 0
    batch.size = 16834
    linger.ms = 1
    buffer.memory = 33554432
  }

  consumer {
    bootstrap.servers = "localhost:9092"
    group.id = "sample-application"
    topics = ["sample-topic"]
  }
}
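
Before wiring this into an extension, it may help to see how such a file is read with the Typesafe Config API. The sketch below parses a trimmed copy of the configuration from a string (ConfigPathsDemo is a hypothetical name) and shows that dotted keys such as bootstrap.servers are simply nested paths.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigPathsDemo {
  // A trimmed copy of the configuration above, parsed from a string for this demo.
  val config: Config = ConfigFactory.parseString(
    """
      |application.name = "sample-application"
      |
      |kafka {
      |  producer {
      |    bootstrap.servers = "localhost:9092"
      |    batch.size = 16834
      |  }
      |  consumer {
      |    topics = ["sample-topic"]
      |  }
      |}
      |""".stripMargin)

  def main(args: Array[String]): Unit = {
    // A value can be read by its full path from the root.
    assert(config.getString("application.name") == "sample-application")

    // Or we can narrow to a sub-tree first and read keys relative to it.
    val producer = config.getConfig("kafka.producer")
    assert(producer.getString("bootstrap.servers") == "localhost:9092")
    assert(producer.getInt("batch.size") == 16834)

    // Lists come back as java.util.List.
    assert(config.getStringList("kafka.consumer.topics").get(0) == "sample-topic")
  }
}
```

Narrowing with getConfig first keeps the individual lookups short when many keys live under the same section.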

Implementation

Now we can implement our settings extension. As previously mentioned, we have to implement two components: Extension and ExtensionId.
Here is what the extension looks like:

import akka.actor.{Actor, ExtendedActorSystem, Extension, ExtensionId}
import com.typesafe.config.Config
import scala.collection.JavaConverters._

class SettingsExtensionImpl(config: Config) extends Extension {

  object application {
    private val applicationConfig = config.getConfig("application")
    val name = applicationConfig.getString("name")
  }

  object kafka {
    private val kafkaConfig = config.getConfig("kafka")

    object producer {
      private val producerConfig = kafkaConfig.getConfig("producer")
      val bootstrapServers = producerConfig.getString("bootstrap.servers")
      val acks = producerConfig.getString("acks")
      val retries = producerConfig.getInt("retries")
      val batchSize = producerConfig.getInt("batch.size")
      val lingerMs = producerConfig.getInt("linger.ms")
      val bufferMemory = producerConfig.getInt("buffer.memory")
    }

    object consumer {
      private val consumerConfig = kafkaConfig.getConfig("consumer")
      val bootstrapServers = consumerConfig.getString("bootstrap.servers")
      val groupId = consumerConfig.getString("group.id")
      val topics = consumerConfig.getStringList("topics").asScala.toList
    }
  }

}

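object SettingsExtension extends ExtensionId[SettingsExtensionImpl] {
  // Called by Akka when the extension is first requested for a given ActorSystem
  override def createExtension(system: ExtendedActorSystem): SettingsExtensionImpl =
    new SettingsExtensionImpl(system.settings.config)
}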

Usage

Using the extension is straightforward: call the apply method of the ExtensionId defined earlier, i.e. SettingsExtension(context.system).

For convenience, I have added a SettingsActor trait which we are going to mix into our actors:

trait SettingsActor {
  _: Actor =>
  val settings = SettingsExtension(context.system)
}
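
To see the mix-in pattern end to end without a Kafka dependency, here is a small self-contained sketch. NameExtension, NameAware and EchoNameActor are hypothetical stand-ins for the settings extension and the SettingsActor trait, and the ask pattern is used only to read the value back in the demo.

```scala
import akka.actor.{Actor, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, Props}
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical, reduced extension: it only exposes application.name.
class NameExtensionImpl(val name: String) extends Extension

object NameExtension extends ExtensionId[NameExtensionImpl] {
  override def createExtension(system: ExtendedActorSystem): NameExtensionImpl =
    new NameExtensionImpl(system.settings.config.getString("application.name"))
}

// The self-type restricts the trait to actors, which gives us access to `context`.
trait NameAware { this: Actor =>
  val appName: String = NameExtension(context.system).name
}

class EchoNameActor extends Actor with NameAware {
  override def receive: Receive = {
    case "name?" => sender() ! appName
  }
}

object MixinDemo {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory
      .parseString("application.name = \"sample-application\"")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("mixin-demo", config)
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val actor = system.actorOf(Props(new EchoNameActor))
      val reply = Await.result((actor ? "name?").mapTo[String], 3.seconds)
      assert(reply == "sample-application")
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The trait resolves the extension while the actor is being constructed, so every actor that mixes it in sees the same settings instance for its ActorSystem.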

This is what ConsumerActor looks like after mixing in the SettingsActor:

import akka.actor.Actor
import cakesolutions.kafka.KafkaConsumer
import cakesolutions.kafka.akka.KafkaConsumerActor
import cakesolutions.kafka.akka.KafkaConsumerActor.{Subscribe, Unsubscribe}
import com.ted.akka.extensions.SettingsActor
import org.apache.kafka.common.serialization.StringDeserializer

class ConsumerActor extends Actor with SettingsActor {

  import settings.kafka.consumer._

  val kafkaConsumerActor = context.actorOf(
    KafkaConsumerActor.props(
      consumerConf = KafkaConsumer.Conf(
        keyDeserializer = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        bootstrapServers = bootstrapServers,
        groupId = groupId
      ),
      actorConf = KafkaConsumerActor.Conf(),
      downstreamActor = self))

  override def preStart() = {
    super.preStart()
    kafkaConsumerActor ! Subscribe.AutoPartition(topics)
  }

  override def postStop(): Unit = {
    kafkaConsumerActor ! Unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    case _ => // some message processing
  }
}

And the ProducerActor:

import akka.actor.Actor
import cakesolutions.kafka.KafkaProducer.Conf
import cakesolutions.kafka.akka.KafkaProducerActor
import com.ted.akka.extensions.SettingsActor
import org.apache.kafka.common.serialization.StringSerializer

class ProducerActor extends Actor with SettingsActor {

  import settings.kafka.producer._

  val kafkaProducerActor = context.actorOf(
    KafkaProducerActor.props(
      Conf(
        keySerializer = new StringSerializer,
        valueSerializer = new StringSerializer,
        bootstrapServers = bootstrapServers,
        acks = acks,
        retries = retries,
        batchSize = batchSize,
        lingerMs = lingerMs,
        bufferMemory = bufferMemory
      )))

  override def receive: Receive = {
    case _ => // some message processing
  }
}

Testing

Testing SettingsExtension is straightforward with Akka TestKit and ScalaTest:

import akka.actor.ActorSystem
import akka.testkit.TestKit
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike, Matchers}

class SettingsExtensionTest
  extends TestKit(ActorSystem("SettingsExtensionTest"))
    with FunSuiteLike
    with BeforeAndAfterAll
    with Matchers {

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
  }

  val settings = SettingsExtension(system)

  test("SettingsExtension to return the correct config values for application") {
    val application = settings.application

    application.name shouldBe "sample-application"
  }

  test("SettingsExtension to return the correct config values for kafka.producer") {
    val kafkaProducer = settings.kafka.producer

    kafkaProducer.bootstrapServers shouldBe "localhost:9092"
    kafkaProducer.acks shouldBe "all"
    kafkaProducer.batchSize shouldBe 16834
    kafkaProducer.bufferMemory shouldBe 33554432
    kafkaProducer.retries shouldBe 0
    kafkaProducer.lingerMs shouldBe 1
  }

  test("SettingsExtension to return the correct config values for kafka.consumer") {
    val kafkaConsumer = settings.kafka.consumer

    kafkaConsumer.bootstrapServers shouldBe "localhost:9092"
    kafkaConsumer.groupId shouldBe "sample-application"
    kafkaConsumer.topics shouldBe List("sample-topic")
  }

}
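
The test above picks its values up from the configuration file on the classpath. If a test needs different values, one option is to pass an explicit Config when creating the ActorSystem. The following is only a sketch of that idea: GroupSettings is a hypothetical, reduced extension, and plain assertions stand in for a test framework.

```scala
import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical, reduced settings extension so that the sketch is self-contained.
class GroupSettingsImpl(val groupId: String) extends Extension

object GroupSettings extends ExtensionId[GroupSettingsImpl] {
  override def createExtension(system: ExtendedActorSystem): GroupSettingsImpl =
    new GroupSettingsImpl(system.settings.config.getString("kafka.consumer.group.id"))
}

object InlineConfigDemo {
  // Builds an ActorSystem whose config overrides a single key and otherwise
  // falls back to the regular configuration loaded from the classpath.
  private def systemWith(name: String, groupId: String): ActorSystem = {
    val overrides = ConfigFactory.empty()
      .withValue("kafka.consumer.group.id", ConfigValueFactory.fromAnyRef(groupId))
    ActorSystem(name, overrides.withFallback(ConfigFactory.load()))
  }

  def main(args: Array[String]): Unit = {
    val systemA = systemWith("system-a", "group-a")
    val systemB = systemWith("system-b", "group-b")
    try {
      // Each ActorSystem gets its own extension instance, built from its own config.
      assert(GroupSettings(systemA).groupId == "group-a")
      assert(GroupSettings(systemB).groupId == "group-b")
      assert(!(GroupSettings(systemA) eq GroupSettings(systemB)))
    } finally {
      Await.result(systemA.terminate(), 10.seconds)
      Await.result(systemB.terminate(), 10.seconds)
    }
  }
}
```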

Loading extensions at ActorSystem creation time

For extensions that we want to load at ActorSystem creation time, we have to list the fully qualified class names (FQCNs) of their ExtensionId implementations in the configuration file:

// loads the extension at ActorSystem creation time
akka.extensions = ["com.ted.akka.extensions.SettingsExtension"]

Alternatively, we can define a class/object that implements ExtensionIdProvider and specify its FQCN instead.
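
A minimal sketch of the provider variant could look like the following. StartupExtension and StartupExtensionProvider are hypothetical names, and the FQCN is computed with classOf only so that the sketch works regardless of the package it is placed in; in a real application it would be written out in the configuration file.

```scala
import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical extension with no behaviour; we only care about when it is loaded.
class StartupExtensionImpl extends Extension

object StartupExtension extends ExtensionId[StartupExtensionImpl] {
  override def createExtension(system: ExtendedActorSystem): StartupExtensionImpl =
    new StartupExtensionImpl
}

// Akka instantiates the provider reflectively, so it needs a no-argument constructor.
class StartupExtensionProvider extends ExtensionIdProvider {
  override def lookup(): ExtensionId[_ <: Extension] = StartupExtension
}

object EagerLoadingDemo {
  def main(args: Array[String]): Unit = {
    val providerFqcn = classOf[StartupExtensionProvider].getName
    val config = ConfigFactory
      .parseString(s"""akka.extensions = ["$providerFqcn"]""")
      .withFallback(ConfigFactory.load())

    val system = ActorSystem("eager-demo", config)
    try {
      // Nobody called StartupExtension(system), yet the extension is already registered.
      assert(system.hasExtension(StartupExtension))
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```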

Note: A third-party library may register its extension to be loaded at ActorSystem creation time. This is achieved by appending the extension's FQCN to akka.library-extensions in the library's reference.conf file:

akka.library-extensions += "com.example.extension.ExampleExtension"

Conclusion

In this blog post we went through a brief introduction to Akka Extensions. They are an elegant yet powerful way to define reusable features for applications written with Akka, and are well worth considering.

The complete code of the settings extension defined in this blog post is available on GitHub.

Happy hakking!