# Kafka Setup

All Kafka configuration flows through `AddKafka` on the `EmitBuilder`. This page covers the broker connection, topic registration, and auto-provisioning. See Producers and Consumers for what you can do once topics are wired up.
## Package

```shell
dotnet add package Emit.Kafka
```

## Minimal registration
```csharp
builder.Services.AddEmit(emit =>
{
    emit.AddKafka(kafka =>
    {
        kafka.ConfigureClient(cfg =>
        {
            cfg.BootstrapServers = "localhost:9092";
        });

        kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
        {
            topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
            topic.SetValueSerializer(/* your serializer */);
            topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
            topic.SetValueDeserializer(/* your deserializer */);

            topic.Producer();

            topic.ConsumerGroup("pizza-kitchen", group =>
            {
                group.AddConsumer<PizzaOrderedConsumer>();
            });
        });
    });
});
```

`ConfigureClient` is required. Everything else is opt-in per topic.
## Client configuration

`ConfigureClient` applies settings that affect all producers and consumers. The most common options:
```csharp
kafka.ConfigureClient(cfg =>
{
    cfg.BootstrapServers = "broker1:9092,broker2:9092";
    cfg.ClientId = "my-service";

    // SASL/SSL
    cfg.SecurityProtocol = SecurityProtocol.SaslSsl;
    cfg.SaslMechanism = SaslMechanism.Plain;
    cfg.SaslUsername = "user";
    cfg.SaslPassword = "pass";
    cfg.SslCaLocation = "/etc/ssl/certs/ca-bundle.crt";
});
```

For producer-specific tuning (batching, compression, idempotence), use `ConfigureProducer`:

```csharp
kafka.ConfigureProducer(cfg =>
{
    cfg.Acks = Acks.All;
    cfg.EnableIdempotence = true;
    cfg.CompressionType = CompressionType.Snappy;
    cfg.Linger = TimeSpan.FromMilliseconds(5);
});
```

## Schema Registry
If you use Avro, JSON Schema, or Protobuf serializers, configure the Schema Registry connection before your topics:
```csharp
kafka.ConfigureSchemaRegistry(cfg =>
{
    cfg.Url = "http://schema-registry:8081";
    cfg.BasicAuthUserInfo = "user:password";
});
```

See Serialization for how to wire schema-aware serializers to topics.
## Topics

Each topic is declared with `Topic<TKey, TValue>`. The type parameters fix the serializer and deserializer types for that topic. You attach a producer, one or more consumer groups, or both:
```csharp
kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
    topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
    topic.SetValueSerializer(mySerializer);
    topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
    topic.SetValueDeserializer(myDeserializer);

    topic.Producer();

    topic.ConsumerGroup("my-group", group =>
    {
        group.AddConsumer<MyConsumer>();
    });
});
```

A topic can have both a producer and multiple consumer groups. Declaring a producer does not require a consumer group on the same topic, and vice versa.
## Fan-out

Call `ConsumerGroup` multiple times on the same topic to register separate consumer groups. Each group receives every message independently. This is standard Kafka consumer group semantics; Emit adds no special mechanism:
```csharp
kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
    topic.ConsumerGroup("pizza-kitchen", group =>
    {
        group.AddConsumer<PizzaOrderedConsumer>();
    });

    topic.ConsumerGroup("pizza-analytics", group =>
    {
        group.AddConsumer<PizzaAnalyticsConsumer>();
    });
});
```

Both groups receive every pizza order. Their offsets and processing are completely independent.
## Auto-provisioning

By default, Emit verifies that all declared topics exist at startup. A missing topic causes an `InvalidOperationException` before any message processing begins. Call `AutoProvision()` to create missing topics instead:
```csharp
emit.AddKafka(kafka =>
{
    kafka.AutoProvision();

    kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
    {
        topic.Provisioning(options =>
        {
            options.NumPartitions = 6;
            options.ReplicationFactor = 3;
            options.Retention = TimeSpan.FromDays(14);
            options.CompressionType = TopicCompressionType.Zstd;
        });

        // ... serializers, producer, consumer groups ...
    });
});
```

Without per-topic `Provisioning`, missing topics are created with broker defaults and 7-day retention.
### Topic creation options
| Option | Default | Description |
|---|---|---|
| `NumPartitions` | broker default | Number of partitions |
| `ReplicationFactor` | broker default | Number of replicas |
| `Retention` | 7 days | How long messages are retained (`null` = infinite) |
| `RetentionBytes` | no limit | Maximum partition size before old segments are discarded |
| `CleanupPolicy` | `Delete` | `Delete`, `Compact`, or `DeleteAndCompact` |
| `DeleteRetention` | 1 day | How long delete tombstones are retained for compacted topics |
| `CompressionType` | `Producer` | `Producer`, `Uncompressed`, `Gzip`, `Snappy`, `Lz4`, or `Zstd` |
| `MinCleanableDirtyRatio` | 0.5 | Minimum dirty-to-total ratio before the compactor runs |
| `MinCompactionLag` | 0 | Minimum time a message remains uncompacted |
| `MaxCompactionLag` | none | Maximum time a message can remain uncompacted |
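For example, several of these options combine naturally on a compacted changelog-style topic. The sketch below is illustrative only: the `CustomerUpdated` event type and topic name are hypothetical, and the `TopicCleanupPolicy` enum name is an assumption made by analogy with `TopicCompressionType`:

```csharp
kafka.Topic<string, CustomerUpdated>("customer-state", topic =>
{
    topic.Provisioning(options =>
    {
        // Keep only the latest value per key instead of deleting by age.
        options.CleanupPolicy = TopicCleanupPolicy.Compact;   // assumed enum name
        options.DeleteRetention = TimeSpan.FromDays(2);       // keep tombstones longer
        options.MinCleanableDirtyRatio = 0.3;                 // compact more eagerly
    });
});
```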
Compression levels apply when the matching `CompressionType` is set:
| Option | Range | Default |
|---|---|---|
| `GzipCompressionLevel` | 1-9 (or -1 for default) | -1 |
| `Lz4CompressionLevel` | 1-17 | 9 |
| `ZstdCompressionLevel` | negative for fast mode, 1-22 for normal | 3 |
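Assuming these levels are exposed as sibling properties on the same provisioning options object (not confirmed on this page), picking a heavier Zstd level might look like:

```csharp
topic.Provisioning(options =>
{
    options.CompressionType = TopicCompressionType.Zstd;
    options.ZstdCompressionLevel = 19; // slower to write, smaller on disk; default is 3
});
```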
### How it works

`KafkaTopicVerifier` runs as a hosted service before all other hosted services. It queries the broker for existing topics and compares the result against all declared topics, including the dead-letter topic. When auto-provisioning is enabled, missing topics are created with their configured options. When disabled, any missing topic fails startup immediately.
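The check itself is roughly what you would write by hand against Confluent.Kafka's admin client. The sketch below is illustrative only: `VerifyTopicsAsync` and its parameters are hypothetical, not Emit's actual implementation.

```csharp
using System.Linq;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

static async Task VerifyTopicsAsync(
    string bootstrapServers,
    IReadOnlyCollection<string> declaredTopics,
    bool autoProvision)
{
    using var admin = new AdminClientBuilder(
        new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

    // Ask the broker which topics already exist.
    var existing = admin.GetMetadata(TimeSpan.FromSeconds(10))
        .Topics.Select(t => t.Topic).ToHashSet();

    var missing = declaredTopics.Where(t => !existing.Contains(t)).ToList();
    if (missing.Count == 0) return;

    if (!autoProvision)
        throw new InvalidOperationException(
            "Missing Kafka topics: " + string.Join(", ", missing));

    // Create missing topics; -1 lets the broker apply its defaults.
    await admin.CreateTopicsAsync(missing.Select(name => new TopicSpecification
    {
        Name = name,
        NumPartitions = -1,
        ReplicationFactor = -1
    }));
}
```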
## Dead-letter topic

To route unprocessable messages to a dead-letter topic, call `DeadLetter` on the Kafka builder with the topic name:
```csharp
emit.AddKafka(kafka =>
{
    kafka.DeadLetter("emit-dlq");

    // ...
});
```

The dead-letter topic is subject to the same existence check (or auto-provisioning) as all other declared topics.
## Provider-level middleware

Middleware registered on `kafka.InboundPipeline` or `kafka.OutboundPipeline` runs for all Kafka consumers or producers respectively. This is the Provider level of the pipeline model:

```csharp
emit.AddKafka(kafka =>
{
    kafka.InboundPipeline.Use<TracingMiddleware>();
    kafka.OutboundPipeline.Use<SigningMiddleware>();
});
```
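As a rough sketch of what such a middleware class might contain: Emit's actual middleware contract is not shown on this page, so the `MessageContext` type and the `InvokeAsync(context, next)` shape below are assumptions modeled on ASP.NET Core-style pipelines.

```csharp
// Hypothetical shape: Emit's real middleware interface may differ.
public sealed class TracingMiddleware
{
    public async Task InvokeAsync(MessageContext context, Func<Task> next)
    {
        // Start a span per message; the activity name and Topic property are illustrative.
        using var activity = new System.Diagnostics.Activity("kafka.message").Start();
        activity.SetTag("topic", context.Topic);

        await next(); // hand off to the next middleware, then the consumer
    }
}
```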