Skip to content

Serialization

Kafka transports raw bytes. Emit delegates serialization entirely to Confluent’s ISerializer<T> and IDeserializer<T> interfaces, so any serializer that implements those interfaces works.

Setting serializers

Assign serializers per topic in the Topic builder:

kafka.Topic<string, OrderPlaced>("orders", topic =>
{
topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
topic.SetValueSerializer(myValueSerializer);
topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
topic.SetValueDeserializer(myValueDeserializer);
});

Confluent provides built-in serializers for string, int, long, float, double, Null, and ByteArray in Confluent.Kafka.Serializers and Confluent.Kafka.Deserializers.

Emit also exposes shorthand methods that wrap those built-ins, so you don’t have to reference Confluent.Kafka.Serializers or Confluent.Kafka.Deserializers directly:

kafka.Topic<string, OrderPlaced>("orders", topic =>
{
topic.SetUtf8KeySerializer();
topic.SetUtf8KeyDeserializer();
});

The full set of shorthands:

TypeSerializerDeserializer
stringSetUtf8KeySerializer() / SetUtf8ValueSerializer()SetUtf8KeyDeserializer() / SetUtf8ValueDeserializer()
byte[]SetByteArrayKeySerializer() / SetByteArrayValueSerializer()SetByteArrayKeyDeserializer() / SetByteArrayValueDeserializer()
NullSetNullKeySerializer() / SetNullValueSerializer()SetNullKeyDeserializer() / SetNullValueDeserializer()
intSetInt32KeySerializer() / SetInt32ValueSerializer()SetInt32KeyDeserializer() / SetInt32ValueDeserializer()
longSetInt64KeySerializer() / SetInt64ValueSerializer()SetInt64KeyDeserializer() / SetInt64ValueDeserializer()
floatSetSingleKeySerializer() / SetSingleValueSerializer()SetSingleKeyDeserializer() / SetSingleValueDeserializer()
doubleSetDoubleKeySerializer() / SetDoubleValueSerializer()SetDoubleKeyDeserializer() / SetDoubleValueDeserializer()
(skip)SetIgnoreKeyDeserializer() / SetIgnoreValueDeserializer()

SetIgnore* is useful when you consume a topic but don’t care about the key or value bytes at all.

For async serializers (IAsyncSerializer<T> and IAsyncDeserializer<T>), use the corresponding overloads:

topic.SetValueSerializer(myAsyncSerializer);
topic.SetValueDeserializer(myAsyncDeserializer);

Schema Registry serializers

Emit provides three packages that integrate with Confluent Schema Registry. All three use a deferred factory pattern: the ISchemaRegistryClient is resolved at startup and passed to the serializer factory. All three require Schema Registry to be configured on the Kafka builder (see Setup).

JSON Schema

Terminal window
dotnet add package Emit.Kafka.JsonSerializer
kafka.Topic<string, OrderPlaced>("orders", topic =>
{
topic.SetJsonSchemaValueSerializer();
topic.SetJsonSchemaValueDeserializer();
});

Pass a JsonSerializerConfig to control schema validation behavior:

topic.SetJsonSchemaValueSerializer(new JsonSerializerConfig
{
SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
});

Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.

Avro

Terminal window
dotnet add package Emit.Kafka.AvroSerializer
kafka.Topic<string, MyAvroType>("events", topic =>
{
topic.SetAvroValueSerializer();
topic.SetAvroValueDeserializer();
});

TValue must be a type generated by the Avro code generator (implements ISpecificRecord). Pass AvroSerializerConfig to tune registration behavior:

topic.SetAvroValueSerializer(new AvroSerializerConfig
{
AutoRegisterSchemas = false,
});

Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.

Protobuf

Terminal window
dotnet add package Emit.Kafka.ProtobufSerializer
kafka.Topic<string, MyProtoMessage>("events", topic =>
{
topic.SetProtobufValueSerializer();
topic.SetProtobufValueDeserializer();
});

TValue must implement IMessage<TValue> (generated by protoc). Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.

System.Text.Json (custom)

For a quick implementation without Schema Registry, implement both interfaces on a single class:

public class JsonValueSerializer<T> : ISerializer<T>, IDeserializer<T>
{
public byte[] Serialize(T data, SerializationContext context)
=> JsonSerializer.SerializeToUtf8Bytes(data);
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
=> isNull ? default! : JsonSerializer.Deserialize<T>(data)!;
}
var serializer = new JsonValueSerializer<OrderPlaced>();
topic.SetValueSerializer(serializer);
topic.SetValueDeserializer(serializer);

This is fine for internal topics where you control both sides. For anything crossing a schema boundary, use a Schema Registry-backed serializer.

Deserialization errors

If deserialization throws, the error is surfaced as a DeserializationErrorEvent on any registered IKafkaConsumerObserver and is handled by the OnDeserializationError policy on the consumer group. See Error Policies for configuration details.