The core operational friction wasn’t in model development itself, but in the handoff and execution cycle. Business analysts, proficient in SQL and the logic of our domain, were writing experimental models in shared Jupyter environments. This led to a predictable set of production issues: dependency conflicts, lack of version control for executed code, and insecure handling of data warehouse credentials scattered across notebooks. The request was to build a system that allowed these analysts to trigger training jobs from their local machines in a secure, isolated, and repeatable manner, directly against our Snowflake Data Warehouse. A web interface was dismissed early on; the team wanted a native macOS application for better integration and a more fluid user experience. This decision immediately put SwiftUI on the table. For the backend, the requirement for strict isolation between jobs pointed directly at containerization. We chose Podman over Docker for its rootless, daemonless architecture—a significant security win in a multi-user environment.
Our initial architecture sketch looked like this:
graph TD A[macOS Client - SwiftUI] -- REST API (JSON) --> B{Backend API Server - FastAPI}; B -- Podman REST API --> C[Podman Service]; C -- Manages --> D[Ephemeral Podman Containers]; D -- Executes --> E[train.py - Scikit-learn]; E -- Reads Data --> F[Snowflake Data Warehouse]; E -- Writes Artifacts --> G[Shared Volume/Artifact Store]; B -- Streams Logs --> A; subgraph "Execution Host (Linux Server)" B C D G end subgraph "Analyst's Machine" A end subgraph "Cloud Infrastructure" F end
The fundamental contract is simple: the SwiftUI client makes an HTTP request to a backend service to initiate a job. The backend service, acting as an orchestrator, translates this request into a podman run
command. The container executes a standard Python script, connects to the data warehouse, trains a model using Scikit-learn, and writes the output artifact. The complexity, as always, is in the implementation details: secure credential management, job lifecycle tracking, and real-time log streaming.
The Backend API: FastAPI as the Central Nervous System
A lightweight Python API server was the logical choice for the orchestration layer. We chose FastAPI for its performance and automatic OpenAPI documentation, which proved invaluable for the SwiftUI developer. The core responsibility of this service is to expose endpoints for managing the lifecycle of a training job.
A real-world project requires robust configuration management, not hardcoded values. We used Pydantic’s BaseSettings
to manage environment-specific configurations.
# file: app/config.py
import os
from pydantic import BaseSettings, Field
class Settings(BaseSettings):
"""
Application settings loaded from environment variables.
"""
PODMAN_BASE_URL: str = Field(..., env="PODMAN_BASE_URL") # e.g., "unix:///run/user/1000/podman/podman.sock"
BASE_IMAGE_NAME: str = Field("localhost/ml-runner:latest", env="BASE_IMAGE_NAME")
ARTIFACTS_HOST_PATH: str = Field(..., env="ARTIFACTS_HOST_PATH")
SCRIPTS_HOST_PATH: str = Field(..., env="SCRIPTS_HOST_PATH")
# Snowflake Credentials - In a real setup, these would come from a vault.
# For this example, we pass them via env, which is a common but less secure pattern.
SNOWFLAKE_USER: str = Field(..., env="SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD: str = Field(..., env="SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT: str = Field(..., env="SNOWFLAKE_ACCOUNT")
SNOWFLAKE_WAREHOUSE: str = Field(..., env="SNOWFLAKE_WAREHOUSE")
SNOWFLAKE_DATABASE: str = Field(..., env="SNOWFLAKE_DATABASE")
SNOWFLAKE_SCHEMA: str = Field(..., env="SNOWFLAKE_SCHEMA")
class Config:
case_sensitive = True
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
The main application file defines the endpoints. We need to create a job, check its status, and retrieve logs. A common mistake is to make the job creation endpoint synchronous; it must be asynchronous to handle container startup latency. We’ll return a job ID immediately and let the client poll for status and logs.
# file: app/main.py
import uuid
import logging
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Dict, List
from .podman_manager import PodmanManager, ContainerCreationFailed
from .config import settings
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = FastAPI(title="ML Job Runner API")
podman_manager = PodmanManager(base_url=settings.PODMAN_BASE_URL)
# In-memory database for tracking jobs. A real system would use Redis or a proper DB.
JOB_STORE: Dict[str, Dict] = {}
class JobRequest(BaseModel):
script_name: str = Field(..., description="Name of the Python script to run, e.g., 'train_churn_model.py'")
hyperparameters: Dict[str, str] = Field(default_factory=dict, description="Hyperparameters to pass as env vars.")
class JobStatus(BaseModel):
job_id: str
container_id: str
status: str # e.g., CREATED, RUNNING, FAILED, COMPLETED
logs: str | None = None
@app.post("/jobs", status_code=202, response_model=JobStatus)
async def create_training_job(job_request: JobRequest, background_tasks: BackgroundTasks):
job_id = str(uuid.uuid4())
logger.info(f"Received job request {job_id} for script '{job_request.script_name}'.")
# A simple check to ensure the requested script exists on the host path.
# This prevents path traversal attacks.
script_path = os.path.join(settings.SCRIPTS_HOST_PATH, job_request.script_name)
if not os.path.exists(script_path) or not os.path.isfile(script_path):
logger.error(f"Script not found or is not a file: {script_path}")
raise HTTPException(status_code=400, detail=f"Script '{job_request.script_name}' not found.")
# Prepare environment variables for the container
env_vars = {
"SNOWFLAKE_USER": settings.SNOWFLAKE_USER,
"SNOWFLAKE_PASSWORD": settings.SNOWFLAKE_PASSWORD,
"SNOWFLAKE_ACCOUNT": settings.SNOWFLAKE_ACCOUNT,
"SNOWFLAKE_WAREHOUSE": settings.SNOWFLAKE_WAREHOUSE,
"SNOWFLAKE_DATABASE": settings.SNOWFLAKE_DATABASE,
"SNOWFLAKE_SCHEMA": settings.SNOWFLAKE_SCHEMA,
**{f"HP_{k.upper()}": v for k, v in job_request.hyperparameters.items()}
}
JOB_STORE[job_id] = {"status": "PENDING", "container_id": None}
try:
# Running the container creation in the background
# so we can return a response to the client immediately.
background_tasks.add_task(
podman_manager.run_training_container,
job_id=job_id,
script_name=job_request.script_name,
env_vars=env_vars,
job_store=JOB_STORE
)
return JobStatus(job_id=job_id, container_id="", status="PENDING")
except Exception as e:
logger.error(f"Failed to schedule job {job_id}: {e}")
JOB_STORE[job_id] = {"status": "FAILED", "container_id": None, "error": str(e)}
raise HTTPException(status_code=500, detail="Failed to schedule container execution.")
@app.get("/jobs/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str):
job_info = JOB_STORE.get(job_id)
if not job_info:
raise HTTPException(status_code=404, detail="Job not found")
container_id = job_info.get("container_id")
if not container_id:
return JobStatus(job_id=job_id, container_id="", status=job_info.get("status", "UNKNOWN"))
# Update status from Podman if the job is still running
if job_info.get("status") == "RUNNING":
container_state = await podman_manager.get_container_state(container_id)
if container_state['State']['Status'] != 'running':
exit_code = container_state['State']['ExitCode']
job_info['status'] = "COMPLETED" if exit_code == 0 else "FAILED"
return JobStatus(job_id=job_id, container_id=container_id, status=job_info['status'])
@app.get("/jobs/{job_id}/logs", response_model=str)
async def get_job_logs(job_id: str):
job_info = JOB_STORE.get(job_id)
if not job_info or not job_info.get("container_id"):
raise HTTPException(status_code=404, detail="Job or container not found")
try:
logs = await podman_manager.get_container_logs(job_info["container_id"])
return logs
except Exception as e:
logger.error(f"Failed to retrieve logs for container {job_info['container_id']}: {e}")
raise HTTPException(status_code=500, detail="Could not retrieve logs.")
Interfacing with Podman
The podman-py
library provides a convenient client, but for clarity and to highlight the underlying API, we’ll use aiohttp
to interact directly with the Podman REST API socket. This avoids an extra dependency and gives us finer control over timeouts and error handling.
The PodmanManager
class encapsulates all logic for creating containers, checking their state, and fetching logs. A critical aspect here is mounting volumes correctly. We need to mount the directory containing our training scripts (read-only) and a directory for the output artifacts (read-write).
# file: app/podman_manager.py
import aiohttp
import json
import logging
import os
from .config import settings
logger = logging.getLogger(__name__)
class ContainerCreationFailed(Exception):
pass
class PodmanManager:
def __init__(self, base_url: str):
# Using a Unix socket connector for local communication with Podman service
if base_url.startswith("unix://"):
connector = aiohttp.UnixConnector(path=base_url[7:])
self.base_url = "http://localhost" # dummy url for aiohttp
else:
connector = None
self.base_url = base_url
self.session = aiohttp.ClientSession(connector=connector)
async def close(self):
await self.session.close()
async def run_training_container(self, job_id: str, script_name: str, env_vars: dict, job_store: dict):
# The path inside the container
container_script_path = f"/scripts/{script_name}"
container_artifacts_path = "/artifacts"
# Unique container name for this job
container_name = f"ml-job-{job_id}"
# https://docs.podman.io/en/latest/_static/api.html#operation/ContainerCreate
container_config = {
"name": container_name,
"image": settings.BASE_IMAGE_NAME,
"command": ["python", container_script_path],
"env": env_vars,
"mounts": [
{
"Type": "bind",
"Source": settings.SCRIPTS_HOST_PATH,
"Destination": "/scripts",
"options": ["ro"] # Mount scripts read-only
},
{
"Type": "bind",
"Source": os.path.join(settings.ARTIFACTS_HOST_PATH, job_id),
"Destination": container_artifacts_path
}
],
# Ensure the output directory exists on the host
# This is a critical step that's often missed
"hooks": {}
}
# Ensure the job-specific artifact directory exists on the host before starting
job_artifact_dir = os.path.join(settings.ARTIFACTS_HOST_PATH, job_id)
try:
os.makedirs(job_artifact_dir, exist_ok=True)
except OSError as e:
logger.error(f"Could not create artifact directory {job_artifact_dir}: {e}")
job_store[job_id].update({"status": "FAILED", "error": "Artifact directory creation failed"})
return
try:
# Create container
async with self.session.post(f"{self.base_url}/v4.0.0/libpod/containers/create", json=container_config) as response:
if response.status != 201:
error_text = await response.text()
logger.error(f"Podman create failed ({response.status}): {error_text}")
raise ContainerCreationFailed(f"Podman create error: {error_text}")
data = await response.json()
container_id = data["Id"]
logger.info(f"Container {container_id} created for job {job_id}.")
job_store[job_id].update({"status": "CREATED", "container_id": container_id})
# Start container
start_url = f"{self.base_url}/v4.0.0/libpod/containers/{container_id}/start"
async with self.session.post(start_url) as response:
if response.status != 204: # 204 No Content is success
error_text = await response.text()
logger.error(f"Podman start failed ({response.status}): {error_text}")
job_store[job_id].update({"status": "FAILED", "error": f"Podman start error: {error_text}"})
# Attempt to clean up the failed container
await self.remove_container(container_id)
return
logger.info(f"Container {container_id} started.")
job_store[job_id].update({"status": "RUNNING"})
except aiohttp.ClientError as e:
logger.error(f"HTTP client error during container creation for job {job_id}: {e}")
job_store[job_id].update({"status": "FAILED", "error": str(e)})
except Exception as e:
logger.error(f"Unexpected error during container creation for job {job_id}: {e}")
job_store[job_id].update({"status": "FAILED", "error": str(e)})
async def get_container_state(self, container_id: str):
inspect_url = f"{self.base_url}/v4.0.0/libpod/containers/{container_id}/json"
async with self.session.get(inspect_url) as response:
response.raise_for_status()
return await response.json()
async def get_container_logs(self, container_id: str) -> str:
logs_url = f"{self.base_url}/v4.0.0/libpod/containers/{container_id}/logs?stdout=true&stderr=true"
async with self.session.get(logs_url) as response:
response.raise_for_status()
# The logs are returned as a binary stream which needs decoding
log_bytes = await response.read()
return log_bytes.decode('utf-8', errors='ignore')
async def remove_container(self, container_id: str):
remove_url = f"{self.base_url}/v4.0.0/libpod/containers/{container_id}?force=true"
async with self.session.delete(remove_url) as response:
if response.status not in [204, 404]: # Ignore if already gone
logger.warning(f"Failed to cleanup container {container_id}: {response.status}")
The base container image is crucial for reproducibility. It pre-installs all common dependencies.
# file: images/ml-runner/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies that might be needed by Python libraries
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Create non-root user for security
RUN useradd -m appuser
USER appuser
# This entrypoint is overridden by the command in the API call,
# but it's good practice to have a default.
ENTRYPOINT ["python"]
The requirements.txt
would contain scikit-learn
, pandas
, snowflake-connector-python
, etc.
The Training Script: Executing Inside the Container
This is a standard Python script designed to be run non-interactively. It reads configuration from environment variables, which is a container-friendly pattern. It connects to the data warehouse, performs training, and saves its output to a pre-defined path (/artifacts
) which is mapped to a host volume.
# file: scripts/train_churn_model.py
import os
import pandas as pd
import snowflake.connector
import logging
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Credentials from environment variables
SNOWFLAKE_USER = os.environ.get("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT")
# ... other snowflake params
# Hyperparameters
HP_N_ESTIMATORS = int(os.environ.get("HP_N_ESTIMATORS", 100))
HP_MAX_DEPTH = int(os.environ.get("HP_MAX_DEPTH", 10))
ARTIFACT_PATH = "/artifacts/churn_model.joblib"
METRICS_PATH = "/artifacts/metrics.json"
# --- Main Execution Logic ---
def fetch_data():
logging.info("Connecting to Snowflake...")
try:
with snowflake.connector.connect(
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
account=SNOWFLAKE_ACCOUNT,
# ... other params
) as conn:
with conn.cursor() as cur:
logging.info("Executing query...")
cur.execute("SELECT FEATURE_1, FEATURE_2, TARGET FROM TRAINING_DATA_VIEW LIMIT 10000;")
df = cur.fetch_pandas_all()
logging.info(f"Fetched {len(df)} rows from Snowflake.")
return df
except Exception as e:
logging.error(f"Failed to fetch data from Snowflake: {e}")
raise
def train_model(df: pd.DataFrame):
logging.info("Starting model training...")
X = df[['FEATURE_1', 'FEATURE_2']]
y = df['TARGET']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=HP_N_ESTIMATORS, max_depth=HP_MAX_DEPTH, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
logging.info(f"Model accuracy: {accuracy:.4f}")
logging.info(f"Saving model artifact to {ARTIFACT_PATH}")
joblib.dump(model, ARTIFACT_PATH)
logging.info(f"Saving metrics to {METRICS_PATH}")
with open(METRICS_PATH, 'w') as f:
json.dump({"accuracy": accuracy}, f)
if __name__ == "__main__":
# A simple check for credentials
if not all([SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT]):
logging.error("Snowflake credentials are not fully configured in environment variables.")
exit(1)
data = fetch_data()
train_model(data)
logging.info("Training job completed successfully.")
The SwiftUI Client: Mission Control
The macOS client provides the user interface. Using SwiftUI and the MVVM
pattern, we can create a clean separation between the view and the business logic. The APIService
handles communication with our backend. Using async/await
in Swift makes handling asynchronous network calls straightforward.
// file: Models.swift
import Foundation
// Codable structs must match the JSON structure from the FastAPI backend.
struct JobRequest: Codable {
let scriptName: String
let hyperparameters: [String: String]
// Handle snake_case to camelCase conversion
enum CodingKeys: String, CodingKey {
case scriptName = "script_name"
case hyperparameters
}
}
struct JobStatus: Codable, Identifiable {
let id = UUID() // For SwiftUI lists
let jobId: String
let containerId: String
var status: String
enum CodingKeys: String, CodingKey {
case jobId = "job_id"
case containerId = "container_id"
case status
}
}
// file: APIService.swift
import Foundation
class APIService {
private let baseURL = URL(string: "http://localhost:8000")!
func startJob(scriptName: String, params: [String: String]) async throws -> JobStatus {
let url = baseURL.appendingPathComponent("/jobs")
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
let jobRequest = JobRequest(scriptName: scriptName, hyperparameters: params)
request.httpBody = try JSONEncoder().encode(jobRequest)
let (data, response) = try await URLSession.shared.data(for: request)
guard let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 202 else {
throw URLError(.badServerResponse)
}
return try JSONDecoder().decode(JobStatus.self, from: data)
}
func fetchJobStatus(jobId: String) async throws -> JobStatus {
let url = baseURL.appendingPathComponent("/jobs/\(jobId)")
let (data, _) = try await URLSession.shared.data(from: url)
return try JSONDecoder().decode(JobStatus.self, from: data)
}
func fetchJobLogs(jobId: String) async throws -> String {
let url = baseURL.appendingPathComponent("/jobs/\(jobId)/logs")
let (data, _) = try await URLSession.shared.data(from: url)
// The API returns a plain string, not JSON, so we decode it directly.
guard let logs = String(data: data, encoding: .utf8) else {
throw URLError(.cannotDecodeContentData)
}
return logs
}
}
// file: JobViewModel.swift
import Foundation
import Combine
@MainActor
class JobViewModel: ObservableObject {
@Published var jobs: [JobStatus] = []
@Published var selectedJobLogs: String = "Select a job to see logs..."
private let apiService = APIService()
private var timers: [String: Timer] = [:] // Timers for polling job status
func createNewJob() {
Task {
do {
let newJob = try await apiService.startJob(
scriptName: "train_churn_model.py",
params: ["n_estimators": "150"]
)
jobs.append(newJob)
startPolling(for: newJob.jobId)
} catch {
print("Error creating job: \(error)")
}
}
}
private func startPolling(for jobId: String) {
// A common pitfall is creating multiple timers. Ensure one per job.
guard timers[jobId] == nil else { return }
timers[jobId] = Timer.scheduledTimer(withTimeInterval: 3.0, repeats: true) { [weak self] _ in
self?.updateJobStatus(jobId: jobId)
}
}
private func stopPolling(for jobId: String) {
timers[jobId]?.invalidate()
timers[jobId] = nil
}
func updateJobStatus(jobId: String) {
Task {
do {
let updatedStatus = try await apiService.fetchJobStatus(jobId: jobId)
if let index = jobs.firstIndex(where: { $0.jobId == jobId }) {
jobs[index].status = updatedStatus.status
// Stop polling if the job is in a terminal state.
if ["COMPLETED", "FAILED"].contains(updatedStatus.status) {
stopPolling(for: jobId)
// Final log fetch after completion
await fetchLogs(for: jobId)
}
}
} catch {
print("Error updating status for job \(jobId): \(error)")
stopPolling(for: jobId) // Stop on error too
}
}
}
func fetchLogs(for jobId: String) async {
do {
self.selectedJobLogs = "Fetching logs for \(jobId)..."
let logs = try await apiService.fetchJobLogs(jobId: jobId)
self.selectedJobLogs = logs.isEmpty ? "No logs yet." : logs
} catch {
self.selectedJobLogs = "Failed to fetch logs: \(error.localizedDescription)"
}
}
}
// file: ContentView.swift
import SwiftUI
struct ContentView: View {
@StateObject private var viewModel = JobViewModel()
@State private var selectedJobId: String?
var body: some View {
NavigationView {
VStack {
List(viewModel.jobs, selection: $selectedJobId) { job in
HStack {
Text(job.jobId.prefix(8))
.font(.system(.body, design: .monospaced))
Spacer()
Text(job.status)
.padding(.horizontal, 8)
.padding(.vertical, 4)
.background(statusColor(for: job.status))
.foregroundColor(.white)
.cornerRadius(4)
}
.tag(job.jobId)
}
.onChange(of: selectedJobId) { newJobId in
if let jobId = newJobId {
Task { await viewModel.fetchLogs(for: jobId) }
}
}
Button("Start New Training Job") {
viewModel.createNewJob()
}
.padding()
}
.navigationTitle("ML Jobs")
// Detail View for logs
ScrollView {
Text(viewModel.selectedJobLogs)
.font(.system(.body, design: .monospaced))
.frame(maxWidth: .infinity, alignment: .leading)
.padding()
}
.background(Color(.textBackgroundColor))
}
}
private func statusColor(for status: String) -> Color {
switch status.uppercased() {
case "RUNNING": return .blue
case "COMPLETED": return .green
case "FAILED": return .red
case "PENDING": return .orange
default: return .gray
}
}
}
This implementation establishes a functional end-to-end system. However, it’s far from production-ready. The current job tracking is in-memory and will be lost on API server restart; a persistent store like Redis is necessary. The polling mechanism for logs and status is inefficient and should be replaced with WebSockets or Server-Sent Events for a true real-time experience. Furthermore, there is no job queueing; a sudden influx of requests would overwhelm the execution host. A message queue like RabbitMQ should be placed between the API and the Podman manager to buffer and schedule jobs. Finally, security could be hardened by using Podman secrets for credential management instead of environment variables, and the API should be protected by a proper authentication layer.