Implementing a Two-Stage Retrieval-Rerank Pipeline with Qdrant and Express for a High-Throughput Mobile API Gateway


The initial problem was straightforward: our iOS application’s keyword-based search was yielding poor results for user queries that required semantic understanding. A query for “healthy snacks without nuts” would often return results for “peanut butter energy bars” simply because of keyword overlap. The business objective was to increase user engagement by delivering highly relevant search results, which meant we had to move to a vector-based semantic search architecture.

Our first pass at a solution was a simple retrieval-only pipeline. We generated embeddings for our entire product catalog, indexed them in a vector database, and performed a nearest-neighbor search at query time. While this was a significant improvement over keyword search, it introduced a new, more subtle problem. For a query like “comfortable office chair for long hours,” the top results were semantically close but not necessarily the best options. We’d get five visually similar chairs, all scoring high on “comfort” and “office,” but the truly ergonomic, premium option might be ranked #4 behind cheaper, less suitable alternatives. The core issue is that a retrieval model is optimized for recall—finding a broad set of relevant candidates—but not necessarily for precision or the specific ranking order.

This led us to a two-stage architecture: a fast, high-recall retrieval stage followed by a slower, high-precision reranking stage. The idea is to fetch a larger set of candidates (say, the top 50) from the vector database and then use a more computationally expensive model, typically a cross-encoder that scores each query-document pair jointly, to re-order only those 50 candidates and produce the final top 10. Because the expensive model never sees the full catalog, its cost is bounded by the candidate count rather than the catalog size, which keeps latency low enough for a snappy mobile user experience.
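
To make the division of labor concrete, here is a minimal sketch of the two stages over a toy catalog. The item ids and both score columns are invented for illustration; in the real pipeline the first score would come from the vector database and the second from the reranking model.

```javascript
// Toy catalog: both scores are made-up numbers for illustration only.
const catalog = [
  { id: 'chair-a', retrievalScore: 0.91, rerankScore: 0.40 },
  { id: 'chair-b', retrievalScore: 0.89, rerankScore: 0.35 },
  { id: 'chair-c', retrievalScore: 0.88, rerankScore: 0.30 },
  { id: 'chair-ergo', retrievalScore: 0.86, rerankScore: 0.95 },
  { id: 'desk-lamp', retrievalScore: 0.20, rerankScore: 0.05 },
];

// Stage 1: cheap, high-recall. Keep the `k` best candidates by retrieval score.
function retrieve(items, k) {
  return [...items].sort((a, b) => b.retrievalScore - a.retrievalScore).slice(0, k);
}

// Stage 2: expensive, high-precision. Re-order only the candidates, keep `n`.
function rerank(candidates, n) {
  return [...candidates].sort((a, b) => b.rerankScore - a.rerankScore).slice(0, n);
}

const candidates = retrieve(catalog, 4);
const finalIds = rerank(candidates, 2).map((item) => item.id);
console.log(finalIds); // [ 'chair-ergo', 'chair-a' ]
```

The ergonomic chair sits fourth after retrieval but first after reranking, which is exactly the correction the second stage exists to make.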

Technology Selection and System Blueprint

In a real-world project, technology choices are driven by constraints: performance requirements, team expertise, operational overhead, and cost.

  1. Vector Database: Qdrant. We needed a production-ready vector database. While libraries like Faiss are powerful, they are not databases; they lack a network API, storage management, and payload storage out of the box. We evaluated several managed services but opted for a self-hosted Qdrant instance for its performance, filtering capabilities, and memory-efficient HNSW index. Its ability to store payloads alongside vectors and apply filters on them as part of the vector search was a deciding factor, allowing us to restrict results (e.g., by category or price) without a separate post-filtering pass.

  2. Orchestration Service: Express.js. The service coordinating the two stages is primarily I/O-bound. It calls an embedding service, queries Qdrant, calls a reranking service, and formats the final response. Node.js, with its non-blocking event loop, is exceptionally well-suited for this. Express.js provides a minimal, unopinionated framework to build this orchestration layer quickly. We didn’t need the complexity of a Java/Spring or Python/Django framework for a service with such a narrow, well-defined responsibility.

  3. API Gateway: Apache APISIX. Exposing the Express service directly to the internet is a non-starter in a production environment. We needed a robust API gateway to handle cross-cutting concerns. APISIX was chosen for its high performance (built on Nginx and LuaJIT) and its dynamic, plugin-based architecture. We could offload JWT authentication, rate limiting, and centralized logging to the gateway, keeping the business logic in the Express app clean. A key strategic advantage is the ability to use its traffic-split plugin for future A/B testing of different reranking models without any client-side changes.

  4. Mobile Client: iOS (Swift). The iOS app is the consumer of this API. Its implementation must be resilient: it has to handle network timeouts and the various HTTP status codes the gateway can return, and decode the JSON response efficiently into native Swift objects.

Here is the complete architectural flow we designed:

sequenceDiagram
    participant iOS Client
    participant APISIX Gateway
    participant Express Service
    participant Qdrant
    participant Reranker Model

    iOS Client->>+APISIX Gateway: GET /search?q=... (with JWT)
    APISIX Gateway->>APISIX Gateway: 1. jwt-auth plugin: Validate Token
    APISIX Gateway->>APISIX Gateway: 2. limit-req plugin: Check Rate Limit
    APISIX Gateway->>+Express Service: Forward request
    Express Service->>+Qdrant: Search with query vector (Top 50 candidates)
    Qdrant-->>-Express Service: Return 50 candidate documents
    Express Service->>+Reranker Model: Rerank candidates with original query
    Reranker Model-->>-Express Service: Return re-ordered list of documents
    Express Service-->>-APISIX Gateway: Final Top 10 results (JSON)
    APISIX Gateway-->>-iOS Client: 200 OK with JSON payload

Phase 1: Data Ingestion and Qdrant Configuration

Before building the API, we need data. The ingestion process is a one-off batch job for this example, but in production, this would be a streaming pipeline. We use a simple Python script to generate embeddings with a sentence-transformers model and push them to Qdrant.

First, let’s define our infrastructure with docker-compose.yml.

# docker-compose.yml
version: '3.8'

services:
  qdrant:
    image: qdrant/qdrant:v1.7.4
    container_name: qdrant_db
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - ./qdrant_storage:/qdrant/storage:z
    environment:
      # Production systems should configure this properly
      QDRANT__SERVICE__API_KEY: "your-super-secret-key"

  # The Express.js service we will build
  # We will uncomment this later
  # rerank_api:
  #   build: .
  #   container_name: rerank_api
  #   ports:
  #     - "3000:3000"
  #   environment:
  #     - QDRANT_URL=http://qdrant:6333
  #     - QDRANT_API_KEY=your-super-secret-key
  #     - EMBEDDING_SERVICE_URL=http://your-embedding-service
  #     - RERANK_SERVICE_URL=http://your-rerank-service
  #   depends_on:
  #     - qdrant

Next, the ingestion script. A common pitfall here is not handling failures or batching properly, which can overwhelm the database or the embedding model.

# scripts/ingest.py
import os
import uuid
import json
from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer

# --- Configuration ---
# Values are read from environment variables; the defaults are for local development only.
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "your-super-secret-key")
COLLECTION_NAME = "product_catalog"
EMBEDDING_MODEL = "all-MiniLM-L6-v2" # 384 dimensions

def get_product_data():
    """Placeholder to load your data. Returns a list of dicts."""
    with open('data/products.json', 'r') as f:
        return json.load(f)

def main():
    """Main ingestion script."""
    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    model = SentenceTransformer(EMBEDDING_MODEL)

    # Start from a clean slate: drop the collection if it already exists.
    # In production, you might want to update instead of recreating.
    # Note: these calls fail if Qdrant is still starting up.
    # A robust script would wrap them in a retry mechanism.
    if client.collection_exists(collection_name=COLLECTION_NAME):
        print(f"Collection '{COLLECTION_NAME}' already exists. Deleting.")
        client.delete_collection(collection_name=COLLECTION_NAME)

    print(f"Creating collection '{COLLECTION_NAME}'.")
    # create_collection is used because recreate_collection is deprecated
    # in recent qdrant-client releases and would repeat the delete above.
    client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config=models.VectorParams(size=384, distance=models.Distance.COSINE),
        # Production tuning: consider on_disk=True for large datasets
        # and fine-tune hnsw_config for your latency/accuracy trade-off.
        hnsw_config=models.HnswConfigDiff(
            m=16, # Number of bi-directional links for each new element
            ef_construct=100 # Number of candidates to consider during index construction
        )
    )

    print("Creating payload index for 'category'.")
    client.create_payload_index(
        collection_name=COLLECTION_NAME,
        field_name="category",
        field_schema=models.PayloadSchemaType.KEYWORD
    )

    products = get_product_data()
    # Combine name and description for a richer semantic representation
    documents = [f"{p['name']}: {p['description']}" for p in products]
    
    print(f"Generating embeddings for {len(documents)} documents...")
    # A progress bar is helpful for long-running jobs
    embeddings = model.encode(documents, show_progress_bar=True, batch_size=64)

    print("Uploading points to Qdrant...")
    client.upload_points(
        collection_name=COLLECTION_NAME,
        points=[
            models.PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding.tolist(),
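                payload={
                    "product_id": p["id"],
                    "name": p["name"],
                    # Stored so the reranker can be given the same text that was embedded
                    "description": p["description"],
                    "category": p["category"],
                    "price": p["price"]
                }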
            )
            for p, embedding in zip(products, embeddings)
        ],
        wait=True, # Wait for the operation to complete
        parallel=4 # Number of parallel upload processes
    )
    print("Ingestion complete.")

if __name__ == "__main__":
    main()

This script sets up a Qdrant collection with a specific HNSW configuration, creates a payload index for efficient filtering, and uploads our product data. The pragmatic choice here is to batch embeddings and use parallel uploads to speed up ingestion.
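
The retry mechanism and explicit batching mentioned in the script comments can be sketched as follows. It is written in JavaScript to match the service code later in this article, but the same shape carries over to the Python script. `uploadBatch` is a hypothetical function standing in for the real upsert call, and the delay values are illustrative defaults rather than tuned numbers.

```javascript
// Split an array into consecutive batches of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelay(attempt, baseMs = 200, maxMs = 5000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Run `fn`, retrying on any thrown error up to `retries` additional times.
async function withRetry(fn, retries = 3, sleep = defaultSleep) {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= retries) throw err;
      await sleep(backoffDelay(attempt));
    }
  }
}

// Upload points batch by batch; `uploadBatch` is a hypothetical async function.
async function ingest(points, uploadBatch, batchSize = 64) {
  for (const batch of chunk(points, batchSize)) {
    await withRetry(() => uploadBatch(batch));
  }
}
```

Uploading sequentially with bounded retries trades some throughput for predictable load on the database; whether that trade is right depends on catalog size and how much headroom the Qdrant instance has.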

Phase 2: Building the Express.js Orchestration Service

This is the heart of our backend logic. We will structure the application for maintainability, separating concerns into services, controllers, and routes.

Project Structure:

/
├── src/
│   ├── api/
│   │   ├── routes/
│   │   │   └── search.route.js
│   │   ├── controllers/
│   │   │   └── search.controller.js
│   │   └── middlewares/
│   │       └── errorHandler.js
│   ├── services/
│   │   ├── qdrant.service.js
│   │   ├── embedding.service.js
│   │   └── rerank.service.js
│   ├── config/
│   │   └── index.js
│   └── app.js
├── .env
├── package.json
└── Dockerfile

Configuration (src/config/index.js)

A common mistake is hardcoding configuration. We use dotenv to manage environment variables.

// src/config/index.js
const dotenv = require('dotenv');
dotenv.config();

const config = {
    port: process.env.PORT || 3000,
    qdrant: {
        url: process.env.QDRANT_URL,
        apiKey: process.env.QDRANT_API_KEY,
    },
    services: {
        embeddingUrl: process.env.EMBEDDING_SERVICE_URL,
        rerankUrl: process.env.RERANK_SERVICE_URL,
    },
    // Constants for our search logic
    search: {
        collectionName: 'product_catalog',
        retrievalLimit: 50,
        finalLimit: 10,
    },
};

module.exports = config;
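
One gap in the config module is that a missing variable only surfaces later as a confusing request failure. A small fail-fast check at startup avoids that. The sketch below is one possible shape; the helper name `requireEnv` is an assumption and not part of the original code, and a literal object stands in for `process.env` so the snippet runs on its own.

```javascript
// Return the values of the named variables, or throw listing every missing one.
function requireEnv(env, names) {
  const missing = names.filter((name) => !env[name] || env[name].trim() === '');
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  return Object.fromEntries(names.map((name) => [name, env[name]]));
}

// Example usage; in the service you would pass process.env instead.
const exampleEnv = {
  QDRANT_URL: 'http://qdrant:6333',
  EMBEDDING_SERVICE_URL: 'http://your-embedding-service',
  RERANK_SERVICE_URL: 'http://your-rerank-service',
};
const required = requireEnv(exampleEnv, [
  'QDRANT_URL',
  'EMBEDDING_SERVICE_URL',
  'RERANK_SERVICE_URL',
]);
console.log(required.QDRANT_URL); // http://qdrant:6333
```

Reporting all missing names at once, rather than the first one found, saves a restart cycle per variable when configuring a new environment.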

Qdrant Service (src/services/qdrant.service.js)

This module encapsulates all interaction with Qdrant. Using a dedicated service makes the code testable and isolates the dependency.

// src/services/qdrant.service.js
const { QdrantClient } = require('@qdrant/js-client-rest');
const config = require('../config');

// Initialize client once and reuse
const qdrantClient = new QdrantClient({ 
    url: config.qdrant.url, 
    apiKey: config.qdrant.apiKey 
});

/**
 * Searches for documents in the Qdrant collection.
 * @param {number[]} queryVector - The vector representation of the search query.
 * @param {number} limit - The number of results to retrieve.
 * @returns {Promise<object[]>} - A promise that resolves to an array of search results.
 */
async function searchProducts(queryVector, limit) {
    try {
        // The `params` field of the search request is where you can set
        // search-time HNSW parameters (`hnsw_ef`) to trade latency for accuracy.
        const searchResult = await qdrantClient.search(config.search.collectionName, {
            vector: queryVector,
            limit: limit,
            with_payload: true, // We need the payload for the reranker
            // params: { hnsw_ef: 128 } // Optional: tune search-time ef
        });

        // Keep only the fields the next stage needs: id, score, and payload.
        return searchResult.map(result => ({
            id: result.id,
            score: result.score,
            payload: result.payload,
        }));
    } catch (error) {
        // Proper logging is critical in production.
        console.error('Qdrant search failed:', error);
        // Re-throw with a stable message for the error middleware, keeping
        // the original error attached as `cause` for debugging.
        throw new Error('Failed to retrieve search candidates from database.', { cause: error });
    }
}

module.exports = { searchProducts };
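
The payload filtering that motivated choosing Qdrant is not used by `searchProducts` as written. A sketch of how the request body could carry a filter is shown below. The `must`, `match`, and `range` shapes follow Qdrant's filter format; the helper name `buildSearchRequest` and its option names are assumptions for illustration. Note that only `category` has a payload index in the ingestion script, so a `price` range filter would likely benefit from its own index on larger collections.

```javascript
// Build a Qdrant search request body with an optional payload filter.
function buildSearchRequest(vector, limit, { category, maxPrice } = {}) {
  const must = [];
  if (category !== undefined) {
    // Exact match on the indexed keyword field.
    must.push({ key: 'category', match: { value: category } });
  }
  if (maxPrice !== undefined) {
    // Numeric upper bound on price.
    must.push({ key: 'price', range: { lte: maxPrice } });
  }
  const request = { vector, limit, with_payload: true };
  if (must.length > 0) {
    request.filter = { must };
  }
  return request;
}

// Example: top 50 candidates in the "furniture" category costing at most 300.
const request = buildSearchRequest([0.1, 0.2, 0.3], 50, { category: 'furniture', maxPrice: 300 });
console.log(JSON.stringify(request.filter));
```

The resulting object can be passed as the second argument to `qdrantClient.search` in place of the inline literal.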

Embedding & Reranking Services (Stubs)

In a real system, these would be separate microservices, likely Python-based, to leverage the ML ecosystem. For this article, we’ll create simple service wrappers that call these hypothetical endpoints. This decouples our Node.js app from the ML model implementation.

// src/services/embedding.service.js
const axios = require('axios');
const config = require('../config');

async function getEmbedding(text) {
    try {
        // axios has no timeout by default; 2000 ms is an example value.
        const response = await axios.post(config.services.embeddingUrl, { text }, { timeout: 2000 });
        return response.data.embedding;
    } catch (error) {
        console.error('Embedding service call failed:', error.message);
        throw new Error('Failed to generate query embedding.');
    }
}
module.exports = { getEmbedding };

// src/services/rerank.service.js
const axios = require('axios');
const config = require('../config');

async function rerankResults(query, documents) {
    try {
        // The reranker needs the original query and the candidate documents.
        const response = await axios.post(
            config.services.rerankUrl,
            { query, documents }, // documents is an array of { id, text } objects
            { timeout: 5000 } // Example value; axios has no timeout by default
        );
        return response.data.results;
    } catch (error) {
        console.error('Rerank service call failed:', error.message);
        throw new Error('Failed to rerank search results.');
    }
}
module.exports = { rerankResults };
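
Because both endpoints are hypothetical, it helps to have a local stand-in that honors the same contract the controller relies on: given a query and `{ id, text }` documents, return the ids in ranked order. The sketch below ranks by simple token overlap. It is a test double only and is not a real reranking model; the function names are assumptions.

```javascript
// Lower-case a string and return its set of alphanumeric tokens.
function tokenize(text) {
  return new Set(text.toLowerCase().match(/[a-z0-9]+/g) || []);
}

// Return document ids ordered by descending token overlap with the query.
// Ties keep their original (retrieval) order.
function mockRerank(query, documents) {
  const queryTokens = tokenize(query);
  return documents
    .map((doc, index) => {
      const docTokens = tokenize(doc.text);
      let overlap = 0;
      for (const token of queryTokens) {
        if (docTokens.has(token)) overlap += 1;
      }
      return { id: doc.id, overlap, index };
    })
    .sort((a, b) => b.overlap - a.overlap || a.index - b.index)
    .map((entry) => entry.id);
}

const ranked = mockRerank('ergonomic office chair', [
  { id: 'p1', text: 'Basic Stool: a simple wooden stool' },
  { id: 'p2', text: 'Ergo Pro: ergonomic office chair with lumbar support' },
  { id: 'p3', text: 'Task Chair: budget office chair' },
]);
console.log(ranked); // [ 'p2', 'p3', 'p1' ]
```

Swapping this in for `rerankResults` during local development lets the controller and its id-mapping logic be exercised end to end without any ML infrastructure.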

The Controller (src/api/controllers/search.controller.js)

This is the orchestrator. It executes the logic in sequence, handles request validation, and formats the final response.

// src/api/controllers/search.controller.js
const embeddingService = require('../../services/embedding.service.js');
const qdrantService = require('../../services/qdrant.service.js');
const rerankService = require('../../services/rerank.service.js');
const config = require('../../config');

async function semanticSearch(req, res, next) {
    const { q: query } = req.query;

    if (!query || typeof query !== 'string' || query.trim().length === 0) {
        return res.status(400).json({ error: 'Query parameter "q" is required.' });
    }

    try {
        // Stage 0: Get query embedding
        const queryVector = await embeddingService.getEmbedding(query);

        // Stage 1: Retrieval from Qdrant
        const candidates = await qdrantService.searchProducts(
            queryVector,
            config.search.retrievalLimit
        );
        
        // If retrieval returns nothing, we can short-circuit.
        if (candidates.length === 0) {
            return res.json({ data: [] });
        }

        // The documents to be reranked should be in a format the model expects.
        // Here, we rebuild the text that was embedded at ingestion time, which
        // requires `name` and `description` to be present in the stored payload.
        const documentsToRerank = candidates.map(c => ({
            id: c.payload.product_id,
            text: `${c.payload.name}: ${c.payload.description}` // Example format
        }));

        // Stage 2: Reranking
        const rerankedIds = await rerankService.rerankResults(query, documentsToRerank);
        
        // The reranker service returns an ordered list of product_ids.
        // We now need to map this back to our full candidate data.
        const candidateMap = new Map(candidates.map(c => [c.payload.product_id, c.payload]));
        
        const finalResults = rerankedIds
            .map(id => candidateMap.get(id))
            .filter(Boolean) // Filter out any potential mismatches
            .slice(0, config.search.finalLimit);

        res.json({ data: finalResults });

    } catch (error) {
        // Pass the error to our centralized error handler
        next(error);
    }
}

module.exports = { semanticSearch };
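
The id-mapping step at the end of the controller is easy to get subtly wrong, so it can be worth extracting into a pure function that is testable in isolation. The following is a sketch under the same assumption the controller makes, namely that the reranker returns an ordered list of `product_id` values. The name `mergeReranked` is hypothetical, and unlike the inline version it also drops duplicate ids.

```javascript
// Map reranked ids back to candidate payloads, preserving the reranked order.
// Unknown ids and duplicates are skipped; at most `limit` results are returned.
function mergeReranked(candidates, rerankedIds, limit) {
  const byProductId = new Map(candidates.map((c) => [c.payload.product_id, c.payload]));
  const seen = new Set();
  const results = [];
  for (const id of rerankedIds) {
    if (results.length >= limit) break;
    if (seen.has(id) || !byProductId.has(id)) continue;
    seen.add(id);
    results.push(byProductId.get(id));
  }
  return results;
}

const sampleCandidates = [
  { id: 'a', score: 0.9, payload: { product_id: 'p1', name: 'Chair One' } },
  { id: 'b', score: 0.8, payload: { product_id: 'p2', name: 'Chair Two' } },
  { id: 'c', score: 0.7, payload: { product_id: 'p3', name: 'Chair Three' } },
];
// 'p9' is unknown and the second 'p3' is a duplicate; both are ignored.
const merged = mergeReranked(sampleCandidates, ['p3', 'p9', 'p1', 'p3', 'p2'], 2);
console.log(merged.map((p) => p.name)); // [ 'Chair Three', 'Chair One' ]
```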

Bringing it all together (app.js, search.route.js, errorHandler.js)

// src/api/routes/search.route.js
const express = require('express');
const searchController = require('../controllers/search.controller.js');
const router = express.Router();

router.get('/', searchController.semanticSearch);

module.exports = router;

// ---

// src/api/middlewares/errorHandler.js
// A centralized error handler keeps controller logic clean.
function errorHandler(err, req, res, next) {
    console.error(`[ERROR] ${new Date().toISOString()} - ${err.stack}`);
    
    // In production, you wouldn't send the stack trace to the client.
    // Instead, log it and send a generic error message.
    const statusCode = err.statusCode || 500;
    const message = (process.env.NODE_ENV === 'production' && statusCode === 500)
        ? 'An internal server error occurred.'
        : err.message;
        
    res.status(statusCode).json({ error: message });
}

module.exports = errorHandler;

// ---

// src/app.js
const express = require('express');
const morgan = require('morgan'); // For request logging
const helmet = require('helmet'); // For basic security headers
const config = require('./config');
const searchRoutes = require('./api/routes/search.route');
const errorHandler = require('./api/middlewares/errorHandler');

const app = express();

// Middlewares
app.use(helmet());
app.use(express.json());
app.use(morgan('dev'));

// Health check endpoint
app.get('/health', (req, res) => res.status(200).send('OK'));

// Routes
app.use('/api/v1/search', searchRoutes);

// Centralized error handling
app.use(errorHandler);

app.listen(config.port, () => {
    console.log(`Server is running on port ${config.port}`);
});
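
The error handler reads `err.statusCode`, but the services above throw plain `Error` objects, so every upstream failure is reported as a 500. One way to close that gap is a small error class that carries a status code. This is a sketch; the class and helper names are assumptions, and the `ECONNABORTED` check reflects the code axios sets on timeouts by default.

```javascript
// An error that carries the HTTP status the error middleware should return.
class UpstreamError extends Error {
  constructor(message, { statusCode = 502, cause } = {}) {
    super(message, { cause });
    this.name = 'UpstreamError';
    this.statusCode = statusCode;
  }
}

// Convert a low-level failure into an UpstreamError:
// 504 for timeouts, 502 for any other upstream failure.
function toUpstreamError(serviceName, error) {
  const timedOut = Boolean(error) && error.code === 'ECONNABORTED';
  return new UpstreamError(`${serviceName} is unavailable.`, {
    statusCode: timedOut ? 504 : 502,
    cause: error,
  });
}

const timeoutError = toUpstreamError('Rerank service', { code: 'ECONNABORTED' });
console.log(timeoutError.statusCode, timeoutError.message); // 504 Rerank service is unavailable.
```

In a service's `catch` block, `throw toUpstreamError('Rerank service', error)` would then let the existing handler respond with 502 or 504 without any change to the middleware itself.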

Finally, the Dockerfile to containerize the service.

# Dockerfile
FROM node:18-alpine

WORKDIR /usr/src/app

COPY package*.json ./

RUN npm install

COPY . .

EXPOSE 3000

CMD [ "node", "src/app.js" ]

Phase 3: Securing and Exposing via Apache APISIX

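With the service running, we now configure APISIX to manage access. We’ll use APISIX’s standalone mode, in which routes, upstreams, and consumers are declared in a YAML file rather than created through the Admin API. This keeps the gateway configuration in version control and is well-suited for GitOps workflows.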

# apisix_config.yaml
# Mounted into the APISIX container as conf/apisix.yaml (standalone mode).
routes:
  - id: "search-api-v1"
    # Match the endpoint itself and any sub-path.
    # A route takes either `uri` or `uris`, not both.
    uris:
      - "/api/v1/search"
      - "/api/v1/search/*"
    upstream_id: "rerank-service-upstream"
    plugins:
      # Protect against request spikes.
      # Allows 100 requests per second, with a burst of 50.
      limit-req:
        rate: 100
        burst: 50
        key_type: "var"
        key: "remote_addr" # Rate limit by client IP
        rejected_code: 429 # The plugin's default is 503

      # Secure the endpoint with JWT.
      # The route only enables the plugin; the key and secret live on the consumer.
      jwt-auth: {}

consumers:
  # In standalone mode the consumer is declared in this file.
  # The username and secret below are placeholders.
  - username: "ios_app"
    plugins:
      jwt-auth:
        key: "user-key" # Tokens must carry this value in their "key" claim
        secret: "replace-with-a-strong-secret"

upstreams:
  - id: "rerank-service-upstream"
    type: "roundrobin"
    nodes:
      # "rerank_api:3000" is the address within the Docker network.
      "rerank_api:3000": 1
#END

This configuration defines a route that listens on /api/v1/search. Before forwarding the request to our Express service (rerank_api:3000), it enforces two plugins: limit-req to prevent abuse and jwt-auth to ensure only authenticated clients can access the service. This delegation of responsibility is a core principle of microservice architecture.
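
For local testing of the gateway it is handy to mint a token by hand. The sketch below signs an HS256 JWT using only Node's built-in `crypto` module. It assumes a consumer whose `jwt-auth` key is `user-key`; the secret is a placeholder, and in a real deployment tokens would be issued by your authentication service, never by the mobile client. APISIX's `jwt-auth` plugin identifies the consumer through the `key` claim in the payload.

```javascript
const crypto = require('crypto');

// Encode a string as unpadded base64url, as required by the JWT format.
function base64url(input) {
  return Buffer.from(input).toString('base64url');
}

// Create a compact HS256-signed JWT for the given payload.
function signJwt(payload, secret) {
  const header = { alg: 'HS256', typ: 'JWT' };
  const signingInput = `${base64url(JSON.stringify(header))}.${base64url(JSON.stringify(payload))}`;
  const signature = crypto
    .createHmac('sha256', secret)
    .update(signingInput)
    .digest('base64url');
  return `${signingInput}.${signature}`;
}

// Placeholder secret: must match the consumer's jwt-auth secret in APISIX.
const secret = 'replace-with-a-strong-secret';
const nowSeconds = Math.floor(Date.now() / 1000);
const token = signJwt({ key: 'user-key', exp: nowSeconds + 3600 }, secret);
console.log(token.split('.').length); // 3
```

The token can then be sent as `Authorization: Bearer <token>`, matching what the iOS client does.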

Phase 4: iOS Client Integration

The final piece is the mobile client. The implementation must be robust. A common mistake is to not handle different network conditions or server responses gracefully.

// iOS/APIService.swift
import Foundation

// MARK: - Data Models (Codable)
// It's crucial that these structs match the JSON response from the API.
struct SearchResponse: Codable {
    let data: [Product]
}

struct Product: Codable, Identifiable {
    let productId: String
    let name: String
    let category: String
    let price: Double

    var id: String { productId }

    enum CodingKeys: String, CodingKey {
        case productId = "product_id"
        case name, category, price
    }
}

// MARK: - API Error Handling
enum APIError: Error, LocalizedError {
    case invalidURL
    case requestFailed(statusCode: Int)
    case decodingError(Error)
    case unknownError(Error)
    
    var errorDescription: String? {
        switch self {
        case .invalidURL:
            return "The API endpoint URL is invalid."
        case .requestFailed(let statusCode):
            return "The request failed with status code: \(statusCode)."
        case .decodingError:
            return "Failed to decode the server response."
        case .unknownError:
            return "An unexpected error occurred."
        }
    }
}


// MARK: - API Service
class SearchService {
    private let baseURL = URL(string: "https://your-apisix-gateway.com/api/v1")!
    private let urlSession: URLSession
    
    // Using dependency injection for URLSession makes this class testable.
    init(urlSession: URLSession = .shared) {
        self.urlSession = urlSession
    }
    
    func performSearch(query: String, authToken: String) async throws -> [Product] {
        guard var components = URLComponents(url: baseURL.appendingPathComponent("search"), resolvingAgainstBaseURL: false) else {
            throw APIError.invalidURL
        }
        
        components.queryItems = [URLQueryItem(name: "q", value: query)]
        
        guard let url = components.url else {
            throw APIError.invalidURL
        }
        
        var request = URLRequest(url: url)
        request.httpMethod = "GET"
        // The JWT token is passed in the Authorization header.
        request.setValue("Bearer \(authToken)", forHTTPHeaderField: "Authorization")
        // Setting a reasonable timeout is critical for mobile apps.
        request.timeoutInterval = 15.0

        do {
            let (data, response) = try await urlSession.data(for: request)
            
            guard let httpResponse = response as? HTTPURLResponse else {
                throw APIError.unknownError(NSError(domain: "InvalidResponse", code: 0, userInfo: nil))
            }
            
            guard (200...299).contains(httpResponse.statusCode) else {
                // APISIX might return 401 (Unauthorized), 429 (Too Many Requests), etc.
                // The client must be able to handle these.
                throw APIError.requestFailed(statusCode: httpResponse.statusCode)
            }
            
            do {
                let searchResponse = try JSONDecoder().decode(SearchResponse.self, from: data)
                return searchResponse.data
            } catch {
                throw APIError.decodingError(error)
            }
            
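        } catch let error as APIError {
            // Errors we raised ourselves pass through unchanged.
            throw error
        } catch {
            // Transport-level failures (timeouts, no connectivity) end up here.
            throw APIError.unknownError(error)
        }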
    }
}

This Swift code defines a clean, async/await-based service for fetching search results. It sets the Authorization header for the JWT, defines Codable models for JSON parsing, and maps failures onto distinct error cases: non-2xx responses (requestFailed), decoding failures (decodingError), and everything else, including transport errors such as timeouts (unknownError). Callers can then react differently to a 401, a 429, or a malformed payload, which matters in a production-grade mobile application.

The two-stage retrieval-rerank architecture, while more complex than a simple vector search, provides a necessary balance between latency and relevance for a high-quality user experience. The main operational challenge shifts from tuning a single vector index to managing and observing a multi-step pipeline.

Several parts of this implementation are simplified. The embedding and reranking services are thin wrappers around hypothetical endpoints; in a production environment, these would be GPU-powered microservices requiring their own scaling and monitoring strategies. The data ingestion pipeline is currently a batch process, whereas a more advanced system would use Change Data Capture (CDC) from a primary database to update the Qdrant index in near real-time. Finally, the choice of APISIX as a gateway provides a solid foundation for future enhancements, such as canary-releasing new reranking models with its traffic-split plugin, allowing data-driven improvements to the search algorithm without client-side changes.

