Event store em memória lock-free, genérico, com capacidade fixa, particionado e agregações funcionais.
$ dotnet add package LockFree.EventStore
An in-memory event store, running as a service, to synchronize and validate operations across multiple instances with high concurrency and no locks.
docker run --rm -p 7070:7070 daniloneto/lockfree-eventstore:latest
dotnet add package LockFree.EventStore
var es = new EventStoreClient("http://localhost:7070");
await es.Append("gateway/orders", new OrderCreated { Id = "o-1", Valor = 123 });
await foreach (var ev in es.Read("gateway/orders", from: 0))
{
/* handle event */
}
See samples/ClientSample for an example that:
gateway/ordersTo run:
docker run --rm -p 7070:7070 daniloneto/lockfree-eventstore:latest
cd samples/ClientSample
dotnet run
Start 1 EventStore, 3 gateways and Nginx load balancing:
docker compose up --build
Test sending orders (load balanced across gateways):
curl -X POST http://localhost:8080/orders
curl -X POST 'http://localhost:8080/orders/bulk?n=50'
View statistics:
curl http://localhost:8080/stats/local # stats of one gateway (one of the 3)
curl http://localhost:8080/stats/global # global consolidation (via central reader)
Two (or more) gateways behind a load balancer need to record operations to the same stream.
Lockfree.EventStore ensures order and integrity even under high parallelism, without relying on locks, keeping all state in memory.
Below is the full technical documentation of the API, advanced features, benchmarks, and usage examples.
var store = new EventStore<Pedido>();
store.TryAppend(new Pedido { Id = 1, Valor = 10m, Timestamp = DateTime.UtcNow });
var total = store.Aggregate(() => 0m, (acc, e) => acc + e.Valor,
from: DateTime.UtcNow.AddMinutes(-10));
// Explicit capacity
var store = new EventStore<Pedido>(capacidade: 100_000);
// Capacity and partitions
var store = new EventStore<Pedido>(capacidade: 50_000, particoes: 8);
// Advanced configuration
var store = new EventStore<Pedido>(new EventStoreOptions<Pedido>
{
Capacidade = 100_000,
Particoes = 16,
OnEventDiscarded = evt => Logger.LogTrace("Event discarded: {Event}", evt),
OnCapacityReached = () => Metrics.IncrementCounter("eventstore.capacity_reached"),
TimestampSelector = new PedidoTimestampSelector()
});
// Fluent API
var store = EventStore.For<Pedido>()
.WithCapacity(100_000)
.WithPartitions(8)
.OnDiscarded(evt => Log(evt))
.OnCapacityReached(() => NotifyAdmin())
.WithTimestampSelector(new PedidoTimestampSelector())
.Create();
store.Count // Current number of events
store.Capacity // Maximum configured capacity
store.IsEmpty // Whether it's empty
store.IsFull // Whether it reached maximum capacity
store.Partitions // Number of partitions
// Count by time window
var count = store.Count(from: inicio, to: fim);
// Sum of values
var sum = store.Sum(evt => evt.Valor, from: inicio, to: fim);
// Average
var avg = store.Average(evt => evt.Valor, from: inicio, to: fim);
// Min and max
var min = store.Min(evt => evt.Pontuacao, from: inicio, to: fim);
var max = store.Max(evt => evt.Pontuacao, from: inicio, to: fim);
// With filters
var filteredSum = store.Sum(
evt => evt.Valor,
filter: evt => evt.Tipo == "Pagamento",
from: inicio,
to: fim
);
// Filtered snapshot
var eventosRecentes = store.Snapshot(
filter: evt => evt.Timestamp > DateTime.UtcNow.AddMinutes(-5)
);
// Snapshot by time window
var snapshot = store.Snapshot(from: inicio, to: fim);
// Snapshot with filter and time window
var filtrado = store.Snapshot(
filter: evt => evt.Valor > 100,
from: inicio,
to: fim
);
// Clear all events
store.Clear();
store.Reset(); // Alias for Clear()
// Purge old events (requires TimestampSelector)
store.Purge(olderThan: DateTime.UtcNow.AddHours(-1));
// Detailed statistics
store.Statistics.TotalAppended // Total appended events
store.Statistics.TotalDiscarded // Total discarded events
store.Statistics.AppendsPerSecond // Current append rate
store.Statistics.LastAppendTime // Timestamp of the last append
Fully featured web API for collecting and querying real-time metrics:
cd .\samples\MetricsDashboard
dotnet run
Available endpoints:
POST /metrics - Add metricGET /metrics/sum?label=cpu_usage - Sum values by labelGET /metrics/top?k=5 - Top K metricsSee samples/MetricsDashboard/TESTING.md for a complete testing guide.
TryAppend(event) — Adds an event, lock-freeAggregate — Aggregates values by time windowSnapshot() — Returns an immutable copy of eventsCount/Sum/Average/Min/Max — Specialized aggregationsClear/Reset/Purge — Cleanup methodsQuery — Flexible queries with filtersStatistics — Monitoring metricsThe default number of partitions is Environment.ProcessorCount. You can force the partition using TryAppend(e, partition).
Snapshot() returns an approximate immutable copy of the current state of all partitions, ordered from the oldest to the newest event per partition.
Designed for high concurrency and low latency. Global order across partitions is approximate.
| Operation | Value Type | Reference Type | Improvement |
|---|---|---|---|
| Event append | 560 ms | 797 ms | 42% faster |
| Event iteration | 35.8 ns | 132.5 ns | 74% faster |
| Event queries | 393.5 ns | 1,749.1 ns | 77% faster |
| Operation | SoA | AoS | Improvement |
|---|---|---|---|
| Key aggregation | 55.2 ms | 74.6 ms | 26% faster |
| Memory usage | Lower | Higher | Varies |
Conclusions:
EventStoreV2 implementation is recommended.// Using EventStoreV2 with value types
var store = new EventStoreV2(capacidade: 1_000_000, particoes: 16);
store.Add("sensor1", 25.5, DateTime.UtcNow.Ticks);
double media = store.Average("sensor1");
MIT