Data Lake table abstractions for NPipeline - partitioned writes, manifests, snapshots, and time travel with Parquet as the default format
NPipeline Data Lake Connector provides table abstractions for building data lakes on top of the Parquet connector. This package enables partitioned writes, manifest-based table management, time travel queries, and small-file compaction with Parquet as the default storage format.
NPipeline is a high-performance, extensible data processing framework for .NET that enables developers to build scalable and efficient pipeline-based applications. It provides a rich set of components for data transformation, aggregation, branching, and parallel processing, with built-in support for resilience patterns and error handling.
dotnet add package NPipeline.Connectors.DataLake
This package builds on NPipeline.Connectors.Parquet and uses Parquet as its default file format. The Data Lake connector adds:

- Partitioned writes that produce a Hive-style column=value/ directory structure for query engine compatibility
- Manifest-based table management and snapshots
- Time travel queries
- Small-file compaction

Why this separation: The Parquet connector handles single-file I/O with full Parquet feature support. The Data Lake connector adds table-level semantics (partitioning, snapshots, time travel) without duplicating the Parquet implementation. This allows using either package independently or together.

Use PartitionSpec<T> to define how records map to partition directories:
using NPipeline.Connectors.DataLake.Partitioning;
public class SalesRecord
{
public long Id { get; set; }
public string ProductName { get; set; } = string.Empty;
public decimal Amount { get; set; }
public DateTime EventDate { get; set; } // Partition column
public string Region { get; set; } = string.Empty; // Partition column
}
// Single-level partitioning
var spec = PartitionSpec<SalesRecord>.By(x => x.EventDate);
// Multi-level partitioning (produces: event_date=2025-01-15/region=EU/)
var spec = PartitionSpec<SalesRecord>
.By(x => x.EventDate)
.ThenBy(x => x.Region);
// Custom column names
var spec = PartitionSpec<SalesRecord>
.By(x => x.EventDate, "date")
.ThenBy(x => x.Region, "geo_region");
// No partitioning (all files in table root)
var spec = PartitionSpec<SalesRecord>.None();
Why fluent builder: The fluent API makes partition schemes readable and discoverable. Property expressions are compiled to delegates for efficient runtime
evaluation. Column names default to snake_case (e.g., EventDate → event_date) following Hive conventions.
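The snake_case convention described above can be sketched as a simple conversion. This helper is illustrative only, not part of the package API:

```csharp
using System.Text;

public static class PartitionNaming
{
    // Convert a PascalCase property name (e.g. "EventDate") to the
    // snake_case column name ("event_date") used in partition paths.
    public static string ToSnakeCase(string propertyName)
    {
        var sb = new StringBuilder(propertyName.Length + 4);
        for (var i = 0; i < propertyName.Length; i++)
        {
            var c = propertyName[i];
            if (char.IsUpper(c))
            {
                if (i > 0)
                    sb.Append('_');       // insert separator before each new word
                sb.Append(char.ToLowerInvariant(c));
            }
            else
            {
                sb.Append(c);
            }
        }
        return sb.ToString();
    }
}

// PartitionNaming.ToSnakeCase("EventDate") → "event_date"
```

Passing an explicit column name to By/ThenBy, as shown above, bypasses this default.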
Use DataLakeTableWriter<T> to write partitioned Parquet files:
using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.Partitioning;
using NPipeline.Connectors.Parquet;
using NPipeline.StorageProviders;
using NPipeline.StorageProviders.Models;
using NPipeline.DataFlow.DataPipes;
var resolver = StorageProviderFactory.CreateResolver();
var provider = StorageProviderFactory.GetProviderOrThrow(
resolver,
StorageUri.Parse("file:///data/warehouse"));
var tableUri = StorageUri.Parse("file:///data/warehouse/sales_table");
var partitionSpec = PartitionSpec<SalesRecord>
.By(x => x.EventDate)
.ThenBy(x => x.Region);
var config = new ParquetConfiguration
{
RowGroupSize = 100_000,
Compression = Parquet.CompressionMethod.Snappy
};
// Write data
await using var writer = new DataLakeTableWriter<SalesRecord>(
provider,
tableUri,
partitionSpec,
config);
Console.WriteLine($"Snapshot ID: {writer.SnapshotId}");
var dataPipe = new InMemoryDataPipe<SalesRecord>(records, "SalesData");
await writer.AppendAsync(dataPipe, CancellationToken.None);
Generated directory structure:
sales_table/
├── _manifest/
│ ├── manifest.ndjson
│ └── snapshots/
│ └── 20250215093045abcd1234.ndjson
├── event_date=2025-01-15-00-00-00/
│ ├── region=EU/
│ │ └── part-001.parquet
│ └── region=US/
│ └── part-001.parquet
└── event_date=2025-01-16-00-00-00/
└── region=APAC/
└── part-001.parquet
Use DataLakeTableSourceNode<T> to read all data in a table:
using NPipeline.Connectors.DataLake;
// Read latest snapshot
var sourceNode = new DataLakeTableSourceNode<SalesRecord>(
provider,
tableUri);
// Use in a pipeline
var builder = new PipelineBuilder();
var source = builder.AddSource(() => sourceNode, "table-source");
The source node reads the manifest to discover data files and streams each file through ParquetSourceNode<T>.

Read table state as of a specific point in time:
using NPipeline.Connectors.DataLake;
// Read as of a specific timestamp
var asOfTimestamp = new DateTimeOffset(2025, 1, 15, 12, 0, 0, TimeSpan.Zero);
var timeTravelSource = new DataLakeTableSourceNode<SalesRecord>(
provider,
tableUri,
asOfTimestamp);
// Read a specific snapshot by ID
var snapshotSource = new DataLakeTableSourceNode<SalesRecord>(
provider,
tableUri,
snapshotId: "20250215093045abcd1234");
Why time travel: Time travel enables:

- Auditing and debugging by inspecting historical table state
- Reproducible reads for reports and downstream jobs
- Recovery after a bad write by reading the last known-good snapshot
The manifest is stored as NDJSON at _manifest/manifest.ndjson:
{"path":"event_date=2025-01-15/region=EU/part-001.parquet","row_count":5000,"written_at":"2025-01-15T10:30:45Z","file_size_bytes":245678,"partition_values":{"event_date":"2025-01-15","region":"EU"},"snapshot_id":"20250215103045abcd1234","format_version":"v1","file_format":"parquet","compression":"snappy"}
{"path":"event_date=2025-01-15/region=US/part-001.parquet","row_count":3200,"written_at":"2025-01-15T10:30:46Z","file_size_bytes":198234,"partition_values":{"event_date":"2025-01-15","region":"US"},"snapshot_id":"20250215103045abcd1234","format_version":"v1","file_format":"parquet"}
Each ManifestEntry tracks:
| Field | Description |
|---|---|
| path | Relative path from table base |
| row_count | Number of rows in the file |
| written_at | Timestamp when the file was written |
| file_size_bytes | File size in bytes |
| partition_values | Partition key/value pairs |
| snapshot_id | ID of the snapshot containing this file |
| content_hash | Optional hash for integrity verification |
| file_format | Format (e.g., "parquet") |
| compression | Compression codec used |
Why NDJSON: Newline-delimited JSON allows:

- Append-only writes: registering a new file adds one line without rewriting the manifest
- Streaming reads: entries can be processed line by line without loading the whole file
- Crash tolerance: a torn final line can be detected and skipped without invalidating earlier entries
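The append-only property can be illustrated with a minimal sketch. ManifestAppender and its method are hypothetical helpers for illustration, not the package API:

```csharp
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public static class ManifestAppender
{
    // One JSON object per line: a new file can be registered by
    // appending a single line, without rewriting the existing manifest.
    public static async Task AppendEntryAsync(string manifestPath, object entry)
    {
        var line = JsonSerializer.Serialize(entry) + "\n";
        await File.AppendAllTextAsync(manifestPath, line);
    }
}
```

Because each entry is a complete JSON document terminated by a newline, readers never need to re-parse earlier lines when new files are added.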
using NPipeline.Connectors.DataLake.Manifest;
var manifestReader = new ManifestReader(provider, tableUri);
// Read all entries
var entries = await manifestReader.ReadAllAsync(cancellationToken);
foreach (var entry in entries)
{
Console.WriteLine($"Path: {entry.Path}");
Console.WriteLine($" Rows: {entry.RowCount}, Size: {entry.FileSizeBytes:N0} bytes");
Console.WriteLine($" Snapshot: {entry.SnapshotId}");
Console.WriteLine($" Written: {entry.WrittenAt:yyyy-MM-dd HH:mm:ss}");
if (entry.PartitionValues is not null)
{
var partitions = string.Join(", ",
entry.PartitionValues.Select(kv => $"{kv.Key}={kv.Value}"));
Console.WriteLine($" Partitions: {partitions}");
}
}
// Get available snapshot IDs
var snapshotIds = await manifestReader.GetSnapshotIdsAsync(cancellationToken);
Use DataLakeCompactor to consolidate small files:
using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.FormatAdapters;
var compactor = new DataLakeCompactor(provider, tableUri, new ParquetConfiguration());
// Dry run to see what would be compacted
var dryRunRequest = new TableCompactRequest
{
TableBasePath = tableUri,
Provider = provider,
SmallFileThresholdBytes = 32L * 1024 * 1024, // 32 MB
MinFilesToCompact = 5,
MaxFilesToCompact = 100,
TargetFileSizeBytes = 256L * 1024 * 1024, // 256 MB
DryRun = true,
DeleteOriginalFiles = true
};
var dryRunResult = await compactor.CompactAsync(dryRunRequest, cancellationToken);
Console.WriteLine($"Would compact {dryRunResult.FilesCompacted} files into {dryRunResult.FilesCreated}");
// Perform actual compaction
var actualRequest = dryRunRequest with { DryRun = false };
var result = await compactor.CompactAsync(actualRequest, cancellationToken);
Console.WriteLine($"Compacted {result.FilesCompacted} files in {result.Duration.TotalSeconds:N1}s");
Console.WriteLine($"Bytes: {result.BytesBefore:N0} → {result.BytesAfter:N0}");
Why compaction: Small files degrade query performance because:

- Each file adds per-file open and seek overhead for query engines
- Manifest and metadata size grow with file count
- Small row groups reduce compression and scan efficiency
Compaction merges small files into fewer, larger files while preserving data and updating the manifest.
Implement ITableFormatAdapter to support alternative table formats:
using NPipeline.Connectors.DataLake.FormatAdapters;
public class IcebergFormatAdapter : ITableFormatAdapter
{
public string Name => "iceberg";
public Task AppendAsync(TableAppendRequest request, CancellationToken cancellationToken = default)
{
// Implement Iceberg-specific commit protocol:
// write metadata files, update snapshot log, etc.
throw new NotImplementedException();
}
public Task<TableSnapshot> GetSnapshotAsync(TableSnapshotRequest request, CancellationToken cancellationToken = default)
{
// Read Iceberg metadata to resolve snapshot
throw new NotImplementedException();
}
public Task<TableCompactResult> CompactAsync(TableCompactRequest request, CancellationToken cancellationToken = default)
{
// Implement compaction with Iceberg metadata updates
throw new NotImplementedException();
}
public Task<IReadOnlyList<SnapshotSummary>> ListSnapshotsAsync(StorageUri tableBasePath, CancellationToken cancellationToken = default)
{
// Read Iceberg snapshot log
throw new NotImplementedException();
}
public Task<bool> TableExistsAsync(StorageUri tableBasePath, CancellationToken cancellationToken = default)
{
// Check for Iceberg metadata files
throw new NotImplementedException();
}
public Task CreateTableAsync(StorageUri tableBasePath, CancellationToken cancellationToken = default)
{
// Initialize Iceberg table metadata
throw new NotImplementedException();
}
}
Why format adapters: Different table formats (Iceberg, Delta Lake, Hudi) have different:

- Metadata layouts and manifest structures
- Commit and concurrency protocols
- Snapshot and time-travel semantics
The adapter interface isolates format-specific logic while reusing the core partitioning and file I/O infrastructure.
The connector generates standard Hive-style partition paths for compatibility with query engines:
event_date=2025-01-15/region=EU/
Path format rules:
| CLR Type | Path Format | Example |
|---|---|---|
| DateOnly | yyyy-MM-dd | 2025-01-15 |
| DateTime | yyyy-MM-dd-HH-mm-ss | 2025-01-15-14-30-00 |
| DateTimeOffset | yyyy-MM-dd-HH-mm-ss | 2025-01-15-14-30-00 |
| string | URL-encoded | Hello%20World |
| enum | Lowercase name | active |
| Guid | Lowercase D format | a1b2c3d4-e5f6-7890-abcd-ef1234567890 |
| Numeric types | Invariant culture | 12345, 3.14 |
Why Hive-style: Hive-style partitioning is supported by:

- Apache Spark and Apache Hive
- Trino and Presto
- AWS Athena
- DuckDB
This allows the same data files to be queried by multiple engines without ETL.
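The formatting rules in the table above can be sketched as follows. This is an illustrative sketch of the conventions, not the connector's internal implementation:

```csharp
using System;
using System.Globalization;

public static class PartitionPathFormat
{
    // Format a partition value according to the Hive-style path rules.
    public static string Format(object value) => value switch
    {
        DateOnly d => d.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture),
        DateTime dt => dt.ToString("yyyy-MM-dd-HH-mm-ss", CultureInfo.InvariantCulture),
        DateTimeOffset dto => dto.ToString("yyyy-MM-dd-HH-mm-ss", CultureInfo.InvariantCulture),
        string s => Uri.EscapeDataString(s),        // URL-encode, e.g. "Hello World" → "Hello%20World"
        Enum e => e.ToString().ToLowerInvariant(),  // lowercase enum name
        Guid g => g.ToString("D"),                  // lowercase hyphenated format
        IFormattable f => f.ToString(null, CultureInfo.InvariantCulture), // numeric types
        _ => value?.ToString() ?? string.Empty
    };
}

// PartitionPathFormat.Format(new DateOnly(2025, 1, 15)) → "2025-01-15"
// PartitionPathFormat.Format("Hello World")             → "Hello%20World"
```

Invariant-culture formatting matters here: culture-sensitive formatting (e.g., a comma decimal separator) would produce paths that other engines cannot parse.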
Target file sizes between 256 MB and 1 GB for optimal query performance:
var config = new ParquetConfiguration
{
RowGroupSize = 100_000,
TargetFileSizeBytes = 512L * 1024 * 1024 // 512 MB
};
Control memory during high-cardinality partition writes:
var config = new ParquetConfiguration
{
MaxBufferedRows = 500_000, // Total rows across all partition buffers
RowGroupSize = 50_000 // Rows per row group
};
Use idempotency keys to prevent duplicate data after retries:
var request = new TableAppendRequest<SalesRecord>
{
TableBasePath = tableUri,
Provider = provider,
PartitionSpec = partitionSpec,
IdempotencyKey = $"batch-{batchId}" // Deduplicates on retry
};
The manifest is the source of truth for table contents. Consider versioning or backing up the _manifest/ directory in object storage.

Run compaction as a scheduled job:
// Compact files smaller than 32 MB into 256 MB files
var request = new TableCompactRequest
{
TableBasePath = tableUri,
Provider = provider,
SmallFileThresholdBytes = 32L * 1024 * 1024,
TargetFileSizeBytes = 256L * 1024 * 1024,
MinFilesToCompact = 10, // Only compact when enough small files exist
DeleteOriginalFiles = true
};
using NPipeline.Connectors.DataLake;
using NPipeline.Connectors.DataLake.Partitioning;
using NPipeline.Connectors.Parquet;
using NPipeline.Connectors.Parquet.Attributes;
using NPipeline.Pipeline;
using NPipeline.StorageProviders;
using NPipeline.StorageProviders.Models;
public class SalesRecord
{
[ParquetColumn("sale_id")]
public long Id { get; set; }
[ParquetColumn("product_name")]
public string ProductName { get; set; } = string.Empty;
[ParquetDecimal(18, 2)]
public decimal Amount { get; set; }
public DateTime EventDate { get; set; } // Partition column
public string Region { get; set; } = string.Empty; // Partition column
}
public class DataLakePipeline : IPipelineDefinition
{
private readonly StorageUri _tableUri = StorageUri.Parse("s3://warehouse/sales_table/");
public void Define(PipelineBuilder builder, PipelineContext context)
{
var resolver = StorageProviderFactory.CreateResolver();
var provider = StorageProviderFactory.GetProviderOrThrow(resolver, _tableUri);
var partitionSpec = PartitionSpec<SalesRecord>
.By(x => x.EventDate)
.ThenBy(x => x.Region);
var config = new ParquetConfiguration
{
RowGroupSize = 100_000,
Compression = Parquet.CompressionMethod.Snappy,
TargetFileSizeBytes = 512L * 1024 * 1024
};
// Source: Read from Data Lake table with time travel
var asOfDate = new DateTimeOffset(2025, 1, 15, 0, 0, 0, TimeSpan.Zero);
var source = builder.AddSource(
new DataLakeTableSourceNode<SalesRecord>(provider, _tableUri, asOfDate),
"lake-source");
// Transform: Process records
var transform = builder.AddTransform<SalesTransform, SalesRecord, SalesRecord>("transform");
// Sink: Write back to Data Lake with partitioning
var sink = builder.AddSink(
new DataLakePartitionedSinkNode<SalesRecord>(
provider,
_tableUri,
partitionSpec,
config),
"lake-sink");
builder.Connect(source, transform);
builder.Connect(transform, sink);
}
}
MIT License - see LICENSE file for details.