Configuration Reference

All configuration in Emit is validated at startup via IValidateOptions<T>. Misconfiguration surfaces as a startup error, not a runtime surprise.

The outbox options apply to both MongoDB and EF Core when UseOutbox() is called on the persistence provider. They control the background worker that delivers outbox entries to the broker. When outbox infrastructure is configured, all producers route through the outbox by default; use topic.Producer(p => p.UseDirect()) to opt out.

| Option | Default | Range | Description |
| --- | --- | --- | --- |
| PollingInterval | 5s | min 1s | Idle wait between poll cycles. When a full batch is fetched and fully delivered, the next poll starts immediately to drain the backlog without waiting. |
| BatchSize | 100 | 1-10000 | Max entries fetched per cycle, ordered by sequence (FIFO). |
| MaxConcurrentGroups | 32 | min 1 | Max entry groups delivered in parallel within a cycle. Entries within a group stay sequential. |
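
The PollingInterval rule above can be sketched as a small decision function. This is only an illustration of the documented behavior, not Emit's implementation; OutboxPollingSketch and NextDelay are hypothetical names.

```csharp
using System;

// Hypothetical helper; not part of Emit.
public static class OutboxPollingSketch
{
    // Returns how long the worker waits before starting the next poll cycle.
    public static TimeSpan NextDelay(int fetched, int delivered, int batchSize, TimeSpan pollingInterval)
    {
        // A full batch that was fully delivered suggests a backlog, so poll again at once.
        bool fullBatch = fetched == batchSize;
        bool fullyDelivered = delivered == fetched;
        return fullBatch && fullyDelivered ? TimeSpan.Zero : pollingInterval;
    }
}
```

With the default BatchSize of 100, a cycle that fetches and delivers 100 entries yields a zero delay, while a cycle that fetches 40 entries waits the full PollingInterval.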

ConfigureClient maps directly to Confluent.Kafka's ClientConfig. All standard Confluent.Kafka client properties are available. Common ones:

| Option | Description |
| --- | --- |
| BootstrapServers | (required) Comma-separated broker addresses |
| ClientId | Client identifier reported to the broker |
| SecurityProtocol | Plaintext, Ssl, SaslPlaintext, or SaslSsl |
| SaslMechanism | Plain, ScramSha256, ScramSha512, or Gssapi |
| SaslUsername / SaslPassword | SASL credentials |
| SslCaLocation | Path to the CA certificate file |
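
As a rough illustration of the common client properties, the fragment below builds a Confluent.Kafka ClientConfig directly. The broker addresses, client ID, user name, environment variable name, and certificate path are placeholder values, and how Emit passes this object to a ConfigureClient callback is not shown here.

```csharp
using System;
using Confluent.Kafka;

// Placeholder values throughout; replace with your own cluster settings.
var clientConfig = new ClientConfig
{
    BootstrapServers = "broker-1:9092,broker-2:9092",
    ClientId = "orders-service",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = "svc-orders",
    // Read the secret from the environment rather than hard-coding it.
    SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD"),
    SslCaLocation = "/etc/ssl/certs/kafka-ca.pem"
};
```
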

| Method | Description |
| --- | --- |
| UseDirect() | Forces the producer to send directly to the broker, bypassing the outbox. Only meaningful when outbox infrastructure is configured. |

ConfigureProducer maps to Confluent.Kafka's ProducerConfig. Common options:

| Option | Default | Description |
| --- | --- | --- |
| Acks | Leader | Acknowledgement level: None, Leader, or All |
| EnableIdempotence | false | Prevents duplicate messages on retry |
| CompressionType | None | Gzip, Snappy, Lz4, or Zstd |
| Linger | 0 | Batching delay before sending |

| Attribute | Description |
| --- | --- |
| [Transactional] | Applied to a consumer or mediator handler class. The pipeline wraps the handler invocation in a unit of work; the transaction commits on success and rolls back on failure. |

Emit-specific options:

| Option | Default | Description |
| --- | --- | --- |
| WorkerCount | 1 | Number of parallel workers |
| WorkerDistribution | ByKeyHash | ByKeyHash or RoundRobin |
| BufferSize | 32 | Per-worker channel capacity |
| CommitInterval | 5s | Offset commit frequency |
| WorkerStopTimeout | 30s | Graceful shutdown wait for in-flight messages |
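
To make the two WorkerDistribution modes concrete, here is a minimal sketch of how a message might be mapped to a worker index. It assumes an FNV-1a hash over the UTF-8 key bytes; Emit's actual hash function and scheduling are not documented on this page, and WorkerSelectorSketch is a hypothetical name.

```csharp
using System;
using System.Text;

// Hypothetical illustration; not Emit's implementation.
public sealed class WorkerSelectorSketch
{
    private readonly int _workerCount;
    private int _next; // round-robin cursor

    public WorkerSelectorSketch(int workerCount)
    {
        if (workerCount < 1) throw new ArgumentOutOfRangeException(nameof(workerCount));
        _workerCount = workerCount;
    }

    // ByKeyHash: the same key always maps to the same worker,
    // so messages sharing a key are processed in order.
    public int ByKeyHash(string key)
    {
        uint hash = 2166136261u; // FNV-1a offset basis (assumed hash, see lead-in)
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash = unchecked((hash ^ b) * 16777619u);
        }
        return (int)(hash % (uint)_workerCount);
    }

    // RoundRobin: messages are spread evenly across workers, with no per-key affinity.
    public int RoundRobin()
    {
        int worker = _next;
        _next = (_next + 1) % _workerCount;
        return worker;
    }
}
```

The trade-off is ordering versus balance: key hashing keeps per-key order but can skew load toward hot keys, while round-robin balances load but gives up per-key ordering across workers.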

Confluent consumer options (also settable on the group):

| Option | Description |
| --- | --- |
| AutoOffsetReset | Where to start when no committed offset exists: Earliest or Latest |
| SessionTimeout | Broker marks the consumer dead after this period without a heartbeat |
| HeartbeatInterval | How often the consumer sends heartbeats to the broker |
| MaxPollInterval | Maximum time between poll calls before the broker considers the consumer dead |
| IsolationLevel | ReadUncommitted or ReadCommitted (for transactional producers) |
| QueuedMaxMessagesKbytes | Maximum KB of pre-fetched messages in the local consumer queue |
| QueuedMinMessages | Minimum number of messages per topic+partition librdkafka tries to maintain in the local queue |

All options classes (KafkaClientOptions, KafkaConsumerOptions, KafkaProducerOptions) expose an AdditionalProperties dictionary for librdkafka settings not yet surfaced as typed properties. Typed properties always take precedence over entries in this dictionary. Invalid keys cause an InvalidOperationException at client build time.

kafka.ClientConfig(client =>
{
    client.AdditionalProperties = new Dictionary<string, string>
    {
        ["socket.nagle.disable"] = "true"
    };
});
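
The precedence rule (typed properties win over AdditionalProperties) can be sketched as a two-pass merge. This is an assumption about how such a merge could work, not Emit's code; PropertyMergeSketch is a hypothetical name, and the keys are ordinary librdkafka property names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not Emit's implementation.
public static class PropertyMergeSketch
{
    // Start from the untyped entries, then let typed values overwrite them.
    public static Dictionary<string, string> Merge(
        IReadOnlyDictionary<string, string> additional,
        IReadOnlyDictionary<string, string> typed)
    {
        var merged = new Dictionary<string, string>(StringComparer.Ordinal);
        foreach (var pair in additional) merged[pair.Key] = pair.Value;
        foreach (var pair in typed) merged[pair.Key] = pair.Value; // typed value wins on conflict
        return merged;
    }
}
```

If both sources set client.id, the merged result carries the typed value, while keys that only appear in the dictionary (such as socket.nagle.disable) pass through unchanged.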

ConfigureSchemaRegistry maps to SchemaRegistryConfig from the Confluent.SchemaRegistry package. Common options:

| Option | Description |
| --- | --- |
| Url | (required) Schema Registry URL |
| BasicAuthUserInfo | Credentials in user:password format |
| RequestTimeout | Per-request timeout |
| MaxCachedSchemas | In-memory schema cache size |

Circuit breaker configuration is covered in detail on the error policies page. The short form:

group.CircuitBreaker(cb =>
{
    cb.FailureThreshold(5);
    cb.SamplingWindow(TimeSpan.FromSeconds(60));
    cb.PauseDuration(TimeSpan.FromSeconds(30));
    cb.TripOn<HttpRequestException>(); // optional; default trips on any exception
});
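
For intuition about how the three settings interact, here is a minimal sketch of a breaker that trips once the failure count inside the sampling window reaches the threshold, then stays paused for the pause duration. The exact semantics are an assumption (the error policies page is authoritative), and CircuitBreakerSketch is a hypothetical class, not Emit's implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of threshold / window / pause semantics.
public sealed class CircuitBreakerSketch
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _samplingWindow;
    private readonly TimeSpan _pauseDuration;
    private readonly Queue<DateTimeOffset> _failures = new Queue<DateTimeOffset>();
    private DateTimeOffset? _pausedUntil;

    public CircuitBreakerSketch(int failureThreshold, TimeSpan samplingWindow, TimeSpan pauseDuration)
    {
        _failureThreshold = failureThreshold;
        _samplingWindow = samplingWindow;
        _pauseDuration = pauseDuration;
    }

    // True while the breaker is open and consumption should stay paused.
    public bool IsPaused(DateTimeOffset now) =>
        _pausedUntil.HasValue && now < _pausedUntil.Value;

    public void RecordFailure(DateTimeOffset now)
    {
        _failures.Enqueue(now);

        // Drop failures that have aged out of the sampling window.
        while (_failures.Count > 0 && now - _failures.Peek() > _samplingWindow)
        {
            _failures.Dequeue();
        }

        // Trip the breaker once enough recent failures have accumulated.
        if (_failures.Count >= _failureThreshold)
        {
            _pausedUntil = now + _pauseDuration;
            _failures.Clear();
        }
    }
}
```

The caller passes the current time in explicitly, which keeps the sketch deterministic and easy to test.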

Dead letter configuration is covered on the error policies page. The short form:

kafka.DeadLetter("orders.dlt");

// With provisioning options:
kafka.DeadLetter("orders.dlt", dlq =>
{
    dlq.Provisioning(options =>
    {
        options.Retention = TimeSpan.FromDays(30);
    });
});

The DLQ topic participates in topic verification and auto-provisioning like any other topic.

Call kafka.AutoProvision() to enable automatic topic creation at startup. Per-topic creation options are configured via topic.Provisioning(...). See Kafka Setup: Auto-provisioning for usage examples.

| Option | Default | Description |
| --- | --- | --- |
| NumPartitions | broker default | Number of partitions |
| ReplicationFactor | broker default | Number of replicas |
| Retention | 7 days | How long messages are retained (null = infinite) |
| RetentionBytes | no limit | Maximum partition size before old segments are discarded |
| CleanupPolicy | Delete | Delete, Compact, or DeleteAndCompact |
| DeleteRetention | 1 day | Tombstone retention for compacted topics |
| CompressionType | Producer | Producer, Uncompressed, Gzip, Snappy, Lz4, or Zstd |
| MinCleanableDirtyRatio | 0.5 | Minimum dirty-to-total ratio before compaction |
| MinCompactionLag | 0 | Minimum time before compaction |
| MaxCompactionLag | none | Maximum time before compaction |

The leader election options control how Emit elects a leader across instances. Tuning is rarely needed except in high-latency networks or when the default lease duration is too aggressive for your failover requirements.

| Option | Default | Description |
| --- | --- | --- |
| HeartbeatInterval | 15s | How often each node attempts to renew leadership |
| LeaseDuration | 60s | How long a lease lasts; another node takes over if not renewed in time |
| QueryTimeout | 5s | Max time for a heartbeat database query; the node drops leadership on timeout |
| NodeRegistrationTtl | 90s | How long before a silent node is considered dead and removed |
| InstanceId | MachineName | Human-readable node identifier visible in leader records |

emit.ConfigureLeaderElection(le =>
{
    le.HeartbeatInterval = TimeSpan.FromSeconds(10);
    le.LeaseDuration = TimeSpan.FromSeconds(45);
    // Fall back to the machine name when POD_NAME is not set.
    le.InstanceId = Environment.GetEnvironmentVariable("POD_NAME") ?? Environment.MachineName;
});
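
When tuning these two values together, it can help to check how many renewal attempts fit inside one lease. The helper below is a hypothetical sanity check, not part of Emit, and the requirement that the lease be longer than the heartbeat interval is an assumption made for this sketch.

```csharp
using System;

// Hypothetical helper; not part of Emit.
public static class LeaseTimingSketch
{
    // Number of whole heartbeat intervals that fit inside one lease.
    public static int RenewalAttemptsPerLease(TimeSpan heartbeatInterval, TimeSpan leaseDuration)
    {
        if (heartbeatInterval <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(heartbeatInterval));
        if (leaseDuration <= heartbeatInterval)
            throw new ArgumentException("Lease must be longer than the heartbeat interval.", nameof(leaseDuration));

        return (int)(leaseDuration.Ticks / heartbeatInterval.Ticks);
    }
}
```

The defaults (15s heartbeat, 60s lease) leave room for four renewal attempts per lease, and so does the 10s / 45s pair.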

The daemon options control how the outbox daemon is assigned to leader nodes.

| Option | Default | Description |
| --- | --- | --- |
| AcknowledgeTimeout | 30s | How long a node has to acknowledge a daemon assignment before the leader reassigns it |
| DrainTimeout | 30s | How long a node has to stop a revoked daemon before the leader force-reassigns it |

emit.ConfigureDaemons(d =>
{
    d.AcknowledgeTimeout = TimeSpan.FromSeconds(15);
    d.DrainTimeout = TimeSpan.FromSeconds(20);
});

The lock cleanup option applies to the EF Core persistence provider only. It controls how often expired distributed lock rows are purged from the database.

| Option | Default | Range | Description |
| --- | --- | --- | --- |
| CleanupInterval | 5 minutes | min 1 minute | How often the cleanup worker runs |

| Option | Default | Description |
| --- | --- | --- |
| Enabled | true | Master switch for all Emit trace activity |
| CreateRootActivities | true | Creates a root span when no parent trace context is present |
| PropagateBaggage | true | Forwards W3C baggage across message boundaries |
| MaxBaggageSizeBytes | 8192 | Maximum baggage payload size in bytes (range: 1024-65536) |

| Option | Default | Description |
| --- | --- | --- |
| EnableEmitMeter | true | Core pipeline instruments |
| EnableOutboxMeter | true | Outbox worker instruments |
| EnableLockMeter | true | Distributed lock instruments |
| EnableMediatorMeter | true | Mediator instruments |
| EnableKafkaMeter | true | Kafka consumer/producer instruments |
| EnableKafkaBrokerMeter | true | Broker-level Kafka instruments |

Call options.EnrichWithTag("key", "value") to add a global dimension to all instruments.

group.Use<MyMiddleware>(MiddlewareLifetime.Singleton); // default: one shared instance
group.Use<MyMiddleware>(MiddlewareLifetime.Scoped); // one instance per message

See Custom Middleware for registration details and lifetime semantics.

All options classes implement IValidateOptions<T> and are registered with ValidateOnStart(). If a required field is missing or a value falls outside its allowed range, the application throws at startup with a descriptive error pointing to the misconfigured option. You will not discover bad configuration at 2am when a message finally exercises that code path.
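
The general pattern looks roughly like the sketch below, which uses the standard Microsoft.Extensions.Options APIs. OutboxOptionsSketch, its validator, and the AddOutboxOptionsSketch extension are hypothetical stand-ins that mirror two of the documented ranges; Emit's own options classes and validators are not shown here.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

// Hypothetical options class mirroring two documented ranges.
public sealed class OutboxOptionsSketch
{
    public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(5);
    public int BatchSize { get; set; } = 100;
}

public sealed class OutboxOptionsSketchValidator : IValidateOptions<OutboxOptionsSketch>
{
    public ValidateOptionsResult Validate(string? name, OutboxOptionsSketch options)
    {
        if (options.PollingInterval < TimeSpan.FromSeconds(1))
            return ValidateOptionsResult.Fail("PollingInterval must be at least 1 second.");

        if (options.BatchSize < 1 || options.BatchSize > 10000)
            return ValidateOptionsResult.Fail("BatchSize must be between 1 and 10000.");

        return ValidateOptionsResult.Success;
    }
}

public static class RegistrationSketch
{
    public static IServiceCollection AddOutboxOptionsSketch(this IServiceCollection services)
    {
        // Register the validator, then ask the host to run it during startup
        // instead of on first access to the options.
        services.AddSingleton<IValidateOptions<OutboxOptionsSketch>, OutboxOptionsSketchValidator>();
        services.AddOptions<OutboxOptionsSketch>().ValidateOnStart();
        return services;
    }
}
```

With this registration, a host configured with BatchSize = 0 fails while starting, with the validator's message in the exception, rather than when the options are first resolved.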