In-memory transport implementation for HoneyDrunk.Transport. Provides observable queues and pub/sub subscriptions for testing without external dependencies.
$ dotnet add package HoneyDrunk.Transport.InMemoryReliable messaging and outbox infrastructure for the Hive - Transport unifies brokers, queues, and event buses under one contract ensuring delivery, order, and idempotence. It powers communication between Nodes—Data, Pulse, Vault, and beyond—so every message finds its way.
HoneyDrunk.Transport is the messaging backbone of HoneyDrunk.OS ("the Hive"). It provides a transport-agnostic abstraction layer over different message brokers with built-in resilience, observability, and exactly-once semantics.
ITransportPublisher and ITransportConsumer over Azure Service Bus, Azure Storage Queue, and InMemoryITransportEnvelope with correlation/causation tracking and Grid context propagationIGridContext from HoneyDrunk.Kernel for distributed context propagationIOutboxStore and IOutboxDispatcher contracts for exactly-once processingITransportHealthContributor for Kubernetes probe integrationITransportMetrics and built-in telemetry middlewareThe following features exist as contracts only or have limitations in v0.1.0:
| Feature | Contract | v0.1.0 Status |
|---|---|---|
| Transport publishing | ITransportPublisher | ✅ Implemented for Service Bus, Storage Queue, InMemory |
| Transport consuming | ITransportConsumer | ✅ Implemented for Service Bus, Storage Queue, InMemory |
| Transactional outbox | IOutboxStore | ⚠️ Contract only — application must implement against their database |
| Outbox dispatching | IOutboxDispatcher | ✅ DefaultOutboxDispatcher provided |
| Health aggregation | ITransportHealthContributor | ⚠️ Contributors exist — application wires into health system |
| Message serialization | IMessageSerializer | ✅ JsonMessageSerializer provided as default |
Bottom line: v0.1.0 provides complete transport abstraction with Azure providers. Applications must implement:
IOutboxStore if using transactional outbox pattern (database-specific)IMessageSerializer if JSON is not suitable# Azure Service Bus transport
dotnet add package HoneyDrunk.Transport.AzureServiceBus
# Or Azure Storage Queue transport
dotnet add package HoneyDrunk.Transport.StorageQueue
# Or InMemory transport (for testing)
dotnet add package HoneyDrunk.Transport.InMemory
# Or just the core abstractions (contracts only)
dotnet add package HoneyDrunk.Transport
This example shows a web application with Kernel and Azure Service Bus. Simpler setups are possible—see package-specific documentation.
Registration order matters. Kernel must be registered before Transport. See DependencyInjection.md for details.
using HoneyDrunk.Kernel.DependencyInjection;
using HoneyDrunk.Transport.DependencyInjection;
var builder = WebApplication.CreateBuilder(args);
// 1. Kernel (required for Grid context)
builder.Services.AddHoneyDrunkCoreNode(nodeDescriptor);
// 2. Transport core (middleware pipeline, envelope factory)
builder.Services.AddHoneyDrunkTransportCore(options =>
{
options.EnableTelemetry = true;
options.EnableLogging = true;
});
// 3. Azure Service Bus provider
builder.Services.AddHoneyDrunkServiceBusTransport(options =>
{
options.FullyQualifiedNamespace = "mynamespace.servicebus.windows.net";
options.Address = "orders";
options.EntityType = ServiceBusEntityType.Topic;
options.SubscriptionName = "order-processor";
options.MaxConcurrency = 10;
});
// 4. Register message handlers
builder.Services.AddMessageHandler<OrderCreatedEvent, OrderCreatedHandler>();
var app = builder.Build();
app.Run();
For libraries that only need contracts without runtime dependencies:
// Reference only HoneyDrunk.Transport
// No Kernel runtime, no broker SDK dependencies
public class OrderService
{
private readonly ITransportPublisher _publisher;
private readonly EnvelopeFactory _envelopeFactory;
private readonly IMessageSerializer _serializer;
public async Task PublishOrderCreatedAsync(Order order, CancellationToken ct)
{
var @event = new OrderCreatedEvent { OrderId = order.Id };
var payload = _serializer.Serialize(@event);
var envelope = _envelopeFactory.CreateEnvelope<OrderCreatedEvent>(payload);
await _publisher.PublishAsync(
envelope,
EndpointAddress.Create("orders", "orders-topic"),
ct);
}
}
All messages are wrapped in immutable ITransportEnvelope for distributed tracing:
public interface ITransportEnvelope
{
string MessageId { get; }
string? CorrelationId { get; }
string? CausationId { get; }
string MessageType { get; }
ReadOnlyMemory<byte> Payload { get; }
IReadOnlyDictionary<string, string> Headers { get; }
DateTimeOffset Timestamp { get; }
}
// Create envelopes via factory (integrates with TimeProvider and Grid context)
var envelope = envelopeFactory.CreateEnvelopeWithGridContext<OrderCreatedEvent>(
payload, gridContext);
Note: Always use EnvelopeFactory to create envelopes. It integrates with TimeProvider for deterministic timestamps and IGridContext for distributed context propagation.
Transport is fully integrated with Kernel's IGridContext for distributed context propagation:
public class OrderCreatedHandler : IMessageHandler<OrderCreatedEvent>
{
public async Task<MessageProcessingResult> HandleAsync(
OrderCreatedEvent message,
MessageContext context,
CancellationToken ct)
{
// Access Grid context directly from MessageContext
var grid = context.GridContext;
_logger.LogInformation(
"Processing order {OrderId} with CorrelationId {CorrelationId} on Node {NodeId}",
message.OrderId,
grid?.CorrelationId,
grid?.NodeId);
return MessageProcessingResult.Success;
}
}
Note: Grid context is extracted from envelope headers by GridContextPropagationMiddleware and populated in MessageContext automatically.
Message processing follows an onion-style middleware pattern:
// Built-in middleware (executed in order)
// 1. GridContextPropagationMiddleware - Extracts IGridContext from envelope
// 2. TelemetryMiddleware - Distributed tracing via OpenTelemetry
// 3. LoggingMiddleware - Structured logging of message processing
// Custom middleware registration
builder.Services.AddHoneyDrunkTransportCore()
.AddMiddleware<CustomRetryMiddleware>()
.AddMiddleware<CustomValidationMiddleware>();
Note: Middleware order matters. GridContextPropagation must run before telemetry to ensure correlation IDs are available for tracing.
For exactly-once processing with database transactions:
public class OrderService(
IOutboxStore outboxStore,
EnvelopeFactory factory,
IMessageSerializer serializer,
IDbContext dbContext)
{
public async Task CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
{
await using var transaction = await dbContext.BeginTransactionAsync(ct);
// Save order to database
var order = new Order { /* ... */ };
await dbContext.Orders.AddAsync(order, ct);
// Save message to outbox (same transaction)
var payload = serializer.Serialize(new OrderCreatedEvent { OrderId = order.Id });
var envelope = factory.CreateEnvelope<OrderCreatedEvent>(payload);
await outboxStore.SaveAsync(
EndpointAddress.Create("orders", "orders-topic"),
envelope, ct);
await dbContext.SaveChangesAsync(ct);
await transaction.CommitAsync(ct);
// DefaultOutboxDispatcher publishes from outbox in background
}
}
Note: IOutboxStore is a contract—application must implement against their database. DefaultOutboxDispatcher polls the store and publishes pending messages.
Transport providers include health monitoring for Kubernetes probes:
public interface ITransportHealthContributor
{
string Name { get; }
ValueTask<TransportHealthResult> CheckHealthAsync(CancellationToken ct);
}
// Each transport registers its own contributor
// - ServiceBusHealthContributor
// - StorageQueueHealthContributor
// - InMemoryHealthContributor
Note: Health contributors are passive—invoked by host health system on demand. Applications wire contributors into their health check infrastructure.
| Scenario | Storage Queue | Service Bus |
|---|---|---|
| Cost optimization | ✅ $0.0004/10K ops | ❌ Higher cost |
| High volume (millions/day) | ✅ Excellent | ✅ Good |
| Simple queue semantics | ✅ Yes | ✅ Yes |
| Message size < 64KB | ✅ Yes | ✅ Up to 100MB |
| Topics/subscriptions (fan-out) | ❌ No | ✅ Yes |
| Sessions (ordered processing) | ❌ No | ✅ Yes |
| Transactions | ❌ No | ✅ Yes |
| Duplicate detection | ❌ No | ✅ Yes |
Choose Storage Queue for cost-effective, high-volume, simple queue scenarios.
Choose Service Bus for enterprise messaging with topics, sessions, or transactions.
ITransportEnvelope, IMessageHandler, MessageContextTransportCoreOptions, RetryOptions, error strategiesIGridContextFactoryEnvelopeFactory, TransportEnvelope, serializationHoneyDrunk.Transport/
├── HoneyDrunk.Transport/ # Core abstractions & pipeline
│ ├── Abstractions/ # Publisher, consumer, handler contracts
│ ├── Pipeline/ # Middleware execution engine
│ ├── Configuration/ # TransportCoreOptions, RetryOptions
│ ├── Context/ # Grid context factory and propagation
│ ├── Primitives/ # EnvelopeFactory, TransportEnvelope
│ ├── Outbox/ # IOutboxStore, DefaultOutboxDispatcher
│ ├── Runtime/ # ITransportRuntime host
│ ├── Health/ # ITransportHealthContributor
│ ├── Metrics/ # ITransportMetrics
│ ├── Telemetry/ # OpenTelemetry integration
│ └── DependencyInjection/ # AddHoneyDrunkTransportCore()
│
├── HoneyDrunk.Transport.AzureServiceBus/ # Azure Service Bus provider
│ ├── Publishing/ # ServiceBusTransportPublisher
│ ├── Consuming/ # ServiceBusTransportConsumer
│ ├── BlobFallback/ # Blob storage for failed publishes
│ ├── Health/ # ServiceBusHealthContributor
│ └── DependencyInjection/ # AddHoneyDrunkServiceBusTransport()
│
├── HoneyDrunk.Transport.StorageQueue/ # Azure Storage Queue provider
│ ├── Publishing/ # StorageQueueTransportPublisher
│ ├── Consuming/ # StorageQueueTransportConsumer
│ ├── Health/ # StorageQueueHealthContributor
│ └── DependencyInjection/ # AddHoneyDrunkTransportStorageQueue()
│
├── HoneyDrunk.Transport.InMemory/ # In-memory provider (testing)
│ ├── InMemoryBroker # Thread-safe in-memory message store
│ ├── Health/ # InMemoryHealthContributor
│ └── DependencyInjection/ # AddHoneyDrunkInMemoryTransport()
│
└── HoneyDrunk.Transport.Tests/ # xUnit test suite
ITransportPublisher and ITransportConsumer transport abstractionITransportEnvelope immutable message wrapper with Grid contextEnvelopeFactory integrating TimeProvider and IGridContextIMessagePipeline with onion-style middleware executionIMessageHandler<T> and MessageProcessingResult for handler contractsGridContextPropagationMiddleware, TelemetryMiddleware, LoggingMiddlewareIOutboxStore and DefaultOutboxDispatcher for transactional outboxITransportHealthContributor for health check participationITransportMetrics for OpenTelemetry integrationServiceBusTransportPublisher with topic and queue supportServiceBusTransportConsumer with session and subscription supportBlobFallbackPublisher for failed publish persistenceServiceBusHealthContributor for connectivity health checksAzureServiceBusOptions with retry and prefetch configurationStorageQueueTransportPublisher with base64 encodingStorageQueueTransportConsumer with concurrent pollingStorageQueueHealthContributor for connectivity health checksStorageQueueOptions with dequeue count and visibility timeoutInMemoryBroker thread-safe message store for testingInMemoryTransportPublisher and InMemoryTransportConsumerInMemoryHealthContributor always-healthy contributor| Project | Relationship |
|---|---|
| HoneyDrunk.Kernel | Transport depends on Kernel for IGridContext and TimeProvider |
| HoneyDrunk.Standards | Analyzers and coding conventions |
| HoneyDrunk.Data | Data access and persistence (in development) |
| HoneyDrunk.Auth | Authentication and authorization (in development) |
Note: HoneyDrunk.Transport depends only on HoneyDrunk.Kernel.Abstractions (contracts, no runtime). Transport providers depend on their respective Azure SDKs.
This project is licensed under the MIT License.
Built with 🍯 by HoneyDrunk Studios