Implementing Transactional Feature Rollups in MongoDB for Real-Time Machine Learning Inference


The correctness of a real-time machine learning system, particularly in domains like financial fraud detection or credit scoring, hinges on the consistency of the features it consumes. A model’s prediction is only as reliable as the data it is fed at the moment of inference. A common architectural failure mode arises when models depend on both raw event data and derived, aggregated features—often called rollups. Consider a system where a new financial transaction is recorded, and simultaneously, aggregated features like daily_transaction_count and average_transaction_value_last_hour are updated. If an inference request for that user arrives during the brief window after the raw transaction has been written but before the aggregates are updated, the model sees a dangerously inconsistent state of the world. This data race can lead to catastrophic failures, such as missing a fraudulent transaction. The core technical challenge is ensuring absolute atomicity between the ingestion of a raw event and the calculation of its corresponding feature rollups.

sequenceDiagram
    participant Client
    participant IngestService
    participant TransactionDB
    participant FeatureDB
    participant InferenceService

    Client->>+IngestService: POST /event (tx_id: 123, user: A, amount: 500)
    IngestService->>+TransactionDB: INSERT {tx_id: 123, ...}
    TransactionDB-->>-IngestService: Acknowledged

    Note right of IngestService: --- INCONSISTENT STATE WINDOW ---

    InferenceService->>+TransactionDB: READ user A events
    TransactionDB-->>-InferenceService: [..., {tx_id: 123}]
    InferenceService->>+FeatureDB: READ user A features
    FeatureDB-->>-InferenceService: {daily_count: 5, daily_sum: 2300} (Stale)
    Note over InferenceService: Model sees new event but old rollup. Potential misprediction!

    IngestService->>+FeatureDB: UPDATE user A features {daily_count: 6, daily_sum: 2800}
    FeatureDB-->>-IngestService: Acknowledged
    IngestService-->>-Client: 200 OK

The diagram above illustrates this precise failure mode. The period between writing the raw event and updating the derived feature creates a window of inconsistency. Any read operation occurring within this window gets a corrupted view of reality. The fundamental requirement is to eliminate this window entirely by making the two writes a single, indivisible, atomic operation. This forces us to evaluate two primary architectural patterns.

Architectural Approach A: Decoupled Stream Processing

A conventional approach to this problem involves a decoupled, asynchronous pipeline using a message queue and a stream processing engine.

Architecture:
An ingestion service receives the raw event and publishes it to a durable message log like Apache Kafka. A separate stream processing application, built with a framework like Apache Flink or Spark Streaming, consumes these events. This application maintains the state for feature rollups (e.g., windowed counts, sums) and periodically writes the updated aggregates to a feature store, which in our case could still be MongoDB. The inference service then reads both raw data and aggregated features from MongoDB.

graph TD
    A[Ingestion API] --> B(Apache Kafka Topic);
    B --> C{Apache Flink Job};
    C --> D[MongoDB Feature Store];
    E[TensorFlow Inference Service] --> D;
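
To make the flow concrete, the sketch below shows the ingestion side of this pipeline using the kafka-python client; the broker address, topic name, and event fields are illustrative assumptions rather than part of any specific deployment. Note that the write to Kafka is acknowledged well before the Flink job materializes the updated rollup in MongoDB, which is exactly where the inconsistency window lives.

# Ingestion side of Approach A: a minimal sketch using kafka-python.
# Broker address, topic name, and event fields are illustrative assumptions.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
    acks="all",  # durable in Kafka, yet the MongoDB rollup is still stale
)

def publish_event(event: dict) -> None:
    # Acknowledged once the brokers persist the record; the Flink job updates
    # the MongoDB feature store later, so readers can still observe the
    # inconsistency window described above.
    producer.send("transactions", value=event).get(timeout=10)

publish_event({"transaction_id": "txn_abc123", "user_id": "user_42", "amount": "500.75"})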

Analysis of Merits:

  1. Scalability of Computation: The stream processor can be scaled independently of the ingestion service and the database. This is a strong advantage for computationally intensive feature engineering that involves complex windowing or sessionization logic.
  2. Decoupling and Resilience: The use of Kafka as a buffer decouples the ingestion layer from the processing layer. If the Flink job fails, events are retained in Kafka, and processing can resume once the job is restored, preventing data loss.
  3. Rich Ecosystem: Frameworks like Flink provide powerful, high-level abstractions for time-series analysis, watermarking for handling late data, and state management, which are non-trivial to implement from scratch.

Analysis of Flaws:

  1. Eventual Consistency by Design: This architecture’s primary weakness is that it is fundamentally eventually consistent. There is an unavoidable latency, however small, between an event’s arrival in Kafka and the final write of its aggregated feature into MongoDB. The window of inconsistency, while perhaps shortened, is inherent to the design. For high-stakes decisions, “eventually” consistent is often not consistent enough.
  2. Operational Overhead: This solution introduces at least two new, complex distributed systems into the stack: the message queue and the stream processor. Managing, monitoring, and ensuring the reliability of Kafka and Flink clusters adds significant operational complexity and cost.
  3. Exactly-Once Processing Complexity: While modern stream processors offer exactly-once processing semantics, achieving a true end-to-end exactly-once guarantee that includes an atomic write to an external sink like MongoDB (a “transactional sink”) is notoriously difficult. It often requires sophisticated two-phase commit protocols between the processor and the database connector, which can be brittle and impact performance. A common mistake is to achieve exactly-once processing in Flink but still have at-least-once writes to the database, leading to potential data duplication or incorrect aggregates.

In a real-world project where a single incorrect prediction could result in substantial financial loss, relying on an eventually consistent pipeline for critical feature generation introduces an unacceptable level of risk. The architectural complexity does not eliminate the core problem; it merely manages it.

Architectural Approach B: In-Database Transactional Rollups

An alternative approach leverages the capabilities of the database itself to enforce atomicity. MongoDB has supported multi-document ACID transactions on replica sets since version 4.0 (and on sharded clusters since 4.2), so we can perform the raw event insertion and the feature document update within the same transaction.

Architecture:
The architecture is dramatically simplified. The ingestion service communicates directly with MongoDB. It initiates a transaction, performs both write operations, and commits. Only upon successful commit is the operation considered complete. The inference service reads from the same database, guaranteed to see a consistent state.

graph TD
    A[Ingestion API] -->|Executes ACID Transaction| B(MongoDB Replica Set);
    C[TensorFlow Inference Service] -->|Reads Consistent State| B;

Analysis of Merits:

  1. Guaranteed Strong Consistency: This is the paramount advantage. By wrapping both the insert of the new event and the update of the feature rollup in a single ACID transaction, we completely eliminate the window of inconsistency. The change becomes visible to all other readers as a single, atomic unit. The system moves from one consistent state to the next, with no intermediate inconsistent states exposed.
  2. Reduced Architectural Complexity: This design removes the need for a separate message queue and stream processing cluster. This translates to lower infrastructure costs, reduced operational burden, and a smaller surface area for potential failures. The system is simpler to reason about, deploy, and maintain.
  3. Data Integrity: The transactional guarantee ensures that we never have a partial update. If the feature aggregation fails for any reason (e.g., a bug in the update logic), the entire transaction is rolled back, and the raw event is not persisted either. This prevents data corruption.

Analysis of Flaws:

  1. Increased Database Workload: The computational work of aggregation is shifted from a dedicated processing tier to the database server and the application layer driving the transaction. This increases CPU and I/O load on the database. In a write-heavy system, this must be carefully monitored.
  2. Transaction Scope and Duration: Multi-document transactions are not free. Long-running transactions hold locks and can lead to contention and reduced throughput, so it is critical to keep the logic inside the transaction minimal and efficient. This pattern is ill-suited for rollups that require scanning large amounts of data to compute; it excels at simple, incremental updates ($inc, $set), as the sketch after this list illustrates.
  3. Limited Computational Power: MongoDB is not a stream processor. This approach is not a fit for complex feature engineering that requires advanced windowing semantics or joining multiple data streams. The logic must be expressible within the database’s query and update language.
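
To make point 2 concrete, the sketch below contrasts a recompute-style rollup, which scans every event for the user inside the transaction, with the constant-time incremental update used later in this article. The collection and field names mirror the data model in the Core Implementation section; the helpers themselves are illustrative.

# A sketch only: contrasts a scan-based recompute with the incremental update
# pattern; assumes the data model and Decimal128 amounts described below.
from decimal import Decimal
from bson.decimal128 import Decimal128

# Anti-pattern: recompute the daily rollup by scanning all of the user's
# events inside the transaction. The work (and lock time) grows with volume.
def recompute_rollup(db, session, user_id, day_start):
    count, total = 0, Decimal("0")
    for doc in db.transactions.find(
        {"user_id": user_id, "timestamp": {"$gte": day_start}}, session=session
    ):
        count += 1
        total += doc["amount"].to_decimal()
    db.user_features.update_one(
        {"_id": user_id},
        {"$set": {"rollups.daily.count": count, "rollups.daily.sum": Decimal128(total)}},
        upsert=True, session=session,
    )

# Preferred: a constant-time incremental update touching a single document.
def incremental_rollup(db, session, user_id, amount):
    db.user_features.update_one(
        {"_id": user_id},
        {"$inc": {"rollups.daily.count": 1, "rollups.daily.sum": Decimal128(amount)}},
        upsert=True, session=session,
    )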

Final Decision and Rationale

For the target use case—a real-time fraud detection system—the cost of an incorrect decision due to inconsistent data is extremely high. Therefore, correctness and strong consistency must be prioritized over the potential for higher computational scale offered by a decoupled system. We select Architectural Approach B: In-Database Transactional Rollups. The operational simplicity is a significant bonus, but the driving factor is the ACID guarantee that eradicates the race condition at the heart of the problem. We accept the trade-off of placing a higher load on the database, which can be mitigated through proper capacity planning, schema design, and query optimization.

Core Implementation

The implementation consists of four main components: the MongoDB data model, a Python service for transactional rollups, a TensorFlow inference service, and OCI-compliant packaging for deployment.

1. MongoDB Data Model

We need two collections: one for immutable transaction events and another for the mutable user feature rollups.

  • transactions: Stores raw financial transactions.
  • user_features: Stores aggregated features for each user.
// Schema for the 'transactions' collection
{
  "_id": ObjectId("..."),
  "transaction_id": "txn_abc123",
  "user_id": "user_42",
  "amount": { "$numberDecimal": "500.75" }, // Use Decimal128 for financial data
  "timestamp": ISODate("..."),
  "merchant_id": "merchant_xyz"
}

// Schema for the 'user_features' collection
{
  "_id": "user_42", // Use user_id as the primary key for efficient lookups
  "last_updated": ISODate("..."),
  "rollups": {
    "hourly": {
      "count": 15,
      "sum": { "$numberDecimal": "4512.50" }
    },
    "daily": {
      "count": 85,
      "sum": { "$numberDecimal": "21034.75" }
    }
    // ... other time windows
  },
  "version": 1 // For optimistic concurrency control if needed
}

A common mistake is using floating-point numbers for currency; Decimal128 is essential for precision. Sharding the user_features collection on _id (the user_id) ensures that writes for different users are distributed across the cluster.
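
The short setup sketch below shows one way to prepare these collections with pymongo. The index choices and the hashed shard key are suggestions for this data model, not requirements of the pattern, and multi-document transactions on sharded clusters require MongoDB 4.2 or later.

# One-time setup sketch for the two collections. Index choices and the hashed
# shard key are assumptions for illustration, not part of the core pattern.
from pymongo import ASCENDING, MongoClient

client = MongoClient("mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0")
db = client["fraud_detection"]

# Reject duplicate ingestion of the same raw event.
db.transactions.create_index([("transaction_id", ASCENDING)], unique=True)
# Support per-user, time-ranged scans over raw events.
db.transactions.create_index([("user_id", ASCENDING), ("timestamp", ASCENDING)])

# On a sharded deployment (transactions across shards need MongoDB 4.2+),
# a hashed shard key on _id spreads per-user feature writes across shards:
# client.admin.command("shardCollection", "fraud_detection.user_features",
#                      key={"_id": "hashed"})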

2. The Transactional Rollup Service

This Python service encapsulates the core logic. It requires a MongoDB replica set to use transactions. The code uses pymongo and demonstrates a robust, retryable transaction block.

# feature_rollup_service.py

import os
import logging
from datetime import datetime, timedelta
from decimal import Decimal
from pymongo import MongoClient, WriteConcern
from pymongo.errors import ConnectionFailure, OperationFailure
from bson.decimal128 import Decimal128

# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0")
DB_NAME = "fraud_detection"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class FeatureRollupService:
    """
    Handles the ingestion of transactions and the atomic update of feature rollups
    within a single ACID transaction.
    """

    def __init__(self, mongo_uri: str, db_name: str):
        try:
            self.client = MongoClient(mongo_uri)
            # Ping the server to verify connection
            self.client.admin.command('ping')
            logging.info("Successfully connected to MongoDB.")
        except ConnectionFailure as e:
            logging.error(f"Could not connect to MongoDB: {e}")
            raise
        
        self.db = self.client[db_name]
        self.transactions_coll = self.db.transactions
        self.features_coll = self.db.user_features

    def _transaction_callback(self, session, transaction_data: dict):
        """
        This function is executed within the scope of a transaction.
        A common pitfall is to perform non-transactional operations here.
        Only database operations using the 'session' object are part of the transaction.
        """
        user_id = transaction_data["user_id"]
        amount = transaction_data["amount"]
        timestamp = transaction_data["timestamp"]

        # 1. Insert the raw transaction event.
        # BSON cannot encode Python's decimal.Decimal directly, so the amount
        # is stored as Decimal128 before insertion.
        event_doc = {**transaction_data, "amount": Decimal128(amount)}
        self.transactions_coll.insert_one(event_doc, session=session)
        logging.info(f"Transaction insert prepared for user {user_id} in session.")

        # 2. Prepare the atomic update for feature rollups
        # We use $inc to atomically increment counters and sums.
        # This is far more efficient and safer than a read-modify-write pattern.
        update_payload = {
            "$inc": {
                "rollups.daily.count": 1,
                "rollups.daily.sum": Decimal128(amount),
                "rollups.hourly.count": 1,
                "rollups.hourly.sum": Decimal128(amount)
            },
            "$set": {
                "last_updated": timestamp
            }
        }
        
        # Use upsert=True to create the feature document if it doesn't exist.
        self.features_coll.update_one(
            {"_id": user_id},
            update_payload,
            upsert=True,
            session=session
        )
        logging.info(f"Feature rollup update prepared for user {user_id} in session.")


    def process_transaction(self, transaction_data: dict) -> bool:
        """
        Processes a single transaction, ensuring atomicity for data and rollups.
        """
        try:
            # Transactions require a client session.
            with self.client.start_session() as session:
                # The with_transaction function handles starting, committing,
                # and aborting transactions. It also includes retry logic for
                # transient transaction errors, which is a production must-have.
                session.with_transaction(
                    lambda s: self._transaction_callback(s, transaction_data),
                    write_concern=WriteConcern("majority")
                )
            logging.info(f"Successfully committed transaction for user {transaction_data['user_id']}.")
            return True
        except (ConnectionFailure, OperationFailure) as e:
            # OperationFailure can include TransientTransactionError, which is retried automatically.
            # If it fails after retries, it will be raised here.
            logging.error(f"Transaction failed for user {transaction_data['user_id']}: {e}")
            # In a real system, this might push the event to a dead-letter queue.
            return False

    def close(self):
        self.client.close()
        logging.info("MongoDB connection closed.")


# Example Usage (e.g., in a web server route)
if __name__ == "__main__":
    service = FeatureRollupService(MONGO_URI, DB_NAME)
    
    # Simulate an incoming transaction
    new_transaction = {
        "transaction_id": "txn_def456",
        "user_id": "user_789",
        "amount": Decimal("125.50"),
        "timestamp": datetime.utcnow(),
        "merchant_id": "merchant_abc"
    }

    success = service.process_transaction(new_transaction)
    print(f"Processing status: {'Success' if success else 'Failed'}")
    
    service.close()

The critical part is session.with_transaction(...). It abstracts away the complex logic of starting, committing, and retrying transactions, making the code cleaner and more robust. A common mistake is to write manual retry loops without correctly identifying which errors are transient and safe to retry.
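
For readers who do need the manual form, the sketch below shows roughly what with_transaction does on your behalf: retry the entire callback only when the error carries the TransientTransactionError label. It is illustrative only; in this codebase with_transaction remains the preferred path.

# Illustrative only: the manual retry loop that with_transaction() replaces.
# Retrying the whole transaction is safe solely for errors labelled
# "TransientTransactionError".
from pymongo.errors import ConnectionFailure, OperationFailure

def run_transaction_with_retry(session, txn_func, *args):
    while True:
        try:
            txn_func(session, *args)  # starts, executes, and commits the transaction
            return
        except (ConnectionFailure, OperationFailure) as exc:
            if exc.has_error_label("TransientTransactionError"):
                continue  # the whole transaction can be retried from scratch
            raise  # anything else is not safe to blindly retry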

3. The TensorFlow Inference Service

This service reads from MongoDB. A critical detail here is the read concern. Reading with ReadConcern("majority") returns only data that has been acknowledged by a majority of replica set members, so the inference service never observes writes that could later be rolled back and always gets a durable, consistent view of the feature documents.

# inference_service.py

import os
import logging
import tensorflow as tf
from pymongo import MongoClient
from pymongo.read_concern import ReadConcern
from pymongo.errors import ConnectionFailure
from bson.decimal128 import Decimal128

# --- Configuration ---
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0")
DB_NAME = "fraud_detection"
MODEL_PATH = "./model/fraud_detector_v1" # Path to a SavedModel directory

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class InferenceService:
    def __init__(self, mongo_uri: str, db_name: str, model_path: str):
        try:
            self.client = MongoClient(mongo_uri)
            self.db = self.client[db_name]
            # Set a majority read concern to ensure we don't read uncommitted data
            self.features_coll = self.db.user_features.with_options(
                read_concern=ReadConcern("majority")
            )
            logging.info("Successfully connected to MongoDB for inference.")
        except ConnectionFailure as e:
            logging.error(f"Could not connect to MongoDB: {e}")
            raise
        
        try:
            self.model = tf.saved_model.load(model_path)
            logging.info(f"TensorFlow model loaded from {model_path}")
        except Exception as e:
            logging.error(f"Failed to load TensorFlow model: {e}")
            raise

    def predict(self, user_id: str) -> float:
        """
        Fetches features for a user and runs inference.
        """
        try:
            features_doc = self.features_coll.find_one({"_id": user_id})

            if not features_doc:
                logging.warning(f"No feature document found for user {user_id}. Using default features.")
                # In a real system, you'd have a defined default feature vector for new users
                feature_vector = [0.0, 0.0] 
            else:
                # This logic is highly dependent on your model's expected input
                # It's crucial that this mapping is versioned with the model
                daily_count = features_doc.get("rollups", {}).get("daily", {}).get("count", 0)
                daily_sum = features_doc.get("rollups", {}).get("daily", {}).get("sum", Decimal128("0.0")).to_decimal()
                
                feature_vector = [float(daily_count), float(daily_sum)]

            # Convert to TensorFlow tensor
            tensor_input = tf.constant([feature_vector], dtype=tf.float32)
            
            # Run inference
            prediction = self.model(tensor_input)
            
            # Assuming the model returns a single probability
            fraud_probability = prediction.numpy()[0][0]
            
            logging.info(f"Prediction for user {user_id}: {fraud_probability:.4f}")
            return fraud_probability

        except Exception as e:
            logging.error(f"An error occurred during prediction for user {user_id}: {e}")
            # Return a default/safe value or raise an exception
            return -1.0

# Example usage would be via an API framework like FastAPI or Flask
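
As a concrete example of that API layer, a minimal FastAPI wrapper might look like the sketch below; the route path and response shape are assumptions for illustration, not part of the original design.

# inference_api.py -- a minimal FastAPI wrapper around InferenceService.
# Route path and response fields are illustrative assumptions.
from fastapi import FastAPI

from inference_service import DB_NAME, MODEL_PATH, MONGO_URI, InferenceService

app = FastAPI()
service = InferenceService(MONGO_URI, DB_NAME, MODEL_PATH)

@app.get("/score/{user_id}")
def score(user_id: str) -> dict:
    # Reads majority-committed features and returns the model's fraud score.
    return {"user_id": user_id, "fraud_probability": float(service.predict(user_id))}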

4. OCI-Compliant Packaging (Dockerfile)

Both services must be packaged into OCI-compliant container images for portable and scalable deployment. We use multi-stage builds to create lean production images.

# Dockerfile for the feature_rollup_service

# --- Build Stage ---
FROM python:3.9-slim as builder
WORKDIR /app
RUN pip install --no-cache-dir pymongo
COPY feature_rollup_service.py .

# --- Final Stage ---
FROM python:3.9-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/
COPY --from=builder /app/feature_rollup_service.py .

ENV MONGO_URI="mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0"

CMD ["python", "feature_rollup_service.py"]

A similar Dockerfile would be created for the inference service, adding tensorflow to the dependencies. This packaging standardizes the deployment artifact, making it runnable on any container orchestrator like Kubernetes or Amazon ECS.

Limitations and Future Considerations

The primary limitation of this in-database transactional approach is its scalability under extreme write loads or for highly complex aggregation logic. A transaction that updates a single user document is fast and efficient. However, if the business logic required updating a global document (e.g., total_transactions_across_all_users), this would become a massive bottleneck due to lock contention on that single document. The pattern is best suited for workloads that can be partitioned by an entity ID, such as user_id.

Furthermore, if the feature engineering logic evolves to require complex, time-based windowing (e.g., “average transaction amount over a 30-minute sliding window with a 5-minute hop”), this becomes impractical to implement inside a database transaction. The logic is better suited to a dedicated stream processor. In such a scenario, a hybrid architecture might be necessary. The transactional pattern could be retained for critical, simple rollups (like daily counts), while a parallel, eventually consistent stream processing path handles the more complex, less critical features. This allows the system to benefit from both strong consistency where it matters most and the computational power of stream processing where it is needed.

