The initial system was a security incident waiting to happen. A collection of Python scripts, glued together with cron, processed terabytes of customer data sitting in an S3 bucket. The EC2 instance running these scripts had an IAM role with s3:GetObject permissions on arn:aws:s3:::our-main-bucket/*. Any script, any developer with access, any vulnerability in a dependency could potentially access or exfiltrate any customer’s data. This wasn’t just a theoretical risk; it was a ticking time bomb flagged in every security review. The mandate was clear: implement a zero-trust, least-privilege data access layer. Every data access operation must be authorized for a specific user, for a specific resource, for a limited time.
Our design constraints were multifaceted. The data science team lived and breathed Pandas; any solution had to integrate seamlessly with their workflows, not replace them. The platform was expanding, with multiple data-centric services on the roadmap, which suggested that shared infrastructure and a shared codebase would be more maintainable. This pointed directly to a monorepo structure. Finally, performance mattered: this new layer couldn’t become a significant bottleneck. These constraints led us to our technology stack: FastAPI for its asynchronous performance, a monorepo managed with pdm for code sharing, AWS IAM with STS as the security backbone, and Pandas as the core processing engine. The goal was to build a service that acted as a secure data gatekeeper, vending temporary, narrowly-scoped credentials to a Pandas-based processing backend on a per-request basis.
The monorepo structure was the first piece of the puzzle. In a real-world project with multiple interlocking services, managing dependencies and shared code is a primary source of friction. A monorepo, when managed correctly, solves this by providing a single source of truth. We used pdm with its workspace feature.
Here is the finalized directory structure:
.
├── apps
│   └── data-api
│       ├── pyproject.toml
│       ├── src
│       │   └── data_api
│       │       ├── __init__.py
│       │       ├── main.py
│       │       ├── routers
│       │       │   ├── __init__.py
│       │       │   └── analysis.py
│       │       ├── security
│       │       │   └── auth.py
│       │       └── settings.py
│       └── tests
│           └── test_analysis_endpoint.py
├── infra
│   └── terraform
│       ├── iam.tf
│       ├── outputs.tf
│       └── variables.tf
├── libs
│   └── security-lib
│       ├── pyproject.toml
│       ├── src
│       │   └── security_lib
│       │       ├── __init__.py
│       │       ├── aws_sts.py
│       │       └── exceptions.py
│       └── tests
│           └── test_aws_sts.py
├── pdm.lock
└── pyproject.toml
The root pyproject.toml defines the workspace, telling pdm to treat apps/data-api and libs/security-lib as editable installs.
# ./pyproject.toml
[tool.pdm]
workspaces = ["apps/*", "libs/*"]
[project]
name = "data-platform"
version = "0.1.0"
# ... other project metadata
This setup allows the data-api service to directly import security-lib as if it were an installed package, ensuring we don’t duplicate the critical security logic.
Next, we defined the IAM infrastructure using Terraform. This is the heart of the security model. We needed two primary roles:
- data-api-service-role: The execution role for the FastAPI application itself (e.g., attached to an EC2 instance or ECS task). This role has minimal permissions, primarily the ability to call sts:AssumeRole.
- data-processor-target-role: The role that will be assumed. It has the actual permissions to read from S3, but its trust policy is configured to only allow data-api-service-role to assume it.
# infra/terraform/iam.tf

variable "aws_account_id" {
  description = "AWS Account ID"
  type        = string
}

variable "data_bucket_name" {
  description = "Name of the S3 bucket containing tenant data"
  type        = string
}

# Role for the FastAPI service itself.
# In a real setup, this would be attached to an ECS Task or EC2 Instance Profile.
resource "aws_iam_role" "data_api_service_role" {
  name = "DataAPIServiceRole"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Principal = {
          # This would be ecs-tasks.amazonaws.com for ECS, or ec2.amazonaws.com for EC2
          Service = "ec2.amazonaws.com"
        },
        Action = "sts:AssumeRole"
      }
    ]
  })
}

# The target role that our service will assume to access data.
resource "aws_iam_role" "data_processor_target_role" {
  name = "DataProcessorTargetRole"

  # The key part: the trust policy. It only allows our service role to assume this target role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Principal = {
          AWS = aws_iam_role.data_api_service_role.arn
        },
        Action = "sts:AssumeRole"
      }
    ]
  })
}

# The policy granting permission to assume the target role.
resource "aws_iam_policy" "assume_target_role_policy" {
  name        = "AssumeDataProcessorTargetRolePolicy"
  description = "Allows assuming the data processor role"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = "sts:AssumeRole",
        Resource = aws_iam_role.data_processor_target_role.arn
      }
    ]
  })
}

# Attach the assume-role policy to our service role.
resource "aws_iam_role_policy_attachment" "service_role_can_assume" {
  role       = aws_iam_role.data_api_service_role.name
  policy_arn = aws_iam_policy.assume_target_role_policy.arn
}

# The base policy for the target role. This grants broad S3 read access,
# which will be scoped down dynamically with a session policy.
resource "aws_iam_policy" "s3_read_access_policy" {
  name        = "S3BaseReadAccessPolicy"
  description = "Base S3 read access for the data processor role"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        Resource = [
          "arn:aws:s3:::${var.data_bucket_name}",
          "arn:aws:s3:::${var.data_bucket_name}/*"
        ]
      }
    ]
  })
}

# Attach the S3 read policy to the target role.
resource "aws_iam_role_policy_attachment" "target_role_s3_read" {
  role       = aws_iam_role.data_processor_target_role.name
  policy_arn = aws_iam_policy.s3_read_access_policy.arn
}
A critical design choice here is that the data_processor_target_role has broad read permissions. The security doesn’t come from this policy; it comes from the inline session policy we generate at runtime to drastically shrink these permissions for each request.
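To make that concrete, here is roughly the session policy the service attaches for the example tenant T123 used in the sequence diagram later on (the bucket name is a placeholder). The session’s effective permissions are the intersection of this document with the role’s broad base policy, so anything outside the tenant’s prefix is denied.

# Illustrative only: the per-request session policy for the example tenant "T123".
session_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            # The bucket name is illustrative; the real name comes from configuration.
            "Resource": ["arn:aws:s3:::tenant-data-bucket/T123/*"]
        }
    ]
}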
With the infrastructure defined, we built the shared security-lib. This library’s sole purpose is to handle the sts:AssumeRole call and generate the dynamic session policy. This abstraction is vital; the application logic in data-api should not be concerned with the mechanics of AWS STS.
# libs/security-lib/src/security_lib/aws_sts.py
import json
import logging
from typing import Dict, Any

import boto3
from botocore.exceptions import ClientError
from pydantic import BaseModel, Field

from .exceptions import AWSCredentialError

# Configure logging for the library
logger = logging.getLogger(__name__)


class TemporaryCredentials(BaseModel):
    """Pydantic model for holding temporary AWS credentials."""
    access_key_id: str = Field(alias="AccessKeyId")
    secret_access_key: str = Field(alias="SecretAccessKey")
    session_token: str = Field(alias="SessionToken")

    class Config:
        allow_population_by_field_name = True


class AWSSecureTokenService:
    """
    A client for generating temporary, scoped-down AWS credentials.
    """

    def __init__(self, role_to_assume_arn: str, region_name: str = "us-east-1"):
        if not role_to_assume_arn:
            raise ValueError("Role to assume ARN must be provided.")
        self.role_to_assume_arn = role_to_assume_arn
        # This client will use the environment's credentials (e.g., the EC2 instance role)
        self.sts_client = boto3.client("sts", region_name=region_name)

    def _generate_s3_read_policy(self, bucket_name: str, tenant_id: str) -> str:
        """
        Generates a JSON string for an IAM policy that grants read-only access
        to a specific tenant's prefix in an S3 bucket.

        A common mistake is to make this policy too complex. Keep it simple and focused.
        The principle of least privilege is enforced here.
        """
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/{tenant_id}/*"]
                }
            ]
        }
        return json.dumps(policy)

    async def get_scoped_s3_credentials(
        self,
        session_name: str,
        bucket_name: str,
        tenant_id: str,
        duration_seconds: int = 900
    ) -> TemporaryCredentials:
        """
        Assumes the configured IAM role and applies a session policy to restrict
        permissions to a specific tenant's data in S3.
        """
        policy_json = self._generate_s3_read_policy(bucket_name, tenant_id)

        # The pitfall here is the Policy JSON string size limit (2048 chars).
        # For our prefix-based approach, this is fine. For more complex scenarios
        # involving many specific objects, this could be a problem.
        if len(policy_json.encode('utf-8')) > 2048:
            logger.error("Generated IAM policy exceeds size limits.")
            raise AWSCredentialError("Generated policy is too large.")

        try:
            logger.info(f"Assuming role {self.role_to_assume_arn} for session {session_name}")
            response = self.sts_client.assume_role(
                RoleArn=self.role_to_assume_arn,
                RoleSessionName=session_name,
                Policy=policy_json,
                DurationSeconds=duration_seconds
            )
            creds = response.get("Credentials")
            if not creds:
                raise AWSCredentialError("AssumeRole response did not contain credentials.")
            return TemporaryCredentials.parse_obj(creds)
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code")
            logger.error(f"AWS ClientError assuming role: {error_code}", exc_info=True)
            # Provide a clean abstraction over botocore exceptions
            raise AWSCredentialError(f"Failed to assume role: {error_code}") from e
        except Exception as e:
            logger.error(f"An unexpected error occurred during AssumeRole: {e}", exc_info=True)
            raise AWSCredentialError("An unexpected error occurred.") from e
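The AWSCredentialError imported above lives in the library’s exceptions module, which is not listed in full here; a minimal sketch is enough:

# libs/security-lib/src/security_lib/exceptions.py (minimal sketch)

class AWSCredentialError(Exception):
    """Raised when temporary, scoped credentials cannot be obtained or are invalid."""
    pass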
Now we could assemble the FastAPI service. It brings together the configuration, the security library, and the data processing logic.
# apps/data-api/src/data_api/main.py
import logging

from fastapi import FastAPI

from .routers import analysis
from .settings import get_settings

# In a production app, you would have more robust logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

app = FastAPI(
    title="Secure Data API",
    description="An API for performing data analysis with scoped S3 access.",
    version="1.0.0"
)

settings = get_settings()

app.include_router(analysis.router, prefix="/v1")


@app.get("/health")
async def health_check():
    return {"status": "ok", "service_name": settings.app_name}
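The settings module is referenced throughout but not listed in this write-up. A plausible sketch, assuming environment-driven configuration with pydantic, looks like this; the field names are taken from the code that uses them, while the defaults and env_file are assumptions:

# apps/data-api/src/data_api/settings.py (illustrative sketch)
from functools import lru_cache

from pydantic import BaseSettings  # pydantic v1 style, matching the security library above


class Settings(BaseSettings):
    app_name: str = "Secure Data API"
    data_bucket_name: str       # S3 bucket holding tenant data
    target_iam_role_arn: str    # ARN of DataProcessorTargetRole, injected via environment

    class Config:
        env_file = ".env"


@lru_cache()
def get_settings() -> Settings:
    # Cached so Depends(get_settings) does not re-read the environment on every request.
    return Settings()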
The core logic resides in the router, which orchestrates the entire flow from authentication to data processing.
# apps/data-api/src/data_api/routers/analysis.py
import logging
import uuid
from typing import Any, Dict, List

import pandas as pd
from fastapi import APIRouter, Depends, HTTPException, Header
from pydantic import BaseModel
import boto3
from botocore.exceptions import ClientError

from security_lib.aws_sts import AWSSecureTokenService, TemporaryCredentials
from security_lib.exceptions import AWSCredentialError

from ..settings import Settings, get_settings
from ..security.auth import verify_api_key


# Dependency to provide the STS service client
def get_sts_service(settings: Settings = Depends(get_settings)) -> AWSSecureTokenService:
    # This is a good place for a singleton pattern in a real application
    # to avoid re-initializing the client on every request.
    return AWSSecureTokenService(role_to_assume_arn=settings.target_iam_role_arn)


router = APIRouter(
    prefix="/analysis",
    tags=["Analysis"],
    dependencies=[Depends(verify_api_key)]  # Simple API key auth for this example
)

logger = logging.getLogger(__name__)


class AnalysisRequest(BaseModel):
    tenant_id: str
    s3_key: str


class DataProcessor:
    """
    Encapsulates the logic of fetching data from S3 using temporary
    credentials and processing it with Pandas.
    """

    def __init__(self, temp_creds: TemporaryCredentials, region_name: str = "us-east-1"):
        # A common mistake is to reuse the default boto3 session.
        # It's crucial to create a new session/client with the temporary credentials.
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=temp_creds.access_key_id,
            aws_secret_access_key=temp_creds.secret_access_key,
            aws_session_token=temp_creds.session_token,
            region_name=region_name
        )

    def process_csv_from_s3(self, bucket: str, key: str) -> List[Dict[str, Any]]:
        """
        Downloads a CSV from S3, loads it into Pandas, and performs a sample analysis.
        """
        try:
            logger.info(f"Fetching s3://{bucket}/{key} with scoped credentials.")
            s3_object = self.s3_client.get_object(Bucket=bucket, Key=key)
            # Streaming the body directly into Pandas is more memory efficient
            # than downloading to a file first.
            df = pd.read_csv(s3_object['Body'])

            # Perform a non-trivial, production-style analysis
            if 'category' not in df.columns or 'value' not in df.columns:
                raise ValueError("CSV must contain 'category' and 'value' columns.")

            summary = df.groupby('category')['value'].agg(['sum', 'mean', 'count']).reset_index()
            return summary.to_dict(orient='records')
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey':
                logger.warning(f"S3 key not found: s3://{bucket}/{key}")
                raise HTTPException(status_code=404, detail="Data file not found.")
            elif e.response['Error']['Code'] == 'AccessDenied':
                logger.error(f"S3 Access Denied for s3://{bucket}/{key}. This should not happen with correctly scoped credentials.")
                # This error is critical: it implies a logic flaw in our policy generation.
                raise HTTPException(status_code=403, detail="Permission denied to access S3 data.")
            else:
                logger.error(f"Unhandled S3 ClientError: {e}", exc_info=True)
                raise HTTPException(status_code=500, detail="Error retrieving data from S3.")
        except Exception as e:
            logger.error(f"Error processing pandas DataFrame: {e}", exc_info=True)
            raise HTTPException(status_code=500, detail="Data processing failed.")


@router.post("/")
async def run_analysis(
    request: AnalysisRequest,
    settings: Settings = Depends(get_settings),
    sts_service: AWSSecureTokenService = Depends(get_sts_service),
):
    # Note: RoleSessionName is limited to 64 characters; sanitize or truncate
    # tenant_id if it can be long or contain unusual characters.
    session_name = f"data-api-session-{request.tenant_id}-{uuid.uuid4()}"

    try:
        temp_creds = await sts_service.get_scoped_s3_credentials(
            session_name=session_name,
            bucket_name=settings.data_bucket_name,
            tenant_id=request.tenant_id
        )
    except AWSCredentialError as e:
        logger.error(f"Failed to get temporary credentials for tenant {request.tenant_id}: {e}")
        raise HTTPException(status_code=503, detail="Could not obtain secure access credentials.")

    processor = DataProcessor(temp_creds)

    # In a real-world project, this IO-bound processing task should be run
    # in a separate thread pool to avoid blocking the main asyncio event loop.
    # FastAPI handles this automatically for `def` endpoints, but for `async def`
    # you might use `asyncio.to_thread`.
    results = processor.process_csv_from_s3(
        bucket=settings.data_bucket_name,
        key=f"{request.tenant_id}/{request.s3_key}"
    )

    return {"analysis_results": results}
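Every route in this router is guarded by verify_api_key, whose implementation is not shown in this write-up. A minimal sketch, assuming a single static key held in configuration (the api_key setting and the X-API-Key header name are assumptions for illustration):

# apps/data-api/src/data_api/security/auth.py (illustrative sketch)
from fastapi import Header, HTTPException

from ..settings import get_settings


async def verify_api_key(x_api_key: str = Header(...)) -> None:
    # Hypothetical check: compare the X-API-Key header against a configured secret.
    # `api_key` is not part of the Settings sketch above; treat it as an assumption.
    settings = get_settings()
    expected = getattr(settings, "api_key", None)
    if expected is None or x_api_key != expected:
        raise HTTPException(status_code=401, detail="Invalid or missing API key.")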
The request flow is now complete and secure.
sequenceDiagram
    participant Client
    participant FastAPI as Data API Service
    participant STS as AWS STS
    participant S3

    Client->>+FastAPI: POST /v1/analysis ({"tenant_id": "T123", "s3_key": "data.csv"})
    Note over FastAPI: 1. Authenticate Request
    FastAPI->>+STS: AssumeRole(RoleArn=DataProcessorTargetRole, Policy="Allow s3:GetObject on bucket/T123/*")
    STS-->>-FastAPI: 2. Return Temporary Credentials
    Note over FastAPI: 3. Create new S3 client with temp creds
    FastAPI->>+S3: GetObject("bucket/T123/data.csv")
    S3-->>-FastAPI: 4. Return CSV data stream
    Note over FastAPI: 5. Process data with Pandas
    FastAPI-->>-Client: 6. Return JSON analysis results
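For reference, a client call exercising this flow might look like the following sketch; the host, key value, and X-API-Key header name are assumptions matching the auth sketch above, while the payload mirrors the sequence diagram.

# Hypothetical client call for the flow above.
import requests

response = requests.post(
    "http://localhost:8000/v1/analysis/",
    headers={"X-API-Key": "change-me"},
    json={"tenant_id": "T123", "s3_key": "data.csv"},
)
response.raise_for_status()
print(response.json())  # {"analysis_results": [...]}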
No system is complete without testing. The use of a shared library and dependency injection makes this straightforward. We used moto to mock AWS services, which is essential for verifying the IAM logic without touching real infrastructure.
# libs/security-lib/tests/test_aws_sts.py
import asyncio

import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_sts, mock_s3

from security_lib.aws_sts import AWSSecureTokenService
from security_lib.exceptions import AWSCredentialError


@pytest.fixture
def aws_credentials():
    # ... boilerplate for mocking AWS credentials
    pass


@mock_sts
@mock_s3
def test_scoped_credentials_can_access_allowed_path(aws_credentials):
    # Setup
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="test-bucket")
    conn.put_object(Bucket="test-bucket", Key="tenant-a/data.csv", Body="col1,col2\n1,2")
    conn.put_object(Bucket="test-bucket", Key="tenant-b/data.csv", Body="col1,col2\n3,4")

    # This part would require setting up mock IAM roles, which moto supports.
    # For simplicity, we assume role setup is correct and focus on the policy.
    # In a real test, you'd create the roles with boto3 inside the mocked context.
    mock_role_arn = "arn:aws:iam::123456789012:role/DataProcessorTargetRole"
    sts_service = AWSSecureTokenService(role_to_assume_arn=mock_role_arn)

    # Run the async method in an event loop
    temp_creds = asyncio.run(sts_service.get_scoped_s3_credentials(
        session_name="test-session",
        bucket_name="test-bucket",
        tenant_id="tenant-a"
    ))

    # Create a new S3 client with the temporary, scoped credentials
    scoped_s3_client = boto3.client(
        "s3",
        aws_access_key_id=temp_creds.access_key_id,
        aws_secret_access_key=temp_creds.secret_access_key,
        aws_session_token=temp_creds.session_token,
    )

    # Assert: Can access allowed object
    response = scoped_s3_client.get_object(Bucket="test-bucket", Key="tenant-a/data.csv")
    assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

    # Assert: Cannot access forbidden object
    with pytest.raises(ClientError) as exc_info:
        scoped_s3_client.get_object(Bucket="test-bucket", Key="tenant-b/data.csv")
    assert exc_info.value.response["Error"]["Code"] == "AccessDenied"
This test captures our core security guarantee: credentials vended for tenant-a must be denied access to tenant-b’s data, enforcing the principle of least privilege at the request level. One caveat: moto does not evaluate IAM or session policies by default, so the AccessDenied assertion only holds with moto’s access-control features enabled or, more reliably, in an integration test against real AWS.
This architecture solves the initial security problem effectively. We replaced a single, overly permissive IAM role with a dynamic, just-in-time system that generates credentials with the bare minimum permissions required to complete a specific task. However, the solution is not without its own trade-offs and areas for future improvement. The constant AssumeRole calls introduce latency; we can mitigate this with intelligent caching of temporary credentials, but a horizontally-scaled service would need a distributed cache like Redis or Memcached, adding operational complexity. Furthermore, the base execution role for the FastAPI service is still a long-lived credential. A more mature implementation might leverage a service mesh with SPIFFE/SPIRE to give the service its own short-lived cryptographic identity, eliminating the need for any static IAM role on the compute instance itself. Finally, while the current error handling is robust, a more advanced system could add a retry mechanism that automatically refreshes expired temporary credentials, making the system more resilient to transient failures.
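As a sketch of the caching idea, and only for a single-instance deployment, a small in-process cache keyed by tenant could sit in front of the STS call. The class below is an assumption rather than part of the codebase; a scaled-out service would move this state into Redis or Memcached as noted above.

# Illustrative sketch: an in-process, per-tenant cache for scoped credentials.
import time
from typing import Dict, Tuple

from security_lib.aws_sts import AWSSecureTokenService, TemporaryCredentials


class CredentialCache:
    def __init__(self, sts_service: AWSSecureTokenService, ttl_seconds: int = 600):
        # Keep the TTL comfortably below the STS DurationSeconds (900 above),
        # so cached credentials are never handed out close to expiry.
        self._sts_service = sts_service
        self._ttl = ttl_seconds
        self._cache: Dict[str, Tuple[float, TemporaryCredentials]] = {}

    async def get_for_tenant(
        self, session_name: str, bucket_name: str, tenant_id: str
    ) -> TemporaryCredentials:
        now = time.monotonic()
        entry = self._cache.get(tenant_id)
        if entry and now - entry[0] < self._ttl:
            return entry[1]
        creds = await self._sts_service.get_scoped_s3_credentials(
            session_name=session_name,
            bucket_name=bucket_name,
            tenant_id=tenant_id,
        )
        self._cache[tenant_id] = (now, creds)
        return creds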