Building a Policy Enforcement Gateway for Granular Access Control in DVC-Based Data Science Workflows


The project started with a familiar, uncomfortable reality: our DVC remote storage, a single S3 bucket, had become a digital Wild West. Every data scientist had read/write credentials to the entire bucket. While this fostered rapid iteration initially, it became an unmanageable source of risk as the team and the number of projects grew. We faced accidental data deletion, untracked modifications to historical data versions, and a looming compliance issue with new projects involving Personally Identifiable Information (PII). The core problem was that our access control model was binary—all or nothing. DVC versions data, but we couldn’t version the access to that data. A junior data scientist working on a public sentiment analysis model had the same level of access as a senior working on a sensitive credit risk model. This was untenable.

Our initial, naive approach was to create separate IAM roles and S3 buckets for each project. This quickly led to an explosion of roles and policies. Worse, it didn’t solve the fundamental issue. Within a single project, we still couldn’t restrict access to specific datasets or data versions. For instance, we wanted QA engineers to be able to pull and validate a specific tagged version of a model and its corresponding evaluation dataset, but not have the ability to push new data or access raw, unprocessed PII datasets from the same project. Standard S3 bucket policies or IAM roles operate on object prefixes, which don’t map cleanly to DVC’s content-addressable storage layout (/ab/cdef123...). A policy that grants access to a specific .dvc file doesn’t grant access to the underlying data objects it points to. We needed a system that could make authorization decisions based on user identity, resource attributes, and the action being performed—a clear case for Attribute-Based Access Control (ABAC).
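
To see why prefix-based rules fit poorly, it helps to look at how an object key is derived from file content. The sketch below assumes the `files/md5/<first two hex characters>/<remaining characters>` layout, which matches the `files/md5` prefix used in the policy examples later in this article. `object_key` is a hypothetical helper written for illustration, not a DVC API.

```python
import hashlib

def object_key(content: bytes, prefix: str = "files/md5") -> str:
    """Derive a content-addressed object key from the MD5 of the content."""
    digest = hashlib.md5(content).hexdigest()
    # The first two hex characters become a directory, the rest the file name.
    return f"{prefix}/{digest[:2]}/{digest[2:]}"

# Two files with very different sensitivity end up under unrelated,
# hash-derived prefixes, so a prefix rule cannot tell them apart.
raw_pii = b"name,ssn\nalice,123-45-6789\n"
public_reviews = b"text,label\ngreat product,positive\n"

print(object_key(raw_pii))
print(object_key(public_reviews))
```

Nothing in either key says which dataset, project, or sensitivity class the object belongs to, which is why the decision has to be made from attributes rather than from the key itself.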

The decision was made to build a lightweight Policy Enforcement Point (PEP) gateway. This service would sit between our data scientists’ DVC clients and the backend S3 storage (we use MinIO for on-premise deployments). All DVC traffic would be proxied through this gateway. It would intercept every request, validate the user’s identity via an OIDC-compliant JWT token from our central IAM system (like Keycloak or Okta), fetch user attributes (e.g., role, team), and then evaluate a set of policies before either forwarding the request to MinIO or rejecting it. This architecture moves the authorization logic out of the storage layer and into a centralized, auditable service that we control completely.

graph TD
    subgraph Data Scientist Workstation
        A[DVC Client]
    end

    subgraph Secure Infrastructure
        B(PEP Gateway: FastAPI)
        C(IAM Provider: OIDC/JWT)
        D(Policy Engine)
        E[Storage: MinIO S3]
    end

    A -- "dvc pull/push with Auth Token" --> B
    B -- "1. Intercept Request" --> B
    B -- "2. Validate JWT" --> C
    C -- "3. Return User Attributes (Role, Team)" --> B
    B -- "4. Evaluate Request against Policies" --> D
    D -- "5. Decision (Allow/Deny)" --> B
    B -- "6a. Forward Request (on Allow)" --> E
    E -- "7. Return Data/Status" --> B
    B -- "8. Proxy Response to Client" --> A
    B -- "6b. Return 403 Forbidden (on Deny)" --> A

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#bbf,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px
    style E fill:#cce,stroke:#333,stroke-width:2px

The core of this system is the gateway itself. We chose FastAPI for its performance and modern Python features. It needed to be robust, configurable, and, most importantly, provide detailed, structured logs for every authorization decision—this was a primary requirement for our compliance team.

Here is the main structure of the gateway application. It’s not a toy example; this is the skeleton of the production service, incorporating configuration management, structured logging, and clear separation of concerns.

config.py: Environment-based Configuration

A real-world project must never have hardcoded credentials. We use Pydantic for settings management, which reads from environment variables and provides type validation.

# /gateway/config.py

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """
    Application configuration settings loaded from environment variables.
    """
    # This allows loading from a .env file during development.
    # pydantic-settings (Pydantic v2) configures this through model_config;
    # the nested `class Config` is the deprecated v1 style.
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    LOG_LEVEL: str = "INFO"
    
    # MinIO (S3 Backend) Configuration
    S3_ENDPOINT_URL: str
    S3_ACCESS_KEY: str
    S3_SECRET_KEY: str
    S3_REGION: str = "us-east-1"

    # IAM (OIDC Provider) Configuration
    OIDC_ISSUER_URL: str
    OIDC_JWKS_URI: str
    OIDC_AUDIENCE: str
    
    # Policy configuration
    POLICY_FILE_PATH: str = "/config/policies.json"

# Instantiate settings to be imported by other modules
settings = Settings()

# Example .env file for local development:
# LOG_LEVEL="DEBUG"
# S3_ENDPOINT_URL="http://minio:9000"
# S3_ACCESS_KEY="minioadmin"
# S3_SECRET_KEY="minioadmin"
# OIDC_ISSUER_URL="http://keycloak:8080/realms/datascience"
# OIDC_JWKS_URI="http://keycloak:8080/realms/datascience/protocol/openid-connect/certs"
# OIDC_AUDIENCE="dvc-gateway"
# POLICY_FILE_PATH="./policies.json"

logger.py: Structured JSON Logging

For auditing, simple print statements are useless. We need structured logs that can be ingested and queried by systems like Splunk or an ELK stack.

# /gateway/logger.py

import logging
import sys
import json
from gateway.config import settings

class JsonFormatter(logging.Formatter):
    """
    Formats log records as a JSON string.
    """
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
            "name": record.name,
        }
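        # Add extra fields if available. Callers supply them through the standard
        # `extra` argument: logger.info("msg", extra={"extra_info": {...}})
        if hasattr(record, 'extra_info'):
            log_record.update(record.extra_info)
        # default=str keeps a non-serializable value from crashing the log call.
        return json.dumps(log_record, default=str)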

def setup_logger():
    """
    Configures and returns a logger with a JSON formatter.
    """
    logger = logging.getLogger("DVCSecureGateway")
    logger.setLevel(settings.LOG_LEVEL)
    
    # Avoid adding duplicate handlers if this function is called multiple times
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        formatter = JsonFormatter()
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
    return logger

logger = setup_logger()
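
The standard library's logging calls accept custom fields only through the `extra` keyword, whose keys become attributes on the log record. Below is a minimal, self-contained sketch of how a call site can feed a formatter like the one above. The logger name `audit-demo` and the field values are illustrative only.

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Reduced copy of the gateway formatter, enough to show the mechanism."""
    def format(self, record):
        entry = {"level": record.levelname, "message": record.getMessage()}
        # `extra={"extra_info": {...}}` shows up as record.extra_info
        entry.update(getattr(record, "extra_info", {}))
        return json.dumps(entry, default=str)

# Log into a string buffer so the result can be inspected directly.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

audit_log = logging.getLogger("audit-demo")
audit_log.setLevel(logging.INFO)
audit_log.addHandler(handler)
audit_log.propagate = False

audit_log.info(
    "Policy evaluation complete",
    extra={"extra_info": {"decision": "DENY", "user": "alice"}},
)

line = stream.getvalue().strip()
print(line)
```

Passing `extra_info=...` directly as a keyword argument would raise a `TypeError`, because the logging methods do not accept arbitrary keywords.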

The next critical piece is the authentication handler. This module is responsible for parsing the JWT from the Authorization header and validating it against our IAM provider’s public keys. We use the python-jose library for this.

auth.py: JWT Validation and User Attribute Extraction

# /gateway/auth.py

import httpx
from jose import jwt
from jose.exceptions import JWTError, ExpiredSignatureError, JWTClaimsError
from fastapi import Request, HTTPException, status
from functools import lru_cache

from gateway.config import settings
from gateway.logger import logger

@lru_cache(maxsize=1)
def get_jwks():
    """
    Fetches and caches the JSON Web Key Set (JWKS) from the OIDC provider.
    The cache ensures we don't hit the OIDC provider for every single request.
    """
    try:
        with httpx.Client() as client:
            response = client.get(settings.OIDC_JWKS_URI)
            response.raise_for_status()
            return response.json()
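    except httpx.HTTPError as e:
        # httpx.HTTPError covers transport failures as well as the
        # HTTPStatusError raised by raise_for_status().
        logger.error("Failed to fetch JWKS from OIDC provider.", extra={"extra_info": {"url": settings.OIDC_JWKS_URI, "error": str(e)}})
        # In a real system, you might want a more resilient retry mechanism.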
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Could not connect to authentication service to retrieve keys."
        )

def get_signing_key(token: str):
    """
    Finds the correct key from the JWKS to verify the token's signature.
    """
    jwks = get_jwks()
    try:
        unverified_header = jwt.get_unverified_header(token)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not parse token header."
        )
        
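    rsa_key = {}
    for key in jwks.get("keys", []):
        # Use .get() so a token or key without a "kid" is rejected with the 401
        # below instead of raising a KeyError.
        if key.get("kid") and key.get("kid") == unverified_header.get("kid"):
            rsa_key = {
                "kty": key["kty"],
                "kid": key["kid"],
                "use": key["use"],
                "n": key["n"],
                "e": key["e"],
            }
            break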
    if not rsa_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unable to find appropriate signing key."
        )
    return rsa_key


async def get_current_user_attributes(request: Request) -> dict:
    """
    Dependency that validates the token and returns the decoded claims as user attributes.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header is missing."
        )

    parts = auth_header.split()
    # Check the length first so a malformed header cannot raise an IndexError.
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authorization header format. Expected 'Bearer <token>'."
        )
    
    token = parts[1]
    signing_key = get_signing_key(token)

    try:
        payload = jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=settings.OIDC_AUDIENCE,
            issuer=settings.OIDC_ISSUER_URL
        )
        return payload
    # Custom log fields must go through the stdlib `extra` argument.
    except ExpiredSignatureError:
        logger.warning("Authentication failed: Token has expired.", extra={"extra_info": {"token": token[:10] + "..."}})
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired.")
    except JWTClaimsError as e:
        logger.warning(f"Authentication failed: Invalid claims - {e}", extra={"extra_info": {"token": token[:10] + "..."}})
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid claims: {e}")
    except JWTError as e:
        logger.warning(f"Authentication failed: Invalid token - {e}", extra={"extra_info": {"token": token[:10] + "..."}})
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}")
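
One caveat with `lru_cache(maxsize=1)` on a function without arguments: the cached value never expires, so keys rotated by the identity provider are not picked up until the process restarts. A minimal sketch of a time-bounded alternative follows. `TTLCache`, the 300-second lifetime, and the injected `fetch` and `clock` callables are assumptions made for illustration and are not part of the gateway code above.

```python
import time
from typing import Callable, Optional

class TTLCache:
    """Caches the result of `fetch` for `ttl_seconds`, then fetches again."""

    def __init__(self, fetch: Callable[[], dict], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[dict] = None
        self._expires_at = 0.0

    def get(self, force_refresh: bool = False) -> dict:
        now = self._clock()
        if force_refresh or self._value is None or now >= self._expires_at:
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value

# Demo with a fake fetcher and a fake clock, so no network access is needed.
calls = []

def fake_fetch() -> dict:
    calls.append(1)
    return {"keys": [{"kid": f"key-{len(calls)}"}]}

fake_now = [0.0]
cache = TTLCache(fake_fetch, ttl_seconds=300, clock=lambda: fake_now[0])

first = cache.get()       # first call fetches
second = cache.get()      # served from the cache
fake_now[0] = 301.0
third = cache.get()       # lifetime elapsed, fetched again
```

The `force_refresh` flag covers the other common case: when a token names a `kid` that is not in the cached set, the gateway could refresh once before rejecting the request.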

With authentication sorted, we turn to authorization: the policy engine. For this version, we implemented a simple JSON-based rule engine. It’s not as powerful as Open Policy Agent (OPA), but it’s understandable, auditable, and sufficient for our initial needs. A key design decision was to make policies data-driven, loaded from a file, so we can change them without rebuilding the gateway. As written, the engine reads the file once at startup, so a policy change takes effect after a restart.

policy_engine.py: A Simple, Data-Driven ABAC Engine

policies.json: Example Policy File
{
  "policies": [
    {
      "name": "Allow data scientists full access to their own projects",
      "effect": "allow",
      "principals": {
        "roles": ["data_scientist"]
      },
      "actions": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
      "resources": {
        "path_pattern": "dvc-projects/{principal.project}/*"
      },
      "conditions": [
        "principal.project is not null"
      ]
    },
    {
      "name": "Allow QA to read specific tagged data versions",
      "effect": "allow",
      "principals": {
        "roles": ["qa_engineer"]
      },
      "actions": ["s3:GetObject"],
      "resources": {
        "path_pattern": "dvc-projects/{principal.project}/files/md5/*",
        "tags": { "sensitivity": "non-pii" }
      }
    },
    {
      "name": "Deny access to PII data for QA engineers",
      "effect": "deny",
      "principals": {
        "roles": ["qa_engineer"]
      },
      "actions": ["*"],
      "resources": {
        "tags": { "sensitivity": "pii" }
      }
    }
  ]
}

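The engine to process these rules is surprisingly compact. The trick is to have a clear evaluation flow: check every policy against the request and let any matching “deny” rule override every “allow”. Because the engine already denies when no “allow” rule matches, the policy file does not need a catch-all deny rule; under this evaluation order such a rule would match every request and block all access. The conditions field is not evaluated by the code shown here.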

# /gateway/policy_engine.py

import json
import re
from pathlib import Path
from gateway.config import settings
from gateway.logger import logger

class PolicyEngine:
    def __init__(self, policy_file_path: str):
        self.policies = self._load_policies(policy_file_path)

    def _load_policies(self, file_path: str):
        path = Path(file_path)
        if not path.is_file():
            logger.error(f"Policy file not found at {file_path}")
            raise FileNotFoundError(f"Policy file not found: {file_path}")
        with open(path, 'r') as f:
            return json.load(f).get("policies", [])

    def is_allowed(self, user_attributes: dict, action: str, resource_path: str) -> bool:
        """
        Evaluates if a user is allowed to perform an action on a resource.
        The logic is: default deny. An action is allowed only if at least one 'allow'
        policy matches and no 'deny' policies match.
        """
        context = {
            "principal": user_attributes,
            "action": action,
            "resource": {"path": resource_path}
        }
        
        allow_match = False
        deny_match = False
        matched_policy = None  # Name of the policy that decided the outcome

        for policy in self.policies:
            if self._policy_matches(policy, context):
                if policy["effect"] == "allow":
                    allow_match = True
                    matched_policy = matched_policy or policy.get("name")
                elif policy["effect"] == "deny":
                    deny_match = True
                    matched_policy = policy.get("name")
                    # A deny rule is an explicit block, we can stop here.
                    break
        
        decision = allow_match and not deny_match
        
        log_extra = {
            "decision": "ALLOW" if decision else "DENY",
            "user": user_attributes.get("preferred_username", "unknown"),
            "user_roles": user_attributes.get("realm_access", {}).get("roles", []),
            "action": action,
            "resource": resource_path,
            "reason": "Explicit DENY rule matched" if deny_match else "No ALLOW rule matched" if not allow_match else "Allowed by policy",
            "matched_policy": matched_policy,
        }
        # The stdlib logger only accepts custom fields through `extra`.
        logger.info("Policy evaluation complete", extra={"extra_info": log_extra})
        
        return decision

    def _policy_matches(self, policy: dict, context: dict) -> bool:
        return (
            self._principals_match(policy.get("principals", {}), context["principal"]) and
            self._actions_match(policy.get("actions", []), context["action"]) and
            self._resources_match(policy.get("resources", {}), context["resource"], context["principal"])
        )

    def _principals_match(self, policy_principals: dict, user: dict) -> bool:
        policy_roles = set(policy_principals.get("roles", []))
        if not policy_roles:
            return True # No role constraint
        
        user_roles = set(user.get("realm_access", {}).get("roles", []))
        return "*" in policy_roles or not policy_roles.isdisjoint(user_roles)

    def _actions_match(self, policy_actions: list, action: str) -> bool:
        if not policy_actions or "*" in policy_actions:
            return True
        return action in policy_actions
    
    def _resources_match(self, policy_resource: dict, resource: dict, principal: dict) -> bool:
        # Tag constraints: every required tag must be present on the resource.
        # is_allowed() passes only the path, so tag-scoped policies never match
        # until the caller also supplies resource["tags"].
        required_tags = policy_resource.get("tags", {})
        resource_tags = resource.get("tags", {})
        if any(resource_tags.get(k) != v for k, v in required_tags.items()):
            return False

        path_pattern = policy_resource.get("path_pattern")
        if not path_pattern or path_pattern == "*":
            return True
        
        # Translate the pattern piece by piece: '*' is a wildcard, {principal.x}
        # is replaced by the user's attribute, everything else is literal text.
        regex_parts = []
        for token in re.split(r'(\*|\{principal\.\w+\})', path_pattern):
            placeholder = re.fullmatch(r'\{principal\.(\w+)\}', token)
            if token == '*':
                regex_parts.append('.*')
            elif placeholder:
                value = principal.get(placeholder.group(1))
                if value is None:
                    # The attribute is not present. Treat as a non-match.
                    return False
                # Escape the value so a claim cannot inject regex syntax.
                regex_parts.append(re.escape(str(value)))
            else:
                regex_parts.append(re.escape(token))
        return re.fullmatch(''.join(regex_parts), resource["path"]) is not None

# We instantiate it once to be used across the application.
policy_engine = PolicyEngine(settings.POLICY_FILE_PATH)
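
The evaluation order is easy to get wrong, so it is worth checking in isolation. The sketch below is a deliberately reduced, standalone version of the deny-overrides flow. It uses a flattened policy shape and `fnmatch` wildcards instead of the engine's own matching, and the `archive` deny rule is a hypothetical example that does not appear in our policy file.

```python
from fnmatch import fnmatchcase

def decide(policies, roles, action, path):
    """Deny-overrides: any matching deny wins; otherwise at least one allow is needed."""
    allowed = False
    for p in policies:
        role_ok = "*" in p["roles"] or bool(set(p["roles"]) & set(roles))
        action_ok = "*" in p["actions"] or action in p["actions"]
        path_ok = fnmatchcase(path, p["path"])  # '*' also matches '/'
        if role_ok and action_ok and path_ok:
            if p["effect"] == "deny":
                return "DENY"
            allowed = True
    return "ALLOW" if allowed else "DENY"

policies = [
    {"effect": "allow", "roles": ["data_scientist"],
     "actions": ["s3:GetObject", "s3:PutObject"],
     "path": "dvc-projects/credit-scoring/*"},
    # Hypothetical rule: nobody may overwrite archived objects.
    {"effect": "deny", "roles": ["*"], "actions": ["s3:PutObject"],
     "path": "dvc-projects/credit-scoring/archive/*"},
]

read = decide(policies, ["data_scientist"], "s3:GetObject",
              "dvc-projects/credit-scoring/files/md5/ab/cdef")
overwrite = decide(policies, ["data_scientist"], "s3:PutObject",
                   "dvc-projects/credit-scoring/archive/old.bin")
outsider = decide(policies, ["qa_engineer"], "s3:GetObject",
                  "dvc-projects/credit-scoring/files/md5/ab/cdef")

# A catch-all deny rule matches every request and therefore blocks everything.
catch_all = {"effect": "deny", "roles": ["*"], "actions": ["*"], "path": "*"}
with_catch_all = decide(policies + [catch_all], ["data_scientist"], "s3:GetObject",
                        "dvc-projects/credit-scoring/files/md5/ab/cdef")

print(read, overwrite, outsider, with_catch_all)
```

The last case is the one to watch: with deny-overrides semantics, the default has to live in the engine rather than in a rule.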

Finally, we assemble everything in the main FastAPI application. The proxy logic uses httpx.AsyncClient to stream requests and responses between the DVC client and MinIO, ensuring the gateway is non-blocking and memory-efficient, even for large data files.

main.py: The FastAPI Gateway and S3 Proxy

# /gateway/main.py

import httpx
from fastapi import FastAPI, Request, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask

from gateway.config import settings
from gateway.auth import get_current_user_attributes
from gateway.policy_engine import policy_engine
from gateway.logger import logger

app = FastAPI(title="DVC Secure Gateway")

# We create a single client instance for connection pooling and performance.
s3_client = httpx.AsyncClient(base_url=settings.S3_ENDPOINT_URL)

def map_s3_action(method: str, path: str) -> str:
    """
    A naive mapping from HTTP request to an S3 action for the policy engine.
    A production system would need a more sophisticated parser for S3 API calls.
    """
    if method == "GET":
        # Check if it's a list bucket operation
        if '/' not in path.strip('/'):
             return "s3:ListBucket"
        return "s3:GetObject"
    elif method in ["PUT", "POST"]:
        return "s3:PutObject"
    elif method == "DELETE":
        return "s3:DeleteObject"
    elif method == "HEAD":
        return "s3:GetObject" # HEAD is often used to check existence
    return "s3:Unknown"


# Registered before the catch-all route so that the proxy route does not
# shadow it and the health check stays reachable without a token.
@app.get("/health", status_code=status.HTTP_200_OK)
def health_check():
    return {"status": "ok"}


# api_route() only handles GET unless the methods are listed explicitly.
@app.api_route("/{path:path}", methods=["GET", "HEAD", "PUT", "POST", "DELETE"])
async def proxy_s3_request(
    path: str,
    request: Request,
    user: dict = Depends(get_current_user_attributes)
):
    """
    The main proxy endpoint. It intercepts all requests, performs auth checks,
    and then forwards the request to the backend S3 storage.
    """
    # 1. Authorization Check
    s3_action = map_s3_action(request.method, path)
    if not policy_engine.is_allowed(user, s3_action, path):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Access denied by policy for action '{s3_action}' on resource '{path}'."
        )

    # 2. Build and send the downstream request
    backend_url = httpx.URL(path=request.url.path, query=request.url.query.encode("utf-8"))
    
    # Drop the Host header (httpx sets it for the backend) and the client's
    # bearer token, which is meant for the gateway and not for MinIO.
    headers_to_forward = [
        (k, v) for k, v in request.headers.items()
        if k.lower() not in ("host", "authorization")
    ]

    # The gateway needs its own powerful credentials (S3_ACCESS_KEY and
    # S3_SECRET_KEY) and must sign each backend request, which requires a
    # proper S3 signing library such as botocore. Signing is omitted here;
    # for simplicity, we assume MinIO allows access from the gateway's IP.
    backend_request = s3_client.build_request(
        method=request.method,
        url=backend_url,
        headers=headers_to_forward,
        content=request.stream(),
    )
    rp = await s3_client.send(backend_request, stream=True)

    # 3. Stream the response back to the client without buffering it in memory.
    # The background task closes the backend response once the body is sent.
    return StreamingResponse(
        rp.aiter_raw(),
        status_code=rp.status_code,
        headers=rp.headers,
        background=BackgroundTask(rp.aclose),
    )

@app.on_event("shutdown")
async def app_shutdown():
    await s3_client.aclose()
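
The method-only mapping has a concrete gap: S3 selects several operations through query parameters, so a multi-object delete (`POST` with `?delete`) would be classified as a write. The sketch below shows one way to take the query string into account. It assumes path-style requests (`bucket/key`), and `map_s3_action_with_query` is a hypothetical helper that covers only a handful of operations, not a complete S3 parser.

```python
from urllib.parse import parse_qs

def map_s3_action_with_query(method: str, path: str, query: str = "") -> str:
    """Map an HTTP request to an S3 action, taking query parameters into account."""
    # keep_blank_values=True so bare flags such as "?delete" are preserved
    params = parse_qs(query, keep_blank_values=True)
    is_bucket_level = "/" not in path.strip("/")
    method = method.upper()

    if method in ("GET", "HEAD"):
        # Listing a bucket or probing it with HEAD is governed by s3:ListBucket.
        return "s3:ListBucket" if is_bucket_level else "s3:GetObject"
    if method == "PUT":
        # Includes multipart part uploads (?partNumber=..&uploadId=..)
        return "s3:PutObject"
    if method == "POST":
        if "delete" in params:
            return "s3:DeleteObject"  # multi-object delete
        return "s3:PutObject"         # e.g. initiate or complete a multipart upload
    if method == "DELETE":
        if "uploadId" in params:
            return "s3:AbortMultipartUpload"
        return "s3:DeleteObject"
    return "s3:Unknown"

print(map_s3_action_with_query("POST", "dvc-projects", "delete"))
print(map_s3_action_with_query("GET", "dvc-projects", "list-type=2"))
```

With this variant, a role that is only granted `s3:PutObject` can no longer delete objects through the bulk-delete endpoint.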

To make this work, a data scientist would reconfigure their DVC remote. Instead of pointing directly to MinIO, they point to our gateway and include an auth header.

# Obtain a JWT token from the IAM system (e.g., via OIDC login flow)
export DVC_GATEWAY_TOKEN="eyJhbGciOiJSUzI1NiIs..."

# Configure DVC to use the gateway endpoint
dvc remote modify myremote url http://dvc-gateway.internal.company.com/dvc-projects/credit-scoring

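# Configure DVC's HTTP remote to send the token in the Authorization header.
# --local keeps the token in .dvc/config.local, which is not committed to Git.
dvc remote modify myremote auth custom
dvc remote modify myremote custom_auth_header Authorization
dvc remote modify --local myremote password "Bearer ${DVC_GATEWAY_TOKEN}"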

# Now, DVC commands are proxied and secured
dvc pull -r myremote

The system works. A dvc pull from a data scientist on their assigned project succeeds, and the logs show a successful policy evaluation. The same command from a QA engineer on a PII dataset is blocked with a 403 Forbidden, and the log entry records the decision and the reason for it. We achieved granular, auditable access control for our versioned data assets.

This architecture, however, is not without its trade-offs and limitations. The gateway is a single point of failure and a potential performance bottleneck. While FastAPI and httpx are highly performant, every byte of data now flows through this service. For very large datasets, the added latency might be noticeable. Our current policy engine is simple; as rules become more complex, evaluating them could slow down requests. The next logical iteration is to replace our bespoke engine with Open Policy Agent, which is designed for high-performance, decoupled policy evaluation. Furthermore, the gateway’s logic for mapping HTTP requests to S3 actions is simplistic. A more robust solution might need to parse the specific S3 API XML/JSON payloads to make even finer-grained decisions. Finally, we are only securing data access. The execution environment for model training is another attack surface that needs its own security considerations.

