The CI system for our Python monorepo, housing over 200 distinct services, was a black box. Build times were creeping up from 15 minutes to over 45 for some services, and test suites were becoming unpredictably flaky. When a senior developer asked, “What was the p99 test duration for the payments service before and after the new validation library was merged?”, the answer was a shrug followed by hours of manually scraping CI logs. We had metrics, but they were ephemeral, unstructured, and effectively useless for trend analysis. This wasn’t just an inconvenience; it was a direct impediment to engineering velocity.
Our first, naive concept was to have each service’s CI job `curl` a JSON payload of metrics to a central Flask API. This was quickly dismissed. The approach would have led to a schema-less free-for-all, making any form of reliable aggregation impossible. The maintenance overhead of a bespoke API endpoint, coupled with the inevitable performance bottlenecks and lack of type safety, made it a non-starter for a production-grade engineering system. The solution needed to be centralized, standardized, and integrated directly into the fabric of our monorepo development lifecycle.
We decided to build a shared library within the monorepo itself: a single, versioned tool that every service would use to report its CI metrics. Centralizing the tooling meant that any improvement or schema change could be rolled out atomically across all 200+ services.
For the core technologies, the selection process was straightforward:
- Monorepo Structure: We use a standard layout with a `libs/` directory for shared code and `services/` for individual applications. This structure is fundamental to the solution’s design.
- Pandas: The data from our CI jobs—JUnit XML files, code coverage reports, build timings from Makefiles—is messy and inconsistent. Pandas provides a powerful, in-memory engine to parse these disparate formats, clean the data, enforce a schema, perform aggregations, and structure it perfectly for time-series ingestion. Trying to do this with standard Python dictionaries and lists would be an order of magnitude more complex and less performant.
- InfluxDB: This problem is fundamentally about time-series data. We need to track metrics like `build_duration_seconds` over time, tagged with dimensions like `service_name`, `git_branch`, and `build_trigger_event`. InfluxDB is purpose-built for this, offering efficient storage, a rich query language (Flux), and features like retention policies that are critical for managing data lifecycle without manual intervention (a provisioning sketch follows this list).
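To make that last point concrete, retention can be baked into the bucket itself when it is provisioned, so old points expire without any cron jobs on our side. The following is a minimal sketch using the same influxdb-client package the library depends on; the URL, token, org, and the 90-day window are illustrative assumptions, not our production values:

```python
# Sketch: provisioning the "ci-metrics" bucket with a server-side retention rule.
# The connection details and the 90-day period are assumptions for illustration.
from influxdb_client import BucketRetentionRules, InfluxDBClient

client = InfluxDBClient(url="https://influxdb.internal:8086", token="<admin-token>", org="engineering")
try:
    # Points older than 90 days are expired automatically by InfluxDB itself.
    retention = BucketRetentionRules(type="expire", every_seconds=90 * 24 * 3600)
    client.buckets_api().create_bucket(
        bucket_name="ci-metrics",
        retention_rules=retention,
        org="engineering",
    )
finally:
    client.close()
```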
The implementation journey began by defining the monorepo’s structure and the core components of our new library, which we named `ci_observer`.
```
/monorepo
|-- libs/
| |-- ci_observer/
| | |-- __init__.py
| | |-- main.py # CLI entrypoint
| | |-- config.py # Configuration loading (Pydantic)
| | |-- parsers.py # Data source parsers (e.g., JUnit XML)
| | |-- processor.py # Data enrichment and transformation
| | |-- writers.py # InfluxDB writer client
| | |-- schemas.py # Core data models (Pydantic)
|-- services/
| |-- payments/
| |-- inventory/
| |-- ... (200+ more)
|-- .github/
| |-- workflows/
| | |-- reusable_ci_workflow.yml
```
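Most of these modules are walked through below. `config.py` is the exception, so here is a minimal sketch of how it might load the connection settings the CI workflow injects, assuming the Pydantic v1 API used elsewhere in the library; the class and function names are assumptions, not the module’s actual contents:

```python
# Sketch of libs/ci_observer/config.py: Pydantic settings that read the
# INFLUXDB_* variables exported by the reusable workflow. Illustrative only.
from pydantic import BaseSettings, Field


class InfluxDBSettings(BaseSettings):
    url: str = Field(..., env="INFLUXDB_URL")
    token: str = Field(..., env="INFLUXDB_TOKEN")
    org: str = Field(..., env="INFLUXDB_ORG")
    bucket: str = Field(..., env="INFLUXDB_BUCKET")


def load_influxdb_settings() -> InfluxDBSettings:
    """Fail fast with a clear validation error if any variable is missing."""
    return InfluxDBSettings()
```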
The first critical step was defining a strict schema for our metrics. Without this, we’d be back in the `curl`-to-API mess. We used Pydantic to define our data models, providing runtime validation and a single source of truth for our data’s shape.
```python
# libs/ci_observer/schemas.py
import os
from datetime import datetime
from typing import List, Literal, Optional
from pydantic import BaseModel, Field
class CIEnvironment(BaseModel):
"""
Schema for environment variables injected by the CI system.
Provides context for the metrics.
"""
service_name: str = Field(..., description="Name of the service being built.")
git_branch: str = Field(..., description="The source Git branch.")
git_commit_sha: str = Field(..., description="The commit SHA.")
run_id: str = Field(..., description="Unique ID for the CI run.")
event_name: str = Field(..., description="The event that triggered the workflow (e.g., push, pull_request).")
@classmethod
def from_env(cls):
"""Factory method to load CI context from environment variables."""
# In a real-world scenario, you'd map these from GH Actions, Jenkins, etc.
return cls(
service_name=os.getenv("CI_SERVICE_NAME", "unknown"),
git_branch=os.getenv("GITHUB_REF_NAME", "unknown"),
git_commit_sha=os.getenv("GITHUB_SHA", "unknown"),
run_id=os.getenv("GITHUB_RUN_ID", "unknown"),
event_name=os.getenv("GITHUB_EVENT_NAME", "unknown"),
)
class TestResult(BaseModel):
"""
Schema for a single test case result.
"""
name: str
classname: str
time_seconds: float
status: Literal["passed", "failed", "skipped", "error"]
timestamp: datetime
class BuildMetrics(BaseModel):
"""
Schema for build-related metrics.
"""
duration_seconds: float
artifact_size_bytes: Optional[int] = None
timestamp: datetime
class IngestionPayload(BaseModel):
"""
The final, unified payload to be written to InfluxDB.
"""
environment: CIEnvironment
test_results: List[TestResult] = []
    build_metrics: Optional[BuildMetrics] = None
```
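The payoff of strict schemas shows up the moment something malformed arrives. As a quick illustration (the test names below are hypothetical), an unknown status never makes it past validation, let alone into InfluxDB:

```python
# Illustration only: Pydantic rejects a record whose status is not one of the
# allowed literals, so bad data fails loudly at parse time.
from datetime import datetime, timezone

from pydantic import ValidationError

from libs.ci_observer.schemas import TestResult

try:
    TestResult(
        name="test_charge_card",              # hypothetical test name
        classname="payments.tests.test_api",  # hypothetical module path
        time_seconds=0.42,
        status="flaky",                       # not in passed/failed/skipped/error
        timestamp=datetime.now(timezone.utc),
    )
except ValidationError as exc:
    print(exc)  # reports the unexpected value for 'status'
```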
With schemas in place, we built the parsers. Parsing JUnit XML files, a notoriously verbose format, became a short, self-contained routine; the raw records it produces are what Pandas later cleans, enriches, and aggregates.
```python
# libs/ci_observer/parsers.py
import logging
from pathlib import Path
from typing import List
from datetime import datetime, timezone
from lxml import etree
from .schemas import TestResult
logger = logging.getLogger(__name__)
class JUnitXMLParser:
"""Parses JUnit XML report files into a structured list of TestResult objects."""
def parse(self, file_path: Path) -> List[TestResult]:
if not file_path.exists():
logger.warning(f"JUnit XML report not found at: {file_path}")
return []
try:
# Pandas can read XML, but for complex structures like JUnit,
# using a dedicated XML parser first is more robust.
tree = etree.parse(str(file_path))
testcases = tree.xpath("//testcase")
records = []
for case in testcases:
status = "passed"
if case.find("failure") is not None:
status = "failed"
elif case.find("error") is not None:
status = "error"
elif case.find("skipped") is not None:
status = "skipped"
records.append({
"name": case.get("name"),
"classname": case.get("classname"),
"time_seconds": float(case.get("time", 0.0)),
"status": status,
# Fallback to now if timestamp isn't in the report
"timestamp": datetime.now(timezone.utc)
})
if not records:
logger.info(f"No test cases found in {file_path}")
return []
# Use Pydantic to validate each record after parsing
validated_results = [TestResult(**rec) for rec in records]
logger.info(f"Successfully parsed {len(validated_results)} test results from {file_path}")
return validated_results
except Exception as e:
logger.error(f"Failed to parse JUnit XML file {file_path}: {e}", exc_info=True)
            return []
```
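The tree above also lists `processor.py`, and that is where Pandas earns its keep: parsed records are loaded into a DataFrame, enriched with the CI context, and aggregated before anything is written. The module isn’t reproduced in full here; the sketch below is representative, and the function name and specific summary columns are assumptions:

```python
# Sketch of the enrichment/aggregation step in libs/ci_observer/processor.py.
# The chosen aggregates are illustrative, not the library's exact output.
from typing import List

import pandas as pd

from .schemas import CIEnvironment, TestResult


def summarize_test_results(env: CIEnvironment, results: List[TestResult]) -> pd.DataFrame:
    """Collapse raw TestResult records into one aggregated row per test class."""
    df = pd.DataFrame([r.dict() for r in results])
    if df.empty:
        return df

    # Enrich every row with CI context so downstream writers can tag points.
    df["service"] = env.service_name
    df["branch"] = env.git_branch

    return (
        df.groupby(["service", "branch", "classname"])
        .agg(
            test_count=("name", "count"),
            failed=("status", lambda s: (s == "failed").sum()),
            total_duration_seconds=("time_seconds", "sum"),
            p95_duration_seconds=("time_seconds", lambda s: s.quantile(0.95)),
        )
        .reset_index()
    )
```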
The core of the system is the `InfluxDBWriter`. Our first implementation was naive, writing one data point at a time. In testing, this saturated the network and caused significant backpressure on the CI runners, adding minutes to build times. The pitfall here is underestimating the overhead of individual HTTP requests. The solution was to implement batching.
```python
# libs/ci_observer/writers.py
import logging
from influxdb_client import InfluxDBClient, Point, WriteOptions
from tenacity import retry, stop_after_attempt, wait_exponential
from .schemas import IngestionPayload
logger = logging.getLogger(__name__)
class InfluxDBWriter:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.bucket = bucket
self.client = InfluxDBClient(url=url, token=token, org=org)
# Crucial for performance: configure a batching writer
self.write_api = self.client.write_api(
write_options=WriteOptions(
batch_size=5000, # Number of points to send in a single request
flush_interval=10_000, # Flush every 10 seconds
jitter_interval=2_000, # Add random delay to avoid thundering herd
retry_interval=5_000, # Retry interval in ms
max_retries=3
)
)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def write(self, payload: IngestionPayload):
"""
Transforms the Pydantic payload into InfluxDB Points and writes them.
This method is decorated with tenacity for resiliency.
"""
points = self._generate_points(payload)
if not points:
logger.warning("No data points generated from payload. Skipping write.")
return
try:
logger.info(f"Writing {len(points)} points to InfluxDB bucket '{self.bucket}'")
self.write_api.write(bucket=self.bucket, record=points)
except Exception as e:
logger.error(f"Failed to write to InfluxDB after retries: {e}", exc_info=True)
# In a real CI job, we might choose to fail the build here or just log the error.
# Re-raising allows the retry decorator to work.
raise
def _generate_points(self, payload: IngestionPayload) -> list:
"""
Converts the unified data payload into a list of InfluxDB Point objects.
This is where we define our measurements, tags, and fields.
"""
points = []
env = payload.environment
# Common tags for all points
common_tags = {
"service": env.service_name,
"branch": env.git_branch,
"event": env.event_name,
"run_id": env.run_id
}
# A major architectural decision: commit SHA is a field, not a tag.
# Using a high-cardinality value like a commit SHA as a tag can lead
# to series cardinality explosion in InfluxDB, crippling performance.
# Storing it as a field prevents this, at the cost of not being able
# to efficiently GROUP BY commit hash. This is a deliberate trade-off.
if payload.build_metrics:
p = Point("ci_build") \
.tag("service", common_tags["service"]) \
.tag("branch", common_tags["branch"]) \
.tag("event", common_tags["event"]) \
.field("duration_seconds", payload.build_metrics.duration_seconds) \
.field("commit_sha", env.git_commit_sha) \
.time(payload.build_metrics.timestamp)
if payload.build_metrics.artifact_size_bytes is not None:
p.field("artifact_size_bytes", payload.build_metrics.artifact_size_bytes)
points.append(p)
for test in payload.test_results:
points.append(
Point("ci_test_results")
.tag("service", common_tags["service"])
.tag("branch", common_tags["branch"])
.tag("test_name", test.name)
.tag("test_class", test.classname)
.tag("status", test.status)
.field("duration_seconds", test.time_seconds)
.field("commit_sha", env.git_commit_sha)
.time(test.timestamp)
)
return points
def close(self):
"""Ensure all buffered points are written before exiting."""
logger.info("Flushing and closing InfluxDB writer.")
self.write_api.close()
        self.client.close()
```
The entire process is orchestrated by a `main.py` CLI entrypoint and integrated into a reusable GitHub Actions workflow.
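The CLI itself is thin glue over the pieces above. It isn’t reproduced verbatim here; the sketch below shows its shape, with only the `--junit-xml-path` flag from the workflow invocation included and the connection settings read straight from the environment (the real module routes them through `config.py`):

```python
# Sketch of libs/ci_observer/main.py: parse CI artifacts, build the payload,
# and hand it to the batching writer. Wiring is illustrative, not exact.
import argparse
import logging
import os
from pathlib import Path

from .parsers import JUnitXMLParser
from .schemas import CIEnvironment, IngestionPayload
from .writers import InfluxDBWriter


def main() -> None:
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(prog="ci_observer")
    parser.add_argument("--junit-xml-path", type=Path, default=None)
    args = parser.parse_args()

    env = CIEnvironment.from_env()
    test_results = []
    if args.junit_xml_path is not None:
        test_results = JUnitXMLParser().parse(args.junit_xml_path)
    payload = IngestionPayload(environment=env, test_results=test_results)

    writer = InfluxDBWriter(
        url=os.environ["INFLUXDB_URL"],
        token=os.environ["INFLUXDB_TOKEN"],
        org=os.environ["INFLUXDB_ORG"],
        bucket=os.environ["INFLUXDB_BUCKET"],
    )
    try:
        writer.write(payload)
    finally:
        # Flush the batching write API so the job never exits with buffered points.
        writer.close()


if __name__ == "__main__":
    main()
```

The reusable workflow then wires this into every service’s CI job: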
```yaml
# .github/workflows/reusable_ci_workflow.yml
name: Reusable Python CI
on:
workflow_call:
inputs:
service_name:
required: true
type: string
python_version:
required: false
type: string
default: '3.10'
secrets:
INFLUXDB_URL:
required: true
INFLUXDB_TOKEN:
required: true
INFLUXDB_ORG:
required: true
INFLUXDB_BUCKET:
required: true
jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
# ... steps to install dependencies, lint, build ...
- name: Run Tests and Generate Report
id: tests
run: |
# This command would run pytest and generate a JUnit XML report
pytest --junitxml=junit-report.xml
- name: Collect and Ingest CI Metrics
# This step runs regardless of test success/failure to capture all outcomes
if: always()
env:
INFLUXDB_URL: ${{ secrets.INFLUXDB_URL }}
INFLUXDB_TOKEN: ${{ secrets.INFLUXDB_TOKEN }}
INFLUXDB_ORG: ${{ secrets.INFLUXDB_ORG }}
INFLUXDB_BUCKET: ${{ secrets.INFLUXDB_BUCKET }}
CI_SERVICE_NAME: ${{ inputs.service_name }}
run: |
python -m libs.ci_observer.main \
--junit-xml-path junit-report.xml \
          # Other flags for build duration, artifact size, etc.
```
The data flow can be visualized as a pipeline within each CI job.
```mermaid
graph TD
    A[CI Job Starts] --> B{Run Build & Tests};
    B --> C[Generate Artifacts e.g., junit-report.xml];
    C --> D[Invoke ci_observer CLI];
    D --> E{Parse Artifacts};
    E --> F[Create Pandas DataFrame];
    F --> G{Enrich with CI Environment Info};
    G --> H[Validate with Pydantic Schema];
    H --> I{Transform to InfluxDB Points};
    I --> J[Batch Write to InfluxDB];
    J --> K[CI Job Completes];
```
With this system in place, we could finally answer the hard questions. An engineering lead could run a Flux query directly or use a Grafana dashboard to plot build performance.
```flux
// Example Flux query to find the p95 build duration for the 'payments' service
// on the main branch over the last 30 days, aggregated daily.
from(bucket: "ci-metrics")
|> range(start: -30d)
|> filter(fn: (r) => r._measurement == "ci_build")
|> filter(fn: (r) => r._field == "duration_seconds")
|> filter(fn: (r) => r.service == "payments")
|> filter(fn: (r) => r.branch == "main")
  |> aggregateWindow(
       every: 1d,
       fn: (column, tables=<-) => tables |> quantile(q: 0.95, column: column),
       createEmpty: false
     )
  |> yield(name: "p95_build_duration")
```
The result was transformative. We identified that a newly introduced static analysis tool was adding 10 minutes to every build; we moved it to a separate, non-blocking nightly job. We found a flaky integration test in the `inventory` service that failed ~30% of the time, but only on Wednesdays (a mystery eventually traced to a dependency’s weekly data reset). These insights were impossible to glean from raw logs.
This system is not without its limitations. It currently relies on the CI job reaching the final step to report metrics. If the environment setup fails catastrophically early on, no data is captured. A potential future iteration could involve an external collector that polls the CI provider’s API for job statuses, providing a more complete picture of failures. Furthermore, the volume of test-level data is substantial. We will need to implement more aggressive InfluxDB downsampling tasks and shorter retention policies for fine-grained test results versus daily build aggregates to manage storage costs effectively. The schema is also still evolving, and managing those changes, even in a monorepo, requires careful coordination.