The core technical pain point wasn’t novel: a distributed Laravel fleet generating millions of structured log events per hour. The conventional approach of shipping these directly from PHP-FPM workers to an Elasticsearch cluster was creating unacceptable request latency and resource contention. Each PHP process would establish its own connection, serialize a single event, and send it over the wire, resulting in a storm of small, inefficient HTTP requests against the Elasticsearch ingest nodes. This was a classic N+1 problem, but for logging.
Our initial concept was a simple offload service: a dedicated middleware to act as an aggregator. The Laravel applications would fire-and-forget events to this service, which would then buffer, batch, and perform bulk indexing into Elasticsearch. The goal was to decouple the application logic from the observability pipeline’s performance characteristics. This immediately ruled out solutions that required heavy sidecars or complex in-process agents. We needed a single, lightweight, high-performance binary.
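To put rough numbers on the "storm of small requests", here is the arithmetic as a tiny Rust function. The figure of 3.6 million events per hour (about 1,000 per second) is a hypothetical stand-in for "millions per hour", not a measurement from our fleet, and requests_per_hour is an illustrative name.

```rust
/// Number of HTTP requests per hour needed to ship `events_per_hour` events
/// when each request carries at most `batch_size` events (ceiling division).
fn requests_per_hour(events_per_hour: u64, batch_size: u64) -> u64 {
    assert!(batch_size > 0, "batch_size must be positive");
    (events_per_hour + batch_size - 1) / batch_size
}
```

With one event per request, that is 3,600,000 requests an hour against the ingest nodes. With the batch size of 500 used later in main.rs, it drops to 7,200 bulk requests an hour, roughly two per second.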
Technology selection became the critical first step. Go was a contender, with its mature ecosystem and simple concurrency model. However, for a piece of long-running network infrastructure where memory safety and predictable performance under load were paramount, Rust presented a more compelling case. Its ownership model eliminates entire classes of bugs common in systems programming, and its async ecosystem, built around tokio, is exceptionally well-suited for high-throughput I/O-bound tasks. The decision was made to build this middleware in Rust.
The real challenge emerged when we discussed data transformation. The events from different Laravel services had varying schemas. We needed to cleanse, enrich, and sometimes completely restructure the JSON payloads before indexing. Hardcoding this logic into the Rust binary was a non-starter. Every schema change would require a recompilation and redeployment of the core middleware, a process far too slow and risky for our CI/CD pipeline. This is where we diverged from a standard implementation. The requirement was for a system where developers could define their own transformation logic on the fly. This led to the idea of embedding a scripting engine.
V8 was overkill; its resource footprint was too large for our “lightweight” mandate. We needed something fast, safe, and with a small memory footprint. The inspiration came from the front-end world, specifically from Rust-based tools like Turbopack and swc, which process JavaScript/TypeScript at phenomenal speeds. Those tools parse and transform JavaScript rather than execute it, so we could not reuse them directly, but they showed how well Rust and JavaScript can be paired. What we needed was an embeddable JS engine with solid Rust bindings. We settled on the rquickjs crate, which provides safe, high-level bindings to QuickJS, a small embeddable JavaScript engine written in C. This allowed us to execute user-defined JavaScript transformation functions within a tight, sandboxed Rust loop, giving us the required flexibility without sacrificing the performance and safety of the core application.
The final architecture materialized as a multi-stage async pipeline within a single Rust binary.
graph TD
subgraph Laravel Fleet
L1[Laravel Service 1] --> M
L2[Laravel Service 2] --> M
L3[...] --> M
end
subgraph Rust Middleware
M("HTTP Ingest Endpoint<br/>axum") --> C{"mpsc::channel"}
C --> W["Batching Worker<br/>tokio::spawn"]
W -- per-event --> JS("JS Transformation Engine<br/>rquickjs")
JS -- transformed event --> W
W -- bulk request --> ES["Resilient Elasticsearch Client<br/>elasticsearch-rs"]
end
subgraph Elasticsearch Cluster
ES --> E1[ES Node 1]
ES --> E2[ES Node 2]
end
style M fill:#222,stroke:#393,stroke-width:2px
style W fill:#222,stroke:#393,stroke-width:2px
style JS fill:#333,stroke:#f90,stroke-width:2px
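The diagram reduces to a simple shape: producers push into a bounded channel, and a single worker drains it, transforms each event, discards the ones the transform rejects, and emits fixed-size batches. The sketch below shows that shape with the standard library only. Threads and std::sync::mpsc stand in for axum and tokio, a plain closure stands in for QuickJS, another closure stands in for the Elasticsearch client, and the time-based flush is omitted. run_pipeline, flush, transform and sink are hypothetical names.

```rust
use std::sync::mpsc::Receiver;

/// Drains `rx`, applies `transform` to every event, drops events for which it
/// returns `None`, and hands groups of up to `batch_size` events to `sink`.
/// Returns once every sender has been dropped, after flushing the remainder.
fn run_pipeline<F, S>(rx: Receiver<String>, batch_size: usize, transform: F, mut sink: S)
where
    F: Fn(&str) -> Option<String>,
    S: FnMut(Vec<String>),
{
    let mut batch: Vec<String> = Vec::with_capacity(batch_size);
    // Iterating a Receiver blocks for each event and ends when all senders are gone.
    for event in rx {
        batch.push(event);
        if batch.len() >= batch_size {
            flush(&mut batch, &transform, &mut sink);
        }
    }
    // Channel closed: flush whatever is left.
    if !batch.is_empty() {
        flush(&mut batch, &transform, &mut sink);
    }
}

/// Transforms the buffered events, skipping rejected ones, and emits the
/// survivors as one batch (nothing is emitted if all were rejected).
fn flush<F, S>(batch: &mut Vec<String>, transform: &F, sink: &mut S)
where
    F: Fn(&str) -> Option<String>,
    S: FnMut(Vec<String>),
{
    let out: Vec<String> = batch.drain(..).filter_map(|e| transform(e.as_str())).collect();
    if !out.is_empty() {
        sink(out);
    }
}
```

Note that a batch is counted before transformation, so a flushed batch can be smaller than batch_size when the transform drops events.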
Let’s walk through the implementation of this system, piece by piece. The code presented is more than a toy example: it is the core of a service intended for production, with the remaining gaps called out along the way.
Part 1: Project Setup and the Ingestion Endpoint
The foundation is a standard Rust binary project. The Cargo.toml outlines our key dependencies.
# Cargo.toml
[package]
name = "log-ingestor"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
elasticsearch = "8.5.0-alpha.1" # Using an alpha for latest features, pin to stable in prod
reqwest = { version = "0.11", features = ["json"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
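# "parallel" makes rquickjs's Runtime and Context Send + Sync, which main.rs relies on
# when it moves an Arc<JsTransformer> into tokio::spawn.
rquickjs = { version = "0.4", features = ["full", "parallel"] }
once_cell = "1.17"
thiserror = "1.0" # the error enums in transformer.rs and elastic.rs derive thiserror::Error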
The entry point, main.rs, sets up the tracing subscriber for structured logging, creates the channel, the JS transformer, and the Elasticsearch client, spawns the batching worker, and starts the axum web server.
// src/main.rs
use axum::{
extract::State,
http::StatusCode,
routing::post,
Json, Router,
};
use serde_json::Value;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::mpsc;
mod batcher;
mod transformer;
mod elastic;
// Shared state across all Axum handlers.
// mpsc::Sender is cheap to clone and internally reference-counted,
// so the state can derive Clone without wrapping it in an Arc.
#[derive(Clone)]
struct AppState {
tx: mpsc::Sender<Value>,
}
#[tokio::main]
async fn main() {
// Setup structured logging. In production, this would be configured to output JSON.
tracing_subscriber::fmt::init();
// Configuration - in a real app, this would come from a file or environment variables.
let buffer_size = 10_000; // Max number of logs to hold in memory.
let batch_size = 500;
let batch_timeout_secs = 5;
let elasticsearch_url = "http://localhost:9200";
let js_transform_script_path = "transform.js";
// Create the MPSC channel for decoupling the ingestion from the batching.
let (tx, rx) = mpsc::channel(buffer_size);
// Initialize the JS transformer. This will load and compile the script.
// We wrap it in an Arc to share it safely with the batching worker.
let transformer = Arc::new(
transformer::JsTransformer::new(js_transform_script_path)
.expect("Failed to initialize JS Transformer")
);
// Initialize the Elasticsearch client.
let es_client = Arc::new(
elastic::setup_elasticsearch_client(elasticsearch_url)
.expect("Failed to create Elasticsearch client")
);
// Spawn the background worker task. This task owns the receiving end of the channel.
tokio::spawn(batcher::run_batcher(
rx,
es_client,
transformer,
batch_size,
batch_timeout_secs,
));
// Setup the application state and the Axum router.
let app_state = AppState { tx };
let app = Router::new()
.route("/ingest", post(ingest_handler))
.with_state(app_state);
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
/// The handler for our single ingestion endpoint.
/// It accepts a JSON payload and sends it to the channel.
async fn ingest_handler(
State(state): State<AppState>,
Json(payload): Json<Value>,
) -> StatusCode {
// try_send never waits. A full channel means the batcher is falling behind,
// so we return 503 Service Unavailable to signal the client to slow down.
// (send().await would instead hold the request open until space frees up.)
match state.tx.try_send(payload) {
Ok(()) => StatusCode::ACCEPTED,
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("Ingest channel is full; rejecting log with 503");
StatusCode::SERVICE_UNAVAILABLE
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::error!("Ingest channel is closed; the batcher is not running");
StatusCode::INTERNAL_SERVER_ERROR
}
}
}
This structure is clean and robust. The web server’s only job is to deserialize the JSON and push it into a channel as fast as possible. All the heavy lifting is deferred to a background task, preventing the batching or Elasticsearch indexing process from blocking incoming requests.
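The behavior we want from the ingest endpoint under load is easiest to see with a bounded queue: a non-blocking enqueue either succeeds (202), finds the queue full (503, so clients back off), or finds the worker gone (500). Here is that mapping as a std-only sketch; Status and enqueue are hypothetical names, and std's sync_channel stands in for tokio's bounded mpsc channel.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// The subset of HTTP statuses the ingest endpoint would return.
#[derive(Debug, PartialEq)]
enum Status {
    Accepted,           // 202: event queued for the batcher
    ServiceUnavailable, // 503: queue full, client should back off
    InternalError,      // 500: receiver dropped, nothing will drain the queue
}

/// Enqueue without waiting: a full queue is reported instead of blocking.
fn enqueue(tx: &SyncSender<String>, event: String) -> Status {
    match tx.try_send(event) {
        Ok(()) => Status::Accepted,
        Err(TrySendError::Full(_)) => Status::ServiceUnavailable,
        Err(TrySendError::Disconnected(_)) => Status::InternalError,
    }
}
```

tokio's mpsc::Sender::try_send has the same shape, with TrySendError::Full and TrySendError::Closed variants, so the mapping carries over directly to an axum handler.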
Part 2: The JavaScript Transformation Engine
This is the most distinctive component of the architecture. The transformer.rs module encapsulates all logic related to QuickJS. The QuickJS runtime and context are created once at startup and shared with the batching worker through an Arc, because creating a runtime is a relatively expensive operation that we only want to perform once.
// src/transformer.rs
use rquickjs::{Context, Function, Object, Runtime, Value};
use std::fs;
use std::path::Path;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TransformerError {
#[error("Failed to read script file: {0}")]
Io(#[from] std::io::Error),
#[error("QuickJS error: {0}")]
QuickJs(#[from] rquickjs::Error),
#[error("JSON conversion error: {0}")]
Json(#[from] serde_json::Error),
#[error("Transformation function '{0}' not found in script")]
FunctionNotFound(String),
#[error("Transformation function returned non-object value")]
InvalidReturnType,
#[error("Log was discarded by the transform script")]
Dropped,
}
// A wrapper around the QuickJS runtime and context.
pub struct JsTransformer {
// QuickJS runtimes and contexts are !Send and !Sync unless rquickjs is built
// with its "parallel" feature. With it, one instance can be shared with the
// batching worker via Arc; without it, the tokio::spawn in main.rs won't compile.
// The runtime is kept alongside the context created from it.
_runtime: Runtime,
context: Context,
}
impl JsTransformer {
pub fn new<P: AsRef<Path>>(script_path: P) -> Result<Self, TransformerError> {
let script_content = fs::read_to_string(script_path)?;
let runtime = Runtime::new()?;
let context = Context::full(&runtime)?;
// JS values (including the global object) are only reachable through a Ctx,
// which rquickjs hands out inside `with`.
context.with(|ctx| -> Result<(), TransformerError> {
ctx.globals().set("version", "1.0.0")?;
// Evaluate as a classic script. transform.js registers `transform` on
// globalThis, so no module loader is required.
let _: Value = ctx.eval(script_content)?;
Ok(())
})?;
tracing::info!("Successfully loaded and evaluated JS transform script.");
Ok(Self { _runtime: runtime, context })
}
/// Transforms a single JSON value.
/// It serializes the Rust `serde_json::Value` to a string, parses it in JS,
/// calls the transform function, and then deserializes the result back.
/// A null/undefined return from the script yields `TransformerError::Dropped`.
pub fn transform(&self, log: &serde_json::Value) -> Result<serde_json::Value, TransformerError> {
self.context.with(|ctx| -> Result<serde_json::Value, TransformerError> {
let globals = ctx.globals();
// Look up the `transform` function the script placed on globalThis.
let transform_fn: Function = globals
.get("transform")
.map_err(|_| TransformerError::FunctionNotFound("transform".to_string()))?;
let json: Object = globals.get("JSON")?;
let parse_fn: Function = json.get("parse")?;
let stringify_fn: Function = json.get("stringify")?;
// Hand the JSON text to JSON.parse as a string argument. Splicing it into
// source code (e.g. `JSON.parse({...})`) would pass an object, not a string,
// and would let log content be executed as code.
let log_str = serde_json::to_string(log)?;
let js_log: Value = parse_fn.call((log_str,))?;
// Call the JS function: transform(logObject)
let result: Value = transform_fn.call((js_log,))?;
// null/undefined means "drop this log".
if result.is_null() || result.is_undefined() {
return Err(TransformerError::Dropped);
}
let result_obj: Object = result.into_object().ok_or(TransformerError::InvalidReturnType)?;
// To get the result back into Rust, stringify it in JS and parse it in Rust.
let result_str: String = stringify_fn.call((result_obj,))?;
Ok(serde_json::from_str(&result_str)?)
})
}
}
And here is an example transform.js file. This is the part our application developers would modify.
// transform.js
// This is a user-defined transformation function.
// It receives a log object and must return a new object to be indexed,
// or null/undefined to discard the log.
function transform(log) {
// A pitfall here is not handling unexpected schemas gracefully.
// Always check for the existence of fields before accessing them.
if (!log || !log.message || !log.context) {
// Discard malformed logs.
return null;
}
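const newLog = {
// Flatten the context for easier searching in Kibana. Spread it first so
// that context keys cannot overwrite the explicit fields below.
...log.context,
// Add a timestamp for Elasticsearch.
"@timestamp": new Date().toISOString(),
"service": log.context.service || "unknown-service",
// level_name is not checked by the guard above, so default it.
"level": (log.level_name || "unknown").toLowerCase(),
"message": log.message
};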
// Example of enrichment: add geoip data if an IP is present.
if (newLog.client_ip === "123.45.67.89") { // In reality, you'd use a lookup library
newLog.geoip = {
"country_code": "US",
"city": "Mountain View"
};
}
// In a production environment, you must ensure that all fields
// you create align with the Elasticsearch index mapping.
delete newLog.extra; // Remove unnecessary fields
return newLog;
}
// We must expose the function on the global scope for the Rust code to find it.
// When using ES modules, you'd use `export`. For simplicity with `eval`, global is fine.
globalThis.transform = transform;
This separation is powerful. The core, high-performance middleware is stable, while the business logic of log transformation remains flexible and can be updated by simply replacing a text file and restarting the service.
Part 3: The Batching Worker and Resilient Elasticsearch Client
The batcher.rs module contains the heart of the pipeline. It is an async function that loops until the channel closes, pulling messages from it and flushing batches by size or by time.
// src/batcher.rs
use crate::{elastic::ElasticsearchClient, transformer::JsTransformer};
use serde_json::Value;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
pub async fn run_batcher(
mut rx: mpsc::Receiver<Value>,
es_client: Arc<ElasticsearchClient>,
transformer: Arc<JsTransformer>,
batch_size: usize,
batch_timeout_secs: u64,
) {
let mut batch = Vec::with_capacity(batch_size);
// A periodic flush timer. A timeout wrapped around each recv() would only
// fire after a quiet gap, so a steady trickle of logs could keep a partial
// batch in memory indefinitely. The interval bounds that wait.
let mut ticker = interval(Duration::from_secs(batch_timeout_secs));
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
// The first tick completes immediately; consume it up front.
ticker.tick().await;
loop {
// rx.recv() is cancel-safe, so losing the race to the ticker drops no message.
tokio::select! {
maybe_log = rx.recv() => match maybe_log {
// A message was received.
Some(log) => {
batch.push(log);
if batch.len() >= batch_size {
process_and_send_batch(&mut batch, es_client.clone(), transformer.clone()).await;
}
}
// The channel was closed. The application is shutting down.
None => {
tracing::info!("Channel closed. Sending final batch.");
if !batch.is_empty() {
process_and_send_batch(&mut batch, es_client.clone(), transformer.clone()).await;
}
break;
}
},
// Flush interval elapsed. Send whatever we have.
_ = ticker.tick() => {
if !batch.is_empty() {
tracing::debug!("Batch timeout reached. Sending partial batch.");
process_and_send_batch(&mut batch, es_client.clone(), transformer.clone()).await;
}
}
}
}
tracing::info!("Batcher has shut down.");
}
async fn process_and_send_batch(
batch: &mut Vec<Value>,
es_client: Arc<ElasticsearchClient>,
transformer: Arc<JsTransformer>,
) {
let batch_size = batch.len();
tracing::info!("Processing batch of {} logs.", batch_size);
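// Note: transform() runs QuickJS synchronously on this async task. For large
// batches or expensive scripts, consider moving this work into
// tokio::task::spawn_blocking so the runtime's worker threads are not stalled.
let transformed_logs: Vec<Value> = batch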
.iter()
.filter_map(|log| {
// The transformation can fail (e.g., script error, invalid return).
// We log the error and filter out the problematic log.
// A common mistake is to let one bad log fail the entire batch.
match transformer.transform(log) {
Ok(transformed) => Some(transformed),
Err(e) => {
tracing::warn!("Failed to transform log: {}. Original: {}", e, log);
None
}
}
})
.collect();
if transformed_logs.is_empty() {
tracing::warn!("Batch of size {} resulted in 0 logs after transformation.", batch_size);
batch.clear();
return;
}
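// Count what we actually send; the batch may shrink during transformation.
let sent = transformed_logs.len();
match es_client.bulk_index("logs-prod", transformed_logs).await {
Ok(successful) => {
tracing::info!(
"Successfully indexed {} of {} transformed logs ({} received in the batch).",
successful,
sent,
batch_size
);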
}
Err(e) => {
tracing::error!("Failed to index batch: {}", e);
// In a production system, failed batches MUST be handled.
// This could involve writing to a dead-letter queue or a local file for later retry.
// For this example, we are dropping them, which is not ideal.
}
}
// Clear the batch for the next set of logs.
batch.clear();
}
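The key design choice in a batcher is what its timeout measures. A timeout around each individual receive measures the idle gap between events; to bound how long any single event can wait, the deadline should instead be set when the first event enters an empty batch. Here is that policy as a std-only sketch, using std::sync::mpsc's recv_timeout in place of tokio; batch_loop and flush are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects events into batches, flushing when `max_len` events are buffered
/// or when the oldest buffered event has waited `max_age`, whichever comes first.
fn batch_loop<S: FnMut(Vec<String>)>(
    rx: Receiver<String>,
    max_len: usize,
    max_age: Duration,
    mut flush: S,
) {
    let mut batch: Vec<String> = Vec::with_capacity(max_len);
    // Set when the first event enters an empty batch; cleared on every flush.
    let mut deadline: Option<Instant> = None;
    loop {
        // If the oldest event has already waited max_age, flush before receiving more.
        if let Some(d) = deadline {
            if Instant::now() >= d {
                flush(std::mem::take(&mut batch));
                deadline = None;
            }
        }
        // Block indefinitely while the batch is empty; otherwise wait only until the deadline.
        let received = match deadline {
            None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
            Some(d) => rx.recv_timeout(d.saturating_duration_since(Instant::now())),
        };
        match received {
            Ok(event) => {
                if batch.is_empty() {
                    deadline = Some(Instant::now() + max_age);
                }
                batch.push(event);
                if batch.len() >= max_len {
                    flush(std::mem::take(&mut batch));
                    deadline = None;
                }
            }
            // A deadline only exists while the batch is non-empty.
            Err(RecvTimeoutError::Timeout) => {
                flush(std::mem::take(&mut batch));
                deadline = None;
            }
            // All senders are gone: flush the remainder and stop.
            Err(RecvTimeoutError::Disconnected) => {
                if !batch.is_empty() {
                    flush(std::mem::take(&mut batch));
                }
                return;
            }
        }
    }
}
```

With this policy, a steady stream of one event every few seconds still produces a flush within max_age of the first buffered event, rather than waiting for the batch to fill.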
The elastic.rs module handles the interaction with Elasticsearch. The key is using the _bulk API and properly handling its complex response format, which can indicate partial success.
// src/elastic.rs
use elasticsearch::{
http::{
request::JsonBody,
transport::{SingleNodeConnectionPool, TransportBuilder},
Url,
},
BulkParts, Elasticsearch, Error,
};
use serde_json::{json, Value};
pub struct ElasticsearchClient {
client: Elasticsearch,
}
#[derive(thiserror::Error, Debug)]
pub enum ElasticError {
#[error("Elasticsearch client error: {0}")]
Client(#[from] Error),
#[error("Bulk indexing operation had failures: {0}")]
BulkFailed(String),
}
pub fn setup_elasticsearch_client(url: &str) -> Result<ElasticsearchClient, Error> {
let url = Url::parse(url)?;
// A single-node pool. In production, use a multi-node pool and configure
// request timeouts and retries.
let conn_pool = SingleNodeConnectionPool::new(url);
let transport = TransportBuilder::new(conn_pool).build()?;
Ok(ElasticsearchClient {
client: Elasticsearch::new(transport),
})
}
impl ElasticsearchClient {
pub async fn bulk_index(&self, index: &str, docs: Vec<Value>) -> Result<usize, ElasticError> {
// The body for a bulk request is newline-delimited JSON (NDJSON): an
// action/metadata line followed by the document source. The client
// serializes each JsonBody and adds the newlines itself. The index is
// already in the URL (BulkParts::Index), so the action can be empty.
let mut body: Vec<JsonBody<Value>> = Vec::with_capacity(docs.len() * 2);
for doc in docs {
body.push(json!({ "index": {} }).into());
body.push(doc.into());
}
let response = self.client
.bulk(BulkParts::Index(index))
.body(body)
.send()
.await?;
// Non-2xx responses (e.g. 413 Payload Too Large) carry no per-item results.
let status = response.status_code();
if !status.is_success() {
return Err(ElasticError::BulkFailed(format!("HTTP status {}", status)));
}
let response_body = response.json::<Value>().await?;
// A critical step: check the 'errors' field in the bulk response.
// A 200 OK status does NOT guarantee all documents were indexed successfully.
if response_body["errors"].as_bool().unwrap_or(false) {
let mut successful_count = 0;
let mut error_details = String::new();
if let Some(items) = response_body["items"].as_array() {
for item in items {
if let Some(index_op) = item.get("index") {
if index_op["error"].is_null() {
successful_count += 1;
} else {
// Log the first few errors for debugging.
if error_details.len() < 1024 {
error_details.push_str(&format!("{:?}, ", index_op["error"]));
}
}
}
}
}
if successful_count > 0 {
// Partial success is still an error condition we must report.
tracing::warn!(
"Partial success in bulk indexing. Succeeded: {}. Failed details: {}",
successful_count,
error_details
);
return Ok(successful_count);
} else {
return Err(ElasticError::BulkFailed(error_details));
}
}
// If 'errors' is false, we can assume all documents were indexed.
Ok(response_body["items"].as_array().map_or(0, |i| i.len()))
}
}
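The bulk body format and the partial-success rule are easy to get wrong, so here they are in isolation as a std-only sketch. build_bulk_body, summarize and BulkSummary are hypothetical helpers: documents are assumed to be pre-serialized, single-line JSON, and instead of parsing a real response, summarize takes each item's error already extracted (None for success).

```rust
/// Builds a `_bulk` request body: one action line plus one source line per
/// document, each terminated by '\n' (the API requires the trailing newline).
/// `docs` must be single-line JSON objects; `index` is assumed to need no escaping.
fn build_bulk_body(index: &str, docs: &[&str]) -> String {
    let mut body = String::new();
    for doc in docs {
        body.push_str(&format!("{{\"index\":{{\"_index\":\"{}\"}}}}\n", index));
        body.push_str(doc);
        body.push('\n');
    }
    body
}

/// Per-batch result: how many items succeeded and why the others failed.
#[derive(Debug, PartialEq)]
struct BulkSummary {
    succeeded: usize,
    failed: Vec<String>,
}

/// A 200 OK bulk response can still contain failed items, so every item
/// must be inspected. `None` means the item had no `error` field.
fn summarize(items: &[Option<&str>]) -> BulkSummary {
    let mut summary = BulkSummary { succeeded: 0, failed: Vec::new() };
    for item in items {
        match item {
            None => summary.succeeded += 1,
            Some(reason) => summary.failed.push(reason.to_string()),
        }
    }
    summary
}
```

Keeping the failed reasons, rather than only a count, is what makes it possible to route exactly those documents to a retry path or dead-letter store.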
Part 4: Laravel Integration
On the Laravel side, integration is straightforward. We can create a custom log channel or a simple helper function. The key is to use an efficient HTTP client like Guzzle and make the request non-blocking if possible, for instance by dispatching a job.
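<?php
// app/Logging/IngestorLogger.php
namespace App\Logging;
use Monolog\Formatter\FormatterInterface;
use Monolog\Formatter\JsonFormatter;
use Monolog\Logger;
use Monolog\Handler\AbstractProcessingHandler;
use Illuminate\Support\Facades\Http;
// Written against Monolog 2 (array records). Monolog 3 passes a LogRecord instead.
class IngestorLogger extends AbstractProcessingHandler
{
protected string $url;
public function __construct(string $url, $level = Logger::DEBUG, bool $bubble = true)
{
$this->url = $url;
parent::__construct($level, $bubble);
}
// The ingestor expects JSON; the default LineFormatter would produce plain text.
protected function getDefaultFormatter(): FormatterInterface
{
return new JsonFormatter();
}
protected function write(array $record): void
{
// A common mistake is to make a blocking HTTP call here.
// For high-volume logging, dispatching this to a queue is a better pattern.
// Http::async() is not truly async in a typical PHP-FPM setup,
// but Guzzle's promise-based approach can help in some contexts.
// For simplicity, we use a timeout.
try {
// 'formatted' is already a JSON string, so send it as the raw request body.
Http::timeout(1) // Low timeout to avoid blocking the main request.
->withBody($record['formatted'], 'application/json')
->post($this->url);
} catch (\Exception $e) {
// If the ingestor is down, we must not crash the application.
// Don't call report() here: it logs through the same stack and can recurse
// back into this handler. Fall back to PHP's error log instead.
error_log('IngestorLogger: ' . $e->getMessage());
}
}
}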
This completes the loop. Events flow from PHP, through the high-performance Rust middleware where they are transformed and batched, and finally into Elasticsearch efficiently.
The primary limitation of this implementation is its reliance on an in-memory buffer. A crash or restart of the middleware process would result in the loss of any logs currently in the batch vector or the mpsc channel. A more resilient production system would introduce a persistence layer, such as writing batches to a local file before attempting to send them to Elasticsearch, and deleting them only after a successful bulk acknowledgment. This would provide at-least-once delivery semantics.
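The spool-then-acknowledge idea can be sketched with std::fs alone. This is a sketch under stated assumptions: deliver_with_spool and replay_spool are hypothetical names, batches are NDJSON strings, send is any closure that returns true on a successful bulk acknowledgment, and file names carry a zero-padded sequence number so that a lexical sort replays them in order.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Writes `batch` to `spool_dir` before attempting delivery and removes the
/// file only after `send` reports success. If the process dies mid-send, the
/// file survives and `replay_spool` picks it up on the next start.
fn deliver_with_spool<F>(spool_dir: &Path, seq: u64, batch: &str, send: F) -> io::Result<bool>
where
    F: FnOnce(&str) -> bool,
{
    fs::create_dir_all(spool_dir)?;
    let path = spool_dir.join(format!("batch-{:020}.ndjson", seq));
    // Write under a temporary name, then rename, so a crash never leaves a
    // half-written .ndjson file. (Real durability would also need File::sync_all.)
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, batch)?;
    fs::rename(&tmp, &path)?;
    if send(batch) {
        fs::remove_file(&path)?;
        Ok(true)
    } else {
        Ok(false) // keep the file for a later retry
    }
}

/// Re-sends every spooled batch in sequence order, deleting each one that
/// is acknowledged. Returns how many batches were delivered.
fn replay_spool<F>(spool_dir: &Path, mut send: F) -> io::Result<usize>
where
    F: FnMut(&str) -> bool,
{
    let mut files: Vec<PathBuf> = fs::read_dir(spool_dir)?
        .filter_map(|e| e.ok().map(|e| e.path()))
        .filter(|p| p.extension().map_or(false, |ext| ext == "ndjson"))
        .collect();
    files.sort();
    let mut delivered = 0;
    for path in files {
        let batch = fs::read_to_string(&path)?;
        if send(&batch) {
            fs::remove_file(&path)?;
            delivered += 1;
        }
    }
    Ok(delivered)
}
```

Because a batch is deleted only after an acknowledged send, a crash between the send and the delete makes that batch go out again on replay. That duplicate is the price of at-least-once delivery; supplying deterministic document IDs in the bulk action lines is one way to make such re-sends idempotent.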
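Furthermore, the sandboxing of the QuickJS engine is basic. rquickjs turns JavaScript exceptions into Rust errors instead of crashing the process, but this implementation sets no resource limits (CPU time, memory), so a runaway script could stall the batcher. QuickJS does provide hooks for this, which rquickjs exposes on its Runtime as a memory limit (set_memory_limit) and an interrupt handler (set_interrupt_handler). For executing untrusted scripts, a more robust solution involving per-script runtimes with strict limits or even a WASM-based runtime like Wasmer would be necessary. The current design is optimized for speed and flexibility in a trusted environment where the transformation scripts are written by internal developers.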
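QuickJS lets the host abort a running script through a periodically polled callback that returns true to interrupt, and rquickjs exposes this as an interrupt handler on Runtime. The exact boxed callback type differs between rquickjs versions, so the sketch below stays std-only: a hypothetical Budget that the host arms before each transform call and disarms afterwards, whose handler() closure is the kind of callback one would register with that hook.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// A per-call CPU-time budget shared between the host and the interrupt callback.
#[derive(Clone, Default)]
struct Budget {
    deadline: Arc<Mutex<Option<Instant>>>,
}

impl Budget {
    /// Arm before invoking the script: it may run for at most `limit`.
    fn arm(&self, limit: Duration) {
        *self.deadline.lock().unwrap() = Some(Instant::now() + limit);
    }

    /// Disarm after the script returns so idle time is never counted.
    fn disarm(&self) {
        *self.deadline.lock().unwrap() = None;
    }

    /// The callback body: `true` means "abort the running script now".
    fn should_interrupt(&self) -> bool {
        match *self.deadline.lock().unwrap() {
            Some(d) => Instant::now() >= d,
            None => false,
        }
    }

    /// A boxed closure of the general shape an interrupt hook expects.
    fn handler(&self) -> Box<dyn FnMut() -> bool + Send + 'static> {
        let budget = self.clone();
        Box::new(move || budget.should_interrupt())
    }
}
```

Arming per call, rather than once at startup, keeps one slow event from consuming the budget of every event that follows it in the batch.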