Quickstart

This guide walks you through producing and consuming your first message with the transactional outbox. You’ll have a working producer/consumer pair in about five minutes.

Prerequisites

  • .NET 10+
  • Kafka: running locally on localhost:9092
  • Schema Registry: running locally on localhost:8081 (for JSON Schema serialization)
  • MongoDB: running locally on localhost:27017 (for the outbox and distributed locking)

  1. Install the packages

    dotnet add package Emit
    dotnet add package Emit.MongoDB
    dotnet add package Emit.Kafka
    dotnet add package Emit.Kafka.JsonSerializer

  2. Define your message

    Messages are plain records or classes. No base types or attributes required.

    Messages.cs
    public record PizzaOrdered(
        Guid PizzaId,
        string CustomerId,
        string[] Toppings,
        string DeliveryAddress);

  3. Register Emit

    Program.cs
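    using MongoDB.Driver;

    builder.Services.AddSingleton<IMongoClient>(
        new MongoClient("mongodb://localhost:27017"));

    // Assumption: the endpoint in step 5 injects IMongoDatabase directly, so it is
    // registered here in case Emit does not already expose it through DI.
    builder.Services.AddSingleton<IMongoDatabase>(sp =>
        sp.GetRequiredService<IMongoClient>().GetDatabase("pizzeria-db"));
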
    builder.Services.AddEmit(emit =>
    {
        emit.AddMongoDb(mongo =>
        {
            mongo.Configure((sp, ctx) =>
            {
                ctx.Client = sp.GetRequiredService<IMongoClient>();
                ctx.Database = ctx.Client.GetDatabase("pizzeria-db");
            })
            .UseOutbox()
            .UseDistributedLock();
        });

        emit.AddKafka(kafka =>
        {
            kafka.ConfigureClient(cfg =>
            {
                cfg.BootstrapServers = "localhost:9092";
            });

            kafka.ConfigureSchemaRegistry(cfg =>
            {
                cfg.Url = "localhost:8081";
            });

            kafka.AutoProvision();

            kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
            {
                topic.SetKeySerializer(Confluent.Kafka.Serializers.Utf8);
                topic.SetJsonSchemaValueSerializer();
                topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
                topic.SetJsonSchemaValueDeserializer();

                topic.Producer();
                topic.ConsumerGroup("pizza-kitchen", group =>
                {
                    group.AddConsumer<PizzaOrderedConsumer>();
                });
            });
        });
    });

  4. Implement your consumer

    PizzaOrderedConsumer.cs
    public class PizzaOrderedConsumer(ILogger<PizzaOrderedConsumer> logger)
        : IConsumer<PizzaOrdered>
    {
        public Task ConsumeAsync(
            ConsumeContext<PizzaOrdered> context,
            CancellationToken cancellationToken)
        {
            var pizza = context.Message;

            logger.LogInformation(
                "New order {PizzaId} for {CustomerId}: {Toppings}",
                pizza.PizzaId,
                pizza.CustomerId,
                string.Join(", ", pizza.Toppings));

            return Task.CompletedTask;
        }
    }

  5. Produce a message inside a transaction

    With MongoDB, use IUnitOfWork to share the session between your business write and the outbox write. Begin a transaction, do your writes, then commit. Both writes land atomically or neither does.

    PlacePizzaOrderEndpoint.cs
    app.MapPost("/orders", async (
        PlacePizzaOrderRequest request,
        IUnitOfWork unitOfWork,
        IMongoSessionAccessor sessionAccessor,
        IMongoDatabase database,
        IEventProducer<string, PizzaOrdered> producer,
        CancellationToken ct) =>
    {
        var pizza = new PizzaOrdered(
            Guid.NewGuid(), request.CustomerId,
            request.Toppings, request.DeliveryAddress);

        // Begin the transaction; the accessor exposes the session it runs on.
        await using var tx = await unitOfWork.BeginAsync(ct);
        var session = sessionAccessor.Session!;

        // Business write, enlisted in the transaction through the shared session.
        await database.GetCollection<PizzaOrdered>("orders")
            .InsertOneAsync(session, pizza, cancellationToken: ct);

        // Outbox write: the message is stored in the same transaction, not sent to Kafka yet.
        await producer.ProduceAsync(
            new EventMessage<string, PizzaOrdered>(pizza.PizzaId.ToString(), pizza), ct);

        // Both writes become visible together on commit.
        await tx.CommitAsync(ct);
        return Results.Accepted();
    });
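
    The endpoint binds a PlacePizzaOrderRequest that this guide does not define. A minimal shape, inferred only from the three properties the endpoint reads, might look like the sketch below. The type is hypothetical, not something Emit provides, and a real request model will likely need validation.

```csharp
// Hypothetical request DTO for the POST /orders endpoint.
// Property names and types are inferred from how the endpoint uses them.
public record PlacePizzaOrderRequest(
    string CustomerId,
    string[] Toppings,
    string DeliveryAddress);
```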

    If you’re using EF Core instead, you can skip the explicit unit of work: call SaveChangesAsync and both your entity and the outbox entry flush in the same implicit database transaction. The [Transactional] attribute covers the case where the produce happens inside a consumer or mediator handler. See Transactional Outbox for all three approaches.

What just happened

When ProduceAsync is called with outbox infrastructure enabled, Emit does not send the message to the broker immediately. Instead:

  1. Atomic write: The outbox entry is written to the database inside the same transaction as your business data. Either both persist or neither does.

  2. Background delivery: A background worker polls the outbox, picks up pending entries, and publishes them to the broker. Your endpoint returns as soon as the transaction commits.

  3. Crash recovery: If the process crashes between the commit and delivery, the worker picks up the pending entry on restart. No messages are lost and no duplicates are introduced.
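
The three steps above can be illustrated with a small in-memory model of the outbox pattern. This is a sketch of the general technique, not Emit's implementation: FakeDatabase, FakeTransaction, OutboxEntry, OutboxWorker, and OutboxDemo are all hypothetical names, and the "broker" is just a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-ins; none of these types are part of Emit.
public sealed class OutboxEntry
{
    public OutboxEntry(string key, string payload) { Key = key; Payload = payload; }
    public string Key { get; }
    public string Payload { get; }
    public bool Delivered { get; set; }
}

public sealed class FakeDatabase
{
    public List<string> Orders { get; } = new();
    public List<OutboxEntry> Outbox { get; } = new();
}

// Buffers writes and applies them together on Commit (step 1: atomic write).
public sealed class FakeTransaction
{
    private readonly FakeDatabase _db;
    private readonly List<string> _orders = new();
    private readonly List<OutboxEntry> _entries = new();

    public FakeTransaction(FakeDatabase db) => _db = db;

    public void InsertOrder(string order) => _orders.Add(order);
    public void Enqueue(OutboxEntry entry) => _entries.Add(entry);

    public void Commit()
    {
        _db.Orders.AddRange(_orders);
        _db.Outbox.AddRange(_entries);
    }
}

// Polls the outbox and publishes pending entries (step 2: background delivery).
public sealed class OutboxWorker
{
    private readonly FakeDatabase _db;
    private readonly Action<OutboxEntry> _publish;

    public OutboxWorker(FakeDatabase db, Action<OutboxEntry> publish)
    {
        _db = db;
        _publish = publish;
    }

    // One polling pass. If publishing throws, the entry stays pending.
    public int PollOnce()
    {
        var pending = _db.Outbox.Where(e => !e.Delivered).ToList();
        foreach (var entry in pending)
        {
            _publish(entry);
            entry.Delivered = true;
        }
        return pending.Count;
    }
}

public static class OutboxDemo
{
    public static (int Orders, int Published, int Pending) Run()
    {
        var db = new FakeDatabase();
        var broker = new List<string>();

        // A transaction that is never committed leaves no order and no outbox entry.
        var abandoned = new FakeTransaction(db);
        abandoned.InsertOrder("order-1");
        abandoned.Enqueue(new OutboxEntry("order-1", "payload-1"));

        // A committed transaction persists the order and its outbox entry together.
        var tx = new FakeTransaction(db);
        tx.InsertOrder("order-2");
        tx.Enqueue(new OutboxEntry("order-2", "payload-2"));
        tx.Commit();

        // Simulated failure before delivery: the entry remains pending.
        var failing = new OutboxWorker(db, _ => throw new InvalidOperationException("broker down"));
        try { failing.PollOnce(); } catch (InvalidOperationException) { }

        // A restarted worker picks up the pending entry (step 3: crash recovery).
        var worker = new OutboxWorker(db, e => broker.Add(e.Key));
        worker.PollOnce();

        return (db.Orders.Count, broker.Count, db.Outbox.Count(e => !e.Delivered));
    }
}
```

Calling OutboxDemo.Run() returns (1, 1, 0): only the committed order exists, its message reaches the broker once after the simulated failure, and nothing is left pending. Note that this naive model would publish an entry again if the process stopped between publishing and marking it delivered; how Emit handles that window is not covered by this sketch.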