ETL host scaffolding for high-throughput .NET 10 data pipelines. Excavator wires Extract → Transform → Load stages over bounded Channels with parallel workers, structured Serilog logging, and an IHostedService lifecycle.
A pre-wired ETL skeleton (Extract, Transform, Load, Report) running over bounded Channels with parallel workers, all behind a familiar .NET hosted-service surface.
Two bounded channels, created with Channel.CreateBounded, sit between the stages, so backpressure comes for free: a fast producer waits for a slow consumer instead of overwhelming it.
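This minimal sketch shows the backpressure mechanism on its own. It uses only System.Threading.Channels and is not Excavator's own code. The capacity of 2 is an illustrative assumption. With BoundedChannelFullMode.Wait, which is the default, writing to a full channel neither fails nor drops the item. Instead, WriteAsync returns a task that completes only after a reader frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel with room for two items. FullMode = Wait (the default)
// makes writers wait instead of dropping items when the channel is full.
var Queue = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

// The first two writes fit; TryWrite reports that a third would not.
bool First = Queue.Writer.TryWrite(1);
bool Second = Queue.Writer.TryWrite(2);
bool Third = Queue.Writer.TryWrite(3);

// WriteAsync on a full channel returns a pending task: this is the backpressure.
ValueTask Pending = Queue.Writer.WriteAsync(3);
bool PendingWhileFull = !Pending.IsCompleted;

// Reading one item frees a slot, which lets the pending write finish.
int Read = await Queue.Reader.ReadAsync();
await Pending;

// prints "True True False True 1 2"
Console.WriteLine($"{First} {Second} {Third} {PendingWhileFull} {Read} {Queue.Reader.Count}");
```

In a real pipeline, the pending write is the extractor's await. It resumes as soon as a transformer takes an item off the queue.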
The Transform and Load stages run through Parallel.ForEachAsync with MaxDegreeOfParallelism = Environment.ProcessorCount by default. Both limits can be changed through ExcavatorOptions.
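This minimal sketch shows the same worker pattern using plain BCL types. It assumes an int payload and uses squaring as stand-in work. Parallel.ForEachAsync consumes the channel's ReadAllAsync stream, so at most MaxDegreeOfParallelism bodies run at once, and each one pulls the next available item. The sketch illustrates the technique and is not taken from Excavator's source.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var Input = Channel.CreateBounded<int>(100);
var Output = new ConcurrentBag<int>();

// Producer: write 1..50, then complete so ReadAllAsync eventually ends.
var Producer = Task.Run(async () =>
{
    for (var I = 1; I <= 50; I++)
        await Input.Writer.WriteAsync(I);
    Input.Writer.Complete();
});

var Parallelism = new ParallelOptions
{
    // Mirrors the documented default of one worker per logical core.
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

// Consumers: up to ProcessorCount bodies run concurrently, each pulling
// the next item from the shared channel reader.
await Parallel.ForEachAsync(Input.Reader.ReadAllAsync(), Parallelism, async (Item, Token) =>
{
    await Task.Yield(); // stand-in for real async work
    Output.Add(Item * Item);
});
await Producer;

// prints "50 items, sum of squares 42925"
Console.WriteLine($"{Output.Count} items, sum of squares {Output.Sum()}");
```

Because all workers share a single reader, no item is processed twice. The loop ends once the writer has been completed and the channel is empty.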
Every stage observes one shared CancellationToken. StopAsync cancels it, completes the channel writers, and then awaits every worker, so partial batches drain cleanly.
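This sketch illustrates the shutdown sequence with one producer, one slower consumer, and a CancellationTokenSource that stands in for the host's stop signal. It is a hypothetical illustration, not Excavator's StopAsync. The consumer reads without the token, so items that are already queued still drain. The sketch also awaits the producer before calling Complete. That ordering prevents a late write from racing the completion, because writing to a completed channel fails with ChannelClosedException.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var Queue = Channel.CreateBounded<int>(10);
using var Stopping = new CancellationTokenSource();
var Produced = 0;
var Consumed = 0;

// Producer: keeps writing until the shared token is cancelled.
var Producer = Task.Run(async () =>
{
    try
    {
        while (true)
        {
            await Queue.Writer.WriteAsync(Produced + 1, Stopping.Token);
            Produced++; // counted only after the write succeeded
            await Task.Delay(1, Stopping.Token);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected on shutdown; a cancelled write never reached the queue.
    }
});

// Consumer: reads without the token so it can drain what is already queued.
var Consumer = Task.Run(async () =>
{
    await foreach (var Item in Queue.Reader.ReadAllAsync())
    {
        await Task.Delay(2); // slower than the producer, so items pile up
        Consumed++;
    }
});

await Task.Delay(100);

// Shutdown: cancel, let the producer exit, complete the writer, await the drain.
Stopping.Cancel();
await Producer;
Queue.Writer.Complete();
await Consumer;

Console.WriteLine($"produced {Produced}, consumed {Consumed}");
```

The two counts always match. Cancellation stops new work from entering the queue, but it never discards work that is already queued.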
By default, a reporter ticks every minute and logs the depth of both queues. The interval is configurable through ReporterInterval. This lets you spot a backed-up pipeline without bolting on metrics infrastructure.
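A reporter loop of this kind can be sketched with PeriodicTimer and ChannelReader<T>.Count, which bounded channels support. To keep the demo short, it assumes a 100 ms interval and a token that cancels itself. It prints to the console instead of logging through Serilog. This is not Excavator's actual reporter.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var TransformerQueue = Channel.CreateBounded<int>(100);
var LoaderQueue = Channel.CreateBounded<int>(100);

// Simulate a backed-up transform stage: three items waiting, loader queue empty.
for (var I = 0; I < 3; I++)
    TransformerQueue.Writer.TryWrite(I);

var Reports = 0;

// Demo-only values: tick every 100 ms and stop the loop after roughly half a second.
using var Stopping = new CancellationTokenSource(TimeSpan.FromMilliseconds(550));
using var Ticker = new PeriodicTimer(TimeSpan.FromMilliseconds(100));

try
{
    while (await Ticker.WaitForNextTickAsync(Stopping.Token))
    {
        // Reading Count on a bounded channel is cheap; no items are consumed.
        Console.WriteLine($"Transformer: {TransformerQueue.Reader.Count}, Loader: {LoaderQueue.Reader.Count}");
        Reports++;
    }
}
catch (OperationCanceledException)
{
    // Shutdown: the reporter simply stops ticking.
}
```

A Transformer depth that keeps climbing while the Loader depth stays near zero points to the transform stage as the bottleneck.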
Every stage transition is logged through [{System}] [{Function}] message templates. Property-aware sinks (Seq, ELK, Loki) therefore receive System and Function as structured properties out of the box.
Primary constructors, file-scoped namespaces, nullable reference types enabled, TreatWarningsAsErrors, deterministic builds, SourceLink, and symbol packages.
Subclass Excavator<TTransformer, TLoader> and implement four *Core hooks. The base class runs the loops, owns the channels, and handles cancellation.
InitializerCore: one-shot warm-up. Open connections, prime caches, and validate configuration. It runs once before any worker starts.
ExtractorCore: single producer loop. Pulls work from upstream and writes it onto TransformerQueue. When the queue is full, the write waits asynchronously, which pauses the loop until a transformer frees a slot.
TransformerCore: N parallel workers (one per logical core by default). Each worker reads from TransformerQueue and writes to LoaderQueue.
LoaderCore: N parallel workers drain LoaderQueue into your sink, whether that is a database, an object store, a message bus, or anything else.
Reporter: ticks every minute by default. Logs the Transformer and Loader queue depths so you can see bottlenecks in real time.
Fault handling: any unhandled exception inside a worker is logged at Fatal level and triggers StopAsync, so the host never lingers in a half-running state.
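The fail-fast behavior can be sketched with Parallel.ForEachAsync. Once any body throws, it stops handing out new items, and the exception is rethrown from the awaited call. In this hypothetical sketch, Shutdown is a CancellationTokenSource that stands in for the host's StopAsync, and the Console line stands in for a Fatal-level log entry. Excavator's internals may differ.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the host's stop signal (StopAsync in a real host).
using var Shutdown = new CancellationTokenSource();
var Processed = 0;

try
{
    var Options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
    await Parallel.ForEachAsync(Enumerable.Range(1, 1000), Options, async (Item, Token) =>
    {
        // Simulated poison item: the tenth record cannot be processed.
        if (Item == 10)
            throw new InvalidOperationException($"Bad item {Item}");

        await Task.Delay(1, Token);
        Interlocked.Increment(ref Processed);
    });
}
catch (Exception Error)
{
    // Stand-in for a Fatal log entry, followed by a shutdown request.
    Console.WriteLine($"[Fatal] {Error.Message}");
    Shutdown.Cancel();
}

Console.WriteLine($"processed {Processed} of 1000, shutdown requested: {Shutdown.IsCancellationRequested}");
```

Only the handful of items already in flight complete. Nothing further is pulled, and the shutdown request lets the remaining stages wind down instead of waiting on a dead worker pool.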
Install the package, subclass Excavator, and register it as a hosted service. The base class handles the rest of the lifecycle.
A single NuGet package. Pulls Microsoft.Extensions.Hosting.Abstractions and Serilog as transitive dependencies.
$ dotnet add package Texnomic.Excavator
Override the four *Core hooks. The base class owns the queues, the workers, and the lifecycle.
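using Microsoft.Extensions.Options; // IOptions<T>
using Texnomic.Excavator;

// Source, Normalize and Sink stand for your own upstream client, mapping method and destination.
public sealed class BlockExcavator(IOptions<ExcavatorOptions> Options, ILogger Logger)
    : Excavator<RawBlock, NormalizedBlock>("Blocks", Options, Logger)
{
    // Runs once before any worker starts.
    protected override ValueTask InitializerCore(CancellationToken Token)
        => ValueTask.CompletedTask;

    // Single producer loop: pull from upstream, push onto the first queue.
    protected override async ValueTask ExtractorCore(CancellationToken Token)
    {
        var Raw = await Source.NextBlockAsync(Token);
        await TransformerQueue.Writer.WriteAsync(Raw, Token); // waits while the queue is full
    }

    // Runs on N parallel workers: reshape each item and hand it to the loaders.
    protected override async ValueTask TransformerCore(RawBlock Context, CancellationToken Token)
    {
        var Normalized = Normalize(Context);
        await LoaderQueue.Writer.WriteAsync(Normalized, Token);
    }

    // Runs on N parallel workers: write each item to its destination.
    protected override async ValueTask LoaderCore(NormalizedBlock Context, CancellationToken Token)
        => await Sink.InsertAsync(Context, Token);
}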
Register it with the .NET generic host. RunAsync handles SIGINT and SIGTERM, drains the pipeline, and shuts down gracefully.
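using Microsoft.Extensions.DependencyInjection; // Configure<T>, AddHostedService<T>
using Microsoft.Extensions.Hosting;             // Host.CreateApplicationBuilder (Microsoft.Extensions.Hosting package)
using Serilog;                                  // AddSerilog (Serilog.Extensions.Hosting), ReadFrom.Configuration (Serilog.Settings.Configuration)
using Texnomic.Excavator;

var Builder = Host.CreateApplicationBuilder(args);

Builder.Services.AddSerilog((Services, Configuration) => Configuration
    .ReadFrom.Configuration(Builder.Configuration)
    .Enrich.FromLogContext());

Builder.Services.Configure<ExcavatorOptions>(Options =>
{
    Options.TransformerQueueCapacity = 500;
    Options.LoaderQueueCapacity = 500;
    Options.TransformerMaxDegreeOfParallelism = 16;      // default: Environment.ProcessorCount
    Options.LoaderMaxDegreeOfParallelism = 8;            // default: Environment.ProcessorCount
    Options.ReporterInterval = TimeSpan.FromSeconds(30); // default: one minute
});

Builder.Services.AddHostedService<BlockExcavator>();

await Builder.Build().RunAsync();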
Need your own settings, such as connection strings, batch sizes, or feature flags? Derive from ExcavatorOptions and use the Excavator base class that takes a third type argument, closing it over your derived options type. The base class's Options property is then typed as your derived options.
public sealed class BlockExcavatorOptions : ExcavatorOptions
{
    public string Network { get; set; } = "mainnet";
    public int BatchSize { get; set; } = 100;
}

public sealed class BlockExcavator(IOptions<BlockExcavatorOptions> Options, ILogger Logger)
    : Excavator<RawBlock, NormalizedBlock, BlockExcavatorOptions>("Blocks", Options, Logger)
{
    // InitializerCore, TransformerCore and LoaderCore are implemented as in the previous example.

    protected override async ValueTask ExtractorCore(CancellationToken Token)
    {
        // Inside members, Options resolves to the base-class property, typed as BlockExcavatorOptions.
        var Raw = await Source.NextBlockAsync(Options.Network, Options.BatchSize, Token);
        await TransformerQueue.Writer.WriteAsync(Raw, Token);
    }
}
A producer-consumer pipeline glued together with two bounded Channels. Backpressure is automatic. Cancellation is cooperative. Logging is structured.
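This self-contained sketch shows the whole two-queue shape without the base class. It uses only System.Threading.Channels and Parallel.ForEachAsync. The int input, the block-NNN strings, and the ConcurrentBag sink are illustrative assumptions, and the queue variables merely borrow Excavator's names. Completion flows downstream: the extractor completes TransformerQueue, and the transform stage completes LoaderQueue after all of its workers finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Two bounded queues: Extract -> TransformerQueue -> Transform -> LoaderQueue -> Load.
var TransformerQueue = Channel.CreateBounded<int>(8);
var LoaderQueue = Channel.CreateBounded<string>(8);
var Sink = new ConcurrentBag<string>(); // stand-in for a database, object store or bus

// One shared token for every stage, so a single Cancel() stops them all.
using var Stopping = new CancellationTokenSource();
var Workers = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = Stopping.Token
};

// Extract: a single producer; completing the writer means "no more input".
var Extractor = Task.Run(async () =>
{
    for (var Block = 1; Block <= 100; Block++)
        await TransformerQueue.Writer.WriteAsync(Block, Stopping.Token);
    TransformerQueue.Writer.Complete();
});

// Transform: N workers; once all of them finish, completion is passed downstream.
var Transformer = Task.Run(async () =>
{
    await Parallel.ForEachAsync(TransformerQueue.Reader.ReadAllAsync(), Workers,
        async (Raw, Token) => await LoaderQueue.Writer.WriteAsync($"block-{Raw:D3}", Token));
    LoaderQueue.Writer.Complete();
});

// Load: N workers drain the second queue into the sink.
var Loader = Parallel.ForEachAsync(LoaderQueue.Reader.ReadAllAsync(), Workers, (Normalized, Token) =>
{
    Sink.Add(Normalized);
    return ValueTask.CompletedTask;
});

await Task.WhenAll(Extractor, Transformer, Loader);
Console.WriteLine($"{Sink.Count} blocks loaded"); // prints "100 blocks loaded"
```

Each stage only needs to know its own queues. A slow loader fills LoaderQueue, which pauses the transformers, which in turn fill TransformerQueue and pause the extractor.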
Anywhere data has to come in, get reshaped, and land somewhere else — Excavator gives you the lifecycle, the backpressure, and the parallelism for free.
A single package with no platform-specific native binaries. Restores in seconds.
Install the package, subclass Excavator, register as a hosted service. The pipeline runs itself.