Implementing Traceable Asynchronous Workflows from Astro to AWS SQS with End-to-End Jest Integration Testing


The core challenge wasn’t simply offloading a task. We had a performant Astro frontend that needed to trigger long-running backend processes—think report generation or media transcoding. A synchronous API call was a non-starter; it would lock up the user interface and inevitably hit gateway timeouts. The obvious architectural move was to decouple the request from the execution using a message queue. But this introduced a new, more insidious problem: a visibility black hole.

When a user’s request failed an hour later in some far-off worker process, debugging was a nightmare of sifting through disconnected logs. We had no reliable way to trace the journey of a single request from the initial button click in the browser to the final log line of the worker that processed it. This lack of traceability in our asynchronous system was becoming a significant operational bottleneck. Any solution had to be built with testability and observability as first-class citizens, not as afterthoughts.

Our initial concept was straightforward: an Astro API route would act as the ingestion point, validating the request and dispatching a message to an AWS SQS queue. A separate Node.js process (in our case, an AWS Lambda function) would poll the queue and execute the task. The critical piece we decided to add from day one was a correlationId. This unique identifier, generated at the start of the workflow, would be passed through every component, ensuring we could stitch together logs from disparate services to see the full picture of a single transaction.

The technology selection was driven by pragmatism. We already used Astro for our content-heavy sites and Jest for unit testing. AWS SQS is a robust, no-frills queue that perfectly fit our use case without the operational overhead of something like Kafka. The real decision point was how to test this distributed, asynchronous system. A full-blown end-to-end framework like Cypress felt too heavy and focused on UI interactions. We needed to validate the backend integration points. The decision was made to extend our existing Jest setup to handle integration tests against a locally mocked AWS environment using LocalStack. This would allow us to run fast, reliable, and cost-effective tests in our CI pipeline.

The first step was establishing the entry point: an Astro API route. This route’s only responsibilities are to validate the incoming payload, generate a unique correlationId, and dispatch the job to SQS. It must not perform any heavy lifting itself.

Here’s the implementation of src/pages/api/jobs/submit.ts in our Astro project. Note the immediate generation of the correlationId and its consistent use in all logging.

// src/pages/api/jobs/submit.ts

import type { APIRoute } from 'astro';
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { randomUUID } from 'crypto';

// A simple logger factory that would be replaced with a real one like Pino or Winston.
// The key is that it's initialized with a correlationId.
const createLogger = (correlationId: string) => ({
  info: (message: string, context: object = {}) => console.log(JSON.stringify({ level: 'info', message, correlationId, ...context })),
  error: (message: string, context: object = {}) => console.error(JSON.stringify({ level: 'error', message, correlationId, ...context })),
});

// In a real project, this configuration would come from environment variables.
const sqsClient = new SQSClient({
  region: process.env.AWS_REGION || "us-east-1",
  // For local testing with LocalStack, we point the endpoint to the local container.
  endpoint: process.env.SQS_ENDPOINT_URL || undefined,
});

const queueUrl = process.env.JOB_QUEUE_URL;

export const POST: APIRoute = async ({ request }) => {
  const correlationId = request.headers.get('x-correlation-id') || randomUUID();
  const logger = createLogger(correlationId);

  if (!queueUrl) {
    logger.error("Configuration error: JOB_QUEUE_URL is not set.");
    return new Response(JSON.stringify({ message: "Server configuration error." }), { status: 500 });
  }

  let jobPayload: { type: string; data: any };
  try {
    jobPayload = await request.json();
    // Basic validation
    if (!jobPayload.type || typeof jobPayload.type !== 'string' || !jobPayload.data) {
      throw new Error("Invalid job payload structure.");
    }
  } catch (err) {
    logger.error("Failed to parse request body.", { error: (err as Error).message });
    return new Response(JSON.stringify({ message: "Bad Request: Invalid JSON payload." }), { status: 400 });
  }

  logger.info("Received job submission request.", { jobType: jobPayload.type });

  try {
    const command = new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(jobPayload),
      // This is the critical step for traceability. We pass the correlationId
      // as a message attribute so it's not part of the message body itself.
      // This keeps the body clean and focused on the job data.
      MessageAttributes: {
        CorrelationId: {
          DataType: "String",
          StringValue: correlationId,
        },
      },
    });

    const result = await sqsClient.send(command);
    logger.info("Successfully dispatched job to SQS.", { messageId: result.MessageId });

    return new Response(JSON.stringify({ 
      message: "Job accepted for processing.", 
      jobId: result.MessageId, // We can return the SQS message ID as a reference
      correlationId: correlationId,
    }), { status: 202 });

  } catch (err) {
    logger.error("Failed to send message to SQS.", { error: (err as Error).message });
    // In a real-world scenario, you might have a retry mechanism here or alert monitoring.
    return new Response(JSON.stringify({ message: "Internal Server Error: Could not queue job." }), { status: 500 });
  }
};

A common mistake is to embed metadata like a correlation ID directly inside the MessageBody. This couples your processing logic to the metadata structure. The correct approach is to use SQS Message Attributes. They are designed specifically for this kind of out-of-band metadata, allowing workers to route or handle messages based on attributes without needing to parse the entire body.

Next, the worker. This is a standalone Node.js process (e.g., a Lambda function) that processes messages from the queue. Its first action upon receiving a message must be to extract the correlationId from the message attributes and establish it in its own logging context. This ensures that every log line emitted by the worker is tagged with the same identifier from the initial API request.

// src/worker/handler.ts

import type { SQSHandler, SQSEvent } from 'aws-lambda';

// Re-using the same logger factory concept as the API route.
const createLogger = (correlationId: string) => ({
  info: (message: string, context: object = {}) => console.log(JSON.stringify({ level: 'info', message, correlationId, ...context })),
  error: (message: string, context: object = {}) => console.error(JSON.stringify({ level: 'error', message, correlationId, ...context })),
});

// A dummy processor function to simulate work.
const processJob = async (job: { type: string; data: any }): Promise<{ success: boolean; result: string }> => {
  return new Promise(resolve => {
    setTimeout(() => {
      console.log(`Processing job of type ${job.type} with data:`, job.data);
      if (job.type === 'FAILABLE_TASK' && Math.random() > 0.5) {
        throw new Error("Simulated processing failure.");
      }
      resolve({ success: true, result: `Completed job type ${job.type}` });
    }, 1000);
  });
};

export const handler: SQSHandler = async (event: SQSEvent) => {
  for (const record of event.Records) {
    // Default to a new UUID if correlationId is somehow missing, but this should be alerted on.
    const correlationId = record.messageAttributes.CorrelationId?.stringValue || 'missing-correlation-id';
    const logger = createLogger(correlationId);

    logger.info("Received message from SQS.", { messageId: record.messageId });

    try {
      const jobPayload = JSON.parse(record.body);
      
      // Here you would have a switch or handler mapping based on jobPayload.type
      logger.info("Starting job processing.", { jobType: jobPayload.type });
      const result = await processJob(jobPayload);
      logger.info("Job processing completed successfully.", { result });

      // If the message is processed successfully, Lambda will automatically delete it from the queue.
      // If an error is thrown, the message will become visible again after the visibility timeout
      // and can be retried, eventually moving to a Dead-Letter Queue (DLQ) if configured.

    } catch (err) {
      logger.error("Error processing SQS message.", { 
        error: (err as Error).message,
        stack: (err as Error).stack,
        messageId: record.messageId 
      });
      // By re-throwing the error, we signal to the Lambda environment that this message
      // processing failed. This is crucial for the SQS retry mechanism to work.
      throw err;
    }
  }
};

With the application logic in place, the focus shifts to testing. The goal is to write a Jest test that:

  1. Starts a local, containerized AWS environment using LocalStack.
  2. Programmatically creates the necessary SQS queue for the test run.
  3. Calls our Astro API endpoint to submit a job.
  4. Waits and verifies that the worker (which we’ll run directly in the test) processes the message.
  5. Validates that the correlationId was correctly propagated.
  6. Cleans up all resources after the test.

This requires a more sophisticated Jest configuration. We’ll use globalSetup and globalTeardown scripts to manage the lifecycle of our Docker container for LocalStack.

First, the docker-compose.yml for LocalStack:

# docker-compose.yml
version: "3.8"

services:
  localstack:
    container_name: "localstack_main"
    image: localstack/localstack:latest
    ports:
      - "4566:4566" # Main gateway
    environment:
      - SERVICES=sqs,lambda
      - DEFAULT_REGION=us-east-1
      - DEBUG=1

Next, the Jest setup files.

// tests/jest-global-setup.js
const { execSync } = require('child_process');
const { SQSClient, CreateQueueCommand } = require("@aws-sdk/client-sqs");
const util = require('util');
const exec = util.promisify(require('child_process').exec);

const SQS_ENDPOINT_URL = 'http://localhost:4566';
const AWS_REGION = 'us-east-1';
const QUEUE_NAME = 'test-job-queue';

const wait_for_localstack_ready = async () => {
  console.log('Waiting for LocalStack to be ready...');
  for (let i = 0; i < 30; i++) {
    try {
      const { stdout } = await exec('curl http://localhost:4566/_localstack/health');
      const health = JSON.parse(stdout);
      if (health.services && health.services.sqs === 'running') {
        console.log('LocalStack SQS is ready.');
        return;
      }
    } catch (e) {
      // Ignore errors and retry
    }
    await new Promise(resolve => setTimeout(resolve, 2000));
  }
  throw new Error('LocalStack did not become ready in time.');
};

module.exports = async () => {
  console.log('\nStarting LocalStack container...');
  execSync('docker-compose up -d', { stdio: 'inherit' });

  await wait_for_localstack_ready();

  console.log('Creating SQS queue for tests...');
  const sqsClient = new SQSClient({
    region: AWS_REGION,
    endpoint: SQS_ENDPOINT_URL,
  });

  const command = new CreateQueueCommand({ QueueName: QUEUE_NAME });
  const response = await sqsClient.send(command);
  
  // Set environment variables for the tests to use
  process.env.SQS_ENDPOINT_URL = SQS_ENDPOINT_URL;
  process.env.AWS_REGION = AWS_REGION;
  process.env.JOB_QUEUE_URL = response.QueueUrl;
  
  console.log(`Test queue created: ${response.QueueUrl}`);
};
// tests/jest-global-teardown.js
const { execSync } = require('child_process');

module.exports = async () => {
  console.log('\nStopping LocalStack container...');
  execSync('docker-compose down', { stdio: 'inherit' });
};

Update jest.config.js:

// jest.config.js
module.exports = {
  preset: 'ts-jest',
  testEnvironment: 'node',
  globalSetup: '<rootDir>/tests/jest-global-setup.js',
  globalTeardown: '<rootDir>/tests/jest-global-teardown.js',
  testMatch: ['**/tests/**/*.test.ts'],
};

Now for the integration test itself. This is where all the pieces come together. We’ll use supertest to make an HTTP request to our API endpoint. Then, we need a mechanism to poll the SQS queue to simulate the worker’s behavior.

// tests/job-submission.test.ts

import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { handler as workerHandler } from '../src/worker/handler';
import { POST as submitJobHandler } from '../src/pages/api/jobs/submit';

// A mock for the Astro request object
const createMockRequest = (body: any, headers: Record<string, string> = {}) => ({
  json: async () => body,
  headers: new Map(Object.entries(headers)),
});

// A robust polling utility to wait for an asynchronous condition to be met.
// A common mistake is not having this, leading to flaky tests.
const poll = async (
  fn: () => Promise<boolean>,
  { interval, timeout }: { interval: number; timeout: number }
): Promise<void> => {
  const startTime = Date.now();
  while (Date.now() - startTime < timeout) {
    if (await fn()) {
      return;
    }
    await new Promise(resolve => setTimeout(resolve, interval));
  }
  throw new Error(`Polling timed out after ${timeout}ms.`);
};

describe('Job Submission End-to-End Flow', () => {
  let sqsClient: SQSClient;
  
  beforeAll(() => {
    sqsClient = new SQSClient({
      region: process.env.AWS_REGION,
      endpoint: process.env.SQS_ENDPOINT_URL,
    });
  });

  it('should process a job with the same correlationId from submission to completion', async () => {
    // 1. Setup: Capture logs to verify correlationId later
    const logs: any[] = [];
    const originalConsoleLog = console.log;
    console.log = (message) => {
      try {
        logs.push(JSON.parse(message));
      } catch (e) {
        // Ignore non-json logs
      }
      originalConsoleLog(message);
    };

    // 2. Act: Submit the job via the API handler
    const jobPayload = { type: 'REPORT_GENERATION', data: { userId: 'user-123' } };
    const request = createMockRequest(jobPayload);
    const response = await submitJobHandler({ request } as any);

    expect(response.status).toBe(202);
    const responseBody = await response.json();
    const submittedCorrelationId = responseBody.correlationId;
    expect(submittedCorrelationId).toBeDefined();

    // 3. Process: Simulate the worker picking up the message
    // We'll poll SQS to find the message that was just sent.
    let receivedMessage: any;
    await poll(async () => {
      const receiveCommand = new ReceiveMessageCommand({
        QueueUrl: process.env.JOB_QUEUE_URL,
        MessageAttributeNames: ['All'],
        WaitTimeSeconds: 5,
      });
      const { Messages } = await sqsClient.send(receiveCommand);
      if (Messages && Messages.length > 0) {
        receivedMessage = Messages[0];
        return true;
      }
      return false;
    }, { interval: 1000, timeout: 15000 });

    expect(receivedMessage).toBeDefined();

    // Create the SQSEvent payload for the worker handler
    const sqsEvent = {
      Records: [receivedMessage],
    };

    // Invoke the worker handler
    await workerHandler(sqsEvent as any, {} as any, () => {});

    // 4. Assert: Check the logs for consistent correlationId
    const apiLog = logs.find(log => log.message === 'Successfully dispatched job to SQS.');
    const workerStartLog = logs.find(log => log.message === 'Received message from SQS.');
    const workerEndLog = logs.find(log => log.message === 'Job processing completed successfully.');
    
    expect(apiLog).toBeDefined();
    expect(workerStartLog).toBeDefined();
    expect(workerEndLog).toBeDefined();

    expect(apiLog.correlationId).toBe(submittedCorrelationId);
    expect(workerStartLog.correlationId).toBe(submittedCorrelationId);
    expect(workerEndLog.correlationId).toBe(submittedCorrelationId);

    // 5. Cleanup
    const deleteCommand = new DeleteMessageCommand({
      QueueUrl: process.env.JOB_QUEUE_URL,
      ReceiptHandle: receivedMessage.ReceiptHandle,
    });
    await sqsClient.send(deleteCommand);
    console.log = originalConsoleLog; // Restore console.log
  }, 20000); // Increase test timeout
});

This architecture provides a solid foundation. The correlationId allows us to trace a request across service boundaries, which is invaluable for debugging in production. The Jest integration test validates this entire flow, providing high confidence that our core traceability mechanism is working before we deploy.

sequenceDiagram
    participant User
    participant Astro Frontend
    participant Astro API Route
    participant AWS SQS
    participant Worker (Lambda)

    User->>Astro Frontend: Clicks "Generate Report"
    Astro Frontend->>Astro API Route: POST /api/jobs/submit with {job data}
    activate Astro API Route
    Astro API Route->>Astro API Route: Generates correlationId
    Astro API Route->>AWS SQS: SendMessage({body: {job data}, attributes: {correlationId}})
    activate AWS SQS
    AWS SQS-->>Astro API Route: MessageID
    deactivate AWS SQS
    Astro API Route-->>Astro Frontend: 202 Accepted, {correlationId}
    deactivate Astro API Route

    Note right of Worker (Lambda): Worker is polling SQS...
    Worker (Lambda)->>AWS SQS: ReceiveMessage()
    activate AWS SQS
    AWS SQS-->>Worker (Lambda): Message({body, attributes})
    deactivate AWS SQS
    activate Worker (Lambda)
    Worker (Lambda)->>Worker (Lambda): Extracts correlationId from attributes
    Worker (Lambda)->>Worker (Lambda): Processes job... (logs with correlationId)
    Worker (Lambda)->>AWS SQS: DeleteMessage()
    deactivate Worker (Lambda)

However, this solution has its boundaries. While the correlationId is a massive improvement over no tracing, it’s a manual implementation of what distributed tracing systems like OpenTelemetry provide out of the box. For a more complex microservices architecture, we would evolve this pattern to propagate full W3C Trace Context headers, enabling richer observability with parent/child spans. Furthermore, our test relies on running the worker handler directly. In a more advanced setup, we might containerize the worker and have LocalStack’s Lambda emulator invoke it, providing an even higher-fidelity test environment. This current implementation strikes a pragmatic balance between complexity, cost, and the immediate, critical need for traceable, asynchronous job processing.


  TOC