Quickstart
This guide walks you through producing and consuming your first message with the transactional outbox. You’ll have a working producer/consumer pair in about five minutes.
Prerequisites
- .NET 10+
- Kafka: running locally on
localhost:9092 - Schema Registry: running locally on
localhost:8081(for JSON Schema serialization) - MongoDB: running locally on
localhost:27017(for the outbox and distributed locking)
-
Install the packages
Terminal window dotnet add package Emitdotnet add package Emit.MongoDBdotnet add package Emit.Kafkadotnet add package Emit.Kafka.JsonSerializer<PackageReference Include="Emit" /><PackageReference Include="Emit.MongoDB" /><PackageReference Include="Emit.Kafka" /><PackageReference Include="Emit.Kafka.JsonSerializer" /> -
Define your message
Messages are plain records or classes. No base types or attributes required.
Messages.cs public record PizzaOrdered(Guid PizzaId,string CustomerId,string[] Toppings,string DeliveryAddress); -
Register Emit
Program.cs using MongoDB.Driver;builder.Services.AddSingleton<IMongoClient>(new MongoClient("mongodb://localhost:27017"));builder.Services.AddEmit(emit =>{emit.AddMongoDb(mongo =>{mongo.Configure((sp, ctx) =>{ctx.Client = sp.GetRequiredService<IMongoClient>();ctx.Database = ctx.Client.GetDatabase("pizzeria-db");}).UseOutbox().UseDistributedLock();});emit.AddKafka(kafka =>{kafka.ConfigureClient(cfg =>{cfg.BootstrapServers = "localhost:9092";});kafka.ConfigureSchemaRegistry(cfg =>{cfg.Url = "localhost:8081";});kafka.AutoProvision();kafka.Topic<string, PizzaOrdered>("pizzas", topic =>{topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);topic.SetJsonSchemaValueSerializer();topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);topic.SetJsonSchemaValueDeserializer();topic.Producer();topic.ConsumerGroup("pizza-kitchen", group =>{group.AddConsumer<PizzaOrderedConsumer>();});});});}); -
Implement your consumer
PizzaOrderedConsumer.cs public class PizzaOrderedConsumer(ILogger<PizzaOrderedConsumer> logger): IConsumer<PizzaOrdered>{public Task ConsumeAsync(ConsumeContext<PizzaOrdered> context,CancellationToken cancellationToken){var pizza = context.Message;logger.LogInformation("New order {PizzaId} for {CustomerId}: {Toppings}",pizza.PizzaId,pizza.CustomerId,string.Join(", ", pizza.Toppings));return Task.CompletedTask;}} -
Produce a message inside a transaction
With MongoDB, use
IUnitOfWorkto share the session between your business write and the outbox write. Begin a transaction, do your writes, then commit. Both writes land atomically or neither does.PlacePizzaOrderEndpoint.cs app.MapPost("/orders", async (PlacePizzaOrderRequest request,IUnitOfWork unitOfWork,IMongoSessionAccessor sessionAccessor,IMongoDatabase database,IEventProducer<string, PizzaOrdered> producer,CancellationToken ct) =>{var pizza = new PizzaOrdered(Guid.NewGuid(), request.CustomerId,request.Toppings, request.DeliveryAddress);await using var tx = await unitOfWork.BeginAsync(ct);var session = sessionAccessor.Session!;await database.GetCollection<PizzaOrdered>("orders").InsertOneAsync(session, pizza, cancellationToken: ct);await producer.ProduceAsync(new EventMessage<string, PizzaOrdered>(pizza.PizzaId.ToString(), pizza), ct);await tx.CommitAsync(ct);return Results.Accepted();});If you’re using EF Core instead, you can skip the explicit unit of work: call
SaveChangesAsyncand both your entity and the outbox entry flush in the same implicit database transaction. The[Transactional]attribute covers the case where the produce happens inside a consumer or mediator handler. See Transactional Outbox for all three approaches.
What just happened
When ProduceAsync is called with outbox infrastructure enabled, Emit does not send the message to the broker immediately. Instead:
-
Atomic write: The outbox entry is written to the database inside the same transaction as your business data. Either both persist or neither does.
-
Background delivery: A background worker polls the outbox, picks up pending entries, and publishes them to the broker. Your endpoint returns as soon as the transaction commits.
-
Crash recovery: If the process crashes between the commit and delivery, the worker picks up the pending entry on restart. No messages are lost and no duplicates are introduced.