Our existing fraud detection mechanism was failing. It ran as a nightly batch process, querying a production PostgreSQL replica. By the time it identified a coordinated ring of malicious accounts, they had already been active for 18-24 hours, causing irreversible damage to the user ecosystem. The technical pain point was latency; the business demand was for a system that could score user interactions for fraudulent signals in under 500 milliseconds. This immediately invalidated our entire batch-processing paradigm and forced a complete architectural rethink.
The initial concept was a fully event-driven pipeline. An interaction event from an iOS client would be streamed to a backend, persisted in a data store suitable for complex relationship analysis, fed to a machine learning model for scoring, and the result visualized on a live dashboard for our security operations team. The challenge wasn’t any single part of this, but integrating a heterogeneous stack of technologies, each chosen for being best-in-class for its specific role, into a cohesive, low-latency system.
Our technology selection process was rigorous and driven by the stringent performance requirements.
Ingestion & Real-time Communication: Phoenix (Elixir). We needed to maintain persistent connections with potentially millions of iOS clients. The BEAM VM’s lightweight processes and the Phoenix framework’s Channel abstraction over WebSockets are purpose-built for this C10M problem. A traditional request/response framework in Ruby or Python would require a massive, costly fleet of servers and complex load balancing to handle this connection volume. Elixir’s GenServers also offered the perfect primitive for building resilient, stateful data processing workers within the application.
Data Storage & Analysis: Neo4j. The core of the problem was identifying coordinated inauthentic behavior. These patterns are not about individual user attributes but about the relationships between users: sharing the same device ID, using the same IP subnet, interacting in rapid, reciprocal patterns. Querying this in a relational database required deeply nested, multi-level JOINs that were cripplingly slow. A graph database was the natural fit. Neo4j was chosen for its mature Cypher query language and proven performance in pathfinding and pattern-matching queries.
Machine Learning Model Serving: Scikit-learn (Python). Our data science team had already developed and validated fraud detection models using Python’s scientific computing stack (pandas, numpy, scikit-learn). Asking them to port these models to another language like Elixir would have been a significant project in itself, introducing risk and delays. Therefore, the architecture had to accommodate a separate Python service. The critical decision was the communication protocol between Phoenix and Python: we discarded REST/JSON due to its serialization overhead and chose gRPC with Protobuf for its performance benefits.
Operator Dashboard: Solid.js. The security team’s dashboard needed to display a real-time stream of alerts and allow for interactive exploration of suspicious graph neighborhoods. We needed a frontend framework that could handle high-frequency data updates from a Phoenix Channel without performance degradation. Solid.js’s fine-grained reactivity model, which updates DOM nodes directly without a virtual DOM diffing step, made it a compelling choice over alternatives for this specific high-throughput use case.
Phase 1: The Phoenix Ingestion Endpoint and Processing Pipeline
The first step was building the ingestion layer. An iOS client connects to a Phoenix Channel, through which it sends interaction events. The EventChannel
is the entry point.
# lib/realtime_fraud_web/channels/user_socket.ex
defmodule RealtimeFraudWeb.UserSocket do
use Phoenix.Socket
channel "events:*", RealtimeFraudWeb.EventChannel
@impl true
def connect(%{"token" => token}, socket, _connect_info) do
case validate_token(token) do
{:ok, user_id} ->
{:ok, assign(socket, :user_id, user_id)}
{:error, _reason} ->
:error
end
end
# In a real-world project, this would be a proper JWT or session validation.
defp validate_token(token) do
if String.starts_with?(token, "user:") do
{:ok, token}
else
{:error, :invalid_token}
end
end
@impl true
def id(socket), do: "users_socket:#{socket.assigns.user_id}"
end
The EventChannel
receives the events. A common mistake is to perform heavy, blocking work directly within the handle_in
callback. This would tie up the channel’s process and degrade real-time performance. The pitfall here is that if Neo4j or the ML service slows down, it would create backpressure all the way to the client connection. Instead, we immediately offload the work to a dedicated pool of workers managed by a GenServer-based supervisor.
# lib/realtime_fraud_web/channels/event_channel.ex
defmodule RealtimeFraudWeb.EventChannel do
use Phoenix.Channel
alias RealtimeFraud.Pipeline.IngestionSupervisor
require Logger
def join("events:" <> _private_topic, _payload, socket) do
{:ok, socket}
end
# This is the entry point for events from the iOS client.
@impl true
def handle_in("interaction", payload, socket) do
# Do not block the channel. Immediately offload to the processing pipeline.
# We add metadata from the socket connection.
event_data = Map.merge(payload, %{
"ingested_at" => DateTime.utc_now(),
"source_user_id" => socket.assigns.user_id
})
# The Task.Supervisor allows for concurrent, supervised, fire-and-forget tasks.
Task.Supervisor.start_child(IngestionSupervisor, fn ->
RealtimeFraud.Pipeline.process_event(event_data)
end)
{:noreply, socket}
end
@impl true
def handle_in(event, _payload, socket) do
Logger.warning("Received unhandled event: #{event}")
{:noreply, socket}
end
end
# lib/realtime_fraud/pipeline/ingestion_supervisor.ex
defmodule RealtimeFraud.Pipeline.IngestionSupervisor do
  # Task.Supervisor has no `use` macro; we expose a child_spec/1 that delegates
  # to it so the supervisor starts under this module's name and can be
  # referenced by the EventChannel.
  def child_spec(_arg) do
    Task.Supervisor.child_spec(name: __MODULE__)
  end
end
# In application.ex, add to the supervision tree
# children = [
# {RealtimeFraud.Pipeline.IngestionSupervisor, []},
# ...
# ]
The RealtimeFraud.Pipeline.process_event/1
function is the orchestrator. It calls the GraphWriter
to persist the event and then the ScoringService
to get a fraud score.
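That orchestrator is small enough to show in outline. The following is a minimal sketch of process_event/1, assuming the GraphWriter and ScoringService APIs defined in the next two phases; the module path, the broadcast call, and the alert payload shape are illustrative rather than copied verbatim from our codebase.
# lib/realtime_fraud/pipeline.ex (simplified sketch)
defmodule RealtimeFraud.Pipeline do
  alias RealtimeFraud.Pipeline.GraphWriter
  alias RealtimeFraud.ML.ScoringService
  require Logger

  def process_event(event) do
    # Persist first (idempotent MERGE), then fetch graph features and score via gRPC.
    with :ok <- GraphWriter.persist_interaction(event),
         {:ok, result} <- ScoringService.get_score(event["source_user_id"], event["target_user_id"]) do
      maybe_alert(event, result)
    else
      {:error, reason} -> Logger.error("Pipeline failed for event: #{inspect(reason)}")
    end
  end

  # Only interactions the model flags as fraudulent reach the operator dashboard.
  defp maybe_alert(event, %{score: score, is_fraudulent: true}) do
    RealtimeFraudWeb.Endpoint.broadcast("alerts:ops", "new_alert", %{
      score: score,
      source_user_id: event["source_user_id"],
      target_user_id: event["target_user_id"],
      timestamp: DateTime.to_iso8601(event["ingested_at"])
    })
  end

  defp maybe_alert(_event, _result), do: :ok
end
Phase 4 covers how the dashboard consumes this new_alert broadcast.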
Phase 2: Neo4j Integration and a Resilient Writer
Connecting to Neo4j from Elixir is done via the bolt_sips
library. A naive implementation would open a new connection for every write, which is inefficient. The library includes a connection pool, but for mission-critical systems we need more explicit control over concurrency, backoff, and error handling. We wrapped the driver in our own GenServer-based worker pool using :poolboy.
First, let’s define the GraphWriter
which is responsible for translating events into Cypher queries.
# lib/realtime_fraud/pipeline/graph_writer.ex
defmodule RealtimeFraud.Pipeline.GraphWriter do
@pool_name :neo4j_worker_pool
# Public API
def persist_interaction(event) do
# This call blocks until a worker is available from the pool.
# The timeout prevents waiting indefinitely if the system is overloaded.
:poolboy.transaction(
@pool_name,
fn worker_pid -> GenServer.call(worker_pid, {:persist, event}) end,
:timer.seconds(10)
)
end
# Worker implementation using GenServer
defmodule Worker do
use GenServer
require Logger
def start_link(_) do
GenServer.start_link(__MODULE__, [], [])
end
@impl true
def init(_) do
# Each worker maintains its own connection to Neo4j.
# Configuration should be in config.exs
conn_opts = Application.get_env(:realtime_fraud, :neo4j)
case Bolt.Sips.start_link(conn_opts) do
{:ok, conn} ->
Logger.info("Neo4j Worker #{inspect self()} connected.")
{:ok, %{conn: conn}}
{:error, reason} ->
Logger.error("Neo4j Worker failed to connect: #{inspect reason}")
{:stop, :connection_failed}
end
end
@impl true
def handle_call({:persist, event}, _from, state = %{conn: conn}) do
# The MERGE clause is crucial. It creates the node/relationship
# if it doesn't exist, or matches it if it does. This makes
# the write operation idempotent.
cypher_query = """
MERGE (source:User {id: $source_user_id})
MERGE (target:User {id: $target_user_id})
CREATE (source)-[r:INTERACTED_WITH {
type: $interaction_type,
timestamp: $timestamp,
client_ip: $client_ip,
device_id: $device_id
}]->(target)
"""
params = %{
"source_user_id" => event["source_user_id"],
"target_user_id" => event["target_user_id"],
"interaction_type" => event["type"],
"timestamp" => DateTime.to_unix(event["ingested_at"]),
"client_ip" => event["ip_address"],
"device_id" => event["device_id"]
}
case Bolt.Sips.query(conn, cypher_query, params) do
{:ok, _stream} ->
{:reply, :ok, state}
{:error, reason} ->
Logger.error("Failed to execute Cypher query: #{inspect reason}")
# In a production system, you'd add retry logic with exponential backoff here.
# For now, we'll just reply with an error.
{:reply, {:error, :db_error}, state}
end
end
end
end
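The error branch above mentions retry logic with exponential backoff. A minimal sketch of what that could look like inside the Worker module, assuming a private helper; the attempt count and delays are illustrative, and note that Process.sleep/1 blocks this pooled worker, which is acceptable here only because :poolboy already caps concurrency.
# Inside GraphWriter.Worker: exponential backoff around the Cypher write.
defp query_with_retry(conn, cypher, params, attempts_left \\ 3, delay_ms \\ 100)

defp query_with_retry(_conn, _cypher, _params, 0, _delay_ms), do: {:error, :db_error}

defp query_with_retry(conn, cypher, params, attempts_left, delay_ms) do
  case Bolt.Sips.query(conn, cypher, params) do
    {:ok, _} = ok ->
      ok

    {:error, reason} ->
      Logger.warning("Cypher write failed (#{inspect(reason)}); retrying in #{delay_ms}ms")
      Process.sleep(delay_ms)
      # Double the delay on each successive attempt.
      query_with_retry(conn, cypher, params, attempts_left - 1, delay_ms * 2)
  end
end
handle_call/3 would then call query_with_retry(conn, cypher_query, params) instead of calling Bolt.Sips.query/3 directly.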
The :poolboy
configuration in application.ex
defines the size of the worker pool, which throttles the concurrent writes to Neo4j, protecting it from being overwhelmed.
# lib/realtime_fraud/application.ex
def start(_type, _args) do
poolboy_config = [
name: {:local, :neo4j_worker_pool},
worker_module: RealtimeFraud.Pipeline.GraphWriter.Worker,
size: 20, # Max concurrent connections to Neo4j
max_overflow: 10 # Temporarily allow more workers under heavy load
]
children = [
{RealtimeFraud.Pipeline.IngestionSupervisor, []},
:poolboy.child_spec(:neo4j_worker_pool, poolboy_config, []),
# ... other children
]
# ...
end
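The conn_opts each worker reads via Application.get_env(:realtime_fraud, :neo4j) come from config.exs. A minimal sketch, assuming the usual bolt_sips connection options; the URL and credentials are placeholders:
# config/config.exs (sketch; URL and credentials are placeholders)
import Config

config :realtime_fraud, :neo4j,
  url: "bolt://localhost:7687",
  basic_auth: [
    username: "neo4j",
    password: System.get_env("NEO4J_PASSWORD") || "changeme"
  ]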
Phase 3: The gRPC Bridge to the Scikit-learn Service
This was the most complex part of the integration. We needed a low-latency bridge between the Elixir and Python runtimes.
Step 1: Define the Protobuf contract. This proto
file is the source of truth for the RPC interface.
// fraud_scorer.proto
syntax = "proto3";
package fraudscorer;
service FraudScorer {
// Obtains a fraud score for a given interaction
rpc ScoreInteraction(ScoreRequest) returns (ScoreResponse) {}
}
message ScoreRequest {
string source_user_id = 1;
string target_user_id = 2;
// We don't send raw graph data. Instead, Phoenix will query Neo4j
// for pre-computed features and send them here.
map<string, float> features = 3;
}
message ScoreResponse {
string request_id = 1;
double score = 2; // A value between 0.0 and 1.0
bool is_fraudulent = 3; // True if score exceeds a threshold
}
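Both runtimes generate code from this file: grpcio-tools on the Python side and protoc-gen-elixir (from the protobuf Hex package) on the Elixir side. The Elixir client also needs the grpc package; the dependency list below is a sketch and the version constraints are indicative rather than pinned from our project.
# mix.exs (sketch; versions are indicative)
defp deps do
  [
    {:phoenix, "~> 1.7"},
    {:grpc, "~> 0.7"},       # gRPC client used by the ScoringService
    {:protobuf, "~> 0.12"},  # protoc-gen-elixir, generates the Fraudscorer.* modules
    {:bolt_sips, "~> 2.0"},  # Neo4j Bolt driver
    {:poolboy, "~> 1.5"}     # worker pool for the GraphWriter
  ]
end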
Step 2: Implement the Python gRPC Server. This service loads a pre-trained Scikit-learn model and exposes the ScoreInteraction
endpoint.
# ml_service/server.py
import grpc
from concurrent import futures
import joblib
import numpy as np
# Import generated gRPC code
import fraud_scorer_pb2
import fraud_scorer_pb2_grpc
class FraudScorerService(fraud_scorer_pb2_grpc.FraudScorerServicer):
def __init__(self):
# In a real project, this model would be loaded from a model registry
# like S3 or MLflow.
try:
self.model = joblib.load("model.pkl")
# The order of features is critical and must match the model's training
self.feature_order = [
"jaccard_similarity",
"source_degree",
"target_degree",
"shared_device_count"
]
print("Fraud detection model loaded successfully.")
except FileNotFoundError:
print("Error: model.pkl not found. The service will not be able to score.")
self.model = None
def ScoreInteraction(self, request, context):
if not self.model:
context.set_code(grpc.StatusCode.UNAVAILABLE)
context.set_details("Model is not loaded.")
return fraud_scorer_pb2.ScoreResponse()
try:
# Reconstruct the feature vector in the correct order
feature_vector = [request.features.get(f, 0.0) for f in self.feature_order]
# Scikit-learn models expect a 2D array
prediction_input = np.array(feature_vector).reshape(1, -1)
# The model predicts the probability of the positive class (fraud)
score = self.model.predict_proba(prediction_input)[0, 1]
is_fraud = score > 0.85 # Business logic threshold
return fraud_scorer_pb2.ScoreResponse(
score=float(score),            # cast numpy float64 to a plain Python float
is_fraudulent=bool(is_fraud)   # cast numpy bool_ to a plain Python bool
)
except Exception as e:
print(f"Error during prediction: {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"Prediction failed: {e}")
return fraud_scorer_pb2.ScoreResponse()
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
fraud_scorer_pb2_grpc.add_FraudScorerServicer_to_server(
FraudScorerService(), server
)
server.add_insecure_port("[::]:50051")
server.start()
print("gRPC server started on port 50051.")
server.wait_for_termination()
if __name__ == "__main__":
serve()
Step 3: Implement the Elixir gRPC Client. The Elixir code is now responsible for fetching features from Neo4j and calling the Python service. A significant optimization was made here. Our first iteration involved querying graph structures and sending them over gRPC for Python to process. This was too slow. The winning strategy was to make Cypher do the heavy lifting of feature calculation.
# lib/realtime_fraud/ml/scoring_service.ex
defmodule RealtimeFraud.ML.ScoringService do
require Logger
# This GenServer would manage the gRPC channel/connection.
# For simplicity, we'll make a direct call here.
def get_score(source_user_id, target_user_id) do
with {:ok, features} <- fetch_graph_features(source_user_id, target_user_id),
{:ok, channel} <- GRPC.Stub.connect("localhost:50051"), # Connection should be pooled
request <- Fraudscorer.ScoreRequest.new(
source_user_id: source_user_id,
target_user_id: target_user_id,
features: features
),
{:ok, response} <- Fraudscorer.FraudScorer.Stub.score_interaction(channel, request) do
{:ok, %{score: response.score, is_fraudulent: response.is_fraudulent}}
else
{:error, reason} ->
Logger.error("Failed to get score: #{inspect reason}")
{:error, reason}
end
end
defp fetch_graph_features(source_user_id, target_user_id) do
# This Cypher query is the heart of the feature engineering.
# It calculates graph-specific features directly in the database.
cypher = """
MATCH (source:User {id: $source_id}), (target:User {id: $target_id})
// Feature 1: Jaccard similarity of neighbors
OPTIONAL MATCH (source)-[:INTERACTED_WITH]-(s_neighbor)
WITH source, target, collect(DISTINCT s_neighbor) AS s_neighbors
OPTIONAL MATCH (target)-[:INTERACTED_WITH]-(t_neighbor)
WITH source, target, s_neighbors, collect(DISTINCT t_neighbor) AS t_neighbors
// Feature 2: Node degrees
WITH source, target, s_neighbors, t_neighbors,
size((source)-[:INTERACTED_WITH]-()) AS source_degree,
size((target)-[:INTERACTED_WITH]-()) AS target_degree
// Feature 3: Shared device identifiers
MATCH (source)-[r1:INTERACTED_WITH]->(), (target)-[r2:INTERACTED_WITH]->()
WHERE r1.device_id = r2.device_id AND r1.device_id IS NOT NULL
WITH source, target, s_neighbors, t_neighbors, source_degree, target_degree,
count(DISTINCT r1.device_id) as shared_devices
WITH s_neighbors, t_neighbors, source_degree, target_degree, shared_devices
// Calculate intersection and union for Jaccard
WITH [n IN s_neighbors WHERE n IN t_neighbors] AS intersection,
s_neighbors + t_neighbors AS all_neighbors,
source_degree, target_degree, shared_devices
WITH intersection, [n IN all_neighbors | id(n)] AS all_ids,
source_degree, target_degree, shared_devices
RETURN
toFloat(size(intersection)) / toFloat(size(apoc.coll.toSet(all_ids))) AS jaccard_similarity,
source_degree,
target_degree,
shared_devices
"""
params = %{"source_id" => source_user_id, "target_id" => target_user_id}
# This uses a simplified direct query for clarity. In production, this would
# go through the same :poolboy setup as the writer.
with {:ok, conn} <- Bolt.Sips.start_link(Application.get_env(:realtime_fraud, :neo4j)),
{:ok, stream} <- Bolt.Sips.query(conn, cypher, params),
result <- Enum.at(stream, 0) do
features = %{
"jaccard_similarity" => result["jaccard_similarity"] || 0.0,
"source_degree" => Float.from(result["source_degree"] || 0),
"target_degree" => Float.from(result["target_degree"] || 0),
"shared_device_count" => Float.from(result["shared_devices"] || 0)
}
{:ok, features}
else
_ -> {:error, :feature_extraction_failed}
end
end
end
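The comment at the top of get_score/2 notes that the gRPC connection should be managed rather than re-established on every call. A minimal sketch of that, assuming a GenServer added to the supervision tree; the module name and the :ml_service_addr config key are illustrative:
# lib/realtime_fraud/ml/channel_holder.ex (sketch)
defmodule RealtimeFraud.ML.ChannelHolder do
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Callers fetch the long-lived channel instead of connecting per request.
  def channel, do: GenServer.call(__MODULE__, :get_channel)

  @impl true
  def init(_) do
    addr = Application.get_env(:realtime_fraud, :ml_service_addr, "localhost:50051")
    {:ok, channel} = GRPC.Stub.connect(addr)
    {:ok, channel}
  end

  @impl true
  def handle_call(:get_channel, _from, channel), do: {:reply, channel, channel}
end
get_score/2 would then call ChannelHolder.channel() instead of GRPC.Stub.connect/1, removing a connection handshake from the hot path.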
Phase 4: The Solid.js Real-time Dashboard
The final piece was the operator dashboard. After the pipeline processes an event and gets a score, if the score is above a threshold, Phoenix broadcasts an alert to a specific topic, e.g., alerts:ops
. A Solid.js frontend listens to this topic.
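One piece not shown elsewhere is the Phoenix side of that topic: the dashboard can only join alerts:ops if a channel module is registered for it. A minimal sketch, assuming the module name, the UserSocket registration, and a placeholder operator check:
# lib/realtime_fraud_web/channels/alerts_channel.ex (sketch)
# Also requires `channel "alerts:*", RealtimeFraudWeb.AlertsChannel` in UserSocket.
defmodule RealtimeFraudWeb.AlertsChannel do
  use Phoenix.Channel

  def join("alerts:ops", _payload, socket) do
    # Placeholder authorization; a real check would verify an operator role
    # carried by the validated token, not a string prefix.
    if operator?(socket), do: {:ok, socket}, else: {:error, %{reason: "unauthorized"}}
  end

  defp operator?(socket), do: String.starts_with?(socket.assigns.user_id, "ops")
end
Events broadcast to this topic with Endpoint.broadcast/3 are pushed to every joined client, which is exactly what the component below listens for.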
// src/components/AlertStream.jsx
import { createSignal, onCleanup, For } from "solid-js";
import { Socket } from "phoenix";
const AlertStream = () => {
const [alerts, setAlerts] = createSignal([]);
let channel;
// Establish connection to Phoenix Socket
const socket = new Socket("/socket", { params: { token: "ops-dashboard-token" } });
socket.connect();
// Join the specific channel for operator alerts
channel = socket.channel("alerts:ops", {});
channel.on("new_alert", (payload) => {
// Solid's fine-grained reactivity is efficient here.
// It prepends to the array without re-rendering the whole list.
setAlerts(currentAlerts => [payload, ...currentAlerts.slice(0, 99)]);
});
channel.join()
.receive("ok", resp => { console.log("Joined alerts channel successfully", resp) })
.receive("error", resp => { console.log("Unable to join alerts channel", resp) });
// Ensure we leave the channel when the component is unmounted.
onCleanup(() => {
channel.leave();
socket.disconnect();
});
return (
<div class="alert-container">
<h1>Real-time Fraud Alerts</h1>
<ul>
<For each={alerts()}>
{(alert, i) => (
<li class={alert.score > 0.95 ? 'critical' : 'warning'}>
<strong>Score: {alert.score.toFixed(4)}</strong>
<p>Interaction between <code>{alert.source_user_id}</code> and <code>{alert.target_user_id}</code></p>
<small>{new Date(alert.timestamp).toISOString()}</small>
</li>
)}
</For>
</ul>
</div>
);
};
export default AlertStream;
Here’s the final data flow:
graph TD
  subgraph "iOS Client"
    A[App Interaction]
  end
  subgraph "Phoenix (Elixir)"
    B[UserSocket] -- Event --> C{EventChannel};
    C -- offload --> D[Ingestion Task];
    D -- persist --> E[GraphWriter Pool];
    D -- request features/score --> F{ScoringService Client};
    F -- broadcast --> G[Alerts Channel];
  end
  subgraph "Neo4j"
    H[(Graph Database)]
  end
  subgraph "ML Service (Python)"
    I[gRPC Server] -- predict --> J[Scikit-learn Model];
  end
  subgraph "Operator Browser"
    K[Solid.js Component]
  end
  A -- WebSocket --> B;
  E -- Cypher (MERGE) --> H;
  F -- Cypher (MATCH) --> H;
  F -- gRPC Call --> I;
  G -- WebSocket Push --> K;
This polyglot architecture, while complex, allowed us to use the best tool for each job, achieving our sub-500ms goal for real-time fraud scoring. The main challenge was not in any single component, but in the seams between them—the gRPC interface, the database connection pooling strategy, and the flow of data through asynchronous processes.
The current implementation still has limitations. The gRPC communication is synchronous within the processing task; a slow ML service could still block a single ingestion task. A more resilient future iteration would use a message queue like RabbitMQ or Kafka between the Phoenix app and the Python service, creating a fully asynchronous, decoupled system. Furthermore, the Neo4j instance is a single point of failure. A migration to a Causal Cluster setup is necessary for true high availability, which brings its own set of challenges regarding transaction management and read/write routing that the GraphWriter
pool would need to handle.