Implementing a Dynamic Attribute-Based Access Control Layer in Sanic Using Couchbase as a Policy Information Point


The project’s initial role-based access control (RBAC) was simple and effective. A user was an admin, a manager, or a viewer, and permissions were hardcoded. This approach shattered when business requirements evolved. We were suddenly faced with rules like: “A manager from the EMEA region can only approve expense reports under $5000 from their own department, and only during local business hours.” Trying to model this by creating an explosion of micro-roles like EMEA_Finance_Manager_Under_5k was a clear path to unmaintainable chaos.

This is a classic sign that RBAC has reached its limits. The problem isn’t about who the user is (their role), but about the attributes of the user, the resource they’re trying to access, and the context of the action itself. This realization pushed us toward an Attribute-Based Access Control (ABAC) model. Our goal was to build a system where policies were treated as data, not code, allowing us to modify complex access rules without deploying a single line of code.

The architecture settled on Sanic for its asynchronous performance at the API layer, and Couchbase for its flexible, high-performance document storage. Sanic would act as the Policy Enforcement Point (PEP), intercepting every request. A custom Python module would serve as our Policy Decision Point (PDP), evaluating the rules. Crucially, Couchbase would be our Policy Information Point (PIP), storing not just the user and resource data, but the policies themselves as structured JSON documents.

graph TD
    subgraph Sanic Application
        A[User Request w/ JWT] --> B{PEP: Sanic Middleware};
    end

    B --"Access Request (Subject, Action, Resource)"--> C[PDP: Policy Evaluation Engine];
    C --"Fetch Attributes & Policies"--> D[(PIP: Couchbase)];
    D --"User Attrs, Resource Attrs, Applicable Policies"--> C;
    C --"Decision: Permit / Deny"--> B;

    subgraph Sanic Response Flow
        B -- Permit --> E[Target API Endpoint Logic];
        B -- Deny --> F[HTTP 403 Forbidden Response];
    end

The core challenge was designing a system that was both flexible and performant under load. Every protected API call would trigger this policy evaluation flow, so any latency introduced here would impact the entire application.

Data Modeling for Policies and Attributes in Couchbase

The first step was designing the Couchbase data structures. We needed three primary document types: users, resources, and policies. A pitfall here is to design these in isolation. They must be designed with the N1QL queries of the PDP in mind.

We’re using a single bucket with a type field to distinguish document models, a common pattern in Couchbase.

User Document:
This contains all attributes related to the subject. The key would be something like user::1a2b3c4d.

{
  "type": "user",
  "userId": "1a2b3c4d",
  "tenantId": "tenant-acme",
  "username": "alice",
  "roles": ["manager", "finance-approver"],
  "attributes": {
    "department": "finance",
    "region": "EMEA",
    "clearanceLevel": 3,
    "ipAddress": "192.168.1.100"
  }
}

Resource Document (Example: Expense Report):
This contains the attributes of the object being accessed. The key: report::9f8e7d6c.

{
  "type": "report",
  "reportId": "9f8e7d6c",
  "tenantId": "tenant-acme",
  "status": "pending_approval",
  "attributes": {
    "amount": 4500,
    "currency": "USD",
    "department": "finance",
    "region": "EMEA",
    "sensitivity": "confidential",
    "createdBy": "user::bob_user_id"
  }
}

Policy Document:
This is the most critical piece of the model. A poor structure here makes evaluation logic a nightmare. We settled on a structure that defines a target for quick applicability checks and a set of rules. The key: policy::finance-approval-emea.

{
  "type": "policy",
  "policyId": "finance-approval-emea",
  "description": "Allows EMEA managers to approve expense reports under $5000 from their own department.",
  "target": {
    "subject": {
      "roles": ["manager", "finance-approver"],
      "attributes": {
        "region": "EMEA"
      }
    },
    "resource": {
      "type": "report"
    },
    "action": ["approve"]
  },
  "effect": "Permit",
  "rules": [
    {
      "description": "Amount must be less than 5000",
      "condition": {
        "operator": "lessThan",
        "resource_attr": "amount",
        "value": 5000
      }
    },
    {
      "description": "Subject and Resource departments must match",
      "condition": {
        "operator": "equal",
        "subject_attr": "department",
        "resource_attr": "department"
      }
    }
  ],
  "priority": 100
}
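
The target block is only useful if something actually compares it with the request. Below is a minimal sketch of such a check in plain Python. The function name target_matches is our own, and it assumes a missing section of the target means "applies to anything" and that a role match means at least one role in common.

```python
from typing import Any, Dict


def target_matches(target: Dict[str, Any], subject: Dict[str, Any],
                   action: str, resource: Dict[str, Any]) -> bool:
    """Return True if a policy's target block applies to this request.

    Assumption: a section that is absent from the target matches anything.
    """
    # Action: the target lists the verbs the policy is about.
    actions = target.get("action")
    if actions is not None and action not in actions:
        return False

    # Subject: at least one role must overlap, and every listed
    # attribute must equal the subject's value.
    subj_target = target.get("subject", {})
    roles = subj_target.get("roles")
    if roles is not None and not set(roles) & set(subject.get("roles", [])):
        return False
    for key, expected in subj_target.get("attributes", {}).items():
        if subject.get("attributes", {}).get(key) != expected:
            return False

    # Resource: the type and every listed attribute must be equal.
    res_target = target.get("resource", {})
    if "type" in res_target and res_target["type"] != resource.get("type"):
        return False
    for key, expected in res_target.get("attributes", {}).items():
        if resource.get("attributes", {}).get(key) != expected:
            return False

    return True
```

Running this in Python after the database pre-filter lets the query stay coarse (action, type, roles) while attribute constraints such as region are still enforced before any rule is evaluated.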

A second policy might enforce a denial:

{
  "type": "policy",
  "policyId": "deny-high-sensitivity-access",
  "description": "Denies access to highly sensitive reports for users with clearance below 5.",
  "target": {
    "resource": {
      "type": "report",
      "attributes": {
        "sensitivity": "top_secret"
      }
    }
  },
  "effect": "Deny",
  "rules": [
    {
      "condition": {
        "operator": "lessThan",
        "subject_attr": "clearanceLevel",
        "value": 5
      }
    }
  ],
  "priority": 999
}

The priority field gives us a deterministic evaluation order and leaves room for other conflict-resolution strategies later. The PDP itself uses a “deny-overrides” principle: if any applicable Deny policy matches, access is forbidden, regardless of any Permit policies or their priorities.
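
Stripped of the database and logging, deny-overrides is a small combining function. The sketch below is illustrative only; combine_deny_overrides is a name we made up, and it takes the effects of the policies whose rules all passed.

```python
from typing import Iterable


def combine_deny_overrides(matched_effects: Iterable[str]) -> bool:
    """Combine the effects of all matching policies into one decision.

    matched_effects holds "Permit" or "Deny" for each policy whose
    rules were all satisfied. Anything else is ignored.
    """
    permit_found = False
    for effect in matched_effects:
        if effect == "Deny":
            # A single matching Deny wins, whatever else matched.
            return False
        if effect == "Permit":
            permit_found = True
    # No Deny matched: allow only if at least one Permit did (default deny).
    return permit_found
```

Because a Deny wins wherever it appears in the sequence, the order in which policies are evaluated does not change the outcome under this algorithm.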

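For this to be performant, we need an index. The PDP's policy query filters on the action, the resource type, and the subject roles, so those are the fields to index. Bear in mind that target.action and target.subject.roles are arrays: a plain composite key indexes each array as a single value, so membership predicates such as $action IN target.action typically need an array index (DISTINCT ARRAY ... END) to be index-assisted. Treat the definition below as a starting point and confirm the query plan.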

/* couchbase/indexes.n1ql */
CREATE INDEX `idx_policies_target`
ON `your_bucket_name`(`target.action`, `target.resource.type`, `target.subject.roles`)
WHERE `type` = "policy";

In a real-world project, you must use the EXPLAIN command on your PDP queries to ensure this index is being used. A full collection scan during an authorization check is a production outage waiting to happen.
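
That check can be automated so a regression shows up in CI rather than in production. The sketch below walks the JSON plan returned by an EXPLAIN statement and reports whether a primary scan appears. It assumes operators are tagged with a "#operator" key and that primary scans are named PrimaryScan, PrimaryScan3 and so on, which is what we have seen in plans; verify against your server version. The sample plan is hand-written for illustration, not real output.

```python
from typing import Any, Iterator, Set


def iter_operators(node: Any) -> Iterator[dict]:
    """Yield every operator object found anywhere inside an EXPLAIN plan."""
    if isinstance(node, dict):
        if "#operator" in node:
            yield node
        for value in node.values():
            yield from iter_operators(value)
    elif isinstance(node, list):
        for item in node:
            yield from iter_operators(item)


def plan_uses_primary_scan(plan: dict) -> bool:
    """True if any operator in the plan is a primary (full) scan."""
    return any(str(op["#operator"]).startswith("PrimaryScan")
               for op in iter_operators(plan))


def indexes_used(plan: dict) -> Set[str]:
    """Names of the secondary indexes referenced by IndexScan operators."""
    return {op["index"] for op in iter_operators(plan)
            if str(op["#operator"]).startswith("IndexScan") and "index" in op}


# Hand-written example plan (illustrative only).
sample_plan = {
    "#operator": "Sequence",
    "~children": [
        {"#operator": "IndexScan3", "index": "idx_policies_target"},
        {"#operator": "Fetch"},
        {"#operator": "Filter"},
    ],
}
```

A test can then run EXPLAIN on the PDP query, pass the plan to plan_uses_primary_scan, and fail the build when it returns True.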

The Policy Decision Point (PDP) Engine

The PDP is the brain. It takes the context of a request, fetches relevant policies, and makes a decision. It should be a standalone, testable component, completely decoupled from Sanic.

# abac/pdp.py

import logging
from typing import Any, Dict, List, Optional
from couchbase.cluster import Cluster
from couchbase.options import QueryOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException

# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class PolicyDecisionPoint:
    """
    Evaluates access requests against policies stored in Couchbase.
    Implements a deny-overrides algorithm.
    """

    def __init__(self, couchbase_cluster: Cluster, bucket_name: str):
        self.cluster = couchbase_cluster
        self.bucket_name = bucket_name
        self.scope = self.cluster.bucket(self.bucket_name).default_scope()
        logging.info("PolicyDecisionPoint initialized.")

    def _evaluate_condition(self, condition: Dict, subject: Dict, resource: Dict) -> bool:
        """Evaluates a single rule condition."""
        operator = condition.get("operator")
        subject_attr_key = condition.get("subject_attr")
        resource_attr_key = condition.get("resource_attr")
        value = condition.get("value")

        # In a production system, this would be far more robust,
        # handling nested attributes and more operators.
        subject_val = subject.get("attributes", {}).get(subject_attr_key)
        resource_val = resource.get("attributes", {}).get(resource_attr_key)

        try:
            if operator == "equal":
                # Handle comparison between subject and resource attributes
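                if subject_attr_key and resource_attr_key:
                    # Fail closed: two missing attributes (None == None) must not count as a match
                    return subject_val is not None and subject_val == resource_val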
                # Handle comparison with a static value
                val_to_check = subject_val if subject_attr_key else resource_val
                return val_to_check == value
            
            if operator == "lessThan":
                val_to_check = subject_val if subject_attr_key else resource_val
                if val_to_check is None or value is None:
                    return False
                return float(val_to_check) < float(value)
            
            # ... other operators like 'greaterThan', 'in', 'contains' would go here
            
        except (ValueError, TypeError) as e:
            logging.warning(f"Condition evaluation failed for operator {operator}: {e}")
            return False

        logging.warning(f"Unsupported operator: {operator}")
        return False

    def _evaluate_policy(self, policy: Dict, subject: Dict, resource: Dict) -> bool:
        """Checks if all rules in a single policy are met."""
        for rule in policy.get("rules", []):
            if not self._evaluate_condition(rule.get("condition", {}), subject, resource):
                # If any rule fails, the policy does not permit access
                return False
        return True

    async def find_applicable_policies(self, subject: Dict, action: str, resource: Dict) -> List[Dict]:
        """
        Queries Couchbase for policies that might apply to the given context.
        This is a critical performance path.
        """
        # We use the target block for a pre-filtering query.
        # This leverages our GSI for performance.
        query = f"""
            SELECT p.*
            FROM `{self.bucket_name}` p
            WHERE p.type = 'policy'
              AND (
                p.target.action IS NOT VALUED
                OR $action IN p.target.action
              )
              AND (
                p.target.resource.type IS NOT VALUED
                OR p.target.resource.type = $resource_type
              )
              AND (
                p.target.subject.roles IS NOT VALUED
                OR (ANY role IN $subject_roles SATISFIES role IN p.target.subject.roles END)
              )
        """
        params = {
            "action": action,
            "resource_type": resource.get("type"),
            "subject_roles": subject.get("roles", [])
        }
        
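        try:
            logging.info(f"Executing policy query for action '{action}'")
            # NOTE: this is the synchronous SDK, so the call blocks the event loop while
            # the query runs. Under load, use the SDK's asyncio API or an executor instead.
            result = self.cluster.query(query, QueryOptions(named_parameters=params))
            policies = list(result.rows())
            logging.info(f"Found {len(policies)} potentially applicable policies.")
            return policies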
        except CouchbaseException as e:
            logging.error(f"Failed to query for policies: {e}")
            return []


    async def is_allowed(self, subject: Dict, action: str, resource: Dict) -> bool:
        """
        Main decision method. Fetches policies and evaluates them.
        """
        if not all([subject, action, resource]):
            logging.warning("ABAC evaluation called with incomplete context.")
            return False
            
        policies = await self.find_applicable_policies(subject, action, resource)

        if not policies:
            logging.info(f"No applicable policies found for action '{action}'. Denying by default.")
            return False # Default deny

        # Sort by priority to handle overrides correctly if needed, though deny-overrides is simpler
        policies.sort(key=lambda p: p.get('priority', 0), reverse=True)

        permit_found = False
        for policy in policies:
            # A common mistake is to not handle the N1QL result format correctly.
            # `SELECT p.*` returns each policy's fields at the top level of the row;
            # a bare `SELECT *` would instead nest them under the alias `p`.
            policy_data = policy
            
            if self._evaluate_policy(policy_data, subject, resource):
                if policy_data.get("effect") == "Deny":
                    logging.info(f"Deny policy '{policy_data.get('policyId')}' matched. Access denied.")
                    return False # Deny overrides everything
                if policy_data.get("effect") == "Permit":
                    permit_found = True
        
        if permit_found:
             logging.info("At least one Permit policy matched and no Deny policies matched. Access granted.")
        else:
             logging.info("No Permit policies matched the request. Access denied.")
             
        return permit_found

This PDP is designed to be testable. You can mock the couchbase_cluster object and feed it canned policy and user data to verify the logic of _evaluate_condition and the overall is_allowed decision flow without hitting a live database.
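
The condition evaluator is the easiest part to test in isolation, and also the part most likely to grow. As a sketch of one possible direction, the version below is table-driven and adds the greaterThan, in and contains operators mentioned in the code comments. The operator semantics and the standalone function are our assumptions, not part of the engine above. It fails closed: a missing attribute or a type error yields False.

```python
import operator
from typing import Any, Callable, Dict

# Each operator takes (left, right) and returns a truthy/falsy value.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "equal": operator.eq,
    "lessThan": lambda a, b: float(a) < float(b),
    "greaterThan": lambda a, b: float(a) > float(b),
    "in": lambda a, b: a in b,        # left value is a member of the right collection
    "contains": lambda a, b: b in a,  # left collection contains the right value
}


def evaluate_condition(condition: Dict[str, Any], subject: Dict[str, Any],
                       resource: Dict[str, Any]) -> bool:
    """Evaluate one rule condition against subject and resource attributes."""
    op = OPERATORS.get(condition.get("operator"))
    if op is None:
        return False  # unknown operator: fail closed

    s_key = condition.get("subject_attr")
    r_key = condition.get("resource_attr")
    s_val = subject.get("attributes", {}).get(s_key) if s_key else None
    r_val = resource.get("attributes", {}).get(r_key) if r_key else None

    if s_key and r_key:
        # Compare a subject attribute with a resource attribute.
        left, right = s_val, r_val
    else:
        # Compare a single attribute with the static value in the policy.
        left = s_val if s_key else r_val
        right = condition.get("value")

    if left is None or right is None:
        return False  # missing data never satisfies a condition
    try:
        return bool(op(left, right))
    except (ValueError, TypeError):
        return False
```

Adding an operator then means adding one entry to the table and one test case, with no change to the control flow.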

Sanic Integration: The Policy Enforcement Point (PEP)

The PEP’s job is to stop a request before it reaches the business logic if the PDP says “Deny”. In Sanic, a middleware is the perfect place for this.

The flow inside the middleware must be:

  1. Extract and validate the JWT.
  2. From the JWT, get the subject’s identity (e.g., userId).
  3. Load the full subject attributes from Couchbase. A common optimization is to include non-sensitive, frequently used attributes like roles and tenant ID directly in the JWT payload to avoid this lookup on every request. We will do that here.
  4. Identify the action and the resource from the request context.
  5. Load the resource’s attributes from Couchbase. This database call is often unavoidable.
  6. Pass everything to the PDP.
  7. Halt the request with a 403 if access is denied.

# app/main.py

import os
import logging
from sanic import Sanic, response
from sanic.request import Request
from sanic.exceptions import Forbidden, NotFound, Unauthorized
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions

# Assuming pdp is in a sibling directory
from abac.pdp import PolicyDecisionPoint

# --- Configuration ---
# In a real app, use environment variables or a config file.
CB_HOST = os.getenv("CB_HOST", "localhost")
CB_USER = os.getenv("CB_USER", "admin")
CB_PASS = os.getenv("CB_PASSWORD", "password")
CB_BUCKET = os.getenv("CB_BUCKET", "your_bucket_name")
JWT_SECRET = os.getenv("JWT_SECRET", "your-very-secret-key") # Placeholder default; NEVER ship a hardcoded secret

# --- Minimal JWT validation using PyJWT (pip install pyjwt) ---
import jwt

def decode_jwt(token: str) -> dict:
    try:
        # In production, you must validate 'aud', 'iss', 'exp', etc.
        return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except jwt.PyJWTError as e:
        raise Unauthorized(f"Invalid token: {e}")

# --- Sanic App Setup ---
app = Sanic("ABAC_Secured_App")

@app.listener('before_server_start')
async def setup_db(app, loop):
    """
    Initialize Couchbase connection and PDP instance.
    The PDP is attached to the app context to be accessible in middleware/routes.
    """
    logging.info("Connecting to Couchbase cluster...")
    try:
        from datetime import timedelta  # used for the readiness timeout below

        auth = PasswordAuthenticator(CB_USER, CB_PASS)
        # It's crucial to handle timeouts in production
        cluster = Cluster(f'couchbase://{CB_HOST}', ClusterOptions(auth))
        # Fail fast if DB is not available; the SDK expects a timedelta here
        cluster.wait_until_ready(timedelta(seconds=5))
        app.ctx.cb_cluster = cluster
        app.ctx.cb_bucket = cluster.bucket(CB_BUCKET)
        app.ctx.collection = app.ctx.cb_bucket.default_collection()
        app.ctx.pdp = PolicyDecisionPoint(cluster, CB_BUCKET)
        logging.info("Couchbase connection successful and PDP is ready.")
    except Exception as e:
        logging.critical(f"Failed to connect to Couchbase: {e}")
        # Stop the server from starting if the database connection fails
        exit(1)

@app.listener('after_server_stop')
async def teardown_db(app, loop):
    """Cleanly disconnect from Couchbase."""
    logging.info("Disconnecting from Couchbase cluster.")
    if hasattr(app.ctx, 'cb_cluster'):
        app.ctx.cb_cluster.close()

# --- The ABAC Middleware (PEP) ---
@app.on_request
async def enforce_abac_policy(request: Request):
    """
    This middleware intercepts all requests to protected endpoints.
    """
    # A simple check to exclude non-protected routes like /login or /health
    if request.path in ["/login", "/health"]:
        return

    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise Unauthorized("Missing or malformed Authorization header")

    token = auth_header.split(" ")[1]
    jwt_payload = decode_jwt(token)
    
    # Subject attributes are partially from JWT to save a DB lookup
    subject = {
        "userId": jwt_payload.get("sub"),
        "tenantId": jwt_payload.get("tid"),
        "roles": jwt_payload.get("roles", []),
        "attributes": jwt_payload.get("attrs", {})
    }

    # Derive action and resource from the request
    # This logic can be highly application-specific
    action_map = {"GET": "read", "POST": "create", "PUT": "update", "DELETE": "delete"}
    action = action_map.get(request.method)

    if not action:
        logging.warning(f"Unsupported HTTP method for ABAC: {request.method}")
        raise Forbidden("Action not permitted")

    # Example: resource is identified by a query parameter, e.g., /reports?report_id=<report_id>
    report_id = request.args.get("report_id")
    if not report_id:
        # Handle cases where the resource is not an entity, e.g., creating a new one
        # For simplicity, we assume all protected endpoints have a resource ID for now
        return # Or apply a different policy check

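    try:
        # This is a performance-critical DB fetch. The synchronous SDK's get() is not
        # awaitable, so run it in the default executor to keep the event loop free.
        result = await request.app.loop.run_in_executor(
            None, request.app.ctx.collection.get, f"report::{report_id}"
        )
        resource = result.content_as[dict]
    except Exception: # Be more specific with exceptions in production
        raise NotFound(f"Resource with ID {report_id} not found")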

    pdp: PolicyDecisionPoint = request.app.ctx.pdp
    
    # The moment of truth
    is_authorized = await pdp.is_allowed(subject=subject, action=action, resource=resource)

    if not is_authorized:
        raise Forbidden("Access denied by policy.")
    
    # If we get here, the request is allowed to proceed to the route handler.
    # We can attach the fetched objects to the request context to avoid re-fetching.
    request.ctx.subject = subject
    request.ctx.resource = resource

# --- Example Protected Endpoint ---
@app.get("/reports")
async def get_report(request: Request):
    # The middleware has already done the heavy lifting.
    # The resource fetched for policy evaluation is available in the context.
    report_id = request.args.get("report_id")
    resource = request.ctx.resource
    
    # Business logic here
    return response.json({
        "message": f"Access granted to report {report_id}",
        "data": resource
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, debug=True)

The implementation shows a critical trade-off. We’ve placed user roles and some attributes inside the JWT. This reduces latency by avoiding a user document lookup on every request. The downside is that if a user’s roles or attributes change, the existing JWT remains valid until it expires. This stale data problem is a classic issue in distributed systems. Solutions involve short-lived tokens, refresh tokens, or a token revocation list, each adding complexity.
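
One cheap way to bound the staleness is to treat the attributes in the token as a snapshot with a maximum age. The sketch below is hypothetical: it assumes the token carries the standard iat (issued-at) claim in seconds, and the 300-second limit is an arbitrary example value.

```python
import time
from typing import Any, Dict, Optional

MAX_ATTRIBUTE_AGE_SECONDS = 300  # hypothetical limit; tune per deployment


def attributes_are_fresh(jwt_payload: Dict[str, Any],
                         now: Optional[float] = None,
                         max_age: float = MAX_ATTRIBUTE_AGE_SECONDS) -> bool:
    """Return True if the token was issued recently enough to trust its attributes."""
    issued_at = jwt_payload.get("iat")
    if not isinstance(issued_at, (int, float)):
        return False  # no usable iat claim: treat the snapshot as stale
    current = time.time() if now is None else now
    age = current - issued_at
    # Tokens issued in the future are rejected too; a real check
    # would probably allow a little clock skew.
    return 0 <= age <= max_age
```

When the check fails, the middleware could either reload the user document from Couchbase or return a 401 so the client refreshes its token.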

Testing the Implementation

To test this, you need to populate Couchbase with data and then use curl to simulate requests.

1. Create Couchbase documents:

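  • A user user::alice with roles ["manager", "finance-approver"] and region EMEA.
  • A report report::report001 with type: "report" and, under attributes, amount: 4500 and department: "finance".
  • A policy policy::finance-approval-emea as defined earlier, with "read" added to target.action. The middleware maps GET to the read action, so a policy that targets only approve would never be selected for the requests below.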

2. Generate a JWT for Alice:
You can use an online tool or a script to generate a JWT with the payload:
{"sub": "alice", "tid": "tenant-acme", "roles": ["manager", "finance-approver"], "attrs": {"region": "EMEA", "department": "finance"}}

3. Test a permitted request:

TOKEN="your_generated_jwt_for_alice"
curl -X GET "http://localhost:8000/reports?report_id=report001" \
     -H "Authorization: Bearer $TOKEN"

This should succeed with a 200 OK and the report data because all conditions in the finance-approval-emea policy are met.

4. Test a denied request (amount too high):

  • Create report::report002 with amount: 6000.

TOKEN="your_generated_jwt_for_alice"
curl -i -X GET "http://localhost:8000/reports?report_id=report002" \
     -H "Authorization: Bearer $TOKEN"

This should fail with an HTTP/1.1 403 Forbidden because the rule on the resource’s amount (amount < 5000) is not satisfied. The PDP finds the policy, evaluates its rules, hits the failing rule, and so the policy does not yield a “Permit”. With no other Permit policy matching, the default is deny.

Limitations and Future Iterations

This implementation provides a solid, decoupled ABAC foundation, but it’s not without its weak points in a large-scale production environment.

First, performance is a major concern. Every protected request triggers at least one N1QL query to find policies and one K/V get for the resource. While Couchbase is fast, at thousands of requests per second this becomes a bottleneck. A caching layer is not optional; it is a requirement. This could be an in-memory LRU cache in the Sanic application that stores policy decisions for a short TTL (e.g., cache[(subject_id, action, resource_id)] = decision). The complexity then shifts to cache invalidation when a policy or a resource’s attributes change.
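
To make the idea concrete, here is a minimal sketch of such a decision cache. It is deliberately simpler than a true LRU: entries expire after a TTL and, when the cache is full, the oldest inserted entry is evicted. The class name, defaults and injectable clock are our own choices. It also assumes a subject's attributes do not change within the TTL, since the key contains only the subject's ID.

```python
import time
from typing import Callable, Dict, Optional, Tuple

Key = Tuple[str, str, str]  # (subject_id, action, resource_id)


class DecisionCache:
    """Tiny in-process TTL cache for PDP decisions (FIFO eviction, not LRU)."""

    def __init__(self, ttl_seconds: float = 5.0, max_entries: int = 10_000,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._max = max_entries
        self._clock = clock
        self._entries: Dict[Key, Tuple[float, bool]] = {}

    def get(self, subject_id: str, action: str, resource_id: str) -> Optional[bool]:
        """Return the cached decision, or None if absent or expired."""
        key = (subject_id, action, resource_id)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, decision = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return decision

    def put(self, subject_id: str, action: str, resource_id: str, decision: bool) -> None:
        key = (subject_id, action, resource_id)
        if key not in self._entries and len(self._entries) >= self._max:
            # Dicts preserve insertion order, so the first key is the oldest.
            self._entries.pop(next(iter(self._entries)))
        self._entries[key] = (self._clock() + self._ttl, decision)

    def invalidate_resource(self, resource_id: str) -> None:
        """Drop every cached decision about one resource, e.g. after it changes."""
        for key in [k for k in self._entries if k[2] == resource_id]:
            del self._entries[key]
```

The middleware would call get before invoking the PDP and put afterwards. A policy change has no single key to invalidate, so the short TTL is what bounds how long a stale decision can survive.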

Second, the policy language is a custom JSON structure. While simple for this example, it lacks the expressiveness of standardized policy languages like OPA’s Rego. A future iteration could involve replacing our custom Python PDP with an Open Policy Agent sidecar. In that model, Sanic would query the OPA agent for a decision, and OPA would be configured to pull its data (the “I” in PIP) from Couchbase. This outsources the complex decision logic to a dedicated, highly optimized tool.
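
For a sense of what that integration might look like from the Sanic side, the sketch below builds a request for OPA's Data API, which accepts a POST with an input document and answers with a result field. The URL, the abac/allow rule path and the shape of the input are assumptions for illustration. The blocking urllib client is used only to keep the example dependency-free; a real middleware would use an async HTTP client.

```python
import json
import urllib.request
from typing import Any, Dict

# Hypothetical sidecar address and rule path.
OPA_URL = "http://localhost:8181/v1/data/abac/allow"


def build_opa_request(subject: Dict[str, Any], action: str,
                      resource: Dict[str, Any],
                      url: str = OPA_URL) -> urllib.request.Request:
    """Wrap the access request in OPA's {"input": ...} envelope."""
    body = json.dumps({
        "input": {"subject": subject, "action": action, "resource": resource}
    }).encode("utf-8")
    return urllib.request.Request(
        url, data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_opa_decision(response_body: bytes) -> bool:
    """Treat anything other than an explicit true result as a deny."""
    # OPA omits "result" when the rule is undefined for this input.
    return json.loads(response_body).get("result") is True
```

Sending the request with urllib.request.urlopen and passing the response body to parse_opa_decision would replace the call to pdp.is_allowed.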

Finally, the management of policies—the Policy Administration Point (PAP)—is currently manual. A production system would require a separate service with a UI for security administrators to create, update, and audit policies stored in Couchbase, completing the full ABAC architecture.

