Published: Mar 13, 2024
$ dotnet add package MicroPlumberd.DirectConnect
Micro library for EventStore, CQRS and EventSourcing. Just eXtremely simple.
Documentation can be found here: MicroPlumberd Documentation
dotnet add package MicroPlumberd
dotnet add package MicroPlumberd.SourceGenerators
If you'd like to use direct dotnet-to-dotnet communication to execute command handlers, install MicroPlumberd.DirectConnect:
dotnet add package MicroPlumberd.DirectConnect
// Change this to your own connection string.
string connectionString = "esdb://admin:changeit@localhost:2113?tls=false&tlsVerifyCert=false";
var settings = EventStoreClientSettings.Create(connectionString);
var plumber = Plumber.Create(settings);
[Aggregate]
public partial class FooAggregate(Guid id) : AggregateBase<FooAggregate.FooState>(id)
{
    internal new FooState State => base.State;
    public record FooState { public string Name { get; set; } }
    private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
    private static FooState Given(FooState state, FooUpdated ev) => state with { Name = ev.Name };
    public void Open(string msg) => AppendPendingChange(new FooCreated() { Name = msg });
    public void Change(string msg) => AppendPendingChange(new FooUpdated() { Name = msg });
}
Comments:
If you want to create a new aggregate and save it to EventStoreDB:
FooAggregate aggregate = FooAggregate.New(Guid.NewGuid());
aggregate.Open("Hello");
await plumber.SaveNew(aggregate);
If you want to load an aggregate from EventStoreDB, change it, and save it back to EventStoreDB:
var aggregate = await plumber.Get<FooAggregate>("YOUR_ID");
aggregate.Change("World");
await plumber.SaveChanges(aggregate);
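To make the role of the Given methods more concrete, here is a minimal, self-contained sketch of how state can be rebuilt by folding a stream of events through them. It does not use MicroPlumberd at all: the FooFold class and its Apply dispatcher are hypothetical stand-ins for whatever code routes each event to the matching Given overload, and the event records are simplified copies of the ones used above.

```csharp
using System;
using System.Linq;

// A recorded history of events, oldest first.
var history = new object[]
{
    new FooCreated { Name = "Hello" },
    new FooUpdated { Name = "World" },
};

// Rebuild the current state by applying every event to the previous state.
var state = history.Aggregate(new FooState(), FooFold.Apply);
Console.WriteLine(state.Name); // prints "World"

public record FooCreated { public string Name { get; init; } = ""; }
public record FooUpdated { public string Name { get; init; } = ""; }
public record FooState { public string Name { get; init; } = ""; }

public static class FooFold
{
    // Hypothetical dispatcher (an assumption for this sketch, not library code):
    // picks the Given overload that matches the runtime type of the event.
    public static FooState Apply(FooState state, object ev) => ev switch
    {
        FooCreated e => Given(state, e),
        FooUpdated e => Given(state, e),
        _ => state, // unknown events leave the state untouched
    };

    private static FooState Given(FooState state, FooCreated ev) => state with { Name = ev.Name };
    private static FooState Given(FooState state, FooUpdated ev) => state with { Name = ev.Name };
}
```

Because each Given is a pure function from (state, event) to a new state, replaying the same history always yields the same result, which is what makes loading an aggregate from its stream deterministic.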
[EventHandler]
public partial class FooModel
{
    private async Task Given(Metadata m, FooCreated ev)
    {
        // your code
    }
    private async Task Given(Metadata m, FooUpdated ev)
    {
        // your code
    }
}
Comments:
var fooModel = new FooModel();
var sub = await plumber.SubscribeModel(fooModel);
// or, if you want to persist the progress of your subscription:
var sub2 = await plumber.SubscribeModelPersistently(fooModel);
With SubscribeModel you can subscribe from the start of the stream, from a certain position, or from the end of the stream.
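The three starting points can be illustrated with a small in-memory sketch. It only models which already-stored events a subscriber would see for each choice; the Start type and Replay helper are hypothetical names invented for this example and are not part of MicroPlumberd or the EventStoreDB client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Four events already stored in the stream, at positions 0..3.
var stream = new List<string> { "e0", "e1", "e2", "e3" };

Console.WriteLine(string.Join(",", Replay.From(stream, Start.Beginning))); // e0,e1,e2,e3
Console.WriteLine(string.Join(",", Replay.From(stream, Start.After(1))));  // e2,e3
Console.WriteLine(string.Join(",", Replay.From(stream, Start.End)));       // empty line: only future events would arrive

// Hypothetical description of where a subscription starts (assumption for this sketch).
public readonly record struct Start(long? AfterPosition, bool AtEnd)
{
    public static readonly Start Beginning = new(null, false);
    public static readonly Start End = new(null, true);
    public static Start After(long position) => new(position, false);
}

public static class Replay
{
    // Returns the stored events a subscriber would receive as catch-up.
    public static IEnumerable<string> From(IReadOnlyList<string> stream, Start start)
    {
        if (start.AtEnd) return Enumerable.Empty<string>();
        var skip = start.AfterPosition is long p ? (int)p + 1 : 0;
        return stream.Skip(skip);
    }
}
```

Starting from the beginning suits a read-model that must be rebuilt from scratch, while starting from the end suits handlers that only care about events raised from now on.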
[EventHandler]
public partial class FooProcessor(IPlumber plumber)
{
    private async Task Given(Metadata m, FooUpdated ev)
    {
        var agg = FooAggregate.New(Guid.NewGuid());
        agg.Open(ev.Name + " new");
        await plumber.SaveNew(agg);
    }
}
Implementing a processor is technically the same as implementing a read-model, but inside the Given method you would typically invoke a command or execute an aggregate.
Imagine this:
Please note that the SQL schema create/drop auto-generation script will be covered in a different article. (For now, we leave it to developers.)
Comments:
[OutputStream(FooModel.MODEL_NAME)]
[EventHandler]
public partial class FooModel : DbContext
{
    internal const string MODEL_VER = "_v1";
    internal const string MODEL_NAME = $"FooModel{MODEL_VER}";
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder
            .Entity<FooEntity>()
            .ToTable($"FooEntities{MODEL_VER}");
    }
    private async Task Given(Metadata m, FooCreated ev)
    {
        // your code
    }
    private async Task Given(Metadata m, FooUpdated ev)
    {
        // your code
    }
}
// Given simple models, where the master model has a foreign key used to obtain a value from DimentionLookupModel:
var dimentionTable = new DimentionLookupModel();
var factTable = new MasterModel(dimentionTable);
await plumber.SubscribeSet()
    .With(dimentionTable)
    .With(factTable)
    .SubscribeAsync("MasterStream", FromStream.Start);
// Let's configure the server:
services.AddCommandHandler<FooCommandHandler>().AddServerDirectConnect();
// Add mapping to the direct-connect service
app.MapDirectConnect();
Here is an example of command handler code:
[CommandHandler]
public partial class FooCommandHandler(IPlumber plumber)
{
    [ThrowsFaultException<BusinessFault>]
    public async Task Handle(Guid id, CreateFoo cmd)
    {
        if (cmd.Name == "error")
            throw new BusinessFaultException("Foo");
        var agg = FooAggregate.New(id);
        agg.Open(cmd.Name);
        await plumber.SaveNew(agg);
    }
    [ThrowsFaultException<BusinessFault>]
    public async Task<HandlerOperationStatus> Handle(Guid id, ChangeFoo cmd)
    {
        if (cmd.Name == "error")
            throw new BusinessFaultException("Foo");
        var agg = await plumber.Get<FooAggregate>(id);
        agg.Change(cmd.Name);
        await plumber.SaveChanges(agg);
        return HandlerOperationStatus.Ok();
    }
}
And here is how it looks on the client side:
services.AddClientDirectConnect().AddCommandInvokers();
// And invocation
var clientPool = sp.GetRequiredService<IRequestInvokerPool>();
var invoker = clientPool.Get("YOUR_GRPC_URL");
await invoker.Execute(Guid.NewGuid(), new CreateFoo());
You can easily inject aspects through the decorator pattern.
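As a rough illustration of that idea, the sketch below wraps an invoker in a logging decorator. Everything here is an assumption made for the example: ICommandInvoker, FakeInvoker and LoggingInvoker are hypothetical types, not MicroPlumberd interfaces, and a real decorator would wrap whichever invoker or handler abstraction your application resolves from the container.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Wrap the real invoker; callers only ever see the ICommandInvoker interface.
ICommandInvoker invoker = new LoggingInvoker(new FakeInvoker(), log.Add);

await invoker.Execute(Guid.NewGuid(), new CreateFoo { Name = "Hello" });
Console.WriteLine(string.Join(" | ", log)); // prints "executing CreateFoo | executed CreateFoo"

public record CreateFoo { public string Name { get; init; } = ""; }

// Hypothetical abstraction over "send a command to a handler" (assumption for this sketch).
public interface ICommandInvoker
{
    Task Execute(Guid id, object command);
}

// Stand-in for the component that would actually deliver the command.
public sealed class FakeInvoker : ICommandInvoker
{
    public Task Execute(Guid id, object command) => Task.CompletedTask;
}

// Decorator: adds a logging aspect around any other ICommandInvoker.
public sealed class LoggingInvoker : ICommandInvoker
{
    private readonly ICommandInvoker _inner;
    private readonly Action<string> _log;

    public LoggingInvoker(ICommandInvoker inner, Action<string> log)
    {
        _inner = inner;
        _log = log;
    }

    public async Task Execute(Guid id, object command)
    {
        _log($"executing {command.GetType().Name}");
        await _inner.Execute(id, command);
        _log($"executed {command.GetType().Name}");
    }
}
```

Since the decorator implements the same interface as the component it wraps, further aspects such as retries or validation can be stacked in the same way without touching the command handlers.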