Testing
dotnet add package Emit.Testing

MessageSink and SinkConsumer
MessageSink<T> collects consumed messages in memory. Pair it with SinkConsumer<T> as a test double for your real consumer:
[Fact]
public async Task Consumer_writes_to_sink()
{
    // Arrange
    var sink = new MessageSink<PizzaOrdered>();
    var services = new ServiceCollection();

    services.AddEmit(emit =>
    {
        emit.AddKafka(kafka =>
        {
            kafka.ConfigureClient(cfg => cfg.BootstrapServers = "localhost:9092");

            kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
            {
                topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
                topic.SetValueDeserializer(myDeserializer);

                topic.ConsumerGroup("test-group", group =>
                {
                    group.AddConsumer<SinkConsumer<PizzaOrdered>>();
                });
            });
        });
    });

    services.AddSingleton(sink);

    // Act: produce a message using your real producer or inject one directly

    // Assert
    var context = await sink.WaitForMessageAsync(TimeSpan.FromSeconds(10));
    Assert.Equal(expectedPizzaId, context.Message.PizzaId);
}

For integration tests against a real broker, Testcontainers spins up the container automatically. See tests/Emit.IntegrationTests/INSTRUCT.md for the integration test patterns used in this repository.

WaitForMessageAsync
WaitForMessageAsync blocks until a message arrives or the timeout elapses. It throws TimeoutException if the timeout expires, or OperationCanceledException if the token fires.
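To make the timeout and cancellation behavior concrete, here is a minimal, self-contained sketch of a sink with the same observable semantics. It is not Emit's implementation: the SketchSink<T> type, its Add method, and the use of System.Threading.Channels are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for illustration only; this is NOT Emit's MessageSink<T>.
public sealed class SketchSink<T>
{
    // Pending messages that a waiter has not yet observed.
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    // Every message ever added, kept for after-the-fact assertions.
    private readonly ConcurrentQueue<T> _received = new();

    public IReadOnlyCollection<T> ReceivedMessages => _received;

    // A consumer test double would call this for each consumed message.
    public void Add(T message)
    {
        _received.Enqueue(message);
        _channel.Writer.TryWrite(message); // always succeeds on an unbounded channel
    }

    public async Task<T> WaitForMessageAsync(
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        // One token source fires on timeout; the linked source fires on either
        // the timeout or the caller's token.
        using var timeoutCts = new CancellationTokenSource(timeout);
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(
            timeoutCts.Token, cancellationToken);

        try
        {
            return await _channel.Reader.ReadAsync(linked.Token);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the timeout fired: surface it as TimeoutException.
            throw new TimeoutException($"No message arrived within {timeout}.");
        }
        // If the caller's token fired, the OperationCanceledException propagates unchanged.
    }
}
```

In this sketch each call to WaitForMessageAsync consumes one pending message in arrival order, which is why sequential calls return successive messages, while ReceivedMessages keeps the full history for count-based assertions.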
For multiple messages, call it sequentially or inspect sink.ReceivedMessages after waiting:
var first = await sink.WaitForMessageAsync(timeout);
var second = await sink.WaitForMessageAsync(timeout);

// or after collecting N messages:
Assert.Equal(3, sink.ReceivedMessages.Count);

Testing the mediator
The mediator is a plain IMediator resolved from DI:
[Fact]
public async Task PlacePizzaOrder_saves_order()
{
    // Arrange
    var services = new ServiceCollection();
    services.AddEmit(emit =>
    {
        emit.AddMediator(mediator =>
        {
            mediator.AddHandler<PlacePizzaOrderHandler>();
        });
    });
    services.AddScoped<IPizzaRepository, InMemoryPizzaRepository>();

    var provider = services.BuildServiceProvider();
    var mediator = provider.GetRequiredService<IMediator>();

    // Act
    await mediator.SendAsync(new PlacePizzaOrder("customer-1", ["margherita", "extra-cheese"], "123 Main St"));

    // Assert
    var repo = provider.GetRequiredService<IPizzaRepository>();
    Assert.Single(await repo.GetAllAsync());
}

Testing error handling
Register SinkConsumer<T> as the consumer, then produce a message that triggers the error condition. Use sink.ReceivedMessages to verify the message arrived (on success) or did not (on discard). For DLQ assertions, register a second sink on the DLQ topic.
Testing outbox behavior
The outbox adds a database write to the mix. Use real containers via Testcontainers rather than in-memory fakes; the outbox depends on database-level atomicity that no in-memory implementation can replicate.
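As a rough sketch of the container setup this implies, the following starts a throwaway PostgreSQL instance with Testcontainers for .NET and reads its connection string. The choice of PostgreSQL, the Testcontainers.PostgreSql package, the image tag, and the OutboxDatabaseSketch name are assumptions for illustration; the text above does not say which database the outbox targets, so substitute the module for your own database. Running it requires a local Docker daemon.

```csharp
using System;
using System.Threading.Tasks;
using Testcontainers.PostgreSql; // assumption: NuGet package Testcontainers.PostgreSql

// Hypothetical helper for illustration; not part of Emit.
public static class OutboxDatabaseSketch
{
    public static async Task RunAsync()
    {
        // The container is stopped and removed when this scope ends.
        await using var database = new PostgreSqlBuilder()
            .WithImage("postgres:16-alpine") // assumed image tag
            .Build();

        await database.StartAsync();

        // Pass this connection string to whatever persistence setup your
        // outbox uses before building the test host.
        string connectionString = database.GetConnectionString();
        Console.WriteLine($"Connection string available: {connectionString.Length > 0}");
    }
}
```

Starting one container per test class rather than per test usually keeps the suite fast while still exercising real transactions.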
Testing with IUnitOfWork:
using var scope = host.Services.CreateScope();
var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var producer = scope.ServiceProvider.GetRequiredService<IEventProducer<string, string>>();

await using var tx = await uow.BeginAsync();
await producer.ProduceAsync(new EventMessage<string, string>("key", "value"));
await tx.CommitAsync();

var msg = await sink.WaitForMessageAsync();
Assert.Equal("value", msg.Message);

Testing the EF Core implicit approach:
using var scope = host.Services.CreateScope();
var producer = scope.ServiceProvider.GetRequiredService<IEventProducer<string, string>>();
var dbContext = scope.ServiceProvider.GetRequiredService<MyDbContext>();

await producer.ProduceAsync(new EventMessage<string, string>("key", "value"));
await dbContext.SaveChangesAsync();

var msg = await sink.WaitForMessageAsync();
Assert.Equal("value", msg.Message);

Testing a [Transactional] consumer: produce to the input topic and assert on the output sink. Transaction management is automatic.
await producer.ProduceAsync(new EventMessage<string, string>("key", "trigger"));
var msg = await outputSink.WaitForMessageAsync();
Assert.Equal("expected-output", msg.Message);