RingBufferPlus is an MIT-licensed open source library that implements a generic, autoscaling ring buffer (elastic buffer).
$ dotnet add package RingBufferPlus
A ring buffer is a memory allocation scheme where memory is reused (reclaimed) when an index, incremented modulo the buffer size, writes over a previously used location.
A ring buffer makes a bounded queue when separate indices are used for inserting and removing data. The queue can be safely shared between threads (or processors) without further synchronization so long as one processor enqueues data and the other dequeues it. (Also, modifications to the read/write pointers must be atomic, and this is a non-blocking queue: an error is returned when trying to write to a full queue or read from an empty queue.)
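The bounded-queue idea described above can be sketched as a small single-producer/single-consumer queue. This is only an illustration of the general principle and is not taken from the RingBufferPlus source: the class name `SpscRingQueue<T>` and its `TryEnqueue`/`TryDequeue` methods are hypothetical, and the sketch assumes exactly one thread enqueues while exactly one other thread dequeues.

```csharp
using System;
using System.Threading;

// Minimal single-producer/single-consumer bounded queue over a fixed array.
// Hypothetical illustration only; this is not the RingBufferPlus implementation.
public sealed class SpscRingQueue<T>
{
    private readonly T[] _slots;
    private int _head; // next slot to read; written only by the consumer
    private int _tail; // next slot to write; written only by the producer

    public SpscRingQueue(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        // One slot stays unused so that head == tail always means "empty".
        _slots = new T[capacity + 1];
    }

    // Returns false instead of blocking when the queue is full.
    public bool TryEnqueue(T item)
    {
        int tail = _tail;
        int next = (tail + 1) % _slots.Length; // index incremented modulo the size
        if (next == Volatile.Read(ref _head)) return false; // full
        _slots[tail] = item;
        Volatile.Write(ref _tail, next); // publish the slot only after it is filled
        return true;
    }

    // Returns false instead of blocking when the queue is empty.
    public bool TryDequeue(out T item)
    {
        int head = _head;
        if (head == Volatile.Read(ref _tail))
        {
            item = default!;
            return false; // empty
        }
        item = _slots[head];
        _slots[head] = default!; // drop the reference so it can be collected
        Volatile.Write(ref _head, (head + 1) % _slots.Length); // free the slot for reuse
        return true;
    }
}
```

With a capacity of 2, two calls to `TryEnqueue` succeed and a third returns false until `TryDequeue` frees a slot. Keeping one slot unused avoids a separate item counter, which both threads would otherwise have to update.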
The RingBufferPlus implementation follows this basic principle and extends it with a scalable capacity, so the buffer can scale up or down to optimize the consumption of the resources it holds.
v4.0.1 (latest version)
v4.0.0
v3.2.0 (Deprecated)
Install-Package RingBufferPlus [-pre]
dotnet add package RingBufferPlus [--prerelease]
Note: use [-pre] or [--prerelease] to install pre-release versions.
This example uses RingBufferPlus without scaling (non-elastic buffer).
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(3)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .BuildWarmupAsync(token);
Console.WriteLine($"Ring Buffer name({rb.Name}) created.");
Console.WriteLine($"Ring Buffer Current capacity is : {rb.CurrentCapacity}");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsInitCapacity({rb.Capacity}) = {rb.IsInitCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMaxCapacity({rb.MaxCapacity}) = {rb.IsMaxCapacity}.");
Console.WriteLine($"Ring Buffer name({rb.Name}) IsMinCapacity({rb.MinCapacity}) = {rb.IsMinCapacity}.");
using (var buffer = await rb.AcquireAsync(token))
{
    if (buffer.Successful)
    {
        Console.WriteLine($"Buffer is ok({buffer.Successful}:{buffer.ElapsedTime}) value: {buffer.Current}");
    }
    else
    {
        //do something
    }
}
This example uses RingBufferPlus with manual scaling (elastic buffer).
Manual scaling up and down runs in the background without blocking buffer acquisition or the SwitchTo command.
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(6)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .ScaleTimer()
    .MinCapacity(3)
    .MaxCapacity(9)
    .BuildWarmupAsync(token);
if (!await rb.SwitchToAsync(ScaleSwitch.MaxCapacity))
{
    //manual scale was not scheduled
    //do something
}
This example uses RingBufferPlus with autoscaling. Scaling up occurs automatically when an acquisition fails for lack of capacity. Scaling down occurs automatically in the background when resource availability is reached.
Automatic scaling up and down runs in the background without blocking buffer acquisition.
Manual scaling up and down is disabled (the command always returns false) when the AutoScaleAcquireFault command is used.
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(6)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .ScaleTimer(50, TimeSpan.FromSeconds(5))
    .AutoScaleAcquireFault(2)
    .MinCapacity(3)
    .MaxCapacity(9)
    .BuildWarmupAsync(token);
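To make the idea of fault-driven scaling more concrete, here is a toy model of one possible decision rule. It is a sketch under stated assumptions and does not describe how RingBufferPlus decides internally: the class `FaultScalePolicy`, its methods, the one-item scaling step, and the reading of the threshold as "consecutive failed acquisitions" are all hypothetical.

```csharp
using System;

// Toy model of fault-driven scaling. Every name and rule below is a
// hypothetical illustration and not part of the RingBufferPlus API.
public sealed class FaultScalePolicy
{
    private readonly int _min;
    private readonly int _max;
    private readonly int _faultThreshold;
    private int _faults; // consecutive failed acquisitions seen so far

    public int Capacity { get; private set; }

    public FaultScalePolicy(int initial, int min, int max, int faultThreshold)
    {
        if (min < 1 || min > initial || initial > max)
            throw new ArgumentException("Expected 1 <= min <= initial <= max.");
        if (faultThreshold < 1)
            throw new ArgumentOutOfRangeException(nameof(faultThreshold));
        _min = min;
        _max = max;
        _faultThreshold = faultThreshold;
        Capacity = initial;
    }

    // Record one acquisition attempt. Returns true when capacity grew.
    public bool OnAcquire(bool successful)
    {
        if (successful)
        {
            _faults = 0; // a success breaks the run of consecutive faults
            return false;
        }
        if (Capacity >= _max) return false; // already at the upper bound
        _faults++;
        if (_faults < _faultThreshold) return false;
        _faults = 0;
        Capacity++; // assumed step: grow by one item
        return true;
    }

    // Record one periodic idle check. Returns true when capacity shrank.
    public bool OnIdleCheck(int freeItems)
    {
        // Shrink only when at least one item is unused and the minimum allows it.
        if (freeItems < 1 || Capacity <= _min) return false;
        Capacity--; // assumed step: shrink by one item
        return true;
    }
}
```

In this model, `new FaultScalePolicy(6, 3, 9, 2)` grows from 6 to 7 on the second consecutive failed acquisition, and a later idle check that reports free items shrinks it back to 6. The capacity never leaves the range from 3 to 9.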
By default, acquisition and scale switching are not blocked while a scale up or down is being executed.
In scenarios where there is a lot of stress on the buffer resource, it may not be possible to complete the scaling. In these scenarios, it is preferable to block acquisition and scale switching while scaling runs, using the LockWhenScaling command, to ensure the desired execution.
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(6)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .ScaleTimer(50, TimeSpan.FromSeconds(5))
    .LockWhenScaling()
    .MinCapacity(3)
    .MaxCapacity(9)
    .BuildWarmupAsync(token);
There may be scenarios where you want to inspect an item in the buffer to take some action (such as checking its health status). When the HeartBeat option is used, an item from the buffer is periodically made available to your handler for this purpose.
Do not dispose the item received by the handler! This is done internally by the component.
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(6)
    .HeartBeat(MyHeartBeat)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .BuildWarmupAsync(token);
private void MyHeartBeat(RingBufferValue<int> item)
{
    //do anything, e.g. a health check
}
By default, the component writes its logs (levels Debug, Warning and Error) automatically on the same thread that executes the operation. This can slow execution down if writing a log entry takes a long time.
For this scenario, the component can write logs in the background, in an asynchronous process, using the BackgroundLogger command.
Random rnd = new();
var rb = await RingBuffer<int>.New("MyBuffer")
    .Capacity(6)
    .Logger(HostApp.Services.GetService<ILogger<Program>>())
    .BackgroundLogger()
    .Factory((_) => { return Task.FromResult(rnd.Next(1, 10)); })
    .BuildWarmupAsync(token);
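The general technique behind background logging can be sketched with a producer/consumer queue from `System.Threading.Channels`: callers enqueue a message and return immediately, while one background task performs the slow write. This is an illustration of the pattern only, not the component's actual implementation. The class name `BackgroundLogSink` and its members are hypothetical.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch of background logging: callers enqueue messages and return at once,
// while a single consumer task performs the (possibly slow) write.
// Hypothetical helper; this is not the RingBufferPlus implementation.
public sealed class BackgroundLogSink : IAsyncDisposable
{
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Task _pump;

    public BackgroundLogSink(Action<string> write)
    {
        if (write is null) throw new ArgumentNullException(nameof(write));
        // The pump drains the queue in arrival order until the writer completes.
        _pump = Task.Run(async () =>
        {
            await foreach (var message in _queue.Reader.ReadAllAsync())
            {
                write(message);
            }
        });
    }

    // Never blocks the caller; returns false only after the sink was disposed.
    public bool Log(string message) => _queue.Writer.TryWrite(message);

    // Stops accepting messages and waits until the queued ones are written.
    public async ValueTask DisposeAsync()
    {
        _queue.Writer.TryComplete();
        await _pump;
    }
}
```

A caller would create the sink once, for example `new BackgroundLogSink(Console.WriteLine)`, call `Log` from the hot path, and await `DisposeAsync` at shutdown so that pending messages are flushed. The unbounded queue keeps `Log` non-blocking at the cost of memory growth if the writer cannot keep up. A bounded channel would trade that for dropped or delayed messages.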
This example uses RingBufferPlus for RabbitMQ channels to publish messages with improved performance, scaling up automatically when an acquisition failure occurs.
Scaling down is performed automatically in the background when resource availability is reached.
Automatic scaling up and down runs in the background without blocking buffer acquisition.
Manual scaling up and down is disabled (the command always returns false) when the AutoScaleAcquireFault command is used.
var connectionFactory = new ConnectionFactory()
{
    Port = 8087,
    HostName = "localhost",
    UserName = "guest",
    Password = "guest",
    ClientProvidedName = "PublisherRoleProgram"
};
var connectionRabbit = await connectionFactory.CreateConnectionAsync(token);
var rb = await RingBuffer<IChannel>.New("RabbitChanels")
    .Capacity(10)
    .Logger(applogger!)
    .BackgroundLogger()
    .Factory((cts) => ChannelFactory(cts))
    .ScaleTimer(100, TimeSpan.FromSeconds(10))
    .MaxCapacity(20)
    .MinCapacity(5)
    .AutoScaleAcquireFault()
    .BuildWarmupAsync(token);
using (var buffer = await rb.AcquireAsync(token))
{
    if (buffer.Successful)
    {
        var body = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("Test"));
        await buffer.Current!.BasicPublishAsync("", "log", body);
    }
    else
    {
        //do something
    }
}
private async Task<IChannel> ChannelFactory(CancellationToken cancellation)
{
    return await connectionRabbit!.CreateChannelAsync(cancellationToken: cancellation);
}
For more examples, please refer to the Samples directory.
The documentation is available in the Docs directory.
This project has adopted the code of conduct defined by the Contributor Covenant to clarify expected behavior in our community. For more information see the Code of Conduct.
See the Contributing guide for developer documentation.
This work was inspired by Luiz Carlos Faria's project.
API documentation generated by
Copyright 2022 @ Fernando Cerqueira
RingBufferPlus is licensed under the MIT license. See LICENSE.