⚠ Deprecated: This package is legacy. Suggested alternative: NPipeline.Connectors.Postgres
PostgreSQL source and sink nodes for NPipeline using Npgsql - read from and write to PostgreSQL databases with configurable options
A PostgreSQL connector for NPipeline data pipelines. Provides source and sink nodes for reading from and writing to PostgreSQL databases with support for multiple write strategies, upsert operations, delivery semantics, checkpointing, convention-based mapping, custom mappers, connection pooling, and streaming.
dotnet add package NPipeline.Connectors.PostgreSQL
using NPipeline.Connectors.PostgreSQL;
using NPipeline.Pipeline;
// Define your model
public record Customer(int Id, string Name, string Email);
// Create a source node
var connectionString = "Host=localhost;Database=mydb;Username=postgres;Password=password";
var source = new PostgresSourceNode<Customer>(
connectionString,
"SELECT id, name, email FROM customers"
);
// Use in a pipeline
var pipeline = new PipelineBuilder()
.AddSource(source, "customer_source")
.AddSink<ConsoleSinkNode<Customer>, Customer>("console_sink")
.Build();
await pipeline.RunAsync();
using NPipeline.Connectors.PostgreSQL;
using NPipeline.Pipeline;
// Define your model
public record Customer(int Id, string Name, string Email);
// Create a sink node
var connectionString = "Host=localhost;Database=mydb;Username=postgres;Password=password";
var sink = new PostgresSinkNode<Customer>(
connectionString,
"customers",
writeStrategy: PostgresWriteStrategy.Batch
);
// Use in a pipeline
var pipeline = new PipelineBuilder()
.AddSource<InMemorySourceNode<Customer>, Customer>("source")
.AddSink(sink, "customer_sink")
.Build();
await pipeline.RunAsync();
The PostgreSQL connector supports URI-based configuration through StorageUri, enabling seamless environment switching without code changes.
using NPipeline.Connectors;
using NPipeline.Connectors.PostgreSQL;
var uri = StorageUri.Parse("postgres://localhost:5432/mydb?username=postgres&password=password");
var source = new PostgresSourceNode<Customer>(uri, "SELECT * FROM customers");
var sink = new PostgresSinkNode<Customer>(uri, "customers");
// Development (local database)
var devUri = StorageUri.Parse("postgres://localhost:5432/mydb?username=postgres&password=devpass");
// Production (AWS RDS)
var prodUri = StorageUri.Parse("postgres://mydb.prod.ap-southeast-2.rds.amazonaws.com:5432/mydb?username=produser&password=${DB_PASSWORD}");
// Same pipeline code works in both environments
var uri = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") == "Production" ? prodUri : devUri;
var source = new PostgresSourceNode<Customer>(uri, "SELECT * FROM customers");
Supported query parameters for PostgreSQL URIs:
| Parameter | Type | Description |
|---|---|---|
| username | string | Database username |
| password | string | Database password |
| sslmode | string | SSL mode: disable, allow, prefer, require, verify-ca, verify-full |
| timeout | int | Connection timeout in seconds |
| pooling | bool | Enable connection pooling (true/false) |
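Putting the parameters together, a fully qualified URI might look like the following sketch (host, database, and credentials are placeholder values, not from the original document):

```csharp
using NPipeline.Connectors;

// Hypothetical host, database, and credentials; combines every
// query parameter listed in the table above.
var uri = StorageUri.Parse(
    "postgres://db.example.com:5432/mydb" +
    "?username=appuser&password=secret" +
    "&sslmode=require&timeout=15&pooling=true");
```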
using NPipeline.Connectors.Abstractions;
using NPipeline.Connectors.PostgreSQL;
var resolver = PostgresStorageResolverFactory.CreateResolver();
var uri = StorageUri.Parse("postgres://localhost:5432/mydb?username=postgres&password=password");
// Provider is resolved automatically
var provider = resolver.ResolveProvider(uri);
var connectionString = ((IDatabaseStorageProvider)provider).GetConnectionString(uri);
Configure connector behavior with PostgresConfiguration:
var configuration = new PostgresConfiguration
{
ConnectionString = "Host=localhost;Database=mydb;Username=postgres;Password=password",
Schema = "public",
StreamResults = true,
FetchSize = 1_000,
BatchSize = 1_000,
MaxBatchSize = 5_000,
UseTransaction = true,
WriteStrategy = PostgresWriteStrategy.Batch,
UseUpsert = false,
UseBinaryCopy = false,
DeliverySemantic = DeliverySemantic.AtLeastOnce,
CheckpointStrategy = CheckpointStrategy.None,
MaxRetryAttempts = 3,
RetryDelay = TimeSpan.FromSeconds(1),
ValidateIdentifiers = true,
CommandTimeout = 30,
CopyTimeout = 300
};
| Property | Type | Default | Description |
|---|---|---|---|
| ConnectionString | string | "" | PostgreSQL connection string |
| Schema | string | "public" | Default schema name |
| WriteStrategy | PostgresWriteStrategy | Batch | Write strategy (PerRow, Batch, Copy) |
| BatchSize | int | 1000 | Target batch size for batched writes |
| MaxBatchSize | int | 5000 | Maximum batch size to prevent unbounded memory growth |
| UseTransaction | bool | true | Wrap writes in a transaction |
| UseUpsert | bool | false | Enable ON CONFLICT upsert semantics |
| UpsertConflictColumns | string[]? | null | Columns that form the conflict target |
| OnConflictAction | OnConflictAction | Update | Conflict resolution action (Update, Ignore) |
| UseBinaryCopy | bool | false | Use binary format for COPY operations |
| DeliverySemantic | DeliverySemantic | AtLeastOnce | Delivery guarantee semantic |
| CheckpointStrategy | CheckpointStrategy | None | Checkpointing strategy for recovery |
| CheckpointStorage | ICheckpointStorage? | null | Storage backend for checkpoints |
| StreamResults | bool | true | Enable streaming for large result sets |
| FetchSize | int | 1000 | Rows to fetch per round-trip when streaming |
| CommandTimeout | int | 30 | Command timeout in seconds |
| CopyTimeout | int | 300 | COPY operation timeout in seconds |
| MaxRetryAttempts | int | 3 | Maximum retry attempts for transient errors |
| RetryDelay | TimeSpan | 1 second | Delay between retry attempts |
| ValidateIdentifiers | bool | true | Validate SQL identifiers to prevent injection |
| UsePreparedStatements | bool | true | Use prepared statements for writes |
The connection string supports all Npgsql options:
Host=localhost;Port=5432;Database=mydb;Username=postgres;Password=password;Timeout=15;Pooling=true;SslMode=Require
Properties are automatically mapped to columns using snake_case conversion:
public record Customer(
int CustomerId, // Maps to customer_id
string FirstName, // Maps to first_name
string EmailAddress // Maps to email_address
);
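The conversion itself is mechanical. A minimal sketch of one way to implement a PascalCase-to-snake_case transformation (illustrative only, not the connector's internal code):

```csharp
using System;
using System.Text;

static string ToSnakeCase(string name)
{
    // Insert an underscore before each uppercase letter (except the first),
    // then lowercase everything: "CustomerId" -> "customer_id".
    var sb = new StringBuilder();
    for (var i = 0; i < name.Length; i++)
    {
        if (char.IsUpper(name[i]) && i > 0)
            sb.Append('_');
        sb.Append(char.ToLowerInvariant(name[i]));
    }
    return sb.ToString();
}

Console.WriteLine(ToSnakeCase("CustomerId"));    // customer_id
Console.WriteLine(ToSnakeCase("EmailAddress"));  // email_address
```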
Override default mapping with attributes:
using NPipeline.Connectors.PostgreSQL.Mapping;
public record Customer(
[PostgresColumn("cust_id", PrimaryKey = true)] int Id,
[PostgresColumn("full_name")] string Name,
[PostgresIgnore] string TemporaryField
);
For complete control, provide a custom mapper function:
var source = new PostgresSourceNode<Customer>(
connectionString,
"SELECT id, name, email FROM customers",
rowMapper: row => new Customer(
row.GetInt32(row.GetOrdinal("id")),
row.GetString(row.GetOrdinal("name")),
row.GetString(row.GetOrdinal("email"))
)
);
The PostgreSQL connector supports three write strategies, each optimized for different use cases.
Writes each row individually with a separate INSERT statement.
Best for: real-time processing and small batches where per-row error isolation matters.
var sink = new PostgresSinkNode<Customer>(
connectionString,
"customers",
writeStrategy: PostgresWriteStrategy.PerRow
);
Trade-offs: Higher overhead for large datasets due to individual round-trips, but provides better error isolation.
Buffers rows and issues a single multi-value INSERT statement (e.g., INSERT INTO table VALUES (...), (...), (...)).
Best for: bulk loading and ETL workloads moving moderate to large volumes of rows.
var configuration = new PostgresConfiguration
{
BatchSize = 1_000,
MaxBatchSize = 5_000,
UseTransaction = true
};
var sink = new PostgresSinkNode<Customer>(
connectionString,
"customers",
writeStrategy: PostgresWriteStrategy.Batch,
configuration: configuration
);
Trade-offs: 10-100x faster than PerRow for bulk operations, but all rows in a batch succeed or fail together.
Uses PostgreSQL's native COPY protocol for maximum throughput. Supports both text and binary formats.
Best for: very large bulk loads and data warehouse ingestion where throughput matters most.
var configuration = new PostgresConfiguration
{
CopyTimeout = 600, // 10 minutes for large loads
UseBinaryCopy = true // Use binary format for better performance
};
var sink = new PostgresSinkNode<Customer>(
connectionString,
"customers",
writeStrategy: PostgresWriteStrategy.Copy,
configuration: configuration
);
Binary vs Text Format:
- Text format (UseBinaryCopy = false): more compatible, easier to debug
- Binary format (UseBinaryCopy = true): 20-30% faster, more efficient encoding

Trade-offs: Fastest option for large datasets, but bypasses some PostgreSQL processing layers and has limited error granularity.
| Strategy | Throughput | Latency | Error Isolation | Use Case |
|---|---|---|---|---|
| PerRow | Low | Low | High | Real-time, small batches |
| Batch | High | Medium | Medium | Bulk loading, ETL |
| Copy | Very High | High | Low | Large bulk loads, data warehouse |
The connector supports PostgreSQL's ON CONFLICT clause for upsert operations, allowing you to insert rows or update them if they already exist.
Enable upsert by setting UseUpsert = true and specifying the conflict columns:
var configuration = new PostgresConfiguration
{
UseUpsert = true,
UpsertConflictColumns = new[] { "id" }, // Primary key or unique constraint columns
OnConflictAction = OnConflictAction.Update // Update on conflict
};
var sink = new PostgresSinkNode<Customer>(
connectionString,
"customers",
writeStrategy: PostgresWriteStrategy.Batch,
configuration: configuration
);
Updates non-conflict columns with values from the inserted row:
INSERT INTO customers (id, name, email)
VALUES (1, 'John Doe', 'john@example.com')
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email
var configuration = new PostgresConfiguration
{
UseUpsert = true,
UpsertConflictColumns = new[] { "id" },
OnConflictAction = OnConflictAction.Update
};
Silently skips conflicting rows without raising an error:
INSERT INTO customers (id, name, email)
VALUES (1, 'John Doe', 'john@example.com')
ON CONFLICT (id) DO NOTHING
var configuration = new PostgresConfiguration
{
UseUpsert = true,
UpsertConflictColumns = new[] { "id" },
OnConflictAction = OnConflictAction.Ignore
};
For tables with composite unique constraints:
public record OrderItem(int OrderId, int ProductId, int Quantity, decimal UnitPrice);
var configuration = new PostgresConfiguration
{
UseUpsert = true,
UpsertConflictColumns = new[] { "order_id", "product_id" }, // Composite key
OnConflictAction = OnConflictAction.Update
};
var sink = new PostgresSinkNode<OrderItem>(
connectionString,
"order_items",
writeStrategy: PostgresWriteStrategy.Batch,
configuration: configuration
);
Upsert works with all write strategies:
Why use upsert: Upsert eliminates the need for separate insert/update logic and handles race conditions where a row might be inserted between your check and insert operations.
The connector supports three delivery semantics to control data consistency guarantees during failures.
Guarantees that every item is delivered at least once. Items may be duplicated on retry after a failure.
var configuration = new PostgresConfiguration
{
DeliverySemantic = DeliverySemantic.AtLeastOnce,
UseTransaction = true
};
Characteristics: no data loss, but duplicates are possible when a batch is retried after a failure; low overhead.
Use when: writes are idempotent (for example, upserts keyed on a unique constraint) or downstream consumers can deduplicate.
Guarantees that every item is delivered at most once. Items may be lost on failure, but never duplicated.
var configuration = new PostgresConfiguration
{
DeliverySemantic = DeliverySemantic.AtMostOnce
};
Characteristics: no duplicates, but items in flight during a failure may be lost; low overhead.
Use when: occasional data loss is acceptable, such as telemetry or metrics ingestion.
Guarantees that every item is delivered exactly once. Uses transactional semantics to prevent both data loss and duplication.
var configuration = new PostgresConfiguration
{
DeliverySemantic = DeliverySemantic.ExactlyOnce,
UseTransaction = true,
CheckpointStrategy = CheckpointStrategy.Offset,
CheckpointStorage = new FileCheckpointStorage("checkpoints.json")
};
Characteristics: no data loss and no duplicates; the highest overhead, since it relies on transactions and persistent checkpoints.
Use when: correctness is critical, such as financial or audit data.
| Semantic | Data Loss | Duplicates | Overhead | Use Case |
|---|---|---|---|---|
| AtLeastOnce | No | Possible | Low | General purpose, idempotent ops |
| AtMostOnce | Possible | No | Low | Telemetry, metrics |
| ExactlyOnce | No | No | High | Financial, audit |
Checkpointing enables pipelines to resume from where they left off after a failure, rather than restarting from the beginning.
No checkpointing is performed. Failures require restarting from the beginning.
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.None
};
Stores checkpoint state in memory. Enables recovery from transient failures during a single process execution.
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.InMemory,
StreamResults = true
};
var source = new PostgresSourceNode<Order>(
connectionString,
"SELECT * FROM orders ORDER BY id", // ORDER BY is required for checkpointing
configuration: configuration
);
Limitations: Checkpoint state is lost when the process terminates.
Persists numeric offset checkpoints to external storage. Tracks position using a monotonically increasing column (e.g., auto-increment ID).
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.Offset,
CheckpointOffsetColumn = "id",
CheckpointStorage = new FileCheckpointStorage("checkpoints/order_offset.json")
};
var source = new PostgresSourceNode<Order>(
connectionString,
"SELECT * FROM orders WHERE id > @lastCheckpoint ORDER BY id",
configuration: configuration
);
Requirements:
- CheckpointOffsetColumn to be specified
- CheckpointStorage to persist checkpoints
- ORDER BY on the offset column

Tracks processed items using composite keys. Useful for tables without a single monotonic column.
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.KeyBased,
CheckpointKeyColumns = new[] { "customer_id", "order_date" },
CheckpointStorage = new FileCheckpointStorage("checkpoints/order_keys.json")
};
Requirements:
- CheckpointKeyColumns to be specified
- CheckpointStorage to persist checkpoints

Tracks cursor position for cursor-based iteration.
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.Cursor,
CheckpointStorage = new FileCheckpointStorage("checkpoints/cursor.json")
};
Tracks WAL position for PostgreSQL logical replication. Enables capturing changes from the PostgreSQL write-ahead log.
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.CDC,
CdcSlotName = "my_pipeline_slot",
CdcPublicationName = "my_publication",
CheckpointStorage = new FileCheckpointStorage("checkpoints/cdc.json")
};
Requirements:
- CdcSlotName to be specified

Implement ICheckpointStorage to persist checkpoints to your preferred backend:
public interface ICheckpointStorage
{
Task<Checkpoint?> LoadAsync(string pipelineId, CancellationToken cancellationToken = default);
Task SaveAsync(string pipelineId, Checkpoint checkpoint, CancellationToken cancellationToken = default);
}
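As a sketch, a custom implementation could persist one JSON file per pipeline id using System.Text.Json. This is not the connector's built-in FileCheckpointStorage; the interface and a stand-in Checkpoint record are redeclared locally so the example compiles on its own, and the real Checkpoint shape may differ:

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Demo usage: round-trip a checkpoint through a temp directory.
var storage = new JsonFileCheckpointStorage(Path.Combine(Path.GetTempPath(), "np-checkpoints"));
await storage.SaveAsync("demo-pipeline", new Checkpoint("42"));
var loaded = await storage.LoadAsync("demo-pipeline");
Console.WriteLine(loaded?.Position); // 42

// Redeclared locally so the sketch is self-contained; in a real project
// these types come from NPipeline.
public interface ICheckpointStorage
{
    Task<Checkpoint?> LoadAsync(string pipelineId, CancellationToken cancellationToken = default);
    Task SaveAsync(string pipelineId, Checkpoint checkpoint, CancellationToken cancellationToken = default);
}

// Stand-in for the library's Checkpoint type; its actual shape may differ.
public sealed record Checkpoint(string Position);

// Stores one checkpoint file per pipeline id: <directory>/<pipelineId>.json
public sealed class JsonFileCheckpointStorage : ICheckpointStorage
{
    private readonly string _directory;

    public JsonFileCheckpointStorage(string directory) => _directory = directory;

    public async Task<Checkpoint?> LoadAsync(string pipelineId, CancellationToken cancellationToken = default)
    {
        var path = Path.Combine(_directory, pipelineId + ".json");
        if (!File.Exists(path))
            return null;
        await using var stream = File.OpenRead(path);
        return await JsonSerializer.DeserializeAsync<Checkpoint>(stream, cancellationToken: cancellationToken);
    }

    public async Task SaveAsync(string pipelineId, Checkpoint checkpoint, CancellationToken cancellationToken = default)
    {
        Directory.CreateDirectory(_directory);
        var path = Path.Combine(_directory, pipelineId + ".json");
        await using var stream = File.Create(path);
        await JsonSerializer.SerializeAsync(stream, checkpoint, cancellationToken: cancellationToken);
    }
}
```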
Built-in implementations:
- FileCheckpointStorage - Stores checkpoints in a JSON file
- InMemoryCheckpointStorage - Stores checkpoints in memory (for testing)

Configure how frequently checkpoints are saved:
var configuration = new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.Offset,
CheckpointInterval = new CheckpointIntervalConfiguration
{
RowCount = 10_000, // Save every 10,000 rows
TimeInterval = TimeSpan.FromMinutes(5) // Or every 5 minutes, whichever comes first
}
};
Register the connector with dependency injection for production applications:
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors.PostgreSQL.DependencyInjection;
var services = new ServiceCollection()
.AddPostgresConnector(options =>
{
options.DefaultConnectionString = "Host=localhost;Database=mydb;Username=postgres;Password=password";
options.AddOrUpdateConnection("analytics", "Host=localhost;Database=analytics;Username=postgres;Password=postgres");
options.DefaultConfiguration = new PostgresConfiguration
{
StreamResults = true,
FetchSize = 1_000,
BatchSize = 1_000
};
})
.BuildServiceProvider();
var pool = services.GetRequiredService<IPostgresConnectionPool>();
var sourceFactory = services.GetRequiredService<PostgresSourceNodeFactory>();
var sinkFactory = services.GetRequiredService<PostgresSinkNodeFactory>();
var source = new PostgresSourceNode<Customer>(
pool,
"SELECT * FROM customers",
connectionName: "analytics"
);
Enable streaming for large result sets to reduce memory usage:
var configuration = new PostgresConfiguration
{
StreamResults = true,
FetchSize = 1_000
};
var source = new PostgresSourceNode<Customer>(
connectionString,
"SELECT * FROM large_table",
configuration: configuration
);
Why streaming matters: Without streaming, the entire result set is loaded into memory. Streaming fetches rows in batches, allowing you to process millions of rows without memory issues.
The PostgreSQL connector includes a companion analyzer package that provides compile-time diagnostics to help prevent common mistakes when using checkpointing.
dotnet add package NPipeline.Connectors.PostgreSQL.Analyzers
Category: Reliability. Default severity: Warning.
When using checkpointing with PostgreSQL source nodes, the SQL query must include an ORDER BY clause on a unique, monotonically increasing column. This ensures consistent row ordering across checkpoint restarts. Without proper ordering, checkpointing may skip rows or process duplicates.
// ❌ Warning: Missing ORDER BY clause
var source = new PostgresSourceNode<MyRecord>(
connectionString,
"SELECT id, name, created_at FROM my_table",
configuration: new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.Offset
}
);
// ✅ Correct: Includes ORDER BY clause
var source = new PostgresSourceNode<MyRecord>(
connectionString,
"SELECT id, name, created_at FROM my_table ORDER BY id",
configuration: new PostgresConfiguration
{
CheckpointStrategy = CheckpointStrategy.Offset
}
);
Checkpointing tracks the position of processed rows to enable recovery from failures. Without a consistent ORDER BY clause, row order can vary between executions, so a restarted pipeline may skip rows or process the same rows twice.
Use a unique, monotonically increasing column such as:
- id (primary key)
- created_at (timestamp)
- updated_at (timestamp)
- timestamp (timestamp column)

For more details, see the PostgreSQL Analyzer documentation.
Configure retries for transient failures:
var configuration = new PostgresConfiguration
{
MaxRetryAttempts = 3,
RetryDelay = TimeSpan.FromSeconds(2)
};
try
{
await pipeline.RunAsync();
}
catch (NpgsqlException ex) when (ex.IsTransient)
{
// Retry operation
await Task.Delay(TimeSpan.FromSeconds(5));
await pipeline.RunAsync();
}
Configure SSL/TLS for secure connections:
var configuration = new PostgresConfiguration
{
// SSL mode is configured via connection string
ConnectionString = "Host=localhost;Database=mydb;Username=postgres;Password=password;SslMode=Require"
};
Available SSL modes: Disable, Allow, Prefer, Require, VerifyCa, VerifyFull
The connector uses prepared statements by default (UsePreparedStatements = true). Prepared statements are parsed and planned by the server once and reused across executions, which reduces per-query overhead for repeated statements with the same shape.
Consider disabling UsePreparedStatements only for one-time bulk operations or workloads where every query is different.
| Scenario | Prepared Statements | Performance Impact |
|---|---|---|
| Repeated inserts (same query pattern) | Enabled | 10-30% faster |
| Ad-hoc queries (different each time) | Enabled | 5-10% overhead |
| One-time bulk operations | Disabled | No impact |
For comprehensive documentation including advanced scenarios, configuration reference, and best practices, see the PostgreSQL Connector documentation.
MIT License - see LICENSE file for details.