Nethereum.JsonRpc WebSocketClient
$ dotnet add package Nethereum.JsonRpc.WebSocketClientWebSocket JSON-RPC client with support for real-time subscriptions and event streaming.
Nethereum.JsonRpc.WebSocketClient provides WebSocket transport implementations for Ethereum node communication, supporting both standard request/response patterns and real-time event subscriptions. WebSockets enable push-based notifications from the node for new blocks, pending transactions, and contract events without polling.
Key Features:
Use Cases:
dotnet add package Nethereum.JsonRpc.WebSocketClient
Requirements:
Nethereum:
External:
using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;
// Connect to WebSocket endpoint
var client = new WebSocketClient("ws://localhost:8546");
// Use like any other RPC client
var ethBlockNumber = new EthBlockNumber(client);
var blockNumber = await ethBlockNumber.SendRequestAsync();
Console.WriteLine($"Current block: {blockNumber.Value}");
// Always dispose when done
client.Dispose();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
// Create streaming client
var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID");
// Create subscription for new blocks
var subscription = new EthNewBlockHeadersSubscription(client);
// Handle new block events
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
Console.WriteLine($"New block: {block.Number.Value}");
Console.WriteLine($"Hash: {block.BlockHash}");
Console.WriteLine($"Miner: {block.Miner}");
});
// Start streaming
await client.StartAsync();
// Subscribe
await subscription.SubscribeAsync();
// Keep running
Console.WriteLine("Monitoring new blocks. Press Enter to exit.");
Console.ReadLine();
// Cleanup
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;
// Local Geth/Erigon
var client = new WebSocketClient("ws://localhost:8546");
// Infura
var infuraClient = new WebSocketClient(
"wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID"
);
// Alchemy
var alchemyClient = new WebSocketClient(
"wss://eth-mainnet.g.alchemy.com/v2/YOUR_API_KEY"
);
// Use with RPC services
var ethChainId = new EthChainId(client);
var chainId = await ethChainId.SendRequestAsync();
var ethGasPrice = new EthGasPrice(client);
var gasPrice = await ethGasPrice.SendRequestAsync();
Console.WriteLine($"Chain ID: {chainId.Value}");
Console.WriteLine($"Gas Price: {gasPrice.Value} wei");
// Cleanup
client.Dispose();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using System.Reactive.Linq;
var client = new StreamingWebSocketClient("ws://localhost:8546");
// Create new block headers subscription
var subscription = new EthNewBlockHeadersSubscription(client);
// Subscribe to new blocks
subscription.GetSubscriptionDataResponsesAsObservable()
.Subscribe(block =>
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] New Block");
Console.WriteLine($" Number: {block.Number.Value}");
Console.WriteLine($" Hash: {block.BlockHash}");
Console.WriteLine($" Parent: {block.ParentHash}");
Console.WriteLine($" Timestamp: {DateTimeOffset.FromUnixTimeSeconds((long)block.Timestamp.Value)}");
Console.WriteLine($" Difficulty: {block.Difficulty.Value}");
Console.WriteLine($" Gas Used: {block.GasUsed.Value:N0}");
Console.WriteLine($" Transactions: {block.TransactionCount()}");
Console.WriteLine();
},
error => Console.WriteLine($"Error: {error.Message}"));
// Start client and subscribe
await client.StartAsync();
await subscription.SubscribeAsync();
Console.WriteLine("Monitoring blocks. Press Enter to stop.");
Console.ReadLine();
// Cleanup
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.Hex.HexTypes;
var client = new StreamingWebSocketClient("wss://mainnet.infura.io/ws/v3/YOUR_PROJECT_ID");
// Create logs subscription for USDC Transfer events
var transferEventSignature = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
var usdcAddress = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48";
var filterLogs = new NewFilterInput
{
Address = new[] { usdcAddress },
Topics = new[] { transferEventSignature }
};
var subscription = new EthLogsSubscription(client);
await subscription.SubscribeAsync(filterLogs);
// Handle Transfer events
subscription.GetSubscriptionDataResponsesAsObservable().Subscribe(log =>
{
var from = "0x" + log.Topics[1].ToString().Substring(26);
var to = "0x" + log.Topics[2].ToString().Substring(26);
var amount = new HexBigInteger(log.Data).Value;
Console.WriteLine($"USDC Transfer:");
Console.WriteLine($" From: {from}");
Console.WriteLine($" To: {to}");
Console.WriteLine($" Amount: {amount / 1000000m:N2} USDC"); // USDC has 6 decimals
Console.WriteLine($" Tx: {log.TransactionHash}");
Console.WriteLine();
});
await client.StartAsync();
Console.WriteLine("Monitoring USDC transfers. Press Enter to stop.");
Console.ReadLine();
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using Nethereum.Web3;
var client = new StreamingWebSocketClient("ws://localhost:8546");
// Create pending transactions subscription
var subscription = new EthNewPendingTransactionSubscription(client);
// Handle new pending transactions
subscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(1)) // Batch for 1 second
.Subscribe(async txHashes =>
{
if (txHashes.Count > 0)
{
Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] {txHashes.Count} new pending transactions");
// Fetch details for first transaction
var web3 = new Web3(client);
var txDetails = await web3.Eth.Transactions.GetTransactionByHash.SendRequestAsync(txHashes[0]);
if (txDetails != null)
{
Console.WriteLine($" First tx hash: {txDetails.TransactionHash}");
Console.WriteLine($" From: {txDetails.From}");
Console.WriteLine($" To: {txDetails.To}");
Console.WriteLine($" Value: {Web3.Convert.FromWei(txDetails.Value)} ETH");
Console.WriteLine($" Gas Price: {Web3.Convert.FromWei(txDetails.GasPrice, Web3.Convert.UnitConversion.Gwei)} Gwei");
}
}
});
await client.StartAsync();
await subscription.SubscribeAsync();
Console.WriteLine("Monitoring mempool. Press Enter to stop.");
Console.ReadLine();
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
var client = new StreamingWebSocketClient("ws://localhost:8546");
// Create multiple subscriptions
var blockSubscription = new EthNewBlockHeadersSubscription(client);
var pendingTxSubscription = new EthNewPendingTransactionSubscription(client);
var syncSubscription = new EthSyncingSubscription(client);
// Handle new blocks
blockSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(block =>
{
Console.WriteLine($"[BLOCK] #{block.Number.Value}");
});
// Handle pending transactions (with throttling)
pendingTxSubscription.GetSubscriptionDataResponsesAsObservable()
.Buffer(TimeSpan.FromSeconds(5))
.Subscribe(txHashes =>
{
Console.WriteLine($"[MEMPOOL] {txHashes.Count} pending transactions in last 5s");
});
// Handle sync status
syncSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(syncStatus =>
{
if (syncStatus.IsSyncing)
{
Console.WriteLine($"[SYNC] Current: {syncStatus.CurrentBlock}, Highest: {syncStatus.HighestBlock}");
}
else
{
Console.WriteLine($"[SYNC] Node is synced");
}
});
// Start client and all subscriptions
await client.StartAsync();
await blockSubscription.SubscribeAsync();
await pendingTxSubscription.SubscribeAsync();
await syncSubscription.SubscribeAsync();
Console.WriteLine("Monitoring multiple streams. Press Enter to stop.");
Console.ReadLine();
// Cleanup all subscriptions
await blockSubscription.UnsubscribeAsync();
await pendingTxSubscription.UnsubscribeAsync();
await syncSubscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.JsonRpc.WebSocketClient;
using Nethereum.RPC.Eth;
var client = new WebSocketClient("wss://api.example.com/ws");
// Add custom headers (e.g., API key)
client.RequestHeaders.Add("X-API-Key", "your-api-key-here");
client.RequestHeaders.Add("Authorization", "Bearer your-token");
var ethBlockNumber = new EthBlockNumber(client);
var blockNumber = await ethBlockNumber.SendRequestAsync();
Console.WriteLine($"Block (authenticated): {blockNumber.Value}");
client.Dispose();
CRITICAL for production: Automatic reconnection when WebSocket connection drops:
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;
public class ProductionBlockMonitor
{
private readonly string url;
private StreamingWebSocketClient client;
public ProductionBlockMonitor(string url)
{
this.url = url;
}
public async Task SubscribeAndRunAsync()
{
if (client == null)
{
client = new StreamingWebSocketClient(url);
// Production pattern: auto-reconnect on error
client.Error += Client_Error;
}
var blockHeaderSubscription = new EthNewBlockHeadersObservableSubscription(client);
// Get subscription ID when subscribed
blockHeaderSubscription.GetSubscribeResponseAsObservable().Subscribe(subscriptionId =>
Console.WriteLine($"Block Header subscription Id: {subscriptionId}"));
// Process new blocks
blockHeaderSubscription.GetSubscriptionDataResponsesAsObservable().Subscribe(
block => Console.WriteLine($"New Block: {block.BlockHash}"),
exception => Console.WriteLine($"BlockHeaderSubscription error info: {exception.Message}")
);
// Handle unsubscribe confirmation
blockHeaderSubscription.GetUnsubscribeResponseAsObservable().Subscribe(response =>
Console.WriteLine($"Block Header unsubscribe result: {response}"));
await client.StartAsync();
await blockHeaderSubscription.SubscribeAsync();
Console.WriteLine("Monitoring blocks with auto-reconnect. Press Enter to stop.");
Console.ReadLine();
await blockHeaderSubscription.UnsubscribeAsync();
}
// Production reconnection handler
private async void Client_Error(object sender, Exception ex)
{
Console.WriteLine($"Client Error, restarting... ({ex.Message})");
// Stop the failed connection
await ((StreamingWebSocketClient)sender).StopAsync();
// Restart everything
await SubscribeAndRunAsync();
}
}
// Usage
var monitor = new ProductionBlockMonitor("ws://localhost:8546");
await monitor.SubscribeAndRunAsync();
Why this pattern works:
Client_Error event catches all WebSocket failuresusing Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Reactive.Eth.Subscriptions;
using System.Reactive.Linq;
var client = new StreamingWebSocketClient("ws://localhost:8546");
var subscription = new EthNewBlockHeadersSubscription(client);
// Advanced reactive processing
subscription.GetSubscriptionDataResponsesAsObservable()
.Window(TimeSpan.FromMinutes(1)) // 1-minute windows
.SelectMany(window => window
.Aggregate(new
{
Count = 0,
TotalGasUsed = BigInteger.Zero,
TotalTransactions = 0
}, (acc, block) => new
{
Count = acc.Count + 1,
TotalGasUsed = acc.TotalGasUsed + block.GasUsed.Value,
TotalTransactions = acc.TotalTransactions + (int)block.TransactionCount()
}))
.Subscribe(stats =>
{
Console.WriteLine($"=== 1-Minute Stats ===");
Console.WriteLine($"Blocks: {stats.Count}");
Console.WriteLine($"Avg Gas/Block: {stats.TotalGasUsed / stats.Count:N0}");
Console.WriteLine($"Total Transactions: {stats.TotalTransactions}");
Console.WriteLine();
});
await client.StartAsync();
await subscription.SubscribeAsync();
Console.WriteLine("Collecting statistics. Press Enter to stop.");
Console.ReadLine();
await subscription.UnsubscribeAsync();
await client.StopAsync();
client.Dispose();
using Nethereum.Web3;
using Nethereum.JsonRpc.WebSocketClient;
// Create WebSocket client
var wsClient = new WebSocketClient("ws://localhost:8546");
// Use with Web3
var web3 = new Web3(wsClient);
// Standard Web3 operations over WebSocket
var balance = await web3.Eth.GetBalance.SendRequestAsync(
"0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb"
);
var blockNumber = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync();
Console.WriteLine($"Balance: {Web3.Convert.FromWei(balance)} ETH");
Console.WriteLine($"Block: {blockNumber.Value}");
// Cleanup
wsClient.Dispose();
public class WebSocketClient : ClientBase, IDisposable, IClientRequestHeaderSupport
{
public WebSocketClient(string path,
JsonSerializerSettings jsonSerializerSettings = null,
ILogger log = null)
public Dictionary<string, string> RequestHeaders { get; set; }
public TimeSpan ConnectionTimeout { get; set; }
public Task StopAsync()
public Task StopAsync(WebSocketCloseStatus webSocketCloseStatus, string status, CancellationToken timeOutToken)
}
public class StreamingWebSocketClient : IStreamingClient, IDisposable, IClientRequestHeaderSupport
{
public StreamingWebSocketClient(string path,
JsonSerializerSettings jsonSerializerSettings = null,
ILogger log = null)
public Dictionary<string, string> RequestHeaders { get; set; }
public static TimeSpan ConnectionTimeout { get; set; }
public WebSocketState WebSocketState { get; }
public bool IsStarted { get; }
public event WebSocketStreamingErrorEventHandler Error;
public Task StartAsync()
public Task StopAsync()
public bool AddSubscription(string subscriptionId, IRpcStreamingResponseHandler handler)
public bool RemoveSubscription(string subscriptionId)
}
| Subscription | Description |
|---|---|
| EthNewBlockHeadersSubscription | New block headers |
| EthNewPendingTransactionSubscription | Pending transactions (mempool) |
| EthLogsSubscription | Contract event logs |
| EthSyncingSubscription | Node sync status |
Common WebSocket URLs:
| Node/Provider | WebSocket URL |
|---|---|
| Geth (local) | ws://localhost:8546 |
| Erigon (local) | ws://localhost:8545 |
| Infura | wss://mainnet.infura.io/ws/v3/PROJECT_ID |
| Alchemy | wss://eth-mainnet.g.alchemy.com/v2/API_KEY |
| QuickNode | wss://your-endpoint.quiknode.pro/TOKEN/ |
Geth:
geth --ws --ws.addr 0.0.0.0 --ws.port 8546 --ws.api eth,net,web3
Erigon:
erigon --ws --ws.port 8545
| Subscription | Event Rate | Notes |
|---|---|---|
| newHeads | ~12s (mainnet) | One per block |
| pendingTransactions | 100-1000/s | Very high volume |
| logs (filtered) | Variable | Depends on filter |
| syncing | Rare | Only during sync |
Tips:
Buffer() or throttling for high-volume subscriptionsError event to detect connection issuesDispose() to properly close connectionsSome providers limit concurrent subscriptions: