Skip to content

Producers

IEventProducer

Once a topic has been declared with topic.Producer(), Emit registers IEventProducer<TKey, TValue> in the DI container. Inject it wherever you need to produce messages:

public class PizzeriaService(IEventProducer<string, PizzaOrdered> producer)
{
public async Task PublishAsync(PizzaOrdered pizza, CancellationToken ct)
{
await producer.ProduceAsync(pizza.PizzaId.ToString(), pizza, ct);
}
}

IEventProducer is scoped. Inject it into controllers, endpoint handlers, or any scoped service.

EventMessage

EventMessage<TKey, TValue> is a sealed record that carries the key, value, and optional headers:

// Key + value only
var msg = new EventMessage<string, PizzaOrdered>(pizza.PizzaId.ToString(), pizza);
// With headers
var msg = new EventMessage<string, PizzaOrdered>(
pizza.PizzaId.ToString(),
pizza,
[new KeyValuePair<string, string>("x-tenant-id", tenantId)]);

Headers are forwarded to Kafka as-is. When OpenTelemetry is enabled, Emit appends W3C trace context headers automatically.

Convenience overloads

IEventProducer<TKey, TValue> has extension methods from EventProducerExtensions that skip the EventMessage wrapper:

// Key + value
await producer.ProduceAsync(pizza.PizzaId.ToString(), pizza, ct);
// Key + value + headers
await producer.ProduceAsync(pizza.PizzaId.ToString(), pizza,
[new KeyValuePair<string, string>("x-tenant-id", tenantId)], ct);

They construct the EventMessage internally. Use them when you don’t need to build the message object ahead of time, which is most of the time.

Headers

Headers are string key-value pairs that travel with the message through the full pipeline:

Producing: Set headers on EventMessage at produce time, either via the record constructor or a convenience overload.

Pipeline enrichment: Emit’s ProduceTracingMiddleware appends W3C trace context headers (traceparent, tracestate, baggage) and emit-source-address before delivery. The canonical header names are defined on WellKnownHeaders.

Outbox path: Headers are UTF-8 encoded to byte[] and stored alongside the message body in the outbox. The background worker forwards them byte-for-byte to Kafka.

Direct path: Headers are UTF-8 encoded and attached to the Kafka message directly.

Consuming: On the consume side, headers are decoded back to strings and available on ConsumeContext<T>.Headers as IReadOnlyList<KeyValuePair<string, string>>.

Custom headers survive the full round trip untouched regardless of which delivery mode you use.

Outbox mode and direct mode

Emit supports two delivery modes per producer. For the full explanation of outbox mechanics and transactional guarantees, see Outbox.

Outbox mode is the default when outbox infrastructure is configured on a persistence provider. ProduceAsync writes to the database outbox inside the current unit of work, and the background worker delivers to Kafka after the transaction commits.

Direct mode calls the Kafka producer immediately and waits for the broker acknowledgement before returning. Opt in per producer:

kafka.Topic<string, AnalyticsEvent>("analytics", topic =>
{
topic.Producer(p => p.UseDirect());
});

Use direct mode when you want low latency and can tolerate the dual-write risk, or when there is no database write to coordinate with.

Per-producer middleware

Register middleware that runs only for a specific topic’s producer:

kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
topic.Producer(producer =>
{
producer.Use<EnrichmentMiddleware>();
});
});

The middleware receives SendContext<PizzaOrdered> and calls next.InvokeAsync(context) to continue the pipeline. This is the Leaf level of the pipeline model.

Accessing the message key in middleware

The Kafka key is available via the KafkaTransportContext<TKey> payload on the send context:

public class LoggingMiddleware<T>(ILogger<LoggingMiddleware<T>> logger)
: IMiddleware<SendContext<T>>
{
public Task InvokeAsync(SendContext<T> context, IMiddlewarePipeline<SendContext<T>> next)
{
if (context.TryGetPayload<KafkaTransportContext<string>>() is { } kafka)
{
logger.LogDebug("Producing with key {Key}", kafka.Key);
}
return next.InvokeAsync(context);
}
}

Produce observers

IProduceObserver provides lifecycle hooks without writing pipeline middleware. All three methods fire around every ProduceAsync call:

public class ProduceAuditObserver : IProduceObserver
{
public Task OnProducingAsync<T>(SendContext<T> context) { ... }
public Task OnProducedAsync<T>(SendContext<T> context) { ... }
public Task OnProduceErrorAsync<T>(SendContext<T> context, Exception exception) { ... }
}

Register at the global level on EmitBuilder:

emit.AddProduceObserver<ProduceAuditObserver>();