Automating Mobile Release Gating by Analyzing User Feedback with NLP and ClickHouse


The fundamental flaw in our mobile release cycle was latency—not network latency, but human latency. We’d ship a new build to a small percentage of users and then wait. We’d watch dashboards for crash rates and KPIs, but the richest data—user reviews on the App Store and detailed crash reports—was a swamp of unstructured text. Manually sifting through this feedback to decide whether to continue a rollout took hours, sometimes days. By the time we spotted a pattern of complaints about a broken login flow, the damage was often already done. The process felt reactive and fundamentally broken.

Our initial concept was to shorten this feedback loop from hours to minutes and embed it directly into our CI/CD pipeline. The goal: create an automated quality gate that makes a data-driven “go/no-go” decision on promoting a canary release. This system needed to ingest raw user feedback, apply Natural Language Processing to structure it, store it in a high-performance analytics database, and expose a simple health check API for our GitLab CI runner to query.

Technology selection was driven by this need for speed and scale. For data storage, PostgreSQL was briefly considered but quickly dismissed. We anticipated millions of feedback events over time, and the core requirement was not transactional integrity but rapid analytical aggregation. We needed to ask questions like, “What is the rate of new bug-related feedback for version 3.1.5 compared to 3.1.4 in the first four hours post-launch?” This is classic OLAP territory. ClickHouse was the obvious choice for its columnar storage and blistering performance on GROUP BY queries over time-series data. Elasticsearch was another option, but its strength in full-text search was secondary to our need for raw analytical speed on structured fields.
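
To make that concrete, the comparison maps onto a single aggregation. The snippet below is a rough sketch using the clickhouse-driver client against the mobile_feedback schema introduced in Part 2 (and the lowercase category labels the NLP module emits); for brevity it compares both versions over the same recent window rather than deriving each version’s own “first four hours post-launch”.

# Rough sketch of the comparative query, against the schema from Part 2.
from clickhouse_driver import Client

client = Client(host="localhost", port=9000)

QUERY = """
SELECT
    app_version,
    count() AS total_feedback,
    countIf(has(categories, 'bug')) AS bug_reports,
    bug_reports / total_feedback AS bug_report_rate
FROM mobile_feedback
WHERE app_version IN ('3.1.4', '3.1.5')
  AND event_timestamp >= now() - INTERVAL 4 HOUR
GROUP BY app_version
"""

for version, total, bugs, rate in client.execute(QUERY):
    print(f"{version}: {bugs}/{total} bug-related ({rate:.1%})")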

For the NLP component, we evaluated third-party sentiment analysis APIs. They were easy to integrate but lacked the domain-specific nuance required. A user review saying “the new update is sick” could be positive or negative depending on context. A comment like “can’t find the checkout button since the redesign” is not just negative sentiment; it’s a critical usability bug report. This led us to a fine-tuned, multi-label classification model based on a Hugging Face transformer. It would classify each piece of feedback into categories like Bug, Crash, Feature Request, Negative UX, Praise, etc. This was more upfront work but provided the signal clarity we needed.
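
At inference time that looks roughly like the sketch below. The checkpoint name is a placeholder for whatever fine-tuned model comes out of the labeling effort, not a published model, and the label set (including its exact casing) has to match what the rest of the pipeline stores and queries against.

# Sketch of multi-label inference. "acme/feedback-multilabel" is a hypothetical
# fine-tuned checkpoint, used here only for illustration.
from transformers import pipeline

classifier = pipeline(
    "text-classification",
    model="acme/feedback-multilabel",  # placeholder for the fine-tuned model
)

review = "can't find the checkout button since the redesign"
# top_k=None returns a score for every label; sigmoid gives independent
# per-label probabilities, which is what multi-label classification needs.
scores = classifier([review], top_k=None, function_to_apply="sigmoid")[0]
labels = [s["label"] for s in scores if s["score"] > 0.5]
print(labels)  # e.g. ['Bug', 'Negative UX'] for the example review above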

The entire system was designed as a data pipeline feeding a CI/CD-aware service.

sequenceDiagram
    participant AppStores as App/Play Store

    participant Scheduler as Cron Job
    participant IngestionSvc as Feedback Ingestion Service (Python/FastAPI)
    participant NLPModel as NLP Inference Module
    participant ClickHouse as ClickHouse
    participant GitLabCI as GitLab CI Pipeline

    Scheduler->>IngestionSvc: Trigger periodic fetch
    IngestionSvc->>AppStores: API Call: Fetch new reviews/crashes
    AppStores-->>IngestionSvc: Raw text feedback
    IngestionSvc->>NLPModel: Process batch of feedback text
    NLPModel-->>IngestionSvc: Structured Data (labels, sentiment)
    IngestionSvc->>ClickHouse: Batch INSERT processed data

    Note over GitLabCI: A new build is deployed to canary (e.g., 1% of users)

    GitLabCI->>GitLabCI: Job: deploy_canary
    GitLabCI->>GitLabCI: Job: wait_for_feedback (sleep 2h)
    GitLabCI->>IngestionSvc: API Call: /release-health?app_version=3.1.5
    IngestionSvc->>ClickHouse: SELECT ... WHERE app_version='3.1.5'
    ClickHouse-->>IngestionSvc: Aggregated health metrics
    IngestionSvc-->>GitLabCI: JSON Response { "status": "FAIL", "reason": "High bug report rate" }

    alt Health Check PASSED
        GitLabCI->>GitLabCI: Job: promote_to_production
    else Health Check FAILED
        GitLabCI->>GitLabCI: Job: rollback_canary & alert_team
    end

Part 1: The Data Ingestion Service

The core of the system is a Python service built with FastAPI. It’s responsible for fetching, processing, and ingesting the data. A critical mistake in early designs is to perform these tasks sequentially for each feedback item. In a real-world project, this is incredibly inefficient. The correct approach is to batch operations at every stage: fetch in batches, process with the NLP model in batches, and insert into ClickHouse in batches.
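
A small helper keeps that batching explicit; something along these lines (the batch size is arbitrary) lets the fetcher hand fixed-size chunks to the NLP model and the ClickHouse writer:

# Sketch of a chunking helper; the batch size of 64 is arbitrary.
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], batch_size: int = 64) -> Iterator[List[T]]:
    """Yield successive fixed-size chunks from any iterable."""
    iterator = iter(items)
    while chunk := list(islice(iterator, batch_size)):
        yield chunk

# e.g. run the NLP model and the INSERT once per chunk, never per item:
# for chunk in batched(new_feedback, batch_size=64):
#     process_and_ingest_feedback(chunk)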

Here is a stripped-down but functional representation of the main ingestion logic. It uses a placeholder for the app store clients and focuses on the data flow and batching.

# feedback_ingestor/main.py
import os
import logging
import uuid
from datetime import date, datetime, timezone
from typing import List, Dict, Any

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, Field

from .app_store_client import MockAppStoreClient  # Placeholder for actual clients
from .nlp_processor import NLPProcessor
from .db_writer import ClickHouseWriter

# --- Configuration ---
# In a real app, use something like Pydantic's BaseSettings (pydantic-settings) for configuration
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000))
MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english" # Example model

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Service Initialization ---
app = FastAPI(title="Mobile Feedback Ingestion Service")
nlp_processor = NLPProcessor(model_name=MODEL_NAME)
db_writer = ClickHouseWriter(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)

# --- Pydantic Models for Data Structure ---
class FeedbackItem(BaseModel):
    source: str  # e.g., 'app_store', 'google_play', 'crashlytics'
    raw_text: str
    app_version: str
    rating: int | None = None
    device_info: str | None = None
    event_timestamp: datetime

class ProcessedFeedback(BaseModel):
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    source: str
    app_version: str
    event_timestamp: datetime
    event_date: date # For ClickHouse partitioning (a Date object, not a string)
    raw_text: str
    rating: int
    device_info: str
    # NLP Outputs
    sentiment_label: str
    sentiment_score: float
    categories: List[str] # e.g., ['bug', 'user interface'], matching the NLP module's labels
    issue_signature: str # A hash of normalized text to group similar issues

# --- Core Processing Logic ---
def process_and_ingest_feedback(items: List[FeedbackItem]):
    """
    The main background task. It takes raw feedback, processes it with NLP,
    and writes to ClickHouse.
    """
    if not items:
        logger.info("No new feedback items to process.")
        return

    logger.info(f"Processing a batch of {len(items)} feedback items.")

    try:
        # Batch process with the NLP model for efficiency
        raw_texts = [item.raw_text for item in items]
        nlp_results = nlp_processor.process_batch(raw_texts)

        processed_records: List[Dict] = []
        for i, item in enumerate(items):
            result = nlp_results[i]
            
            # Create a stable signature for grouping similar raw text messages.
            # A common mistake is to group by raw_text, which fails for trivial differences.
            # Normalizing and hashing is more robust.
            issue_sig = nlp_processor.generate_issue_signature(item.raw_text)
            
            record = ProcessedFeedback(
                source=item.source,
                app_version=item.app_version,
                event_timestamp=item.event_timestamp,
                event_date=item.event_timestamp.date(),
                raw_text=item.raw_text,
                rating=item.rating or 0,
                device_info=item.device_info or "",
                sentiment_label=result['sentiment_label'],
                sentiment_score=result['sentiment_score'],
                categories=result['categories'],
                issue_signature=issue_sig
            )
            # Convert to dict for the database driver
            processed_records.append(record.dict())

        logger.info(f"Writing {len(processed_records)} processed records to ClickHouse.")
        db_writer.write_records(processed_records)

    except Exception as e:
        # In a production system, you'd add this to a dead-letter queue for retry.
        logger.error(f"Failed to process or ingest feedback batch: {e}", exc_info=True)

# --- API Endpoints ---
@app.post("/trigger-ingestion")
async def trigger_ingestion(background_tasks: BackgroundTasks):
    """
    This endpoint simulates being called by a cron job or scheduler.
    It fetches data and schedules the processing in the background.
    """
    logger.info("Ingestion trigger received.")
    
    # In a real system, you'd handle rate limiting and deduplication here.
    client = MockAppStoreClient()
    try:
        new_feedback = client.fetch_latest_feedback()
    except Exception as e:
        logger.error(f"Failed to fetch from app stores: {e}")
        raise HTTPException(status_code=500, detail="Could not fetch feedback.")
    
    # Don't block the API response. Process in the background.
    background_tasks.add_task(process_and_ingest_feedback, new_feedback)

    return {"status": "success", "message": f"Scheduled {len(new_feedback)} items for processing."}

@app.get("/release-health")
async def get_release_health(app_version: str):
    """
    The endpoint called by the CI/CD pipeline to gate a release.
    """
    if not app_version:
        raise HTTPException(status_code=400, detail="app_version parameter is required.")
        
    try:
        metrics = db_writer.get_health_metrics(app_version)
        
        # Define business logic for go/no-go.
        # This is overly simplistic; a real system might compare against a baseline
        # from the previous version or use anomaly detection.
        thresholds = {
            'bug_report_rate': 0.05, # 5% of feedback is bug-related
            'negative_sentiment_rate': 0.20, # 20% is negative
            'critical_issue_count': 5
        }
        
        failures = []
        if metrics['bug_report_rate'] > thresholds['bug_report_rate']:
            failures.append(f"Bug report rate {metrics['bug_report_rate']:.2f} exceeds threshold {thresholds['bug_report_rate']:.2f}")
        if metrics['negative_sentiment_rate'] > thresholds['negative_sentiment_rate']:
            failures.append(f"Negative sentiment rate {metrics['negative_sentiment_rate']:.2f} exceeds threshold {thresholds['negative_sentiment_rate']:.2f}")
        if metrics['critical_issue_count'] > thresholds['critical_issue_count']:
             failures.append(f"Critical issue count {metrics['critical_issue_count']} exceeds threshold {thresholds['critical_issue_count']}")

        if failures:
            return {
                "version": app_version,
                "status": "FAIL",
                "reasons": failures,
                "metrics": metrics
            }
        else:
            return {
                "version": app_version,
                "status": "PASS",
                "metrics": metrics
            }

    except Exception as e:
        logger.error(f"Error checking release health for version {app_version}: {e}", exc_info=True)
        # Fail open or closed? In CI/CD, failing closed is safer. If the health check fails, stop the release.
        return {
            "version": app_version,
            "status": "FAIL",
            "reasons": [f"Health check system error: {e}"],
            "metrics": {}
        }

Part 2: ClickHouse Schema and Querying

The choice of schema in ClickHouse is paramount for performance. A common pitfall is to treat it like a relational database. We need to design for our query patterns. Our primary queries will filter by app_version and aggregate over a time window. This directly informs our choice of partitioning, sorting key, and data types.

Here’s the table definition. The comments explain the rationale behind each choice.

-- DDL for the mobile_feedback table in ClickHouse
CREATE TABLE default.mobile_feedback
(
    `event_id` UUID,
    `source` LowCardinality(String), -- Source is one of a few values, ideal for LowCardinality
    `app_version` String,
    `event_timestamp` DateTime64(3, 'UTC'), -- Millisecond precision, UTC timezone
    `event_date` Date, -- Partitioning key
    `raw_text` String,
    `rating` UInt8,
    `device_info` String,

    -- NLP outputs
    `sentiment_label` LowCardinality(String), -- e.g., 'POSITIVE', 'NEGATIVE', 'NEUTRAL'
    `sentiment_score` Float32,
    `categories` Array(LowCardinality(String)), -- e.g., ['bug', 'user interface'], matching the NLP output
    `issue_signature` String -- Not LowCardinality, as it can have many unique values
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date) -- Partition by month. Daily might be too granular unless volume is immense.
ORDER BY (app_version, source, event_timestamp) -- Sorting key. Queries filtering by version will be extremely fast.
SETTINGS index_granularity = 8192;

The corresponding Python code for writing to and reading from this table is encapsulated in the ClickHouseWriter class. A key detail is using parameterized queries to prevent SQL injection, even in an internal service.

# feedback_ingestor/db_writer.py
import logging
from typing import List, Dict, Any
from clickhouse_driver import Client

logger = logging.getLogger(__name__)

class ClickHouseWriter:
    def __init__(self, host: str, port: int, database: str = 'default'):
        self.host = host
        self.port = port
        self.database = database
        # A common mistake is to create a new client for every operation.
        # The client should be long-lived.
        try:
            self.client = Client(host=self.host, port=self.port, database=self.database)
            self.client.execute('SELECT 1')
            logger.info("Successfully connected to ClickHouse.")
        except Exception as e:
            logger.critical(f"Could not connect to ClickHouse at {host}:{port}. Error: {e}")
            raise

    def write_records(self, records: List[Dict]):
        """Inserts a list of dictionary records into the mobile_feedback table."""
        if not records:
            return
        try:
            # The driver handles converting dicts to the correct insertion format
            self.client.execute('INSERT INTO mobile_feedback VALUES', records, types_check=True)
            logger.info(f"Successfully inserted {len(records)} records.")
        except Exception as e:
            logger.error(f"Error inserting data into ClickHouse: {e}")
            # In a production system, implement a retry mechanism with exponential backoff.
            raise

    def get_health_metrics(self, app_version: str) -> Dict[str, Any]:
        """
        Executes the core analytical query to determine the health of a release.
        This is the query that powers the /release-health endpoint.
        """
        # This query is the heart of the system. It calculates key rates and counts.
        # has(categories, 'bug') is an efficient way to search within arrays.
        # Note: these labels must match the NLP module's output exactly
        # (the zero-shot candidate labels are lowercase).
        # The `if` and `countIf` functions are powerful ClickHouse features.
        query = """
        SELECT
            count() AS total_feedback,
            countIf(has(categories, 'bug')) AS bug_reports,
            countIf(sentiment_label = 'NEGATIVE') AS negative_feedback,
            uniqExactIf(issue_signature, has(categories, 'critical')) AS critical_issue_count,
            if(total_feedback > 0, bug_reports / total_feedback, 0) AS bug_report_rate,
            if(total_feedback > 0, negative_feedback / total_feedback, 0) AS negative_sentiment_rate
        FROM mobile_feedback
        WHERE app_version = %(app_version)s
          AND event_timestamp >= now() - INTERVAL 4 HOUR -- Look at a recent window
        """
        params = {'app_version': app_version}
        
        try:
            result = self.client.execute(query, params, with_column_types=True)
            if not result or not result[0]:
                logger.warning(f"No feedback data found for version {app_version} in the last 4 hours.")
                return {
                    'total_feedback': 0, 'bug_report_rate': 0.0, 
                    'negative_sentiment_rate': 0.0, 'critical_issue_count': 0
                }

            # The result is a list of tuples, and column names are in the second element
            col_names = [col[0] for col in result[1]]
            metrics = dict(zip(col_names, result[0][0]))
            return metrics

        except Exception as e:
            logger.error(f"Error querying health metrics for {app_version}: {e}")
            raise

Part 3: The NLP Module

The NLP part doesn’t have to be overly complex to be effective. We use the transformers library from Hugging Face. The pipeline API makes it straightforward to load a pre-trained model and use it for inference. A significant performance gain comes from processing texts in batches rather than one by one.

# feedback_ingestor/nlp_processor.py
import logging
import re
import hashlib
from transformers import pipeline, Pipeline
from typing import List, Dict

logger = logging.getLogger(__name__)

class NLPProcessor:
    def __init__(self, model_name: str):
        try:
            # We initialize two pipelines. It's often better to use separate, specialized
            # models for classification and sentiment rather than one giant model.
            logger.info(f"Loading sentiment analysis model: {model_name}")
            self.sentiment_pipeline: Pipeline = pipeline("sentiment-analysis", model=model_name)
            
            # Placeholder for a multi-label classification model.
            # In a real project, this would be a custom-trained model for categories
            # like 'Bug', 'Crash', 'Login', 'Payment', 'UI/UX', 'Feature Request', 'Critical'.
            logger.info("Loading zero-shot classification model as a placeholder for categories.")
            self.category_pipeline: Pipeline = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
            self.candidate_labels = ['bug', 'crash', 'login issue', 'payment problem', 'user interface', 'feature request', 'critical']

        except Exception as e:
            logger.critical(f"Failed to load NLP models: {e}")
            raise

    def process_batch(self, texts: List[str]) -> List[Dict]:
        """Processes a batch of texts for sentiment and categorization."""
        if not texts:
            return []
        
        try:
            sentiments = self.sentiment_pipeline(texts, truncation=True, padding=True)
            # The zero-shot pipeline is slower, so batching is essential.
            categories = self.category_pipeline(texts, self.candidate_labels, multi_label=True)
        except Exception as e:
            logger.error(f"Error during NLP batch processing: {e}")
            # Return a default error structure for the entire batch
            return [self._get_error_result() for _ in texts]
            
        processed_results = []
        for i in range(len(texts)):
            # Combine results, applying a threshold for category inclusion
            category_labels = [label for label, score in zip(categories[i]['labels'], categories[i]['scores']) if score > 0.60]
            
            processed_results.append({
                'sentiment_label': sentiments[i]['label'],
                'sentiment_score': sentiments[i]['score'],
                'categories': category_labels if category_labels else ['uncategorized']
            })
        return processed_results
    
    def generate_issue_signature(self, text: str) -> str:
        """
        Creates a consistent hash for similar text messages to aid in grouping.
        This is a crucial step for avoiding high cardinality issues in ClickHouse
        when trying to count unique problems.
        """
        # 1. Normalize: lowercase and keep only letters and whitespace
        #    (this also strips digits, version numbers, punctuation, and emoji)
        normalized_text = re.sub(r'[^a-z\s]', '', text.lower())
        # 2. Tokenize and sort to make it order-independent
        tokens = sorted(list(set(normalized_text.split())))
        # 3. Join and hash
        stable_string = "".join(tokens)
        return hashlib.sha256(stable_string.encode('utf-8')).hexdigest()

    def _get_error_result(self) -> Dict:
        return {
            'sentiment_label': 'UNKNOWN',
            'sentiment_score': 0.0,
            'categories': ['processing_error']
        }
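
A quick sanity check makes the grouping behaviour concrete: two reviews that differ only in casing, punctuation, and word order collapse to the same signature, while a genuinely different complaint keeps its own. This is a minimal illustration reusing MODEL_NAME from the ingestion service; instantiating NLPProcessor does load the models even though only the signature helper is exercised here.

# Minimal illustration of issue-signature grouping (MODEL_NAME comes from main.py).
processor = NLPProcessor(model_name=MODEL_NAME)  # loads the models; only the hash helper is used

sig_a = processor.generate_issue_signature("App crashes on login!!! 111")
sig_b = processor.generate_issue_signature("ON LOGIN, app crashes.")
sig_c = processor.generate_issue_signature("checkout button missing after redesign")

assert sig_a == sig_b  # same normalized token set -> same signature
assert sig_a != sig_c  # a different complaint keeps a distinct signature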

Part 4: CI/CD Pipeline Integration

This is where everything comes together. The CI/CD pipeline script is the consumer of our feedback analysis system. Here is an example using GitLab CI syntax. The key is the check_release_health stage, which acts as a gate. It calls our FastAPI endpoint, parses the JSON response, and fails the pipeline if the status is not PASS.

# .gitlab-ci.yml

stages:
  - build
  - deploy_canary
  - verify
  - promote

variables:
  # The version number would typically be generated dynamically
  APP_VERSION: "3.1.5"
  FEEDBACK_SERVICE_URL: "http://feedback-service.internal:8000"

build_android:
  stage: build
  script:
    - echo "Building Android App version $APP_VERSION..."
    # ./gradlew assembleRelease
  artifacts:
    paths:
      - app/build/outputs/apk/release/app-release.apk

deploy_to_canary:
  stage: deploy_canary
  script:
    - echo "Deploying version $APP_VERSION to 1% of users..."
    # Use fastlane or another tool to upload the artifact to Google Play's internal test track
    - echo "Deployment to canary track successful."

check_release_health:
  stage: verify
  # This job starts after deploy_to_canary completes (stage ordering)
  image: alpine:latest  # the apk/curl/jq steps below assume an Alpine-based runner image
  timeout: 3h           # the 2-hour sleep would exceed GitLab's default 1-hour job timeout
  script:
    - echo "Waiting for 2 hours to collect initial user feedback..."
    - sleep 7200 # A real pipeline might have a more sophisticated polling mechanism
    - echo "Checking release health for version $APP_VERSION..."
    - |
      apk add --no-cache curl jq # Install dependencies in the runner
      RESPONSE=$(curl -s -f "${FEEDBACK_SERVICE_URL}/release-health?app_version=${APP_VERSION}")

      if [ $? -ne 0 ]; then
        echo "ERROR: Health check endpoint failed or is unreachable."
        exit 1
      fi

      STATUS=$(echo "$RESPONSE" | jq -r '.status')
      REASONS=$(echo "$RESPONSE" | jq -r '.reasons | tostring')

      echo "--- Health Check Response ---"
      echo "$RESPONSE" | jq .
      echo "---------------------------"

      if [ "$STATUS" != "PASS" ]; then
        echo "::ERROR:: Release health check failed for version $APP_VERSION."
        echo "Reason: $REASONS"
        # This exit code fails the GitLab CI job
        exit 1
      else
        echo "Release health check passed for version $APP_VERSION."
      fi
  # If this job fails, the pipeline stops here.

promote_to_production:
  stage: promote
  script:
    - echo "Promoting version $APP_VERSION to 100% of users..."
    # Use fastlane to promote the build from the canary track to full production
    - echo "Promotion successful."
  when: on_success # Only run if the 'check_release_health' job succeeds

The tangible outcome of this system was a dramatic reduction in the “time to detect” for production issues. We caught a critical bug in a new payment flow within three hours of a canary release, triggered by a spike in feedback categorized as payment problem and bug. The pipeline automatically halted the rollout, an alert was fired, and we rolled back before more than 1% of users were affected. The post-mortem was no longer about a failure in process, but a celebration of a system that worked as designed.

This architecture isn’t without its limitations. The NLP model’s accuracy is a potential weak point; it requires periodic retraining with new, labeled data to prevent concept drift. The thresholds for the health check are currently static and were determined heuristically. A more advanced implementation would involve establishing a baseline from previous releases and using anomaly detection to flag statistically significant deviations, making the system more adaptive. Furthermore, the feedback loop is still reactive, albeit much faster. The next frontier is to use this data proactively, perhaps correlating code changes from specific commits to negative feedback patterns to identify risky code before it even gets to canary.
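
As a rough sketch of what baseline-relative gating could look like: compare the canary’s metrics against the previous release rather than fixed numbers, and refuse to decide at all until there is enough feedback. The multipliers and minimum sample size below are invented, and the baseline query would need a wider time window than the canary’s four hours.

# Sketch only: baseline-relative gating with made-up multipliers and sample floor.
from typing import Dict, List

def compare_to_baseline(current: Dict[str, float], baseline: Dict[str, float],
                        max_ratio: float = 1.5, min_samples: int = 50) -> List[str]:
    """Return reasons to block the rollout, judged against the previous release."""
    if current.get("total_feedback", 0) < min_samples:
        # Fail closed: not enough signal yet to promote.
        return ["Insufficient feedback volume to evaluate the canary"]
    failures = []
    for key in ("bug_report_rate", "negative_sentiment_rate"):
        base = max(baseline.get(key, 0.0), 0.01)  # floor avoids division by zero
        ratio = current.get(key, 0.0) / base
        if ratio > max_ratio:
            failures.append(f"{key} is {ratio:.1f}x the previous release")
    return failures

# failures = compare_to_baseline(
#     db_writer.get_health_metrics("3.1.5"),  # canary
#     db_writer.get_health_metrics("3.1.4"),  # baseline (would need a wider window)
# )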

