Implementing Event-Driven Incremental Static Regeneration Using Nuxt.js and Redis Streams


The promise of a Static Site Generation (SSG) architecture with a framework like Nuxt.js is compelling: exceptional performance, enhanced security, and simplified hosting. The trade-off, however, has always been content freshness. For a large-scale catalog or documentation site with thousands of pages, a full site rebuild triggered by a minor data change in the backend database is not just inefficient; it’s operationally untenable. A five-minute build time is acceptable for a daily deployment, but it’s a critical failure when a price update needs to be live in seconds.

This was the exact problem we faced. Our product database was the single source of truth, but the Nuxt SSG front-end was perpetually out-of-sync, relying on a nightly build process. The initial proposed solution was to shorten the rebuild interval using webhooks. This merely papers over the cracks. Triggering a full rebuild every 15 minutes would hammer the CI/CD infrastructure and still leave a significant window of data staleness. The real solution required a fundamental shift from a monolithic build process to an event-driven, granular invalidation model. We needed to surgically update only the pages affected by a database change, in near real-time.

The architecture we settled on combines database Change Data Capture (CDC), a Redis Stream as a durable event bus, and a long-running listener process within the Nuxt.js server engine itself to perform targeted cache invalidations.

The Architectural Blueprint

Before diving into code, here is the high-level data flow. Understanding this is critical, as the implementation details directly map to this process.

graph TD
    A[PostgreSQL Database] -- Logical Replication --> B(CDC Emitter Service - Node.js);
    B -- XADD events --> C{Redis Stream: 'db-changes'};
    D[Nuxt.js Nitro Server] -- XREADGROUP from consumer group --> C;
    subgraph "Nitro Server Logic"
        D -- Receives message --> E{Event Processor};
        E -- Maps DB row to URL --> F{Path Invalidator};
        F -- Invalidates '/products/123' --> G([SSG Page Cache]);
    end
    H[End User] -- HTTP GET /products/123 --> D;
    D -- Cache Miss (due to invalidation) --> I{Re-renders page on-demand};
    I -- Serves fresh page --> H;
    I -- Populates cache --> G;

In a production environment, you would use a robust CDC tool like Debezium. For the purpose of this self-contained build log, we’ll simulate the CDC mechanism using a PostgreSQL trigger and its NOTIFY command. This keeps the focus on the application-level integration without requiring a full Kafka/Debezium stack.

Component 1: The Database and the Emitter Service

The entire process begins at the data source. Our model, managed by Prisma ORM, is a simple Product.

Prisma Schema (schema.prisma):

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model Product {
  id          Int      @id @default(autoincrement())
  name        String
  description String?
  price       Float
  stock       Int
  updatedAt   DateTime @updatedAt
  createdAt   DateTime @default(now())
}

To capture changes, we’ll create a PostgreSQL function and a trigger that fires on any INSERT or UPDATE to the products table. This function will craft a JSON payload and send it over a channel named db_events.

PostgreSQL Trigger Setup (setup.sql):

-- setup.sql
CREATE OR REPLACE FUNCTION notify_product_change()
RETURNS TRIGGER AS $$
DECLARE
  payload JSON;
BEGIN
  -- TG_OP is 'INSERT' or 'UPDATE' here; a DELETE trigger would need OLD.id, since NEW is NULL on delete
  payload = json_build_object(
    'operation', TG_OP,
    'table', TG_TABLE_NAME,
    'record_id', NEW.id
  );

  -- Send notification on the 'db_events' channel
  PERFORM pg_notify('db_events', payload::text);

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop existing trigger to ensure idempotency
DROP TRIGGER IF EXISTS product_change_trigger ON products;

-- Create the trigger
CREATE TRIGGER product_change_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW
EXECUTE FUNCTION notify_product_change();

With the database configured to emit notifications, we need a dedicated service to listen for them and push them into a more durable, scalable message broker. A standalone Node.js process is perfect for this. We chose Redis Streams because of its persistence, consumer group functionality, and low operational overhead.

The CDC Emitter Service (emitter/index.js):

This service requires pg to listen to PostgreSQL notifications and redis to interact with Redis Streams.
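
Both are installed alongside dotenv (npm install pg redis dotenv), and the connection strings come from a local .env file. The values below are placeholders; substitute your own:

# emitter/.env (placeholder values)
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/catalog
REDIS_URL=redis://localhost:6379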

// emitter/index.js
import { Client as PGClient } from 'pg';
import { createClient as createRedisClient } from 'redis';
import 'dotenv/config';

const STREAM_KEY = 'db-changes';

// Validate environment variables for production readiness
if (!process.env.DATABASE_URL || !process.env.REDIS_URL) {
  console.error('FATAL: DATABASE_URL and REDIS_URL must be defined in .env');
  process.exit(1);
}

const pgClient = new PGClient({ connectionString: process.env.DATABASE_URL });
const redisClient = createRedisClient({ url: process.env.REDIS_URL });

async function main() {
  console.log('Starting CDC Emitter Service...');

  // Connect to both services
  await Promise.all([
    pgClient.connect(),
    redisClient.connect()
  ]).catch(err => {
    console.error('Failed to connect to PG or Redis', err);
    process.exit(1);
  });

  console.log('Connected to PostgreSQL and Redis.');

  // Set up listener for PostgreSQL notifications
  pgClient.on('notification', async (msg) => {
    try {
      console.log(`[${new Date().toISOString()}] Received notification on channel:`, msg.channel);
      const payload = JSON.parse(msg.payload);
      
      // A common mistake is not validating the payload.
      if (!payload.operation || !payload.record_id) {
        console.warn('Received malformed payload, skipping:', msg.payload);
        return;
      }

      console.log('Parsed payload:', payload);

      // Add the event to the Redis Stream
      const streamId = await redisClient.xAdd(STREAM_KEY, '*', {
        operation: payload.operation,
        table: payload.table,
        recordId: String(payload.record_id), // Stream fields must be strings
      });

      console.log(`Successfully added event to stream ${STREAM_KEY} with ID: ${streamId}`);
    } catch (error) {
      console.error('Error processing notification and adding to stream:', error);
      // In a real-world project, you'd add more robust error handling,
      // possibly pushing to a dead-letter queue.
    }
  });

  // Start listening on the channel defined in the SQL trigger
  await pgClient.query('LISTEN db_events');
  console.log('Now listening for "db_events" notifications from PostgreSQL.');
}

// Graceful shutdown handling
async function shutdown() {
  console.log('Shutting down emitter service...');
  try {
    await pgClient.end();
    await redisClient.quit();
    console.log('Connections closed.');
    process.exit(0);
  } catch (err) {
    console.error('Error during shutdown:', err);
    process.exit(1);
  }
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

main().catch(err => {
  console.error('Emitter service encountered a fatal error:', err);
  shutdown();
});

This service is robust: it validates environment variables, handles connection errors, parses incoming JSON, and includes graceful shutdown hooks. It’s designed to run continuously as a background process.

Component 2: The Nuxt.js Consumer and Invalidator

This is where the magic happens. The Nuxt.js application, running in a server environment (not in the browser), needs to consume events from the Redis Stream and act upon them. Nuxt 3’s server engine, Nitro, allows us to run code on server startup, which is the perfect place to initialize our stream consumer.

We will create a server plugin. This plugin will run only once when the server starts.

Nuxt Server Plugin (server/plugins/redis.stream.consumer.ts):

// server/plugins/redis.stream.consumer.ts
import { createClient } from 'redis';

// These values should come from runtime config for better security
const STREAM_KEY = 'db-changes';
const GROUP_NAME = 'nuxt-invalidator-group';
const CONSUMER_NAME = `consumer-${process.pid}`; // Unique consumer name per process

export default defineNitroPlugin(async (nitroApp) => {
  // Skip the consumer when Redis isn't configured or when running the dev server.
  if (!process.env.REDIS_URL || process.dev) {
    console.log('Skipping Redis Stream consumer setup (missing REDIS_URL or in dev mode).');
    return;
  }
  
  console.log(`[Nitro] Initializing Redis Stream consumer: ${CONSUMER_NAME}`);
  
  const redisClient = createClient({ url: process.env.REDIS_URL });
  
  redisClient.on('error', (err) => console.error('[Redis Client Error]', err));
  
  await redisClient.connect();

  // Ensure the consumer group exists. Creating it at '$' means the group starts
  // from the end of the stream; entries added later are still tracked for the
  // group even if every consumer is down. Create it at '0' instead if you also
  // want to process the stream's existing backlog.
  try {
    await redisClient.xGroupCreate(STREAM_KEY, GROUP_NAME, '$', {
      MKSTREAM: true, // Create the stream if it doesn't exist
    });
    console.log(`Consumer group "${GROUP_NAME}" created or already exists.`);
  } catch (error: any) {
    if (error.message.includes('BUSYGROUP')) {
      console.log(`Consumer group "${GROUP_NAME}" already exists.`);
    } else {
      console.error('Failed to create consumer group:', error);
      // If we can't create the group, we can't proceed.
      return; 
    }
  }

  async function listenForEvents() {
    while (true) {
      try {
        const response = await redisClient.xReadGroup(
          GROUP_NAME,
          CONSUMER_NAME,
          { key: STREAM_KEY, id: '>' }, // '>' means entries never delivered to any consumer in this group
          { BLOCK: 5000, COUNT: 10 } // Block for 5 seconds, read up to 10 messages
        );

        if (!response || response.length === 0) {
          // Timed out, loop again
          continue;
        }

        // response is an array of streams, we only listen to one
        const messages = response[0].messages;

        for (const message of messages) {
          console.log(`[${CONSUMER_NAME}] Received message ID:`, message.id);
          const { operation, table, recordId } = message.message;

          // The core invalidation logic
          if (table === 'products' && recordId) {
            const path = `/products/${recordId}`;
            console.log(`Invalidating path: ${path}`);
            
            // Invalidate the route by removing its entry from Nitro's route
            // cache, which is backed by unstorage. NOTE: the exact mount point
            // and key layout depend on the Nitro version and how route caching
            // is configured (e.g. routeRules with swr), so verify the key
            // format against your own deployment.
            await useStorage('cache:nitro:routes').removeItem(path);

            // Acknowledge the message was processed successfully
            await redisClient.xAck(STREAM_KEY, GROUP_NAME, message.id);
            console.log(`Acknowledged message ID:`, message.id);
          } else {
            console.warn('Received unhandled message, acknowledging to discard:', message);
            await redisClient.xAck(STREAM_KEY, GROUP_NAME, message.id);
          }
        }
      } catch (err) {
        console.error('Error in Redis stream consumer loop:', err);
        // Wait before retrying to avoid hammering Redis on connection loss
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }

  // Do not block the server startup process.
  // Run the listener in the background.
  listenForEvents();

  // Handle graceful shutdown
  nitroApp.hooks.hook('close', async () => {
    console.log('Closing Redis connection...');
    if (redisClient.isOpen) {
      await redisClient.quit();
    }
  });
});

This code is dense but critical. Let’s break down the pragmatic decisions:

  1. Consumer Groups: Using XREADGROUP is non-negotiable for production. It ensures that if multiple Nuxt server instances are running, they coordinate so that each message is processed by only one instance. It also allows a consumer to crash and pick up where it left off, as Redis tracks which messages have been acknowledged (XACK); the redis-cli commands shown after this list let you inspect that state.
  2. Unique Consumer Name: CONSUMER_NAME includes the process ID, which is enough to distinguish multiple processes on the same machine. Across containers or pods, where every process may report PID 1, combine it with the hostname (or pod name) to keep consumer names unique within the group.
  3. Blocking Read: BLOCK: 5000 makes the connection efficient. Instead of constantly polling Redis in a tight loop, the client waits up to 5 seconds for a new message to arrive.
  4. Error Handling: The while(true) loop is wrapped in a try...catch. If the connection to Redis is lost, it will log the error and pause before attempting to reconnect, preventing a CPU-spinning failure loop.
  5. Invalidation API: The core action is useStorage('cache:nitro:routes').removeItem(path). Nitro's route cache sits on top of unstorage, so deleting the entry turns the next request for /products/123 into a cache miss, and a fresh version is rendered from the data source. The exact storage mount and key format depend on your Nitro version and caching configuration, so this is the one piece to verify against your own deployment.

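These guarantees are easiest to reason about when you can see them. The standard Redis stream introspection commands, run here via redis-cli, show the group's state and its pending entries (the consumer-recovery name in the last command is just an example):

# Inspect the stream and its consumer groups
XINFO STREAM db-changes
XINFO GROUPS db-changes

# List entries that were delivered to a consumer but never acknowledged (the PEL)
XPENDING db-changes nuxt-invalidator-group

# Reclaim entries pending for more than 60s and hand them to another consumer
XAUTOCLAIM db-changes nuxt-invalidator-group consumer-recovery 60000 0
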
Component 3: The Nuxt Page and Data Fetching

Finally, the Nuxt page itself needs to be set up for SSG and use useAsyncData to fetch its data. Because the Prisma client cannot run in the browser, the database query lives in a Nitro server route and the page fetches from it; the page is rendered at build time and re-rendered on-demand after an invalidation.

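One detail the flow above depends on: the product pages have to be served through the Nitro server's route cache rather than as purely static files on a CDN, otherwise there is nothing for the consumer to invalidate. As a working assumption, a route rule along these lines does that; adjust it to your own hosting and rendering setup.

// nuxt.config.ts -- illustrative route rule, assuming the Nitro server stays
// in the serving path so invalidated pages can be re-rendered on demand
export default defineNuxtConfig({
  routeRules: {
    // Cache rendered product pages in Nitro's route cache; a request after an
    // invalidation is a cache miss and triggers a fresh server render.
    '/products/**': { swr: true },
  },
});
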
The Product Page (pages/products/[id].vue):

<!-- pages/products/[id].vue -->
<template>
  <div v-if="product">
    <h1>{{ product.name }}</h1>
    <p>Last updated: {{ new Date(product.updatedAt).toLocaleString() }}</p>
    <p>Price: ${{ product.price }}</p>
    <p>Stock: {{ product.stock }}</p>
    <pre>{{ product.description }}</pre>
  </div>
  <div v-else-if="error">
    <p>Error loading product: {{ error.message }}</p>
  </div>
  <div v-else>
    <p>Product not found.</p>
  </div>
</template>

<script setup lang="ts">
const route = useRoute();
const id = Number(route.params.id);

// The Prisma client cannot run in the browser, so the database query lives in a
// small Nitro server route (sketched after this component) and the page simply
// fetches from it. useAsyncData runs on the server during the SSG build, and
// again on-demand after a cache invalidation.
const { data: product, error } = await useAsyncData(
  `product-${id}`,
  () => $fetch(`/api/products/${id}`)
);

// Handle the case where the product is not found. Throwing here also generates
// a proper 404 page during the SSG build instead of an empty shell.
if (!product.value) {
  throw createError({ statusCode: 404, statusMessage: 'Product Not Found' });
}
</script>
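
The page fetches from a small server route rather than querying the database inside the component. A minimal sketch of that route follows; the file path and the module-level Prisma singleton are illustrative choices, not requirements.

// server/api/products/[id].get.ts
// Sketch of the route the page fetches from. The module-level PrismaClient
// singleton avoids opening a new connection pool on every request.
import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

export default defineEventHandler(async (event) => {
  const id = Number(getRouterParam(event, 'id'));

  if (!Number.isInteger(id)) {
    throw createError({ statusCode: 400, statusMessage: 'Invalid product id' });
  }

  const product = await prisma.product.findUnique({ where: { id } });

  if (!product) {
    throw createError({ statusCode: 404, statusMessage: 'Product Not Found' });
  }

  return product;
});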

When the site is built, Nuxt will crawl and pre-render a page for every product. When a user visits /products/123, they are served the static HTML instantly. Then, if we update product 123 in the database, the following happens:

  1. The PG trigger fires, sending a NOTIFY.
  2. The emitter service receives it and XADDs a message to the db-changes stream.
  3. The redis.stream.consumer.ts plugin in our Nuxt app, which is waiting via XREADGROUP, receives the message.
  4. It invalidates the cache for /products/123.
  5. The next user to visit /products/123 triggers a cache miss. Nitro executes the useAsyncData function again, fetching the fresh data from the database, renders a new static HTML file, serves it to the user, and places it in the cache for subsequent visitors.

Lingering Issues and Production Considerations

This architecture is powerful, but it introduces complexity. It’s not a silver bullet, and a pragmatic engineer must acknowledge the trade-offs.

The primary risk is a “thundering herd” problem on cache invalidation. A bulk import that updates 10,000 products in 30 seconds would fire 10,000 events, potentially overwhelming the Nuxt server with on-demand rendering requests. The stream consumer logic should be enhanced with batching or debouncing strategies, perhaps collecting invalidation paths for a few seconds and then invalidating them in a single batch.
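
A minimal sketch of that idea for the consumer loop: collect paths into a Set and flush them at most once per interval instead of invalidating one by one. The 2-second window and the queueInvalidation helper are illustrative, not part of the code above.

// Illustrative batching helper for the Nitro consumer plugin. The loop calls
// queueInvalidation(path) instead of invalidating immediately, and a timer
// flushes all collected paths at most once per FLUSH_INTERVAL_MS.
const pendingPaths = new Set<string>();
const FLUSH_INTERVAL_MS = 2000; // assumption: tune to your traffic profile
let flushTimer: NodeJS.Timeout | null = null;

function queueInvalidation(path: string) {
  pendingPaths.add(path);
  if (!flushTimer) {
    flushTimer = setTimeout(flushInvalidations, FLUSH_INTERVAL_MS);
  }
}

async function flushInvalidations() {
  flushTimer = null;
  const paths = [...pendingPaths];
  pendingPaths.clear();
  // One pass over the cache storage instead of one write per event.
  await Promise.all(
    paths.map((path) => useStorage('cache:nitro:routes').removeItem(path))
  );
  console.log(`Invalidated ${paths.length} paths in one batch`);
}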

Furthermore, the system’s resilience now depends on three components: the database, the emitter service, and the Redis instance. The emitter service must be managed by a process supervisor like PM2 or run in a container orchestrated by Kubernetes to ensure it restarts on failure. The Redis consumer group provides message delivery guarantees, but if the consumer logic has a bug and continuously crashes, messages can end up in the Pending Entries List (PEL), requiring manual intervention.
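
For the emitter, that supervision can be as simple as a PM2 ecosystem file; a minimal sketch, with illustrative restart settings:

// ecosystem.config.cjs -- illustrative PM2 config for the emitter service
module.exports = {
  apps: [
    {
      name: 'cdc-emitter',
      script: './emitter/index.js',
      autorestart: true,   // restart the process if it crashes
      max_restarts: 10,    // give up after repeated immediate failures
      restart_delay: 5000, // wait 5s between restart attempts
      env: {
        NODE_ENV: 'production',
      },
    },
  ],
};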

Finally, there’s the question of state drift. If an event is missed for any reason (e.g., the emitter was down, a bug in the logic), a page can become permanently stale. This event-driven system is for near real-time updates; it does not replace the need for a periodic, full-site rebuild (e.g., nightly) which acts as a self-healing mechanism to guarantee eventual consistency across the entire site.
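
That periodic rebuild can live in whatever scheduler your CI already provides. As one hedged example, a GitHub Actions cron trigger might look like the sketch below; the schedule and the deploy step are placeholders for your own pipeline.

# .github/workflows/nightly-rebuild.yml -- illustrative nightly full rebuild
name: nightly-full-rebuild
on:
  schedule:
    - cron: '0 3 * * *' # every night at 03:00 UTC
jobs:
  rebuild:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
      - run: npm ci
      - run: npx nuxi generate
      # deploy the fresh output with whatever mechanism you already use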
