Parallel HTTP operators for Rivulet with resilient downloads, streaming, retry policies, and HttpClientFactory integration.
Parallel HTTP operations with automatic retries, resilient downloads, and HttpClientFactory integration.
Built on top of Rivulet.Core, this package provides HTTP-aware parallel operators that automatically handle transient failures, respect rate limits, and support resumable downloads.
dotnet add package Rivulet.Http
Requires Rivulet.Core (automatically included).
Fetch multiple URLs in parallel with automatic retry for transient HTTP errors:
using Rivulet.Http;
var urls = new[]
{
new Uri("https://api.example.com/users/1"),
new Uri("https://api.example.com/users/2"),
new Uri("https://api.example.com/users/3")
};
var responses = await urls.GetParallelAsync(
httpClient,
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
MaxRetries = 3
}
});
foreach (var response in responses)
{
var content = await response.Content.ReadAsStringAsync();
Console.WriteLine(content);
response.Dispose();
}
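MaxDegreeOfParallelism caps how many requests are in flight at once. The following is a minimal sketch of that general technique using only the .NET base class library. It is not Rivulet's implementation, and `RunBoundedAsync` is a hypothetical helper name introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rivulet.Http): runs `work` over `items`
// with at most `maxDegree` operations in flight, preserving input order.
static async Task<TResult[]> RunBoundedAsync<TItem, TResult>(
    IEnumerable<TItem> items,
    int maxDegree,
    Func<TItem, Task<TResult>> work)
{
    using var gate = new SemaphoreSlim(maxDegree);

    async Task<TResult> RunOneAsync(TItem item)
    {
        await gate.WaitAsync();          // wait for a free slot
        try { return await work(item); }
        finally { gate.Release(); }      // free the slot even if work throws
    }

    // ToList() starts every task eagerly; the semaphore does the throttling.
    var tasks = items.Select(item => RunOneAsync(item)).ToList();
    return await Task.WhenAll(tasks);
}
```

A call such as `await RunBoundedAsync(urls, 10, uri => httpClient.GetStringAsync(uri))` would fetch all URLs while keeping at most ten requests active. Unlike the operators above, this sketch has no retry or error-mode handling.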
Automatically read response content as strings:
var urls = new[]
{
new Uri("https://api.example.com/data/1"),
new Uri("https://api.example.com/data/2")
};
var contents = await urls.GetStringParallelAsync(httpClient);
foreach (var content in contents)
{
Console.WriteLine(content);
}
Send multiple POST requests in parallel:
var requests = users.Select(user => (
uri: new Uri("https://api.example.com/users"),
content: new StringContent(
JsonSerializer.Serialize(user),
Encoding.UTF8,
"application/json")
));
var responses = await requests.PostParallelAsync(
httpClient,
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 5,
ErrorMode = ErrorMode.CollectAndContinue
}
});
Use named or typed HttpClient instances with IHttpClientFactory:
using Microsoft.Extensions.DependencyInjection;
var services = new ServiceCollection();
services.AddHttpClient("api", client =>
{
client.BaseAddress = new Uri("https://api.example.com");
client.DefaultRequestHeaders.Add("Authorization", "Bearer token");
});
var provider = services.BuildServiceProvider();
var factory = provider.GetRequiredService<IHttpClientFactory>();
var urls = new[]
{
new Uri("/endpoint1", UriKind.Relative),
new Uri("/endpoint2", UriKind.Relative)
};
var results = await urls.GetStringParallelAsync(
factory,
clientName: "api",
new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10
}
});
Download files in parallel with automatic resume on failure:
var downloads = new[]
{
(uri: new Uri("https://example.com/file1.zip"), path: "downloads/file1.zip"),
(uri: new Uri("https://example.com/file2.zip"), path: "downloads/file2.zip"),
(uri: new Uri("https://example.com/file3.zip"), path: "downloads/file3.zip")
};
var results = await downloads.DownloadParallelAsync(
httpClient,
new StreamingDownloadOptions
{
EnableResume = true,
ValidateContentLength = true,
OnProgressAsync = (uri, downloaded, total) =>
{
var percent = total.HasValue ? (downloaded * 100.0 / total.Value) : 0;
Console.WriteLine($"{uri}: {downloaded:N0}/{total:N0} bytes ({percent:F1}%)");
return ValueTask.CompletedTask;
},
HttpOptions = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 3,
MaxRetries = 5
}
}
});
foreach (var (uri, filePath, bytesDownloaded) in results)
{
Console.WriteLine($"Downloaded {uri} to {filePath}: {bytesDownloaded:N0} bytes");
}
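Resumable downloads are commonly built on HTTP range requests. The sketch below shows that mechanism with plain `HttpClient` types, assuming the server supports the `Range` header. It is an illustration only, not Rivulet's code, and `BuildResumeRequest` and `GetBytesOnDisk` are hypothetical helper names.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;

// Hypothetical helper: builds a GET that asks the server for only the bytes
// missing from a partially downloaded file.
static HttpRequestMessage BuildResumeRequest(Uri uri, long bytesOnDisk)
{
    var request = new HttpRequestMessage(HttpMethod.Get, uri);
    if (bytesOnDisk > 0)
    {
        // "Range: bytes=N-" means "from offset N to the end of the resource".
        request.Headers.Range = new RangeHeaderValue(bytesOnDisk, null);
    }
    return request;
}

// Size of the partial file, or 0 when nothing has been downloaded yet.
static long GetBytesOnDisk(string path) =>
    File.Exists(path) ? new FileInfo(path).Length : 0;
```

A server that honors the range replies with 206 Partial Content and the remaining bytes can be appended to the file. A 200 OK reply means the range was ignored and the file has to be written again from the start.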
Download directly to any Stream:
using var memoryStream = new MemoryStream();
var bytesDownloaded = await HttpStreamingExtensions.DownloadToStreamAsync(
new Uri("https://example.com/data.json"),
memoryStream,
httpClient,
new StreamingDownloadOptions
{
BufferSize = 8192,
OnProgressAsync = (uri, downloaded, total) =>
{
Console.WriteLine($"Downloaded: {downloaded:N0} bytes");
return ValueTask.CompletedTask;
}
});
memoryStream.Position = 0;
var data = await JsonSerializer.DeserializeAsync<MyData>(memoryStream);
Rivulet.Http automatically retries transient HTTP errors.
Automatically respects Retry-After headers from rate-limited APIs:
var options = new HttpOptions
{
RespectRetryAfterHeader = true, // Default: true
ParallelOptions = new ParallelOptionsRivulet
{
MaxRetries = 5,
BaseDelay = TimeSpan.FromSeconds(1)
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
When a server returns 429 Too Many Requests or 503 Service Unavailable with a Retry-After header, Rivulet.Http will wait the specified duration before retrying instead of using the configured backoff strategy.
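The decision described above can be sketched with the standard `HttpResponseMessage.Headers.RetryAfter` property. This is an assumption about the general approach, not Rivulet's actual code, and `ChooseRetryDelay` is a hypothetical helper name.

```csharp
using System;
using System.Net;
using System.Net.Http;

// Hypothetical helper: prefers the server's Retry-After hint on 429/503
// and falls back to the locally computed backoff delay otherwise.
static TimeSpan ChooseRetryDelay(
    HttpResponseMessage response, TimeSpan backoffDelay, DateTimeOffset now)
{
    bool throttled = response.StatusCode == HttpStatusCode.TooManyRequests
                  || response.StatusCode == HttpStatusCode.ServiceUnavailable;
    var retryAfter = response.Headers.RetryAfter;
    if (!throttled || retryAfter is null) return backoffDelay;

    // Retry-After is either a number of seconds...
    if (retryAfter.Delta is TimeSpan delta) return delta;

    // ...or an absolute HTTP date, converted here to a wait relative to `now`.
    if (retryAfter.Date is DateTimeOffset date)
    {
        var wait = date - now;
        return wait > TimeSpan.Zero ? wait : TimeSpan.Zero;
    }

    return backoffDelay;
}
```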
Get notified of HTTP errors with status codes:
var options = new HttpOptions
{
OnHttpErrorAsync = (uri, statusCode, exception) =>
{
Console.WriteLine($"Error fetching {uri}: {statusCode} - {exception.Message}");
return ValueTask.CompletedTask;
},
ParallelOptions = new ParallelOptionsRivulet
{
ErrorMode = ErrorMode.CollectAndContinue,
MaxRetries = 3
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
Customize which HTTP status codes should trigger retries:
var options = new HttpOptions
{
RetriableStatusCodes = new HashSet<HttpStatusCode>
{
HttpStatusCode.TooManyRequests,
HttpStatusCode.ServiceUnavailable,
HttpStatusCode.RequestTimeout
},
ParallelOptions = new ParallelOptionsRivulet
{
MaxRetries = 3,
BackoffStrategy = BackoffStrategy.ExponentialJitter
}
};
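The exact formula behind `BackoffStrategy.ExponentialJitter` is not stated here. As a rough illustration, the sketch below implements the common "full jitter" variant, in which the delay is drawn uniformly between zero and an exponentially growing ceiling. `ExponentialJitterDelay` and its `maxDelay` parameter are hypothetical and not part of the Rivulet API.

```csharp
using System;

// Hypothetical helper: "full jitter" exponential backoff.
// `attempt` is 1-based, so the ceiling is baseDelay, 2x, 4x, ... capped at maxDelay.
static TimeSpan ExponentialJitterDelay(
    int attempt, TimeSpan baseDelay, TimeSpan maxDelay, Random rng)
{
    double ceilingMs = Math.Min(
        maxDelay.TotalMilliseconds,
        baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));

    // A random point in [0, ceiling) spreads retries out so that many
    // clients failing together do not all retry at the same instant.
    return TimeSpan.FromMilliseconds(rng.NextDouble() * ceilingMs);
}
```

With a base delay of 100 ms, the ceilings for attempts 1, 2, and 3 are 100 ms, 200 ms, and 400 ms.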
Control request rate to respect API limits:
var options = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
RateLimit = new RateLimitOptions
{
TokensPerSecond = 100,
BurstCapacity = 200
}
}
};
var responses = await urls.GetParallelAsync(httpClient, options);
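The option names `TokensPerSecond` and `BurstCapacity` suggest a token bucket. The sketch below shows how such a limiter typically behaves, under that assumption. It is not Rivulet's implementation, it is not thread-safe, and `CreateTokenBucket` is a hypothetical helper name.

```csharp
using System;

// Hypothetical helper: returns a function that reports whether a request
// may proceed at the given time. Time is passed in so the logic is testable.
static Func<DateTimeOffset, bool> CreateTokenBucket(
    double tokensPerSecond, double burstCapacity, DateTimeOffset start)
{
    double tokens = burstCapacity;      // start full so an initial burst is allowed
    DateTimeOffset lastRefill = start;

    return now =>
    {
        // Refill in proportion to elapsed time, never above the burst capacity.
        double elapsedSeconds = (now - lastRefill).TotalSeconds;
        if (elapsedSeconds > 0)
        {
            tokens = Math.Min(burstCapacity, tokens + elapsedSeconds * tokensPerSecond);
            lastRefill = now;
        }

        if (tokens < 1) return false;   // bucket empty: caller must wait
        tokens -= 1;                    // spend one token per request
        return true;
    };
}
```

With 100 tokens per second and a burst capacity of 200, up to 200 requests can start immediately, after which the sustained rate settles at about 100 requests per second.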
Prevent cascading failures with circuit breaker pattern:
var options = new HttpOptions
{
ParallelOptions = new ParallelOptionsRivulet
{
CircuitBreaker = new CircuitBreakerOptions
{
FailureThreshold = 5,
SuccessThreshold = 2,
OpenTimeout = TimeSpan.FromSeconds(30)
}
}
};
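To show how the three settings usually interact, here is a minimal sketch of a generic circuit breaker state machine. It assumes the conventional closed, open, and half-open states and counts consecutive failures. Rivulet's actual behavior may differ in detail, and the `CircuitBreaker` class below is hypothetical and not thread-safe.

```csharp
using System;

enum CircuitState { Closed, Open, HalfOpen }

// Hypothetical illustration of the pattern, not Rivulet's implementation.
sealed class CircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly int _successThreshold;
    private readonly TimeSpan _openTimeout;
    private int _failures;
    private int _successes;
    private DateTimeOffset _openedAt;

    public CircuitState State { get; private set; } = CircuitState.Closed;

    public CircuitBreaker(int failureThreshold, int successThreshold, TimeSpan openTimeout)
    {
        _failureThreshold = failureThreshold;
        _successThreshold = successThreshold;
        _openTimeout = openTimeout;
    }

    // Rejects requests while open; after the timeout, lets probe requests through.
    public bool AllowRequest(DateTimeOffset now)
    {
        if (State == CircuitState.Open && now - _openedAt >= _openTimeout)
        {
            State = CircuitState.HalfOpen;
            _successes = 0;
        }
        return State != CircuitState.Open;
    }

    public void RecordSuccess()
    {
        if (State == CircuitState.HalfOpen)
        {
            // Enough successful probes close the circuit again.
            if (++_successes >= _successThreshold)
            {
                State = CircuitState.Closed;
                _failures = 0;
            }
        }
        else if (State == CircuitState.Closed)
        {
            _failures = 0;              // only consecutive failures count
        }
    }

    public void RecordFailure(DateTimeOffset now)
    {
        // A failed probe, or too many consecutive failures, opens the circuit.
        if (State == CircuitState.HalfOpen || ++_failures >= _failureThreshold)
        {
            State = CircuitState.Open;
            _openedAt = now;
            _failures = 0;
        }
    }
}
```

With the values above, five consecutive failures open the circuit, requests are rejected for 30 seconds, and two successful probes close it again.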
Monitor download progress for long-running operations:
var options = new StreamingDownloadOptions
{
ProgressInterval = TimeSpan.FromSeconds(1),
OnProgressAsync = (uri, downloaded, total) =>
{
var percent = total.HasValue ? (downloaded * 100.0 / total.Value) : 0;
Console.WriteLine($"Progress: {percent:F1}%");
return ValueTask.CompletedTask;
},
OnResumeAsync = (uri, offset) =>
{
Console.WriteLine($"Resuming download from byte {offset:N0}");
return ValueTask.CompletedTask;
},
OnCompleteAsync = (uri, path, bytes) =>
{
Console.WriteLine($"Download complete: {bytes:N0} bytes saved to {path}");
return ValueTask.CompletedTask;
}
};
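The progress callbacks above print 0% whenever the server does not report a total size. One possible way to handle that case is sketched below. `FormatProgress` is a hypothetical helper, not part of Rivulet.Http, and it uses the invariant culture so the output does not depend on the machine's locale.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: formats a progress line and avoids a misleading
// percentage when the total size is unknown.
static string FormatProgress(long downloaded, long? total)
{
    var inv = CultureInfo.InvariantCulture;
    if (total is long t && t > 0)
    {
        double percent = downloaded * 100.0 / t;
        return string.Format(inv, "{0:N0}/{1:N0} bytes ({2:F1}%)", downloaded, t, percent);
    }
    return string.Format(inv, "{0:N0} bytes (total size unknown)", downloaded);
}
```

Inside `OnProgressAsync`, the body could then be reduced to `Console.WriteLine($"{uri}: {FormatProgress(downloaded, total)}");`.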
All standard HTTP methods with parallel operators:
- GetParallelAsync, GetStringParallelAsync, GetByteArrayParallelAsync
- PostParallelAsync
- PutParallelAsync
- DeleteParallelAsync

All methods support both HttpClient and IHttpClientFactory.
HTTP-specific configuration:
var options = new HttpOptions
{
RequestTimeout = TimeSpan.FromSeconds(30),
RespectRetryAfterHeader = true,
RetriableStatusCodes = new HashSet<HttpStatusCode> { /* ... */ },
BufferSize = 81920,
OnHttpErrorAsync = async (uri, status, ex) => { /* ... */ },
ParallelOptions = new ParallelOptionsRivulet
{
MaxDegreeOfParallelism = 10,
MaxRetries = 3,
BaseDelay = TimeSpan.FromMilliseconds(100),
BackoffStrategy = BackoffStrategy.ExponentialJitter,
ErrorMode = ErrorMode.CollectAndContinue
}
};
Download-specific configuration:
var options = new StreamingDownloadOptions
{
EnableResume = true,
ValidateContentLength = true,
OverwriteExisting = false,
BufferSize = 81920,
ProgressInterval = TimeSpan.FromSeconds(1),
HttpOptions = new HttpOptions { /* ... */ }
};
- Dispose each HttpResponseMessage returned by GetParallelAsync, PostParallelAsync, etc.
- Use GetStringParallelAsync or GetByteArrayParallelAsync for automatic disposal
- Set EnableResume = true for downloads that might be interrupted
- Set RespectRetryAfterHeader = true and configure RateLimit for APIs
- Prefer IHttpClientFactory over creating HttpClient instances manually

Rivulet.Http is designed for high-throughput scenarios, including the use of ValueTask<T> in hot paths.

See the samples directory for complete working examples.
MIT License - see LICENSE file for details