The operational telemetry from our Swift mobile application became both a firehose and a black box. The initial solution, logging directly to a monolithic backend that wrote to a relational database, collapsed under peak load. Queries to diagnose user-reported issues were slow, full-text search was non-existent, and the entire system was tightly coupled. Any change to the telemetry format required a coordinated, high-risk deployment of both the mobile app and the backend. We needed a decoupled, high-throughput ingestion and processing pipeline that was both searchable and cost-effective to operate for a lean team. The decision was made to build a new system from the ground up on DigitalOcean, leveraging a polyglot stack where each component was selected for its specific strengths.
Our initial whiteboard sketch outlined a multi-stage, asynchronous architecture designed for resilience and independent scalability.
graph TD
  subgraph Swift Client
    A[iOS App] -->|HTTPS Batch POST| B(Ingestion Service);
  end
  subgraph DigitalOcean VPC
    B[Go-Fiber Ingestion Service] -- Protobuf --> C{NATS Message Broker};
    C -- Protobuf --> D[Scala Enrichment Processor];
    D -- JSON Bulk API --> E(OpenSearch Cluster);
    B -->|Search Query| E;
  end
  style A fill:#F9A,stroke:#333,stroke-width:2px
  style B fill:#9CF,stroke:#333,stroke-width:2px
  style D fill:#C9F,stroke:#333,stroke-width:2px
The core principle was specialization. The ingestion layer had to be incredibly fast and lightweight. The processing layer needed robust concurrency and the ability to execute complex business logic. The storage layer required powerful search and aggregation capabilities. A single language or framework would have involved significant compromises.
Our technology selection process was guided by pragmatism. We chose DigitalOcean Droplets over larger cloud providers primarily for cost predictability and simplicity. This choice, however, meant we’d be taking on more operational responsibility for services like our message broker and search cluster, a trade-off we accepted.
- Go with Fiber for Ingestion: The API endpoint receiving data from potentially tens of thousands of clients must handle high concurrency with minimal resource consumption. Go’s goroutines are a natural fit. We chose the Fiber framework for its performance and Express.js-like API, which accelerated initial development.
- NATS as the Buffer: We needed a simple, high-performance message bus to decouple the ingestion and processing stages. Kafka felt like overkill for our initial requirements, carrying significant operational complexity. NATS core provides a fire-and-forget messaging pattern that’s lightweight enough to run on a small Droplet and serves perfectly as a distributed buffer.
- Scala with Akka HTTP for Enrichment: The real work happens in the processing stage. This service consumes raw events, enriches them with data from other sources, transforms the schema, and prepares them for indexing. Scala’s powerful type system and functional patterns are ideal for complex data manipulation. We leverage Akka HTTP and its streaming capabilities to build a resilient consumer that can handle back-pressure and process messages concurrently.
- OpenSearch for Storage and Analytics: We required flexible, fast search over semi-structured JSON documents. OpenSearch, as a fork of Elasticsearch, provides exactly that. Self-hosting it on Droplets was a deliberate choice to avoid the high costs of managed services at our scale.
- Protocol Buffers for the Data Contract: A common mistake in microservice architectures is using JSON for internal communication. It’s verbose and lacks a formal schema. We defined our internal event contract with Protobuf, ensuring type safety, backward compatibility, and significantly lower network overhead between the Go and Scala services.
Phase 1: The Go-Fiber Ingestion Service
The Go service is the front door. Its only jobs are to accept HTTP requests, validate them, serialize the payload into Protobuf, and publish it to NATS as quickly as possible. It performs no blocking I/O other than the network write to NATS.
First, we define our data contract in telemetry.proto. This contract is the source of truth for our event structure and will be used to generate code for both Go and Scala.
// telemetry.proto
syntax = "proto3";
package telemetry;
option go_package = "./telemetry";
import "google/protobuf/timestamp.proto";
message Event {
string event_id = 1;
string session_id = 2;
string user_id = 3;
string event_name = 4;
google.protobuf.Timestamp client_timestamp = 5;
map<string, string> metadata = 6;
string app_version = 7;
string device_os = 8;
}
message EventBatch {
repeated Event events = 1;
}
The Go service uses this definition to handle incoming JSON and translate it for the internal pipeline.
Here is the core of the ingestion server, main.go. It sets up a Fiber app, connects to NATS, and defines the ingestion endpoint. Note the error handling and logging; in a real-world project, you never assume a downstream dependency like NATS will be available.
// main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
telemetry "ingestion-service/telemetry"
)
const (
natsSubject = "telemetry.events"
idleTimeout = 5 * time.Second
)
// NatsPublisher encapsulates the NATS connection
type NatsPublisher struct {
conn *nats.Conn
}
// Publish serializes and sends a batch of events to NATS
func (p *NatsPublisher) Publish(ctx context.Context, batch *telemetry.EventBatch) error {
data, err := proto.Marshal(batch)
if err != nil {
// This is a critical error, indicates a problem with our generated code or data model
return fmt.Errorf("failed to marshal protobuf: %w", err)
}
// In a production system, you might use PublishMsg with headers for tracing
if err := p.conn.Publish(natsSubject, data); err != nil {
return fmt.Errorf("failed to publish to NATS: %w", err)
}
return nil
}
// IngestionHandler handles the HTTP POST request
func IngestionHandler(publisher *NatsPublisher) fiber.Handler {
return func(c *fiber.Ctx) error {
// The client sends a JSON array of events, which we map to our Protobuf structure
var jsonEvents []map[string]interface{}
if err := c.BodyParser(&jsonEvents); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Invalid JSON format"})
}
if len(jsonEvents) == 0 {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Empty event batch"})
}
// A common pitfall is processing events one-by-one.
// We translate the entire JSON batch into a single Protobuf batch.
eventBatch := &telemetry.EventBatch{
Events: make([]*telemetry.Event, 0, len(jsonEvents)),
}
for _, je := range jsonEvents {
// Naive validation and conversion for demonstration.
// A production system would have robust validation logic here.
tsRaw, ok := je["client_timestamp"].(string)
if !ok {
log.Printf("WARN: Skipping event with missing or non-string timestamp: %v", je["event_id"])
continue
}
ts, err := time.Parse(time.RFC3339, tsRaw)
if err != nil {
// We might choose to drop this single event or reject the whole batch.
// For now, we skip it and log the error.
log.Printf("WARN: Skipping event with invalid timestamp: %v", je["event_id"])
continue
}
event := &telemetry.Event{
EventId: asString(je["event_id"]),
SessionId: asString(je["session_id"]),
UserId: asString(je["user_id"]),
EventName: asString(je["event_name"]),
ClientTimestamp: timestamppb.New(ts),
AppVersion: asString(je["app_version"]),
DeviceOs: asString(je["device_os"]),
Metadata: convertMetadata(je["metadata"]),
}
eventBatch.Events = append(eventBatch.Events, event)
}
if len(eventBatch.Events) == 0 {
return c.Status(fiber.StatusOK).JSON(fiber.Map{"status": "Batch processed, no valid events found"})
}
// Publish the entire batch as a single NATS message
if err := publisher.Publish(c.Context(), eventBatch); err != nil {
log.Printf("ERROR: Failed to publish batch to NATS: %v", err)
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal processing error"})
}
return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"status": "Batch accepted"})
}
}
// asString safely extracts a string field, returning "" when the value is missing
// or not a string, instead of panicking on malformed client input.
func asString(v interface{}) string {
if s, ok := v.(string); ok {
return s
}
return ""
}
func convertMetadata(data interface{}) map[string]string {
if m, ok := data.(map[string]interface{}); ok {
res := make(map[string]string)
for k, v := range m {
res[k] = fmt.Sprintf("%v", v)
}
return res
}
return nil
}
func main() {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL
}
// Connect to NATS with retry logic
nc, err := nats.Connect(natsURL, nats.RetryOnFailedConnect(true), nats.MaxReconnects(5), nats.ReconnectWait(time.Second))
if err != nil {
log.Fatalf("FATAL: Failed to connect to NATS: %v", err)
}
defer nc.Drain() // Drains remaining messages before closing
log.Println("Connected to NATS server at", nc.ConnectedUrl())
publisher := &NatsPublisher{conn: nc}
app := fiber.New(fiber.Config{
IdleTimeout: idleTimeout,
})
// Add production-grade middleware
app.Use(recover.New())
app.Use(logger.New(logger.Config{
Format: "[${ip}]:${port} ${status} - ${method} ${path}\n",
}))
// API route
app.Post("/v1/track", IngestionHandler(publisher))
// Graceful shutdown
go func() {
if err := app.Listen(":3000"); err != nil {
log.Fatalf("FATAL: Fiber app failed to start: %v", err)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutting down server...")
if err := app.Shutdown(); err != nil {
log.Fatalf("FATAL: Server forced to shutdown: %v", err)
}
log.Println("Server exiting")
}
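For reference, the JSON batch the client POSTs to /v1/track looks like the following. The field names mirror the Protobuf contract; the values below are purely illustrative.
// Sample request body for POST /v1/track (values are illustrative)
[
  {
    "event_id": "9f1c2d3e-4b5a-6c7d-8e9f-0a1b2c3d4e5f",
    "session_id": "session-42",
    "user_id": "user-123",
    "event_name": "item_purchased",
    "client_timestamp": "2023-10-26T14:03:21Z",
    "app_version": "2.4.1",
    "device_os": "iOS 17.0",
    "metadata": { "item_id": "sku-9981", "price": "4.99" }
  }
]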
This service is containerized for deployment on a DigitalOcean Droplet.
# Dockerfile
FROM golang:1.21-alpine
WORKDIR /app
# Install protoc and Go protobuf plugins for code generation
RUN apk add --no-cache protobuf-dev
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Copy proto file and generate Go code
COPY telemetry.proto ./telemetry/
RUN protoc --go_out=. ./telemetry/telemetry.proto
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /ingestion-service .
EXPOSE 3000
CMD [ "/ingestion-service" ]
Phase 2: OpenSearch Setup and Index Mapping
Before the Scala processor can index data, we need a properly configured OpenSearch cluster. On DigitalOcean, this means setting up a Droplet and running OpenSearch and OpenSearch Dashboards via Docker Compose. A critical production step is securing the cluster and configuring memory limits.
# docker-compose.yml
version: '3'
services:
opensearch-node1:
image: opensearchproject/opensearch:2.9.0
container_name: opensearch-node1
environment:
- cluster.name=opensearch-cluster
- node.name=opensearch-node1
- discovery.seed_hosts=opensearch-node1
- cluster.initial_cluster_manager_nodes=opensearch-node1
- bootstrap.memory_lock=true # along with ulimits settings
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # For development
- "DISABLE_SECURITY_PLUGIN=true" # For development ONLY
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- opensearch-data1:/usr/share/opensearch/data
ports:
- 9200:9200
- 9600:9600
networks:
- opensearch-net
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:2.9.0
container_name: opensearch-dashboards
ports:
- 5601:5601
expose:
- "5601"
environment:
OPENSEARCH_HOSTS: '["http://opensearch-node1:9200"]'
# Disable security for development ONLY
DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
networks:
- opensearch-net
volumes:
opensearch-data1:
networks:
opensearch-net:
More important than the setup is the index template. Defining a mapping before indexing data prevents OpenSearch from guessing data types, which is crucial for performance and correctness. We apply this template using the OpenSearch REST API.
// opensearch_template.json
PUT _index_template/telemetry_template
{
"index_patterns": ["telemetry-*"],
"template": {
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"event_id": { "type": "keyword" },
"session_id": { "type": "keyword" },
"user_id": { "type": "keyword" },
"event_name": { "type": "keyword" },
"ingestion_timestamp": { "type": "date" },
"client_timestamp": { "type": "date" },
"app_version": { "type": "keyword" },
"device_os": { "type": "keyword" },
"geo_country": { "type": "keyword" },
"metadata": { "type": "object", "dynamic": true },
"event_message": { "type": "text" }
}
}
}
}
This template ensures IDs are stored as non-analyzed keyword types for exact matching and aggregations, while the event_message field remains text for full-text search. The telemetry-* index pattern lets us create new time-based indices (e.g., a daily telemetry-2023-10-26) without reapplying the mapping.
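Applying the template is a single PUT to the cluster before any indexing happens. Below is a minimal, illustrative sketch of doing it from Go; the applyIndexTemplate helper, the file path, and the OPENSEARCH_URL environment variable are assumptions for this example, not part of the services above.
// apply_template.go (illustrative one-off helper, not part of the ingestion service)
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

// applyIndexTemplate PUTs the index template so that new telemetry-* indices pick up
// the keyword/date mappings before the first document is indexed.
func applyIndexTemplate(opensearchURL string, templateJSON []byte) error {
	req, err := http.NewRequest(http.MethodPut,
		opensearchURL+"/_index_template/telemetry_template", bytes.NewReader(templateJSON))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("template PUT failed with status %s", resp.Status)
	}
	return nil
}

func main() {
	// The file contains only the JSON body shown above (without the leading PUT line).
	body, err := os.ReadFile("opensearch_template.json")
	if err != nil {
		panic(err)
	}
	if err := applyIndexTemplate(os.Getenv("OPENSEARCH_URL"), body); err != nil {
		panic(err)
	}
}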
Phase 3: The Scala Enrichment Processor
This is the most complex component. It consumes Protobuf messages from NATS, enriches them (e.g., with a GeoIP lookup based on the client IP captured by the Go service, though omitted here for simplicity), and performs a bulk insert into OpenSearch.
We use Akka HTTP’s client for interacting with OpenSearch and Akka Streams for managing the flow of data from NATS. This provides resilience through stream supervision strategies and back-pressure.
First, the build.sbt file to pull in the necessary dependencies.
// build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.10"
lazy val root = (project in file("."))
.settings(
name := "enrichment-processor",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
"com.typesafe.akka" %% "akka-stream" % "2.8.0",
"com.typesafe.akka" %% "akka-http" % "10.5.0",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.5.0",
"io.nats" % "jnats" % "2.16.8",
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
// Add other necessary logging libraries
"ch.qos.logback" % "logback-classic" % "1.3.5"
)
)
// ScalaPB code generation: the sbt-protoc plugin and the ScalaPB compilerplugin must be
// added in project/plugins.sbt; this setting wires the generator into the compile step.
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
The core logic resides in a single application object. It sets up the NATS subscription and an Akka Streams pipeline to process messages.
// EnrichmentProcessor.scala
package com.example
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import akka.util.ByteString
import com.example.telemetry.telemetry.EventBatch // Generated by ScalaPB
import io.nats.client.{Connection, Dispatcher, Nats, Message}
import spray.json._
import DefaultJsonProtocol._
import java.nio.charset.StandardCharsets
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
object EnrichmentProcessor extends App {
implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "EnrichmentProcessor")
implicit val executionContext = system.executionContext
val natsUrl = sys.env.getOrElse("NATS_URL", "nats://localhost:4222")
val opensearchUrl = sys.env.getOrElse("OPENSEARCH_URL", "http://localhost:9200")
val natsSubject = "telemetry.events"
val http = Http()
// The decider resumes the stream on processing failure, logging the error.
// In a real-world project, this might push to a dead-letter queue.
val decider: Supervision.Decider = {
case e: Exception =>
system.log.error("Unhandled exception in stream", e)
Supervision.Resume
}
// Akka Streams source that wraps a NATS subscription
def natsSource(nc: Connection, subject: String): Source[Message, _] = {
Source.queue[Message](bufferSize = 1024)
.mapMaterializedValue { queue =>
val d: Dispatcher = nc.createDispatcher((msg: Message) => {
queue.offer(msg)
})
d.subscribe(subject)
d // So we can unsubscribe later if needed
}
}
// Flow to parse Protobuf and transform to JSON for OpenSearch
val processingFlow = Flow[Message]
.map(msg => Try(EventBatch.parseFrom(msg.getData)))
.collect { case Success(batch) => batch }
.mapConcat(_.events.toList)
.map { event =>
// This is where enrichment would happen (e.g., GeoIP lookup).
val ingestionTime = Instant.now()
// clientTimestamp is optional in proto3; fall back to ingestion time rather than throwing on .get
val clientTime = event.clientTimestamp
.map(ts => Instant.ofEpochSecond(ts.seconds, ts.nanos.toLong))
.getOrElse(ingestionTime)
val metadataJson = event.metadata.toJson
// Construct the JSON document for OpenSearch
JsObject(
"event_id" -> JsString(event.eventId),
"session_id" -> JsString(event.sessionId),
"user_id" -> JsString(event.userId),
"event_name" -> JsString(event.eventName),
"ingestion_timestamp" -> JsString(DateTimeFormatter.ISO_INSTANT.format(ingestionTime)),
"client_timestamp" -> JsString(DateTimeFormatter.ISO_INSTANT.format(clientTime)),
"app_version" -> JsString(event.appVersion),
"device_os" -> JsString(event.deviceOs),
"metadata" -> metadataJson
)
}
// Sink that batches events and sends them to OpenSearch Bulk API
val opensearchSink = Flow[JsObject]
.groupedWithin(500, 5.seconds) // Batch up to 500 events or every 5 seconds
.mapAsync(4) { batch => // Process up to 4 batches in parallel
if (batch.isEmpty) {
Future.successful(0)
} else {
val indexName = s"telemetry-${DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC).format(Instant.now())}"
val bulkPayload = batch.map { doc =>
s"""{ "index" : { "_index" : "$indexName" } }\n${doc.compactPrint}"""
}.mkString("", "\n", "\n")
val request = HttpRequest(
method = HttpMethods.POST,
uri = s"$opensearchUrl/_bulk",
// Akka HTTP derives Content-Type from the entity; a raw Content-Type header would be ignored.
entity = HttpEntity(
ContentType(MediaType.customWithFixedCharset("application", "x-ndjson", HttpCharsets.`UTF-8`)),
ByteString(bulkPayload, StandardCharsets.UTF_8)
)
)
http.singleRequest(request).flatMap {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Note: the Bulk API returns 200 even when individual items fail; production code
// should parse the response body and check its "errors" flag.
entity.discardBytes()
system.log.info(s"Successfully indexed ${batch.size} documents.")
Future.successful(batch.size)
case HttpResponse(status, _, entity, _) =>
// Read the error body before completing, so the failure is actually logged
// and the pooled connection can be reused.
entity.toStrict(2.seconds).map { e =>
system.log.error(s"Failed to index batch. Status: $status. Response: ${e.data.utf8String}")
0
}
}
}
}
.to(Sink.ignore) // We consume the result and log, but don't pass it on
// Main application logic: connect to NATS and run the stream
val nc = Nats.connect(natsUrl)
system.log.info(s"Connected to NATS at ${nc.getConnectedUrl}")
natsSource(nc, natsSubject)
.via(processingFlow)
.to(opensearchSink)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.run()
sys.addShutdownHook {
nc.close()
system.terminate()
}
}
This processor is the heart of the pipeline. The batching logic (groupedWithin) is critical for achieving high throughput with OpenSearch, as individual insert requests are highly inefficient. Parallelism (mapAsync) allows us to saturate the network connection to the OpenSearch cluster without overwhelming it.
Phase 4: The Swift Client Snippet
The final piece is the data producer. On the Swift client, the key is to not send an HTTP request for every single event. This would drain battery and flood the network. Instead, events are batched locally and sent periodically. A production-grade SDK would also handle offline storage and retries.
// TelemetryService.swift (Conceptual)
import Foundation
struct TelemetryEvent: Codable {
let event_id: String
let session_id: String
let user_id: String
let event_name: String
let client_timestamp: String // ISO 8601 format
let app_version: String
let device_os: String
let metadata: [String: String]
}
class TelemetryService {
private let ingestionEndpoint = URL(string: "https://your-do-droplet-ip:3000/v1/track")!
private var eventQueue = [TelemetryEvent]()
private let queue = DispatchQueue(label: "com.example.telemetryQueue")
private var timer: Timer?
init() {
// Send batches every 30 seconds
self.timer = Timer.scheduledTimer(withTimeInterval: 30.0, repeats: true) { [weak self] _ in
self?.flushQueue()
}
}
func track(_ event: TelemetryEvent) {
queue.async {
self.eventQueue.append(event)
// Or if queue reaches a certain size, flush immediately
if self.eventQueue.count >= 100 {
self.flushQueue()
}
}
}
private func flushQueue() {
queue.async {
guard !self.eventQueue.isEmpty else { return }
let batch = self.eventQueue
self.eventQueue.removeAll()
self.sendBatch(batch)
}
}
private func sendBatch(_ batch: [TelemetryEvent]) {
var request = URLRequest(url: ingestionEndpoint)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
do {
let data = try JSONEncoder().encode(batch)
request.httpBody = data
} catch {
print("Telemetry Error: Failed to encode event batch: \(error)")
// A real implementation would re-queue the failed batch
return
}
let task = URLSession.shared.dataTask(with: request) { data, response, error in
if let error = error {
print("Telemetry Error: Network request failed: \(error)")
// Re-queue logic here
return
}
guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 202 else {
print("Telemetry Error: Ingestion failed with status code \((response as? HTTPURLResponse)?.statusCode ?? 0)")
// Re-queue logic here
return
}
print("Telemetry: Successfully submitted a batch of \(batch.count) events.")
}
task.resume()
}
}
With all components in place, a query against the OpenSearch cluster reveals the fully processed data, ready for analysis in OpenSearch Dashboards.
// Sample OpenSearch Query
GET /telemetry-*/_search
{
"query": {
"bool": {
"must": [
{ "term": { "user_id": { "value": "user-123" } } },
{ "term": { "event_name": { "value": "item_purchased" } } }
],
"filter": [
{ "range": { "client_timestamp": { "gte": "now-1d/d" } } }
]
}
}
}
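Because the identifier fields are mapped as keyword, the same indices also support aggregations with no extra configuration, for example a breakdown of the most frequent events per app version:
// Sample OpenSearch Aggregation Query
GET /telemetry-*/_search
{
  "size": 0,
  "query": {
    "range": { "client_timestamp": { "gte": "now-7d/d" } }
  },
  "aggs": {
    "by_app_version": {
      "terms": { "field": "app_version", "size": 10 },
      "aggs": {
        "top_events": {
          "terms": { "field": "event_name", "size": 5 }
        }
      }
    }
  }
}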
The system successfully decoupled the components, but it is by no means complete. Self-hosting OpenSearch and NATS on DigitalOcean Droplets is cost-effective initially, but it carries a significant operational burden for backups, scaling, and security patching. As volume grows, the cost of engineering time to manage this infrastructure might outweigh the savings over a managed service. The current NATS configuration is not persistent; a service restart could lead to data loss, so migrating to NATS JetStream is a necessary next step for durability. Finally, the entire pipeline lacks end-to-end distributed tracing, making it difficult to pinpoint bottlenecks between the Go, NATS, and Scala components. Implementing the OpenTelemetry standard across all services would be the next critical iteration toward true production readiness.
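As a concrete pointer for the JetStream migration mentioned above, the publisher side is a small, contained change. Here is a minimal sketch, assuming a stream named TELEMETRY bound to the telemetry.events subject; the stream name and retention settings are illustrative assumptions, not part of the current deployment.
// jetstream_publisher.go (sketch of the durability upgrade discussed above)
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

// newJetStreamPublisher connects to NATS and ensures a file-backed stream exists for the
// telemetry subject, so published events survive a broker restart.
func newJetStreamPublisher(natsURL string) (nats.JetStreamContext, error) {
	nc, err := nats.Connect(natsURL)
	if err != nil {
		return nil, fmt.Errorf("connect: %w", err)
	}
	js, err := nc.JetStream()
	if err != nil {
		return nil, fmt.Errorf("jetstream context: %w", err)
	}
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "TELEMETRY",
		Subjects: []string{"telemetry.events"},
		Storage:  nats.FileStorage,
		MaxAge:   24 * time.Hour,
	}); err != nil {
		return nil, fmt.Errorf("add stream: %w", err)
	}
	return js, nil
}

func main() {
	js, err := newJetStreamPublisher(nats.DefaultURL)
	if err != nil {
		log.Fatalf("FATAL: %v", err)
	}
	// In the ingestion service, NatsPublisher.Publish would call js.Publish instead of
	// conn.Publish; the returned PubAck confirms the broker persisted the message.
	if _, err := js.Publish("telemetry.events", []byte("smoke test")); err != nil {
		log.Fatalf("FATAL: publish failed: %v", err)
	}
}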