The signal-to-noise ratio in our CI pipeline was collapsing. Our primary product, a hybrid Android application, relies heavily on a complex WebView for its core functionality. While this allows for rapid iteration, it creates a testing nightmare. We use Cypress for our end-to-end tests, running against the WebView instance on emulators. Every merge request triggers hundreds of tests, and with intermittent network issues, backend fluctuations, and genuine front-end bugs, the failure logs became an unmanageable firehose of text. Developers were spending more time sifting through duplicative, noisy failure logs than fixing the actual problems. A single backend outage could generate fifty unique-looking but causally identical Cypress failures, burying the one critical regression we actually needed to find.
Our initial concept was to move beyond simple text-based log aggregation. We needed a system that could ingest the raw failure artifacts from Cypress and apply intelligent, domain-aware processing. The goal was to transform unstructured error messages and stack traces into structured, searchable data. This would allow a developer to ask specific questions like, “Show me all test failures on the payment screen related to API timeouts in the last 24 hours,” instead of manually searching for keywords in Jenkins logs. This required a dedicated ingestion service that would sit between our CI runner and our data store.
The technology selection process was driven by pragmatism and the need for rapid implementation. The bridge between Cypress and the Android WebView was already established using Appium as a WebDriver proxy, allowing Cypress to attach to the Chrome DevTools protocol of the WebView. The real challenge was the backend diagnostic pipeline.
- Ingestion API: Python with FastAPI. This was a straightforward choice. We needed a lightweight, high-performance web framework. FastAPI’s use of Pydantic for data validation and Starlette for its ASGI foundation makes it incredibly fast and reliable for building robust API endpoints. The Python ecosystem is also second to none for NLP tasks.
- NLP Engine: spaCy. We considered simple regex matching, but it’s brittle and unmaintainable. We needed something more robust. While larger models like those from Hugging Face are powerful, they represented overkill for this task. spaCy offers a production-ready, performant library with excellent support for rule-based matching (`Matcher`) and Named Entity Recognition (NER). Its speed was critical; this processing step could not become a bottleneck in the CI feedback loop.
- Search Index: Meilisearch. The final piece was making the processed data searchable. Elasticsearch is the default choice in many organizations, but its operational complexity and resource footprint were deterrents for a focused internal tool. Meilisearch provided exactly what we needed: incredible out-of-the-box search performance, typo tolerance, and a dead-simple REST API. For our use case, fast queries by developers under pressure, Meilisearch’s focus on search speed over analytical complexity was the perfect fit (a short usage sketch follows this list).
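To make the “dead-simple REST API” point concrete, here is a minimal sketch using the official Meilisearch Python client against a local instance; the index name, document, and master key below are illustrative, not the production values:

```python
# A minimal sketch of the Meilisearch Python client (assumes a local instance and the `meilisearch` package).
import meilisearch

client = meilisearch.Client("http://localhost:7700", "aSampleMasterKey")
index = client.index("demo-failures")

# Indexing is asynchronous; wait for the enqueued task before querying.
task = index.add_documents([{"id": "1", "error_message": "Timed out retrying after 4000ms"}])
client.wait_for_task(task.task_uid)

# Typo tolerance: "tmied out" still matches "Timed out".
print(index.search("tmied out")["hits"])
```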
The entire system can be visualized with the following data flow:
sequenceDiagram
    participant CI Runner as CI Runner (Android Emulator)
    participant Cypress as Cypress Test
    participant API as Diagnostic API (Python/FastAPI)
    participant spaCy as spaCy Processor
    participant Meili as Meilisearch
    CI Runner->>+Cypress: Executes test suite
    Cypress-->>-CI Runner: Test Fails
    Note over Cypress: `after:run` hook captures failure artifact
    Cypress->>+API: POST /v1/report_failure (JSON Payload)
    API->>+spaCy: process_failure_data(payload)
    spaCy-->>-API: Returns structured entities (e.g., component, error_type)
    API->>+Meili: add_documents([processed_payload])
    Meili-->>-API: Indexing task enqueued
    API-->>-Cypress: HTTP 202 Accepted
The core of the implementation work was divided into three parts: configuring Cypress to dispatch failure data, building the FastAPI ingestion and processing service, and correctly configuring the Meilisearch index for optimal querying.
Part 1: Cypress Failure Data Egress
A common mistake is to try to perform complex logic within the Cypress runtime itself. The node.js environment within Cypress is for test orchestration, not heavy data processing. The correct approach is to configure Cypress to do the minimum work necessary: capture the failure state and dispatch it to an external service.
We achieved this by using the `after:run` event in our `cypress.config.js` and a custom error handling command.
cypress.config.js
const { defineConfig } = require('cypress');
const axios = require('axios');
module.exports = defineConfig({
e2e: {
setupNodeEvents(on, config) {
on('after:run', async (results) => {
if (results.totalFailed > 0) {
console.log(`[DIAGNOSTICS] Found ${results.totalFailed} failed tests. Reporting...`);
const diagnosticApiEndpoint = config.env.DIAGNOSTIC_API_ENDPOINT;
if (!diagnosticApiEndpoint) {
console.error('[DIAGNOSTICS] Error: DIAGNOSTIC_API_ENDPOINT is not set. Skipping report.');
return;
}
const failurePromises = results.runs
.filter(run => run.stats.failures > 0)
.flatMap(run =>
run.tests
.filter(test => test.state === 'failed')
.map(test => {
// The title array gives the describe/it hierarchy
const testTitle = test.title.join(' > ');
const errorMessage = test.displayError ? test.displayError : 'No error message captured.';
const payload = {
test_id: `${run.spec.relative}-${test.title.join('-')}`,
test_title: testTitle,
duration_ms: test.duration,
error_message: errorMessage,
// In a real project, include device name, OS version, etc. from CI environment
context: {
spec_file: run.spec.relative,
platform: 'android',
run_url: process.env.CI_JOB_URL || 'local'
}
};
console.log(`[DIAGNOSTICS] Sending report for: ${testTitle}`);
return axios.post(diagnosticApiEndpoint, payload, {
timeout: 5000 // A pragmatic timeout
}).catch(err => {
// It's critical to handle failures in reporting itself, to not fail the CI job.
console.error(`[DIAGNOSTICS] Failed to report test failure for "${testTitle}": ${err.message}`);
});
})
);
await Promise.all(failurePromises);
console.log('[DIAGNOSTICS] All failure reports sent.');
}
});
},
// Other e2e config...
},
env: {
// This should be set by the CI environment variables
DIAGNOSTIC_API_ENDPOINT: 'http://localhost:8000/v1/report_failure'
}
});
This configuration ensures that after every CI run, we iterate through only the failed tests, construct a clean JSON payload, and POST it to our backend. The payload is deliberately kept simple and serializable. A crucial detail is the non-fatal error handling; the failure of the diagnostic pipeline should never block the primary CI pipeline.
Part 2: The FastAPI Ingestion and spaCy Processing Service
This is the heart of the system. The Python service is responsible for receiving the data, validating it, processing it with spaCy, and then handing it off to Meilisearch.
Here is the complete, runnable code for the service.
main.py
import os
import logging
import datetime
import uuid
from functools import lru_cache
import spacy
from spacy.matcher import Matcher
import meilisearch
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field
# --- Basic Configuration ---
# In a production setup, these would come from environment variables or a config service.
MEILI_URL = os.getenv("MEILI_URL", "http://localhost:7700")
MEILI_MASTER_KEY = os.getenv("MEILI_MASTER_KEY", "aSampleMasterKey")
MEILI_INDEX_NAME = "test-failures"
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
# --- Logging Setup ---
# Proper logging is non-negotiable for a service like this.
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# --- Pydantic Models for Data Validation ---
# This ensures data integrity from the moment it enters our system.
class FailureContext(BaseModel):
spec_file: str
platform: str
run_url: str
class FailureReportPayload(BaseModel):
test_id: str
test_title: str
duration_ms: int
error_message: str
context: FailureContext
class ProcessedFailureDocument(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Stored as a Unix timestamp (seconds) so the document stays JSON-serializable
    # and Meilisearch can sort and range-filter on it.
    timestamp: float = Field(default_factory=lambda: datetime.datetime.now(datetime.timezone.utc).timestamp())
    test_id: str
    test_title: str
    duration_ms: int
    error_message: str
    context: FailureContext
    entities: dict = Field(default_factory=dict)
# --- spaCy Processing Logic ---
class NlpProcessor:
def __init__(self, model_name="en_core_web_sm"):
"""
Loads the spaCy model and initializes the Matcher.
The pitfall to avoid is loading the model on every request. It must be a singleton.
"""
try:
self.nlp = spacy.load(model_name)
self.matcher = Matcher(self.nlp.vocab)
self._setup_rules()
logger.info(f"spaCy model '{model_name}' loaded successfully.")
except OSError:
logger.error(f"spaCy model '{model_name}' not found. Please run 'python -m spacy download {model_name}'")
raise
    def _setup_rules(self):
        """
        Define rule-based patterns for extracting specific, known error formats.
        This is more reliable than NER for highly structured text snippets.
        Note: each dict in a Matcher pattern describes a single token, so the
        patterns anchor on the stable prefix of each error message.
        """
        # Example: "Timed out retrying after 4000ms: Expected to find element: `#login-button`, but never found it."
        timeout_pattern = [
            {"LOWER": "timed"}, {"LOWER": "out"},
            {"LOWER": "retrying"}, {"LOWER": "after"},
            {"LIKE_NUM": True, "OP": "?"},               # "4000" if the tokenizer splits the number off
            {"TEXT": {"REGEX": r"^\d*ms$"}, "OP": "?"}   # "4000ms" or a bare "ms"
        ]
        # Example: "cy.request() failed on GET /api/v2/users/profile - 502 Bad Gateway"
        api_error_pattern = [
            {"LOWER": {"REGEX": r"^cy\.request"}}, {"IS_PUNCT": True, "OP": "*"},
            {"LOWER": "failed"}, {"LOWER": "on"},
            {"TEXT": {"IN": ["GET", "POST", "PUT", "DELETE"]}}
        ]
        self.matcher.add("TIMEOUT_ERROR", [timeout_pattern])
        self.matcher.add("API_ERROR", [api_error_pattern])
def extract_entities(self, text: str) -> dict:
"""
Processes text to extract both rule-based matches and general named entities.
"""
doc = self.nlp(text)
entities = {"rule_based": [], "ner": []}
# 1. Rule-based matching
matches = self.matcher(doc)
for match_id, start, end in matches:
rule_id_str = self.nlp.vocab.strings[match_id]
span = doc[start:end]
entities["rule_based"].append({
"type": rule_id_str,
"text": span.text
})
# 2. General NER
for ent in doc.ents:
# Filter for potentially useful entities to reduce noise
if ent.label_ in ["PERSON", "ORG", "GPE", "DATE", "CARDINAL"]:
entities["ner"].append({
"type": ent.label_,
"text": ent.text
})
return entities
# --- Global Instances ---
# Using @lru_cache as a simple way to create a singleton for the processor.
@lru_cache(maxsize=1)
def get_nlp_processor() -> NlpProcessor:
return NlpProcessor()
def get_meili_client() -> meilisearch.Client:
return meilisearch.Client(MEILI_URL, MEILI_MASTER_KEY)
# --- FastAPI Application ---
app = FastAPI(title="Diagnostic Ingestion Service")
@app.on_event("startup")
def startup_event():
"""On startup, ensure our NLP model is loaded and the Meilisearch index is configured."""
logger.info("Application startup...")
get_nlp_processor() # Pre-load the model
try:
client = get_meili_client()
client.create_index(uid=MEILI_INDEX_NAME, options={'primaryKey': 'id'})
index = client.index(MEILI_INDEX_NAME)
# This configuration is CRITICAL for performance and usability.
# A common mistake is to forget this and wonder why filtering is slow or doesn't work.
current_settings = {
'searchableAttributes': ['test_title', 'error_message'],
'filterableAttributes': ['test_id', 'context.platform', 'context.spec_file', 'timestamp'],
'sortableAttributes': ['timestamp', 'duration_ms'],
'rankingRules': [
'words', 'typo', 'proximity', 'attribute', 'sort', 'exactness',
'timestamp:desc' # Custom rule: newer failures are more relevant
]
}
index.update_settings(current_settings)
logger.info(f"Meilisearch index '{MEILI_INDEX_NAME}' is ready and configured.")
except Exception as e:
logger.critical(f"Failed to connect or configure Meilisearch: {e}")
# In a real system, this might prevent the app from starting.
# For now, we log a critical error.
@app.post("/v1/report_failure", status_code=status.HTTP_202_ACCEPTED)
async def report_failure(payload: FailureReportPayload):
"""
Receives a test failure report, processes it for entities, and indexes it.
"""
logger.info(f"Received failure report for test: {payload.test_id}")
try:
processor = get_nlp_processor()
extracted_entities = processor.extract_entities(payload.error_message)
document = ProcessedFailureDocument(
**payload.dict(),
entities=extracted_entities
)
client = get_meili_client()
index = client.index(MEILI_INDEX_NAME)
# add_documents is an async operation on Meilisearch's side.
task = index.add_documents([document.dict()])
logger.info(f"Enqueued document {document.id} for indexing. Task UID: {task.task_uid}")
return {"message": "Report accepted for processing.", "document_id": document.id}
except Exception as e:
logger.error(f"Error processing failure report for {payload.test_id}: {e}", exc_info=True)
# We throw an HTTP exception to provide feedback to the client if something goes wrong.
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to process and index failure report."
)
@app.get("/health")
def health_check():
return {"status": "ok"}
This service demonstrates several production-grade practices:
- Singleton Pattern: The spaCy model is heavy. Loading it once on startup using `@lru_cache` is essential for request performance.
- Configuration Management: Critical settings are pulled from environment variables.
- Input Validation: Pydantic models reject malformed payloads immediately.
- Explicit Index Configuration: We don’t rely on Meilisearch defaults. We explicitly define which attributes are searchable, filterable, and sortable. This has a massive impact on query performance and capability. The custom ranking rule to prioritize newer results is a small but powerful tweak.
- Asynchronous Hand-off: We submit the document to Meilisearch and immediately return a `202 Accepted`. We don’t wait for indexing to complete, which keeps the API response fast.
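To sanity-check the whole ingestion path, a small script can POST a hand-crafted report to the running service. This is a rough sketch assuming the API is listening on localhost:8000 and the `requests` package is available; the payload values are invented but follow the `FailureReportPayload` schema above:

```python
# smoke_test_report.py - posts a hypothetical failure report to the local ingestion service.
import requests

payload = {
    "test_id": "cypress/e2e/login.cy.js-Login-shows error on bad credentials",
    "test_title": "Login > shows error on bad credentials",
    "duration_ms": 4212,
    "error_message": "Timed out retrying after 4000ms: Expected to find element: `#login-button`, but never found it.",
    "context": {
        "spec_file": "cypress/e2e/login.cy.js",
        "platform": "android",
        "run_url": "local"
    }
}

resp = requests.post("http://localhost:8000/v1/report_failure", json=payload, timeout=5)
print(resp.status_code, resp.json())  # expect 202 and the generated document_id
```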
Part 3: The Final Result and Querying
With the pipeline in place, the developer experience is transformed. Instead of grepping through logs, developers can now perform powerful, typo-tolerant queries against a dedicated Meilisearch instance.
For example, a developer investigating a spate of timeouts can now search for: "timed out login"
Meilisearch will find documents containing error messages like: "Timed out retrying after 4000ms: Expected to find element: '#login-button', but never found it."
A more advanced query could use filters to narrow the search:
// Example cURL request to Meilisearch API
curl \
-X POST 'http://localhost:7700/indexes/test-failures/search' \
-H 'Content-Type: application/json' \
--data-binary '{
"q": "api gateway",
"filter": "timestamp > 1672531200"
}'
This would find all failures mentioning “api gateway” that occurred after the specified Unix timestamp. The spaCy-extracted entities, while not directly searched in this simple example, can be used for more advanced filtering or faceting in a UI built on top of this API, allowing developers to filter by `entities.rule_based.type = "API_ERROR"` once that attribute is added to `filterableAttributes`.
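As an illustration of that idea, a query through the Meilisearch Python client might look like the sketch below; it assumes `entities.rule_based.type` has been added to `filterableAttributes` and reuses the local endpoint and key from the service configuration:

```python
# A sketch of an entity-based filter query (requires 'entities.rule_based.type' in filterableAttributes).
import meilisearch

client = meilisearch.Client("http://localhost:7700", "aSampleMasterKey")
index = client.index("test-failures")

results = index.search(
    "gateway",
    {
        "filter": 'entities.rule_based.type = "API_ERROR" AND context.platform = "android"',
        "sort": ["timestamp:desc"],
        "limit": 20,
    },
)
for hit in results["hits"]:
    print(hit["test_title"], hit["error_message"][:80])
```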
The real value unlocked is the speed of root cause analysis. When a new failure comes in, a quick search can reveal if it’s a known, pre-existing issue or a novel regression. This system allows us to see patterns in failures that were previously invisible in the noise, such as a single flaky test that fails across multiple Android versions or a specific API endpoint that consistently fails under load.
This solution is not without its limitations. The current spaCy `Matcher` rules are specific and must be maintained as new error patterns emerge in our application. The NER model is a general-purpose one; its accuracy on our domain-specific jargon (like internal component names) is limited. A significant future iteration would involve training a custom NER model on our own corpus of failure logs to more accurately identify components, error codes, and other internal identifiers.
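To give a sense of what that future work involves, here is a rough sketch of preparing annotated training data for a custom spaCy v3 NER model; the COMPONENT label and example text are hypothetical:

```python
# Prepare domain-specific NER training data for spaCy v3 (hypothetical COMPONENT label and text).
import spacy
from spacy.tokens import DocBin

nlp = spacy.blank("en")
TRAIN_DATA = [
    ("PaymentScreen failed to load after tapping #checkout-button",
     {"entities": [(0, 13, "COMPONENT")]}),
]

db = DocBin()
for text, annotations in TRAIN_DATA:
    doc = nlp.make_doc(text)
    spans = [doc.char_span(start, end, label=label) for start, end, label in annotations["entities"]]
    doc.ents = [span for span in spans if span is not None]  # drop spans that don't align to token boundaries
    db.add(doc)

db.to_disk("./train.spacy")
# Then train with the spaCy CLI, e.g.:
# python -m spacy train config.cfg --paths.train ./train.spacy --paths.dev ./dev.spacy
```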
Furthermore, this pipeline only analyzes textual data. A more advanced version could integrate image analysis to detect visual regressions from screenshots captured on failure or even analyze video recordings of the test run. The current architecture, however, provides a solid and extensible foundation. It has already shifted our team’s focus from deciphering logs to fixing bugs, a clear measure of its success.