The fundamental flaw in our mobile release cycle was latency—not network latency, but human latency. We’d ship a new build to a small percentage of users and then wait. We’d watch dashboards for crash rates and KPIs, but the richest data—user reviews on the App Store and detailed crash reports—was a swamp of unstructured text. Manually sifting through this feedback to decide whether to continue a rollout took hours, sometimes days. By the time we spotted a pattern of complaints about a broken login flow, the damage was often already done. The process felt reactive and fundamentally broken.
Our initial concept was to shorten this feedback loop from hours to minutes and embed it directly into our CI/CD pipeline. The goal: create an automated quality gate that makes a data-driven “go/no-go” decision on promoting a canary release. This system needed to ingest raw user feedback, apply Natural Language Processing to structure it, store it in a high-performance analytics database, and expose a simple health check API for our GitLab CI runner to query.
Technology selection was driven by this need for speed and scale. For data storage, PostgreSQL was briefly considered but quickly dismissed. We anticipated millions of feedback events over time, and the core requirement was not transactional integrity but rapid analytical aggregation. We needed to ask questions like, “What is the rate of new bug-related feedback for version 3.1.5 compared to 3.1.4 in the first four hours post-launch?” This is classic OLAP territory. ClickHouse was the obvious choice for its columnar storage and blistering performance on GROUP BY queries over time-series data. Elasticsearch was another option, but its strength in full-text search was secondary to our need for raw analytical speed on structured fields.
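To make that concrete, here is a minimal sketch of the comparison query we had in mind, run with clickhouse-driver against the mobile_feedback table defined in Part 2. The host, the version strings, and the four-hour window anchored to now() are placeholders; a real comparison would anchor each version to its own rollout start time.

# Hypothetical sketch: bug-related feedback rate for two versions, side by side.
# Assumes the mobile_feedback table from Part 2 and a reachable ClickHouse instance.
from clickhouse_driver import Client

client = Client(host="localhost", port=9000)

QUERY = """
SELECT
    app_version,
    count() AS total_feedback,
    countIf(has(categories, 'Bug')) AS bug_reports,
    if(total_feedback > 0, bug_reports / total_feedback, 0) AS bug_report_rate
FROM mobile_feedback
WHERE app_version IN (%(current)s, %(previous)s)
  AND event_timestamp >= now() - INTERVAL 4 HOUR
GROUP BY app_version
ORDER BY app_version
"""

for app_version, total, bugs, rate in client.execute(
        QUERY, {"current": "3.1.5", "previous": "3.1.4"}):
    print(f"{app_version}: {bugs}/{total} bug-related ({rate:.1%})")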
For the NLP component, we evaluated third-party sentiment analysis APIs. They were easy to integrate but lacked the domain-specific nuance required. A user review saying “the new update is sick” could be positive or negative depending on context. A comment like “can’t find the checkout button since the redesign” is not just negative sentiment; it’s a critical usability bug report. This led us to a fine-tuned, multi-label classification model based on a Hugging Face transformer. It would classify each piece of feedback into categories like Bug, Crash, Feature Request, Negative UX, Praise, etc. This was more upfront work but provided the signal clarity we needed.
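The module in Part 3 uses a zero-shot pipeline as a stand-in for this model. For completeness, here is a rough sketch of what inference with a fine-tuned multi-label head looks like; "our-org/feedback-multilabel" is a hypothetical checkpoint name and the 0.5 threshold is illustrative.

# Hypothetical sketch of multi-label inference with a fine-tuned transformer head.
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

MODEL_ID = "our-org/feedback-multilabel"  # placeholder for an internal fine-tuned checkpoint
LABELS = ["Bug", "Crash", "Feature Request", "Negative UX", "Praise"]

tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_ID, problem_type="multi_label_classification", num_labels=len(LABELS)
)

def classify(texts: list[str], threshold: float = 0.5) -> list[list[str]]:
    """Return every label whose independent (sigmoid) probability clears the threshold."""
    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        logits = model(**inputs).logits
    probs = torch.sigmoid(logits)  # per-label probabilities, not a softmax over labels
    return [
        [LABELS[j] for j in range(len(LABELS)) if probs[i, j] > threshold]
        for i in range(len(texts))
    ]

print(classify(["can't find the checkout button since the redesign"]))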
The entire system was designed as a data pipeline feeding a CI/CD-aware service.
sequenceDiagram
    participant AppStores as App/Play Store
    participant Scheduler as Cron Job
    participant IngestionSvc as Feedback Ingestion Service (Python/FastAPI)
    participant NLPModel as NLP Inference Module
    participant ClickHouse as ClickHouse
    participant GitLabCI as GitLab CI Pipeline
    Scheduler->>IngestionSvc: Trigger periodic fetch
    IngestionSvc->>AppStores: API Call: Fetch new reviews/crashes
    AppStores-->>IngestionSvc: Raw text feedback
    IngestionSvc->>NLPModel: Process batch of feedback text
    NLPModel-->>IngestionSvc: Structured Data (labels, sentiment)
    IngestionSvc->>ClickHouse: Batch INSERT processed data
    Note over GitLabCI: A new build is deployed to canary (e.g., 1% of users)
    GitLabCI->>GitLabCI: Job: deploy_canary
    GitLabCI->>GitLabCI: Job: wait_for_feedback (sleep 2h)
    GitLabCI->>IngestionSvc: API Call: /release-health?app_version=3.1.5
    IngestionSvc->>ClickHouse: SELECT ... WHERE app_version='3.1.5'
    ClickHouse-->>IngestionSvc: Aggregated health metrics
    IngestionSvc-->>GitLabCI: JSON Response { "status": "FAIL", "reasons": ["High bug report rate"] }
    alt Health Check PASSED
        GitLabCI->>GitLabCI: Job: promote_to_production
    else Health Check FAILED
        GitLabCI->>GitLabCI: Job: rollback_canary & alert_team
    end
Part 1: The Data Ingestion Service
The core of the system is a Python service built with FastAPI. It’s responsible for fetching, processing, and ingesting the data. A critical mistake in early designs is to perform these tasks sequentially for each feedback item. In a real-world project, this is incredibly inefficient. The correct approach is to batch operations at every stage: fetch in batches, process with the NLP model in batches, and insert into ClickHouse in batches.
Here is a stripped-down but functional representation of the main ingestion logic. It uses a placeholder for the app store clients and focuses on the data flow and batching.
# feedback_ingestor/main.py
import os
import logging
import uuid
from datetime import date, datetime
from typing import List, Dict, Any
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, Field
from .app_store_client import MockAppStoreClient # Placeholder for actual clients
from .nlp_processor import NLPProcessor
from .db_writer import ClickHouseWriter
# --- Configuration ---
# In a real app, use something like pydantic-settings (BaseSettings) for configuration
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000))
MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english" # Example model
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Service Initialization ---
app = FastAPI(title="Mobile Feedback Ingestion Service")
nlp_processor = NLPProcessor(model_name=MODEL_NAME)
db_writer = ClickHouseWriter(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
# --- Pydantic Models for Data Structure ---
class FeedbackItem(BaseModel):
source: str # e.g., 'app_store', 'google_play', 'crashlytics'
raw_text: str
app_version: str
rating: int | None = None
device_info: str | None = None
event_timestamp: datetime
class ProcessedFeedback(BaseModel):
event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
source: str
app_version: str
event_timestamp: datetime
    event_date: date  # Date object for the ClickHouse partitioning key
raw_text: str
rating: int
device_info: str
# NLP Outputs
sentiment_label: str
sentiment_score: float
categories: List[str] # e.g., ['Bug', 'UI/UX']
issue_signature: str # A hash of normalized text to group similar issues
# --- Core Processing Logic ---
def process_and_ingest_feedback(items: List[FeedbackItem]):
"""
The main background task. It takes raw feedback, processes it with NLP,
and writes to ClickHouse.
"""
if not items:
logger.info("No new feedback items to process.")
return
logger.info(f"Processing a batch of {len(items)} feedback items.")
try:
# Batch process with the NLP model for efficiency
raw_texts = [item.raw_text for item in items]
nlp_results = nlp_processor.process_batch(raw_texts)
processed_records: List[Dict] = []
for i, item in enumerate(items):
result = nlp_results[i]
# Create a stable signature for grouping similar raw text messages.
# A common mistake is to group by raw_text, which fails for trivial differences.
# Normalizing and hashing is more robust.
issue_sig = nlp_processor.generate_issue_signature(item.raw_text)
record = ProcessedFeedback(
source=item.source,
app_version=item.app_version,
event_timestamp=item.event_timestamp,
                event_date=item.event_timestamp.date(),  # the driver expects a date object for the Date column
raw_text=item.raw_text,
rating=item.rating or 0,
device_info=item.device_info or "",
sentiment_label=result['sentiment_label'],
sentiment_score=result['sentiment_score'],
categories=result['categories'],
issue_signature=issue_sig
)
# Convert to dict for the database driver
processed_records.append(record.dict())
logger.info(f"Writing {len(processed_records)} processed records to ClickHouse.")
db_writer.write_records(processed_records)
except Exception as e:
# In a production system, you'd add this to a dead-letter queue for retry.
logger.error(f"Failed to process or ingest feedback batch: {e}", exc_info=True)
# --- API Endpoints ---
@app.post("/trigger-ingestion")
async def trigger_ingestion(background_tasks: BackgroundTasks):
"""
This endpoint simulates being called by a cron job or scheduler.
It fetches data and schedules the processing in the background.
"""
logger.info("Ingestion trigger received.")
# In a real system, you'd handle rate limiting and deduplication here.
client = MockAppStoreClient()
try:
new_feedback = client.fetch_latest_feedback()
except Exception as e:
logger.error(f"Failed to fetch from app stores: {e}")
raise HTTPException(status_code=500, detail="Could not fetch feedback.")
# Don't block the API response. Process in the background.
background_tasks.add_task(process_and_ingest_feedback, new_feedback)
return {"status": "success", "message": f"Scheduled {len(new_feedback)} items for processing."}
@app.get("/release-health")
async def get_release_health(app_version: str):
"""
The endpoint called by the CI/CD pipeline to gate a release.
"""
if not app_version:
raise HTTPException(status_code=400, detail="app_version parameter is required.")
try:
metrics = db_writer.get_health_metrics(app_version)
# Define business logic for go/no-go.
# This is overly simplistic; a real system might compare against a baseline
# from the previous version or use anomaly detection.
thresholds = {
'bug_report_rate': 0.05, # 5% of feedback is bug-related
'negative_sentiment_rate': 0.20, # 20% is negative
'critical_issue_count': 5
}
failures = []
if metrics['bug_report_rate'] > thresholds['bug_report_rate']:
failures.append(f"Bug report rate {metrics['bug_report_rate']:.2f} exceeds threshold {thresholds['bug_report_rate']:.2f}")
if metrics['negative_sentiment_rate'] > thresholds['negative_sentiment_rate']:
failures.append(f"Negative sentiment rate {metrics['negative_sentiment_rate']:.2f} exceeds threshold {thresholds['negative_sentiment_rate']:.2f}")
if metrics['critical_issue_count'] > thresholds['critical_issue_count']:
failures.append(f"Critical issue count {metrics['critical_issue_count']} exceeds threshold {thresholds['critical_issue_count']}")
if failures:
return {
"version": app_version,
"status": "FAIL",
"reasons": failures,
"metrics": metrics
}
else:
return {
"version": app_version,
"status": "PASS",
"metrics": metrics
}
except Exception as e:
logger.error(f"Error checking release health for version {app_version}: {e}", exc_info=True)
# Fail open or closed? In CI/CD, failing closed is safer. If the health check fails, stop the release.
return {
"version": app_version,
"status": "FAIL",
"reasons": [f"Health check system error: {e}"],
"metrics": {}
}
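Before wiring this up to a real scheduler, a quick local smoke test along these lines is useful. Note that importing the module instantiates the NLP models and the ClickHouse client, so both need to be available; the version string is just an example.

# smoke_test.py -- minimal local check of both endpoints using FastAPI's TestClient.
from fastapi.testclient import TestClient

from feedback_ingestor.main import app

client = TestClient(app)

# Kick off one fetch-and-process cycle; TestClient runs the background task as part of the request.
resp = client.post("/trigger-ingestion")
print(resp.status_code, resp.json())

# Ask the same gating question the CI pipeline asks.
resp = client.get("/release-health", params={"app_version": "3.1.5"})
print(resp.json()["status"], resp.json().get("reasons", []))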
Part 2: ClickHouse Schema and Querying
The choice of schema in ClickHouse is paramount for performance. A common pitfall is to treat it like a relational database. We need to design for our query patterns: our primary queries filter by app_version and aggregate over a time window, and this directly informs our choice of partitioning, sorting key, and data types.
Here’s the table definition. The comments explain the rationale behind each choice.
-- DDL for the mobile_feedback table in ClickHouse
CREATE TABLE default.mobile_feedback
(
`event_id` UUID,
`source` LowCardinality(String), -- Source is one of a few values, ideal for LowCardinality
`app_version` String,
`event_timestamp` DateTime64(3, 'UTC'), -- Millisecond precision, UTC timezone
`event_date` Date, -- Partitioning key
`raw_text` String,
`rating` UInt8,
`device_info` String,
-- NLP outputs
`sentiment_label` LowCardinality(String), -- e.g., 'POSITIVE', 'NEGATIVE', 'NEUTRAL'
`sentiment_score` Float32,
`categories` Array(LowCardinality(String)), -- e.g., ['Bug', 'UI/UX']
`issue_signature` String -- Not LowCardinality, as it can have many unique values
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date) -- Partition by month. Daily might be too granular unless volume is immense.
ORDER BY (app_version, source, event_timestamp) -- Sorting key. Queries filtering by version will be extremely fast.
SETTINGS index_granularity = 8192;
The corresponding Python code for writing to and reading from this table is encapsulated in the ClickHouseWriter class. A key detail is using parameterized queries to prevent SQL injection, even in an internal service.
# feedback_ingestor/db_writer.py
import logging
from typing import List, Dict, Any
from clickhouse_driver import Client
logger = logging.getLogger(__name__)
class ClickHouseWriter:
def __init__(self, host: str, port: int, database: str = 'default'):
self.host = host
self.port = port
self.database = database
# A common mistake is to create a new client for every operation.
# The client should be long-lived.
try:
self.client = Client(host=self.host, port=self.port, database=self.database)
self.client.execute('SELECT 1')
logger.info("Successfully connected to ClickHouse.")
except Exception as e:
logger.critical(f"Could not connect to ClickHouse at {host}:{port}. Error: {e}")
raise
def write_records(self, records: List[Dict]):
"""Inserts a list of dictionary records into the mobile_feedback table."""
if not records:
return
try:
# The driver handles converting dicts to the correct insertion format
self.client.execute('INSERT INTO mobile_feedback VALUES', records, types_check=True)
logger.info(f"Successfully inserted {len(records)} records.")
except Exception as e:
logger.error(f"Error inserting data into ClickHouse: {e}")
# In a production system, implement a retry mechanism with exponential backoff.
raise
def get_health_metrics(self, app_version: str) -> Dict[str, Any]:
"""
Executes the core analytical query to determine the health of a release.
This is the query that powers the /release-health endpoint.
"""
# This query is the heart of the system. It calculates key rates and counts.
# Using has(categories, 'Bug') is efficient for searching within arrays.
# The `if` and `countIf` functions are powerful ClickHouse features.
query = """
SELECT
count() AS total_feedback,
countIf(has(categories, 'Bug')) AS bug_reports,
countIf(sentiment_label = 'NEGATIVE') AS negative_feedback,
uniqExactIf(issue_signature, has(categories, 'Critical')) AS critical_issue_count,
if(total_feedback > 0, bug_reports / total_feedback, 0) AS bug_report_rate,
if(total_feedback > 0, negative_feedback / total_feedback, 0) AS negative_sentiment_rate
FROM mobile_feedback
WHERE app_version = %(app_version)s
AND event_timestamp >= now() - INTERVAL 4 HOUR -- Look at a recent window
"""
params = {'app_version': app_version}
try:
result = self.client.execute(query, params, with_column_types=True)
if not result or not result[0]:
logger.warning(f"No feedback data found for version {app_version} in the last 4 hours.")
return {
'total_feedback': 0, 'bug_report_rate': 0.0,
'negative_sentiment_rate': 0.0, 'critical_issue_count': 0
}
# The result is a list of tuples, and column names are in the second element
col_names = [col[0] for col in result[1]]
metrics = dict(zip(col_names, result[0][0]))
return metrics
except Exception as e:
logger.error(f"Error querying health metrics for {app_version}: {e}")
raise
Part 3: The NLP Module
The NLP part doesn’t have to be overly complex to be effective. We use the transformers library from Hugging Face; its pipeline API makes it straightforward to load a pre-trained model and use it for inference. A significant performance gain comes from processing texts in batches rather than one by one.
# feedback_ingestor/nlp_processor.py
import logging
import re
import hashlib
from transformers import pipeline, Pipeline
from typing import List, Dict
logger = logging.getLogger(__name__)
class NLPProcessor:
def __init__(self, model_name: str):
try:
# We initialize two pipelines. It's often better to use separate, specialized
# models for classification and sentiment rather than one giant model.
logger.info(f"Loading sentiment analysis model: {model_name}")
self.sentiment_pipeline: Pipeline = pipeline("sentiment-analysis", model=model_name)
# Placeholder for a multi-label classification model.
# In a real project, this would be a custom-trained model for categories
# like 'Bug', 'Crash', 'Login', 'Payment', 'UI/UX', 'Feature Request', 'Critical'.
logger.info("Loading zero-shot classification model as a placeholder for categories.")
self.category_pipeline: Pipeline = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
            # Use the same capitalization as the rest of the pipeline ('Bug', 'Critical', ...) so ClickHouse filters like has(categories, 'Bug') match.
            self.candidate_labels = ['Bug', 'Crash', 'Login Issue', 'Payment Problem', 'UI/UX', 'Feature Request', 'Critical']
except Exception as e:
logger.critical(f"Failed to load NLP models: {e}")
raise
def process_batch(self, texts: List[str]) -> List[Dict]:
"""Processes a batch of texts for sentiment and categorization."""
if not texts:
return []
try:
sentiments = self.sentiment_pipeline(texts, truncation=True, padding=True)
# The zero-shot pipeline is slower, so batching is essential.
categories = self.category_pipeline(texts, self.candidate_labels, multi_label=True)
except Exception as e:
logger.error(f"Error during NLP batch processing: {e}")
# Return a default error structure for the entire batch
return [self._get_error_result() for _ in texts]
processed_results = []
for i in range(len(texts)):
# Combine results, applying a threshold for category inclusion
category_labels = [label for label, score in zip(categories[i]['labels'], categories[i]['scores']) if score > 0.60]
processed_results.append({
'sentiment_label': sentiments[i]['label'],
'sentiment_score': sentiments[i]['score'],
'categories': category_labels if category_labels else ['uncategorized']
})
return processed_results
def generate_issue_signature(self, text: str) -> str:
"""
Creates a consistent hash for similar text messages to aid in grouping.
This is a crucial step for avoiding high cardinality issues in ClickHouse
when trying to count unique problems.
"""
        # 1. Normalize: lowercase, then strip digits, punctuation and any other non-letter characters
        normalized_text = re.sub(r'[^a-z\s]', '', text.lower())
# 2. Tokenize and sort to make it order-independent
tokens = sorted(list(set(normalized_text.split())))
# 3. Join and hash
stable_string = "".join(tokens)
return hashlib.sha256(stable_string.encode('utf-8')).hexdigest()
def _get_error_result(self) -> Dict:
return {
'sentiment_label': 'UNKNOWN',
'sentiment_score': 0.0,
'categories': ['processing_error']
}
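A quick illustration of why the signature matters: two reviews that differ only in casing, punctuation, and an error code collapse to the same hash, so uniqExact counts them as one issue. The example strings below are made up, and the snippet reuses the processor instantiated in main.py since signature generation never touches the models.

# Hypothetical demonstration of issue-signature grouping.
from feedback_ingestor.main import nlp_processor

a = nlp_processor.generate_issue_signature("App crashes on login! Error 502.")
b = nlp_processor.generate_issue_signature("error 1044: app Crashes on Login")
assert a == b  # both normalize to the same token set, so they hash identically
print(a[:12])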
Part 4: CI/CD Pipeline Integration
This is where everything comes together. The CI/CD pipeline script is the consumer of our feedback analysis system. Here is an example using GitLab CI syntax. The key is the check_release_health job, which acts as a gate: it calls our FastAPI endpoint, parses the JSON response, and fails the pipeline if the status is not PASS.
# .gitlab-ci.yml
stages:
- build
- deploy_canary
- verify
- promote
variables:
# The version number would typically be generated dynamically
APP_VERSION: "3.1.5"
FEEDBACK_SERVICE_URL: "http://feedback-service.internal:8000"
build_android:
stage: build
script:
- echo "Building Android App version $APP_VERSION..."
# ./gradlew assembleRelease
artifacts:
paths:
- app/build/outputs/apk/release/app-release.apk
deploy_to_canary:
stage: deploy_canary
script:
- echo "Deploying version $APP_VERSION to 1% of users..."
# Use fastlane or another tool to upload the artifact to Google Play's internal test track
- echo "Deployment to canary track successful."
check_release_health:
stage: verify
# This job will start after deploy_to_canary completes
script:
- echo "Waiting for 2 hours to collect initial user feedback..."
- sleep 7200 # A real pipeline might have a more sophisticated polling mechanism
- echo "Checking release health for version $APP_VERSION..."
- |
apk add curl jq # Install dependencies in the runner
      # Guard the curl call directly so a failure is reported cleanly even if the shell runs with `set -e`.
      if ! RESPONSE=$(curl -s -f "${FEEDBACK_SERVICE_URL}/release-health?app_version=${APP_VERSION}"); then
        echo "ERROR: Health check endpoint failed or is unreachable. Failing closed."
        exit 1
      fi
      STATUS=$(echo "$RESPONSE" | jq -r '.status')
      REASONS=$(echo "$RESPONSE" | jq -r '.reasons | tostring')
      echo "--- Health Check Response ---"
      echo "$RESPONSE" | jq .
      echo "---------------------------"
      if [ "$STATUS" != "PASS" ]; then
        echo "ERROR: Release health check failed for version $APP_VERSION."
        echo "Reasons: $REASONS"
        # A non-zero exit code fails the GitLab CI job
        exit 1
      else
        echo "Release health check passed for version $APP_VERSION."
      fi
# If this job fails, the pipeline stops here.
promote_to_production:
stage: promote
script:
- echo "Promoting version $APP_VERSION to 100% of users..."
# Use fastlane to promote the build from the canary track to full production
- echo "Promotion successful."
when: on_success # Only run if the 'check_release_health' job succeeds
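If the runner image ships Python, the curl-and-jq logic can equally be a small script that exits non-zero on anything other than PASS. The sketch below is an equivalent alternative, not what our pipeline currently runs; it assumes the requests library and the same two environment variables.

# check_release_health.py -- an alternative to the curl/jq step above.
# Exits 1 (failing the CI job) unless the service reports PASS.
import os
import sys

import requests

service_url = os.environ["FEEDBACK_SERVICE_URL"]
app_version = os.environ["APP_VERSION"]

try:
    resp = requests.get(
        f"{service_url}/release-health",
        params={"app_version": app_version},
        timeout=30,
    )
    resp.raise_for_status()
    body = resp.json()
except requests.RequestException as exc:
    print(f"ERROR: health check endpoint unreachable: {exc}. Failing closed.")
    sys.exit(1)

print(body)
if body.get("status") != "PASS":
    print(f"Release health check failed: {body.get('reasons', [])}")
    sys.exit(1)
print(f"Release health check passed for {app_version}.")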
The tangible outcome of this system was a dramatic reduction in the “time to detect” for production issues. We caught a critical bug in a new payment flow within three hours of a canary release, triggered by a spike in feedback categorized as Payment Problem and Bug. The pipeline automatically halted the rollout, an alert was fired, and we rolled back before more than 1% of users were affected. The post-mortem was no longer about a failure in process, but a celebration of a system that worked as designed.
This architecture isn’t without its limitations. The NLP model’s accuracy is a potential weak point; it requires periodic retraining with new, labeled data to prevent concept drift. The thresholds for the health check are currently static and were determined heuristically. A more advanced implementation would involve establishing a baseline from previous releases and using anomaly detection to flag statistically significant deviations, making the system more adaptive. Furthermore, the feedback loop is still reactive, albeit much faster. The next frontier is to use this data proactively, perhaps correlating code changes from specific commits to negative feedback patterns to identify risky code before it even gets to canary.
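As a rough illustration of the baseline idea mentioned above, the gate could compare the canary's bug-report rate against the same metric for the previous release and only fail on a statistically meaningful increase. A simple two-proportion z-test is one way to do that; the function below is a sketch, and the z-threshold of 2.0 and the example counts are arbitrary.

# Hypothetical sketch: flag the canary only if its bug-report rate is a
# statistically significant regression versus the previous release's baseline.
from math import sqrt

def significant_regression(canary_bugs: int, canary_total: int,
                           baseline_bugs: int, baseline_total: int,
                           z_threshold: float = 2.0) -> bool:
    """Two-proportion z-test; True if the canary rate is meaningfully worse."""
    if canary_total == 0 or baseline_total == 0:
        return False  # not enough data to judge either way
    p_canary = canary_bugs / canary_total
    p_baseline = baseline_bugs / baseline_total
    pooled = (canary_bugs + baseline_bugs) / (canary_total + baseline_total)
    se = sqrt(pooled * (1 - pooled) * (1 / canary_total + 1 / baseline_total))
    if se == 0:
        return p_canary > p_baseline
    return (p_canary - p_baseline) / se > z_threshold

# e.g. 12 bug reports out of 150 canary reviews vs 20 out of 900 for the previous release
print(significant_regression(12, 150, 20, 900))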