Implementing a Secure Real-Time Transcription Layer over WebRTC with GraphQL Subscriptions and JWT


The initial technical pain point was clear: integrating real-time transcription into our multi-party WebRTC platform was both a performance bottleneck and an operational cost nightmare. Relying on third-party cloud APIs meant shipping raw audio streams out to the public internet, introducing significant latency and data privacy concerns. Furthermore, their pricing models scaled poorly with the number of concurrent streams. The objective became to build an in-house, low-latency transcription service that was tightly coupled with our WebRTC infrastructure, scalable, and secure.

Our first concept was a monolithic architecture where the WebRTC Selective Forwarding Unit (SFU) process would also handle transcription. This was quickly discarded. A real-world project cannot afford to have a CPU-intensive, potentially unstable NLP model crash the core media routing component. Decoupling was non-negotiable. This led to a distributed architecture: the WebRTC SFU’s only job is to route media efficiently. A separate, horizontally scalable pool of NLP workers would be responsible for processing. The challenge then shifted to the pipeline: how to get audio from the SFU to an NLP worker and the resulting text back to the clients with minimal delay.

This is where the technology selection became critical. For the SFU, we chose Mediasoup, a powerful and flexible Node.js library that gives fine-grained control over media streams. For the API and real-time client updates, GraphQL was a superior choice over REST. Its subscription model is tailor-made for pushing live transcription data, and its typed schema provides a robust contract for managing the complex state of rooms, participants, and media producers. For security, JWT was the standard, allowing us to embed authorization claims directly into tokens. The NLP component was designed as a black box service, ready to consume an audio stream and produce text, allowing us to swap models (e.g., Whisper, Vosk) without re-architecting the pipeline.

The resulting architecture looks like this:

sequenceDiagram
    participant Client
    participant GraphQL_API as GraphQL API (Node.js/Apollo)
    participant Mediasoup_SFU as Mediasoup SFU
    participant Redis_PubSub as Redis Pub/Sub
    participant NLP_Worker as NLP Worker

    Client->>GraphQL_API: Mutation: joinRoom(roomId) [JWT Auth]
    GraphQL_API-->>Client: SFU connection details
    Client->>Mediasoup_SFU: Establish WebRTC Transport
    Client->>Mediasoup_SFU: Produce Audio Stream

    Mediasoup_SFU->>Mediasoup_SFU: Identify audio producer for transcription
    Note right of Mediasoup_SFU: Create PlainTransport to pipe RTP audio out
    Mediasoup_SFU->>NLP_Worker: Stream RTP packets to a listening UDP port

    NLP_Worker->>NLP_Worker: Decode RTP and process audio
    NLP_Worker-->>Redis_PubSub: PUBLISH transcriptions:<roomId> "{ text: '...' }"

    Redis_PubSub-->>GraphQL_API: Receive message on subscribed channel
    GraphQL_API-->>Client: Push data via GraphQL Subscription

The core of the implementation is orchestrating these independent components. Let’s start with the foundation: a secure GraphQL server.

Securing the Control Plane with JWT and GraphQL

Before any media flows, a client must authenticate. Our GraphQL API, built with Apollo Server and Express, uses a middleware to validate JWTs on every request. A common mistake is to perform validation inside every resolver; a centralized middleware is far more maintainable.

// src/auth/jwt.ts
import jwt from 'jsonwebtoken';
import { Request } from 'express';

const JWT_SECRET = process.env.JWT_SECRET || 'a-very-secret-key-for-dev';

export interface UserContext {
  userId: string;
  permissions: string[];
}

export const validateToken = (req: Request): UserContext | null => {
  const authHeader = req.headers.authorization;
  if (!authHeader) {
    return null;
  }

  const [scheme, token] = authHeader.split(' ');
  if (scheme !== 'Bearer' || !token) {
    return null;
  }

  try {
    const payload = jwt.verify(token, JWT_SECRET) as {
      userId: string;
      permissions: string[];
    };
    return {
      userId: payload.userId,
      permissions: payload.permissions || [],
    };
  } catch (error) {
    // In a real-world project, you'd log this error.
    console.error('JWT validation failed:', error);
    return null;
  }
};
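
Tokens themselves would be minted by a login or session endpoint, which is outside the scope of this article. For reference, here is a minimal sketch of issuing a token that this validator accepts; the issueToken helper and the one-hour expiry are illustrative, not part of the system above.

// src/auth/issueToken.ts (illustrative sketch)
import jwt from 'jsonwebtoken';

const JWT_SECRET = process.env.JWT_SECRET || 'a-very-secret-key-for-dev';

// Hypothetical helper: signs exactly the claims validateToken expects.
export const issueToken = (userId: string, permissions: string[]): string =>
  jwt.sign({ userId, permissions }, JWT_SECRET, { expiresIn: '1h' });

Clients then send the result as Authorization: Bearer <token> on every GraphQL request.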

This validateToken function is then integrated into the Apollo Server context. This makes the user’s identity and permissions available to all resolvers, which is critical for authorization logic later on.

// src/server.ts
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import express from 'express';
import http from 'http';
import cors from 'cors';
import { json } from 'body-parser';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { UserContext, validateToken } from './auth/jwt';

// ... (WebSocket server setup for subscriptions omitted for brevity)

interface AppContext {
  user: UserContext | null;
}

async function startServer() {
  const app = express();
  const httpServer = http.createServer(app);

  const server = new ApolloServer<AppContext>({
    typeDefs,
    resolvers,
    plugins: [ApolloServerPluginDrainHttpServer({ httpServer })],
  });

  await server.start();

  app.use(
    '/graphql',
    cors<cors.CorsRequest>(),
    json(),
    expressMiddleware(server, {
      context: async ({ req }) => ({
        user: validateToken(req),
      }),
    }),
  );
  
  // ... (start http server)
}

Now, every resolver can check context.user. For instance, the joinRoom mutation must ensure a user is authenticated.
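
A minimal sketch of that guard (the joinRoom body and its return value are illustrative; only the context.user check is the point here):

// src/graphql/resolvers.ts (sketch of the authentication guard)
import { UserContext } from '../auth/jwt';

const resolvers = {
  Mutation: {
    joinRoom: async (
      _: any,
      { roomId }: { roomId: string },
      context: { user: UserContext | null },
    ) => {
      // Reject unauthenticated callers before touching any room or SFU state.
      if (!context.user) {
        throw new Error('Authentication required');
      }
      // ... register the participant and return SFU connection details (not shown) ...
      return { roomId };
    },
  },
};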

GraphQL Schema for Real-Time State Management

The GraphQL schema defines the entire API surface. It needs to handle not just WebRTC signaling but also the real-time transcription data. The key here is the Subscription type.

# src/graphql/schema.graphql

type Query {
  getRoom(roomId: ID!): Room
}

type Mutation {
  # Returns routerRtpCapabilities for the client to initialize Mediasoup device
  getRouterRtpCapabilities(roomId: ID!): String!

  # Creates a transport on the server
  createWebRtcTransport(roomId: ID!, direction: TransportDirection!): TransportParams!

  # Connects a client-side transport to the server-side transport
  connectWebRtcTransport(roomId: ID!, transportId: ID!, dtlsParameters: String!): Boolean!

  # Instructs the server to start receiving media
  produce(
    roomId: ID!
    transportId: ID!
    kind: MediaKind!
    rtpParameters: String!
    appData: String
  ): ID!
}

type Subscription {
  transcriptionUpdated(roomId: ID!): TranscriptionSegment!
}

enum TransportDirection {
  SEND
  RECV
}

enum MediaKind {
  audio
  video
}

type Room {
  id: ID!
  participants: [Participant!]!
}

type Participant {
  id: ID!
  userId: String!
}

type TransportParams {
  id: ID!
  iceParameters: String!
  iceCandidates: String!
  dtlsParameters: String!
}

type TranscriptionSegment {
  userId: String!
  text: String!
  isPartial: Boolean!
}

The mutations directly map to the steps a client performs to establish a WebRTC connection with Mediasoup. The transcriptionUpdated subscription is the channel through which clients receive live text. One pitfall to avoid here is over-modelling the signaling: Mediasoup’s ICE, DTLS, and RTP parameter objects are deeply nested, so we pass them as opaque JSON strings instead of mirroring every field in the schema, which keeps the number of distinct types and mutations manageable.
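
To make the flow concrete, here is a sketch of the client side using mediasoup-client; the gqlRequest helper (an authenticated GraphQL call that resolves with the operation's data) is a hypothetical stand-in for whatever GraphQL client you use.

// client/signaling.ts (sketch). `gqlRequest` is a hypothetical helper that sends an
// authenticated GraphQL operation and resolves with that operation's data.
import { Device } from 'mediasoup-client';

declare function gqlRequest(operation: string, variables: Record<string, unknown>): Promise<any>;

export async function publishMicrophone(roomId: string, track: MediaStreamTrack) {
  // 1. Load the device with the router's RTP capabilities.
  const caps = await gqlRequest('getRouterRtpCapabilities', { roomId });
  const device = new Device();
  await device.load({ routerRtpCapabilities: JSON.parse(caps) });

  // 2. Ask the server for a send transport and mirror it locally.
  const params = await gqlRequest('createWebRtcTransport', { roomId, direction: 'SEND' });
  const transport = device.createSendTransport({
    id: params.id,
    iceParameters: JSON.parse(params.iceParameters),
    iceCandidates: JSON.parse(params.iceCandidates),
    dtlsParameters: JSON.parse(params.dtlsParameters),
  });

  // 3. Wire the transport's signaling events to the remaining mutations.
  transport.on('connect', ({ dtlsParameters }, callback, errback) => {
    gqlRequest('connectWebRtcTransport', {
      roomId,
      transportId: params.id,
      dtlsParameters: JSON.stringify(dtlsParameters),
    }).then(() => callback()).catch(errback);
  });

  transport.on('produce', ({ kind, rtpParameters }, callback, errback) => {
    gqlRequest('produce', {
      roomId,
      transportId: params.id,
      kind,
      rtpParameters: JSON.stringify(rtpParameters),
    }).then((producerId: string) => callback({ id: producerId })).catch(errback);
  });

  // 4. Producing the track triggers 'connect' and then 'produce' above.
  await transport.produce({ track });
}

The event handlers are registered before produce() is called because mediasoup-client emits 'connect' and 'produce' lazily, the first time media actually needs to flow.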

Bridging GraphQL and the WebRTC SFU

The resolvers are the glue. They take GraphQL requests and translate them into actions on our Mediasoup SFU instance. This requires careful state management. We’ll use a simple in-memory map to store room and participant state for this example, but a production system would use Redis or a database.
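
A minimal sketch of that in-memory store; the exact shape (maps keyed by user, transport, and producer IDs) is an assumption made for this article.

// src/services/roomState.ts (illustrative in-memory store)
import { WebRtcTransport } from 'mediasoup/node/lib/WebRtcTransport';
import { Producer } from 'mediasoup/node/lib/Producer';

interface ParticipantState {
  userId: string;
  transports: Map<string, WebRtcTransport>; // keyed by transport.id
  producers: Map<string, Producer>;         // keyed by producer.id
}

interface RoomState {
  id: string;
  participants: Map<string, ParticipantState>; // keyed by userId
}

// roomId -> RoomState. Lives only as long as this process does.
export const rooms = new Map<string, RoomState>();

export function getOrCreateRoom(roomId: string): RoomState {
  let room = rooms.get(roomId);
  if (!room) {
    room = { id: roomId, participants: new Map() };
    rooms.set(roomId, room);
  }
  return room;
}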

First, we need to initialize our Mediasoup worker and router. This is a one-time setup process.

// src/services/mediasoupManager.ts
import * as mediasoup from 'mediasoup';
import { types as msTypes } from 'mediasoup';

// Basic configuration. Typing the settings with mediasoup's own types keeps the
// string literals ('warn', 'audio', ...) compatible with createWorker/createRouter.
const workerSettings: msTypes.WorkerSettings = {
  rtcMinPort: 40000,
  rtcMaxPort: 49999,
  logLevel: 'warn',
  logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp'],
};

const mediaCodecs: msTypes.RtpCodecCapability[] = [
  {
    kind: 'audio',
    mimeType: 'audio/opus',
    clockRate: 48000,
    channels: 2,
  },
  // video codecs omitted...
];

let worker: msTypes.Worker;
let router: msTypes.Router;

export async function startMediasoup() {
  // Pass the full settings so the RTC port range and log tags actually take effect.
  worker = await mediasoup.createWorker(workerSettings);

  worker.on('died', () => {
    console.error('Mediasoup worker has died, exiting in 2 seconds...');
    setTimeout(() => process.exit(1), 2000);
  });

  router = await worker.createRouter({ mediaCodecs });
  console.log('Mediasoup worker and router started.');
}

export const getRouter = () => router;

The resolver for createWebRtcTransport will call router.createWebRtcTransport. It’s crucial to handle errors here, as network configuration issues can often cause this step to fail.

// src/graphql/resolvers.ts (partial)
import { getRouter } from '../services/mediasoupManager';
// ... other state management logic

const resolvers = {
  Mutation: {
    // ...
    createWebRtcTransport: async (_: any, { roomId, direction }: any, context: any) => {
      if (!context.user) throw new Error('Authentication required');

      const router = getRouter();

      try {
        const transport = await router.createWebRtcTransport({
          listenIps: [{ ip: '0.0.0.0', announcedIp: process.env.ANNOUNCED_IP }],
          enableUdp: true,
          enableTcp: true,
          preferUdp: true,
        });

        // Store transport on our room state object, linking it to the user
        // ... state management logic ...

        return {
          id: transport.id,
          iceParameters: JSON.stringify(transport.iceParameters),
          iceCandidates: JSON.stringify(transport.iceCandidates),
          dtlsParameters: JSON.stringify(transport.dtlsParameters),
        };
      } catch (error) {
        // Typically caused by network misconfiguration (e.g., a bad announcedIp).
        console.error('createWebRtcTransport failed:', error);
        throw new Error('Could not create WebRTC transport');
      }
    },
    // ... more resolvers for connect and produce
  }
};

The ANNOUNCED_IP is a critical configuration detail. In production, this must be the public IP address of the server so that clients can connect to it. A common mistake in development is leaving this as 127.0.0.1, which will work locally but fail in any other environment.
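
A cheap way to avoid shipping that mistake is to fail fast at startup. The sketch below assumes a NODE_ENV convention and a dedicated config module, neither of which appears elsewhere in this article.

// src/config.ts (illustrative startup guard)
export function getAnnouncedIp(): string | undefined {
  const ip = process.env.ANNOUNCED_IP;
  if (!ip && process.env.NODE_ENV === 'production') {
    // Without a reachable announcedIp, remote clients receive ICE candidates they can't use.
    throw new Error('ANNOUNCED_IP must be set in production');
  }
  return ip; // undefined is fine for purely local development
}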

The Audio Pipeline: From SFU to NLP Worker

This is the most challenging and original part of the system. Once a client calls the produce mutation and starts sending audio, we need to extract that audio stream from the SFU and send it to our NLP service. Mediasoup’s PlainTransport is the perfect tool for this. It can take an RTP stream from a producer and forward it over plain RTP to a specific IP and port on the server’s local network.

When a user’s audio producer is created, we trigger this logic.

// src/services/transcriptionService.ts
import { Router } from 'mediasoup/node/lib/Router';
import { Producer } from 'mediasoup/node/lib/Producer';

const NLP_WORKER_IP = '127.0.0.1';
const NLP_WORKER_PORT_START = 50000;
let nextNlpPort = NLP_WORKER_PORT_START;

// A map to keep track of consumers piping audio to NLP workers
const transcriptionConsumers = new Map<string, { consumerId: string, transportId: string }>();

export async function startTranscription(router: Router, producer: Producer, userId: string, roomId: string) {
  if (transcriptionConsumers.has(producer.id)) {
    console.warn(`Transcription already running for producer ${producer.id}`);
    return;
  }

  // Find an available port pair for the NLP worker to listen on (RTP on n, RTCP on n + 1).
  // In a real system, this would be managed by a service discovery mechanism.
  const nlpListenPort = nextNlpPort;
  nextNlpPort += 2;

  // Create a transport to pipe RTP to our NLP worker. comedia stays disabled because
  // we already know the destination; comedia is for the opposite case, where Mediasoup
  // learns the remote endpoint from the first incoming packet and connect() is not called.
  const plainTransport = await router.createPlainTransport({
    listenIp: '0.0.0.0',
    rtcpMux: false,
    comedia: false,
  });

  // The IP and port where the NLP worker should listen for this stream
  console.log(`NLP transport created. Worker for user ${userId} should listen on port ${nlpListenPort}`);

  // Create a consumer on the SFU to receive the user's audio
  const rtpConsumer = await plainTransport.consume({
    producerId: producer.id,
    rtpCapabilities: router.rtpCapabilities, // Use router's capabilities for simplicity
    paused: true, // Start paused
  });

  transcriptionConsumers.set(producer.id, { consumerId: rtpConsumer.id, transportId: plainTransport.id });

  // Point the transport at the NLP worker. RTCP is not muxed, so it goes to port + 1.
  await plainTransport.connect({
    ip: NLP_WORKER_IP,
    port: nlpListenPort,
    rtcpPort: nlpListenPort + 1,
  });

  // It's critical to resume the consumer *after* connecting the transport.
  await rtpConsumer.resume();

  // The NLP worker's RTP caps must match this payload type (Mediasoup may remap it).
  const audioPayloadType = rtpConsumer.rtpParameters.codecs[0].payloadType;
  console.log(
    `Streaming audio for producer ${producer.id} to ${NLP_WORKER_IP}:${nlpListenPort} (payload type ${audioPayloadType})`
  );

  // Here, you would notify the NLP worker pool to start a worker
  // listening on `nlpListenPort` and associate it with `roomId` and `userId`.
  // This could be done via an API call or a message queue.
  // For this example, we assume the worker is started manually.
}

This code snippet is dense with important details. comedia stays disabled because we already know the NLP worker’s address; enabling it would make Mediasoup wait for an incoming packet to learn the remote endpoint, and it cannot be combined with an explicit connect(). We create a consumer on the server side that consumes the user’s audio and pipes it into the PlainTransport, which then sends it as a raw RTP stream to the port we’ve designated for an NLP worker.
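
One lightweight way to perform that notification is a Redis control channel that the NLP worker hosts subscribe to. The following is a sketch only; the nlp:control channel name and the message shape are assumptions, not part of the pipeline above.

// src/services/nlpDispatch.ts (illustrative; assumes NLP worker hosts subscribe
// to a Redis control channel and spawn a worker process per message)
import Redis from 'ioredis';

const controlPublisher = new Redis();

export interface StartWorkerCommand {
  action: 'start';
  port: number;        // UDP port the worker should listen on for RTP
  rtcpPort: number;    // port + 1, as connected by the PlainTransport
  roomId: string;
  userId: string;
  payloadType: number; // Opus payload type taken from the consumer's rtpParameters
}

export async function requestNlpWorker(cmd: StartWorkerCommand): Promise<void> {
  // The channel name is an assumption for this sketch.
  await controlPublisher.publish('nlp:control', JSON.stringify(cmd));
}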

The NLP Worker and Publishing Results

The NLP worker is a separate process or container. Its job is to listen on its assigned UDP port, decode the RTP stream, perform transcription, and publish the results. Using gstreamer is a robust way to handle the RTP decoding. A Node.js NLP worker could orchestrate this.

// nlp-worker/index.js
const { spawn } = require('child_process');
const Redis = require('ioredis');

const port = parseInt(process.argv[2], 10);
const roomId = process.argv[3];
const userId = process.argv[4];

if (!port || !roomId || !userId) {
  console.error('Usage: node index.js <port> <roomId> <userId>');
  process.exit(1);
}

const redis = new Redis(); // Connects to localhost:6379 by default

// GStreamer pipeline to decode Opus RTP and output raw audio.
// The pitfall here is getting the caps (capabilities) string exactly right:
// clock rate and encoding name must match the router's Opus codec, and the
// payload number must match the payload type Mediasoup assigned to the
// plain-transport consumer (111 is only a common browser default).
// Note that udpsrc binds the UDP port itself, so this process needs no
// separate socket of its own.
const gstArgs = [
  'udpsrc', `port=${port}`,
  '!', 'application/x-rtp,media=audio,clock-rate=48000,encoding-name=OPUS,payload=111',
  '!', 'rtpopusdepay',
  '!', 'opusdec',
  '!', 'audioconvert',
  '!', 'audioresample',
  '!', 'fdsink', 'fd=1', // Pipe raw decoded audio to stdout
];

const gstreamer = spawn('gst-launch-1.0', gstArgs);

gstreamer.stderr.on('data', (data) => {
  console.error(`[gstreamer-err]: ${data}`);
});

// This is a placeholder for a real NLP engine.
// In a real project, you'd pipe gstreamer.stdout into a library like 'vosk'.
// Until then, drain stdout so the pipe buffer doesn't fill up and stall GStreamer.
gstreamer.stdout.resume();

// For this example, we simulate receiving text fragments.
setInterval(() => {
  const transcript = {
    userId: userId,
    text: `This is a simulated partial transcript at ${new Date().toISOString()}`,
    isPartial: true,
  };

  // Publish the result to a Redis channel specific to the room
  const channel = `transcriptions:${roomId}`;
  redis.publish(channel, JSON.stringify(transcript));
  console.log(`Published to ${channel}: ${transcript.text}`);
}, 2000);

process.on('SIGINT', () => {
  console.log('Shutting down NLP worker...');
  gstreamer.kill();
  redis.quit();
  process.exit(0);
});

The NLP worker is completely decoupled. It receives its configuration (port, room, user) on startup. It uses gstreamer to handle the complex task of RTP de-packetization and Opus decoding, piping raw audio to its standard output. A real implementation would consume this stream. The worker then publishes the result to a Redis channel. This use of a message broker is crucial for scalability.
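
As a sketch of what consuming that stream could look like, the worker would feed the decoded PCM from gstreamer.stdout into a recognizer. The Recognizer interface below is a hypothetical stand-in for a real engine such as Vosk, and it assumes the GStreamer pipeline is extended with a caps filter (e.g. audio/x-raw,format=S16LE,rate=16000,channels=1 before the sink) so the engine receives the sample format it expects.

// nlp-worker/consumeAudio.ts (illustrative sketch; `Recognizer` is a stand-in
// for a real engine, not an actual library API)
import { Readable } from 'stream';

export interface Recognizer {
  // Feed a chunk of raw PCM; returns a finalized or partial segment when one is ready.
  acceptPcm(chunk: Buffer): { text: string; isPartial: boolean } | null;
}

export function consumeAudio(
  pcmStream: Readable, // e.g. gstreamer.stdout emitting raw S16LE PCM
  recognizer: Recognizer,
  publish: (segment: { text: string; isPartial: boolean }) => void,
) {
  pcmStream.on('data', (chunk: Buffer) => {
    const segment = recognizer.acceptPcm(chunk);
    if (segment) {
      publish(segment); // e.g. redis.publish(`transcriptions:${roomId}`, ...)
    }
  });

  pcmStream.on('end', () => {
    console.log('Audio stream ended; flushing recognizer.');
  });
}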

Delivering Transcriptions with GraphQL Subscriptions

The final piece is the GraphQL subscription resolver. It needs to listen to the Redis channel and push data to connected clients. We’ll use the graphql-redis-subscriptions library.

// src/graphql/resolvers.ts (continued)
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const pubsub = new RedisPubSub({
  publisher: new Redis(),
  subscriber: new Redis(),
});

const TRANSCRIPTION_CHANNEL = (roomId: string) => `transcriptions:${roomId}`;

const resolvers = {
  // ... Queries and Mutations
  Subscription: {
    transcriptionUpdated: {
      subscribe: (_: any, { roomId }, context) => {
        if (!context.user) throw new Error('Authentication required');
        // Add authorization logic here: can this user subscribe to this room?
        
        return pubsub.asyncIterator(TRANSCRIPTION_CHANNEL(roomId));
      },
      // The NLP worker publishes the segment object itself (not wrapped in a
      // { transcriptionUpdated: ... } envelope), so return the payload as the field value.
      resolve: (payload: any) => payload,
    }
  }
};

The implementation is surprisingly simple because the library handles the complexities of the Redis subscription and the WebSocket connection. When the NLP worker publishes a message, the pubsub engine picks it up, and Apollo Server pushes it through the active WebSocket connection to the correct client.
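
On the client, consuming the subscription over graphql-ws looks roughly like the sketch below; the WebSocket URL and the connectionParams-based JWT handshake are assumptions about the WebSocket server setup that was omitted earlier.

// client/subscribeTranscriptions.ts (sketch; assumes the server exposes a
// graphql-ws endpoint at /graphql and reads the JWT from connectionParams)
import { createClient } from 'graphql-ws';

const client = createClient({
  url: 'wss://example.com/graphql',
  connectionParams: { authorization: `Bearer ${localStorage.getItem('jwt')}` },
});

export function subscribeTranscriptions(roomId: string, onSegment: (s: any) => void) {
  // Returns an unsubscribe function.
  return client.subscribe(
    {
      query: `subscription ($roomId: ID!) {
        transcriptionUpdated(roomId: $roomId) { userId text isPartial }
      }`,
      variables: { roomId },
    },
    {
      next: ({ data }) => data && onSegment(data.transcriptionUpdated),
      error: (err) => console.error('Subscription error:', err),
      complete: () => console.log('Subscription closed'),
    },
  );
}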

The biggest challenge with this architecture remains latency. Each hop adds milliseconds: client to SFU, SFU to NLP worker’s network stack, RTP decoding, NLP inference, publishing to Redis, and finally pushing through the WebSocket. Minimizing this requires running the NLP workers on the same low-latency network as the SFU and using highly optimized NLP models.

Another lingering issue is resource management. The current implementation naively assigns an incrementing port. A production system needs a proper NLP worker manager that allocates/deallocates workers from a pool, monitors their health, and handles failures gracefully. Speaker diarization—identifying who spoke which words—is also not addressed. This would require routing each participant’s audio stream to a separate NLP process and then intelligently merging the resulting transcripts, a significantly more complex orchestration problem. This architecture, however, provides a robust and scalable foundation upon which such features can be built.

