Defining a topic schema for Kafka using Protobuf (with examples in Scala)

Introduction

Low-latency messaging in big distributed systems has never been easy. Developing systems with many microservices that rely on messaging for cross-service communication, especially when the services are developed by different teams, brings a lot of challenges. It requires discipline and a clear exchange contract. The last thing you want to deal with is a backwards-incompatible change in the messages sent to a topic consumed by your service. Having a clear schema for topics will reduce, and ideally eliminate, the chances of this problem occurring.

In this blog post I'm going to explain how to achieve this using Protocol Buffers (or simply Protobuf) and Apache Kafka. It is based on a PoC I did recently which can be found on GitHub. For simplicity I'm going to use Scala (even though I will try to use as few code snippets as possible) but this approach can be used with other languages which have a Kafka client and a Protobuf compiler available.

Topic design

There are several approaches you can take with topic design, but they can be categorized as one of the following: one message type per topic, a few message types per topic, or many message types wrapped in a common envelope.

1. Multiple message types per topic

If you have multiple events whose order must be preserved, then it's very appropriate to have multiple message types per topic. This approach is common in event sourcing, but it's not limited to it. In this case you send all the events for a specific entity to the same partition (Kafka guarantees message order only within a partition, not across the whole topic) by setting a common message key (e.g. the entity ID).

2. One message type per topic

While the ordering guarantees are a potentially important benefit of using the same topic, it doesn't make much sense to send unrelated data to the same topic. In that case it's more appropriate to split things such that each topic has only one message type. This way it is easier to reason about the data written to and consumed from the topic. It also reduces the number of unwanted messages that consumers may need to ignore.

3. Envelopes with Any payload

In big distributed systems you will probably want to send some metadata in addition to the actual application data, whether this is a correlation ID for distributed tracing purposes or message signatures to ensure that the content can be trusted. Of course, it's also possible to use Kafka headers, but having the metadata in the raw message that is sent to Kafka makes it easier to define a contract across the entire system and has the additional benefit of being message broker agnostic.

This can be achieved by wrapping the application message in an envelope. A common approach is to have an envelope with multiple metadata fields and then a payload expressed using Protobuf's Any type:

import "google/protobuf/any.proto";

message Envelope {
  string correlation_id = 1;
  string auth_token = 2;
  // some other metadata
  google.protobuf.Any payload = 10;
}
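
On the write side, wrapping an application message into this Any payload with ScalaPB might look roughly like the sketch below. Envelope refers to the class generated from the schema above, and User stands in for whichever application message you pack (one is defined later in this post); the values are placeholders.

import com.google.protobuf.any.{Any => PbAny}

// PbAny.pack serializes the message and fills in type_url and value for us.
val envelope = Envelope(
  correlationId = "abc-123",
  authToken = "some-token",
  payload = Some(PbAny.pack(User(firstName = "Jane", lastName = "Doe")))
)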

Note: google.protobuf.Any is not directly comparable to Object in Java or Any in Scala. It's best understood by looking at the Protobuf schema:

message Any {
  string type_url = 1;
  bytes value = 2;
}

type_url is used to uniquely identify the type of the serialized Protobuf message, while value holds the serialized payload. Since value is just a sequence of bytes, it requires additional deserialization. As a result, we end up with a two-step deserialization:

  1. Deserialize the envelope itself. After deserializing the envelope we get access to our metadata fields and the payload, which is of type google.protobuf.Any.
  2. Deserialize the value bytes into the real type. For this we use type_url to determine which deserializer to apply to the value bytes (sketched below).
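
With ScalaPB the two steps might look roughly like this (a sketch; Envelope and User again refer to the generated classes, and bytes is the raw record value consumed from Kafka):

// Step 1: deserialize the envelope; the payload is still an opaque google.protobuf.Any.
val envelope = Envelope.parseFrom(bytes)

// Step 2: check type_url and deserialize the value bytes into the concrete type.
val maybeUser: Option[User] = envelope.payload.collect {
  case any if any.is[User] => any.unpack[User]
}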

This envelope can be used regardless of whether you have one message type sent to a topic or many. It is also a neat way to provide a message structure across the system. One drawback of the Any type is that the deserialization has to be done in two steps. In some cases it does make sense to have an envelope with a generic payload type, but we also need to consider the complexity it introduces.

4. Concrete/Topic Schema Envelopes

The main purpose of concrete envelopes is to define a topic schema. It involves defining an envelope and listing all the possible payload types that are sent through a topic. Let's look at the following example, which defines a concrete envelope:

message UserEnvelope {
  string correlation_id = 1;
  string auth_token = 2;
  // some other metadata
  User payload = 10; // Where User is a Protobuf message that represents the user state
}

message User {
  string first_name = 1;
  string last_name = 2;
}

Here we have defined a UserEnvelope that resembles the generic Envelope. One interesting thing is that the payload has a more concrete type. You can easily determine what the type of the payload is and what the message deserialization and processing will look like. And, conveniently, we only have to perform one deserialization step.
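
For comparison with the Any-based envelope, the single deserialization step with the ScalaPB-generated classes might look like this (a sketch, with bytes again being the raw record value):

// One parse gives us both the metadata and a strongly typed payload.
val envelope = UserEnvelope.parseFrom(bytes)
val correlationId: String   = envelope.correlationId
val maybeUser: Option[User] = envelope.payload // message fields are Options in ScalaPB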

But then the question is: "How can we deal with multiple payload types?" This is where the fun begins!

Multiple payload types using oneof

Protobuf allows us to define a set of fields where at most one can be set at the same time using the oneof construction:

message SampleMessage {
  oneof sample_field {
    string field_1 = 10;
    string field_2 = 11;
    CustomSubMessage field_3 = 12;
  }
}

On the write side, Protobuf ensures that setting a oneof field will automatically clear all other members of the oneof. On the read side, when the parser encounters multiple members of the same oneof, only the last member seen will be set.
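
In the ScalaPB-generated code the oneof becomes a sealed trait, so "setting" a different member simply means replacing the whole field. A minimal sketch, assuming SampleMessage and CustomSubMessage are the classes generated from the schema above:

// sample_field is generated as the sealed trait SampleMessage.SampleField.
val a = SampleMessage(sampleField = SampleMessage.SampleField.Field1("hello"))

// Replacing the oneof with another case "clears" the previously set one.
val b = a.copy(sampleField = SampleMessage.SampleField.Field3(CustomSubMessage()))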

So, we can use oneof to support multiple payload types, whether this is for Event Sourcing, the current state in a Finite State Machine or some other model. You could even use it for versioning within the topic, but whether it is a good idea is a subject for a different discussion.

For the sake of this post I'm going to assume we are using event sourcing and define a set of events for a user entity:

syntax = "proto3";

package com.tudorzgureanu.protocol.users.v1;

message UsersEnvelope {
    string correlation_id = 1;
    // some other metadata
    oneof payload {
        UserCreated user_created = 11;
        UserUpdated user_updated = 12;
        UserActivated user_activated = 13;
    }
}

message UserCreated {
    string id = 1;
    string first_name = 2;
    string last_name = 3;
}

message UserUpdated {
    string id = 1;
    string first_name = 2;
    string last_name = 3;
}

message UserActivated {
    string user_id = 1;
}
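
The rest of this post focuses on the consumer side, but for completeness here is a hedged sketch of what producing such an envelope could look like with ScalaPB and the plain Kafka producer. The topic name, the serializer configuration and the helper itself are assumptions for illustration, not part of the PoC.

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// UsersEnvelope and UserCreated are the ScalaPB-generated classes from the proto above.
// The producer is assumed to use a String key serializer and a ByteArray value serializer.
def publishUserCreated(producer: KafkaProducer[String, Array[Byte]],
                       correlationId: String,
                       userId: String): Unit = {
  val envelope = UsersEnvelope(
    correlationId = correlationId,
    payload = UsersEnvelope.Payload.UserCreated(
      UserCreated(id = userId, firstName = "Jane", lastName = "Doe")
    )
  )
  // Key by the entity id so all events for the same user end up in the same partition.
  producer.send(new ProducerRecord("users-v1", userId, envelope.toByteArray))
}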

By using oneof to define a concrete topic envelope we can have a single top-level message type per topic, with all the possible payload types listed as part of that message definition. This is a nice way to define topic schemas. It gives us a clearer contract, as it states exactly what payload types to expect, and it improves type safety. Now that we have this clear definition of what is valid input to the topic, anything that doesn't conform to this top-level message schema can simply be discarded by consumers as invalid data.

The only thing that remains is to get this message definition into your service, generate the Scala (or other language) classes, and you're ready to go.

Payload processing in Scala

The payload processing is relatively simple. ScalaPB generates a sum type for all the oneof fields, plus an Empty case for when the payload is not set at all or is set to a field that is not expected or supported by the consumer (in other languages the "empty" case may be named differently). One scenario where this is expected to happen is when new event types are introduced and sent by producers but received by a consumer that hasn't been updated to handle them yet.

In Scala, the easiest way to process all the payload types is pattern matching. By pattern matching on a sum type we have the advantage that the Scala compiler will tell us whether the match is exhaustive.

This is what the envelope processing looks like. I've omitted the Kafka consumer code here because the messages are consumed and deserialized in the usual way, but feel free to check out the full UserConsumerActor code here.

def processUserEvent(key: Option[String], envelope: UsersEnvelope): Future[Either[String, UserEvent]] = {
  envelope.payload match {
    case Payload.UserCreated(userCreatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User created $userCreatedProto")
      userService
        .persistUserEvent(UserCreated.fromProtoV1(userCreatedProto))
        .map(Right(_))
    case Payload.UserUpdated(userUpdatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User updated $userUpdatedProto")
      userService
        .persistUserEvent(UserUpdated.fromProtoV1(userUpdatedProto))
        .map(Right(_))
    case Payload.UserActivated(userActivatedProto) =>
      log.info(s"[correlationId: ${envelope.correlationId}] User activated $userActivatedProto")
      userService.persistUserEvent(UserActivated.fromProtoV1(userActivatedProto)).map(Right(_))
    case Payload.Empty =>
      log.info(
        s"[correlationId: ${envelope.correlationId}] Unexpected payload with key: ${key.getOrElse("null")}. Payload ignored."
      )
      Future.successful(Left("Could not process payload."))
  }
}

As you can see, this is a pretty neat way of processing the payload. If a new payload type is added to the Protobuf definition, the Scala compiler will let us know that we need to process the newly added type. In Java you would have to use instanceof followed by a cast to the correct type (please let me know if you would like to see a Java version of this PoC).
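
For reference, the fromProtoV1 conversions used above map the generated Protobuf classes to domain events. A rough sketch of what such a conversion might look like (the actual implementation in the PoC, and the exact package of the generated classes, may differ):

// Alias for the ScalaPB-generated class; the exact package depends on your ScalaPB settings.
import com.tudorzgureanu.protocol.users.v1.{UserCreated => UserCreatedProto}

// Domain-side events, decoupled from the wire format (only UserCreated is shown here).
sealed trait UserEvent
final case class UserCreated(id: String, firstName: String, lastName: String) extends UserEvent

object UserCreated {
  def fromProtoV1(proto: UserCreatedProto): UserCreated =
    UserCreated(proto.id, proto.firstName, proto.lastName)
}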

Challenges

Log Compaction

If you decide to use compacted topics, envelopes may require additional work.

But first, a quick recap on what log compaction is, from Getting started with Kafka using Scala Kafka Client and Akka:

When topics running in the normal mode of operation (the mode is called 'delete') reach their configured length bound (whether that be a time and/or space bound), messages are deleted from the tail of the log.

In log compaction mode (the mode is called 'compact'), instead of deleting messages from the tail of the log when the length bound is reached, Kafka builds a new version of the log, retaining on a key-by-key basis only the most recent message that was written to the old un-compacted log.

Deletion is achieved by sending a null payload, which will act like a tombstone for that key. After compaction, tombstones are cleaned up, leaving no footprint and hence achieving deletion for that key.

Because deletion is achieved by sending a key with a null message, metadata such as the correlation ID is lost too. In order to preserve it, deletion has to be done in two steps:

  1. First, we send an envelope message with all the relevant metadata but with an "empty" payload (we'll look at two ways of doing that below). This gives us the metadata that we need but also lets consumers know that this entity has been marked as deleted and has to be processed accordingly (e.g. by deleting the entity from the database). The drawback of this approach is that all the parties have to be aware of how entities get marked as deleted.

  2. Immediately after the above message, we send a second message with the same key but with a null envelope. This is done so that Kafka knows the key has to be removed from the log when compaction takes place.

Let's look back at step 1 and see how we can represent a deleted entity in the envelope:

Without oneof

With only one payload type per topic things are easy. Let's look back at the example with one message type per topic:

message UserEnvelope {
  string correlation_id = 1;
  User payload = 2;  
}

As described above, deletion requires two steps:

  1. In proto3, message (non-scalar) fields can be left unset, and ScalaPB represents them in the generated code as Option. So it's possible to make the application aware of deletions by sending an envelope with an unset (None) payload, as sketched after this list. Alternatively, we could have a flag (e.g. bool deleted = 3;) that tells us the message represents a deletion.

  2. Straight after step 1 we send a message with the same key and a null envelope to mark the key for deletion.
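
A minimal sketch of the two messages, assuming the same kind of producer as in the earlier example (topic name and surrounding wiring are placeholders):

// Step 1: an envelope with an unset (None) payload marks the entity as deleted for consumers.
val deletionMarker = UserEnvelope(correlationId = correlationId, payload = None)
producer.send(new ProducerRecord("users", userId, deletionMarker.toByteArray))

// Step 2: a null value for the same key is the tombstone that Kafka's compaction acts on.
producer.send(new ProducerRecord[String, Array[Byte]]("users", userId, null))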

With oneof

oneof gives us another option. We can easily define a separate entity-deleted event/submessage that tells us an entity was removed. This is especially useful when you have more than one entity type:

message UsersEnvelope {
    string correlation_id = 1;
    // some other metadata
    oneof payload {
        User user = 11;
        UserRemoved user_removed = 12; // Contains the user id. Or a bool could be used if the key sent to Kafka is the entity id
        // more payload types
    }
}

Even though it still requires two steps (but only one deserialization), the main advantage is that instead of setting the payload to null (in Java and other languages) or None (in Scala), we use a different oneof case. This way we can have multiple entity types and a corresponding deletion case for each of them. Another significant benefit is readability: UserRemoved leaves little to be interpreted by the reader, whereas the meaning of null or None is context sensitive.
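
Again as a hedged sketch, assuming UserRemoved carries the user id (as the comment in the proto above suggests) and the same producer wiring as before:

// Step 1: an explicit UserRemoved payload tells consumers the entity is gone, metadata included.
val removal = UsersEnvelope(
  correlationId = correlationId,
  payload = UsersEnvelope.Payload.UserRemoved(UserRemoved(userId = userId))
)
producer.send(new ProducerRecord("users", userId, removal.toByteArray))

// Step 2: the usual null-value tombstone so compaction can eventually drop the key.
producer.send(new ProducerRecord[String, Array[Byte]]("users", userId, null))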

Conclusions

In this post we have discussed the advantages and disadvantages of several topic designs, with a focus on "concrete envelopes". They provide a better contract between parties and a typesafe way to process payloads. Nonetheless, this approach may not suit all use cases; it's up to you to decide which topic design works best for your system.

The full code of this PoC is available here. Also, I recommend checking out this nice Kafka tutorial using Scala Kafka Client.

P.S. I'd like to thank David Piggott and Simon Souter for their valuable feedback!

Happy Hacking!

References and further reading