The initial request was straightforward: allow data scientists to run their ad-hoc Spark jobs without direct SSH access to the cluster nodes. The existing workflow involved manual spark-submit
commands, which was not only a security concern but also prone to human error, impossible to audit, and a bottleneck for non-engineering teams. The immediate, naive solution proposed was a simple web form that would trigger spark-submit
synchronously via a REST endpoint. This approach would have failed spectacularly. Any Spark job lasting longer than a standard HTTP timeout—which is most of them—would break the connection, leaving the user with no feedback and the backend with an orphaned process.
This realization shifted the problem from building a simple web frontend to designing a resilient, asynchronous control plane. The core challenge became decoupling the long-running Spark job execution from the stateless, short-lived nature of HTTP requests. The architecture required a cross-stack solution involving a RESTful API to manage job lifecycle, Apache Spark for the computation itself, and a reactive, test-driven frontend to provide reliable feedback to the user.
Our final architecture looked like this:
sequenceDiagram
    participant User
    participant ReactUI as React UI (Browser)
    participant API as Control Plane API (Flask)
    participant Spark as Apache Spark Cluster
    User->>ReactUI: Fills form & clicks "Submit Job"
    ReactUI->>+API: POST /api/v1/jobs (job params)
    API-->>-ReactUI: 202 Accepted ({"job_id": "xyz-123"})
    Note right of API: Spawns spark-submit as a background process. Stores job metadata.
    API->>Spark: spark-submit my_job.py ...
    ReactUI->>ReactUI: Enters polling state for "xyz-123"
    loop Poll for Status
        ReactUI->>+API: GET /api/v1/jobs/xyz-123
        API-->>-ReactUI: 200 OK ({"status": "RUNNING"})
        ReactUI->>User: Update UI: "Job is Running..."
        ReactUI->>ReactUI: Wait 5 seconds
    end
    ReactUI->>+API: GET /api/v1/jobs/xyz-123
    Note right of API: Checks process status, reads output logs.
    API-->>-ReactUI: 200 OK ({"status": "SUCCEEDED", "results_path": "/path/to/output"})
    ReactUI->>User: Update UI: "Job Succeeded!"
This post-mortem details the implementation of this system, focusing on the pragmatic choices and pitfalls encountered at each layer: the asynchronous API, the Spark job itself, the Babel-transpiled React frontend, and critically, the testing strategy using React Testing Library to guarantee the UI’s robustness in handling these asynchronous states.
The Asynchronous API: The System’s Core
The entire architecture hinges on the API’s ability to handle long-running tasks without blocking. We chose Python with Flask for its simplicity in prototyping, but the principles apply to any backend framework. The key was to treat job submission as a two-phase process: acceptance and status inquiry.
A common temptation in such systems is to reach for a full task-queue stack like Celery backed by RabbitMQ or Redis from day one. That is the correct production-grade solution, but for our initial version a managed subprocess approach was sufficient to validate the core logic, provided we carefully handled process state.
Here is the core of the Flask application. It’s not production-ready—it lacks a persistent database for job tracking and proper authentication—but it demonstrates the fundamental asynchronous pattern.
api_server.py
import uuid
import subprocess
import os
import threading
from flask import Flask, request, jsonify
from flask_cors import CORS
import logging
from werkzeug.exceptions import NotFound
# Basic logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__)
CORS(app) # For local development between React and Flask servers
# In-memory storage. In production, this MUST be a persistent database (e.g., Redis, PostgreSQL).
jobs = {}
# A lock for thread-safe access to the 'jobs' dictionary
jobs_lock = threading.Lock()
# Define paths relative to the script location
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SPARK_APP_PATH = os.path.join(BASE_DIR, "spark_job", "word_count.py")
INPUT_DATA_PATH = os.path.join(BASE_DIR, "data", "input.txt")
OUTPUT_DIR = os.path.join(BASE_DIR, "data", "output")
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
def run_spark_job(job_id, input_path, output_path):
"""
Executes the spark-submit command in a non-blocking way.
Updates the job status in the shared 'jobs' dictionary.
"""
command = [
"spark-submit",
"--master", "local[*]", # For local testing; change for a real cluster
SPARK_APP_PATH,
input_path,
output_path
]
log_file_path = os.path.join(OUTPUT_DIR, f"{job_id}.log")
try:
with open(log_file_path, 'w') as log_file:
logging.info(f"Starting Spark job {job_id}. Command: {' '.join(command)}")
process = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
with jobs_lock:
jobs[job_id]['process'] = process
jobs[job_id]['status'] = 'RUNNING'
jobs[job_id]['log_path'] = log_file_path
# This will block this thread until the process completes, but the HTTP request has already returned.
process.wait()
# After completion, update final status
with jobs_lock:
return_code = process.returncode
if return_code == 0:
jobs[job_id]['status'] = 'SUCCEEDED'
logging.info(f"Spark job {job_id} succeeded.")
else:
jobs[job_id]['status'] = 'FAILED'
logging.error(f"Spark job {job_id} failed with return code {return_code}.")
except Exception as e:
with jobs_lock:
jobs[job_id]['status'] = 'FAILED'
jobs[job_id]['error_message'] = str(e)
logging.error(f"Exception while running job {job_id}: {e}")
@app.route('/api/v1/jobs', methods=['POST'])
def submit_job():
"""
Accepts a job request, starts it in the background, and returns a job ID immediately.
"""
job_id = str(uuid.uuid4())
output_path = os.path.join(OUTPUT_DIR, job_id)
job_metadata = {
'job_id': job_id,
'status': 'PENDING',
'input_path': INPUT_DATA_PATH,
'output_path': output_path,
'process': None # To be populated by the thread
}
with jobs_lock:
jobs[job_id] = job_metadata
# Run the Spark job in a separate thread to avoid blocking the main Flask thread
thread = threading.Thread(target=run_spark_job, args=(job_id, INPUT_DATA_PATH, output_path))
thread.daemon = True # Allows main program to exit even if threads are running
thread.start()
logging.info(f"Submitted job {job_id}. API responded to client.")
# HTTP 202 Accepted: The request has been accepted for processing, but the processing has not been completed.
return jsonify({'job_id': job_id}), 202
@app.route('/api/v1/jobs/<job_id>', methods=['GET'])
def get_job_status(job_id):
"""
Returns the current status of a submitted job.
"""
with jobs_lock:
job = jobs.get(job_id)
if not job:
raise NotFound(f"Job with ID {job_id} not found.")
response = {
'job_id': job['job_id'],
'status': job['status']
}
if job['status'] == 'FAILED' and 'error_message' in job:
response['error_message'] = job['error_message']
if job['status'] in ['SUCCEEDED', 'FAILED'] and job.get('log_path'):
try:
with open(job['log_path'], 'r') as f:
# Return last 100 lines of the log for brevity
log_lines = f.readlines()
response['log_preview'] = ''.join(log_lines[-100:])
except IOError as e:
logging.warning(f"Could not read log file for job {job_id}: {e}")
response['log_preview'] = "Log file not available."
return jsonify(response)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001, debug=True)
The critical design choices here are:
- Immediate Response: The POST /api/v1/jobs endpoint returns a 202 Accepted status with a unique job_id almost instantly, offloading the actual spark-submit execution to a background thread.
- State Management: A simple in-memory dictionary tracks job states. A threading.Lock is crucial to prevent race conditions when multiple threads access this shared state. In a real-world project, this would be replaced by a database or a Redis cache for persistence and scalability.
- Status Endpoint: GET /api/v1/jobs/{jobId} acts as the polling target. It reads the current state from the shared dictionary and returns it. This is a stateless operation from the API's perspective (a minimal client-side sketch of this submit-then-poll contract follows this list).
- Logging: We redirect stdout and stderr from the Spark process to a log file. The status endpoint provides a preview of this log, which is invaluable for debugging failed jobs without giving the user direct server access.
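To make that submit-then-poll contract concrete from the caller's side, here is a minimal client sketch. The file name client_poll.py is hypothetical; it assumes the Flask API above is running locally on port 5001 and that the requests library is installed.
client_poll.py
import time
import requests

API_BASE_URL = "http://localhost:5001"  # assumed local dev address of the Flask API

def submit_and_wait(poll_interval_seconds=5, timeout_seconds=600):
    # Phase 1: submission returns immediately with a job_id (HTTP 202 Accepted).
    response = requests.post(f"{API_BASE_URL}/api/v1/jobs")
    response.raise_for_status()
    job_id = response.json()["job_id"]
    print(f"Submitted job {job_id}")

    # Phase 2: poll the status endpoint until the job reaches a terminal state.
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status_response = requests.get(f"{API_BASE_URL}/api/v1/jobs/{job_id}")
        status_response.raise_for_status()
        payload = status_response.json()
        print(f"Job {job_id} status: {payload['status']}")
        if payload["status"] in ("SUCCEEDED", "FAILED"):
            return payload
        time.sleep(poll_interval_seconds)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout_seconds} seconds")

if __name__ == "__main__":
    result = submit_and_wait()
    print(result.get("log_preview", ""))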
The Spark Application
The Spark job itself needs to be robust but can be relatively simple for this control plane demonstration. It must read from an input source and write to a destination so we can verify its success. The following PySpark script calculates word counts from a text file.
spark_job/word_count.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, lower, col
import time
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: word_count.py <input_path> <output_path>", file=sys.stderr)
sys.exit(-1)
input_path = sys.argv[1]
output_path = sys.argv[2]
spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
# Simulate a longer-running job
print("Starting word count job...")
time.sleep(10) # A fake delay to make the RUNNING state visible in the UI
try:
lines = spark.read.text(input_path).select(lower(col("value")).alias("line"))
# The core transformation logic
words = lines.select(explode(split(col("line"), "\\s+")).alias("word"))
word_counts = words.groupBy("word").count().orderBy(col("count").desc())
# A check for empty words resulting from multiple spaces
word_counts = word_counts.filter(col("word") != "")
print(f"Job processing complete. Writing results to {output_path}")
# Writing the output as a single CSV file for easy inspection
word_counts.coalesce(1).write.mode("overwrite").csv(output_path, header=True)
print("Job finished successfully.")
except Exception as e:
print(f"An error occurred during Spark job execution: {e}", file=sys.stderr)
spark.stop()
sys.exit(1)
spark.stop()
This script is standard PySpark, but the time.sleep(10) is a deliberate, pragmatic addition for development. It ensures the job runs long enough for the UI's polling mechanism to be visibly active, making it easier to test and debug the frontend state transitions.
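If that delay ever risks leaking into a real deployment, a small guard keeps it development-only. This is a minimal sketch, assuming a hypothetical DEV_JOB_DELAY_SECONDS environment variable that is not part of the script above:

import os
import time

# Hypothetical development-only knob: sleep only when DEV_JOB_DELAY_SECONDS is set.
dev_delay = int(os.environ.get("DEV_JOB_DELAY_SECONDS", "0"))
if dev_delay > 0:
    print(f"Development delay enabled: sleeping {dev_delay} seconds...")
    time.sleep(dev_delay)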
The Frontend: React UI, Babel, and Testing
The frontend’s primary responsibility is to manage the user interaction and reflect the asynchronous job state accurately. We used Create React App, which configures Babel and other build tools out of the box. Babel’s role is crucial but often invisible; it transpiles modern JavaScript (ES6+ and JSX) into browser-compatible code.
Our babel.config.js is standard for a React project (Create React App normally manages this configuration internally via react-scripts), but it's important to recognize its function:
// babel.config.js
module.exports = {
presets: [
'@babel/preset-env', // For transpiling ES6+ down to ES5
['@babel/preset-react', { runtime: 'automatic' }] // For transpiling JSX
]
};
The core of the UI is the component that handles the form submission and status polling.
src/JobControl.js
import React, { useState, useEffect, useCallback } from 'react';
const API_BASE_URL = 'http://localhost:5001';
// Represents the possible states of our UI component
const JOB_STATE = {
IDLE: 'IDLE',
SUBMITTING: 'SUBMITTING',
POLLING: 'POLLING',
SUCCEEDED: 'SUCCEEDED',
FAILED: 'FAILED',
};
function JobControl() {
const [jobId, setJobId] = useState(null);
const [jobStatus, setJobStatus] = useState(null);
const [logPreview, setLogPreview] = useState('');
const [uiState, setUiState] = useState(JOB_STATE.IDLE);
const [error, setError] = useState('');
const pollStatus = useCallback(async (currentJobId) => {
try {
const response = await fetch(`${API_BASE_URL}/api/v1/jobs/${currentJobId}`);
if (!response.ok) {
throw new Error(`API Error: ${response.statusText}`);
}
const data = await response.json();
setJobStatus(data.status);
setLogPreview(data.log_preview || '');
if (data.status === 'SUCCEEDED') {
setUiState(JOB_STATE.SUCCEEDED);
} else if (data.status === 'FAILED') {
setUiState(JOB_STATE.FAILED);
setError(data.error_message || 'The job failed. Check logs for details.');
}
// If still RUNNING or PENDING, the useEffect will trigger another poll
} catch (err) {
console.error('Polling failed:', err);
setUiState(JOB_STATE.FAILED);
setError('Failed to fetch job status. Check network connection or server status.');
}
}, []);
// Effect for polling
useEffect(() => {
if (uiState === JOB_STATE.POLLING && jobId) {
const intervalId = setInterval(() => {
pollStatus(jobId);
}, 3000); // Poll every 3 seconds
return () => clearInterval(intervalId); // Cleanup on component unmount or state change
}
}, [uiState, jobId, pollStatus]);
const handleSubmit = async (event) => {
event.preventDefault();
setUiState(JOB_STATE.SUBMITTING);
setError('');
setJobId(null);
setJobStatus(null);
setLogPreview('');
try {
const response = await fetch(`${API_BASE_URL}/api/v1/jobs`, { method: 'POST' });
if (!response.ok) {
throw new Error('Failed to submit job.');
}
const data = await response.json();
setJobId(data.job_id);
setUiState(JOB_STATE.POLLING); // Kick off the polling process
} catch (err) {
setUiState(JOB_STATE.FAILED);
setError(err.message);
}
};
const handleReset = () => {
setUiState(JOB_STATE.IDLE);
setJobId(null);
setJobStatus(null);
setError('');
setLogPreview('');
};
const isButtonDisabled = uiState === JOB_STATE.SUBMITTING || uiState === JOB_STATE.POLLING;
return (
<div>
<h1>Spark Job Control Plane</h1>
{uiState === JOB_STATE.IDLE && (
<form onSubmit={handleSubmit}>
<button type="submit">Submit New Word Count Job</button>
</form>
)}
{uiState !== JOB_STATE.IDLE && (
<div>
<h2>Job Status</h2>
<p>Job ID: {jobId || 'N/A'}</p>
<p data-testid="job-status">Status: {jobStatus || 'Submitting...'}</p>
{uiState === JOB_STATE.SUBMITTING && <p>Submitting job to the cluster...</p>}
{uiState === JOB_STATE.POLLING && <p>Job is running. Polling for updates...</p>}
{uiState === JOB_STATE.SUCCEEDED && <p style={{ color: 'green' }}>Job completed successfully!</p>}
{uiState === JOB_STATE.FAILED && <p style={{ color: 'red' }}>Error: {error}</p>}
{logPreview && (
<pre style={{ border: '1px solid #ccc', padding: '10px', background: '#f5f5f5', whiteSpace: 'pre-wrap' }}>
<code>{logPreview}</code>
</pre>
)}
{!isButtonDisabled && <button onClick={handleReset}>Run Another Job</button>}
</div>
)}
</div>
);
}
export default JobControl;
This component is more complex than it appears because it's a state machine. It transitions between IDLE, SUBMITTING, POLLING, SUCCEEDED, and FAILED. Managing these states correctly is where bugs often hide. This is why testing is not optional; it's a necessity.
Testing with React Testing Library
React Testing Library (RTL) forces you to test components the way a user interacts with them. We don't test implementation details (like the internal uiState variable). We test that when a user clicks a button, the correct text eventually appears on the screen. This makes tests less brittle to refactoring.
For our asynchronous component, we need to mock the fetch API and control its responses over time to simulate the polling lifecycle. We use Jest's mocking capabilities for this.
src/JobControl.test.js
import React from 'react';
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import '@testing-library/jest-dom';
import JobControl from './JobControl';
// Mock the global fetch API
global.fetch = jest.fn();
// Utility to reset mocks before each test
beforeEach(() => {
fetch.mockClear();
});
test('submits a job, polls for status, and displays success', async () => {
// --- Stage 1: Initial Render & Submission ---
// Mock the initial POST request to submit the job
fetch.mockResolvedValueOnce({
ok: true,
json: async () => ({ job_id: 'test-job-123' }),
});
render(<JobControl />);
// User clicks the submit button
fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));
// Verify the submission state appears
expect(await screen.findByText(/Submitting job to the cluster.../i)).toBeInTheDocument();
// Wait for the initial POST call to resolve and the UI to enter polling state
await waitFor(() => {
expect(screen.getByText(/Job is running. Polling for updates.../i)).toBeInTheDocument();
});
// Check that the POST request was made correctly
expect(fetch).toHaveBeenCalledWith('http://localhost:5001/api/v1/jobs', { method: 'POST' });
expect(fetch).toHaveBeenCalledTimes(1);
// --- Stage 2: Polling - First status check (RUNNING) ---
// Mock the GET request for polling, returning 'RUNNING' status
fetch.mockResolvedValueOnce({
ok: true,
json: async () => ({ status: 'RUNNING', log_preview: 'Job has started...' }),
});
// `jest.useFakeTimers` would let us fast-forward the 3-second poll interval instantly;
// here we keep real timers and give `waitFor` a timeout long enough to observe the first poll.
await waitFor(() => {
// The status text should update to 'RUNNING' once the first poll resolves
expect(screen.getByTestId('job-status')).toHaveTextContent('Status: RUNNING');
}, { timeout: 4000 });
// Check that the first GET poll was made
expect(fetch).toHaveBeenCalledWith('http://localhost:5001/api/v1/jobs/test-job-123');
expect(fetch).toHaveBeenCalledTimes(2);
// --- Stage 3: Polling - Second status check (SUCCEEDED) ---
// Mock the final GET request, returning 'SUCCEEDED'
fetch.mockResolvedValueOnce({
ok: true,
json: async () => ({ status: 'SUCCEEDED', log_preview: '...Job finished successfully.' }),
});
// Wait for the UI to reflect the final success state (the second poll fires after another 3 seconds)
await waitFor(() => {
expect(screen.getByText(/Job completed successfully!/i)).toBeInTheDocument();
}, { timeout: 4000 });
// Final status text should be 'SUCCEEDED'
expect(screen.getByTestId('job-status')).toHaveTextContent('Status: SUCCEEDED');
// Check that the 'Run Another Job' button is now available
expect(screen.getByRole('button', { name: /Run Another Job/i })).toBeInTheDocument();
// Check that the final GET poll was made
expect(fetch).toHaveBeenCalledTimes(3);
}, 15000); // real 3-second polling intervals make this test run for several seconds
test('handles job submission failure', async () => {
// Mock a failed POST request
fetch.mockResolvedValueOnce({
ok: false,
statusText: 'Internal Server Error'
});
render(<JobControl />);
fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));
// Wait for the error message to appear
await waitFor(() => {
expect(screen.getByText(/Error: Failed to submit job./i)).toBeInTheDocument();
});
});
test('handles job failure during polling', async () => {
// Mock successful submission
fetch.mockResolvedValueOnce({
ok: true,
json: async () => ({ job_id: 'test-job-fail' }),
});
// Mock the poll response to be FAILED
fetch.mockResolvedValueOnce({
ok: true,
json: async () => ({ status: 'FAILED', error_message: 'Spark context initialization failed.' }),
});
render(<JobControl />);
fireEvent.click(screen.getByRole('button', { name: /Submit New Word Count Job/i }));
// Wait for the failure message to be displayed (the first poll fires after 3 seconds)
await waitFor(() => {
expect(screen.getByText(/Error: Spark context initialization failed./i)).toBeInTheDocument();
}, { timeout: 4000 });
expect(screen.getByTestId('job-status')).toHaveTextContent('Status: FAILED');
});
This test suite validates the entire happy path and two critical failure modes. The use of async/await and RTL's waitFor is essential for testing asynchronous behavior. It ensures that our assertions run only after the component has had time to fetch data and re-render.
Lingering Issues and Future Iterations
This implementation, while functional, is a proof-of-concept with several limitations that would need to be addressed in a production system. The most significant pitfall is managing Spark processes directly via subprocess from the web server. This approach is brittle; if the API server crashes, it orphans the Spark processes and loses all job state. A robust solution would introduce a dedicated job queue (such as Celery with a Redis or RabbitMQ broker) to decouple the API from the job execution workers.
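For a sense of what that refactor looks like, here is a rough sketch of the worker side, assuming Celery with a Redis broker and result backend. The file name tasks.py, the broker URLs, and the paths are illustrative; none of this exists in the current codebase.
tasks.py
import subprocess
from celery import Celery

celery_app = Celery("spark_jobs",
                    broker="redis://localhost:6379/0",
                    backend="redis://localhost:6379/1")

@celery_app.task(bind=True)
def run_spark_job(self, input_path, output_path):
    # The worker process owns the spark-submit lifecycle; the API only enqueues work.
    command = ["spark-submit", "--master", "local[*]",
               "spark_job/word_count.py", input_path, output_path]
    result = subprocess.run(command, capture_output=True, text=True)
    if result.returncode != 0:
        # Celery records the failure (and traceback) in the result backend.
        raise RuntimeError(f"spark-submit failed: {result.stderr[-2000:]}")
    return {"status": "SUCCEEDED", "output_path": output_path}

# In the Flask endpoint, submission then shrinks to a single enqueue call:
#     async_result = run_spark_job.delay(INPUT_DATA_PATH, output_path)
#     return jsonify({"job_id": async_result.id}), 202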
Furthermore, the API should not be directly invoking spark-submit. This presents a security risk (command injection) and tightly couples the API to the cluster's command-line tools. A far better architecture would use a service like Apache Livy, which provides a REST API specifically for managing Spark jobs, abstracting away the underlying submission mechanism.
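Delegating to Livy turns the control plane into a thin HTTP client: Livy's batch API exposes POST /batches to submit a job and GET /batches/{id} to check its state. The sketch below follows those documented endpoints, but the Livy host, the HDFS path to the job script, and the field values are assumptions.
livy_client.py
import requests

LIVY_URL = "http://livy-server:8998"  # assumed Livy server address

def submit_batch(input_path, output_path):
    # POST /batches asks Livy to run spark-submit on the cluster and returns a batch id.
    payload = {
        "file": "hdfs:///apps/spark_job/word_count.py",  # assumed location of the job script
        "args": [input_path, output_path],
        "name": "word-count",
    }
    response = requests.post(f"{LIVY_URL}/batches", json=payload)
    response.raise_for_status()
    return response.json()["id"]

def get_batch_state(batch_id):
    # GET /batches/{id} reports states such as "starting", "running", "success", "dead".
    response = requests.get(f"{LIVY_URL}/batches/{batch_id}")
    response.raise_for_status()
    return response.json()["state"]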
On the frontend, polling is inefficient. For a system with many active users, it would generate significant network traffic. A better approach would be to use WebSockets or Server-Sent Events (SSE) to allow the server to push status updates to the client in real-time, eliminating the need for constant polling. Finally, the in-memory job store is a single point of failure and would be replaced with a persistent database in any real-world scenario.
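As a rough illustration of the push-based alternative, the status endpoint could be complemented by a Server-Sent Events stream. The sketch below is written as an addition to api_server.py and reuses its jobs dictionary and jobs_lock purely for illustration; a production version would read from whatever persistent store replaces them.

# Additions to api_server.py (alongside the existing imports)
import json
import time
from flask import Response

@app.route('/api/v1/jobs/<job_id>/events')
def stream_job_status(job_id):
    def event_stream():
        last_status = None
        while True:
            with jobs_lock:
                job = jobs.get(job_id)
            status = job['status'] if job else 'UNKNOWN'
            if status != last_status:
                # SSE frames are "data: <payload>\n\n"; the browser's EventSource API parses them.
                yield f"data: {json.dumps({'job_id': job_id, 'status': status})}\n\n"
                last_status = status
            if status in ('SUCCEEDED', 'FAILED', 'UNKNOWN'):
                break
            time.sleep(1)
    return Response(event_stream(), mimetype='text/event-stream')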