Implementing a High-Throughput Predictive Maintenance Data Pipeline with C#, TimescaleDB, and Keras


The initial system design for our industrial equipment monitoring platform was straightforward and ultimately, naive. A fleet of turbines, each with hundreds of sensors reporting vibration, temperature, and pressure, would push data to a .NET API endpoint. This endpoint would then execute a simple INSERT into a standard PostgreSQL table. At low volumes during staging, it worked. In production, with over 50,000 data points per second, the database became the bottleneck. Write latencies skyrocketed, VACUUM processes couldn’t keep up, and any analytical query attempting to calculate rolling averages for our anomaly detection models would time out. The core business requirement—near real-time predictive maintenance—was impossible. We were data-rich but information-poor, and the system was collapsing under its own weight.

This led to a complete architectural rethink, centered on three core technology choices designed to work in concert to solve specific problems in the data pipeline: TimescaleDB for scalable time-series ingestion and pre-computation, C#/.NET for the high-performance orchestration and data transport layer, and Keras for the ML inference endpoint. This wasn’t about picking the trendiest technologies; it was about selecting tools for their specific strengths in a high-throughput environment.

The first point of failure was the database. A standard relational database is not optimized for the relentless, append-only firehose of time-series data. The indexing structures, particularly B-trees, become bloated and inefficient with this workload. We selected TimescaleDB, an extension for PostgreSQL, because it directly addresses this pain point with hypertables. A hypertable automatically partitions data into smaller “chunks” based on time, so recent data is written to newer, smaller, more efficient tables and indexes.

Here is the foundational schema for our raw sensor readings. The key is converting a standard table into a hypertable.

-- Represents raw data from a single sensor at a point in time.
CREATE TABLE sensor_readings (
    time        TIMESTAMPTZ       NOT NULL,
    turbine_id  UUID              NOT NULL,
    sensor_id   UUID              NOT NULL,
    -- Using JSONB is a pragmatic choice for accommodating
    -- varying sensor types without schema migrations.
    -- In a real-world project, performance-critical values might be
    -- promoted to their own columns (e.g., temperature DOUBLE PRECISION).
    payload     JSONB             NOT NULL
);

-- This is the crucial TimescaleDB step. It transforms the standard table
-- into a hypertable partitioned by the 'time' column.
-- We chose a 1-day chunk interval, a good starting point for our data volume.
SELECT create_hypertable('sensor_readings', 'time', chunk_time_interval => INTERVAL '1 day');

-- A composite index is vital for efficient lookups of a specific sensor
-- on a specific turbine over a time range. Time is always the first column.
CREATE INDEX ix_sensor_readings_turbine_sensor_time ON sensor_readings (turbine_id, sensor_id, time DESC);

-- Enable TimescaleDB's native compression. After 7 days, chunks will be
-- compressed, saving significant storage costs for historical data
-- that is queried less frequently.
ALTER TABLE sensor_readings SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'turbine_id, sensor_id'
);
SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');

With the database schema prepared to handle the write load, the next bottleneck was the ingestion application itself. The original C# API opened a database connection and performed an INSERT for every single HTTP request. This is catastrophically inefficient due to connection overhead and transaction management. The solution is batching. We implemented a decoupled, in-memory pipeline using .NET’s System.Threading.Channels to buffer incoming requests and a background service to perform high-performance bulk writes.

This architecture decouples the web API’s responsibility (accepting data, acknowledging receipt) from the database writer’s responsibility (efficiently persisting data).

sequenceDiagram
    participant Turbine as Turbine Sensor
    participant IngestionAPI as C# Ingestion API (.NET)
    participant Channel as In-Memory Channel
    participant BatchWriter as C# Batch Writer (IHostedService)
    participant TimescaleDB

    Turbine->>+IngestionAPI: POST /ingest (JSON payload)
    IngestionAPI->>+Channel: Write(sensorReading)
    Channel-->>-IngestionAPI: Acknowledge (non-blocking)
    IngestionAPI-->>-Turbine: 202 Accepted
    loop Batch Processing
        BatchWriter->>+Channel: ReadBatchAsync()
        Note over BatchWriter: Gathers readings until batch size or timeout
        Channel-->>-BatchWriter: Batch of SensorReadings
        BatchWriter->>+TimescaleDB: Perform Bulk Copy Operation
        TimescaleDB-->>-BatchWriter: Success
    end

The C# implementation for this ingestion pipeline is robust, including configuration, logging, and a graceful shutdown mechanism.

First, a simple model and the channel service for dependency injection.

// Models/SensorReading.cs
public record SensorReading(DateTimeOffset Time, Guid TurbineId, Guid SensorId, string Payload);

// Services/IngestionChannel.cs
using System.Threading.Channels;

public class IngestionChannel
{
    private readonly Channel<SensorReading> _channel;

    public IngestionChannel()
    {
        // An unbounded channel is used here. In a production system with memory constraints,
        // a Bounded channel with a backpressure strategy (e.g., Wait or Drop) is safer.
        _channel = Channel.CreateUnbounded<SensorReading>();
    }

    public ChannelWriter<SensorReading> Writer => _channel.Writer;
    public ChannelReader<SensorReading> Reader => _channel.Reader;
}

The API controller becomes incredibly simple. Its only job is to deserialize the request and drop it into the channel. It returns 202 Accepted immediately, providing high throughput.

// Program.cs (using .NET 7 Minimal APIs)

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<IngestionChannel>();
builder.Services.AddHostedService<BatchWriterService>();
// ... other services like NpgsqlDataSource

var app = builder.Build();

app.MapPost("/ingest", async (SensorReading reading, IngestionChannel channel) =>
{
    // A common mistake is to await the WriteAsync call here.
    // For maximum throughput, we fire-and-forget. The channel handles the queueing.
    // Error handling for a full channel would be needed for a Bounded channel.
    await channel.Writer.WriteAsync(reading);
    return Results.Accepted();
});

app.Run();

The core of the work happens in the BatchWriterService, an IHostedService that runs in the background for the application’s lifetime.

// Services/BatchWriterService.cs
public class BatchWriterService : BackgroundService
{
    private readonly ILogger<BatchWriterService> _logger;
    private readonly IngestionChannel _channel;
    private readonly NpgsqlDataSource _dataSource; // Injected as a singleton
    private readonly int _batchSize;
    private readonly TimeSpan _batchTimeout;

    public BatchWriterService(
        ILogger<BatchWriterService> logger,
        IngestionChannel channel,
        NpgsqlDataSource dataSource,
        IConfiguration configuration)
    {
        _logger = logger;
        _channel = channel;
        _dataSource = dataSource;
        _batchSize = configuration.GetValue<int>("Ingestion:BatchSize", 1000);
        _batchTimeout = TimeSpan.FromMilliseconds(configuration.GetValue<int>("Ingestion:BatchTimeoutMs", 500));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Batch Writer Service started.");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var batch = await ReadBatchAsync(stoppingToken);

                if (batch.Count > 0)
                {
                    await WriteBatchToDatabaseAsync(batch, stoppingToken);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the application is shutting down.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error occurred in the batch writer service.");
                // Delay to prevent a tight loop of failures
                await Task.Delay(5000, stoppingToken);
            }
        }

        _logger.LogInformation("Batch Writer Service stopping. Processing final batch...");
        // Process any remaining items on shutdown
        var finalBatch = await ReadRemainingAsBatchAsync();
        if(finalBatch.Count > 0)
        {
            await WriteBatchToDatabaseAsync(finalBatch, CancellationToken.None);
        }
    }

    private async Task<List<SensorReading>> ReadBatchAsync(CancellationToken stoppingToken)
    {
        var batch = new List<SensorReading>(_batchSize);
        // Use a linked CancellationTokenSource to implement the timeout
        using var timeoutCts = new CancellationTokenSource(_batchTimeout);
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, timeoutCts.Token);

        try
        {
            // Wait for the first item
            await _channel.Reader.WaitToReadAsync(linkedCts.Token);
            while (batch.Count < _batchSize && _channel.Reader.TryRead(out var reading))
            {
                batch.Add(reading);
            }
        }
        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
        {
            // This is not an error; it's the timeout firing, which is normal.
            // It allows us to flush a partially full batch.
        }

        return batch;
    }

    private async Task<List<SensorReading>> ReadRemainingAsBatchAsync()
    {
        var batch = new List<SensorReading>();
        while (_channel.Reader.TryRead(out var reading))
        {
            batch.Add(reading);
        }
        return batch;
    }

    private async Task WriteBatchToDatabaseAsync(List<SensorReading> batch, CancellationToken cancellationToken)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        try
        {
            await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);

            // Npgsql's Binary COPY is the fastest way to bulk insert data.
            await using var writer = await connection.BeginBinaryImportAsync(
                "COPY sensor_readings (time, turbine_id, sensor_id, payload) FROM STDIN (FORMAT BINARY)", cancellationToken);

            foreach (var reading in batch)
            {
                await writer.StartRowAsync(cancellationToken);
                await writer.WriteAsync(reading.Time, NpgsqlTypes.NpgsqlDbType.TimestampTz, cancellationToken);
                await writer.WriteAsync(reading.TurbineId, NpgsqlTypes.NpgsqlDbType.Uuid, cancellationToken);
                await writer.WriteAsync(reading.SensorId, NpgsqlTypes.NpgsqlDbType.Uuid, cancellationToken);
                await writer.WriteAsync(reading.Payload, NpgsqlTypes.NpgsqlDbType.Jsonb, cancellationToken);
            }

            await writer.CompleteAsync(cancellationToken);
            stopwatch.Stop();
            _logger.LogInformation("Wrote {BatchCount} records to database in {ElapsedMilliseconds}ms.", batch.Count, stopwatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to write batch of {BatchCount} records to database.", batch.Count);
            // In a real-world project, a dead-letter queue or other retry mechanism would be implemented here.
            // For now, we log the failure and drop the batch.
        }
    }
}

With ingestion solved, the next challenge was feature engineering. The Keras model, an LSTM Autoencoder trained by our data science team, doesn’t operate on raw sensor values. It requires features like the 1-minute rolling average, standard deviation, and max/min values for a given sensor. Querying the sensor_readings table and calculating these on-the-fly for every inference request would be far too slow.

This is where TimescaleDB’s continuous aggregates are invaluable. They are essentially materialized views that automatically and incrementally refresh in the background. We define the complex aggregation once, and TimescaleDB keeps it up-to-date for us.

-- Create a materialized view for 1-minute aggregated features.
-- This view will be the data source for the ML inference service.
CREATE MATERIALIZED VIEW sensor_features_1min
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', time) AS bucket,
    turbine_id,
    sensor_id,
    -- Example feature: average of a specific value from the JSON payload.
    -- The `->>` operator extracts a JSON field as text, which is then cast.
    AVG((payload->>'temperature')::double precision) AS avg_temp,
    STDDEV((payload->>'temperature')::double precision) AS stddev_temp,
    MAX((payload->>'vibration_x')::double precision) AS max_vibration_x,
    MIN((payload->>'vibration_x')::double precision) AS min_vibration_x
FROM
    sensor_readings
GROUP BY
    bucket, turbine_id, sensor_id;

-- Policy to refresh the continuous aggregate automatically.
-- It will refresh data between 2 hours and 1 minute ago, every 1 minute.
-- This ensures the data is very fresh for real-time inference.
SELECT add_continuous_aggregate_policy('sensor_features_1min',
    start_offset => INTERVAL '2 hours',
    end_offset   => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');

Now, the inference service doesn’t query the massive raw data table. It performs a fast lookup on the much smaller sensor_features_1min view.

The final piece is the C# inference service that calls the Keras model. A common pitfall here is trying to integrate ML models directly into a .NET process using libraries that are often wrappers and can be complex to manage. A more pragmatic, robust, and scalable approach is to treat the Keras model as its own microservice. We built a simple Python Flask API that loads the trained model and exposes a /predict endpoint. The C# service is then just an HTTP client to this service.

Here’s the Python/Flask inference server. It’s minimal and focused.

# predict_server.py
import os
import numpy as np
from flask import Flask, request, jsonify
from tensorflow.keras.models import load_model

app = Flask(__name__)

# The model is loaded once at startup, not per-request.
# A common mistake is loading the model inside the request handler.
model_path = os.environ.get("MODEL_PATH", "models/lstm_autoencoder.h5")
if not os.path.exists(model_path):
    raise RuntimeError(f"Model file not found at {model_path}")

model = load_model(model_path)
print(f"Model {model_path} loaded successfully.")

@app.route("/predict", methods=["POST"])
def predict():
    try:
        data = request.get_json()
        # The expected input is a list of feature vectors.
        # The model expects a specific shape, e.g., (n_samples, timesteps, n_features)
        # Preprocessing like scaling must match what was used during training.
        features = np.array(data['features'])

        if features.ndim != 3:
            return jsonify({"error": f"Invalid input shape. Expected 3D array, got {features.ndim}D."}), 400

        # The core of the autoencoder anomaly detection is to compare
        # the reconstruction error.
        reconstructed = model.predict(features)
        mse = np.mean(np.power(features - reconstructed, 2), axis=1)

        # The result is the mean squared error for each input sequence.
        # The calling service will compare this against a pre-defined threshold.
        return jsonify({"reconstruction_error": mse.flatten().tolist()})

    except Exception as e:
        # Basic error handling
        return jsonify({"error": str(e)}), 500

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5001)

The C# service queries the feature view and calls this Python service. We use IHttpClientFactory for efficient client management and Polly for resilience patterns like retries and circuit breakers.

// Services/InferenceService.cs
public class InferenceService
{
    private readonly ILogger<InferenceService> _logger;
    private readonly NpgsqlDataSource _dataSource;
    private readonly HttpClient _httpClient;

    public InferenceService(ILogger<InferenceService> logger, NpgsqlDataSource dataSource, HttpClient httpClient)
    {
        _logger = logger;
        _dataSource = dataSource;
        _httpClient = httpClient;
    }

    public async Task<double?> GetAnomalyScoreAsync(Guid turbineId, Guid sensorId, DateTimeOffset startTime, DateTimeOffset endTime)
    {
        // 1. Fetch feature data from TimescaleDB
        var features = await GetFeaturesForWindowAsync(turbineId, sensorId, startTime, endTime);

        if (features == null || features.Count == 0)
        {
            _logger.LogWarning("No feature data found for turbine {TurbineId} in the given time window.", turbineId);
            return null;
        }

        // 2. Prepare the payload for the Python service
        // The structure must match exactly what the model was trained on.
        // Here we assume it expects a 3D array: [ [ [f1, f2], [f1, f2]... ] ]
        var payload = new { features = new[] { features.Select(f => new[] { f.AvgTemp, f.StddevTemp }).ToArray() } };

        // 3. Call the inference service
        try
        {
            var response = await _httpClient.PostAsJsonAsync("/predict", payload);
            response.EnsureSuccessStatusCode();

            var result = await response.Content.ReadFromJsonAsync<InferenceResult>();
            // The model returns a list of errors, for a single sequence we take the first.
            return result?.ReconstructionError?.FirstOrDefault();
        }
        catch(HttpRequestException ex)
        {
            _logger.LogError(ex, "Failed to call inference service.");
            return null;
        }
    }

    private async Task<List<FeatureRecord>> GetFeaturesForWindowAsync(Guid turbineId, Guid sensorId, DateTimeOffset start, DateTimeOffset end)
    {
        const string sql = @"
            SELECT
                avg_temp AS AvgTemp,
                stddev_temp AS StddevTemp
            FROM sensor_features_1min
            WHERE turbine_id = @turbineId
              AND sensor_id = @sensorId
              AND bucket >= @start
              AND bucket < @end
            ORDER BY bucket;
        ";
        await using var connection = await _dataSource.OpenConnectionAsync();
        return (await connection.QueryAsync<FeatureRecord>(sql, new { turbineId, sensorId, start, end })).AsList();
    }
}

// Helper records for deserialization
public record FeatureRecord(double AvgTemp, double StddevTemp);
public record InferenceResult(List<double> ReconstructionError);

// In Program.cs, configure HttpClientFactory and Polly
builder.Services.AddHttpClient<InferenceService>(client =>
{
    client.BaseAddress = new Uri(builder.Configuration["InferenceService:Url"]);
})
.AddPolicyHandler(
    HttpPolicyExtensions
        .HandleTransientHttpError()
        .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)))
);

This final architecture is more complex than the original, but every component exists to solve a specific, real-world performance or scalability problem. The ingestion API can handle a massive write load without blocking. TimescaleDB efficiently stores the data and performs the heavy lifting of feature calculation in the background. The C# and Python services are decoupled, scalable, and resilient, communicating over a standard HTTP interface. We successfully moved from a system that couldn’t handle its load to one that can provide near real-time anomaly scores reliably.

The current HTTP/JSON communication between the C# orchestration service and the Python inference service, while robust, introduces serialization and network overhead. For scenarios requiring even lower latency, migrating this link to gRPC with Protobuf would be a logical next step, reducing payload size and connection setup time. Furthermore, the model deployment is static; a production-grade MLOps pipeline would be required to introduce zero-downtime model updates, potentially using a service mesh like Linkerd or Istio to manage traffic shifting between old and new model versions. The current solution also lacks a sophisticated mechanism for handling data schema evolution in the JSONB payload, which would require a schema registry and versioning strategy to maintain long-term pipeline stability.


  TOC