Implementing a Temporal Knowledge Graph via an NLP and Event Sourcing Pipeline on ArangoDB and Terraform


The core problem wasn’t a lack of data, but a lack of history. Our existing knowledge graph, built from millions of internal documents, could tell you the current relationship between a company and its subsidiary, but it couldn’t answer “What was their relationship before the Q3 acquisition announcement?” The system continuously overwrote state, destroying the temporal context critical for deep analysis. Every update was a loss of information. This led to a fundamental architectural rethink: we needed to treat state not as a mutable snapshot, but as an immutable stream of facts derived from our source texts.

Our initial concept was to pivot entirely to an Event Sourcing pattern. Instead of an NLP pipeline that reads a document and updates a graph, we would design it to produce a stream of immutable domain events: EntityDiscovered, RelationshipAsserted, AttributeModified. This event log would become the single source of truth. The ArangoDB graph would be relegated to a “read model,” a query-optimized projection of this log. This approach is costly in terms of complexity, but it offers a powerful guarantee: the ability to rebuild or create entirely new views of our data’s history at any time, simply by replaying events.
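
The replay guarantee can be illustrated with a minimal sketch that uses only the standard library. The `Event` record, its field names, and the `replay` function are hypothetical simplifications for illustration, not the schemas or services defined later in this post.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical, simplified event record (not the schema used later)."""
    event_type: str         # "RelationshipAsserted" or "RelationshipTerminated"
    relationship_id: str
    effective_at: datetime  # when the fact starts or stops being true
    description: str = ""


def replay(events: list[Event], as_of: Optional[datetime] = None) -> dict[str, str]:
    """Fold the log into {relationship_id: description} for active relationships.

    Events effective after `as_of` are ignored, so the same log yields the
    current view (as_of=None) or any historical one.
    """
    active: dict[str, str] = {}
    for event in sorted(events, key=lambda e: e.effective_at):
        if as_of is not None and event.effective_at > as_of:
            break
        if event.event_type == "RelationshipAsserted":
            active[event.relationship_id] = event.description
        elif event.event_type == "RelationshipTerminated":
            active.pop(event.relationship_id, None)
    return active


def utc(year: int, month: int, day: int) -> datetime:
    return datetime(year, month, day, tzinfo=timezone.utc)


log = [
    Event("RelationshipAsserted", "r1", utc(2019, 1, 1), "A OWNS B"),
    Event("RelationshipTerminated", "r1", utc(2021, 7, 1)),
    Event("RelationshipAsserted", "r2", utc(2021, 7, 1), "C OWNS B"),
]

print(replay(log, as_of=utc(2020, 1, 1)))  # view before the ownership change
print(replay(log))                         # current view
```

Because the log is never mutated, a new kind of view is just another fold over the same events.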

The technology stack to realize this concept required careful selection. For the NLP component, we needed a robust library for entity and relation extraction; a transformer-based model fine-tuned on our domain was the choice, but for this build log, we’ll use spaCy for its accessibility. For the database, ArangoDB was a clear winner. Its native graph model is essential, but its multi-model nature, allowing rich JSON documents for entity vertices, avoids the impedance mismatch of forcing all data into a rigid graph-only structure. Event Sourcing itself is a pattern, not a product, but we need a mechanism to persist and process the event stream. Finally, the entire multi-component system—NLP service, event processor, database—demanded a declarative, reproducible infrastructure. Manual setup in a real-world project is a recipe for configuration drift and failure. Terraform was the non-negotiable choice for managing the infrastructure as code.

Here’s the high-level architecture we settled on. It’s not simple, but it’s robust.

flowchart TD
    subgraph "Infrastructure Provisioned by Terraform"
        A[ArangoDB Cluster]
        B[NLP Ingestion Service]
        C[Event Stream Bus e.g., Kafka/Pulsar]
        D[Temporal Graph Projector Service]
    end

    E[Unstructured Text Documents] -->|HTTP POST| B
    B -->|Produces Events| C
    C -->|Consumes Events| D
    D -->|Writes Vertices/Edges| A
    F[Analytics Client] -->|AQL Queries| A

The first step in any serious project is to define the infrastructure foundation. Managing a database cluster, networking, and service deployments by hand is fragile. We defined our entire stack using Terraform. The pitfall here is treating Terraform scripts as one-off setup tools. They must live in the application’s repository, be versioned, and be used for every single change to an environment.

This is a module for provisioning our ArangoDB instance using Docker, suitable for development and testing. In production, this would be replaced by a module targeting a managed cloud service like ArangoDB Oasis or a Kubernetes operator.

terraform/modules/arangodb/main.tf:

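# main.tf for ArangoDB module
# Note: This uses Docker for simplicity. Production would use a managed provider or Kubernetes operator.

# Child modules must declare non-HashiCorp providers themselves; otherwise
# Terraform resolves "docker" to hashicorp/docker instead of kreuzwerker/docker.
terraform {
  required_providers {
    docker = {
      source = "kreuzwerker/docker"
    }
  }
}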

variable "arango_image" {
  description = "The Docker image for ArangoDB."
  type        = string
  default     = "arangodb:3.10"
}

variable "arango_root_password" {
  description = "The root password for the ArangoDB instance."
  type        = string
  sensitive   = true
}

variable "container_name" {
  description = "The name for the Docker container."
  type        = string
  default     = "arangodb-temporal-graph"
}

resource "docker_image" "arangodb" {
  name = var.arango_image
}

resource "docker_container" "arangodb" {
  image = docker_image.arangodb.image_id
  name  = var.container_name
  ports {
    internal = 8529
    external = 8529
  }

  env = [
    "ARANGO_ROOT_PASSWORD=${var.arango_root_password}"
  ]

  # In a real project, you'd mount a volume for data persistence.
  # volumes {
  #   host_path      = "/path/on/host/data"
  #   container_path = "/var/lib/arangodb3"
  # }

  # Basic restart policy. Production would require a more robust health check.
  restart = "unless-stopped"
}

output "container_id" {
  value = docker_container.arangodb.id
}

output "endpoint" {
  value = "http://localhost:8529"
}

The root main.tf orchestrates the modules. Even for a simple setup, modularity is key for managing complexity as the system grows.

terraform/main.tf:

# Root main.tf
terraform {
  required_providers {
    docker = {
      source  = "kreuzwerker/docker"
      version = "~> 3.0.1"
    }
  }
}

provider "docker" {}

# Using a tfvars file is standard practice to avoid checking in secrets.
variable "db_root_password" {
  description = "Password for the ArangoDB root user."
  type        = string
  sensitive   = true
}

module "arangodb_instance" {
  source               = "./modules/arangodb"
  arango_root_password = var.db_root_password
}

output "arango_db_endpoint" {
  value = module.arangodb_instance.endpoint
}

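To run this, create a terraform.tfvars file containing db_root_password = "some-secure-password", keep that file out of version control, and then execute terraform init followed by terraform apply. This declarative approach makes our database environment reproducible from the repository, with the caveat that the image tag and provider version must stay pinned for two runs to produce the same result.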

With the database infrastructure defined, we moved to the core data models. In Event Sourcing, the event schemas are the most critical contract in the system. They must be versionable and explicit. We used Pydantic in Python for this, providing both validation and a clear definition of our data structures.

models/events.py:

import uuid
from datetime import datetime, timezone
from typing import Any

from pydantic import BaseModel, Field

# A common mistake is to make events too generic. They should represent
# specific, factual business occurrences.

class EventMetadata(BaseModel):
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    event_type: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Defaults to empty so the event classes below can build metadata from an
    # event_type alone; the producer sets the real document ID afterwards.
    source_document_id: str = ""

class EntityDiscoveredPayload(BaseModel):
    entity_id: str  # e.g., a unique identifier like "CUSIP_12345"
    entity_type: str # e.g., "COMPANY", "PERSON"
    name: str
    attributes: dict[str, Any]  # typing.Any, not the builtin any() function

class RelationshipAssertedPayload(BaseModel):
    relationship_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    from_entity_id: str
    to_entity_id: str
    relationship_type: str # e.g., "OWNS", "IS_CEO_OF"
    valid_from: datetime # The timestamp when this fact becomes true
    attributes: dict[str, Any]

class RelationshipTerminatedPayload(BaseModel):
    # We never delete. We mark the end of a relationship's validity.
    # This is central to building a temporal graph.
    relationship_id: uuid.UUID
    valid_to: datetime # The timestamp when this fact is no longer true

# These are the concrete events that will be stored in our log.
class EntityDiscovered(BaseModel):
    metadata: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type='EntityDiscovered'))
    payload: EntityDiscoveredPayload

class RelationshipAsserted(BaseModel):
    metadata: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type='RelationshipAsserted'))
    payload: RelationshipAssertedPayload

class RelationshipTerminated(BaseModel):
    metadata: EventMetadata = Field(default_factory=lambda: EventMetadata(event_type='RelationshipTerminated'))
    payload: RelationshipTerminatedPayload
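
One common way to keep event schemas versionable is to stamp each serialized event with a schema version and upgrade old payloads on read, often called upcasting. The sketch below works on plain dictionaries and assumes a hypothetical `schema_version` field and a hypothetical v1 to v2 change (renaming `label` to `entity_type`); neither is part of the models above.

```python
from typing import Any, Callable

CURRENT_VERSION = 2


def _v1_to_v2(event: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical migration: v1 payloads called the field `label`."""
    payload = dict(event["payload"])  # copy so the stored event stays untouched
    payload["entity_type"] = payload.pop("label")
    return {**event, "payload": payload, "schema_version": 2}


# Maps a version to the function that upgrades it by exactly one step.
UPCASTERS: dict[int, Callable[[dict[str, Any]], dict[str, Any]]] = {1: _v1_to_v2}


def upcast(event: dict[str, Any]) -> dict[str, Any]:
    """Upgrade a stored event dict step by step to CURRENT_VERSION."""
    version = event.get("schema_version", 1)  # assumption: unversioned means v1
    while version < CURRENT_VERSION:
        event = UPCASTERS[version](event)
        version = event["schema_version"]
    return event


old_event = {
    "schema_version": 1,
    "metadata": {"event_type": "EntityDiscovered"},
    "payload": {"entity_id": "ORG_Tesla", "label": "ORG", "name": "Tesla"},
}
new_event = upcast(old_event)
print(new_event["payload"])
```

The stored log stays immutable; only the in-memory representation handed to consumers is upgraded.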

The next component is the ingestion service. Its sole responsibility is to perform NLP on raw text and produce the events defined above. It does not know about ArangoDB or the graph structure; this separation of concerns is critical for maintainability.

services/nlp_ingestion_service.py:

import logging
from datetime import datetime, timezone  # needed for valid_from below

import spacy
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from models.events import (
    EntityDiscovered, EntityDiscoveredPayload,
    RelationshipAsserted, RelationshipAssertedPayload
)

# In a real-world project, this NLP model would be a custom, fine-tuned
# transformer model. spaCy is used here for a runnable example.
# You would run `python -m spacy download en_core_web_sm`
nlp = spacy.load("en_core_web_sm")

app = FastAPI()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TextInput(BaseModel):
    document_id: str
    text: str

# This is a conceptual event publisher. In production, this would be a robust
# client for Kafka, Pulsar, or a cloud service like Kinesis/PubSub.
def publish_event(event):
    logger.info(f"Publishing event: {event.metadata.event_type} ({event.metadata.event_id})")
    # In a real system, this would serialize the event and send it to a message broker.
    # For this example, we'll just log it.
    print(event.json(indent=2))


@app.post("/process")
async def process_text(input_data: TextInput):
    """
    Accepts raw text, performs basic NLP, and generates domain events.
    """
    try:
        doc = nlp(input_data.text)
        events = []

        # 1. Entity Extraction (NER)
        entities = {}
        for ent in doc.ents:
            if ent.label_ in ["ORG", "PERSON"]:
                entity_id = f"{ent.label_}_{ent.text.replace(' ', '_')}"
                if entity_id not in entities:
                    payload = EntityDiscoveredPayload(
                        entity_id=entity_id,
                        entity_type=ent.label_,
                        name=ent.text,
                        attributes={"source_text_span": [ent.start_char, ent.end_char]}
                    )
                    event = EntityDiscovered(payload=payload)
                    event.metadata.source_document_id = input_data.document_id
                    events.append(event)
                    entities[entity_id] = payload
        
        # 2. Relation Extraction (Simplified example)
        # A production system would use a more sophisticated relation extraction model.
        # Here we look for a simple pattern: "PERSON, CEO of ORG"
        for sent in doc.sents:
            if "CEO of" in sent.text:
                people = [e for e in sent.ents if e.label_ == "PERSON"]
                orgs = [e for e in sent.ents if e.label_ == "ORG"]
                if len(people) > 0 and len(orgs) > 0:
                    person_id = f"PERSON_{people[0].text.replace(' ', '_')}"
                    org_id = f"ORG_{orgs[0].text.replace(' ', '_')}"
                    
                    # Ensure entities were discovered first
                    if person_id in entities and org_id in entities:
                        rel_payload = RelationshipAssertedPayload(
                            from_entity_id=person_id,
                            to_entity_id=org_id,
                            relationship_type="IS_CEO_OF",
                            valid_from=datetime.now(timezone.utc),
                            attributes={"confidence": 0.85} # Models should provide confidence
                        )
                        rel_event = RelationshipAsserted(payload=rel_payload)
                        rel_event.metadata.source_document_id = input_data.document_id
                        events.append(rel_event)

        # Publish all generated events
        for event in events:
            publish_event(event)

        return {"status": "success", "events_generated": len(events)}
    except Exception as e:
        logger.error(f"Failed to process document {input_data.document_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during NLP processing.")
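
Because the projector later uses entity_id as the ArangoDB document key, entity text containing characters such as "&" or "/" may be rejected as a key. The helper below is a hypothetical sketch, not part of the service above: it restricts keys to letters, digits, "_" and "-", which is a conservative subset of what ArangoDB accepts, and appends a short hash when sanitizing changed the text so that distinct names do not silently merge.

```python
import hashlib
import re

_UNSAFE = re.compile(r"[^A-Za-z0-9_-]")
MAX_KEY_BYTES = 254  # ArangoDB's documented upper bound for `_key`


def make_entity_key(label: str, text: str) -> str:
    """Build a document key from an entity label and its surface text.

    Unsafe characters become "_". If that changed anything beyond the
    space-to-underscore mapping, an 8-character hash of the original text
    is appended so that "AT&T" and "AT T" get different keys.
    """
    cleaned = _UNSAFE.sub("_", text)
    suffix = ""
    if cleaned != text.replace(" ", "_"):
        suffix = "_" + hashlib.sha1(text.encode("utf-8")).hexdigest()[:8]
    base = f"{label}_{cleaned}"
    # Keys are ASCII here, so characters and bytes coincide.
    return base[: MAX_KEY_BYTES - len(suffix)] + suffix


print(make_entity_key("PERSON", "Elon Musk"))
print(make_entity_key("ORG", "AT&T"))
```

For names that are already safe, the result matches the `f"{label}_{text}"` scheme used in the service, so existing keys would not change.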

The heart of the system is the projector. This service consumes the event stream and translates each event into changes in the ArangoDB graph. It’s the sole writer to the read model. A common mistake is to allow other services to write to this database, which breaks the entire pattern and leads to data inconsistency.

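The projector must be idempotent. If it receives the same event twice (which happens with at-least-once delivery in distributed systems), it must not create duplicate data. We handle this by deriving document keys from the events themselves: payload.entity_id becomes the _key for vertices, and payload.relationship_id becomes the _key for edges, so a redelivered event collides with the existing document instead of creating a new one.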
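
The idempotency property can be shown without a database. In this minimal sketch, `InMemoryEdgeStore` is a hypothetical stand-in for the edge collection, and its dictionary keys play the role of `_key`.

```python
class InMemoryEdgeStore:
    """Stand-in for an edge collection; dict keys play the role of `_key`."""

    def __init__(self) -> None:
        self.edges: dict[str, dict] = {}

    def insert_if_absent(self, key: str, edge: dict) -> bool:
        """Insert and return True, or return False if the key already exists."""
        if key in self.edges:
            return False
        self.edges[key] = edge
        return True


def project_relationship_asserted(store: InMemoryEdgeStore, event: dict) -> bool:
    payload = event["payload"]
    edge = {
        "_from": payload["from_entity_id"],
        "_to": payload["to_entity_id"],
        "type": payload["relationship_type"],
    }
    # Keying on relationship_id turns a redelivered event into a no-op.
    return store.insert_if_absent(payload["relationship_id"], edge)


event = {
    "payload": {
        "relationship_id": "d46d6b26-2148-48c4-8407-8a568df58e42",
        "from_entity_id": "PERSON_Elon_Musk",
        "to_entity_id": "ORG_Tesla",
        "relationship_type": "IS_CEO_OF",
    }
}
store = InMemoryEdgeStore()
first = project_relationship_asserted(store, event)
second = project_relationship_asserted(store, event)  # duplicate delivery
print(first, second, len(store.edges))
```

Note that this only protects against redelivery of the same event. If the same document is ingested twice and the producer generates a fresh relationship_id each time, the two events are distinct and both will be projected.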

services/temporal_graph_projector.py:

import os
import time
import logging
from arango import ArangoClient
from arango.exceptions import DocumentInsertError, DocumentUpdateError

from models.events import (
    EntityDiscovered,
    RelationshipAsserted,
    RelationshipTerminated
)

# --- Database Configuration & Connection ---
# In production, use a proper configuration management system (e.g., Vault, cloud secrets manager)
ARANGO_HOST = os.environ.get("ARANGO_HOST", "http://localhost:8529")
ARANGO_USER = os.environ.get("ARANGO_USER", "root")
ARANGO_PASSWORD = os.environ.get("ARANGO_PASSWORD", "some-secure-password")
DB_NAME = "TemporalKnowledgeGraph"
ENTITIES_COLLECTION = "entities"
RELATIONSHIPS_COLLECTION = "relationships"
GRAPH_NAME = "knowledge_graph"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def get_db_connection():
    """Initializes and returns a connection to ArangoDB."""
    client = ArangoClient(hosts=ARANGO_HOST)
    sys_db = client.db("_system", username=ARANGO_USER, password=ARANGO_PASSWORD)

    if not sys_db.has_database(DB_NAME):
        sys_db.create_database(DB_NAME)

    db = client.db(DB_NAME, username=ARANGO_USER, password=ARANGO_PASSWORD)

    if not db.has_collection(ENTITIES_COLLECTION):
        db.create_collection(ENTITIES_COLLECTION)
    
    if not db.has_collection(RELATIONSHIPS_COLLECTION):
        # This MUST be an edge collection
        db.create_collection(RELATIONSHIPS_COLLECTION, edge=True)
    
    if not db.has_graph(GRAPH_NAME):
        db.create_graph(GRAPH_NAME, edge_definitions=[{
            "edge_collection": RELATIONSHIPS_COLLECTION,
            "from_vertex_collections": [ENTITIES_COLLECTION],
            "to_vertex_collections": [ENTITIES_COLLECTION],
        }])

    return db

class Projector:
    def __init__(self, db_connection):
        self.db = db_connection
        self.entities = self.db.collection(ENTITIES_COLLECTION)
        self.relationships = self.db.collection(RELATIONSHIPS_COLLECTION)
        logger.info("Projector initialized and connected to ArangoDB.")

    def handle_event(self, event_data: dict):
        """Main event dispatching logic."""
        event_type = event_data.get("metadata", {}).get("event_type")
        if not event_type:
            logger.warning(f"Received event with no event_type: {event_data}")
            return
        
        handler_method = getattr(self, f"_handle_{event_type.lower()}", self._handle_unknown)
        handler_method(event_data)

    def _handle_entitydiscovered(self, event_data: dict):
        event = EntityDiscovered(**event_data)
        payload = event.payload
        
        # The document key is the business ID, ensuring idempotency.
        doc = {
            "_key": payload.entity_id,
            "name": payload.name,
            "entity_type": payload.entity_type,
            "attributes": payload.attributes,
            "created_at": event.metadata.timestamp.isoformat()
        }
        
        try:
            # `overwrite=False` prevents accidental data clobbering.
            # If the entity already exists, we do nothing.
            self.entities.insert(doc, overwrite=False)
            logger.info(f"Discovered and created entity: {payload.entity_id}")
        except DocumentInsertError:
            logger.warning(f"Entity {payload.entity_id} already exists. Skipping creation.")

    def _handle_relationshipasserted(self, event_data: dict):
        event = RelationshipAsserted(**event_data)
        payload = event.payload
        
        # The edge key is the unique relationship ID from the event.
        edge = {
            "_key": str(payload.relationship_id),
            "_from": f"{ENTITIES_COLLECTION}/{payload.from_entity_id}",
            "_to": f"{ENTITIES_COLLECTION}/{payload.to_entity_id}",
            "type": payload.relationship_type,
            "attributes": payload.attributes,
            "valid_from": payload.valid_from.isoformat(),
            # `valid_to` is null initially, meaning the relationship is currently active.
            "valid_to": None,
            "asserted_at": event.metadata.timestamp.isoformat(),
        }

        try:
            self.relationships.insert(edge, overwrite=False)
            logger.info(f"Asserted relationship {payload.relationship_id} from {payload.from_entity_id} to {payload.to_entity_id}")
        except DocumentInsertError:
            logger.warning(f"Relationship {payload.relationship_id} already exists. Skipping insertion.")

    def _handle_relationshipterminated(self, event_data: dict):
        event = RelationshipTerminated(**event_data)
        payload = event.payload
        
        # This is where the temporal model is enforced. We DO NOT delete the edge.
        # We find the existing edge and set its `valid_to` timestamp.
        try:
            edge_doc = self.relationships.get(str(payload.relationship_id))
            if edge_doc:
                edge_doc['valid_to'] = payload.valid_to.isoformat()
                self.relationships.update(edge_doc)
                logger.info(f"Terminated relationship {payload.relationship_id} as of {payload.valid_to}")
            else:
                logger.error(f"Attempted to terminate non-existent relationship {payload.relationship_id}")
        except DocumentUpdateError as e:
            logger.error(f"Failed to update relationship {payload.relationship_id}: {e}")
            # In production, this would go to a dead-letter queue for investigation.

    def _handle_unknown(self, event_data: dict):
        logger.warning(f"Received unknown event type: {event_data.get('metadata', {}).get('event_type')}")


# --- Main Application Logic (Simulating an Event Stream Consumer) ---
if __name__ == "__main__":
    db = get_db_connection()
    projector = Projector(db)
    
    # Simulate receiving events from a message bus
    simulated_event_stream = [
        # Event data would come as serialized JSON from the bus
        {'metadata': {'event_id': 'a19a9e38-5178-45c1-8404-58535df25b39', 'event_type': 'EntityDiscovered', 'timestamp': '2023-10-27T10:00:00Z', 'source_document_id': 'doc-001'}, 'payload': {'entity_id': 'PERSON_Elon_Musk', 'entity_type': 'PERSON', 'name': 'Elon Musk', 'attributes': {}}},
        {'metadata': {'event_id': 'b28b8d34-4168-46c2-8405-68546df36c40', 'event_type': 'EntityDiscovered', 'timestamp': '2023-10-27T10:00:01Z', 'source_document_id': 'doc-001'}, 'payload': {'entity_id': 'ORG_Tesla', 'entity_type': 'ORG', 'name': 'Tesla', 'attributes': {}}},
        {'metadata': {'event_id': 'c37c7c30-3158-47c3-8406-79557df47d41', 'event_type': 'RelationshipAsserted', 'timestamp': '2023-10-27T10:00:02Z', 'source_document_id': 'doc-001'}, 'payload': {'relationship_id': 'd46d6b26-2148-48c4-8407-8a568df58e42', 'from_entity_id': 'PERSON_Elon_Musk', 'to_entity_id': 'ORG_Tesla', 'relationship_type': 'IS_CEO_OF', 'valid_from': '2008-10-01T00:00:00Z', 'attributes': {}}},
        {'metadata': {'event_id': 'e45e5a22-1138-49c5-8408-9b579df69f43', 'event_type': 'RelationshipTerminated', 'timestamp': '2023-10-27T10:05:00Z', 'source_document_id': 'doc-002'}, 'payload': {'relationship_id': 'd46d6b26-2148-48c4-8407-8a568df58e42', 'valid_to': '2023-10-26T23:59:59Z'}}, # Fictional termination event
    ]

    for event in simulated_event_stream:
        projector.handle_event(event)
        time.sleep(0.1) # Simulate processing delay

The real value of this complex architecture is unlocked by querying. With the valid_from and valid_to fields on our edges, we can reconstruct the state of our knowledge graph for any point in time using ArangoDB’s AQL.

Here are a few example queries that demonstrate this power.

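To get the current state of the graph around one entity (every relationship that is still active along each path):

// AQL Query 1: Get current graph state reachable from one entity
FOR v, e, p IN 1..5 OUTBOUND 'entities/PERSON_Elon_Musk' GRAPH 'knowledge_graph'
  // Every edge on the path must be active, not only the last one.
  FILTER p.edges[*].valid_to ALL == null
  RETURN {
    // p.vertices[0] is the start vertex; v is the vertex reached by edge e.
    person: p.vertices[0].name,
    relationship: e.type,
    related_entity: v.name
  }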

To get the state of the graph as it was on a specific date, for example, on January 1st, 2020:

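// AQL Query 2: Get graph state at a specific point in time
LET target_date = "2020-01-01T00:00:00Z"

FOR v, e, p IN 1..5 OUTBOUND 'entities/PERSON_Elon_Musk' GRAPH 'knowledge_graph'
  // Every edge on the path must have started on or before target_date...
  FILTER p.edges[*].valid_from ALL <= target_date
  // ...and none may have ended on or before it.
  FILTER LENGTH(p.edges[* FILTER CURRENT.valid_to != null AND CURRENT.valid_to <= target_date]) == 0
  RETURN {
    // p.vertices[0] is the start vertex; v is the vertex reached by edge e.
    person: p.vertices[0].name,
    relationship: e.type,
    related_entity: v.name,
    active_since: e.valid_from
  }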

This second query is the justification for the entire architecture. It allows us to travel back in time and ask questions about our data’s history, a capability completely lost in a traditional CRUD-based system.
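
The validity rule behind the point-in-time query can also be stated in a few lines of Python, which is handy for unit-testing the boundary cases. The function names here are hypothetical. The sketch parses timestamps before comparing them: the projector stores `isoformat()` strings, which for UTC datetimes should end in "+00:00", while the query literal ends in "Z", so comparing parsed values avoids relying on how the two suffixes sort as strings.

```python
from datetime import datetime
from typing import Optional


def parse_ts(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 string, accepting a trailing "Z" as UTC."""
    if value is None:
        return None
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def is_active_at(edge: dict, target: str) -> bool:
    """True if valid_from <= target and (valid_to is null or valid_to > target)."""
    at = parse_ts(target)
    valid_from = parse_ts(edge["valid_from"])
    valid_to = parse_ts(edge.get("valid_to"))
    return valid_from <= at and (valid_to is None or valid_to > at)


terminated_edge = {
    "valid_from": "2008-10-01T00:00:00+00:00",
    "valid_to": "2023-10-26T23:59:59+00:00",
}
open_edge = {"valid_from": "2008-10-01T00:00:00+00:00", "valid_to": None}

print(is_active_at(terminated_edge, "2020-01-01T00:00:00Z"))
print(is_active_at(terminated_edge, "2024-01-01T00:00:00Z"))
print(is_active_at(open_edge, "2024-01-01T00:00:00Z"))
```

The interval is closed at valid_from and open at valid_to, matching the filters in the AQL query.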

This architecture, while effective, has its limitations and trade-offs. The projector is a single point of failure and a potential performance bottleneck. In a production system, the event stream would need to be partitioned (e.g., by entity ID) so that multiple projector instances can run in parallel while events for the same entity stay in order. The read model in ArangoDB is also only eventually consistent with the event log, so projection lag (how far the projector trails the head of the log) is a critical metric to monitor. For applications requiring read-your-writes consistency, this model would need CQRS-style enhancements that route specific queries back to the event log. Furthermore, replaying the entire event history to build a new read model from scratch can be extremely time-consuming for large event logs; periodic snapshotting of the graph state would be a necessary optimization for operational efficiency. Finally, the quality of the entire system depends critically on the upstream NLP models: a robust MLOps practice for model versioning and evaluation, together with a strategy for re-processing source documents with improved models, is a core requirement for long-term viability rather than an optional extra.
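
As a rough illustration of the partitioning by entity ID mentioned above, the sketch below maps an entity ID to a stable partition index. The function name and the partition count are assumptions for illustration; brokers such as Kafka typically perform an equivalent key-based assignment themselves when a message key is set.

```python
import hashlib


def partition_for(entity_id: str, num_partitions: int) -> int:
    """Map an entity ID to a stable partition index in [0, num_partitions).

    A hash from hashlib is used instead of Python's built-in hash(), which
    is randomized per process for strings and so differs between runs.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    digest = hashlib.sha256(entity_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


NUM_PARTITIONS = 8  # hypothetical value
for entity_id in ["PERSON_Elon_Musk", "ORG_Tesla"]:
    print(entity_id, partition_for(entity_id, NUM_PARTITIONS))
```

One design question this leaves open is which ID to use for relationship events, since they touch two entities. Routing by from_entity_id would be one option, at the cost of ordering guarantees relative to events keyed by the target entity.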

