Serialization
Kafka transports raw bytes. Emit delegates serialization entirely to Confluent's ISerializer<T> and IDeserializer<T> interfaces, so any serializer that implements those interfaces works.
Setting serializers
Section titled “Setting serializers”Assign serializers per topic in the Topic builder:
kafka.Topic<string, OrderPlaced>("orders", topic =>{ topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8); topic.SetValueSerializer(myValueSerializer);
topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8); topic.SetValueDeserializer(myValueDeserializer);});Confluent provides built-in serializers for string, int, long, float, double, Null, and ByteArray in Confluent.Kafka.Serializers and Confluent.Kafka.Deserializers.
Emit also exposes shorthand methods that wrap those built-ins, so you don't have to reference Confluent.Kafka.Serializers or Confluent.Kafka.Deserializers directly:
kafka.Topic<string, OrderPlaced>("orders", topic =>{ topic.SetUtf8KeySerializer(); topic.SetUtf8KeyDeserializer();});The full set of shorthands:
| Type | Serializer | Deserializer |
|---|---|---|
string | SetUtf8KeySerializer() / SetUtf8ValueSerializer() | SetUtf8KeyDeserializer() / SetUtf8ValueDeserializer() |
byte[] | SetByteArrayKeySerializer() / SetByteArrayValueSerializer() | SetByteArrayKeyDeserializer() / SetByteArrayValueDeserializer() |
Null | SetNullKeySerializer() / SetNullValueSerializer() | SetNullKeyDeserializer() / SetNullValueDeserializer() |
int | SetInt32KeySerializer() / SetInt32ValueSerializer() | SetInt32KeyDeserializer() / SetInt32ValueDeserializer() |
long | SetInt64KeySerializer() / SetInt64ValueSerializer() | SetInt64KeyDeserializer() / SetInt64ValueDeserializer() |
float | SetSingleKeySerializer() / SetSingleValueSerializer() | SetSingleKeyDeserializer() / SetSingleValueDeserializer() |
double | SetDoubleKeySerializer() / SetDoubleValueSerializer() | SetDoubleKeyDeserializer() / SetDoubleValueDeserializer() |
| (skip) | SetIgnoreKeyDeserializer() / SetIgnoreValueDeserializer() |
SetIgnore* is useful when you consume a topic but don't care about the key or value bytes at all.
For async serializers (IAsyncSerializer<T> and IAsyncDeserializer<T>), use the corresponding overloads:
topic.SetValueSerializer(myAsyncSerializer);topic.SetValueDeserializer(myAsyncDeserializer);Schema Registry serializers
Section titled “Schema Registry serializers”Emit provides three packages that integrate with Confluent Schema Registry. All three use a deferred factory pattern: the ISchemaRegistryClient is resolved at startup and passed to the serializer factory. All three require Schema Registry to be configured on the Kafka builder (see Setup).
JSON Schema
Section titled “JSON Schema”dotnet add package Emit.Kafka.JsonSerializerkafka.Topic<string, OrderPlaced>("orders", topic =>{ topic.SetJsonSchemaValueSerializer(); topic.SetJsonSchemaValueDeserializer();});Pass a JsonSerializerConfig to control schema validation behavior:
topic.SetJsonSchemaValueSerializer(new JsonSerializerConfig{ SubjectNameStrategy = SubjectNameStrategy.TopicRecord,});Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.
dotnet add package Emit.Kafka.AvroSerializerkafka.Topic<string, MyAvroType>("events", topic =>{ topic.SetAvroValueSerializer(); topic.SetAvroValueDeserializer();});TValue must be a type generated by the Avro code generator (implements ISpecificRecord). Pass AvroSerializerConfig to tune registration behavior:
topic.SetAvroValueSerializer(new AvroSerializerConfig{ AutoRegisterSchemas = false,});Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.
Protobuf
Section titled “Protobuf”dotnet add package Emit.Kafka.ProtobufSerializerkafka.Topic<string, MyProtoMessage>("events", topic =>{ topic.SetProtobufValueSerializer(); topic.SetProtobufValueDeserializer();});TValue must implement IMessage<TValue> (generated by protoc). Pass a RuleRegistry as the second parameter to enable Schema Registry rule evaluation during serialization.
System.Text.Json (custom)
Section titled “System.Text.Json (custom)”For a quick implementation without Schema Registry, implement both interfaces on a single class:
public class JsonValueSerializer<T> : ISerializer<T>, IDeserializer<T>{ public byte[] Serialize(T data, SerializationContext context) => JsonSerializer.SerializeToUtf8Bytes(data);
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context) => isNull ? default! : JsonSerializer.Deserialize<T>(data)!;}var serializer = new JsonValueSerializer<OrderPlaced>();topic.SetValueSerializer(serializer);topic.SetValueDeserializer(serializer);This is fine for internal topics where you control both sides. For anything crossing a schema boundary, use a Schema Registry-backed serializer.
Deserialization errors
Section titled “Deserialization errors”If deserialization throws, the error is surfaced as a DeserializationErrorEvent on any registered IKafkaConsumerObserver and is handled by the OnDeserializationError policy on the consumer group. See Error Policies for configuration details.