Lock-free, in-memory, generic event store with fixed capacity, partitioning, and functional aggregations.
$ dotnet add package LockFree.EventStore
An in-memory event store, running as a service, to synchronize and validate operations across multiple instances with high concurrency and no locks.
docker run --rm -p 7070:7070 daniloneto/lockfree-eventstore:latest
dotnet add package LockFree.EventStore
var es = new EventStoreClient("http://localhost:7070");
await es.Append("gateway/orders", new OrderCreated { Id = "o-1", Valor = 123 });
await foreach (var ev in es.Read("gateway/orders", from: 0))
{
/* handle event */
}
See samples/ClientSample for an example that appends and reads events on the gateway/orders stream. To run:
docker run --rm -p 7070:7070 daniloneto/lockfree-eventstore:latest
cd samples/ClientSample
dotnet run
Start 1 EventStore, 3 gateways and Nginx load balancing:
docker compose up --build
Test sending orders (load balanced across gateways):
curl -X POST http://localhost:8080/orders
curl -X POST 'http://localhost:8080/orders/bulk?n=50'
View statistics:
curl http://localhost:8080/stats/local # stats of one gateway (one of the 3)
curl http://localhost:8080/stats/global # global consolidation (via central reader)
Two (or more) gateways behind a load balancer need to record operations to the same stream.
LockFree.EventStore ensures ordering and integrity even under high parallelism, without relying on locks, keeping all state in memory.
Below is the full technical documentation of the API, advanced features, benchmarks, and usage examples.
var store = new EventStore<Pedido>();
store.TryAppend(new Pedido { Id = 1, Valor = 10m, Timestamp = DateTime.UtcNow });
var total = store.Aggregate(() => 0m, (acc, e) => acc + e.Valor,
from: DateTime.UtcNow.AddMinutes(-10));
// Explicit capacity
var store = new EventStore<Pedido>(capacity: 100_000);
// Capacity and partitions
var store = new EventStore<Pedido>(capacity: 50_000, partitions: 8);
// Advanced configuration
var store = new EventStore<Pedido>(new EventStoreOptions<Pedido>
{
Capacity = 100_000,
Partitions = 16,
OnEventDiscarded = evt => Logger.LogTrace("Event discarded: {Event}", evt),
OnCapacityReached = () => Metrics.IncrementCounter("eventstore.capacity_reached"),
TimestampSelector = new PedidoTimestampSelector(),
// RFC 002: disable window tracking for pure append/snapshot workloads
EnableWindowTracking = false
});
// Fluent API
var store = new EventStoreBuilder<Pedido>()
.WithCapacity(100_000)
.WithPartitions(8)
.OnDiscarded(evt => Log(evt))
.OnCapacityReached(() => NotifyAdmin())
.WithTimestampSelector(new PedidoTimestampSelector())
// RFC 002: opt-out when window queries are not used
.WithEnableWindowTracking(false)
.Create();
store.Count // Current number of events
store.Capacity // Maximum configured capacity
store.IsEmpty // Whether it's empty
store.IsFull // Whether it reached maximum capacity
store.Partitions // Number of partitions
// Count by time window
var count = store.Count(from: inicio, to: fim);
// Sum of values
var sum = store.Sum(evt => evt.Valor, from: inicio, to: fim);
// Average
var avg = store.Average(evt => evt.Valor, from: inicio, to: fim);
// Min and max
var min = store.Min(evt => evt.Pontuacao, from: inicio, to: fim);
var max = store.Max(evt => evt.Pontuacao, from: inicio, to: fim);
// With filters
var filteredSum = store.Sum(
evt => evt.Valor,
filter: evt => evt.Tipo == "Pagamento",
from: inicio,
to: fim
);
Note: Time-filtered queries require EnableWindowTracking = true. When disabled, a clear InvalidOperationException is thrown: "Window tracking is disabled. EnableWindowTracking must be true to use window queries."
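To make the failure mode above concrete, here is a minimal sketch of what happens when a window query runs with tracking disabled. It uses only the API shown in this README; the Pedido and PedidoTimestampSelector types are assumed to be the same user-defined types used in the earlier examples.

```csharp
var store = new EventStore<Pedido>(new EventStoreOptions<Pedido>
{
    Capacity = 10_000,
    TimestampSelector = new PedidoTimestampSelector(),
    EnableWindowTracking = false   // pure append/snapshot workload
});

try
{
    // Any time-filtered query now fails fast instead of returning wrong data.
    var count = store.Count(from: DateTime.UtcNow.AddMinutes(-5), to: DateTime.UtcNow);
}
catch (InvalidOperationException ex)
{
    // "Window tracking is disabled. EnableWindowTracking must be true to use window queries."
    Console.WriteLine(ex.Message);
}
```

Failing fast here is deliberate: silently ignoring the window bounds would return totals over the whole ring, which is harder to detect than an exception.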
Snapshot() returns an approximate immutable copy of the current state of all partitions, ordered from the oldest to the newest event per partition.
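A short sketch of consuming that copy, using only the members documented here (Pedido is the same user-defined event type as above):

```csharp
var store = new EventStore<Pedido>(capacity: 10_000, partitions: 4);
store.TryAppend(new Pedido { Id = 1, Valor = 10m, Timestamp = DateTime.UtcNow });

// The copy is immutable and approximate: appends that race with this call may
// or may not be included, so it can be iterated with no further coordination.
foreach (var pedido in store.Snapshot())
{
    Console.WriteLine($"{pedido.Id}: {pedido.Valor}");
}
```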
Adds an optional persistence layer that periodically saves partition states to disk without impacting the hot append/query path.
Key goals:
- RestoreFromSnapshotsAsync() (warm memory reconstruction)
var store = new EventStore<Event>(new EventStoreOptions<Event>
{
// existing core options
CapacityPerPartition = 1_000_000,
Partitions = Environment.ProcessorCount,
EnableFalseSharingProtection = true
});
// Configure snapshot subsystem (one-time)
var snapshotter = store.ConfigureSnapshots(
new SnapshotOptions
{
Enabled = true,
Interval = TimeSpan.FromMinutes(2), // Time-based trigger (can be combined with MinEventsBetweenSnapshots)
MinEventsBetweenSnapshots = 50_000, // Event-count trigger
MaxConcurrentSnapshotJobs = 2, // Limit concurrent save jobs
SnapshotsToKeep = 3, // Pruning window
MaxSaveAttempts = 5, // Retry attempts for transient errors
BackoffBaseDelay = TimeSpan.FromMilliseconds(100),
BackoffFactor = 2.0,
CompactBeforeSnapshot = true, // (Future compaction hook)
EnableLocalTracing = false // Enables ActivitySource if true
},
serializer: new BinarySnapshotSerializer(),
store: new FileSystemSnapshotStore("snapshots")
);
await store.RestoreFromSnapshotsAsync(); // Rebuild in-memory state before serving traffic
Internally TryGetStableView(partitionKey, out PartitionState state) performs a double-read of head counters (HeadVersion → HeadIndex → HeadVersion) with bounded retries to obtain a coherent cut, copying the ring into a contiguous buffer (handling wrap-around with at most two spans) without allocating per event.
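The double-read idea can be shown in isolation. This is a self-contained sketch, not the library's internals: the names (headVersion, buffer, TryGetStableView) mirror the description above but the real implementation also reads HeadIndex and copies the ring with at most two spans.

```csharp
long headVersion = 0;          // bumped by the writer on every append
int[] buffer = new int[8];     // stands in for the partition ring

bool TryGetStableView(out int[] view, int maxRetries = 16)
{
    for (int attempt = 0; attempt < maxRetries; attempt++)
    {
        long v1 = Volatile.Read(ref headVersion);   // first read of the version
        int[] copy = new int[buffer.Length];
        Array.Copy(buffer, copy, buffer.Length);    // bulk copy, no per-event allocation
        long v2 = Volatile.Read(ref headVersion);   // second read: did a writer interleave?
        if (v1 == v2) { view = copy; return true; } // coherent cut obtained
    }
    view = Array.Empty<int>();
    return false;                                   // bounded retries exhausted
}
```

If a writer bumps the version between the two reads, the copy may straddle an append and is discarded; with bounded retries the reader never blocks the writer and never spins forever.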
Each snapshot is first written to a temporary file (<partition>_<version>_<ticks>.snap.tmp) and then atomically renamed to the final .snap name on the same volume via File.Move(temp, final, overwrite: false). A collision (target already exists) is treated as a logic/corruption anomaly and the move throws (fail-fast) instead of silently overwriting an existing snapshot. Temporary .tmp files are ignored during load. Stray/unknown non-.snap files are also skipped both on restore and pruning to avoid interfering with normal operation.
Transient I/O failures trigger exponential backoff:
nextDelay = baseDelay * factor^(attempt-1) + small jitter
Stops after MaxSaveAttempts. Failures are logged; producers remain unaffected.
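The schedule above (jitter omitted for clarity) can be computed directly; baseDelay and factor correspond to the BackoffBaseDelay and BackoffFactor options shown earlier:

```csharp
TimeSpan baseDelay = TimeSpan.FromMilliseconds(100);
double factor = 2.0;
int maxSaveAttempts = 5;

for (int attempt = 1; attempt <= maxSaveAttempts; attempt++)
{
    // nextDelay = baseDelay * factor^(attempt-1)
    var nextDelay = TimeSpan.FromMilliseconds(
        baseDelay.TotalMilliseconds * Math.Pow(factor, attempt - 1));
    Console.WriteLine($"attempt {attempt}: wait {nextDelay.TotalMilliseconds} ms");
}
// With the defaults above: 100, 200, 400, 800, 1600 ms, then give up.
```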
After a successful save, the N newest snapshots (by version/timestamp) are retained; older ones are deleted. Failures during pruning are logged and skipped.
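A self-contained sketch of that retention rule, assuming file names follow the <partition>_<version>_<ticks>.snap pattern documented above (the actual pruning code and its helpers are not shown in this README):

```csharp
var files = new List<string>
{
    "p0_1_100.snap", "p0_2_200.snap", "p0_3_300.snap", "p0_4_400.snap"
};
int snapshotsToKeep = 3;   // SnapshotOptions.SnapshotsToKeep

// Extract the version segment from "<partition>_<version>_<ticks>.snap".
long VersionOf(string name) => long.Parse(name.Split('_')[1]);

var toDelete = files
    .OrderByDescending(VersionOf)   // newest first
    .Skip(snapshotsToKeep)          // everything past the retention window
    .ToList();
// toDelete == ["p0_1_100.snap"]
```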
RestoreFromSnapshotsAsync() enumerates partitions via the snapshot store, loads the most recent .snap for each, validates schema version, and reconstructs the in-memory ring state. Delta replay hooks exist but are inactive.
ActivitySource emits: snapshot.save (partition, version, bytes, attempts, success) and snapshot.prune (kept, deleted).
Measured p50 / p99 append latency regression is ≤ +2% with snapshots enabled under benchmark load, staying within the target budget. The stable view logic uses only volatile reads and bounded retries; no global locks are introduced.
Two focused samples demonstrate the snapshot subsystem (both run with SnapshotOptions.Enabled = true).
The first, SnapshotSensors, is a high-frequency synthetic sensor workload (temperature + humidity) demonstrating snapshot save and warm restart.
Run:
dotnet run --project samples/SnapshotSensors/SnapshotSensors.csproj
Stop (Ctrl+C) and run again; look for:
[BOOT] Partitions restauradas de snapshot: X
If X > 0 the state was warm-started.
Key configuration (Program.cs) uses BinarySnapshotSerializer(compress: true).
The second sample, SnapshotSensorsApi, is an HTTP API receiving JSON sensor readings; it calls RestoreFromSnapshotsAsync() before serving requests. Run:
dotnet run --project samples/SnapshotSensorsApi/SnapshotSensorsApi.csproj
Send readings:
curl -X POST http://localhost:5000/sensor \
-H "Content-Type: application/json" \
-d '{"deviceId":"dev-1","temperature":22.5,"humidity":48.2}'
Inspect state/metrics:
curl http://localhost:5000/state
curl http://localhost:5000/metrics
Snapshot config (Program.cs):
Both samples showcase that snapshot persistence does not block high-frequency appends and that partial files are never observed (atomic rename). Adjust Interval, MinEventsBetweenSnapshots, or enable directory fsync (Unix) to explore durability vs performance.
// Clear all events
store.Clear();
store.Reset(); // Alias for Clear()
// Purge old events (requires TimestampSelector)
store.Purge(olderThan: DateTime.UtcNow.AddHours(-1));
// Detailed statistics
store.Statistics.TotalAppended // Total appended events
store.Statistics.TotalDiscarded // Total discarded events
store.Statistics.AppendsPerSecond // Current append rate
store.Statistics.LastAppendTime // Timestamp of the last append
Fully featured web API for collecting and querying real-time metrics:
cd samples/MetricsDashboard
dotnet run
Available endpoints:
POST /metrics - Add a metric
GET /metrics/sum?label=cpu_usage - Sum values by label
GET /metrics/top?k=5 - Top K metrics
See samples/MetricsDashboard/TESTING.md for a complete testing guide.
TryAppend(event) — Adds an event, lock-free
Aggregate — Aggregates values by time window
Snapshot() — Returns an immutable copy of events
Count/Sum/Average/Min/Max — Specialized aggregations
Clear/Reset/Purge — Cleanup methods
Query — Flexible queries with filters
Statistics — Monitoring metrics
The default number of partitions is Environment.ProcessorCount. You can force the partition using TryAppend(e, partition).
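One reason to force the partition is per-key ordering. A short sketch using the documented TryAppend(e, partition) overload (the hash-based routing shown here is an illustrative choice, not something the library prescribes; Pedido is the user-defined type from the earlier examples):

```csharp
var store = new EventStore<Pedido>(capacity: 10_000, partitions: 4);
var pedido = new Pedido { Id = 42, Valor = 99m, Timestamp = DateTime.UtcNow };

// Route by a stable key so all events for the same Id land in one partition,
// preserving their relative order within that partition.
int partition = Math.Abs(pedido.Id.GetHashCode()) % store.Partitions;
store.TryAppend(pedido, partition);
```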
Designed for high concurrency and low latency. Global order across partitions is approximate.
| Operation | Value Type | Reference Type | Improvement |
|---|---|---|---|
| Event append | 560 ms | 797 ms | 42% faster |
| Event iteration | 35.8 ns | 132.5 ns | 74% faster |
| Event queries | 393.5 ns | 1,749.1 ns | 77% faster |
| Operation | SoA | AoS | Improvement |
|---|---|---|---|
| Key aggregation | 55.2 ms | 74.6 ms | 26% faster |
| Memory usage | Lower | Higher | Varies |
Conclusions:
License: MIT