The initial architecture was straightforward and, in retrospect, dangerously naive. We had a multi-tenant NLP processing pipeline. Documents arrived, were stored in a DynamoDB table partitioned by `tenant_id`, and a Dask cluster would periodically process them using spaCy for named-entity recognition. The entire Dask cluster, scheduler and workers alike, ran with a single IAM role. This role granted `dynamodb:*` on the entire table. It worked, it was fast, but it was a ticking time bomb. During a routine security audit, the obvious flaw was flagged: a bug in a single processing task or a compromised worker could lead to a cross-tenant data breach. The principle of least privilege was not just being ignored; it was being actively violated on a massive scale.
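For context, the cluster-wide policy amounted to something like the following. This is a reconstruction for illustration (expressed as a Python dict, matching how policies are built later in this post), not the actual policy from our account:

# Hypothetical reconstruction of the original, overly broad cluster policy.
LEGACY_CLUSTER_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "dynamodb:*",  # every DynamoDB action...
            "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/nlp_documents",  # ...on the whole table
        }
    ],
}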
This finding forced a complete redesign. The new requirement was absolute: a Dask worker processing data for `tenant-A` must be incapable, at the IAM level, of accessing data for `tenant-B`. The permissions had to be ephemeral, scoped to the specific items a job needed to touch, and automatically provisioned and revoked. The cluster-level, static IAM role had to be replaced with a dynamic, per-job credential vending system.
Our first concept involved creating a central IAM role that the Dask client could assume. This “controller” role would have the power to generate temporary, scoped-down credentials via the AWS Security Token Service (STS). Before submitting a computation to Dask, the client would determine the exact set of DynamoDB items the job needed. It would then construct an inline IAM policy granting access only to those items and use `sts:AssumeRole` to create a temporary session. These temporary credentials (access key, secret key, and session token) would then be passed to the Dask workers responsible for that specific job.
The technology stack was largely fixed:
- Dask: For its Python-native, scalable parallel computing capabilities.
- spaCy: As the core NLP engine. Its models are effective, but can be memory-intensive, making Dask’s distributed nature essential.
- AWS DynamoDB: The NoSQL datastore, chosen for its scalability and tight integration with IAM. The key structure (`tenant_id` as partition key, `document_id` as sort key) was perfect for creating fine-grained IAM policies.
- AWS STS: The cornerstone of the new security model. The `AssumeRole` API call is the mechanism for trading a long-term, powerful role for a short-term, limited one.
The alternative—creating a persistent IAM role for every tenant—was dismissed immediately. It would be a management nightmare, wouldn’t scale to thousands of tenants, and still wouldn’t provide per-document-level security. The dynamic, on-the-fly approach was the only viable path forward.
Initial Implementation: The Credential Vending Mechanism
The first step was to build the core component: a Python function that could generate these temporary credentials. This function would become the secure entry point for all Dask jobs. It requires a “controller” IAM role with permission to call `sts:AssumeRole` on a target “worker” role. The worker role itself carries only a broad baseline policy granting access to the table; the effective permissions of each session are the intersection of that baseline and the inline session policy passed during the `AssumeRole` call, so the session policy can narrow access but never widen it.
Here is the IAM trust policy for the target Dask worker role (`DaskWorkerScopedRole`). It allows the controller role (`DaskControllerRole`) to assume it.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/DaskControllerRole"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
The controller role (`DaskControllerRole`) needs the following permission to perform this action (plus `dynamodb:DescribeTable` if it resolves the table ARN at runtime, as the code below does):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::123456789012:role/DaskWorkerScopedRole"
    }
  ]
}
With the IAM roles in place, the Python credential vending function using `boto3` looks like this. It programmatically constructs a policy document granting access to a specific list of DynamoDB items.
import boto3
import json
import logging
import os
from typing import List, Dict, Any
# Configure logging for production readiness
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Configuration Constants ---
# These should be managed via environment variables or a config service in production.
DYNAMODB_TABLE_NAME = os.getenv("DYNAMODB_TABLE_NAME", "nlp_documents")
WORKER_ROLE_ARN = os.getenv("WORKER_ROLE_ARN", "arn:aws:iam::123456789012:role/DaskWorkerScopedRole")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
SESSION_DURATION_SECONDS = 3600 # 1 hour
sts_client = boto3.client("sts", region_name=AWS_REGION)
def generate_scoped_dynamodb_policy(table_arn: str, tenant_id: str, document_ids: List[str]) -> str:
    """
    Dynamically creates an IAM policy string that grants access only to
    specific items in a DynamoDB table for a given tenant.

    A key consideration here is the session policy size limit: the plaintext
    inline policy passed to AssumeRole cannot exceed 2,048 characters. If the
    document_ids list is too long, this will fail. A real-world system
    must handle this by batching requests.
    """
    if not document_ids:
        raise ValueError("document_ids list cannot be empty.")

    # Constructing ARNs for each document.
    # NOTE: DynamoDB IAM policies are officially scoped at the table/index level;
    # these item-style ARNs are a simplification. The dynamodb:LeadingKeys
    # condition below is what actually enforces tenant isolation.
    resource_arns = [
        f"{table_arn}/partition/{tenant_id}/sort/{doc_id}"
        for doc_id in document_ids
    ]

    # A common pitfall is to forget that you need GetItem and PutItem/UpdateItem.
    # Be explicit about permissions.
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:GetItem",
                    "dynamodb:BatchGetItem"
                ],
                "Resource": resource_arns,
                "Condition": {
                    # dynamodb:LeadingKeys is a multivalued context key, hence ForAllValues.
                    "ForAllValues:StringEquals": {
                        "dynamodb:LeadingKeys": [tenant_id]
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:UpdateItem"
                ],
                "Resource": resource_arns,
                "Condition": {
                    "ForAllValues:StringEquals": {
                        "dynamodb:LeadingKeys": [tenant_id]
                    }
                }
            }
        ]
    }

    policy_string = json.dumps(policy)
    if len(policy_string.encode('utf-8')) > 2048:
        # This is a critical production check: AssumeRole session policies are
        # limited to 2,048 characters of plaintext.
        logging.error(f"Generated policy size ({len(policy_string.encode('utf-8'))} bytes) exceeds the 2,048-character limit.")
        raise ValueError("Policy size exceeds AWS limits. Reduce the number of document_ids.")
    return policy_string
def get_temporary_credentials(tenant_id: str, document_ids: List[str]) -> Dict[str, Any]:
    """
    Assumes the worker role with a scoped-down inline policy.
    Returns temporary credentials.
    """
    try:
        # The caller needs dynamodb:DescribeTable for this lookup; alternatively,
        # the table ARN can be constructed directly from the account ID and region.
        table_description = boto3.client("dynamodb", region_name=AWS_REGION).describe_table(TableName=DYNAMODB_TABLE_NAME)
        table_arn = table_description['Table']['TableArn']

        policy_string = generate_scoped_dynamodb_policy(table_arn, tenant_id, document_ids)
        session_name = f"dask-worker-{tenant_id}-{os.urandom(8).hex()}"

        logging.info(f"Assuming role {WORKER_ROLE_ARN} for tenant {tenant_id} with {len(document_ids)} documents.")
        response = sts_client.assume_role(
            RoleArn=WORKER_ROLE_ARN,
            RoleSessionName=session_name,
            Policy=policy_string,
            DurationSeconds=SESSION_DURATION_SECONDS
        )

        credentials = response['Credentials']
        return {
            "aws_access_key_id": credentials['AccessKeyId'],
            "aws_secret_access_key": credentials['SecretAccessKey'],
            "aws_session_token": credentials['SessionToken'],
            "region_name": AWS_REGION
        }
    except sts_client.exceptions.ClientError as e:
        logging.error(f"Failed to assume role: {e}")
        # In a real system, you might have specific retry logic or error classification.
        raise
    except Exception as e:
        logging.error(f"An unexpected error occurred in credential vending: {e}")
        raise
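As a quick sanity check of the vending function on its own, the returned dictionary plugs straight into a `boto3.Session`. The tenant and document IDs below are placeholders:

# Illustrative usage only.
creds = get_temporary_credentials("tenant-A", ["doc-1", "doc-2"])
scoped_session = boto3.Session(**creds)
table = scoped_session.resource("dynamodb").Table(DYNAMODB_TABLE_NAME)
print(table.get_item(Key={"tenant_id": "tenant-A", "document_id": "doc-1"}).get("Item"))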
Integrating with Dask: The Secure Worker Task
The next challenge was how to deliver these temporary credentials to the Dask workers securely and efficiently. Passing raw, long-lived secrets as function arguments is bad practice: they risk being exposed in Dask’s diagnostic dashboard, task metadata, or logs.
A `boto3.Session` object cannot simply be created on the client and shipped to the workers, because sessions do not serialize cleanly across processes. The pragmatic approach used here is to pass only the minimal temporary credential fields and reconstruct the `boto3.Session` inside the worker function. The secret material still travels with the task, but it is short-lived, scoped to exactly the items the job is allowed to touch, and never logged as a raw dictionary by the worker code. An alternative that keeps credentials out of task arguments entirely is sketched below.
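One way to keep the secret material out of task arguments, sketched here under the assumption that a given worker only ever holds one job’s credentials at a time, is to push the credentials to each worker once via a Dask WorkerPlugin. The example that follows sticks with explicit arguments for clarity:

from typing import Any, Dict

from dask.distributed import WorkerPlugin, get_worker

class ScopedCredentialsPlugin(WorkerPlugin):
    """Stores a job's temporary credentials on each worker at setup time."""
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials

    def setup(self, worker):
        # Tasks running on this worker can read the credentials back later.
        worker.scoped_credentials = self.credentials

def credentials_from_worker() -> Dict[str, Any]:
    # Called from inside a task; assumes the plugin has been registered.
    return get_worker().scoped_credentials

# On the client side, registration would look roughly like:
# client.register_worker_plugin(ScopedCredentialsPlugin(temp_credentials), name="scoped-creds")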
Here’s the structure of the Dask worker function and the client-side submission logic.
import logging
from typing import Any, Dict, List

import boto3
import dask
import spacy
from dask.distributed import Client, LocalCluster
# It's crucial to ensure spaCy models are available on the workers.
# This could be handled by building a custom Docker image for Dask workers.
# For this example, we assume it's pre-loaded.
# nlp = spacy.load("en_core_web_sm") # This would be loaded on the worker.
def process_document_securely(boto_session_credentials: Dict[str, Any], table_name: str, tenant_id: str, document_id: str) -> Dict[str, Any]:
    """
    This function runs on a Dask worker.
    It reconstructs the boto3 session and performs the NLP task.
    """
    worker_log = logging.getLogger(f"dask-worker.{tenant_id}.{document_id}")
    worker_log.setLevel(logging.INFO)
    try:
        # Re-create the session on the worker using the provided temporary credentials.
        session = boto3.Session(**boto_session_credentials)
        dynamodb = session.resource('dynamodb')
        table = dynamodb.Table(table_name)

        worker_log.info("Fetching document from DynamoDB.")
        response = table.get_item(
            Key={'tenant_id': tenant_id, 'document_id': document_id}
        )
        if 'Item' not in response:
            worker_log.error("Document not found.")
            raise FileNotFoundError(f"Document {document_id} for tenant {tenant_id} not found.")

        document_text = response['Item']['text_content']

        # Lazy load the spaCy model within the task to manage memory.
        # This is a common pattern in Dask to avoid serializing large objects.
        nlp = spacy.load("en_core_web_sm")

        worker_log.info("Processing document with spaCy.")
        doc = nlp(document_text)
        entities = [{'text': ent.text, 'label': ent.label_} for ent in doc.ents]

        worker_log.info(f"Found {len(entities)} entities. Writing results back to DynamoDB.")
        # Use the same session to write results back.
        table.update_item(
            Key={'tenant_id': tenant_id, 'document_id': document_id},
            UpdateExpression="SET entities = :e, processing_status = :s",
            ExpressionAttributeValues={
                ':e': entities,
                ':s': 'PROCESSED'
            }
        )
        return {'document_id': document_id, 'status': 'SUCCESS', 'entity_count': len(entities)}
    except Exception as e:
        # Robust error handling is non-negotiable in a distributed system.
        worker_log.critical(f"Unhandled exception during processing: {e}", exc_info=True)
        return {'document_id': document_id, 'status': 'FAILED', 'error': str(e)}
# --- Client-side execution logic ---
def run_nlp_job(client: Client, tenant_id: str, document_ids: List[str]):
    """
    The main driver function that runs on the client machine or scheduler.
    """
    logging.info(f"Starting NLP job for tenant '{tenant_id}' on {len(document_ids)} documents.")

    # Step 1: Vend the credentials for the entire batch of documents.
    try:
        temp_credentials = get_temporary_credentials(tenant_id, document_ids)
    except ValueError as e:
        logging.error(f"Could not start job for tenant {tenant_id}: {e}")
        return

    # Step 2: Create a list of delayed tasks, passing credentials to each.
    tasks = []
    for doc_id in document_ids:
        # A common mistake is to create one large task.
        # Granular tasks provide better parallelism and fault tolerance.
        task = dask.delayed(process_document_securely)(
            boto_session_credentials=temp_credentials,
            table_name=DYNAMODB_TABLE_NAME,
            tenant_id=tenant_id,
            document_id=doc_id
        )
        tasks.append(task)

    # Step 3: Compute the results. dask.compute routes through `client` as long
    # as it is the active (default) distributed Client.
    logging.info("Submitting tasks to Dask cluster.")
    results = dask.compute(*tasks)

    logging.info("Job completed. Results:")
    for res in results:
        logging.info(res)
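To tie the pieces together, a local end-to-end run might look like the sketch below. The cluster sizing and document IDs are placeholders, and in production the client would connect to a remote Dask cluster rather than a LocalCluster:

if __name__ == "__main__":
    # Throwaway local cluster for demonstration purposes.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    try:
        run_nlp_job(client, tenant_id="tenant-A", document_ids=["doc-1", "doc-2"])
    finally:
        client.close()
        cluster.close()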
Architectural Flow and Verification
The complete data flow is now far more secure and auditable.
sequenceDiagram
    participant Client
    participant AWSIAM_STS as AWS IAM/STS
    participant DaskScheduler as Dask Scheduler
    participant DaskWorker as Dask Worker
    participant AWSDynamoDB as AWS DynamoDB
    Client->>AWSIAM_STS: 1. AssumeRole with inline policy for tenant-A docs
    AWSIAM_STS-->>Client: 2. Returns temporary credentials
    Client->>DaskScheduler: 3. Submits delayed tasks with credentials for [doc1, doc2]
    DaskScheduler->>DaskWorker: 4. Assigns task for doc1 to Worker
    DaskWorker->>AWSDynamoDB: 5. GET / PUT for tenant-A/doc1 (using temp creds)
    Note right of DaskWorker: Access is ALLOWED.
    AWSDynamoDB-->>DaskWorker: 6. Returns data / Accepts write
    DaskWorker->>AWSDynamoDB: 7. Attempts GET for tenant-B/doc3
    Note right of DaskWorker: Malicious or buggy attempt.
    AWSDynamoDB-->>DaskWorker: 8. Access Denied (IAM policy violation)
To verify this, we can write a test using `pytest` and `moto` to mock the AWS environment. This allows us to prove that a worker with credentials for one tenant cannot access data from another.
import os

import boto3
import pytest
from moto import mock_aws

# NOTE: This is a conceptual test. The full setup for Dask and moto is more involved.
# The purpose is to demonstrate the security principle.

@pytest.fixture
def aws_credentials():
    """Mocked AWS credentials for moto."""
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"

@pytest.fixture
def mocked_dynamodb(aws_credentials):
    with mock_aws():
        dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
        table = dynamodb.create_table(
            TableName="nlp_documents",
            KeySchema=[
                {'AttributeName': 'tenant_id', 'KeyType': 'HASH'},
                {'AttributeName': 'document_id', 'KeyType': 'RANGE'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'tenant_id', 'AttributeType': 'S'},
                {'AttributeName': 'document_id', 'AttributeType': 'S'}
            ],
            BillingMode='PAY_PER_REQUEST'
        )
        table.put_item(Item={'tenant_id': 'tenant-A', 'document_id': 'doc-1', 'text_content': 'Hello world'})
        table.put_item(Item={'tenant_id': 'tenant-B', 'document_id': 'doc-2', 'text_content': 'Goodbye world'})
        yield table

@mock_aws
def test_access_control():
    # This test would require mocking STS and IAM roles, which is complex.
    # The principle we'd test is as follows:
    # 1. Create mocked roles: Controller and Worker.
    # 2. Populate mocked DynamoDB with data for tenant-A and tenant-B.
    # 3. Use the real `get_temporary_credentials` function to vend credentials for tenant-A, doc-1.
    # 4. Create a boto3 session with these temporary credentials.
    # 5. Assert that this session CAN access tenant-A/doc-1.
    # 6. Assert that this session CANNOT access tenant-B/doc-2 and raises a ClientError (AccessDeniedException).
    #
    # The moto library has limitations in fully simulating the policy enforcement
    # of sts:AssumeRole with inline policies against other mocked services.
    # However, in a real integration test against a sandbox AWS account,
    # this behavior is exactly what would be validated.
    assert True  # Placeholder for the conceptual test.
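Against a real sandbox AWS account, where STS and DynamoDB actually enforce the session policy, the cross-tenant assertion from steps 5 and 6 would look roughly like this sketch. It reuses `get_temporary_credentials` and `DYNAMODB_TABLE_NAME` from the vending module and assumes the roles and table already exist:

from botocore.exceptions import ClientError

def test_cross_tenant_access_denied_integration():
    # Vend credentials scoped to tenant-A / doc-1 using the real vending function.
    creds = get_temporary_credentials("tenant-A", ["doc-1"])
    table = boto3.Session(**creds).resource("dynamodb").Table(DYNAMODB_TABLE_NAME)

    # Allowed: the item the session policy was scoped to.
    assert "Item" in table.get_item(Key={"tenant_id": "tenant-A", "document_id": "doc-1"})

    # Denied: an item belonging to another tenant.
    with pytest.raises(ClientError) as exc_info:
        table.get_item(Key={"tenant_id": "tenant-B", "document_id": "doc-2"})
    assert exc_info.value.response["Error"]["Code"] == "AccessDeniedException"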
Lingering Issues and Production Considerations
This architecture solves the core security problem, but it’s not without its own complexities and limitations. The “pragmatic senior engineer” in me knows that every solution is a set of trade-offs.
First, the `sts:AssumeRole` API has rate limits. If we are launching thousands of very short jobs concurrently, we could be throttled. A potential mitigation is to vend credentials for a logical batch of work rather than a single document, amortizing the cost of the STS call across many tasks. This is what the example does (the `document_ids` list), but finding the right batch size is key.
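A minimal way to apply that batching, sketched here with an arbitrary chunk size, is to split large jobs client-side so that each chunk costs exactly one `sts:AssumeRole` call:

CHUNK_SIZE = 25  # assumed value; tune against the session policy size limit and STS quotas

def run_nlp_job_in_chunks(client: Client, tenant_id: str, document_ids: List[str]) -> None:
    """Runs run_nlp_job once per chunk, amortizing one STS call over CHUNK_SIZE documents."""
    for start in range(0, len(document_ids), CHUNK_SIZE):
        run_nlp_job(client, tenant_id, document_ids[start:start + CHUNK_SIZE])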
Second, inline session policies passed to `AssumeRole` have a strict size limit (2,048 characters of plaintext). The `generate_scoped_dynamodb_policy` function includes a check for this. If a single job needed to access thousands of distinct documents, generating the full list of ARNs would exceed this limit. This fundamentally constrains the architecture. The solution is to ensure jobs are designed around patterns (e.g., all documents belonging to a tenant, expressed with a `dynamodb:LeadingKeys` condition on the partition key; finer patterns such as time ranges on the sort key have to be enforced in the query layer rather than in IAM) instead of arbitrary lists of individual items.
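As a rough sketch of that pattern-based shape (not code from the pipeline above; `json` and `table_arn` are assumed to come from the vending module), a policy scoped by partition key alone stays at a fixed size no matter how many documents the job touches:

def generate_tenant_scoped_policy(table_arn: str, tenant_id: str) -> str:
    """Fixed-size session policy: table-level actions, pinned to one tenant's partition key."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:GetItem", "dynamodb:BatchGetItem", "dynamodb:Query",
                    "dynamodb:PutItem", "dynamodb:UpdateItem"
                ],
                "Resource": table_arn,
                # Every request must use this tenant's partition key value.
                "Condition": {
                    "ForAllValues:StringEquals": {"dynamodb:LeadingKeys": [tenant_id]}
                }
            }
        ]
    }
    return json.dumps(policy)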
Finally, there’s the problem of token expiry. The temporary credentials are valid for a configurable duration (defaulting to one hour). If a Dask task runs longer than this, its credentials will expire, and subsequent AWS API calls will fail. This forces a design constraint: Dask tasks must be idempotent and short-lived. This is generally good practice for distributed systems anyway, as it improves resilience and recoverability, but it’s an explicit requirement in this security model. A more complex solution involving a credential refresh mechanism within the worker would add significant complexity and potential failure modes. Sticking to short-lived tasks is the more robust path.
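If tasks cannot be guaranteed to finish well within the session lifetime, a lighter-weight guard than full credential refresh is to pass the session's Expiration timestamp along with the keys and have each task refuse to start when too little validity remains. This is a sketch with an assumed safety margin, not part of the pipeline above:

from datetime import datetime, timezone

MIN_REMAINING_SECONDS = 300  # assumed safety margin before expiry

def assert_credentials_fresh(expiration: datetime) -> None:
    """Fails fast if the temporary credentials are about to expire.

    `expiration` is the timezone-aware datetime that STS returns in
    response['Credentials']['Expiration'], passed along with the keys.
    """
    remaining = (expiration - datetime.now(timezone.utc)).total_seconds()
    if remaining < MIN_REMAINING_SECONDS:
        raise RuntimeError(
            f"Only {remaining:.0f}s of credential validity left; refusing to start the task."
        )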