Implementing an Asynchronous Control Plane for Apache Spark Using a RESTful API and a Test-Driven React UI


The initial request was straightforward: allow data scientists to run their ad-hoc Spark jobs without direct SSH access to the cluster nodes. The existing workflow relied on manual spark-submit commands, which were not only a security concern but also prone to human error, impossible to audit, and a bottleneck for non-engineering teams. The immediate, naive solution proposed was a simple web form that would trigger spark-submit synchronously via a REST endpoint. This approach would have failed spectacularly. Any Spark job lasting longer than a standard HTTP timeout (which is most of them) would break the connection, leaving the user with no feedback and the backend with an orphaned process.

This realization shifted the problem from building a simple web frontend to designing a resilient, asynchronous control plane. The core challenge became decoupling the long-running Spark job execution from the stateless, short-lived nature of HTTP requests. The architecture required a cross-stack solution involving a RESTful API to manage job lifecycle, Apache Spark for the computation itself, and a reactive, test-driven frontend to provide reliable feedback to the user.

Our final architecture looked like this:

sequenceDiagram
    participant User
    participant ReactUI as React UI (Browser)
    participant API as Control Plane API (Flask)
    participant Spark as Apache Spark Cluster

    User->>ReactUI: Fills form & clicks "Submit Job"
    ReactUI->>+API: POST /api/v1/jobs (job params)
    API-->>-ReactUI: 202 Accepted ({"job_id": "xyz-123"})
    Note right of API: Spawns spark-submit as a background process. Stores job metadata.
    API->>Spark: spark-submit my_job.py ...

    ReactUI->>ReactUI: Enters polling state for "xyz-123"

    loop Poll for Status
        ReactUI->>+API: GET /api/v1/jobs/xyz-123
        API-->>-ReactUI: 200 OK ({"status": "RUNNING"})
        ReactUI->>User: Update UI: "Job is Running..."
        ReactUI->>ReactUI: Wait 5 seconds
    end

    ReactUI->>+API: GET /api/v1/jobs/xyz-123
    Note right of API: Checks process status, reads output logs.
    API-->>-ReactUI: 200 OK ({"status": "SUCCEEDED", "results_path": "/path/to/output"})
    ReactUI->>User: Update UI: "Job Succeeded!"

This post-mortem details the implementation of this system, focusing on the pragmatic choices and pitfalls encountered at each layer: the asynchronous API, the Spark job itself, the Babel-transpiled React frontend, and critically, the testing strategy using React Testing Library to guarantee the UI’s robustness in handling these asynchronous states.

The Asynchronous API: The System’s Core

The entire architecture hinges on the API’s ability to handle long-running tasks without blocking. We chose Python with Flask for its simplicity in prototyping, but the principles apply to any backend framework. The key was to treat job submission as a two-phase process: acceptance and status inquiry.
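Seen from a client, the two phases amount to one submit call followed by a polling loop. The sketch below is illustrative only: `poll_until_terminal`, its parameters, and the injected `get_status` callable are hypothetical names rather than part of the system described here, and the status strings simply mirror the ones used later in this post.

```python
import time
from typing import Callable

# Statuses after which polling should stop (mirrors the API described in this post).
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED"}


def poll_until_terminal(
    get_status: Callable[[], str],
    interval_seconds: float = 3.0,
    max_attempts: int = 100,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status() repeatedly until it reports a terminal status.

    get_status stands in for a GET on the status endpoint; it is injected so
    the loop can be exercised without a running server.
    """
    for attempt in range(max_attempts):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        # Do not sleep after the final attempt; there is nothing left to wait for.
        if attempt < max_attempts - 1:
            sleep(interval_seconds)
    raise TimeoutError(f"Job did not finish after {max_attempts} status checks")


# Usage with a canned sequence of responses instead of real HTTP calls.
responses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
final_status = poll_until_terminal(lambda: next(responses), sleep=lambda _: None)
print(final_status)
```

Capping the number of attempts is a design choice worth keeping in any real client: without it, a job stuck in RUNNING would keep the caller polling forever.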

A common mistake in such systems is to reach for a full task-queue stack, such as Celery backed by a RabbitMQ broker, from day one. That is the right production-grade solution, but for our initial version a managed subprocess approach was sufficient to validate the core logic, provided we handled process state carefully.

Here is the core of the Flask application. It’s not production-ready—it lacks a persistent database for job tracking and proper authentication—but it demonstrates the fundamental asynchronous pattern.

api_server.py

import uuid
import subprocess
import os
import threading
from flask import Flask, request, jsonify
from flask_cors import CORS
import logging
from werkzeug.exceptions import NotFound

# Basic logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)
CORS(app) # For local development between React and Flask servers

# In-memory storage. In production, this MUST be a persistent database (e.g., Redis, PostgreSQL).
jobs = {}

# A lock for thread-safe access to the 'jobs' dictionary
jobs_lock = threading.Lock()

# Define paths relative to the script location
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SPARK_APP_PATH = os.path.join(BASE_DIR, "spark_job", "word_count.py")
INPUT_DATA_PATH = os.path.join(BASE_DIR, "data", "input.txt")
OUTPUT_DIR = os.path.join(BASE_DIR, "data", "output")

# exist_ok=True avoids a race between checking for the directory and creating it
os.makedirs(OUTPUT_DIR, exist_ok=True)

def run_spark_job(job_id, input_path, output_path):
    """
    Launches spark-submit as a child process and waits for it to finish.
    Runs in a worker thread, so it blocks that thread but not the HTTP request.
    Updates the job status in the shared 'jobs' dictionary.
    """
    command = [
        "spark-submit",
        "--master", "local[*]",  # For local testing; change for a real cluster
        SPARK_APP_PATH,
        input_path,
        output_path
    ]
    
    log_file_path = os.path.join(OUTPUT_DIR, f"{job_id}.log")

    try:
        with open(log_file_path, 'w') as log_file:
            logging.info(f"Starting Spark job {job_id}. Command: {' '.join(command)}")
            process = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
        
        with jobs_lock:
            jobs[job_id]['process'] = process
            jobs[job_id]['status'] = 'RUNNING'
            jobs[job_id]['log_path'] = log_file_path

        # This will block this thread until the process completes, but the HTTP request has already returned.
        process.wait()
        
        # After completion, update final status
        with jobs_lock:
            return_code = process.returncode
            if return_code == 0:
                jobs[job_id]['status'] = 'SUCCEEDED'
                logging.info(f"Spark job {job_id} succeeded.")
            else:
                jobs[job_id]['status'] = 'FAILED'
                logging.error(f"Spark job {job_id} failed with return code {return_code}.")

    except Exception as e:
        with jobs_lock:
            jobs[job_id]['status'] = 'FAILED'
            jobs[job_id]['error_message'] = str(e)
        logging.error(f"Exception while running job {job_id}: {e}")


@app.route('/api/v1/jobs', methods=['POST'])
def submit_job():
    """
    Accepts a job request, starts it in the background, and returns a job ID immediately.
    """
    job_id = str(uuid.uuid4())
    output_path = os.path.join(OUTPUT_DIR, job_id)

    job_metadata = {
        'job_id': job_id,
        'status': 'PENDING',
        'input_path': INPUT_DATA_PATH,
        'output_path': output_path,
        'process': None # To be populated by the thread
    }
    
    with jobs_lock:
        jobs[job_id] = job_metadata
    
    # Run the Spark job in a separate thread to avoid blocking the main Flask thread
    thread = threading.Thread(target=run_spark_job, args=(job_id, INPUT_DATA_PATH, output_path))
    thread.daemon = True # Allows main program to exit even if threads are running
    thread.start()

    logging.info(f"Submitted job {job_id}. API responded to client.")
    
    # HTTP 202 Accepted: The request has been accepted for processing, but the processing has not been completed.
    return jsonify({'job_id': job_id}), 202


@app.route('/api/v1/jobs/<job_id>', methods=['GET'])
def get_job_status(job_id):
    """
    Returns the current status of a submitted job.
    """
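    with jobs_lock:
        job = jobs.get(job_id)
        # Copy the entry while holding the lock so the worker thread cannot
        # change the status while the response is being built.
        job = dict(job) if job else None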

    if not job:
        raise NotFound(f"Job with ID {job_id} not found.")

    response = {
        'job_id': job['job_id'],
        'status': job['status']
    }
    
    if job['status'] == 'FAILED' and 'error_message' in job:
        response['error_message'] = job['error_message']
    
    if job['status'] in ['SUCCEEDED', 'FAILED'] and job.get('log_path'):
        try:
            with open(job['log_path'], 'r') as f:
                # Return last 100 lines of the log for brevity
                log_lines = f.readlines()
                response['log_preview'] = ''.join(log_lines[-100:])
        except IOError as e:
            logging.warning(f"Could not read log file for job {job_id}: {e}")
            response['log_preview'] = "Log file not available."

    return jsonify(response)

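if __name__ == '__main__':
    # debug=True enables the auto-reloader, which restarts the process (wiping the
    # in-memory job store) on code changes, and the interactive debugger, which must
    # never be reachable from other hosts. Bind to localhost and use it for local
    # development only.
    app.run(host='127.0.0.1', port=5001, debug=True)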

The critical design choices here are:

  1. Immediate Response: The POST /api/v1/jobs endpoint returns a 202 Accepted status with a unique job_id almost instantly. It offloads the actual spark-submit execution to a background thread.
  2. State Management: A simple in-memory dictionary tracks job states. A threading.Lock is crucial to prevent race conditions when multiple threads access this shared state. In a real-world project, this would be replaced by a database or a Redis cache for persistence and scalability.
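  3. Status Endpoint: GET /api/v1/jobs/{job_id} acts as the polling target. It reads the current state from the shared dictionary and returns it. The request itself is read-only and self-contained (the job ID in the URL is all the server needs), although the answer depends on the server-side job state.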
  4. Logging: We redirect stdout and stderr from the Spark process to a log file. The status endpoint provides a preview of this log, which is invaluable for debugging failed jobs without giving the user direct server access.
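To make the "replace the dictionary with a database" point concrete, here is a minimal sketch of a persistent job store built on Python's standard-library `sqlite3` module. It is an assumption-laden illustration, not the store we ran: the class name `SqliteJobStore`, its methods, and the table layout are all hypothetical, and SQLite is used only because it needs no extra services.

```python
import sqlite3
import threading


class SqliteJobStore:
    """Hypothetical replacement for the in-memory jobs dictionary."""

    def __init__(self, db_path: str = ":memory:") -> None:
        # check_same_thread=False lets worker threads reuse the connection;
        # the lock below serializes access to it.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = threading.Lock()
        with self._lock, self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS jobs ("
                "job_id TEXT PRIMARY KEY, status TEXT NOT NULL, log_path TEXT)"
            )

    def create(self, job_id: str) -> None:
        # 'with self._conn' commits on success and rolls back on an exception.
        with self._lock, self._conn:
            self._conn.execute(
                "INSERT INTO jobs (job_id, status) VALUES (?, 'PENDING')", (job_id,)
            )

    def set_status(self, job_id: str, status: str) -> None:
        with self._lock, self._conn:
            self._conn.execute(
                "UPDATE jobs SET status = ? WHERE job_id = ?", (status, job_id)
            )

    def get(self, job_id: str):
        with self._lock:
            row = self._conn.execute(
                "SELECT job_id, status, log_path FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        if row is None:
            return None
        return {"job_id": row[0], "status": row[1], "log_path": row[2]}


store = SqliteJobStore()  # pass a file path instead of ":memory:" to survive restarts
store.create("xyz-123")
store.set_status("xyz-123", "RUNNING")
print(store.get("xyz-123"))
```

Note that persisting the status does not by itself solve the orphaned-process problem: after a restart the store would still say RUNNING for jobs whose supervising thread no longer exists, so some reconciliation step would be needed.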

The Spark Application

The Spark job itself needs to be robust but can be relatively simple for this control plane demonstration. It must read from an input source and write to a destination so we can verify its success. The following PySpark script calculates word counts from a text file.

spark_job/word_count.py

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col
import time

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: word_count.py <input_path> <output_path>", file=sys.stderr)
        sys.exit(-1)

    input_path = sys.argv[1]
    output_path = sys.argv[2]

    spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

    # Simulate a longer-running job
    print("Starting word count job...")
    time.sleep(10) # A fake delay to make the RUNNING state visible in the UI

    try:
        lines = spark.read.text(input_path).select(lower(col("value")).alias("line"))
        
        # The core transformation logic
        words = lines.select(explode(split(col("line"), "\\s+")).alias("word"))
        word_counts = words.groupBy("word").count().orderBy(col("count").desc())
        
        # A check for empty words resulting from multiple spaces
        word_counts = word_counts.filter(col("word") != "")

        print(f"Job processing complete. Writing results to {output_path}")
        
        # Writing the output as a single CSV file for easy inspection
        word_counts.coalesce(1).write.mode("overwrite").csv(output_path, header=True)
        
        print("Job finished successfully.")

    except Exception as e:
        print(f"An error occurred during Spark job execution: {e}", file=sys.stderr)
        spark.stop()
        sys.exit(1)

    spark.stop()

This script is standard PySpark, but the time.sleep(10) is a deliberate, pragmatic addition for development. It ensures the job runs long enough for the UI’s polling mechanism to be visibly active, making it easier to test and debug the frontend state transitions.
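When checking the job's CSV output on a small input file, it can help to have a plain-Python reference for the same transformation. The function below is a hypothetical helper (not part of the Spark job) that mirrors the steps in the script: lowercase each line, split on whitespace, drop empty tokens, count, and sort by count descending.

```python
import re
from collections import Counter


def reference_word_count(text: str) -> list[tuple[str, int]]:
    """Plain-Python equivalent of the Spark pipeline, for small inputs only."""
    counts = Counter()
    for line in text.splitlines():
        # Same steps as the Spark job: lowercase, split on whitespace, drop empties.
        for word in re.split(r"\s+", line.lower()):
            if word:
                counts[word] += 1
    # Highest count first. Ties are broken alphabetically here, which is an
    # extra guarantee the Spark job's orderBy on count alone does not give.
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))


sample = "the quick  brown fox\nThe lazy dog\n"
print(reference_word_count(sample))
```

Because the Spark job orders only by count, rows with equal counts may appear in a different order in its output; compare the results as a word-to-count mapping rather than line by line.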

The Frontend: React UI, Babel, and Testing

The frontend’s primary responsibility is to manage the user interaction and reflect the asynchronous job state accurately. We used Create React App, which configures Babel and other build tools out of the box. Babel’s role is crucial but often invisible; it transpiles modern JavaScript (ES6+ and JSX) into browser-compatible code.

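Create React App manages its Babel configuration internally, so a project normally has no babel.config.js of its own. A standalone configuration that does roughly the same job looks like this, and it’s important to recognize its function: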

// babel.config.js
module.exports = {
  presets: [
    '@babel/preset-env', // Transpiles modern JavaScript for the configured browser targets
    ['@babel/preset-react', { runtime: 'automatic' }] // For transpiling JSX
  ]
};

The core of the UI is the component that handles the form submission and status polling.

src/JobControl.js

import React, { useState, useEffect, useCallback } from 'react';

const API_BASE_URL = 'http://localhost:5001';

// Represents the possible states of our UI component
const JOB_STATE = {
  IDLE: 'IDLE',
  SUBMITTING: 'SUBMITTING',
  POLLING: 'POLLING',
  SUCCEEDED: 'SUCCEEDED',
  FAILED: 'FAILED',
};

function JobControl() {
  const [jobId, setJobId] = useState(null);
  const [jobStatus, setJobStatus] = useState(null);
  const [logPreview, setLogPreview] = useState('');
  const [uiState, setUiState] = useState(JOB_STATE.IDLE);
  const [error, setError] = useState('');

  const pollStatus = useCallback(async (currentJobId) => {
    try {
      const response = await fetch(`${API_BASE_URL}/api/v1/jobs/${currentJobId}`);
      if (!response.ok) {
        throw new Error(`API Error: ${response.statusText}`);
      }
      const data = await response.json();
      setJobStatus(data.status);
      setLogPreview(data.log_preview || '');

      if (data.status === 'SUCCEEDED') {
        setUiState(JOB_STATE.SUCCEEDED);
      } else if (data.status === 'FAILED') {
        setUiState(JOB_STATE.FAILED);
        setError(data.error_message || 'The job failed. Check logs for details.');
      }
      // If still RUNNING or PENDING, the interval set up in useEffect keeps polling
    } catch (err) {
      console.error('Polling failed:', err);
      setUiState(JOB_STATE.FAILED);
      setError('Failed to fetch job status. Check network connection or server status.');
    }
  }, []);

  // Effect for polling
  useEffect(() => {
    if (uiState === JOB_STATE.POLLING && jobId) {
      const intervalId = setInterval(() => {
        pollStatus(jobId);
      }, 3000); // Poll every 3 seconds

      return () => clearInterval(intervalId); // Cleanup on component unmount or state change
    }
  }, [uiState, jobId, pollStatus]);

  const handleSubmit = async (event) => {
    event.preventDefault();
    setUiState(JOB_STATE.SUBMITTING);
    setError('');
    setJobId(null);
    setJobStatus(null);
    setLogPreview('');

    try {
      const response = await fetch(`${API_BASE_URL}/api/v1/jobs`, { method: 'POST' });
      if (!response.ok) {
        throw new Error('Failed to submit job.');
      }
      const data = await response.json();
      setJobId(data.job_id);
      setUiState(JOB_STATE.POLLING); // Kick off the polling process
    } catch (err) {
      setUiState(JOB_STATE.FAILED);
      setError(err.message);
    }
  };

  const handleReset = () => {
    setUiState(JOB_STATE.IDLE);
    setJobId(null);
    setJobStatus(null);
    setError('');
    setLogPreview('');
  };

  const isButtonDisabled = uiState === JOB_STATE.SUBMITTING || uiState === JOB_STATE.POLLING;

  return (
    <div>
      <h1>Spark Job Control Plane</h1>
      {uiState === JOB_STATE.IDLE && (
        <form onSubmit={handleSubmit}>
          <button type="submit">Submit New Word Count Job</button>
        </form>
      )}

      {uiState !== JOB_STATE.IDLE && (
        <div>
          <h2>Job Status</h2>
          <p>Job ID: {jobId || 'N/A'}</p>
          <p data-testid="job-status">Status: {jobStatus || 'Submitting...'}</p>

          {uiState === JOB_STATE.SUBMITTING && <p>Submitting job to the cluster...</p>}
          {uiState === JOB_STATE.POLLING && <p>Job is running. Polling for updates...</p>}
          {uiState === JOB_STATE.SUCCEEDED && <p style={{ color: 'green' }}>Job completed successfully!</p>}
          {uiState === JOB_STATE.FAILED && <p style={{ color: 'red' }}>Error: {error}</p>}
          
          {logPreview && (
            <pre style={{ border: '1px solid #ccc', padding: '10px', background: '#f5f5f5', whiteSpace: 'pre-wrap' }}>
              <code>{logPreview}</code>
            </pre>
          )}

          {!isButtonDisabled && <button onClick={handleReset}>Run Another Job</button>}
        </div>
      )}
    </div>
  );
}

export default JobControl;

This component is more complex than it appears because it’s a state machine. It transitions between IDLE, SUBMITTING, POLLING, SUCCEEDED, and FAILED. Managing these states correctly is where bugs often hide. This is why testing is not optional; it’s a necessity.
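One way to reason about the component is to write its transitions down explicitly. The sketch below does that in Python (to stay in one language with the backend examples); the `UiState` enum and `transition` helper are hypothetical, and the transition table is read off the component's handlers: submit, POST result, poll result, and reset.

```python
from enum import Enum


class UiState(Enum):
    IDLE = "IDLE"
    SUBMITTING = "SUBMITTING"
    POLLING = "POLLING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


# Allowed transitions, as implemented by handleSubmit, pollStatus, and handleReset.
TRANSITIONS = {
    UiState.IDLE: {UiState.SUBMITTING},
    UiState.SUBMITTING: {UiState.POLLING, UiState.FAILED},
    UiState.POLLING: {UiState.SUCCEEDED, UiState.FAILED},
    UiState.SUCCEEDED: {UiState.IDLE},
    UiState.FAILED: {UiState.IDLE},
}


def transition(current: UiState, target: UiState) -> UiState:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target


# Walk the happy path and then reset.
state = UiState.IDLE
for step in (UiState.SUBMITTING, UiState.POLLING, UiState.SUCCEEDED, UiState.IDLE):
    state = transition(state, step)
print(state.value)
```

Each entry in the table corresponds to a behavior worth covering with a test: the happy path, a failed submission, and a failure reported while polling.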

Testing with React Testing Library

React Testing Library (RTL) forces you to test components the way a user interacts with them. We don’t test implementation details (like the internal uiState variable). We test that when a user clicks a button, the correct text eventually appears on the screen. This makes tests less brittle to refactoring.

For our asynchronous component, we need to mock the fetch API and control its responses over time to simulate the polling lifecycle. We use Jest’s mocking capabilities for this.

src/JobControl.test.js

import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import '@testing-library/jest-dom';
import JobControl from './JobControl';

// Mock the global fetch API
global.fetch = jest.fn();

// Utility to reset mocks before each test
beforeEach(() => {
  fetch.mockClear();
});

test('submits a job, polls for status, and displays success', async () => {
  // --- Stage 1: Initial Render & Submission ---
  // Mock the initial POST request to submit the job
  fetch.mockResolvedValueOnce({
    ok: true,
    json: async () => ({ job_id: 'test-job-123' }),
  });

  render(<JobControl />);
  
  // User clicks the submit button
  fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));

  // Verify the submission state appears
  expect(await screen.findByText(/Submitting job to the cluster.../i)).toBeInTheDocument();
  
  // Wait for the initial POST call to resolve and the UI to enter polling state
  await waitFor(() => {
    expect(screen.getByText(/Job is running. Polling for updates.../i)).toBeInTheDocument();
  });
  
  // Check that the POST request was made correctly
  expect(fetch).toHaveBeenCalledWith('http://localhost:5001/api/v1/jobs', { method: 'POST' });
  expect(fetch).toHaveBeenCalledTimes(1);


  // --- Stage 2: Polling - First status check (RUNNING) ---
  // Mock the GET request for polling, returning 'RUNNING' status
  fetch.mockResolvedValueOnce({
    ok: true,
    json: async () => ({ status: 'RUNNING', log_preview: 'Job has started...' }),
  });

  // The component polls every 3 seconds, but `waitFor` gives up after 1000 ms by default,
  // so each wait that spans a poll needs a longer timeout. `jest.useFakeTimers` would make
  // this faster; real timers keep the example simple.
  await waitFor(() => {
    // The status text should update to 'RUNNING'
    expect(screen.getByTestId('job-status')).toHaveTextContent('Status: RUNNING');
  }, { timeout: 4000 });
  
  // Check that the first GET poll was made
  expect(fetch).toHaveBeenCalledWith('http://localhost:5001/api/v1/jobs/test-job-123');
  expect(fetch).toHaveBeenCalledTimes(2);


  // --- Stage 3: Polling - Second status check (SUCCEEDED) ---
  // Mock the final GET request, returning 'SUCCEEDED'
  fetch.mockResolvedValueOnce({
    ok: true,
    json: async () => ({ status: 'SUCCEEDED', log_preview: '...Job finished successfully.' }),
  });

  // Wait for the UI to reflect the final success state (again spanning one poll interval)
  await waitFor(() => {
    expect(screen.getByText(/Job completed successfully!/i)).toBeInTheDocument();
  }, { timeout: 4000 });
  
  // Final status text should be 'SUCCEEDED'
  expect(screen.getByTestId('job-status')).toHaveTextContent('Status: SUCCEEDED');
  // Check that the 'Run Another Job' button is now available
  expect(screen.getByRole('button', { name: /Run Another Job/i })).toBeInTheDocument();
  
  // Check that the final GET poll was made
  expect(fetch).toHaveBeenCalledTimes(3);
}, 15000); // Two real 3-second polls exceed Jest's default 5-second test timeout


test('handles job submission failure', async () => {
  // Mock a failed POST request
  fetch.mockResolvedValueOnce({
    ok: false,
    statusText: 'Internal Server Error'
  });

  render(<JobControl />);
  
  fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));

  // Wait for the error message to appear
  await waitFor(() => {
    expect(screen.getByText(/Error: Failed to submit job./i)).toBeInTheDocument();
  });
});

test('handles job failure during polling', async () => {
  // Mock successful submission
  fetch.mockResolvedValueOnce({
    ok: true,
    json: async () => ({ job_id: 'test-job-fail' }),
  });
  // Mock the poll response to be FAILED
  fetch.mockResolvedValueOnce({
    ok: true,
    json: async () => ({ status: 'FAILED', error_message: 'Spark context initialization failed.' }),
  });
  
  render(<JobControl />);
  fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));
  
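  // Wait for the failure message to be displayed. The first poll fires after 3 seconds,
  // so the default 1000 ms `waitFor` timeout is too short.
  await waitFor(() => {
    expect(screen.getByText(/Error: Spark context initialization failed./i)).toBeInTheDocument();
  }, { timeout: 4000 });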
  
  expect(screen.getByTestId('job-status')).toHaveTextContent('Status: FAILED');
});

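This test suite validates the happy path and two critical failure modes. The use of async/await and RTL’s waitFor is essential for testing asynchronous behavior: waitFor re-runs an assertion until it passes or its timeout (1000 ms by default) elapses, so assertions succeed only once the component has fetched data and re-rendered. Any wait that spans a polling interval therefore needs either a longer timeout or fake timers.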

Lingering Issues and Future Iterations

This implementation, while functional, is a proof-of-concept with several limitations that would need to be addressed in a production system. The most significant pitfall is managing subprocesses directly from the web server. This approach is brittle: if the API server crashes, the Spark processes are orphaned and all job state is lost. A robust solution would introduce a dedicated task queue (like Celery with Redis or RabbitMQ as the broker) to decouple the API from the job execution workers.

Furthermore, the API should not be directly invoking spark-submit. As soon as user-supplied parameters are passed through to the command line, this becomes a security risk (command or argument injection), and it tightly couples the API to the cluster’s command-line tools. A far better architecture would use a service like Apache Livy, which provides a REST API specifically for managing Spark jobs, abstracting away the underlying submission mechanism.
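As a rough idea of what the Livy route could look like, the sketch below builds (but does not send) the two HTTP requests involved: a POST to Livy's `/batches` endpoint to submit a job, and a GET on `/batches/{id}/state` to poll it. The helper names, host, and HDFS paths are hypothetical, and only the `file` and `args` fields of the batch request are used; we did not run this against a real Livy server.

```python
import json
import urllib.request


def build_livy_batch_request(
    livy_url: str, app_path: str, app_args: list[str]
) -> urllib.request.Request:
    """Build (but do not send) a POST /batches request for a Livy server."""
    payload = {"file": app_path, "args": app_args}
    return urllib.request.Request(
        url=f"{livy_url.rstrip('/')}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_livy_state_request(livy_url: str, batch_id: int) -> urllib.request.Request:
    """Build a GET /batches/{id}/state request to poll a batch's state."""
    return urllib.request.Request(
        url=f"{livy_url.rstrip('/')}/batches/{batch_id}/state", method="GET"
    )


# Hypothetical host and paths, for illustration only.
submit = build_livy_batch_request(
    "http://livy.example.internal:8998",
    "hdfs:///jobs/word_count.py",
    ["hdfs:///data/input.txt", "hdfs:///data/output"],
)
print(submit.get_method(), submit.full_url)
# Sending it would be urllib.request.urlopen(submit), then reading the batch id
# from the JSON response and polling with build_livy_state_request.
```

The control plane's own endpoints would stay the same; only the body of the background worker changes from spawning a process to making these calls.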

On the frontend, polling is inefficient. For a system with many active users, it would generate significant network traffic. A better approach would be to use WebSockets or Server-Sent Events (SSE) to allow the server to push status updates to the client in real-time, eliminating the need for constant polling. Finally, the in-memory job store is a single point of failure and would be replaced with a persistent database in any real-world scenario.
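To illustrate the push-based alternative, here is a minimal sketch of how status updates could be serialized as Server-Sent Events. The function names and the `status` event name are hypothetical; the wire format itself (an `event:` line, a `data:` line, and a blank line terminating each frame) is what SSE defines. In Flask, a generator like this could be returned as a streaming response with the `text/event-stream` MIME type.

```python
import json
from typing import Iterable, Iterator


def format_sse(data: dict, event: str = "status") -> str:
    """Serialize one SSE frame: an event name, a data line, and a blank line."""
    # json.dumps emits a single line by default, so one data: line is enough.
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def status_stream(job_id: str, statuses: Iterable[str]) -> Iterator[str]:
    """Yield one SSE frame per status change, stopping at a terminal status."""
    for status in statuses:
        yield format_sse({"job_id": job_id, "status": status})
        if status in ("SUCCEEDED", "FAILED"):
            break


# The trailing "RUNNING" is never sent because the stream ends at SUCCEEDED.
frames = list(status_stream("xyz-123", ["RUNNING", "SUCCEEDED", "RUNNING"]))
print("".join(frames), end="")
```

On the browser side, an `EventSource` listener for the `status` event would replace the `setInterval` polling loop.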

