Producers
IEventProducer
Once a topic has been declared with topic.Producer(), Emit registers IEventProducer<TKey, TValue> in the DI container. Inject it wherever you need to produce messages:
    public class PizzeriaService(IEventProducer<string, PizzaOrdered> producer)
    {
        public async Task PublishAsync(PizzaOrdered pizza, CancellationToken ct)
        {
            await producer.ProduceAsync(pizza.PizzaId.ToString(), pizza, ct);
        }
    }

IEventProducer is scoped. Inject it into controllers, endpoint handlers, or any scoped service.
EventMessage
EventMessage<TKey, TValue> is a sealed record that carries the key, value, and optional headers:
    // Key + value only
    var msg = new EventMessage<string, PizzaOrdered>(pizza.PizzaId.ToString(), pizza);

    // With headers
    var msg = new EventMessage<string, PizzaOrdered>(
        pizza.PizzaId.ToString(),
        pizza,
        [new KeyValuePair<string, string>("x-tenant-id", tenantId)]);

Headers are forwarded to Kafka as-is. When OpenTelemetry is enabled, Emit appends W3C trace context headers automatically.
Convenience overloads
IEventProducer<TKey, TValue> has extension methods from EventProducerExtensions that skip the EventMessage wrapper:
    // Key + value
    await producer.ProduceAsync(pizza.PizzaId.ToString(), pizza, ct);

    // Key + value + headers
    await producer.ProduceAsync(
        pizza.PizzaId.ToString(),
        pizza,
        [new KeyValuePair<string, string>("x-tenant-id", tenantId)],
        ct);

They construct the EventMessage internally. Use them when you don’t need to build the message object ahead of time, which is most of the time.
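To illustrate how overloads like these can wrap a message record, here is a minimal, self-contained sketch of the extension-method pattern. It is not Emit's source code: DemoMessage, IDemoProducer, DemoProducerExtensions, and RecordingProducer are hypothetical stand-ins invented for this example, and the real signatures on IEventProducer and EventProducerExtensions may differ.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a message record: key, value, optional headers.
public sealed record DemoMessage<TKey, TValue>(
    TKey Key,
    TValue Value,
    IReadOnlyList<KeyValuePair<string, string>>? Headers = null);

// Hypothetical stand-in for a producer with a single core method.
public interface IDemoProducer<TKey, TValue>
{
    Task ProduceAsync(DemoMessage<TKey, TValue> message, CancellationToken ct = default);
}

public static class DemoProducerExtensions
{
    // Key + value: wrap the arguments in a message and delegate to the core method.
    public static Task ProduceAsync<TKey, TValue>(
        this IDemoProducer<TKey, TValue> producer,
        TKey key,
        TValue value,
        CancellationToken ct = default)
        => producer.ProduceAsync(new DemoMessage<TKey, TValue>(key, value), ct);

    // Key + value + headers: same idea, with the headers passed through unchanged.
    public static Task ProduceAsync<TKey, TValue>(
        this IDemoProducer<TKey, TValue> producer,
        TKey key,
        TValue value,
        IReadOnlyList<KeyValuePair<string, string>> headers,
        CancellationToken ct = default)
        => producer.ProduceAsync(new DemoMessage<TKey, TValue>(key, value, headers), ct);
}

// In-memory producer that records what it was asked to send (illustration only).
public sealed class RecordingProducer<TKey, TValue> : IDemoProducer<TKey, TValue>
{
    public List<DemoMessage<TKey, TValue>> Sent { get; } = new();

    public Task ProduceAsync(DemoMessage<TKey, TValue> message, CancellationToken ct = default)
    {
        Sent.Add(message);
        return Task.CompletedTask;
    }
}
```

Because the overloads only construct the record and delegate, both call styles reach the same core method, so any behaviour attached to that method applies equally to either style in this sketch.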
Headers
Headers are string key-value pairs that travel with the message through the full pipeline:
Producing: Set headers on EventMessage at produce time, either via the record constructor or a convenience overload.
Pipeline enrichment: Emit’s ProduceTracingMiddleware appends W3C trace context headers (traceparent, tracestate, baggage) and emit-source-address before delivery. The canonical header names are defined on WellKnownHeaders.
Outbox path: Headers are UTF-8 encoded to byte[] and stored alongside the message body in the outbox. The background worker forwards them byte-for-byte to Kafka.
Direct path: Headers are UTF-8 encoded and attached to the Kafka message directly.
Consuming: On the consume side, headers are decoded back to strings and available on ConsumeContext<T>.Headers as IReadOnlyList<KeyValuePair<string, string>>.
Custom headers survive the full round trip untouched regardless of which delivery mode you use.
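The encode and decode steps above can be sketched in a few lines. This is a minimal illustration of a UTF-8 round trip using only the .NET base class library; HeaderCodec is a hypothetical helper written for this example and is not part of Emit, whose internal encoding code is not shown here.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helper (not an Emit type) showing the string <-> byte[] round trip.
public static class HeaderCodec
{
    // Produce side: string header values become UTF-8 bytes for the transport.
    public static List<KeyValuePair<string, byte[]>> Encode(
        IEnumerable<KeyValuePair<string, string>> headers)
        => headers
            .Select(h => new KeyValuePair<string, byte[]>(h.Key, Encoding.UTF8.GetBytes(h.Value)))
            .ToList();

    // Consume side: the bytes are decoded back into strings, preserving order.
    public static IReadOnlyList<KeyValuePair<string, string>> Decode(
        IEnumerable<KeyValuePair<string, byte[]>> headers)
        => headers
            .Select(h => new KeyValuePair<string, string>(h.Key, Encoding.UTF8.GetString(h.Value)))
            .ToList();
}
```

Calling HeaderCodec.Decode(HeaderCodec.Encode(headers)) returns the same keys and values in the same order for any valid string values, including non-ASCII text.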
Outbox mode and direct mode
Emit supports two delivery modes per producer. For the full explanation of outbox mechanics and transactional guarantees, see Outbox.
Outbox mode is the default when outbox infrastructure is configured on a persistence provider. ProduceAsync writes to the database outbox inside the current unit of work, and the background worker delivers to Kafka after the transaction commits.
Direct mode calls the Kafka producer immediately and waits for the broker acknowledgement before returning. Opt in per producer:
    kafka.Topic<string, AnalyticsEvent>("analytics", topic =>
    {
        topic.Producer(p => p.UseDirect());
    });

Use direct mode when you want low latency and can tolerate the dual-write risk, or when there is no database write to coordinate with.
Per-producer middleware
Register middleware that runs only for a specific topic’s producer:
    kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
    {
        topic.Producer(producer =>
        {
            producer.Use<EnrichmentMiddleware>();
        });
    });

The middleware receives SendContext<PizzaOrdered> and calls next.InvokeAsync(context) to continue the pipeline. This is the Leaf level of the pipeline model.
Accessing the message key in middleware
The Kafka key is available via the KafkaTransportContext<TKey> payload on the send context:
    public class LoggingMiddleware<T>(ILogger<LoggingMiddleware<T>> logger)
        : IMiddleware<SendContext<T>>
    {
        public Task InvokeAsync(SendContext<T> context, IMiddlewarePipeline<SendContext<T>> next)
        {
            // The payload is only present when the message goes through the Kafka transport.
            if (context.TryGetPayload<KafkaTransportContext<string>>() is { } kafka)
            {
                logger.LogDebug("Producing with key {Key}", kafka.Key);
            }

            return next.InvokeAsync(context);
        }
    }

Produce observers
IProduceObserver provides lifecycle hooks without writing pipeline middleware. Its three methods fire around every ProduceAsync call: before the message is produced, after it has been produced, and when producing fails:
    public class ProduceAuditObserver : IProduceObserver
    {
        public Task OnProducingAsync<T>(SendContext<T> context) { ... }
        public Task OnProducedAsync<T>(SendContext<T> context) { ... }
        public Task OnProduceErrorAsync<T>(SendContext<T> context, Exception exception) { ... }
    }

Register at the global level on EmitBuilder:
emit.AddProduceObserver<ProduceAuditObserver>();