SQL Server implementation for BbQ.Events, providing both IEventStore for event sourcing and IProjectionCheckpointStore for projection checkpoints. Features durable event persistence with sequential positions, atomic operations, JSON serialization, and thread-safe parallel processing support.
Published: Jan 27, 2026
SQL Server implementation for BbQ.Events, providing both event store and checkpoint persistence.
This package provides production-ready, durable implementations for:
dotnet add package BbQ.Events.SqlServer
The package includes SQL schema files in the Schema/ folder.
The simplest way to set up the database schema is to enable automatic schema creation:
services.UseSqlServerEventStore(options =>
{
    options.ConnectionString = "Server=localhost;Database=MyApp;Integrated Security=true";
    options.AutoCreateSchema = true; // Automatically create tables if they don't exist
});
When AutoCreateSchema is enabled:
Note: For production environments, you may prefer to run the SQL scripts manually for more control. Set AutoCreateSchema = false (the default) and execute the scripts during deployment.
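One way to follow this advice is to tie AutoCreateSchema to the hosting environment, so that tables are created automatically on a developer machine while deployments rely on the scripts. The sketch below is illustrative only: the class name EventStoreRegistration and the method name AddEventStore are hypothetical, and it assumes UseSqlServerEventStore is an extension method on IServiceCollection, as the registration example above suggests.

```csharp
using BbQ.Events.SqlServer.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// Hypothetical helper, not part of the package.
public static class EventStoreRegistration
{
    public static IServiceCollection AddEventStore(
        this IServiceCollection services,
        string connectionString,
        IHostEnvironment environment)
    {
        services.UseSqlServerEventStore(options =>
        {
            options.ConnectionString = connectionString;
            // Create tables automatically only in Development; elsewhere the
            // schema scripts are expected to run as part of deployment.
            options.AutoCreateSchema = environment.IsDevelopment();
        });
        return services;
    }
}
```

In an ASP.NET Core application this could be called as builder.Services.AddEventStore(connectionString, builder.Environment).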
Alternatively, you can manually run the schema scripts:
-- See Schema/CreateEventsTable.sql for full script
CREATE TABLE BbQ_Events (
    EventId BIGINT IDENTITY(1,1) PRIMARY KEY,
    StreamName NVARCHAR(200) NOT NULL,
    Position BIGINT NOT NULL,
    EventType NVARCHAR(500) NOT NULL,
    EventData NVARCHAR(MAX) NOT NULL,
    Metadata NVARCHAR(MAX) NULL,
    CreatedUtc DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
    CONSTRAINT UQ_BbQ_Events_Stream_Position UNIQUE (StreamName, Position)
);
-- See Schema/CreateStreamsTable.sql for full script
CREATE TABLE BbQ_Streams (
    StreamName NVARCHAR(200) PRIMARY KEY,
    CurrentPosition BIGINT NOT NULL DEFAULT -1,
    Version INT NOT NULL DEFAULT 0,
    CreatedUtc DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
    LastUpdatedUtc DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
-- See Schema/CreateCheckpointTable.sql for full script
CREATE TABLE BbQ_ProjectionCheckpoints (
    ProjectionName NVARCHAR(200) NOT NULL,
    PartitionKey NVARCHAR(200) NULL,
    Position BIGINT NOT NULL,
    LastUpdatedUtc DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
    PRIMARY KEY (ProjectionName, PartitionKey)
);
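Note: The PartitionKey column is nullable and is NULL for non-partitioned projections. Be aware that SQL Server requires every column in a PRIMARY KEY constraint to be NOT NULL, so the abbreviated definition above will not run as written; treat Schema/CreateCheckpointTable.sql as the authoritative script. A UNIQUE constraint or index, by contrast, accepts a nullable column and treats NULL values as equal to each other, so at most one row with a NULL PartitionKey can exist per ProjectionName, which is the desired behavior for non-partitioned projections.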
You can also manually trigger schema initialization at any time:
var eventStore = provider.GetRequiredService<IEventStore>();
// Ensure the schema exists (creates tables if they don't exist)
await eventStore.EnsureSchemaAsync();
This is useful for:
Use the SQL Server event store for durable event persistence:
using BbQ.Events.SqlServer.Configuration;
var services = new ServiceCollection();
// Register SQL Server event store
services.UseSqlServerEventStore("Server=localhost;Database=MyApp;Integrated Security=true");
var provider = services.BuildServiceProvider();
var eventStore = provider.GetRequiredService<IEventStore>();
// Append events to a stream
var userId = Guid.NewGuid();
await eventStore.AppendAsync("users", new UserCreated(userId, "Alice", "alice@example.com"));
await eventStore.AppendAsync("users", new UserUpdated(userId, "Alice Smith"));
// Read events from a stream
await foreach (var storedEvent in eventStore.ReadAsync<UserCreated>("users"))
{
    Console.WriteLine($"Position {storedEvent.Position}: {storedEvent.Event.Name}");
}
// Get current stream position
var position = await eventStore.GetStreamPositionAsync("users");
Console.WriteLine($"Stream at position: {position}");
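The snippet above uses UserCreated and UserUpdated without defining them. A minimal sketch of what such event types might look like, assuming plain C# records are acceptable as event payloads. The property names UserId and Email are assumptions; only Name is implied by storedEvent.Event.Name in the example.

```csharp
using System;

// Hypothetical event types for the "users" stream. Positional records are
// immutable, compare by value, and can be round-tripped by System.Text.Json.
public record UserCreated(Guid UserId, string Name, string Email);

public record UserUpdated(Guid UserId, string Name);
```

Records keep event definitions to one line each, which suits payloads that are written once and never mutated.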
Configure advanced options:
services.UseSqlServerEventStore(options =>
{
    options.ConnectionString = "Server=localhost;Database=MyApp;Integrated Security=true";
    options.IncludeMetadata = true; // Include metadata (timestamp, server, etc.)
    options.AutoCreateSchema = true; // Automatically create schema if missing
    options.JsonSerializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };
});
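To see what the camelCase naming policy does to a serialized payload, the following sketch runs System.Text.Json directly with the same policy. It assumes the store passes these options to System.Text.Json when writing the EventData column; the UserCreated record and the CamelCaseDemo class are hypothetical names used for illustration.

```csharp
using System;
using System.Text.Json;

// Hypothetical event type, defined here so the sketch is self-contained.
public record UserCreated(Guid UserId, string Name, string Email);

public static class CamelCaseDemo
{
    // Same naming policy as in the options example above.
    public static readonly JsonSerializerOptions Options = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    // Property names are written as userId, name, email instead of PascalCase.
    public static string Serialize(UserCreated e) =>
        JsonSerializer.Serialize(e, Options);

    // Reading back must use the same options so the names match again.
    public static UserCreated? Deserialize(string json) =>
        JsonSerializer.Deserialize<UserCreated>(json, Options);
}
```

Changing the naming policy after events have been stored is likely to affect how existing rows deserialize, so it is worth settling on one policy before the first append.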
Use SQL Server checkpoint store for durable projection checkpoints:
using BbQ.Events.Configuration;
using BbQ.Events.SqlServer.Configuration;
var services = new ServiceCollection();
// Register event bus and projections
services.AddInMemoryEventBus();
services.AddProjection<UserProfileProjection>();
// Register SQL Server checkpoint store
services.UseSqlServerCheckpoints(
"Server=localhost;Database=MyApp;Integrated Security=true");
// Register projection engine
services.AddProjectionEngine();
var provider = services.BuildServiceProvider();
// Get the projection engine
var engine = provider.GetRequiredService<IProjectionEngine>();
// Run projections (blocks until cancelled)
await engine.RunAsync(cancellationToken);
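Because RunAsync blocks until the token is cancelled, the caller needs a token it can actually cancel and a way to treat cancellation as a normal shutdown. A minimal sketch follows; ProjectionRunner is a hypothetical helper, and it assumes RunAsync returns a Task and may surface cancellation as an OperationCanceledException (the source does not say either way).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the package.
public static class ProjectionRunner
{
    // Awaits a long-running loop and swallows the cancellation exception
    // only when it was caused by the supplied token.
    public static async Task RunUntilCancelledAsync(
        Func<CancellationToken, Task> run, CancellationToken token)
    {
        try
        {
            await run(token);
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Expected on shutdown; nothing to clean up here.
        }
    }
}

// Possible usage in a console host (assumes 'engine' from the example above):
//   using var cts = new CancellationTokenSource();
//   Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
//   await ProjectionRunner.RunUntilCancelledAsync(ct => engine.RunAsync(ct), cts.Token);
```

Any other exception still propagates, so genuine projection failures are not hidden by the shutdown handling.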
using BbQ.Events.Configuration;
using BbQ.Events.SqlServer.Configuration;
var services = new ServiceCollection();
// Register SQL Server event store for event sourcing
services.UseSqlServerEventStore("Server=localhost;Database=MyApp;Integrated Security=true");
// Register event bus for pub/sub
services.AddInMemoryEventBus();
// Register projections
services.AddProjection<UserProfileProjection>();
// Register SQL Server checkpoint store for projections
services.UseSqlServerCheckpoints("Server=localhost;Database=MyApp;Integrated Security=true");
// Register projection engine
services.AddProjectionEngine();
var provider = services.BuildServiceProvider();
Recommended: Store connection strings in configuration:
var eventStoreConnection = builder.Configuration.GetConnectionString("EventStore");
var checkpointConnection = builder.Configuration.GetConnectionString("Checkpoints");
services.UseSqlServerEventStore(eventStoreConnection);
services.UseSqlServerCheckpoints(checkpointConnection);
appsettings.json:
{
  "ConnectionStrings": {
    "EventStore": "Server=localhost;Database=MyApp;Integrated Security=true",
    "Checkpoints": "Server=localhost;Database=MyApp;Integrated Security=true"
  }
}
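GetConnectionString returns null when the named entry is absent, which would otherwise surface later as a less obvious error. A small sketch of a fail-fast lookup, assuming the Microsoft.Extensions.Configuration abstractions are referenced; the ConnectionStrings class and GetRequired method are hypothetical names.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Hypothetical helper, not part of the package.
public static class ConnectionStrings
{
    // Returns the named connection string, or throws with a clear message
    // when it is missing or empty.
    public static string GetRequired(IConfiguration configuration, string name) =>
        configuration.GetConnectionString(name) is { Length: > 0 } value
            ? value
            : throw new InvalidOperationException(
                $"Connection string '{name}' is missing from configuration.");
}
```

With this in place, the registration could read services.UseSqlServerEventStore(ConnectionStrings.GetRequired(builder.Configuration, "EventStore")).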
Note: For development environments with self-signed certificates, you may need to add TrustServerCertificate=true to the connection string. However, this should not be used in production as it disables TLS certificate validation and can allow man-in-the-middle attacks.
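One way to keep this setting out of production is to add it in code only when running in development, rather than storing it in the shared connection string. The sketch below uses SqlConnectionStringBuilder from Microsoft.Data.SqlClient; whether the package itself uses that client library is an assumption, and DevConnectionString is a hypothetical helper.

```csharp
using Microsoft.Data.SqlClient;

// Hypothetical helper, not part of the package.
public static class DevConnectionString
{
    // Adds TrustServerCertificate=true only when isDevelopment is set, so the
    // production connection string keeps full TLS certificate validation.
    public static string Build(string baseConnectionString, bool isDevelopment)
    {
        var builder = new SqlConnectionStringBuilder(baseConnectionString);
        if (isDevelopment)
        {
            builder.TrustServerCertificate = true;
        }
        return builder.ConnectionString;
    }
}
```

The builder also validates keywords, so a typo in the base connection string fails at startup instead of on the first query.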
The package follows a feature-based folder structure:
BbQ.Events.SqlServer/
  Events/                  # Event store implementation
    SqlServerEventStore.cs
    SqlServerEventStoreOptions.cs
  Checkpointing/           # Projection checkpoint store
    SqlServerProjectionCheckpointStore.cs
  Schema/                  # SQL schema scripts
    CreateEventsTable.sql
    CreateStreamsTable.sql
    CreateCheckpointTable.sql
  Configuration/           # DI extensions
    ServiceCollectionExtensions.cs
  Internal/                # Internal helpers (not public API)
    SqlHelpers.cs
    SqlConstants.cs
This structure:
The event store uses SQL Server's transaction support to ensure:
The checkpoint store uses SQL Server's MERGE statement for atomic upsert operations:
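MERGE BbQ_ProjectionCheckpoints AS target
USING (SELECT @ProjectionName AS ProjectionName, @PartitionKey AS PartitionKey) AS source
    ON target.ProjectionName = source.ProjectionName
    -- NULL = NULL is not true in SQL, so NULL partition keys are matched explicitly
    AND (target.PartitionKey = source.PartitionKey
         OR (target.PartitionKey IS NULL AND source.PartitionKey IS NULL))
WHEN MATCHED THEN UPDATE SET Position = @Position
WHEN NOT MATCHED THEN INSERT (...)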
This ensures that:
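To illustrate the upsert, here is a minimal ADO.NET sketch. It is not the package's actual implementation: the class and method names are hypothetical, the use of Microsoft.Data.SqlClient is an assumption, and the HOLDLOCK hint, the LastUpdatedUtc update, and the NULL-safe PartitionKey comparison are additions for illustration rather than something the abbreviated statement shows.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

// Hypothetical sketch, not the package's implementation.
public static class CheckpointUpsertSketch
{
    // HOLDLOCK keeps the match-then-insert step from racing with a concurrent
    // writer; the OR branch lets NULL partition keys match each other.
    public const string Sql = @"
MERGE BbQ_ProjectionCheckpoints WITH (HOLDLOCK) AS target
USING (SELECT @ProjectionName AS ProjectionName, @PartitionKey AS PartitionKey) AS source
   ON target.ProjectionName = source.ProjectionName
  AND (target.PartitionKey = source.PartitionKey
       OR (target.PartitionKey IS NULL AND source.PartitionKey IS NULL))
WHEN MATCHED THEN
    UPDATE SET Position = @Position, LastUpdatedUtc = SYSUTCDATETIME()
WHEN NOT MATCHED THEN
    INSERT (ProjectionName, PartitionKey, Position)
    VALUES (source.ProjectionName, source.PartitionKey, @Position);";

    public static async Task SaveAsync(
        string connectionString, string projectionName, string? partitionKey,
        long position, CancellationToken ct = default)
    {
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(Sql, connection);
        command.Parameters.Add("@ProjectionName", SqlDbType.NVarChar, 200).Value = projectionName;
        // A null partition key must be sent as DBNull, not as a CLR null.
        command.Parameters.Add("@PartitionKey", SqlDbType.NVarChar, 200).Value =
            (object?)partitionKey ?? DBNull.Value;
        command.Parameters.Add("@Position", SqlDbType.BigInt).Value = position;

        await command.ExecuteNonQueryAsync(ct);
    }
}
```

Because the whole read-or-write decision happens inside one statement, callers do not need their own transaction around a separate SELECT and INSERT.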
Both implementations are safe for parallel processing scenarios:
Both implementations rely on ADO.NET's built-in connection pooling:
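Since ADO.NET keeps one pool per distinct connection string, pool limits are tuned through the connection string rather than through package options. A sketch using SqlConnectionStringBuilder, assuming Microsoft.Data.SqlClient is the client library in use; PoolingSketch and its parameter values are hypothetical.

```csharp
using Microsoft.Data.SqlClient;

// Hypothetical helper, not part of the package.
public static class PoolingSketch
{
    // Returns a connection string with explicit pool bounds. The ADO.NET
    // default maximum is 100 connections per pool.
    public static string WithPoolLimits(
        string baseConnectionString, int minPoolSize, int maxPoolSize)
    {
        var builder = new SqlConnectionStringBuilder(baseConnectionString)
        {
            Pooling = true,
            MinPoolSize = minPoolSize,
            MaxPoolSize = maxPoolSize
        };
        return builder.ConnectionString;
    }
}
```

Using the same resulting string for both the event store and the checkpoint store lets them share a single pool; different strings create separate pools.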
All schema files include recommended indexes. The primary indexes are:
Events Table:
-- Unique constraint ensures position uniqueness within a stream
CONSTRAINT UQ_BbQ_Events_Stream_Position UNIQUE (StreamName, Position)
-- Optimized for reading events from a stream
CREATE INDEX IX_BbQ_Events_StreamName_Position ON BbQ_Events(StreamName, Position);
Checkpoints Table:
-- Primary key for fast checkpoint lookups
PRIMARY KEY (ProjectionName, PartitionKey)
Replace the in-memory event store with SQL Server:
Before:
services.AddSingleton<IEventStore, InMemoryEventStore>();
After:
services.UseSqlServerEventStore(connectionString);
All event store client code remains unchanged.
Replace the in-memory checkpoint store with SQL Server:
Before:
services.AddProjectionEngine(); // Uses InMemoryProjectionCheckpointStore
After:
services.UseSqlServerCheckpoints(connectionString);
services.AddProjectionEngine();
All existing projection code remains unchanged.
If you encounter connection errors:
Verify SQL Server is accessible:
sqlcmd -S localhost -d MyApp -E
Check the connection string: ensure it is valid and includes any options your environment needs, such as TrustServerCertificate=true for development environments.
Verify tables exist:
SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME LIKE 'BbQ_%';
If events aren't being saved:
Verify tables exist:
SELECT * FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME IN ('BbQ_Events', 'BbQ_Streams');
Check for errors: Enable logging to see any exceptions
Query events directly:
SELECT * FROM BbQ_Events;
SELECT * FROM BbQ_Streams;
If checkpoints aren't being saved:
Verify projection engine is running:
await engine.RunAsync(cancellationToken);
Check projection registration:
services.AddProjection<YourProjection>();
Query the checkpoint table directly:
SELECT * FROM BbQ_ProjectionCheckpoints;
The schema and architecture are designed to support future features:
PartitionKey column enables per-partition checkpointing
LastUpdatedUtc column enables projection health monitoring
MIT License - see LICENSE.txt for details
Contributions are welcome! Please open an issue or pull request at: https://github.com/JeanMarcMbouma/Outcome