Building a Cross-Stack Anomaly Detection Pipeline with Neo4j, Phoenix, and Scikit-learn


Our existing fraud detection mechanism was failing. It ran as a nightly batch process, querying a production PostgreSQL replica. By the time it identified a coordinated ring of malicious accounts, they had already been active for 18-24 hours, causing irreversible damage to the user ecosystem. The technical pain point was latency; the business demand was for a system that could score user interactions for fraudulent signals in under 500 milliseconds. This immediately invalidated our entire batch-processing paradigm and forced a complete architectural rethink.

The initial concept was a fully event-driven pipeline. An interaction event from an iOS client would be streamed to a backend, persisted in a data store suitable for complex relationship analysis, fed to a machine learning model for scoring, and the result visualized on a live dashboard for our security operations team. The challenge wasn’t any single part of this, but integrating a heterogeneous stack of technologies, each chosen for being best-in-class for its specific role, into a cohesive, low-latency system.

Our technology selection process was rigorous and driven by the stringent performance requirements.

  1. Ingestion & Real-time Communication: Phoenix (Elixir). We needed to maintain persistent connections with potentially millions of iOS clients. The BEAM VM’s lightweight processes and the Phoenix framework’s Channel abstraction over WebSockets are purpose-built for this C10M problem. A traditional request/response framework in Ruby or Python would require a massive, costly fleet of servers and complex load balancing to handle this connection volume. Elixir’s GenServers also offered the perfect primitive for building resilient, stateful data processing workers within the application.

  2. Data Storage & Analysis: Neo4j. The core of the problem was identifying coordinated inauthentic behavior. These patterns are not about individual user attributes but about the relationships between users: sharing the same device ID, using the same IP subnet, interacting in rapid, reciprocal patterns. Querying this in a relational database required deeply nested, multi-level JOINs that were cripplingly slow. A graph database was the natural fit. Neo4j was chosen for its mature Cypher query language and proven performance in pathfinding and pattern matching queries.

  3. Machine Learning Model Serving: Scikit-learn (Python). Our data science team had already developed and validated fraud detection models using Python’s scientific computing stack (pandas, numpy, scikit-learn). Asking them to port these models to another language like Elixir would have been a significant project in itself, introducing risk and delays. Therefore, the architecture had to accommodate a separate Python service. The critical decision was the communication protocol between Phoenix and Python. We discarded REST/JSON due to serialization overhead and chose gRPC for its performance benefits with Protobuf.

  4. Operator Dashboard: Solid.js. The security team’s dashboard needed to display a real-time stream of alerts and allow for interactive exploration of suspicious graph neighborhoods. We needed a frontend framework that could handle high-frequency data updates from a Phoenix Channel without performance degradation. Solid.js’s fine-grained reactivity model, which updates DOM nodes directly without a virtual DOM diffing step, made it a compelling choice over alternatives for this specific high-throughput use case.

Phase 1: The Phoenix Ingestion Endpoint and Processing Pipeline

The first step was building the ingestion layer. An iOS client connects to a Phoenix Channel, through which it sends interaction events. The EventChannel is the entry point.

# lib/realtime_fraud_web/channels/user_socket.ex
defmodule RealtimeFraudWeb.UserSocket do
  use Phoenix.Socket

  channel "events:*", RealtimeFraudWeb.EventChannel

  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    case validate_token(token) do
      {:ok, user_id} ->
        {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} ->
        :error
    end
  end

  # Reject connections that do not supply a token at all; without this
  # clause such a connection attempt raises a FunctionClauseError.
  def connect(_params, _socket, _connect_info), do: :error

  # In a real-world project, this would be a proper JWT or session validation.
  defp validate_token(token) do
    if String.starts_with?(token, "user:") do
      {:ok, token}
    else
      {:error, :invalid_token}
    end
  end

  @impl true
  def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end

The EventChannel receives the events. A common mistake is to perform heavy, blocking work directly within the handle_in callback. That ties up the channel’s process: if Neo4j or the ML service slows down, the delay propagates all the way back to the client connection and degrades real-time performance. Instead, we immediately hand each event to a supervised, fire-and-forget task started under a Task.Supervisor.

# lib/realtime_fraud_web/channels/event_channel.ex
defmodule RealtimeFraudWeb.EventChannel do
  use Phoenix.Channel
  alias RealtimeFraud.Pipeline.IngestionSupervisor

  require Logger

  def join("events:" <> _private_topic, _payload, socket) do
    {:ok, socket}
  end

  # This is the entry point for events from the iOS client.
  @impl true
  def handle_in("interaction", payload, socket) do
    # Do not block the channel. Immediately offload to the processing pipeline.
    # We add metadata from the socket connection.
    event_data = Map.merge(payload, %{
      "ingested_at" => DateTime.utc_now(),
      "source_user_id" => socket.assigns.user_id
    })

    # The Task.Supervisor allows for concurrent, supervised, fire-and-forget tasks.
    Task.Supervisor.start_child(IngestionSupervisor, fn ->
      RealtimeFraud.Pipeline.process_event(event_data)
    end)

    {:noreply, socket}
  end

  @impl true
  def handle_in(event, _payload, socket) do
    # Logger.warn/1 is deprecated in recent Elixir releases; use Logger.warning/1.
    Logger.warning("Received unhandled event: #{event}")
    {:noreply, socket}
  end
end

# lib/realtime_fraud/pipeline/ingestion_supervisor.ex
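defmodule RealtimeFraud.Pipeline.IngestionSupervisor do
  # Task.Supervisor does not provide a `use` macro, so this module is a thin
  # wrapper that exposes a child spec and registers the supervisor by name.
  def child_spec(arg) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [arg]}, type: :supervisor}
  end

  # Task.Supervisor.start_link/1 takes only a keyword list of options.
  def start_link(_arg) do
    Task.Supervisor.start_link(name: __MODULE__)
  end
end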

# In application.ex, add to the supervision tree
# children = [
#   {RealtimeFraud.Pipeline.IngestionSupervisor, []},
#   ...
# ]

The RealtimeFraud.Pipeline.process_event/1 function is the orchestrator. It calls the GraphWriter to persist the event and then the ScoringService to get a fraud score.

Phase 2: Neo4j Integration and a Resilient Writer

Connecting to Neo4j from Elixir is done via the bolt_sips library. A naive implementation would open a new connection for every write, which is inefficient. The library includes a connection pool, but for mission-critical systems, we need more explicit control over concurrency, backoff, and error handling. We wrapped the driver in our own GenServer-based worker pool using :poolboy.

First, let’s define the GraphWriter which is responsible for translating events into Cypher queries.

# lib/realtime_fraud/pipeline/graph_writer.ex
defmodule RealtimeFraud.Pipeline.GraphWriter do
  @pool_name :neo4j_worker_pool
  
  # Public API
  def persist_interaction(event) do
    # This call blocks until a worker is available from the pool.
    # The timeout prevents waiting indefinitely if the system is overloaded.
    :poolboy.transaction(
      @pool_name,
      fn worker_pid -> GenServer.call(worker_pid, {:persist, event}) end,
      :timer.seconds(10)
    )
  end

  # Worker implementation using GenServer
  defmodule Worker do
    use GenServer
    require Logger

    def start_link(_) do
      GenServer.start_link(__MODULE__, [], [])
    end

    @impl true
    def init(_) do
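      # Each worker maintains its own connection to Neo4j.
      # Configuration should be in config.exs
      # Note: the bolt_sips module is named Bolt.Sips. Depending on the library
      # version, start_link/1 may start a single named driver; in that case start
      # it once in the supervision tree and fetch a connection with Bolt.Sips.conn().
      conn_opts = Application.get_env(:realtime_fraud, :neo4j)
      case Bolt.Sips.start_link(conn_opts) do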
        {:ok, conn} -> 
          Logger.info("Neo4j Worker #{inspect self()} connected.")
          {:ok, %{conn: conn}}
        {:error, reason} -> 
          Logger.error("Neo4j Worker failed to connect: #{inspect reason}")
          {:stop, :connection_failed}
      end
    end

    @impl true
    def handle_call({:persist, event}, _from, %{conn: conn} = state) do
      # MERGE creates each User node only if it does not already exist,
      # so node creation is idempotent. The relationship is deliberately
      # CREATEd because every interaction is a distinct event; note that
      # replaying the same event would therefore add a duplicate edge.
      cypher_query = """
      MERGE (source:User {id: $source_user_id})
      MERGE (target:User {id: $target_user_id})
      CREATE (source)-[r:INTERACTED_WITH {
        type: $interaction_type,
        timestamp: $timestamp,
        client_ip: $client_ip,
        device_id: $device_id
      }]->(target)
      """

      params = %{
        "source_user_id" => event["source_user_id"],
        "target_user_id" => event["target_user_id"],
        "interaction_type" => event["type"],
        "timestamp" => DateTime.to_unix(event["ingested_at"]),
        "client_ip" => event["ip_address"],
        "device_id" => event["device_id"]
      }
      
      case Bolt.Sips.query(conn, cypher_query, params) do
        {:ok, _response} ->
          {:reply, :ok, state}
        {:error, reason} ->
          Logger.error("Failed to execute Cypher query: #{inspect reason}")
          # In a production system, you'd add retry logic with exponential backoff here.
          # For now, we'll just reply with an error.
          {:reply, {:error, :db_error}, state}
      end
    end
  end
end
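
The worker above leaves retry logic as a comment. One common shape for it is capped exponential backoff with jitter. The sketch below shows only the arithmetic and control flow, written in Python (the language of the scoring service) for brevity; an Elixir worker would compute the same delays before re-issuing the query. The function names and default values here are illustrative assumptions, not part of the original system.

```python
import random
import time


def backoff_delays(retries, base=0.1, factor=2.0, cap=5.0):
    """Un-jittered delay in seconds before each retry, capped at `cap`."""
    return [min(cap, base * factor ** i) for i in range(retries)]


def retry(operation, attempts=4, sleep=time.sleep, **schedule):
    """Call operation() up to `attempts` times, sleeping between failures.

    Each sleep is drawn uniformly from [0, delay] ("full jitter") so that
    many workers failing together do not retry in lockstep.
    """
    delays = backoff_delays(attempts - 1, **schedule)
    for attempt in range(attempts):
        try:
            return operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(random.uniform(0.0, delays[attempt]))
```

Passing `sleep` as a parameter keeps the helper testable without real waiting. Retrying is only safe if the write tolerates being replayed, which matters here because the relationship is created rather than merged.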

The :poolboy configuration in application.ex defines the size of the worker pool, which throttles the concurrent writes to Neo4j, protecting it from being overwhelmed.

# lib/realtime_fraud/application.ex
def start(_type, _args) do
  poolboy_config = [
    name: {:local, :neo4j_worker_pool},
    worker_module: RealtimeFraud.Pipeline.GraphWriter.Worker,
    size: 20, # Max concurrent connections to Neo4j
    max_overflow: 10 # Temporarily allow more workers under heavy load
  ]

  children = [
    {RealtimeFraud.Pipeline.IngestionSupervisor, []},
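    # :poolboy has no child_spec/1, so a {:poolboy, config} tuple will not start;
    # build the spec explicitly. The last argument is passed to Worker.start_link/1.
    :poolboy.child_spec(:neo4j_worker_pool, poolboy_config, []),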
    # ... other children
  ]
  # ...
end

Phase 3: The gRPC Bridge to the Scikit-learn Service

This was the most complex part of the integration. We needed a low-latency bridge between the Elixir and Python runtimes.

Step 1: Define the Protobuf contract. This proto file is the source of truth for the RPC interface.

// fraud_scorer.proto
syntax = "proto3";

package fraudscorer;

service FraudScorer {
  // Obtains a fraud score for a given interaction
  rpc ScoreInteraction(ScoreRequest) returns (ScoreResponse) {}
}

message ScoreRequest {
  string source_user_id = 1;
  string target_user_id = 2;
  // We don't send raw graph data. Instead, Phoenix will query Neo4j
  // for pre-computed features and send them here.
  map<string, float> features = 3;
}

message ScoreResponse {
  string request_id = 1;
  double score = 2; // A value between 0.0 and 1.0
  bool is_fraudulent = 3; // True if score exceeds a threshold
}

Step 2: Implement the Python gRPC Server. This service loads a pre-trained Scikit-learn model and exposes the ScoreInteraction endpoint.

# ml_service/server.py
import grpc
from concurrent import futures
import joblib
import numpy as np

# Import generated gRPC code
import fraud_scorer_pb2
import fraud_scorer_pb2_grpc

class FraudScorerService(fraud_scorer_pb2_grpc.FraudScorerServicer):
    def __init__(self):
        # In a real project, this model would be loaded from a model registry
        # like S3 or MLflow.
        try:
            self.model = joblib.load("model.pkl")
            # The order of features is critical and must match the model's training
            self.feature_order = [
                "jaccard_similarity",
                "source_degree",
                "target_degree",
                "shared_device_count"
            ]
            print("Fraud detection model loaded successfully.")
        except FileNotFoundError:
            print("Error: model.pkl not found. The service will not be able to score.")
            self.model = None

    def ScoreInteraction(self, request, context):
        if not self.model:
            context.set_code(grpc.StatusCode.UNAVAILABLE)
            context.set_details("Model is not loaded.")
            return fraud_scorer_pb2.ScoreResponse()

        try:
            # Reconstruct the feature vector in the correct order
            feature_vector = [request.features.get(f, 0.0) for f in self.feature_order]
            
            # Scikit-learn models expect a 2D array
            prediction_input = np.array(feature_vector).reshape(1, -1)
            
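            # The model predicts the probability of the positive class (fraud).
            # Cast the NumPy scalar to a plain float so that the score and the
            # threshold comparison below are native Python types for Protobuf.
            score = float(self.model.predict_proba(prediction_input)[0, 1])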
            
            is_fraud = score > 0.85 # Business logic threshold
            
            return fraud_scorer_pb2.ScoreResponse(
                score=score,
                is_fraudulent=is_fraud
            )
        except Exception as e:
            print(f"Error during prediction: {e}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"Prediction failed: {e}")
            return fraud_scorer_pb2.ScoreResponse()


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    fraud_scorer_pb2_grpc.add_FraudScorerServicer_to_server(
        FraudScorerService(), server
    )
    server.add_insecure_port("[::]:50051")
    server.start()
    print("gRPC server started on port 50051.")
    server.wait_for_termination()

if __name__ == "__main__":
    serve()
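
Because the feature order must match the model's training order, it can help to isolate that logic from the gRPC handler so it can be unit-tested without loading model.pkl. The following is a minimal sketch under that assumption; `build_feature_row` and `classify` are hypothetical helper names, while the feature names and the 0.85 threshold are taken from the server code above.

```python
FEATURE_ORDER = [
    "jaccard_similarity",
    "source_degree",
    "target_degree",
    "shared_device_count",
]
FRAUD_THRESHOLD = 0.85  # same business threshold as the server


def build_feature_row(features, order=FEATURE_ORDER):
    """Return a 1 x n nested list in training order.

    Missing features default to 0.0, mirroring request.features.get(f, 0.0).
    A nested list is a valid 2D array-like input for scikit-learn estimators.
    """
    return [[float(features.get(name, 0.0)) for name in order]]


def classify(score, threshold=FRAUD_THRESHOLD):
    """Convert a model probability into native (float, bool) response fields."""
    score = float(score)
    return score, score > threshold
```

For example, `build_feature_row({"target_degree": 3, "jaccard_similarity": 0.5})` places the two supplied values in positions 2 and 0 respectively and fills the rest with 0.0, regardless of the order in which the caller supplied them.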

Step 3: Implement the Elixir gRPC Client. The Elixir code is now responsible for fetching features from Neo4j and calling the Python service. A significant optimization was made here. Our first iteration involved querying graph structures and sending them over gRPC for Python to process. This was too slow. The winning strategy was to make Cypher do the heavy lifting of feature calculation.

# lib/realtime_fraud/ml/scoring_service.ex
defmodule RealtimeFraud.ML.ScoringService do
  require Logger

  # This GenServer would manage the gRPC channel/connection.
  # For simplicity, we'll make a direct call here.
  def get_score(source_user_id, target_user_id) do
    with {:ok, features} <- fetch_graph_features(source_user_id, target_user_id),
         {:ok, channel} <- GRPC.Stub.connect("localhost:50051"), # Connection should be pooled
         request <- Fraudscorer.ScoreRequest.new(
           source_user_id: source_user_id,
           target_user_id: target_user_id,
           features: features
         ),
         {:ok, response} <- Fraudscorer.FraudScorer.Stub.score_interaction(channel, request) do
      {:ok, %{score: response.score, is_fraudulent: response.is_fraudulent}}
    else
      {:error, reason} ->
        Logger.error("Failed to get score: #{inspect reason}")
        {:error, reason}
    end
  end

  defp fetch_graph_features(source_user_id, target_user_id) do
    # This Cypher query is the heart of the feature engineering.
    # It calculates graph-specific features directly in the database.
    cypher = """
    MATCH (source:User {id: $source_id}), (target:User {id: $target_id})
    
    // Feature 1: Jaccard similarity of neighbors
    OPTIONAL MATCH (source)-[:INTERACTED_WITH]-(s_neighbor)
    WITH source, target, collect(DISTINCT s_neighbor) AS s_neighbors
    OPTIONAL MATCH (target)-[:INTERACTED_WITH]-(t_neighbor)
    WITH source, target, s_neighbors, collect(DISTINCT t_neighbor) AS t_neighbors
    
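    // Feature 2: Node degrees
    // size() over a bare pattern is no longer accepted by recent Neo4j
    // releases; a pattern comprehension works across versions.
    WITH source, target, s_neighbors, t_neighbors, 
         size([(source)-[:INTERACTED_WITH]-() | 1]) AS source_degree,
         size([(target)-[:INTERACTED_WITH]-() | 1]) AS target_degree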
    
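    // Feature 3: Shared device identifiers
    // OPTIONAL MATCH keeps the row when the two users share no device;
    // a plain MATCH would drop the row and discard the other features too.
    OPTIONAL MATCH (source)-[r1:INTERACTED_WITH]->(), (target)-[r2:INTERACTED_WITH]->()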
    WHERE r1.device_id = r2.device_id AND r1.device_id IS NOT NULL
    WITH source, target, s_neighbors, t_neighbors, source_degree, target_degree, 
         count(DISTINCT r1.device_id) as shared_devices
         
    WITH s_neighbors, t_neighbors, source_degree, target_degree, shared_devices
    // Calculate intersection and union for Jaccard
    WITH [n IN s_neighbors WHERE n IN t_neighbors] AS intersection,
         s_neighbors + t_neighbors AS all_neighbors,
         source_degree, target_degree, shared_devices
    WITH intersection, [n IN all_neighbors | id(n)] AS all_ids,
         source_degree, target_degree, shared_devices
    
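    // apoc.coll.toSet requires the APOC plugin. The CASE guards against
    // dividing by zero when neither user has any neighbors.
    RETURN 
      CASE WHEN size(all_ids) = 0 THEN 0.0
           ELSE toFloat(size(intersection)) / size(apoc.coll.toSet(all_ids))
      END AS jaccard_similarity,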
      source_degree,
      target_degree,
      shared_devices
    """

    params = %{"source_id" => source_user_id, "target_id" => target_user_id}
    
    # This uses a simplified direct query for clarity. In production, this would
    # go through the same :poolboy setup as the writer.
    with {:ok, conn} <- Bolt.Sips.start_link(Application.get_env(:realtime_fraud, :neo4j)),
         {:ok, stream} <- Bolt.Sips.query(conn, cypher, params),
         result <- Enum.at(stream, 0) do
      # There is no Float.from/1 in Elixir. Dividing by 1 with `/` always
      # yields a float, which is what the map<string, float> field expects.
      features = %{
        "jaccard_similarity" => result["jaccard_similarity"] || 0.0,
        "source_degree" => (result["source_degree"] || 0) / 1,
        "target_degree" => (result["target_degree"] || 0) / 1,
        "shared_device_count" => (result["shared_devices"] || 0) / 1
      }
      {:ok, features}
    else
      _ -> {:error, :feature_extraction_failed}
    end
  end
end
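
To make the Jaccard feature concrete: it is the size of the intersection of the two users' neighbor sets divided by the size of their union. The sketch below reproduces that calculation in Python on made-up neighbor IDs, which is one way to spot-check values returned by the Cypher query. Returning 0.0 when both users have no neighbors is an assumed convention, chosen to match the default the Elixir code applies to a missing value.

```python
def jaccard_similarity(source_neighbors, target_neighbors):
    """|S ∩ T| / |S ∪ T| over the two neighbor collections."""
    s, t = set(source_neighbors), set(target_neighbors)
    union = s | t
    if not union:
        # Neither user has neighbors: define similarity as 0.0 (assumption).
        return 0.0
    return len(s & t) / len(union)


# Hypothetical neighbor IDs: two shared neighbors out of four distinct ones.
source = {"user:a", "user:b", "user:c"}
target = {"user:b", "user:c", "user:d"}
print(jaccard_similarity(source, target))  # 2 / 4 = 0.5
```

Accounts in a coordinated ring tend to interact with the same small set of peers, so a value close to 1.0 between two otherwise unrelated accounts is the kind of signal this feature is meant to surface.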

Phase 4: The Solid.js Real-time Dashboard

The final piece was the operator dashboard. After the pipeline processes an event and gets a score, if the score is above a threshold, Phoenix broadcasts an alert to a specific topic, e.g., alerts:ops. A Solid.js frontend listens to this topic.

// src/components/AlertStream.jsx
import { createSignal, onCleanup, For } from "solid-js";
import { Socket } from "phoenix";

const AlertStream = () => {
  const [alerts, setAlerts] = createSignal([]);
  let channel;

  // Establish connection to Phoenix Socket
  const socket = new Socket("/socket", { params: { token: "ops-dashboard-token" } });
  socket.connect();

  // Join the specific channel for operator alerts
  channel = socket.channel("alerts:ops", {});
  
  channel.on("new_alert", (payload) => {
    // Prepend the new alert and keep at most the 100 most recent entries.
    // <For> reuses the DOM nodes of existing items, so only the new entry renders.
    setAlerts(currentAlerts => [payload, ...currentAlerts.slice(0, 99)]);
  });

  channel.join()
    .receive("ok", resp => { console.log("Joined alerts channel successfully", resp) })
    .receive("error", resp => { console.log("Unable to join alerts channel", resp) });

  // Ensure we leave the channel when the component is unmounted.
  onCleanup(() => {
    channel.leave();
    socket.disconnect();
  });

  return (
    <div class="alert-container">
      <h1>Real-time Fraud Alerts</h1>
      <ul>
        <For each={alerts()}>
          {(alert, i) => (
            <li class={alert.score > 0.95 ? 'critical' : 'warning'}>
              <strong>Score: {alert.score.toFixed(4)}</strong>
              <p>Interaction between <code>{alert.source_user_id}</code> and <code>{alert.target_user_id}</code></p>
              <small>{new Date(alert.timestamp).toISOString()}</small>
            </li>
          )}
        </For>
      </ul>
    </div>
  );
};

export default AlertStream;

Here’s the final data flow:

graph TD
    subgraph "iOS Client"
        A[App Interaction]
    end
    subgraph "Phoenix (Elixir)"
        B[UserSocket] -- Event --> C{EventChannel};
        C -- offload --> D[Ingestion Task];
        D -- persist --> E[GraphWriter Pool];
        D -- request features/score --> F{ScoringService Client};
        F -- broadcast --> G[Alerts Channel];
    end
    subgraph "Neo4j"
        H[(Graph Database)]
    end
    subgraph "ML Service (Python)"
        I[gRPC Server] -- predict --> J[Scikit-learn Model];
    end
    subgraph "Operator Browser"
        K[Solid.js Component]
    end

    A -- WebSocket --> B;
    E -- Cypher (MERGE) --> H;
    F -- Cypher (MATCH) --> H;
    F -- gRPC Call --> I;
    G -- WebSocket Push --> K;

This polyglot architecture, while complex, allowed us to use the best tool for each job and to meet our sub-500ms goal for real-time fraud scoring. The main challenge was not in any single component but in the seams between them: the gRPC interface, the database connection pooling strategy, and the flow of data through asynchronous processes.

The current implementation still has limitations. The gRPC communication is synchronous within the processing task; a slow ML service could still block a single ingestion task. A more resilient future iteration would use a message queue like RabbitMQ or Kafka between the Phoenix app and the Python service, creating a fully asynchronous, decoupled system. Furthermore, the Neo4j instance is a single point of failure. A migration to a Causal Cluster setup is necessary for true high availability, which brings its own set of challenges regarding transaction management and read/write routing that the GraphWriter pool would need to handle.

