Nethereum.RPC.Reactive, Reactive Client Subscriptions (WebSockets) and RPC Extensions for Nethereum
$ dotnet add package Nethereum.RPC.ReactiveNethereum.RPC.Reactive provides Reactive Extensions (Rx.NET) support for Ethereum RPC operations. It enables reactive programming patterns for monitoring blockchain events, streaming blocks and transactions, and building real-time Ethereum applications using observables.
Note: This package builds on Nethereum.JsonRpc.WebSocketStreamingClient, which provides the core IObservable infrastructure for WebSocket subscriptions. For detailed documentation on Observable handlers and subscription internals, see the Nethereum.JsonRpc.WebSocketStreamingClient package.
eth_subscribe + newHeads)eth_subscribe + newPendingTransactions)eth_subscribe + logs)dotnet add package Nethereum.RPC.Reactive
Nethereum.RPC - Core RPC functionalitySystem.Reactive (4.1.3+) - Reactive Extensionsusing Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
// Subscribe to new block headers
var subscription = new EthNewBlockHeadersObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
Console.WriteLine($"New block: {block.Number.Value}");
Console.WriteLine($"Block hash: {block.BlockHash}");
Console.WriteLine($"Timestamp: {block.Timestamp.Value}");
});
// Handle subscription confirmation
subscription.GetSubscribeResponseAsObservable().Subscribe(subscriptionId =>
{
Console.WriteLine($"Subscription ID: {subscriptionId}");
});
await client.StartAsync();
await subscription.SubscribeAsync();
// Keep running...
await Task.Delay(Timeout.Infinite);
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Stream new blocks as they arrive
web3.Eth.GetBlocksWithTransactionHashes()
.Subscribe(block =>
{
Console.WriteLine($"New block: {block.Number}");
Console.WriteLine($"Transaction count: {block.TransactionHashes.Length}");
});
// Keep running...
await Task.Delay(Timeout.Infinite);
Subscribe to new block headers in real-time via WebSocket:
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewBlockHeadersObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(
block => Console.WriteLine($"Block {block.Number.Value}: {block.BlockHash}"),
error => Console.WriteLine($"Error: {error.Message}"),
() => Console.WriteLine("Subscription completed")
);
await client.StartAsync();
await subscription.SubscribeAsync();
Monitor pending transactions in the mempool:
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewPendingTransactionObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(txHash =>
{
Console.WriteLine($"Pending transaction: {txHash}");
});
await client.StartAsync();
await subscription.SubscribeAsync();
Subscribe to event logs with filters:
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthLogsObservableSubscription(client);
// Filter for specific contract events
var filterInput = new NewFilterInput
{
Address = new[] { "0x6B175474E89094C44Da98b954EedeAC495271d0F" }, // DAI contract
Topics = new[] { "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" } // Transfer event
};
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(log =>
{
Console.WriteLine($"Log from block: {log.BlockNumber.Value}");
Console.WriteLine($"Transaction: {log.TransactionHash}");
});
await client.StartAsync();
await subscription.SubscribeAsync(filterInput);
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Stream blocks with transaction hashes
web3.Eth.GetBlocksWithTransactionHashes()
.Subscribe(block =>
{
Console.WriteLine($"Block: {block.Number}");
Console.WriteLine($"Transactions: {block.TransactionHashes.Length}");
});
// Stream blocks with full transaction details
web3.Eth.GetBlocksWithTransactions()
.Subscribe(block =>
{
Console.WriteLine($"Block: {block.Number}");
foreach (var tx in block.Transactions)
{
Console.WriteLine($" From: {tx.From} To: {tx.To} Value: {tx.Value}");
}
});
// Stream specific block range
web3.Eth.GetBlocksWithTransactionHashes(
start: new BlockParameter(18000000),
end: new BlockParameter(18000100)
).Subscribe(block =>
{
Console.WriteLine($"Historical block: {block.Number}");
});
From: src/Nethereum.RPC.Reactive/Polling/BlockStreamExtensions.cs:10
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Custom poller - check every 5 seconds
var customPoller = Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
.Select(_ => Unit.Default);
web3.Eth.GetBlocksWithTransactionHashes(poller: customPoller)
.Subscribe(block =>
{
Console.WriteLine($"Block (5s interval): {block.Number}");
});
From: src/Nethereum.RPC.Reactive/Polling/PollingExtensions.cs:15
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;
public class BlockMonitor
{
private StreamingWebSocketClient client;
private EthNewBlockHeadersObservableSubscription subscription;
public async Task StartAsync(string wsUrl)
{
client = new StreamingWebSocketClient(wsUrl);
client.Error += Client_Error;
subscription = new EthNewBlockHeadersObservableSubscription(client);
// Subscribe to blocks
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(
onNext: block =>
{
Console.WriteLine($"Block {block.Number.Value}");
Console.WriteLine($" Hash: {block.BlockHash}");
Console.WriteLine($" Time: {DateTimeOffset.FromUnixTimeSeconds((long)block.Timestamp.Value)}");
Console.WriteLine($" Transactions: {block.TransactionHashes?.Length ?? 0}");
},
onError: error =>
{
Console.WriteLine($"Subscription error: {error.Message}");
},
onCompleted: () =>
{
Console.WriteLine("Subscription completed");
}
);
// Log subscription confirmation
subscription.GetSubscribeResponseAsObservable()
.Subscribe(subscriptionId =>
{
Console.WriteLine($"Subscribed with ID: {subscriptionId}");
});
await client.StartAsync();
await subscription.SubscribeAsync();
}
private async void Client_Error(object sender, Exception ex)
{
Console.WriteLine($"WebSocket error: {ex.Message}");
Console.WriteLine("Attempting to reconnect...");
// Stop and restart
await ((StreamingWebSocketClient)sender).StopAsync();
await StartAsync(((StreamingWebSocketClient)sender).Path);
}
public async Task StopAsync()
{
if (client != null)
{
await client.StopAsync();
}
}
}
// Usage
var monitor = new BlockMonitor();
await monitor.StartAsync("ws://127.0.0.1:8546");
From: consoletests/Nethereum.Parity.Reactive.ConsoleTest/Program.cs:32
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.RpcStreaming;
using Nethereum.RPC.Eth;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Hex.HexTypes;
// Custom PubSub subscription for account balance changes
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
client.Error += (sender, ex) => Console.WriteLine($"Error: {ex.Message}");
var accountAddress = "0x12890d2cce102216644c59daE5baed380d84830c";
// Create balance subscription (Parity-specific)
var balanceSubscription = new ParityPubSubObservableSubscription<HexBigInteger>(client);
// Build balance query request
var ethBalanceRequest = new EthGetBalance().BuildRequest(
accountAddress,
BlockParameter.CreateLatest()
);
// Subscribe to balance updates
balanceSubscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(
newBalance => Console.WriteLine($"New Balance: {Web3.Convert.FromWei(newBalance.Value)} ETH"),
onError => Console.WriteLine($"Error: {onError.Message}")
);
// Log subscription confirmation
balanceSubscription.GetSubscribeResponseAsObservable()
.Subscribe(x => Console.WriteLine($"Balance subscription ID: {x}"));
await client.StartAsync();
await balanceSubscription.SubscribeAsync(ethBalanceRequest);
// Keep monitoring
await Task.Delay(Timeout.Infinite);
From: consoletests/Nethereum.Parity.Reactive.ConsoleTest/Program.cs:35
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Only blocks with more than 100 transactions
web3.Eth.GetBlocksWithTransactionHashes()
.Where(block => block.TransactionHashes.Length > 100)
.Subscribe(block =>
{
Console.WriteLine($"Busy block: {block.Number} ({block.TransactionHashes.Length} txs)");
});
// Calculate average transactions per block (last 10 blocks)
web3.Eth.GetBlocksWithTransactionHashes()
.Buffer(10) // Group last 10 blocks
.Select(blocks => blocks.Average(b => b.TransactionHashes.Length))
.Subscribe(avgTxs =>
{
Console.WriteLine($"Average transactions per block (last 10): {avgTxs:F2}");
});
// Alert on large base fee
web3.Eth.GetBlocksWithTransactionHashes()
.Where(block => block.BaseFeePerGas?.Value > 100000000000) // > 100 gwei
.Subscribe(block =>
{
Console.WriteLine($"High base fee alert! Block {block.Number}: {block.BaseFeePerGas.Value} wei");
});
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Monitor large ETH transfers
web3.Eth.GetBlocksWithTransactions()
.SelectMany(block => block.Transactions) // Flatten to transaction stream
.Where(tx => tx.Value?.Value > Web3.Convert.ToWei(100)) // > 100 ETH
.Subscribe(tx =>
{
var ethValue = Web3.Convert.FromWei(tx.Value.Value);
Console.WriteLine($"Whale transfer: {ethValue} ETH");
Console.WriteLine($" From: {tx.From}");
Console.WriteLine($" To: {tx.To}");
Console.WriteLine($" Tx: {tx.TransactionHash}");
});
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.Web3;
using System.Reactive.Linq;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var web3 = new Web3(client);
var subscription = new EthNewPendingTransactionObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(5)) // Group pending txs every 5 seconds
.Subscribe(async txHashes =>
{
Console.WriteLine($"{txHashes.Count} pending transactions in last 5 seconds");
// Get details for first pending transaction
if (txHashes.Count > 0)
{
var tx = await web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(txHashes[0]);
if (tx != null)
{
Console.WriteLine($" Sample tx: {tx.TransactionHash}");
Console.WriteLine($" Gas price: {tx.GasPrice?.Value ?? 0} wei");
}
}
});
await client.StartAsync();
await subscription.SubscribeAsync();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Contracts;
using Nethereum.ABI.FunctionEncoding.Attributes;
// Transfer event DTO
[Event("Transfer")]
public class TransferEventDTO : IEventDTO
{
[Parameter("address", "from", 1, true)]
public string From { get; set; }
[Parameter("address", "to", 2, true)]
public string To { get; set; }
[Parameter("uint256", "value", 3, false)]
public BigInteger Value { get; set; }
}
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthLogsObservableSubscription(client);
// Filter for USDC transfers
var filterInput = new NewFilterInput
{
Address = new[] { "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" }, // USDC
Topics = new[] { Event<TransferEventDTO>.GetEventABI().Sha3Signature } // Transfer signature
};
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(log =>
{
try
{
var decoded = Event<TransferEventDTO>.DecodeEvent(log);
var amount = Web3.Convert.FromWei(decoded.Event.Value, 6); // USDC has 6 decimals
Console.WriteLine($"USDC Transfer: {amount} USDC");
Console.WriteLine($" From: {decoded.Event.From}");
Console.WriteLine($" To: {decoded.Event.To}");
Console.WriteLine($" Block: {log.BlockNumber.Value}");
}
catch (Exception ex)
{
Console.WriteLine($"Error decoding: {ex.Message}");
}
});
await client.StartAsync();
await subscription.SubscribeAsync(filterInput);
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var blockSubscription = new EthNewBlockHeadersObservableSubscription(client);
var pendingTxSubscription = new EthNewPendingTransactionObservableSubscription(client);
// Combine streams
var blocks = blockSubscription.GetSubscriptionDataResponsesAsObservable()
.Select(block => $"Block: {block.Number.Value}");
var pendingTxs = pendingTxSubscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(10))
.Select(txs => $"Pending: {txs.Count} txs");
// Merge and display
Observable.Merge(blocks, pendingTxs)
.Subscribe(message => Console.WriteLine(message));
await client.StartAsync();
await blockSubscription.SubscribeAsync();
await pendingTxSubscription.SubscribeAsync();
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Throttle block stream to process at most 1 block per second
web3.Eth.GetBlocksWithTransactionHashes()
.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(block =>
{
Console.WriteLine($"Processing block: {block.Number}");
// Expensive processing here
});
// Sample blocks - only process every 10th block
web3.Eth.GetBlocksWithTransactionHashes()
.Buffer(10)
.Select(blocks => blocks.Last()) // Take last block from each group
.Subscribe(block =>
{
Console.WriteLine($"Sampled block: {block.Number}");
});
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Process blocks from 18,000,000 to 18,001,000
var startBlock = new BlockParameter(18000000);
var endBlock = new BlockParameter(18001000);
web3.Eth.GetBlocksWithTransactionHashes(startBlock, endBlock)
.Do(block => Console.WriteLine($"Processing: {block.Number}"))
.SelectMany(block => block.TransactionHashes) // Flatten to transaction hashes
.Count()
.Subscribe(totalTxs =>
{
Console.WriteLine($"Total transactions in range: {totalTxs}");
});
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;
var client = new StreamingWebSocketClient("ws://127.0.0.1:8546");
var subscription = new EthNewBlockHeadersObservableSubscription(client);
subscription.GetSubscriptionDataResponsesAsObservable()
.Retry(3) // Retry up to 3 times on error
.Catch<Block, Exception>(ex =>
{
Console.WriteLine($"Error after retries: {ex.Message}");
return Observable.Empty<Block>(); // Complete gracefully
})
.Subscribe(block =>
{
Console.WriteLine($"Block: {block.Number.Value}");
});
await client.StartAsync();
await subscription.SubscribeAsync();
using Nethereum.JsonRpc.Client;
using Nethereum.RPC.Reactive.RpcStreaming;
using System.Reactive.Linq;
// Create custom observable for any RPC method
var client = new RpcClient(new Uri("https://mainnet.infura.io/v3/YOUR-PROJECT-ID"));
var blockNumberObservable = Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.SelectMany(async _ =>
{
var ethBlockNumber = new Nethereum.RPC.Eth.Blocks.EthBlockNumber(client);
return await ethBlockNumber.SendRequestAsync();
});
blockNumberObservable.Subscribe(blockNumber =>
{
Console.WriteLine($"Current block: {blockNumber.Value}");
});
using Nethereum.Web3;
using Nethereum.RPC.Reactive.Polling;
using System.Reactive.Linq;
var web3 = new Web3("https://mainnet.infura.io/v3/YOUR-PROJECT-ID");
// Create complex stream pipelines
var largeBlockTransfers = web3.Eth.GetBlocksWithTransactions()
.SelectMany(block => block.Transactions)
.Where(tx => tx.Value?.Value > Web3.Convert.ToWei(10))
.GroupBy(tx => tx.From)
.SelectMany(group => group
.Buffer(TimeSpan.FromMinutes(1))
.Where(txs => txs.Count >= 3) // 3+ large transfers in 1 minute from same address
.Select(txs => new { From = group.Key, Transactions = txs })
);
largeBlockTransfers.Subscribe(result =>
{
Console.WriteLine($"Suspicious activity from {result.From}");
Console.WriteLine($" {result.Transactions.Count} large transfers in 1 minute");
});
Use WebSocket Subscriptions for Real-time Data: More efficient than polling
// Good - WebSocket subscription (real-time)
var subscription = new EthNewBlockHeadersObservableSubscription(client);
// Less efficient - HTTP polling
web3.Eth.GetBlocksWithTransactionHashes()
Handle Errors Gracefully: Always provide error handlers
observable.Subscribe(
onNext: data => ProcessData(data),
onError: ex => LogError(ex),
onCompleted: () => Console.WriteLine("Completed")
);
Dispose Subscriptions: Clean up resources when done
var disposable = observable.Subscribe(...);
// Later...
disposable.Dispose();
Use Appropriate Polling Intervals: Balance freshness vs. resource usage
// For fast-changing data
var fastPoller = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
// For slow-changing data
var slowPoller = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(30));
Apply Backpressure Operators: Prevent overwhelming downstream consumers
observable
.Throttle(TimeSpan.FromSeconds(1)) // Max 1 per second
.Buffer(10) // Process in batches of 10
.Sample(TimeSpan.FromSeconds(5)) // Sample every 5 seconds
Reconnect on WebSocket Errors: Implement automatic reconnection
client.Error += async (sender, ex) =>
{
await ((StreamingWebSocketClient)sender).StopAsync();
await ReconnectAsync();
};
Common operators for blockchain data processing:
| Operator | Use Case | Example |
|---|---|---|
Where | Filter blocks/transactions | Where(block => block.Number > 1000000) |
Select | Transform data | Select(block => block.Number.Value) |
SelectMany | Flatten nested data | SelectMany(block => block.Transactions) |
Buffer | Group items by count/time | Buffer(TimeSpan.FromSeconds(10)) |
Throttle | Rate limiting | Throttle(TimeSpan.FromSeconds(1)) |
Sample | Periodic sampling | Sample(TimeSpan.FromSeconds(5)) |
Take | Limit results | Take(100) // First 100 blocks |
Skip | Skip initial items | Skip(10) // Skip first 10 |
Distinct | Remove duplicates | Distinct(tx => tx.TransactionHash) |
GroupBy | Group by key | GroupBy(tx => tx.From) |
Merge | Combine streams | Observable.Merge(blocks, txs) |
Zip | Combine pairwise | blocks.Zip(receipts) |
Retry | Retry on error | Retry(3) |
Catch | Handle errors | Catch<T, Exception>(...) |
Observable never fires
Solution: Ensure WebSocket client is started:
await client.StartAsync();
await subscription.SubscribeAsync();
Memory usage grows over time
Solution: Always dispose subscriptions:
var disposable = observable.Subscribe(...);
// Later
disposable.Dispose();
Blocks are skipped in the stream
Solution: Adjust polling interval or use WebSocket subscriptions for guaranteed delivery.
MIT License - see LICENSE file for details