Implementing a Heterogeneous Telemetry Ingestion Pipeline with Go-Fiber, Scala, and OpenSearch on DigitalOcean


The operational telemetry from our Swift mobile application became both a firehose and a black box. The initial solution, logging directly to a monolithic backend that wrote to a relational database, collapsed under peak load. Queries to diagnose user-reported issues were slow, full-text search was non-existent, and the entire system was tightly coupled. Any change to the telemetry format required a coordinated, high-risk deployment of both the mobile app and the backend. We needed a decoupled, high-throughput ingestion and processing pipeline that was both searchable and cost-effective to operate for a lean team. The decision was made to build a new system from the ground up on DigitalOcean, leveraging a polyglot stack where each component was selected for its specific strengths.

Our initial whiteboard sketch outlined a multi-stage, asynchronous architecture designed for resilience and independent scalability.

graph TD
    subgraph Swift Client
        A[iOS App] -->|HTTPS Batch POST| B(Ingestion Service);
    end

    subgraph DigitalOcean VPC
        B[Go-Fiber Ingestion Service] -- Protobuf --> C{NATS Message Broker};
        C -- Protobuf --> D[Scala Enrichment Processor];
        D -- JSON Bulk API --> E(OpenSearch Cluster);
        B -->|Search Query| E;
    end

    style A fill:#F9A,stroke:#333,stroke-width:2px
    style B fill:#9CF,stroke:#333,stroke-width:2px
    style D fill:#C9F,stroke:#333,stroke-width:2px

The core principle was specialization. The ingestion layer had to be incredibly fast and lightweight. The processing layer needed robust concurrency and the ability to execute complex business logic. The storage layer required powerful search and aggregation capabilities. A single language or framework would have involved significant compromises.

Our technology selection process was guided by pragmatism. We chose DigitalOcean Droplets over larger cloud providers primarily for cost predictability and simplicity. This choice, however, meant we’d be taking on more operational responsibility for services like our message broker and search cluster, a trade-off we accepted.

  • Go with Fiber for Ingestion: The API endpoint receiving data from potentially tens of thousands of clients must handle high concurrency with minimal resource consumption. Go’s goroutines are a natural fit. We chose the Fiber framework for its performance and Express.js-like API, which accelerated initial development.
  • NATS as the Buffer: We needed a simple, high-performance message bus to decouple the ingestion and processing stages. Kafka felt like overkill for our initial requirements, carrying significant operational complexity. NATS core provides a fire-and-forget messaging pattern that’s lightweight enough to run on a small Droplet and serves perfectly as a distributed buffer.
  • Scala with Akka HTTP for Enrichment: The real work happens in the processing stage. This service consumes raw events, enriches them with data from other sources, transforms the schema, and prepares them for indexing. Scala’s powerful type system and functional patterns are ideal for complex data manipulation. We leverage Akka HTTP and its streaming capabilities to build a resilient consumer that can handle back-pressure and process messages concurrently.
  • OpenSearch for Storage and Analytics: We required flexible, fast search over semi-structured JSON documents. OpenSearch, as a fork of Elasticsearch, provides exactly that. Self-hosting it on Droplets was a deliberate choice to avoid the high costs of managed services at our scale.
  • Protocol Buffers for the Data Contract: A common mistake in microservice architectures is using JSON for internal communication. It’s verbose and lacks a formal schema. We defined our internal event contract with Protobuf, ensuring type safety, backward compatibility, and significantly lower network overhead between the Go and Scala services.

Phase 1: The Go-Fiber Ingestion Service

The Go service is the front door. Its only jobs are to accept HTTP requests, validate them, serialize the payload into Protobuf, and publish it to NATS as quickly as possible. It performs no blocking I/O other than the network write to NATS.

First, we define our data contract in telemetry.proto. This contract is the source of truth for our event structure and will be used to generate code for both Go and Scala.

// telemetry.proto
syntax = "proto3";

package telemetry;

option go_package = "ingestion-service/telemetry";

import "google/protobuf/timestamp.proto";

message Event {
  string event_id = 1;
  string session_id = 2;
  string user_id = 3;
  string event_name = 4;
  google.protobuf.Timestamp client_timestamp = 5;
  map<string, string> metadata = 6;
  string app_version = 7;
  string device_os = 8;
}

message EventBatch {
  repeated Event events = 1;
}

The Go service uses this definition to handle incoming JSON and translate it for the internal pipeline.

Here is the core of the ingestion server, main.go. It sets up a Fiber app, connects to NATS, and defines the ingestion endpoint. Note the error handling and logging; in a real-world project, you never assume a downstream dependency like NATS will be available.

// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/logger"
	"github.com/gofiber/fiber/v2/middleware/recover"
	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"

	telemetry "ingestion-service/telemetry"
)

const (
	natsSubject = "telemetry.events"
	idleTimeout = 5 * time.Second
)

// NatsPublisher encapsulates the NATS connection
type NatsPublisher struct {
	conn *nats.Conn
}

// Publish serializes and sends a batch of events to NATS
func (p *NatsPublisher) Publish(ctx context.Context, batch *telemetry.EventBatch) error {
	data, err := proto.Marshal(batch)
	if err != nil {
		// This is a critical error, indicates a problem with our generated code or data model
		return fmt.Errorf("failed to marshal protobuf: %w", err)
	}

	// In a production system, you might use PublishMsg with headers for tracing
	if err := p.conn.Publish(natsSubject, data); err != nil {
		return fmt.Errorf("failed to publish to NATS: %w", err)
	}
	return nil
}

// IngestionHandler handles the HTTP POST request
func IngestionHandler(publisher *NatsPublisher) fiber.Handler {
	return func(c *fiber.Ctx) error {
		// The client sends a JSON array of events, which we map to our Protobuf structure
		var jsonEvents []map[string]interface{}
		if err := c.BodyParser(&jsonEvents); err != nil {
			return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Invalid JSON format"})
		}

		if len(jsonEvents) == 0 {
			return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Empty event batch"})
		}

		// A common pitfall is processing events one-by-one.
		// We translate the entire JSON batch into a single Protobuf batch.
		eventBatch := &telemetry.EventBatch{
			Events: make([]*telemetry.Event, 0, len(jsonEvents)),
		}

		for _, je := range jsonEvents {
			// Naive validation and conversion for demonstration.
			// A production system would have robust validation logic here.
			rawTs, ok := je["client_timestamp"].(string)
			if !ok {
				log.Printf("WARN: Skipping event with missing or non-string timestamp: %v", je["event_id"])
				continue
			}
			ts, err := time.Parse(time.RFC3339, rawTs)
			if err != nil {
				// We might choose to drop this single event or reject the whole batch.
				// For now, we skip it and log the error.
				log.Printf("WARN: Skipping event with invalid timestamp: %v", je["event_id"])
				continue
			}

			event := &telemetry.Event{
				EventId:        je["event_id"].(string),
				SessionId:      je["session_id"].(string),
				UserId:         je["user_id"].(string),
				EventName:      je["event_name"].(string),
				ClientTimestamp: timestamppb.New(ts),
				AppVersion:     je["app_version"].(string),
				DeviceOs:       je["device_os"].(string),
				Metadata:       convertMetadata(je["metadata"]),
			}
			eventBatch.Events = append(eventBatch.Events, event)
		}

		if len(eventBatch.Events) == 0 {
			return c.Status(fiber.StatusOK).JSON(fiber.Map{"status": "Batch processed, no valid events found"})
		}

		// Publish the entire batch as a single NATS message
		if err := publisher.Publish(c.Context(), eventBatch); err != nil {
			log.Printf("ERROR: Failed to publish batch to NATS: %v", err)
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal processing error"})
		}

		return c.Status(fiber.StatusAccepted).JSON(fiber.Map{"status": "Batch accepted"})
	}
}

func convertMetadata(data interface{}) map[string]string {
	// Production code needs safer type assertion here.
	if m, ok := data.(map[string]interface{}); ok {
		res := make(map[string]string)
		for k, v := range m {
			res[k] = fmt.Sprintf("%v", v)
		}
		return res
	}
	return nil
}

func main() {
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL
	}

	// Connect to NATS with retry logic
	nc, err := nats.Connect(natsURL, nats.RetryOnFailedConnect(true), nats.MaxReconnects(5), nats.ReconnectWait(time.Second))
	if err != nil {
		log.Fatalf("FATAL: Failed to connect to NATS: %v", err)
	}
	defer nc.Drain() // Drains remaining messages before closing
	log.Println("Connected to NATS server at", nc.ConnectedUrl())

	publisher := &NatsPublisher{conn: nc}

	app := fiber.New(fiber.Config{
		IdleTimeout: idleTimeout,
	})

	// Add production-grade middleware
	app.Use(recover.New())
	app.Use(logger.New(logger.Config{
		Format: "[${ip}]:${port} ${status} - ${method} ${path}\n",
	}))

	// API route
	app.Post("/v1/track", IngestionHandler(publisher))

	// Graceful shutdown
	go func() {
		if err := app.Listen(":3000"); err != nil {
			log.Fatalf("FATAL: Fiber app failed to start: %v", err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")
	if err := app.Shutdown(); err != nil {
		log.Fatalf("FATAL: Server forced to shutdown: %v", err)
	}
	log.Println("Server exiting")
}
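
For reference, the handler above expects a JSON array of events in the request body. The field values below are purely illustrative:

// Sample request body for POST /v1/track
[
  {
    "event_id": "9f1c2b7e-4a36-4e21-9d6a-0c5e8f3a1b42",
    "session_id": "sess-42",
    "user_id": "user-123",
    "event_name": "item_purchased",
    "client_timestamp": "2023-10-27T14:05:32Z",
    "app_version": "1.4.2",
    "device_os": "iOS 17.0",
    "metadata": { "screen": "checkout", "sku": "SKU-001" }
  }
]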

This service is containerized for deployment on a DigitalOcean Droplet.

# Dockerfile
FROM golang:1.21-alpine

WORKDIR /app

# Install protoc and Go protobuf plugins for code generation
RUN apk add --no-cache protobuf protobuf-dev
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# Copy proto file and generate Go code
COPY telemetry.proto ./telemetry/
RUN protoc --go_out=. --go_opt=paths=source_relative ./telemetry/telemetry.proto

COPY go.mod go.sum ./
RUN go mod download

COPY . .

RUN go build -o /ingestion-service .

EXPOSE 3000

CMD [ "/ingestion-service" ]
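
On the Droplet itself, the ingestion service and NATS can be co-located with a small Compose file. The following is a sketch; the image tag and port mappings are assumptions, not part of the deployment described above:

# docker-compose.ingestion.yml (illustrative)
version: '3'
services:
  nats:
    image: nats:alpine
    ports:
      - "4222:4222"
  ingestion:
    build: .
    environment:
      - NATS_URL=nats://nats:4222
    ports:
      - "3000:3000"
    depends_on:
      - nats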

Phase 2: OpenSearch Setup and Index Mapping

Before the Scala processor can index data, we need a properly configured OpenSearch cluster. On DigitalOcean, this means setting up a Droplet and running OpenSearch and OpenSearch Dashboards via Docker Compose. A critical production step is securing the cluster and configuring memory limits.

# docker-compose.yml
version: '3'
services:
  opensearch-node1:
    image: opensearchproject/opensearch:2.9.0
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.seed_hosts=opensearch-node1
      - cluster.initial_cluster_manager_nodes=opensearch-node1
      - bootstrap.memory_lock=true # along with ulimits settings
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # For development
      - "DISABLE_SECURITY_PLUGIN=true" # For development ONLY
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-net

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.9.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch-node1:9200"]'
      # Disable security for development ONLY
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
    networks:
      - opensearch-net

volumes:
  opensearch-data1:

networks:
  opensearch-net:

More important than the setup is the index template. Defining a mapping before indexing data prevents OpenSearch from guessing data types, which is crucial for performance and correctness. We apply this template using the OpenSearch REST API.

// opensearch_template.json
PUT _index_template/telemetry_template
{
  "index_patterns": ["telemetry-*"],
  "template": {
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "event_id": { "type": "keyword" },
        "session_id": { "type": "keyword" },
        "user_id": { "type": "keyword" },
        "event_name": { "type": "keyword" },
        "ingestion_timestamp": { "type": "date" },
        "client_timestamp": { "type": "date" },
        "app_version": { "type": "keyword" },
        "device_os": { "type": "keyword" },
        "geo_country": { "type": "keyword" },
        "metadata": { "type": "object", "dynamic": true },
        "event_message": { "type": "text" }
      }
    }
  }
}

This template ensures the ID and name fields are stored as non-analyzed keyword types for exact matching and aggregations, while event_message remains a text field for full-text search. The telemetry-* index pattern lets us roll over to new time-based indices (e.g., the daily telemetry-2023-10-27 indices created by the processor below) without reapplying the mapping.
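
Because these fields are keywords, exact-match filters and aggregations over them are cheap. An illustrative query (not tied to any particular dashboard) counting events per event type per day:

// Sample aggregation query (illustrative)
GET /telemetry-*/_search
{
  "size": 0,
  "aggs": {
    "by_event_name": {
      "terms": { "field": "event_name", "size": 20 },
      "aggs": {
        "per_day": {
          "date_histogram": { "field": "client_timestamp", "calendar_interval": "day" }
        }
      }
    }
  }
}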

Phase 3: The Scala Enrichment Processor

This is the most complex component. It consumes Protobuf messages from NATS, enriches them (e.g., with a GeoIP lookup based on the client IP captured by the Go service, though omitted here for simplicity), and performs a bulk insert into OpenSearch.
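
The handler shown in Phase 1 does not capture that client IP. A minimal Go-side sketch of the change, placed inside IngestionHandler's event loop, might look like the following; the client_ip metadata key is a hypothetical convention, not part of the current contract:

// Sketch: attach the caller's IP so the enrichment stage can perform the GeoIP lookup.
// Behind a load balancer, Fiber's trusted-proxy settings must be configured for c.IP()
// to return the real client address rather than the proxy's.
if event.Metadata == nil {
	event.Metadata = make(map[string]string)
}
event.Metadata["client_ip"] = c.IP() // "client_ip" is a hypothetical metadata key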

We use Akka HTTP’s client for interacting with OpenSearch and Akka Streams for managing the flow of data from NATS. This provides resilience through stream supervision strategies and back-pressure.

First, the build.sbt file to pull in the necessary dependencies.

// build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.10"

lazy val root = (project in file("."))
  .settings(
    name := "enrichment-processor",
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor-typed" % "2.8.0",
      "com.typesafe.akka" %% "akka-stream" % "2.8.0",
      "com.typesafe.akka" %% "akka-http" % "10.5.0",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.5.0",
      "io.nats" % "jnats" % "2.16.8",
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
      // Add other necessary logging libraries
      "ch.qos.logback" % "logback-classic" % "1.3.5"
    )
  )

// ScalaPB code generation is wired up via sbt-protoc; in project/plugins.sbt:
//   addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
//   libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.13"
Compile / PB.targets := Seq(
  scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)

The core logic resides in a single application object. It sets up the NATS subscription and an Akka Streams pipeline to process messages.

// EnrichmentProcessor.scala
package com.example

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.util.ByteString
import com.example.telemetry.telemetry.EventBatch // Generated by ScalaPB
import io.nats.client.{Connection, Dispatcher, Nats, Message}
import spray.json._
import DefaultJsonProtocol._

import java.nio.charset.StandardCharsets
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

object EnrichmentProcessor extends App {
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "EnrichmentProcessor")
  implicit val executionContext = system.executionContext

  val natsUrl = sys.env.getOrElse("NATS_URL", "nats://localhost:4222")
  val opensearchUrl = sys.env.getOrElse("OPENSEARCH_URL", "http://localhost:9200")
  val natsSubject = "telemetry.events"

  val http = Http()

  // The decider resumes the stream on processing failure, logging the error.
  // In a real-world project, this might push to a dead-letter queue.
  val decider: Supervision.Decider = {
    case e: Exception =>
      system.log.error("Unhandled exception in stream", e)
      Supervision.Resume
  }

  // Akka Streams source that wraps a NATS subscription
  def natsSource(nc: Connection, subject: String): Source[Message, _] = {
    Source.queue[Message](bufferSize = 1024)
      .mapMaterializedValue { queue =>
        val d: Dispatcher = nc.createDispatcher((msg: Message) => {
          queue.offer(msg)
        })
        d.subscribe(subject)
        d // So we can unsubscribe later if needed
      }
  }

  // Flow to parse Protobuf and transform to JSON for OpenSearch
  val processingFlow = Flow[Message]
    .map(msg => Try(EventBatch.parseFrom(msg.getData)))
    .collect { case Success(batch) => batch }
    .mapConcat(_.events.toList)
    .map { event =>
      // This is where enrichment would happen (e.g., GeoIP lookup).
      val ingestionTime = Instant.now()
      val clientTime = Instant.ofEpochSecond(event.clientTimestamp.get.seconds, event.clientTimestamp.get.nanos.toLong)
      
      val metadataJson = event.metadata.toJson

      // Construct the JSON document for OpenSearch
      JsObject(
        "event_id" -> JsString(event.eventId),
        "session_id" -> JsString(event.sessionId),
        "user_id" -> JsString(event.userId),
        "event_name" -> JsString(event.eventName),
        "ingestion_timestamp" -> JsString(DateTimeFormatter.ISO_INSTANT.format(ingestionTime)),
        "client_timestamp" -> JsString(DateTimeFormatter.ISO_INSTANT.format(clientTime)),
        "app_version" -> JsString(event.appVersion),
        "device_os" -> JsString(event.deviceOs),
        "metadata" -> metadataJson
      )
    }

  // Sink that batches events and sends them to OpenSearch Bulk API
  val opensearchSink = Flow[JsObject]
    .groupedWithin(500, 5.seconds) // Batch up to 500 events or every 5 seconds
    .mapAsync(4) { batch => // Process up to 4 batches in parallel
      if (batch.isEmpty) {
        Future.successful(0)
      } else {
        val indexName = s"telemetry-${DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC).format(Instant.now())}"
        val bulkPayload = batch.map { doc =>
          s"""{ "index" : { "_index" : "$indexName" } }\n${doc.compactPrint}"""
        }.mkString("", "\n", "\n")

        // Akka HTTP ignores an explicitly supplied Content-Type header, so the bulk API's
        // NDJSON content type has to be set on the entity itself.
        val ndjson = ContentType(
          MediaType.customWithFixedCharset("application", "x-ndjson", HttpCharsets.`UTF-8`)
        )
        val request = HttpRequest(
          method = HttpMethods.POST,
          uri = s"$opensearchUrl/_bulk",
          entity = HttpEntity(ndjson, ByteString(bulkPayload, StandardCharsets.UTF_8))
        )
        
        http.singleRequest(request).flatMap {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
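            // NOTE: a 200 here only means the bulk request itself was accepted; the response body
            // can still flag per-document failures ("errors": true), which production code should inspect.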
            entity.discardBytes()
            system.log.info(s"Successfully indexed ${batch.size} documents.")
            Future.successful(batch.size)
          case HttpResponse(status, _, entity, _) =>
            entity.toStrict(2.seconds).map { e =>
                system.log.error(s"Failed to index batch. Status: $status. Response: ${e.data.utf8String}")
            }
            Future.successful(0)
        }
      }
    }
    .to(Sink.ignore) // We consume the result and log, but don't pass it on

  // Main application logic: connect to NATS and run the stream
  val nc = Nats.connect(natsUrl)
  system.log.info(s"Connected to NATS at ${nc.getConnectedUrl}")

  natsSource(nc, natsSubject)
    .via(processingFlow)
    .to(opensearchSink)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .run()

  sys.addShutdownHook {
    nc.close()
    system.terminate()
  }
}

This processor is the heart of the pipeline. The batching logic (groupedWithin) is critical for achieving high throughput with OpenSearch, as individual insert requests are highly inefficient. Parallelism (mapAsync) allows us to saturate the network connection to the OpenSearch cluster without overwhelming it.

Phase 4: The Swift Client Snippet

The final piece is the data producer. On the Swift client, the key is not to send an HTTP request for every single event, which would drain the battery and flood the network. Instead, events are batched locally and sent periodically. A production-grade SDK would also handle offline storage and retries.

// TelemetryService.swift (Conceptual)
import Foundation

struct TelemetryEvent: Codable {
    let event_id: String
    let session_id: String
    let user_id: String
    let event_name: String
    let client_timestamp: String // ISO 8601 format
    let app_version: String
    let device_os: String
    let metadata: [String: String]
}

class TelemetryService {
    private let ingestionEndpoint = URL(string: "https://your-do-droplet-ip:3000/v1/track")!
    private var eventQueue = [TelemetryEvent]()
    private let queue = DispatchQueue(label: "com.example.telemetryQueue")
    private var timer: Timer?

    init() {
        // Send batches every 30 seconds
        self.timer = Timer.scheduledTimer(withTimeInterval: 30.0, repeats: true) { [weak self] _ in
            self?.flushQueue()
        }
    }

    func track(_ event: TelemetryEvent) {
        queue.async {
            self.eventQueue.append(event)
            // Or if queue reaches a certain size, flush immediately
            if self.eventQueue.count >= 100 {
                self.flushQueue()
            }
        }
    }

    private func flushQueue() {
        queue.async {
            guard !self.eventQueue.isEmpty else { return }

            let batch = self.eventQueue
            self.eventQueue.removeAll()

            self.sendBatch(batch)
        }
    }

    private func sendBatch(_ batch: [TelemetryEvent]) {
        var request = URLRequest(url: ingestionEndpoint)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")

        do {
            let data = try JSONEncoder().encode(batch)
            request.httpBody = data
        } catch {
            print("Telemetry Error: Failed to encode event batch: \(error)")
            // A real implementation would re-queue the failed batch
            return
        }

        let task = URLSession.shared.dataTask(with: request) { data, response, error in
            if let error = error {
                print("Telemetry Error: Network request failed: \(error)")
                // Re-queue logic here
                return
            }

            guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 202 else {
                print("Telemetry Error: Ingestion failed with status code \((response as? HTTPURLResponse)?.statusCode ?? 0)")
                // Re-queue logic here
                return
            }
            
            print("Telemetry: Successfully submitted a batch of \(batch.count) events.")
        }
        task.resume()
    }
}

With all components in place, a query against the OpenSearch cluster reveals the fully processed data, ready for analysis in OpenSearch Dashboards.

// Sample OpenSearch Query
GET /telemetry-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "user_id": { "value": "user-123" } } },
        { "term": { "event_name": { "value": "item_purchased" } } }
      ],
      "filter": [
        { "range": { "client_timestamp": { "gte": "now-1d/d" } } }
      ]
    }
  }
}

The system successfully decoupled the components, but it is by no means complete. Self-hosting OpenSearch and NATS on DigitalOcean Droplets is cost-effective initially, but it shifts the operational burden of backups, scaling, and security patching onto the team. As volume grows, the cost of engineering time to manage this infrastructure may outweigh the savings over a managed service. The current NATS configuration is also not persistent: core NATS is fire-and-forget, so a broker or processor restart can lose in-flight messages. Migrating to NATS JetStream is a necessary next step for durability. Finally, the pipeline lacks end-to-end distributed tracing, making it difficult to pinpoint bottlenecks between the Go, NATS, and Scala components; adopting OpenTelemetry across all services would be the next critical iteration toward true production readiness.
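
As a rough sketch of the publisher side of that JetStream migration (the TELEMETRY stream name and file storage below are assumptions, not part of the current setup), the Go service would swap its core NATS publish for a JetStream publish that is acknowledged only once the broker has persisted the message:

// Sketch: durable publishing with NATS JetStream.
js, err := nc.JetStream()
if err != nil {
	log.Fatalf("FATAL: Failed to obtain JetStream context: %v", err)
}

// Ensure a stream captures the telemetry subject; in practice this belongs in provisioning,
// not application startup. The call succeeds if the stream already exists with the same config.
if _, err := js.AddStream(&nats.StreamConfig{
	Name:     "TELEMETRY",
	Subjects: []string{natsSubject},
	Storage:  nats.FileStorage,
}); err != nil {
	log.Fatalf("FATAL: Failed to create JetStream stream: %v", err)
}

// Inside NatsPublisher.Publish, conn.Publish would become:
if _, err := js.Publish(natsSubject, data); err != nil {
	return fmt.Errorf("failed to publish to JetStream: %w", err)
}

The Scala processor would correspondingly move from the core NATS dispatcher to a durable JetStream consumer, so unprocessed events survive a restart on either side of the broker.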

