Serialization
Kafka transports raw bytes. Emit delegates serialization entirely to Confluent’s `ISerializer<T>` and `IDeserializer<T>` interfaces, so any serializer that implements those interfaces works.
Setting serializers
Assign serializers per topic in the Topic builder:
```csharp
kafka.Topic<string, OrderPlaced>("orders", topic =>
{
    topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
    topic.SetValueSerializer(myValueSerializer);

    topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
    topic.SetValueDeserializer(myValueDeserializer);
});
```

Confluent provides built-in serializers for `string`, `int`, `long`, `float`, `double`, `Null`, and `ByteArray` in `Confluent.Kafka.Serializers` and `Confluent.Kafka.Deserializers`.
Emit also exposes shorthand methods that wrap those built-ins, so you don’t have to reference `Confluent.Kafka.Serializers` or `Confluent.Kafka.Deserializers` directly:

```csharp
kafka.Topic<string, OrderPlaced>("orders", topic =>
{
    topic.SetUtf8KeySerializer();
    topic.SetUtf8KeyDeserializer();
});
```

The full set of shorthands:
| Type | Serializer | Deserializer |
|---|---|---|
| `string` | `SetUtf8KeySerializer()` / `SetUtf8ValueSerializer()` | `SetUtf8KeyDeserializer()` / `SetUtf8ValueDeserializer()` |
| `byte[]` | `SetByteArrayKeySerializer()` / `SetByteArrayValueSerializer()` | `SetByteArrayKeyDeserializer()` / `SetByteArrayValueDeserializer()` |
| `Null` | `SetNullKeySerializer()` / `SetNullValueSerializer()` | `SetNullKeyDeserializer()` / `SetNullValueDeserializer()` |
| `int` | `SetInt32KeySerializer()` / `SetInt32ValueSerializer()` | `SetInt32KeyDeserializer()` / `SetInt32ValueDeserializer()` |
| `long` | `SetInt64KeySerializer()` / `SetInt64ValueSerializer()` | `SetInt64KeyDeserializer()` / `SetInt64ValueDeserializer()` |
| `float` | `SetSingleKeySerializer()` / `SetSingleValueSerializer()` | `SetSingleKeyDeserializer()` / `SetSingleValueDeserializer()` |
| `double` | `SetDoubleKeySerializer()` / `SetDoubleValueSerializer()` | `SetDoubleKeyDeserializer()` / `SetDoubleValueDeserializer()` |
| (skip) | — | `SetIgnoreKeyDeserializer()` / `SetIgnoreValueDeserializer()` |
The `SetIgnore*` methods are useful when you consume a topic but don’t care about the key or value bytes at all.
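For example, a consumer that only needs the payload can skip key deserialization entirely. A sketch, assuming Emit surfaces skipped keys as Confluent’s `Ignore` marker type (that mapping is an assumption here, not documented above):

```csharp
kafka.Topic<Ignore, OrderPlaced>("orders", topic =>
{
    // Key bytes are never materialized; Confluent's Ignore type (assumed
    // here) stands in for the skipped key slot.
    topic.SetIgnoreKeyDeserializer();
    topic.SetValueDeserializer(myValueDeserializer);
});
```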
For async serializers (`IAsyncSerializer<T>` and `IAsyncDeserializer<T>`), use the corresponding overloads:

```csharp
topic.SetValueSerializer(myAsyncSerializer);
topic.SetValueDeserializer(myAsyncDeserializer);
```

Schema Registry serializers
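If you write your own async pair, Confluent’s interfaces expose `SerializeAsync` and `DeserializeAsync`. A minimal System.Text.Json-backed sketch; this one does no real I/O and is only meant to show the shape (async serializers are worthwhile when serialization itself awaits something, such as a schema lookup):

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public class AsyncJsonSerializer<T> : IAsyncSerializer<T>, IAsyncDeserializer<T>
{
    // Synchronous work wrapped in Task.FromResult; a real async serializer
    // would await I/O here instead.
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        => Task.FromResult(JsonSerializer.SerializeToUtf8Bytes(data));

    public Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool isNull, SerializationContext context)
        => Task.FromResult(isNull ? default! : JsonSerializer.Deserialize<T>(data.Span)!);
}
```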
Emit provides three packages that integrate with Confluent Schema Registry. All three use a deferred factory pattern: the `ISchemaRegistryClient` is resolved at startup and passed to the serializer factory. All three require Schema Registry to be configured on the Kafka builder (see Setup).
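The deferred-factory idea can be sketched with Confluent’s own types. This is an illustration of the pattern, not Emit’s actual factory signature; `JsonSerializer<T>` is Confluent’s Schema Registry serializer and `AsSyncOverAsync()` comes from `Confluent.Kafka.SyncOverAsync`:

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// The serializer is not built at registration time. Instead, a factory
// receives the ISchemaRegistryClient resolved at startup and only then
// constructs the serializer that will use it.
Func<ISchemaRegistryClient, ISerializer<OrderPlaced>> factory = client =>
    new JsonSerializer<OrderPlaced>(client).AsSyncOverAsync();
```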
JSON Schema
```shell
dotnet add package Emit.Kafka.JsonSerializer
```

```csharp
kafka.Topic<string, OrderPlaced>("orders", topic =>
{
    topic.SetJsonSchemaValueSerializer();
    topic.SetJsonSchemaValueDeserializer();
});
```

Pass a `JsonSerializerConfig` to control schema validation behavior:
```csharp
topic.SetJsonSchemaValueSerializer(new JsonSerializerConfig
{
    SubjectNameStrategy = SubjectNameStrategy.TopicRecord,
});
```

Pass a `RuleRegistry` as the second parameter to enable Schema Registry rule evaluation during serialization.
Avro
```shell
dotnet add package Emit.Kafka.AvroSerializer
```

```csharp
kafka.Topic<string, MyAvroType>("events", topic =>
{
    topic.SetAvroValueSerializer();
    topic.SetAvroValueDeserializer();
});
```

`TValue` must be a type generated by the Avro code generator (one that implements `ISpecificRecord`). Pass an `AvroSerializerConfig` to tune registration behavior:
```csharp
topic.SetAvroValueSerializer(new AvroSerializerConfig
{
    AutoRegisterSchemas = false,
});
```

Pass a `RuleRegistry` as the second parameter to enable Schema Registry rule evaluation during serialization.
Protobuf
```shell
dotnet add package Emit.Kafka.ProtobufSerializer
```

```csharp
kafka.Topic<string, MyProtoMessage>("events", topic =>
{
    topic.SetProtobufValueSerializer();
    topic.SetProtobufValueDeserializer();
});
```

`TValue` must implement `IMessage<TValue>` (generated by `protoc`). Pass a `RuleRegistry` as the second parameter to enable Schema Registry rule evaluation during serialization.
System.Text.Json (custom)
For a quick implementation without Schema Registry, implement both interfaces on a single class:
```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

public class JsonValueSerializer<T> : ISerializer<T>, IDeserializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);

    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? default! : JsonSerializer.Deserialize<T>(data)!;
}
```

Register the same instance for both directions:

```csharp
var serializer = new JsonValueSerializer<OrderPlaced>();
topic.SetValueSerializer(serializer);
topic.SetValueDeserializer(serializer);
```

This is fine for internal topics where you control both sides. For anything crossing a schema boundary, use a Schema Registry-backed serializer.
Deserialization errors
If deserialization throws, the error is surfaced as a `DeserializationErrorEvent` on any registered `IKafkaConsumerObserver` and handled by the `OnDeserializationError` policy on the consumer group. See Error Policies for configuration details.
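A sketch of an observer that records these events. The `IKafkaConsumerObserver` and `DeserializationErrorEvent` names come from above, but the method signature shown is an illustrative assumption; check the Emit API for the real shape:

```csharp
using System;

public class LoggingConsumerObserver : IKafkaConsumerObserver
{
    // Hypothetical signature: invoked when a key or value deserializer throws.
    // The consumer group's OnDeserializationError policy still decides whether
    // the record is skipped, retried, or the consumer faults.
    public void OnDeserializationError(DeserializationErrorEvent e)
        => Console.Error.WriteLine($"Deserialization failed: {e}");
}
```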