Implementing Dynamic JWT-Driven Row and Column Security for a GraphQL Data Lake API


The central problem was an analytics API spiraling out of control. Our multi-tenant platform dumps billions of events into a central data lake built on S3 with Apache Iceberg tables. Initially, the business intelligence team was the only consumer, querying Trino directly. Then came the demand for customer-facing dashboards. The first iteration was a bespoke REST endpoint for each chart, along the lines of GET /api/v1/reports/tenant/{tenantId}/sales-summary. This quickly became a maintenance nightmare: every new filter or data point meant either a new endpoint or polluting an existing one with query parameters.

GraphQL was the logical next step for its flexibility. A single endpoint could serve countless variations of data requests. The technical hurdle, however, was security. In a shared data lake, a single mistake in a GraphQL resolver could expose Tenant A’s data to Tenant B. A simple WHERE tenant_id = ? check at the top level wasn’t sufficient. Our product required fine-grained permissions: certain user roles could only see specific columns (e.g., a ‘Sales Rep’ role can’t see profit_margin), and some roles had further row-level restrictions (e.g., a ‘Regional Manager’ can only see data for their assigned region). Hardcoding this logic in resolvers was a non-starter; it would create an unmaintainable mess of if/else statements.

Our concept was to decentralize the permission definition while centralizing enforcement. We decided to embed a user’s data access policy directly into their JSON Web Token (JWT) during the authentication flow. This policy would be a declarative JSON object specifying exactly what they are allowed to see. The GraphQL layer’s sole responsibility would then be to parse this policy and dynamically construct a provably secure SQL query to execute against our Trino data lake. This approach turns the API layer into a sophisticated, policy-aware query translator.

The Anatomy of a Policy-Enhanced JWT

A standard JWT contains claims like sub (subject), iat (issued at), and exp (expiration). We extended this with a custom, namespaced claim block: https://myapp.com/data_access_policy. Storing the full policy here means the GraphQL API becomes stateless regarding permissions. It doesn’t need to call an external authorization service on every request, which simplifies the architecture significantly.

Here’s a sample decoded payload for a ‘Regional Sales Manager’ in ‘Tenant-123’ who can only view US East data and cannot see margin-related columns such as profit_margin.

{
  "iss": "https://auth.myapp.com/",
  "sub": "auth0|user-abc-456",
  "aud": "https://api.myapp.com",
  "iat": 1678886400,
  "exp": 1678890000,
  "scope": "read:analytics",
  "https://myapp.com/data_access_policy": {
    "version": 1,
    "tenant_id": "tenant-123",
    "tables": {
      "sales_events": {
        "readable_columns": [
          "event_id",
          "timestamp",
          "product_id",
          "product_name",
          "quantity",
          "sale_price",
          "customer_id",
          "region"
        ],
        "row_filter_predicate": "region = 'US-EAST'"
      },
      "inventory_levels": {
        "readable_columns": ["product_id", "warehouse_id", "stock_count"],
        "row_filter_predicate": null
      }
    }
  }
}

The pitfall here is token size. If a user has access to hundreds of tables with complex rules, the JWT could become bloated. For our use case, roles are well-defined and typically grant access to a dozen or so core tables, keeping the token size manageable. A project with more granular permissions might need to fetch policies from a cache or service, trading network latency for smaller tokens.
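
If you go that route, a minimal sketch of the alternative looks something like the following: the token carries only a compact policy identifier, and the API resolves the full policy from a cache at request time. The policy_key claim, Redis key scheme, and helper name here are hypothetical, not part of our implementation.

// Hypothetical alternative: resolve the policy from a cache instead of embedding it in the JWT.
// Assumes the token carries only a compact claim identifying the role/policy version.
const Redis = require('ioredis');

const redis = new Redis(process.env.REDIS_URL);

async function loadPolicy(jwtPayload) {
  const policyKey = jwtPayload['https://myapp.com/policy_key']; // hypothetical compact claim
  const cached = await redis.get(`policy:${policyKey}`);
  if (!cached) {
    // Fail closed: no cached policy means no access.
    throw new Error(`No data access policy found for key ${policyKey}`);
  }
  return JSON.parse(cached); // same shape as the data_access_policy claim shown above
}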

The code to generate such a token on the authentication service side is straightforward.

// Auth Service Snippet - NOT part of the GraphQL API
// This would run after a user successfully logs in.
const jwt = require('jsonwebtoken');

const PRIVATE_KEY = process.env.JWT_PRIVATE_KEY;

function issuePolicyEnhancedToken(user, permissions) {
  const payload = {
    iss: 'https://auth.myapp.com/',
    sub: user.id,
    aud: 'https://api.myapp.com',
    scope: 'read:analytics',
    'https://myapp.com/data_access_policy': {
      version: 1,
      tenant_id: user.tenantId,
      tables: permissions // This would be fetched from a user/role permission database
    }
  };

  const options = {
    algorithm: 'RS256',
    expiresIn: '1h'
  };

  return jwt.sign(payload, PRIVATE_KEY, options);
}

// Example usage:
const userProfile = { id: 'auth0|user-abc-456', tenantId: 'tenant-123' };
const userPermissions = {
  sales_events: {
    readable_columns: ['event_id', 'timestamp', 'product_id', 'product_name', 'quantity', 'sale_price', 'customer_id', 'region'],
    row_filter_predicate: "region = 'US-EAST'"
  },
  inventory_levels: {
    readable_columns: ['product_id', 'warehouse_id', 'stock_count'],
    row_filter_predicate: null
  }
};

const token = issuePolicyEnhancedToken(userProfile, userPermissions);
console.log(token);

GraphQL Schema and the Resolver Layer

Our GraphQL schema defines the available queries. For this example, we’ll focus on a single query for sales events.

# schema.graphql
type SalesEvent {
  event_id: ID!
  timestamp: String!
  product_id: String!
  product_name: String
  quantity: Int!
  sale_price: Float
  customer_id: String
  region: String
}

type Query {
  salesEvents(limit: Int = 100, offset: Int = 0): [SalesEvent]
}
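
A dashboard client might issue a request like the one below. The fields it selects drive both the shape of the response and, as we’ll see, the exact column list of the SQL the API generates.

# Example client query (illustrative)
query RegionalSalesSummary {
  salesEvents(limit: 50, offset: 0) {
    event_id
    timestamp
    product_name
    quantity
    region
  }
}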

The core of the implementation lies in the resolver for salesEvents. A naive implementation would be dangerously insecure. The correct approach requires a secure query builder that acts as an intermediary between the GraphQL request and the Trino database client.
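
To make the risk concrete, this is the kind of naive resolver the design replaces. It is illustrative only, not code from our codebase: it trusts client-supplied arguments and interpolates them straight into SQL, so nothing binds the query to the caller’s tenant or role.

// DON'T DO THIS: naive resolver sketch. Any authenticated user could read any tenant's rows,
// and string interpolation of raw arguments invites SQL injection.
const naiveResolvers = {
  Query: {
    salesEvents: async (_, { tenantId, limit }) => {
      const sql = `SELECT * FROM sales_events WHERE tenant_id = '${tenantId}' LIMIT ${limit}`;
      return trino.query(sql);
    },
  },
};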

Our Apollo Server setup extracts the validated JWT payload and places it in the context object, making it available to all resolvers.

// server.js - Apollo Server setup
const { ApolloServer } = require('@apollo/server');
const { expressMiddleware } = require('@apollo/server/express4');
const { ApolloServerPluginDrainHttpServer } = require('@apollo/server/plugin/drainHttpServer');
const express = require('express');
const http = require('http');
const cors = require('cors');
const { authMiddleware } = require('./auth'); // Assume this middleware validates the JWT and attaches payload to req.auth

// ... typeDefs and resolvers imports

async function startServer() {
  const app = express();
  const httpServer = http.createServer(app);

  const server = new ApolloServer({
    typeDefs,
    resolvers,
    plugins: [ApolloServerPluginDrainHttpServer({ httpServer })],
  });

  await server.start();

  app.use(
    '/graphql',
    cors(),
    express.json(),
    authMiddleware, // Validates JWT and attaches payload to req.auth
    expressMiddleware(server, {
      context: async ({ req }) => {
        // A common mistake is to fail open. If auth is missing, throw an error.
        if (!req.auth) {
          // In a real project, this would be a more specific GraphQL error.
          throw new Error('User not authenticated');
        }
        return { user: req.auth };
      },
    }),
  );

  await new Promise((resolve) => httpServer.listen({ port: 4000 }, resolve));
  console.log(`🚀 Server ready at http://localhost:4000/graphql`);
}

startServer();
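
The authMiddleware above is assumed rather than shown. One plausible implementation, if you use the express-jwt and jwks-rsa packages (our assumption; any JWT middleware that verifies RS256 against the auth service’s JWKS and attaches the payload to req.auth will do), looks like this:

// auth.js - a minimal sketch of the assumed middleware, using express-jwt v7+
// (which attaches the verified payload to req.auth by default) and jwks-rsa.
// The JWKS URL is hypothetical.
const { expressjwt } = require('express-jwt');
const jwksRsa = require('jwks-rsa');

const authMiddleware = expressjwt({
  secret: jwksRsa.expressJwtSecret({
    cache: true,
    rateLimit: true,
    jwksRequestsPerMinute: 5,
    jwksUri: 'https://auth.myapp.com/.well-known/jwks.json',
  }),
  audience: 'https://api.myapp.com',
  issuer: 'https://auth.myapp.com/',
  algorithms: ['RS256'],
  credentialsRequired: false, // let the GraphQL context decide how to reject missing auth
});

module.exports = { authMiddleware };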

The Secure SQL Query Builder

This module is the security-critical heart of the system. It has one job: given the GraphQL query’s requested fields and the user’s data access policy from the JWT, construct a valid and secure Trino SQL query. It must never fail open.

// secure-query-builder.js

const { GraphQLError } = require('graphql');

const POLICY_CLAIM = 'https://myapp.com/data_access_policy';

class SecureQueryBuilder {
  /**
   * @param {object} userContext - The decoded JWT payload from the context.
   * @param {string} tableName - The target table for the query.
   * @param {object} graphqlInfo - The GraphQL resolve info object.
   */
  constructor(userContext, tableName, graphqlInfo) {
    if (!userContext || !userContext[POLICY_CLAIM]) {
      throw new GraphQLError('Access policy not found in user context.', {
        extensions: { code: 'FORBIDDEN' },
      });
    }

    this.policy = userContext[POLICY_CLAIM];
    this.tenantId = this.policy.tenant_id;
    this.tablePolicy = this.policy.tables ? this.policy.tables[tableName] : undefined;
    this.requestedFields = this.getRequestedFields(graphqlInfo);
    this.tableName = tableName;
    
    // In a real-world project, catalog and schema should come from config.
    this.catalog = 'iceberg';
    this.schema = 'analytics_db';
  }

  /**
   * Extracts the top-level fields requested in the GraphQL query.
   * This is a simplified implementation. A production version would need to handle fragments and aliases.
   */
  getRequestedFields(info) {
    const fieldNodes = info.fieldNodes[0].selectionSet.selections;
    return fieldNodes.map(selection => selection.name.value);
  }
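
  /**
   * A fragment- and alias-aware variant (illustrative sketch only; the method name is ours
   * and it is not wired into buildQuery). Named fragments are resolved via info.fragments,
   * and field names (not aliases) are collected, since policies are keyed by column name.
   */
  getRequestedFieldsDeep(info) {
    const collect = (selections, acc) => {
      for (const selection of selections) {
        if (selection.kind === 'Field') {
          acc.push(selection.name.value);
        } else if (selection.kind === 'FragmentSpread') {
          const fragment = info.fragments[selection.name.value];
          if (fragment) collect(fragment.selectionSet.selections, acc);
        } else if (selection.kind === 'InlineFragment') {
          collect(selection.selectionSet.selections, acc);
        }
      }
      return acc;
    };
    return [...new Set(collect(info.fieldNodes[0].selectionSet.selections, []))];
  }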

  /**
   * Builds the final, secure SQL query string.
   * @param {{limit: number, offset: number}} options
   * @returns {string} The SQL query to be executed.
   */
  buildQuery({ limit = 100, offset = 0 } = {}) {
    // 1. Check if the user has any policy for the requested table.
    if (!this.tablePolicy) {
      throw new GraphQLError(`Access denied to table: ${this.tableName}`, {
        extensions: { code: 'FORBIDDEN' },
      });
    }

    // 2. Column-level security: Intersect requested fields with allowed columns.
    const allowedColumns = new Set(this.tablePolicy.readable_columns);
    const selectedColumns = this.requestedFields.filter(field => {
      if (field === '__typename') return false; // Ignore introspection fields
      return allowedColumns.has(field);
    });

    // Check for unauthorized column requests. This is a critical security check.
    const unauthorizedColumns = this.requestedFields.filter(field => field !== '__typename' && !allowedColumns.has(field));
    if (unauthorizedColumns.length > 0) {
      throw new GraphQLError(`Access denied to columns: ${unauthorizedColumns.join(', ')}`, {
        extensions: { code: 'FORBIDDEN' },
      });
    }

    // If no valid columns are selected, there's no query to run.
    if (selectedColumns.length === 0) {
      // It's better to return an empty result than an error if the user legitimately
      // requested only fields they can't see. Or, error out. This is a design choice.
      // Here, we choose to error.
      throw new GraphQLError('Query contains no accessible fields.', {
        extensions: { code: 'BAD_REQUEST' },
      });
    }
    const selectClause = selectedColumns.map(c => `"${c}"`).join(', ');

    // 3. Row-level security: Build the WHERE clause.
    // The tenant_id filter is NON-NEGOTIABLE. It comes from the verified JWT claim, not
    // from user input. A better approach would use prepared statements if the DB driver
    // supported them, but Trino's Node client doesn't support parameterized queries the
    // way node-postgres does, so input validation is the key defense here.
    let whereClauses = [`tenant_id = '${this.tenantId}'`];

    if (this.tablePolicy.row_filter_predicate) {
      whereClauses.push(`(${this.tablePolicy.row_filter_predicate})`);
    }
    const whereClause = `WHERE ${whereClauses.join(' AND ')}`;

    // 4. Assemble the final query.
    // Ensure limit and offset are integers to prevent injection.
    const sanitizedLimit = parseInt(limit, 10);
    const sanitizedOffset = parseInt(offset, 10);
    if (isNaN(sanitizedLimit) || isNaN(sanitizedOffset)) {
      throw new GraphQLError('Invalid limit or offset.', { extensions: { code: 'BAD_REQUEST' } });
    }

    // NOTE: The sort column is hardcoded for this example; a general-purpose builder
    // should validate any caller-supplied sort column against readable_columns.
    const query = `
      SELECT ${selectClause}
      FROM ${this.catalog}.${this.schema}."${this.tableName}"
      ${whereClause}
      ORDER BY "timestamp" DESC
      LIMIT ${sanitizedLimit}
      OFFSET ${sanitizedOffset}
    `;

    return query;
  }
}

module.exports = { SecureQueryBuilder };

The key security controls are:

  1. Deny by Default: If no policy exists for a table, access is forbidden.
  2. Column Intersection: We explicitly check if every requested field is present in the readable_columns array. Any deviation results in a hard failure. This prevents data leakage through newly added schema fields that haven’t been assigned to policies yet.
  3. Mandatory Tenancy Filter: The tenant_id from the JWT is always included in the WHERE clause, forming the primary data isolation boundary.
  4. Additional Predicates: The row_filter_predicate is appended with an AND, further constraining the result set. A critical assumption here is that the predicate stored in the auth database is safe and not constructed from raw user input.
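
That last assumption deserves a guardrail. Even though predicates originate from our own auth database, a defensive check before interpolation limits the blast radius of a bad policy row. The validator below is a minimal, deliberately conservative sketch under that assumption; the regex and function name are ours, and it is not a SQL parser.

// predicate-guard.js - a minimal sketch, NOT a full SQL parser.
// Accepts only simple comparisons such as: region = 'US-EAST' AND status = 'ACTIVE'
const SAFE_PREDICATE = /^\(?\s*[a-z_][a-z0-9_]*\s*(<=|>=|<>|!=|=|<|>)\s*'[^';]*'\s*\)?(\s+(AND|OR)\s+\(?\s*[a-z_][a-z0-9_]*\s*(<=|>=|<>|!=|=|<|>)\s*'[^';]*'\s*\)?)*$/i;

function assertPredicateIsSafe(predicate, allowedColumns) {
  if (predicate == null) return; // no row filter is a valid policy

  if (!SAFE_PREDICATE.test(predicate)) {
    throw new Error(`Row filter predicate failed validation: ${predicate}`);
  }

  // Every identifier outside of string literals must be a column the policy already exposes.
  const withoutLiterals = predicate.replace(/'[^']*'/g, '');
  const identifiers = withoutLiterals.match(/[a-z_][a-z0-9_]*/gi) || [];
  const keywords = new Set(['AND', 'OR']);
  for (const id of identifiers) {
    if (!keywords.has(id.toUpperCase()) && !allowedColumns.includes(id)) {
      throw new Error(`Predicate references disallowed column: ${id}`);
    }
  }
}

module.exports = { assertPredicateIsSafe };

In buildQuery, such a check would run against this.tablePolicy.row_filter_predicate just before it is pushed onto whereClauses.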

Integrating the Builder into the Resolver

The resolver now becomes very thin. Its job is simply to instantiate the SecureQueryBuilder, generate the SQL, execute it, and return the data.

// resolvers.js
const { TrinoClient } = require('./trino-client'); // A wrapper for the Trino client library
const { SecureQueryBuilder } = require('./secure-query-builder');
const { GraphQLError } = require('graphql');
const logger = require('./logger'); // Assume a standard logger like Winston

const trino = new TrinoClient({
  server: process.env.TRINO_SERVER || 'http://localhost:8080',
  user: 'graphql-api',
  catalog: 'iceberg',
  schema: 'analytics_db',
});

const resolvers = {
  Query: {
    salesEvents: async (_, { limit, offset }, context, info) => {
      const tableName = 'sales_events'; // In a larger app, this might be derived from the query name.
      try {
        const queryBuilder = new SecureQueryBuilder(context.user, tableName, info);
        const sql = queryBuilder.buildQuery({ limit, offset });
        
        logger.info({ message: 'Executing secure Trino query', sql: sql, tenant: context.user['https://myapp.com/data_access_policy'].tenant_id });

        const results = await trino.query(sql);
        return results;

      } catch (error) {
        // Log the internal error details for debugging.
        logger.error({ 
          message: 'Failed to resolve salesEvents', 
          error: error.message,
          stack: error.stack,
          isGqlError: error instanceof GraphQLError
        });

        // If it's already a GraphQLError (like our permission errors), re-throw it.
        if (error instanceof GraphQLError) {
          throw error;
        }

        // For unexpected errors (e.g., Trino connection failure), throw a generic error to the client.
        throw new GraphQLError('An internal error occurred while fetching data.', {
          extensions: { code: 'INTERNAL_SERVER_ERROR' },
        });
      }
    },
  },
};

module.exports = { resolvers };

The logging here is critical. We log the generated SQL for every request, which is invaluable for auditing and debugging. If a data leak were ever suspected, we would have a precise record of every query executed by the API layer.
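
The TrinoClient used above is our own thin wrapper and isn’t shown in this post. Assuming the official trino-client npm package (an assumption; any Trino driver works), one plausible shape for it is the sketch below. The only interesting part is mapping Trino’s positional rows plus column metadata into plain objects that match the GraphQL field names.

// trino-client.js - a plausible wrapper sketch, assuming the `trino-client` npm package.
const { Trino, BasicAuth } = require('trino-client');

class TrinoClient {
  constructor({ server, user, catalog, schema }) {
    this.client = Trino.create({
      server,
      catalog,
      schema,
      auth: new BasicAuth(user),
    });
  }

  // Executes a query and maps positional rows into objects keyed by column name.
  async query(sql) {
    const iter = await this.client.query(sql);
    let columns;
    const rows = [];
    for await (const result of iter) {
      if (result.error) {
        throw new Error(result.error.message);
      }
      if (result.columns && !columns) {
        columns = result.columns;
      }
      for (const row of result.data || []) {
        rows.push(Object.fromEntries(row.map((value, i) => [columns[i].name, value])));
      }
    }
    return rows;
  }
}

module.exports = { TrinoClient };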

Visualizing the Architecture Flow

The entire request lifecycle can be visualized as a clear, security-gated pipeline.

sequenceDiagram
    participant Client
    participant GraphQL_API as GraphQL API (Apollo)
    participant Auth as Auth Middleware
    participant QB as Secure Query Builder
    participant Trino
    participant DataLake as Data Lake (S3/Iceberg)

    Client->>GraphQL_API: POST /graphql (Query + JWT in Header)
    GraphQL_API->>Auth: Validate JWT
    Auth-->>GraphQL_API: Attach decoded payload to context
    
    GraphQL_API->>QB: Instantiate(context, table, gqlInfo)
    QB->>QB: 1. Validate policy existence
    QB->>QB: 2. Intersect requested fields with allowed columns
    QB->>QB: 3. Construct WHERE clause from tenant_id and predicate
    QB-->>GraphQL_API: Return secure SQL string
    
    GraphQL_API->>Trino: Execute SQL query
    Trino->>DataLake: Read Parquet files based on query plan
    DataLake-->>Trino: Return data rows
    Trino-->>GraphQL_API: Return result set
    
    GraphQL_API->>Client: Return JSON response

Testing the Security Logic

In a real-world project, the SecureQueryBuilder is the most critical component to unit test. We don’t need to spin up a database; we just need to verify that it produces the correct SQL strings or throws the correct errors based on different inputs.

// secure-query-builder.test.js
const { SecureQueryBuilder } = require('./secure-query-builder');
const { GraphQLError } = require('graphql');

// Mock GraphQL info object
const createMockInfo = (fields) => ({
  fieldNodes: [{
    selectionSet: {
      selections: fields.map(f => ({ name: { value: f } }))
    }
  }]
});

// Mock user context with policy
const mockUserContext = {
  'https://myapp.com/data_access_policy': {
    tenant_id: 'tenant-123',
    tables: {
      'sales_events': {
        readable_columns: ['event_id', 'product_id', 'quantity', 'region', 'timestamp'],
        row_filter_predicate: "region = 'US-EAST'"
      }
    }
  }
};

describe('SecureQueryBuilder', () => {

  it('should build a correct query for allowed fields', () => {
    const info = createMockInfo(['event_id', 'quantity', 'region', 'timestamp']);
    const builder = new SecureQueryBuilder(mockUserContext, 'sales_events', info);
    const sql = builder.buildQuery({ limit: 50, offset: 10 });

    expect(sql).toContain('SELECT "event_id", "quantity", "region", "timestamp"');
    expect(sql).toContain("FROM iceberg.analytics_db.\"sales_events\"");
    expect(sql).toContain("WHERE tenant_id = 'tenant-123' AND (region = 'US-EAST')");
    expect(sql).toContain('ORDER BY "timestamp" DESC');
    expect(sql).toContain('LIMIT 50');
    expect(sql).toContain('OFFSET 10');
  });

  it('should throw GraphQLError for forbidden columns', () => {
    // User requests 'product_id' (allowed) and 'sale_price' (forbidden)
    const info = createMockInfo(['product_id', 'sale_price']);
    const builder = new SecureQueryBuilder(mockUserContext, 'sales_events', info);
    
    expect(() => builder.buildQuery()).toThrow(GraphQLError);
    try {
      builder.buildQuery();
    } catch (e) {
      expect(e.message).toContain('Access denied to columns: sale_price');
      expect(e.extensions.code).toBe('FORBIDDEN');
    }
  });

  it('should throw if user has no policy for the requested table', () => {
    const info = createMockInfo(['some_field']);
    // Requesting 'inventory_levels' which is not in the mock policy
    const builder = new SecureQueryBuilder(mockUserContext, 'inventory_levels', info);

    expect(() => builder.buildQuery()).toThrow('Access denied to table: inventory_levels');
  });

  it('should throw if the JWT policy claim is missing', () => {
    const info = createMockInfo(['event_id']);
    const emptyContext = { sub: 'user123' }; // A decoded JWT payload with no policy claim
    
    expect(() => new SecureQueryBuilder(emptyContext, 'sales_events', info)).toThrow('Access policy not found in user context.');
  });
});

These tests provide a safety net, ensuring that refactoring or adding new features doesn’t inadvertently introduce a security vulnerability.

The current implementation provides a robust foundation. However, its primary limitation is the static nature of the row_filter_predicate. It’s just a string. A more advanced system might use a structured query language like JSONata or define a simple JSON-based rule engine to construct more complex WHERE clauses safely. Another consideration is performance; for extremely complex policies, the overhead of building the query string on every request could become noticeable. In such a scenario, one might explore caching the generated SQL templates based on the user’s role and the requested fields. Finally, this pattern doesn’t address query cost analysis; a malicious actor with valid credentials could still issue an extremely complex GraphQL query that translates to a very expensive Trino query. Implementing query depth limits and complexity scoring at the GraphQL layer is a necessary next step for a fully hardened production system.
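
As a rough illustration of where the predicate format could go, a structured rule shape could replace the raw string entirely. The rule objects, operator allow-list, and function name below are hypothetical, not something we have built:

// Hypothetical structured row filter: { field, op, value } rules instead of raw SQL strings.
// Only allow-listed operators and policy-approved columns may appear, and values are always
// emitted as escaped literals, so a predicate can never smuggle in extra SQL.
const SQL_OPERATORS = { eq: '=', neq: '<>', gt: '>', gte: '>=', lt: '<', lte: '<=' };

function compileRowFilter(rules, allowedColumns) {
  return rules
    .map(({ field, op, value }) => {
      if (!allowedColumns.includes(field)) {
        throw new Error(`Rule references disallowed column: ${field}`);
      }
      const operator = SQL_OPERATORS[op];
      if (!operator) {
        throw new Error(`Unsupported operator: ${op}`);
      }
      // Numbers pass through as-is; strings are quoted with single quotes escaped.
      const literal = typeof value === 'number' ? value : `'${String(value).replace(/'/g, "''")}'`;
      return `"${field}" ${operator} ${literal}`;
    })
    .join(' AND ');
}

// compileRowFilter([{ field: 'region', op: 'eq', value: 'US-EAST' }], ['region'])
// yields: "region" = 'US-EAST'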

