Core entities for CandleFlow - A high-performance .NET library for aggregating tick data into OHLCV candlestick charts. Contains Tick, Candle, TimeFrame, and TradeDirection types.
$ dotnet add package Antaeus.CandleFlow.CoreA high-performance .NET 8 library for aggregating tick data into OHLCV candlestick charts across multiple timeframes.
dotnet add package Antaeus.CandleFlow.Core
dotnet add package Antaeus.CandleFlow.Aggregation
dotnet add package Antaeus.CandleFlow.Persistence.EfCore # Optional: for persistence
using Antaeus.CandleFlow.Aggregation;
using Antaeus.CandleFlow.Core.Entities;
// Create the aggregator with options
var options = new CandleAggregatorOptions
{
DefaultTimeFrames = new[] { TimeFrame.FiveMinutes, TimeFrame.OneHour }
};
using var aggregator = new CandleAggregator(options);
// Add a symbol to track
aggregator.AddSymbol("BTC/USD");
// Process ticks
var tick = new Tick(DateTime.UtcNow, price: 40000m, volume: 1.5m, TradeDirection.Buy);
aggregator.ProcessTick("BTC/USD", tick);
// Get current candle
var currentCandle = aggregator.GetCurrentCandle("BTC/USD", TimeFrame.FiveMinutes);
Console.WriteLine($"Current: O={currentCandle.Open} H={currentCandle.High} L={currentCandle.Low} C={currentCandle.Close}");
// Get closed candles
var closedCandles = aggregator.GetClosedCandles("BTC/USD", TimeFrame.FiveMinutes, count: 10);
using Antaeus.CandleFlow.Aggregation;
// In your Startup/Program.cs
services.AddCandleFlow(options =>
{
options.DefaultTimeFrames = new[] { TimeFrame.OneMinute, TimeFrame.FiveMinutes, TimeFrame.OneHour };
options.EnableBatchOptimization = true;
options.BatchThreshold = 100;
});
// Inject ICandleAggregator where needed
public class MyService
{
private readonly ICandleAggregator _aggregator;
public MyService(ICandleAggregator aggregator)
{
_aggregator = aggregator;
}
}
// Subscribe to all updates (tick-by-tick)
using var subscription = aggregator.Subscribe("BTC/USD", TimeFrame.FiveMinutes)
.Subscribe(update =>
{
Console.WriteLine($"Update: {update.UpdateType} - Close: {update.Candle.Close}");
update.Acknowledge(); // Mark as processed
});
// Subscribe to closed candles only
using var closedSub = aggregator.SubscribeToClosedOnly("BTC/USD", TimeFrame.FiveMinutes)
.Subscribe(update =>
{
Console.WriteLine($"Candle closed: {update.Candle.OpenTime} - OHLC: {update.Candle.Open}/{update.Candle.High}/{update.Candle.Low}/{update.Candle.Close}");
});// Process historical tick data efficiently
var historicalTicks = LoadTicksFromDatabase(); // Your data source
aggregator.ProcessBatch("BTC/USD", historicalTicks);// Use predefined timeframes
aggregator.AddSymbol("ETH/USD", TimeFrame.OneMinute, TimeFrame.FiveMinutes, TimeFrame.OneHour);
// Or create custom timeframes
var sevenMinutes = new TimeFrame(TimeSpan.FromMinutes(7), "7m");
var twoHours = new TimeFrame(TimeSpan.FromHours(2), "2H");
aggregator.AddSymbol("SOL/USD", sevenMinutes, twoHours);A Tick represents a single trade or price update:
public record Tick(
DateTime Timestamp, // UTC timestamp
decimal Price, // Trade price
decimal Volume, // Trade volume
TradeDirection Direction // Buy, Sell, or Unknown
);A Candle represents an OHLCV bar:
Predefined timeframes:
TimeFrame.OneMinute (1m)TimeFrame.FiveMinutes (5m)TimeFrame.FifteenMinutes (15m)TimeFrame.ThirtyMinutes (30m)TimeFrame.OneHour (1H)TimeFrame.FourHours (4H)TimeFrame.OneDay (1D)TimeFrame.OneWeek (1W)// Add DbContext
services.AddDbContext<CandleFlowDbContext>(options =>
options.UseSqlServer(connectionString));
// Register repositories
services.AddScoped<ICandleRepository, EfCoreCandleRepository>();
services.AddScoped<IAggregatorStateRepository, EfCoreAggregatorStateRepository>();var repository = serviceProvider.GetRequiredService<ICandleRepository>();
// Save a single candle
await repository.SaveAsync(candle);
// Save multiple candles
await repository.SaveBatchAsync(candles);
// Query candles
var recentCandles = await repository.GetRangeAsync(
"BTC/USD",
TimeFrame.FiveMinutes,
DateTime.UtcNow.AddDays(-1),
DateTime.UtcNow);using Antaeus.CandleFlow.Aggregation.Retention;
// Count-based retention (keep last N candles)
var countPolicy = new CountBasedRetention(maxCount: 1000);
// Time-based retention (keep candles younger than max age)
var timePolicy = new TimeBasedRetention(maxAge: TimeSpan.FromDays(30));
// Composite policy (both conditions must be met)
var composite = new CompositeRetention(countPolicy, timePolicy, requireAll: true);
// Use with retention manager
var manager = new RetentionManager(composite);
manager.TrackCandle(candle);
manager.Acknowledge(candle);
var evicted = manager.ApplyRetention();Antaeus.CandleFlow.Core
├── Entities/
│ ├── Tick.cs
│ ├── Candle.cs
│ ├── TimeFrame.cs
│ └── TradeDirection.cs
Antaeus.CandleFlow.Aggregation
├── ICandleAggregator.cs
├── CandleAggregator.cs
├── CandleAggregatorOptions.cs
├── Building/
│ ├── ICandleBuilder.cs
│ └── CandleBuilder.cs
├── Hierarchy/
│ ├── ITimeFrameHierarchy.cs
│ └── TimeFrameHierarchy.cs
├── Subscriptions/
│ ├── ICandleUpdate.cs
│ └── SubscriptionManager.cs
├── Retention/
│ ├── IRetentionPolicy.cs
│ └── RetentionManager.cs
└── Threading/
├── CandleStore.cs
└── AsyncTickProcessor.cs
Antaeus.CandleFlow.Persistence.EfCore
├── ICandleRepository.cs
├── EfCoreCandleRepository.cs
├── IAggregatorStateRepository.cs
└── EfCoreAggregatorStateRepository.cs
CandleFlow is designed for high-throughput scenarios:
ReaderWriterLockSlim for candle updatesChannel<T> for async tick processingBenchmarks (typical):
MIT License - see LICENSE file for details.
Contributions are welcome! Please read our contributing guidelines and submit pull requests.