Kafka Setup

All Kafka configuration flows through AddKafka on the EmitBuilder. This page covers the broker connection, topic registration, and auto-provisioning. See Producers and Consumers for what you can do once topics are wired up.

Package

dotnet add package Emit.Kafka

Minimal registration

Program.cs
builder.Services.AddEmit(emit =>
{
    emit.AddKafka(kafka =>
    {
        kafka.ConfigureClient(cfg =>
        {
            cfg.BootstrapServers = "localhost:9092";
        });
        kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
        {
            topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
            topic.SetValueSerializer(/* your serializer */);
            topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
            topic.SetValueDeserializer(/* your deserializer */);
            topic.Producer();
            topic.ConsumerGroup("pizza-kitchen", group =>
            {
                group.AddConsumer<PizzaOrderedConsumer>();
            });
        });
    });
});

ConfigureClient is required. Everything else is opt-in per topic.

Client configuration

ConfigureClient applies settings that affect all producers and consumers. The most common options:

kafka.ConfigureClient(cfg =>
{
    cfg.BootstrapServers = "broker1:9092,broker2:9092";
    cfg.ClientId = "my-service";

    // SASL/SSL
    cfg.SecurityProtocol = SecurityProtocol.SaslSsl;
    cfg.SaslMechanism = SaslMechanism.Plain;
    cfg.SaslUsername = "user";
    cfg.SaslPassword = "pass";
    cfg.SslCaLocation = "/etc/ssl/certs/ca-bundle.crt";
});

For producer-specific tuning (batching, compression, idempotence), use ConfigureProducer:

kafka.ConfigureProducer(cfg =>
{
    cfg.Acks = Acks.All;
    cfg.EnableIdempotence = true;
    cfg.CompressionType = CompressionType.Snappy;
    cfg.Linger = TimeSpan.FromMilliseconds(5);
});

Schema Registry

If you use Avro, JSON Schema, or Protobuf serializers, configure the Schema Registry connection before your topics:

kafka.ConfigureSchemaRegistry(cfg =>
{
    cfg.Url = "http://schema-registry:8081";
    cfg.BasicAuthUserInfo = "user:password";
});

See Serialization for how to wire schema-aware serializers to topics.

Topics

Each topic is declared with Topic<TKey, TValue>. The type parameters set the key and value types for that topic, and therefore the types its serializers and deserializers must handle. You attach a producer, one or more consumer groups, or both:

kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
    topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
    topic.SetValueSerializer(mySerializer);
    topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
    topic.SetValueDeserializer(myDeserializer);
    topic.Producer();
    topic.ConsumerGroup("my-group", group =>
    {
        group.AddConsumer<MyConsumer>();
    });
});

A topic can have both a producer and multiple consumer groups. Declaring a producer does not require a consumer group on the same topic, and vice versa.
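
The two one-sided cases might look like the sketch below, which uses only the builder calls shown above. The topic names, the message types (ReceiptPrinted, DeliveryAssigned), the consumer class, and the serializer variables are hypothetical placeholders. The sketch also assumes, without confirmation from this page, that a producer-only topic needs only serializers and a consumer-only topic needs only deserializers. If Emit requires all four, add the missing calls.

```csharp
// Producer-only topic: this service publishes but never consumes.
kafka.Topic<string, ReceiptPrinted>("receipts", topic =>
{
    topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
    topic.SetValueSerializer(receiptSerializer); // hypothetical serializer instance
    topic.Producer();
});

// Consumer-only topic: this service consumes but never publishes.
kafka.Topic<string, DeliveryAssigned>("deliveries", topic =>
{
    topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
    topic.SetValueDeserializer(deliveryDeserializer); // hypothetical deserializer instance
    topic.ConsumerGroup("delivery-tracker", group =>
    {
        group.AddConsumer<DeliveryAssignedConsumer>(); // hypothetical consumer class
    });
});
```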

Fan-out

Call ConsumerGroup multiple times on the same topic to register separate consumer groups. Each group receives every message independently. This is standard Kafka consumer group semantics; Emit adds no special mechanism:

kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
    topic.ConsumerGroup("pizza-kitchen", group =>
    {
        group.AddConsumer<PizzaOrderedConsumer>();
    });
    topic.ConsumerGroup("pizza-analytics", group =>
    {
        group.AddConsumer<PizzaAnalyticsConsumer>();
    });
});

Both groups receive every pizza order. Their offsets and processing are completely independent.

Auto-provisioning

By default, Emit verifies that all declared topics exist at startup. A missing topic causes an InvalidOperationException before any message processing begins. Call AutoProvision() to create missing topics instead:

emit.AddKafka(kafka =>
{
    kafka.AutoProvision();
    kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
    {
        topic.Provisioning(options =>
        {
            options.NumPartitions = 6;
            options.ReplicationFactor = 3;
            options.Retention = TimeSpan.FromDays(14);
            options.CompressionType = TopicCompressionType.Zstd;
        });
        // ... serializers, producer, consumer groups ...
    });
});

Without a per-topic Provisioning call, missing topics are created with the broker's default partition count and replication factor, and with 7-day retention.

Topic creation options

| Option | Default | Description |
| --- | --- | --- |
| NumPartitions | broker default | Number of partitions |
| ReplicationFactor | broker default | Number of replicas |
| Retention | 7 days | How long messages are retained (null = infinite) |
| RetentionBytes | no limit | Maximum partition size before old segments are discarded |
| CleanupPolicy | Delete | Delete, Compact, or DeleteAndCompact |
| DeleteRetention | 1 day | How long delete tombstones are retained for compacted topics |
| CompressionType | Producer | Producer, Uncompressed, Gzip, Snappy, Lz4, or Zstd |
| MinCleanableDirtyRatio | 0.5 | Minimum dirty-to-total ratio before the compactor runs |
| MinCompactionLag | 0 | Minimum time a message remains uncompacted |
| MaxCompactionLag | none | Maximum time a message can remain uncompacted |

Compression levels apply when the matching CompressionType is set:

| Option | Range | Default |
| --- | --- | --- |
| GzipCompressionLevel | 1-9 (or -1 for default) | -1 |
| Lz4CompressionLevel | 1-17 | 9 |
| ZstdCompressionLevel | negative for fast mode, 1-22 for normal | 3 |
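
As a sketch of how these options combine, the fragment below keeps messages indefinitely and compresses them with Zstd at a non-default level. The option names come from the tables above. The assumption that ZstdCompressionLevel takes a plain int is not confirmed on this page, and the level 6 is an arbitrary illustrative value.

```csharp
kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
    topic.Provisioning(options =>
    {
        options.Retention = null;                            // null = infinite retention
        options.CompressionType = TopicCompressionType.Zstd; // the level below only applies to Zstd
        options.ZstdCompressionLevel = 6;                    // assumed int; normal range is 1-22
    });
    // ... serializers, producer, consumer groups ...
});
```

These options apply only when Emit creates the topic. The page does not describe any reconfiguration of topics that already exist.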

How it works

KafkaTopicVerifier runs as a hosted service before all other hosted services. It queries the broker for existing topics and compares the result against all declared topics, including the dead-letter topic. When auto-provisioning is enabled, missing topics are created with their configured options. When disabled, any missing topic fails startup immediately.
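
The check described above can be approximated with the Confluent.Kafka admin client. This is an illustrative sketch, not Emit's KafkaTopicVerifier source: the class name, method name, parameters, and 10-second timeout are all hypothetical, and it creates topics with broker defaults rather than per-topic options.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

public static class TopicCheckSketch
{
    // Hypothetical helper: verifies declared topics, optionally creating missing ones.
    public static async Task VerifyAsync(
        string bootstrapServers,
        IReadOnlyCollection<string> declaredTopics,
        bool autoProvision)
    {
        using var admin = new AdminClientBuilder(
            new AdminClientConfig { BootstrapServers = bootstrapServers }).Build();

        // Cluster metadata lists every topic the broker currently knows about.
        var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
        var existing = metadata.Topics.Select(t => t.Topic).ToHashSet();

        var missing = declaredTopics.Where(t => !existing.Contains(t)).ToList();
        if (missing.Count == 0)
        {
            return;
        }

        if (!autoProvision)
        {
            // Mirrors the documented behavior: a missing topic fails startup.
            throw new InvalidOperationException(
                $"Missing Kafka topics: {string.Join(", ", missing)}");
        }

        // Partition count and replication factor are left unset here,
        // so the broker's defaults are requested.
        await admin.CreateTopicsAsync(
            missing.Select(name => new TopicSpecification { Name = name }));
    }
}
```

Running it needs the Confluent.Kafka package and a reachable broker. Leaving the partition count and replication factor unset relies on a broker recent enough to apply its own defaults on topic creation.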

Dead-letter topic

To route unprocessable messages to a dead-letter topic, call DeadLetter on the Kafka builder with the topic name:

emit.AddKafka(kafka =>
{
    kafka.DeadLetter("emit-dlq");
    // ...
});

The dead-letter topic is subject to the same existence check (or auto-provisioning) as all other declared topics.
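
For instance, combining DeadLetter with AutoProvision should cause a missing dead-letter topic to be created at startup rather than failing it. This is a sketch built only from calls shown on this page. How to set creation options for the dead-letter topic is not covered here, so the fragment leaves them at their defaults.

```csharp
emit.AddKafka(kafka =>
{
    kafka.AutoProvision();        // create any declared topic that is missing
    kafka.DeadLetter("emit-dlq"); // declared, so it is verified or created like any other topic
    // ... ConfigureClient and topics ...
});
```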

Provider-level middleware

Middleware registered on kafka.InboundPipeline or kafka.OutboundPipeline runs for all Kafka consumers or producers respectively. This is the Provider level of the pipeline model:

emit.AddKafka(kafka =>
{
    kafka.InboundPipeline.Use<TracingMiddleware>();
    kafka.OutboundPipeline.Use<SigningMiddleware>();
});