Operators for processing and transforming streams of values from a variety of sources
$ dotnet add package VDT.Core.Operators

Operators that process streams of published values from operand streams, allowing you to subscribe to the output stream and handle all sorts of events in a streamlined way. Create operand streams of the types you want to process, apply the required operators to those streams and subscribe to the results. Conceptually this is similar to piping observables, except that an operand stream doesn't own the data it publishes; it's merely a conduit used for publishing, piping and subscribing.
Each operand stream can be provided with an OperandStreamOptions object to specify how subscribers are interacted with.
- ReplayWhenSubscribing toggles the setting to publish all previously published values to a new subscriber when it is added
- ValueGenerator sets a method that will be executed when an operand stream is subscribed to, providing initial values
- ReplayValueGeneratorWhenSubscribing toggles the setting to determine when to execute the ValueGenerator
  - false to execute ValueGenerator only for the first subscriber
  - true to execute ValueGenerator for every subscriber

Please note that if ReplayWhenSubscribing and ReplayValueGeneratorWhenSubscribing are both false, any subscribers after the first will only receive values from ValueGenerator generated after they are subscribed.
- Debounce delays and throttles output values
- Filter discards output values based on a predicate
- Flatten subscribes to a stream of streams and outputs values published by the child streams
- GroupBy groups published values by using a key selector
- Iterate loops over received values of IEnumerable<T>
- Map transforms values
- Merge merges two or more streams
- QueueThrottle throttles output, queueing received values
- QueueZip outputs tuples of values published by two streams, queueing received values
- Throttle throttles output, discarding old received values
- Zip outputs tuples of values published by two streams, discarding old received values

The example below uses a series of operators to ensure string values don't get published more than once every half a second and can be parsed as integers before subscribing to the resulting integer stream.
    <input type="text" @oninput="async args => await valueStream.Publish(args.Value!.ToString()!)" />

    @code {
        private readonly IOperandStream<string> valueStream = new OperandStream<string>();

        protected override void OnAfterRender(bool firstRender) {
            if (firstRender) {
                valueStream
                    .Debounce(500)
                    .Map(value => new { IsValid = int.TryParse(value, out var result), Result = result })
                    .Filter(value => value.IsValid)
                    .Map(value => value.Result)
                    .Subscribe(value => {
                        // Handle received integer values
                    });
            }
        }
    }
Although many common operators are available out of the box, it is simple to create your own by implementing either IOperator<TValue, TTransformedValue> to transform values without any initialization, or IOperator<TValue, TTransformedValue, TInitializationData> to add an initialization method with data to the operator. For ease of use, you can then also create extension methods for IOperandStream<TValue> that pipe values through your operator to the target stream.
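
The general shape of a custom operator plus a convenience extension method might look like the sketch below. To keep it compilable on its own, it declares small stand-in types (ISketchStream, ISketchOperator, SketchStream, and a Pipe helper); these names and signatures are assumptions made for illustration only and are not the package's actual IOperandStream or IOperator definitions, so check the package source for the real members before adapting it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins so this sketch compiles without the package;
// they are NOT the real IOperandStream/IOperator definitions.
public interface ISketchStream<T> {
    Task Publish(T value);
    void Subscribe(Func<T, Task> subscriber);
}

public interface ISketchOperator<TValue, TTransformedValue> {
    Task Execute(TValue value, ISketchStream<TTransformedValue> targetStream);
}

// Minimal stream: forwards every published value to each subscriber in order.
public sealed class SketchStream<T> : ISketchStream<T> {
    private readonly List<Func<T, Task>> subscribers = new();

    public void Subscribe(Func<T, Task> subscriber) => subscribers.Add(subscriber);

    public async Task Publish(T value) {
        foreach (var subscriber in subscribers) {
            await subscriber(value);
        }
    }
}

// The custom operator: publishes only strings that parse as integers.
public sealed class ParseIntOperator : ISketchOperator<string, int> {
    public async Task Execute(string value, ISketchStream<int> targetStream) {
        if (int.TryParse(value, out var result)) {
            await targetStream.Publish(result);
        }
    }
}

public static class SketchStreamExtensions {
    // Pipes every value from the source through the operator into a new target stream.
    public static ISketchStream<TOut> Pipe<TIn, TOut>(
        this ISketchStream<TIn> source, ISketchOperator<TIn, TOut> op) {
        var target = new SketchStream<TOut>();
        source.Subscribe(value => op.Execute(value, target));
        return target;
    }

    // Convenience extension so callers can write stream.ParseInt().
    public static ISketchStream<int> ParseInt(this ISketchStream<string> source)
        => source.Pipe(new ParseIntOperator());
}

public static class Program {
    public static async Task Main() {
        var input = new SketchStream<string>();
        var received = new List<int>();

        input.ParseInt().Subscribe(value => {
            received.Add(value);
            return Task.CompletedTask;
        });

        await input.Publish("12");
        await input.Publish("abc"); // not an integer, so nothing is published
        await input.Publish("7");

        Console.WriteLine(string.Join(", ", received)); // prints "12, 7"
    }
}
```

The extension method keeps call sites readable: the operator class holds the transformation logic, while the extension hides the piping so it chains like the built-in operators.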