The core failure mode was subtle but catastrophic for our analytics. Our IoT telemetry ingestion service, running on a DigitalOcean Droplet, would occasionally restart due to deployments or transient node issues. Upon restart, it would re-process the last batch of data from the upstream message queue. This resulted in duplicate data points in our InfluxDB instance, skewing every time-series query, from simple averages to complex anomaly detection models. At-least-once delivery was creating a data integrity nightmare. We needed exactly-once semantics, but deploying a heavyweight framework like Apache Flink or Spark Streaming for this single pipeline felt like using a sledgehammer to crack a nut, especially given our lean operational principles.
Our initial concept was to store the last processed message offset in a PostgreSQL database. The processing loop would look something like this: begin transaction, read messages, write to InfluxDB, update offset in Postgres, commit transaction. This is a classic two-phase commit problem across distributed systems (the message queue, InfluxDB, and Postgres). A failure between writing to InfluxDB and committing the Postgres transaction could leave the system in an inconsistent state. While solvable with more complex logic, it introduced significant transactional overhead and operational complexity for our small team.
This led to a fundamental rethink of the problem. The ingestion service isn’t just a simple loop; it’s a stateful process. It can be initializing
, consuming
, processing
, committing
, or in a failed
state. This realization pointed directly to state machines. By modeling the entire ingestion lifecycle as a formal statechart, we could make the logic explicit, predictable, and resilient. XState was the immediate choice here. It provides a robust, framework-agnostic way to implement statecharts, complete with actors, services, and clear definitions for transitions, actions, and guards.
The next piece of the puzzle was persisting the state. How do we atomically commit the result of a batch process (the data in InfluxDB) and the new consumer offset? The state itself needed a transactional home. We were already using Delta Lake on DigitalOcean Spaces for our batch analytics data lake. It brings ACID transactions to object storage. The pitfall of most object storage is its eventual consistency and lack of atomic multi-file operations. Delta Lake solves this by using a transaction log. This sparked an unconventional idea: what if we used a Delta table not for massive datasets, but as a hyper-reliable, auditable, transactional state store for our ingestion service? We could treat our processor’s state changes as immutable events, appending them to a dedicated Delta table. On startup, the service would simply read the last ‘COMMITTED’ state from this table to know exactly where to resume.
This architecture offered a unique combination of benefits:
- Transactional Integrity: Delta Lake’s ACID guarantees ensure that we never commit a partial state. The update to the offset is atomic.
- Auditability: The Delta table becomes a complete, time-travel-capable log of every batch processed. We can see exactly what was processed and when.
- Decoupling: The processing logic (XState) is decoupled from the sink (InfluxDB) and the state store (Delta Lake).
- Simplicity: The core application remains a single Node.js process, orchestrated by Caddy, running on a standard DigitalOcean Droplet.
Here is a high-level view of the data and control flow:
graph TD subgraph DigitalOcean Droplet A[Caddy Reverse Proxy] --> B{Node.js Ingestion Service}; subgraph B C[XState Machine] end B -->|Write Batches| D[InfluxDB]; B -->|Read/Write State| E[DO Spaces via Delta Lake]; end F[IoT Devices] --> |HTTPS/TLS| A; G[Message Queue] -->|Consume Bat_ches| B
The rest of this document details the build-out, focusing on the core code that makes this system work in a production environment.
Phase 1: Infrastructure and Ingress with Caddy
The entry point must be secure and simple. Caddy excels here, providing automatic HTTPS with zero configuration. It acts as a reverse proxy, terminating TLS and forwarding requests to our Node.js application running on the same Droplet.
Here is the complete Caddyfile
used for this service. It’s deceptively simple but handles production necessities.
# Caddyfile
# Domain for our ingestion endpoint. Caddy will automatically provision
# a Let's Encrypt certificate for this domain.
ingest.my-iot-platform.com {
# Enable gzip and zstd compression for responses.
encode gzip zstd
# Set security headers to harden the endpoint.
header {
# Enables XSS filtering and blocks rendering if an attack is detected.
X-Xss-Protection "1; mode=block"
# Prevents clickjacking attacks.
X-Frame-Options "DENY"
# Prevents browsers from trying to MIME-sniff the content type.
X-Content-Type-Options "nosniff"
# Strict-Transport-Security: tells browsers to always use HTTPS.
Strict-Transport-Security "max-age=31536000;"
# Content-Security-Policy: very restrictive, as this is an API endpoint.
Content-Security-Policy "default-src 'none'; frame-ancestors 'none';"
}
# Logging configuration. We use JSON format for easy parsing
# by log aggregation tools like Loki or Fluentd.
log {
output file /var/log/caddy/ingest.access.log {
roll_size 100mb
roll_keep 10
roll_keep_for 720h
}
format json
}
# The core reverse proxy directive.
# All requests to https://ingest.my-iot-platform.com are forwarded
# to the Node.js service running on port 3000.
reverse_proxy localhost:3000 {
# Pass essential headers to the backend application.
header_up Host {http.request.host}
header_up X-Real-IP {http.request.remote.ip}
header_up X-Forwarded-For {http.request.remote.ip}
header_up X-Forwarded-Proto {http.request.scheme}
}
}
This configuration is deployed on a standard DigitalOcean Droplet running Ubuntu, with Caddy installed as a systemd service. It’s a robust, set-and-forget component of our stack.
Phase 2: The XState Processing Machine
The heart of the service is the state machine. It orchestrates every step, ensuring operations happen in the correct order and that failures are handled gracefully. We use XState to define this logic declaratively.
Here is the complete machine definition. It manages the entire lifecycle from initialization to committing state.
ingestionMachine.js
:
import { createMachine, assign } from 'xstate';
// This machine coordinates the entire ETL process for a single batch.
// It fetches data, writes to the sink (InfluxDB), and commits its state
// transactionally to the state store (Delta Lake).
export const createIngestionMachine = ({
deltaStateStore,
influxWriter,
messageQueue,
}) => {
return createMachine({
id: 'iot-ingestion',
initial: 'initializing',
// The context holds the state for the machine's execution,
// including offsets, data batches, and error information.
context: {
processorId: 'processor-01',
currentOffset: null,
batch: null,
error: null,
retries: 0,
},
states: {
initializing: {
invoke: {
id: 'getLatestOffset',
src: async (context) => deltaStateStore.getLatestCommittedOffset(context.processorId),
onDone: {
target: 'idle',
actions: assign({
currentOffset: (context, event) => event.data,
}),
description: "Successfully retrieved the last committed offset from Delta Lake."
},
onError: {
target: 'initialization_failed',
actions: assign({
error: (context, event) => event.data,
}),
description: "Failed to connect to or read from the Delta Lake state store."
},
},
},
idle: {
// The machine waits here for a trigger to start processing a new batch.
on: {
FETCH: 'fetchingBatch',
},
// After a successful run, we wait 5 seconds before fetching the next batch.
after: {
5000: 'fetchingBatch'
}
},
fetchingBatch: {
invoke: {
id: 'fetchFromQueue',
src: async (context) => messageQueue.fetchBatch(context.currentOffset, 1000), // Batch size of 1000
onDone: [
{
target: 'idle',
cond: (context, event) => !event.data || event.data.messages.length === 0,
description: "No new messages found in the queue, returning to idle."
},
{
target: 'processing',
actions: assign({
batch: (context, event) => event.data,
retries: 0, // Reset retries for the new batch
}),
description: "A new batch of messages was successfully fetched."
}
],
onError: {
target: 'transient_failure',
actions: assign({
error: (context, event) => event.data,
}),
description: "Failed to fetch messages from the upstream queue."
},
},
},
processing: {
// This state represents the data transformation and validation logic.
// In a real-world project, this would be more complex.
// Here, we transition immediately to writing.
always: 'writingToInfluxDB',
},
writingToInfluxDB: {
invoke: {
id: 'writeToInflux',
src: async (context) => influxWriter.writeBatch(context.batch.messages),
onDone: {
target: 'committingState',
description: "Batch successfully written to InfluxDB."
},
onError: {
target: 'transient_failure',
actions: assign({
error: (context, event) => event.data,
}),
description: "Failed to write batch to InfluxDB."
},
},
},
committingState: {
invoke: {
id: 'commitToDelta',
// The most critical step: atomically commit the new offset.
src: async (context) => {
const { processorId, batch } = context;
const state = {
newOffset: batch.nextOffset,
batchSize: batch.messages.length,
processedAt: new Date().toISOString(),
};
return deltaStateStore.commitOffset(processorId, state);
},
onDone: {
target: 'idle',
actions: assign({
currentOffset: (context) => context.batch.nextOffset,
batch: null, // Clear the batch from context
}),
description: "New offset successfully committed to Delta Lake state store."
},
onError: {
// A failure here is critical. The data is in InfluxDB, but the offset is not committed.
// The system will re-process on restart. Idempotent writes to InfluxDB are crucial.
target: 'critical_failure',
actions: assign({
error: (context, event) => event.data,
}),
description: "CRITICAL: Failed to commit offset to Delta Lake after writing to InfluxDB."
},
},
},
transient_failure: {
// A state to handle retryable errors (e.g., network issues).
after: {
// Exponential backoff logic
3000: {
target: 'fetchingBatch',
actions: assign({ retries: (context) => context.retries + 1 }),
cond: (context) => context.retries < 5,
},
// If retries are exhausted, move to a permanent failure state.
3001: {
target: 'permanent_failure',
cond: (context) => context.retries >= 5,
}
}
},
initialization_failed: {
type: 'final',
description: "The service cannot start because the state store is unreachable."
},
critical_failure: {
type: 'final',
description: "A non-recoverable error occurred, requiring manual intervention."
},
permanent_failure: {
type: 'final',
description: "A transient error could not be resolved after multiple retries."
}
},
});
};
This machine definition is the blueprint for our service’s behavior. It clearly separates concerns and handles failure paths explicitly. A common mistake is to write this logic as a series of nested try/catch
blocks, which quickly becomes unmanageable.
Phase 3: The Transactional State Store with Delta Lake
This is the most unconventional part of the architecture. We use a Delta table residing on DigitalOcean Spaces as our state store. Since there’s no official, mature Delta Lake writer for Node.js, we implement a small, robust Python helper script that the Node.js service invokes. In a production environment, this could be a microservice, but for simplicity, we’ll use a child process.
The schema for our state table is simple but effective:processor_id
: (string) Identifier for the processor instance.offset
: (long) The last successfully committed offset.batch_size
: (int) Number of messages in the last batch.commit_timestamp
: (timestamp) When the state was committed.metadata
: (map) Extra information, e.g., source topic.
Here’s the Python script (delta_manager.py
) responsible for interacting with the Delta table. It uses the deltalake
and pyarrow
libraries.
delta_manager.py
:
import sys
import json
import os
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
# Configuration from environment variables for security.
# A real-world project would use a proper secrets management tool.
STORAGE_OPTIONS = {
"AWS_ACCESS_KEY_ID": os.environ.get("DO_SPACES_KEY"),
"AWS_SECRET_ACCESS_KEY": os.environ.get("DO_SPACES_SECRET"),
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
"AWS_ENDPOINT_URL": os.environ.get("DO_SPACES_ENDPOINT"),
"AWS_REGION": os.environ.get("DO_SPACES_REGION")
}
TABLE_URI = "s3://my-datalake-bucket/ingestion_state"
def get_latest_offset(processor_id: str):
"""
Reads the latest committed offset for a given processor_id.
It reads the entire table, filters, and finds the max timestamp.
For a state store, this table will be small, so performance is acceptable.
"""
try:
dt = DeltaTable(TABLE_URI, storage_options=STORAGE_OPTIONS)
df = dt.to_pyarrow_table(
filters=[("processor_id", "=", processor_id)]
).to_pandas()
if df.empty:
return 0 # Default offset if no state exists
latest = df.loc[df['commit_timestamp'].idxmax()]
return int(latest['offset'])
except TableNotFoundError:
# If the table doesn't exist, it's the first run.
return 0
except Exception as e:
print(f"Error reading from Delta table: {e}", file=sys.stderr)
sys.exit(1)
def commit_offset(processor_id: str, new_offset: int, batch_size: int):
"""
Atomically commits a new state record to the Delta table.
This uses write_deltalake with 'append' mode, which is transactional.
"""
try:
data = pa.table({
'processor_id': pa.array([processor_id], type=pa.string()),
'offset': pa.array([new_offset], type=pa.int64()),
'batch_size': pa.array([batch_size], type=pa.int32()),
'commit_timestamp': pa.array([pa.timestamp('ns')], type=pa.timestamp('ns')).cast(pa.timestamp('ns'))
})
# The core of the transactional write. 'overwriteSchema' is false
# to prevent accidental schema changes. 'mode' is 'append'.
write_deltalake(
TABLE_URI,
data,
mode='append',
storage_options=STORAGE_OPTIONS
)
print("Commit successful", file=sys.stdout)
except Exception as e:
print(f"Error writing to Delta table: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
# Simple command-line interface for the Node.js service to call.
command = sys.argv[1]
payload = json.loads(sys.argv[2])
if command == "get":
offset = get_latest_offset(payload["processorId"])
print(json.dumps({"offset": offset}), file=sys.stdout)
elif command == "commit":
commit_offset(
payload["processorId"],
payload["newOffset"],
payload["batchSize"]
)
else:
print(f"Unknown command: {command}", file=sys.stderr)
sys.exit(1)
And here is the Node.js wrapper that executes this Python script. It’s crucial to handle stdout
, stderr
, and exit codes properly.
deltaStateStore.js
:
import { spawn } from 'child_process';
import path from 'path';
// Path to the Python executable and script. In production, this should
// be a fixed path within the container or VM.
const PYTHON_EXEC = 'python3';
const SCRIPT_PATH = path.resolve('./delta_manager.py');
// This class acts as a facade, providing a clean async interface
// to the underlying Python process that handles Delta Lake operations.
class DeltaStateStore {
_runScript(command, payload) {
return new Promise((resolve, reject) => {
const args = [SCRIPT_PATH, command, JSON.stringify(payload)];
const pythonProcess = spawn(PYTHON_EXEC, args);
let stdout = '';
let stderr = '';
pythonProcess.stdout.on('data', (data) => {
stdout += data.toString();
});
pythonProcess.stderr.on('data', (data) => {
stderr += data.toString();
});
pythonProcess.on('close', (code) => {
if (code !== 0) {
// A non-zero exit code indicates a failure in the Python script.
const error = new Error(`Delta manager script failed with code ${code}. Stderr: ${stderr}`);
console.error(error);
return reject(error);
}
try {
// Some commands might not return JSON, handle that gracefully.
const result = stdout ? JSON.parse(stdout) : {};
resolve(result);
} catch (e) {
// If stdout is not valid JSON, it's also an error condition.
reject(new Error(`Failed to parse JSON from Delta manager: ${e.message}. Raw output: ${stdout}`));
}
});
});
}
async getLatestCommittedOffset(processorId) {
console.log(`[StateStore] Fetching latest offset for processor: ${processorId}`);
const result = await this._runScript('get', { processorId });
console.log(`[StateStore] Found offset: ${result.offset}`);
return result.offset;
}
async commitOffset(processorId, { newOffset, batchSize }) {
console.log(`[StateStore] Committing offset ${newOffset} for processor: ${processorId}`);
await this._runScript('commit', { processorId, newOffset, batchSize });
console.log(`[StateStore] Commit successful for offset: ${newOffset}`);
return true;
}
}
export const deltaStateStore = new DeltaStateStore();
This separation of concerns keeps the Node.js process focused on orchestration logic while delegating the specialized, transaction-heavy work to the Python runtime, which has first-class support for Delta Lake.
Phase 4: Data Sink and Main Application Loop
The final pieces are the InfluxDB writer and the main application entry point that wires everything together.
The InfluxDB writer is straightforward. We use the official client, and the key is to perform batch writes for efficiency and implement robust error handling.
influxWriter.js
:
import { InfluxDB, Point } from '@influxdata/influxdb-client';
// Configuration from environment variables
const INFLUX_URL = process.env.INFLUX_URL;
const INFLUX_TOKEN = process.env.INFLUX_TOKEN;
const INFLUX_ORG = process.env.INFLUX_ORG;
const INFLUX_BUCKET = process.env.INFLUX_BUCKET;
class InfluxWriter {
constructor() {
this.influxDB = new InfluxDB({ url: INFLUX_URL, token: INFLUX_TOKEN });
this.writeApi = this.influxDB.getWriteApi(INFLUX_ORG, INFLUX_BUCKET, 'ns');
}
async writeBatch(messages) {
if (!messages || messages.length === 0) {
return;
}
console.log(`[InfluxWriter] Writing ${messages.length} points to InfluxDB.`);
const points = messages.map(msg => {
// Assuming a message format like:
// { deviceId: 'd-01', timestamp: 1672531200000, temp: 21.5, humidity: 45.2 }
return new Point('environment')
.tag('deviceId', msg.deviceId)
.floatField('temperature', msg.temp)
.floatField('humidity', msg.humidity)
.timestamp(new Date(msg.timestamp));
});
try {
this.writeApi.writePoints(points);
await this.writeApi.flush();
console.log(`[InfluxWriter] Batch successfully written.`);
} catch (error) {
console.error(`[InfluxWriter] Failed to write batch: ${error.message}`);
// Propagate the error to be handled by the XState machine
throw error;
}
}
}
export const influxWriter = new InfluxWriter();
Finally, the main index.js
ties all the components together, creates the state machine service, and starts the processing loop.
index.js
:
import { interpret } from 'xstate';
import { createIngestionMachine } from './ingestionMachine.js';
import { deltaStateStore } from './deltaStateStore.js';
import { influxWriter } from './influxWriter.js';
// A mock message queue for demonstration.
// In a real system, this would connect to Kafka, RabbitMQ, etc.
const messageQueue = {
_messages: Array.from({ length: 10000 }, (_, i) => ({
deviceId: `device-${i % 100}`,
timestamp: Date.now() - (10000 - i) * 1000,
temp: 20 + Math.random() * 5,
humidity: 40 + Math.random() * 10,
})),
async fetchBatch(offset, size) {
console.log(`[Queue] Fetching batch from offset ${offset} with size ${size}`);
if (offset >= this._messages.length) {
return { messages: [], nextOffset: offset };
}
const end = Math.min(offset + size, this._messages.length);
const messages = this._messages.slice(offset, end);
return { messages, nextOffset: end };
},
};
// 1. Create the state machine instance with its dependencies injected.
const ingestionMachine = createIngestionMachine({
deltaStateStore,
influxWriter,
messageQueue
});
// 2. Create an interpreter (service) to run the machine.
const ingestionService = interpret(ingestionMachine)
.onTransition((state) => {
// Logging every state transition is invaluable for debugging.
console.log(`[XState] Transition to state: ${state.value}`);
if (state.context.error) {
console.error(`[XState] Error in context: ${state.context.error.message}`);
}
})
.onDone(() => {
console.log('[XState] Machine has reached a final state.');
process.exit(1); // Exit on unrecoverable failure
});
// 3. Start the service.
ingestionService.start();
// 4. Send the initial event to kick off the process after initialization.
// The machine's internal timers will handle subsequent fetches.
setTimeout(() => {
if (ingestionService.getSnapshot().value === 'idle') {
ingestionService.send('FETCH');
}
}, 1000);
process.on('SIGINT', () => {
console.log('Caught interrupt signal. Stopping service...');
ingestionService.stop();
process.exit(0);
});
When this service is started on the DigitalOcean Droplet, it first queries the Delta table for the last known offset. It then enters its idle state, and after a short delay, transitions to fetch a batch of data. It proceeds through writing to InfluxDB and, critically, committing the new offset to Delta Lake. If the process is killed at any point and restarted, its first action is always to consult the transactional state store, guaranteeing it picks up exactly where it left off.
The primary trade-off of this architecture is latency. Writing to a Delta table on object storage involves creating Parquet files and updating a JSON transaction log, which is inherently slower than a single-row UPDATE
in a traditional database. This solution is therefore not suited for applications requiring sub-second processing latency. However, for near-real-time telemetry pipelines where data integrity and auditability are paramount, the robustness it provides is a significant advantage over more complex or fragile alternatives. Future scalability would involve partitioning the input stream and running multiple, independent instances of this state machine, each with a unique processor_id
, allowing the workload to be parallelized horizontally across multiple Droplets.