The core operational database, a sharded MySQL cluster, was buckling under the strain of analytical queries. Every ad-hoc report run by the business intelligence team caused latency spikes for our customer-facing transactional services. The mandate was clear: isolate analytical workloads from transactional workloads completely, but with a data freshness requirement of under one minute. A classic CQRS problem, but our scale and resilience requirements immediately ruled out simpler solutions. This is the log of how we built a durable, high-throughput pipeline connecting our new transactional core on TiDB to our analytical powerhouse, Snowflake.
Technical Pain Point and Initial Architecture
The primary issue was resource contention. Our OLTP workload is characterized by high-concurrency, short-lived transactions. The OLAP workload involves long-running, heavy queries that scan massive datasets. Running both on the same system was untenable.
Our architectural decision was to split the write and read models physically.
- Write Model (Command Side): We selected TiDB. Its MySQL compatibility eased migration, but its true value was horizontal scalability for both storage and compute, promising to handle our projected transactional growth for years.
- Read Model (Query Side): Snowflake was the incumbent data warehouse, deeply integrated with our BI tools. Its architecture, separating storage and compute, was perfect for our unpredictable, bursty analytical query patterns.
- The Bridge: The critical piece was the transport layer connecting TiDB to Snowflake. We considered several options. A direct CDC-to-Snowflake tool felt too “black box,” offering little control over transformation, error handling, and buffering. Kafka was a strong contender but was deemed operational overkill for this point-to-point requirement. We settled on Redis Streams. It provides the necessary persistence, consumer groups for resilient processing, and is incredibly lightweight and performant, acting as a high-speed buffer between the two worlds.
- Application Layer: Quarkus was chosen for the services orchestrating this flow. Its fast startup times, low memory footprint, and superb developer experience (especially with reactive extensions) made it ideal for building efficient, cloud-native message producers and consumers.
The resulting high-level architecture looked like this:
graph TD
    subgraph "Transactional Domain"
        A[External Clients] --> B{Quarkus Write Service};
        B -- 1. Write Transaction --> C[(TiDB Cluster)];
        C -- 2. Within Same TX --> D[(Outbox Table)];
        E{Quarkus Outbox Poller} -- 3. Poll for New Events --> D;
        E -- 4. Publish Event --> F{Redis Streams};
    end
    subgraph "Analytical Domain"
        G{Quarkus Consumer Service} -- 5. Read from Stream --> F;
        G -- 6. Batch Insert --> H[(Snowflake)];
        I[BI & Analytics Tools] -- 7. Query Data --> H;
    end
    style C fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#9cf,stroke:#333,stroke-width:2px
    style F fill:#f69,stroke:#333,stroke-width:2px
A crucial design decision here is the use of the Transactional Outbox pattern. Directly publishing to Redis from the main application transaction is a common mistake. If the database commit succeeds but the Redis publish fails, the system state becomes inconsistent. The outbox pattern ensures that the event to be published is committed atomically with the primary business data. A separate poller process then guarantees at-least-once delivery of that event to the message bus.
Phase 1: The Transactional Write Service
The first component is a standard Quarkus REST service that handles incoming write requests. Its responsibility is to persist the state change in TiDB and record the event in the outbox table within a single atomic transaction.
Project Dependencies
The pom.xml for this service needs the Quarkus basics, RESTEasy Reactive with Jackson, Hibernate ORM with Panache, the MySQL JDBC driver (TiDB speaks the MySQL wire protocol), the Redis client, and the scheduler extension for the outbox poller.
<dependencies>
<!-- Quarkus Core -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<!-- Persistence -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>
<!-- Redis Client for the publisher -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-redis-client</artifactId>
</dependency>
<!-- Utilities -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-scheduler</artifactId>
</dependency>
</dependencies>
Entity Definitions
We need two entities: the primary business entity (ProductOrder) and the OutboxEvent.
// File: src/main/java/org/acme/pipeline/model/ProductOrder.java
package org.acme.pipeline.model;
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
import javax.persistence.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.UUID;
@Entity
@Table(name = "product_orders")
// Extending PanacheEntityBase provides the persist()/find() helpers used later
// while keeping our own explicit @Id field.
public class ProductOrder extends PanacheEntityBase {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
public Long id;
@Column(nullable = false, unique = true)
public UUID orderUuid;
@Column(nullable = false)
public String productId;
@Column(nullable = false)
public int quantity;
@Column(nullable = false, precision = 19, scale = 4)
public BigDecimal unitPrice;
@Column(nullable = false)
public Instant createdAt;
public ProductOrder() {
this.orderUuid = UUID.randomUUID();
this.createdAt = Instant.now();
}
}
// File: src/main/java/org/acme/pipeline/model/OutboxEvent.java
package org.acme.pipeline.model;
import io.quarkus.hibernate.orm.panache.PanacheEntityBase;
import javax.persistence.*;
import java.time.Instant;
import java.util.UUID;
@Entity
@Table(name = "outbox_events")
public class OutboxEvent extends PanacheEntityBase {
public enum EventStatus {
NEW, PUBLISHED
}
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
public Long id;
@Column(nullable = false)
public UUID aggregateId; // The ID of the entity that was changed (e.g., orderUuid)
@Column(nullable = false)
public String eventType; // e.g., "ORDER_CREATED"
@Lob
@Column(nullable = false, columnDefinition = "TEXT")
public String payload; // JSON representation of the event data
@Column(nullable = false)
public Instant createdAt;
@Enumerated(EnumType.STRING)
@Column(nullable = false)
public EventStatus status;
public OutboxEvent() {
this.createdAt = Instant.now();
this.status = EventStatus.NEW;
}
}
The Transactional Service Logic
The OrderService encapsulates the core business logic. The @Transactional annotation is key; it ensures that the ProductOrder and the OutboxEvent are saved in the same database transaction. If either write fails, both are rolled back.
// File: src/main/java/org/acme/pipeline/service/OrderService.java
package org.acme.pipeline.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.acme.pipeline.model.OutboxEvent;
import org.acme.pipeline.model.ProductOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.transaction.Transactional;
@ApplicationScoped
public class OrderService {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);
@Inject
ObjectMapper objectMapper;
@Transactional
public ProductOrder createOrder(String productId, int quantity) {
// Step 1: Create and persist the core business entity
ProductOrder order = new ProductOrder();
order.productId = productId;
order.quantity = quantity;
// In a real system, price would be looked up
order.unitPrice = new java.math.BigDecimal("19.99");
order.persist();
// Step 2: Create the corresponding outbox event
OutboxEvent event = new OutboxEvent();
event.aggregateId = order.orderUuid;
event.eventType = "ORDER_CREATED";
event.payload = createOrderCreatedPayload(order);
event.persist();
LOGGER.info("Successfully created order {} and outbox event within transaction.", order.orderUuid);
return order;
}
private String createOrderCreatedPayload(ProductOrder order) {
try {
ObjectNode payload = objectMapper.createObjectNode();
payload.put("orderUuid", order.orderUuid.toString());
payload.put("productId", order.productId);
payload.put("quantity", order.quantity);
payload.put("unitPrice", order.unitPrice.toPlainString());
payload.put("createdAt", order.createdAt.toString());
return objectMapper.writeValueAsString(payload);
} catch (Exception e) {
// Throwing here aborts the surrounding transaction, which is exactly what we want:
// an order must never be persisted without its outbox event. This should only
// happen on a serious bug in object mapping.
throw new IllegalStateException("Failed to serialize order payload", e);
}
}
}
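The REST resource that fronts OrderService is not shown above. For completeness, here is a minimal sketch of what it could look like; the /orders path and the query parameters are assumptions on my part, chosen only to line up with the integration test shown later in this post.
// File: src/main/java/org/acme/pipeline/rest/OrderResource.java (hypothetical sketch)
package org.acme.pipeline.rest;
import org.acme.pipeline.model.ProductOrder;
import org.acme.pipeline.service.OrderService;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
@Path("/orders")
public class OrderResource {
    @Inject
    OrderService orderService;
    @POST
    public Response create(@QueryParam("productId") String productId,
                           @QueryParam("quantity") int quantity) {
        // Delegates to the transactional service; the order and its outbox event
        // are written atomically inside OrderService.createOrder.
        ProductOrder order = orderService.createOrder(productId, quantity);
        return Response.ok(order).build();
    }
}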
Phase 2: The Outbox Poller and Redis Publisher
This component is responsible for reliably moving events from the TiDB outbox table to the Redis Stream. It runs as a scheduled task within the same Quarkus application (or a separate one for better resource isolation).
Configuration
The application.properties file needs database and Redis connection details.
# TiDB/MySQL Configuration
quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:4000/test
# Hibernate Configuration
quarkus.hibernate-orm.database.generation=update
# Redis Configuration
quarkus.redis.hosts=redis://127.0.0.1:6379
# Application specific config
pipeline.redis.stream.key=order_events
The Polling Logic
The poller uses a Quarkus @Scheduled method. It queries for NEW events, publishes each one to the Redis Stream, and marks it PUBLISHED, all within a single transaction. A pitfall here is transaction management: the read/publish/update cycle for each batch must be transactional so that a crash mid-process leaves the events in the NEW state, ready to be retried on the next run, rather than lost.
// File: src/main/java/org/acme/pipeline/service/OutboxPublisher.java
package org.acme.pipeline.service;
import io.quarkus.redis.datasource.RedisDataSource;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.scheduler.Scheduled;
import org.acme.pipeline.model.OutboxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.transaction.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ApplicationScoped
public class OutboxPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(OutboxPublisher.class);
private static final String STREAM_KEY = "order_events";
private final StreamCommands<String, String, String> streamCommands;
@Inject
public OutboxPublisher(RedisDataSource ds) {
// Using the command-based API for streams
this.streamCommands = ds.stream(String.class);
}
@Scheduled(every = "5s", identity = "outbox-publisher-job")
@Transactional
public void publishNewEvents() {
// Step 1: Find unprocessed events, oldest first. PESSIMISTIC_WRITE (SELECT ... FOR UPDATE)
// locks the rows so other publisher instances cannot process the same events concurrently.
List<OutboxEvent> eventsToPublish = OutboxEvent
.find("status", io.quarkus.panache.common.Sort.by("id"), OutboxEvent.EventStatus.NEW)
.withLock(javax.persistence.LockModeType.PESSIMISTIC_WRITE)
.page(0, 100) // Process in batches
.list();
if (eventsToPublish.isEmpty()) {
return;
}
LOGGER.info("Found {} new events to publish.", eventsToPublish.size());
for (OutboxEvent event : eventsToPublish) {
try {
// Step 2: Publish to Redis Streams
// The payload is the main message body. We add metadata for routing/versioning.
Map<String, String> messageBody = new HashMap<>();
messageBody.put("aggregateId", event.aggregateId.toString());
messageBody.put("eventType", event.eventType);
messageBody.put("payload", event.payload);
streamCommands.xadd(STREAM_KEY, messageBody);
// Step 3: Mark the event as published. The entity is managed, so the change
// is flushed automatically when the surrounding transaction commits.
event.status = OutboxEvent.EventStatus.PUBLISHED;
} catch (Exception e) {
// If Redis is down or there's a network issue, the transaction will be
// rolled back. The event status remains NEW and will be picked up
// in the next polling cycle. This is the core of at-least-once delivery.
LOGGER.error("Failed to publish event {}. Transaction will be rolled back.", event.id, e);
// We re-throw to ensure the transaction manager catches it and rolls back.
throw new RuntimeException("Event publishing failed", e);
}
}
LOGGER.info("Successfully published {} events.", eventsToPublish.size());
}
}
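One housekeeping detail the publisher leaves open: PUBLISHED rows accumulate in the outbox table indefinitely. A small companion job can purge them after a retention window. The sketch below is an assumption on my part (the OutboxCleaner class, the hourly schedule, and the seven-day window are not part of the pipeline described above), but it shows the shape such a job could take.
// File: src/main/java/org/acme/pipeline/service/OutboxCleaner.java (hypothetical companion job)
package org.acme.pipeline.service;
import io.quarkus.scheduler.Scheduled;
import org.acme.pipeline.model.OutboxEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import javax.transaction.Transactional;
import java.time.Duration;
import java.time.Instant;
@ApplicationScoped
public class OutboxCleaner {
    private static final Logger LOGGER = LoggerFactory.getLogger(OutboxCleaner.class);
    // The retention window is an assumption, not a pipeline requirement.
    private static final Duration RETENTION = Duration.ofDays(7);
    @Scheduled(every = "1h", identity = "outbox-cleanup-job")
    @Transactional
    public void purgePublishedEvents() {
        Instant cutoff = Instant.now().minus(RETENTION);
        // Bulk-delete outbox rows that were already delivered to Redis Streams.
        long deleted = OutboxEvent.delete("status = ?1 and createdAt < ?2",
                OutboxEvent.EventStatus.PUBLISHED, cutoff);
        if (deleted > 0) {
            LOGGER.info("Purged {} published outbox events older than {}.", deleted, cutoff);
        }
    }
}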
Phase 3: The Snowflake Consumer Service
This is a separate Quarkus application. Its sole purpose is to read from the Redis Stream, process the events in batches, and write them to Snowflake. Resilience is paramount here.
Dependencies and Configuration
The pom.xml
needs the Redis client and the Snowflake JDBC driver.
<!-- pom.xml for consumer service -->
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-redis-client</artifactId>
</dependency>
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.29</version> <!-- Use a recent version -->
</dependency>
<!-- We need a JSON library -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
</dependencies>
The configuration is more complex, requiring Snowflake credentials and connection pool settings. Storing these in a secure vault is critical in production.
# application.properties for consumer
# Redis Configuration
quarkus.redis.hosts=redis://127.0.0.1:6379
# Snowflake Configuration
snowflake.jdbc.url=jdbc:snowflake://<your_account>.snowflakecomputing.com/
snowflake.user=<your_user>
snowflake.password=<your_password>
snowflake.warehouse=COMPUTE_WH
snowflake.db=PROD_DB
snowflake.schema=RAW_EVENTS
# Consumer specific config
pipeline.redis.stream.key=order_events
pipeline.redis.consumer.group=snowflake-ingest-group
# Must be unique per consumer instance
pipeline.redis.consumer.name=consumer-instance-1
The Resilient Consumer Logic
This is the most complex part of the system. We use a Redis Consumer Group to coordinate multiple instances of the consumer service. The logic must handle message processing, batching for Snowflake, and failure scenarios.
// File: src/main/java/org/acme/pipeline/consumer/SnowflakeEventConsumer.java
package org.acme.pipeline.consumer;
import io.quarkus.redis.datasource.RedisDataSource;
import io.quarkus.redis.datasource.stream.StreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.sql.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ApplicationScoped
public class SnowflakeEventConsumer implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeEventConsumer.class);
@Inject
RedisDataSource redisDataSource;
@ConfigProperty(name = "pipeline.redis.stream.key")
String streamKey;
@ConfigProperty(name = "pipeline.redis.consumer.group")
String consumerGroup;
@ConfigProperty(name = "pipeline.redis.consumer.name")
String consumerName;
// Snowflake connection properties
@ConfigProperty(name = "snowflake.jdbc.url")
String sfUrl;
@ConfigProperty(name = "snowflake.user")
String sfUser;
@ConfigProperty(name = "snowflake.password")
String sfPassword;
@ConfigProperty(name = "snowflake.warehouse")
String sfWarehouse;
@ConfigProperty(name = "snowflake.db")
String sfDb;
@ConfigProperty(name = "snowflake.schema")
String sfSchema;
private volatile boolean running = true;
private StreamCommands<String, String, String> streamCommands;
void onStart(@Observes StartupEvent ev) {
this.streamCommands = redisDataSource.stream(String.class);
// Ensure the consumer group exists, creating the stream if necessary (MKSTREAM).
// '$' means start from the latest message; '0-0' would mean from the beginning.
try {
streamCommands.xgroupCreate(streamKey, consumerGroup, "$", new XGroupCreateArgs().mkstream());
LOGGER.info("Consumer group '{}' created for stream '{}'.", consumerGroup, streamKey);
} catch (Exception e) {
// Ignore the error if the group already exists
if (e.getMessage() == null || !e.getMessage().contains("BUSYGROUP")) {
throw e;
}
}
// Start the consumer loop on a dedicated daemon thread
Thread consumerThread = new Thread(this, "snowflake-consumer");
consumerThread.setDaemon(true);
consumerThread.start();
}
void onStop(@Observes ShutdownEvent ev) {
// Signal the consumer loop to exit when the application shuts down
running = false;
}
@Override
public void run() {
LOGGER.info("Starting Snowflake consumer loop...");
while (running) {
try {
// Block for up to 2 seconds waiting for new messages.
// '>' means get messages not yet delivered to any other consumer in this group.
List<StreamMessage<String, String, String>> messageBatch = streamCommands.xreadgroup(
consumerGroup,
consumerName,
streamKey,
">",
new XReadGroupArgs().block(Duration.ofSeconds(2)).count(100)
);
if (messageBatch == null || messageBatch.isEmpty()) {
continue; // No new messages, loop again
}
LOGGER.debug("Received {} messages from stream.", messageBatch.size());
processBatch(messageBatch);
} catch (Exception e) {
LOGGER.error("Error in consumer loop. Retrying in 5 seconds.", e);
try {
Thread.sleep(5000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
private void processBatch(List<StreamMessage<String, String, String>> messageBatch) {
List<String> successfullyProcessedIds = new ArrayList<>();
// In a real production system, use a proper connection pool like HikariCP.
// For simplicity, we are creating a new connection per batch.
try (Connection conn = createSnowflakeConnection()) {
// Disable autocommit for batching
conn.setAutoCommit(false);
// Assuming a target table `RAW_ORDER_EVENTS` in Snowflake
// with columns: EVENT_ID, AGGREGATE_ID, EVENT_TYPE, PAYLOAD (VARIANT)
String sql = "INSERT INTO RAW_ORDER_EVENTS (EVENT_ID, AGGREGATE_ID, EVENT_TYPE, PAYLOAD, INGESTED_AT) SELECT ?, ?, ?, PARSE_JSON(?), CURRENT_TIMESTAMP()";
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
for (StreamMessage<String, String, String> message : messageBatch) {
String messageId = message.id();
Map<String, String> body = message.payload();
pstmt.setString(1, messageId);
pstmt.setString(2, body.get("aggregateId"));
pstmt.setString(3, body.get("eventType"));
pstmt.setString(4, body.get("payload"));
pstmt.addBatch();
// Track this ID for acknowledgment after successful DB commit
successfullyProcessedIds.add(messageId);
}
pstmt.executeBatch();
conn.commit();
LOGGER.info("Successfully ingested {} events into Snowflake.", successfullyProcessedIds.size());
// Acknowledge messages in Redis *after* the database commit succeeds.
// This is the crux of at-least-once delivery. If the app crashes here,
// the messages are not ACK'd and will be redelivered.
streamCommands.xack(streamKey, consumerGroup, successfullyProcessedIds.toArray(new String[0]));
} catch (SQLException e) {
LOGGER.error("SQLException during Snowflake batch insert. Rolling back.", e);
conn.rollback();
// We do NOT acknowledge the messages; they stay in the consumer group's pending
// entries list and can be reclaimed (XCLAIM/XAUTOCLAIM) and retried. A dead-letter
// queue mechanism would be needed for poison pills.
}
} catch (Exception e) {
LOGGER.error("Failed to connect to Snowflake or process batch. Messages will be retried.", e);
}
}
private Connection createSnowflakeConnection() throws SQLException {
Properties props = new Properties();
props.put("user", sfUser);
props.put("password", sfPassword);
props.put("warehouse", sfWarehouse);
props.put("db", sfDb);
props.put("schema", sfSchema);
return DriverManager.getConnection(sfUrl, props);
}
}
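The comment in processBatch already flags the per-batch connection as a shortcut. One option, sketched below under the assumption that HikariCP (com.zaxxer:HikariCP) is added as a dependency, is to produce a pooled DataSource once and have the consumer call getConnection() on it instead of going through DriverManager. The SnowflakePoolProducer class and the pool size are illustrative, not part of the system described above.
// Hypothetical CDI producer of a pooled Snowflake DataSource using HikariCP.
package org.acme.pipeline.consumer;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
@ApplicationScoped
public class SnowflakePoolProducer {
    @ConfigProperty(name = "snowflake.jdbc.url")
    String url;
    @ConfigProperty(name = "snowflake.user")
    String user;
    @ConfigProperty(name = "snowflake.password")
    String password;
    @ConfigProperty(name = "snowflake.warehouse")
    String warehouse;
    @ConfigProperty(name = "snowflake.db")
    String db;
    @ConfigProperty(name = "snowflake.schema")
    String schema;
    @Produces
    @ApplicationScoped
    public HikariDataSource snowflakeDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(user);
        config.setPassword(password);
        // Snowflake session properties passed through to the JDBC driver
        config.addDataSourceProperty("warehouse", warehouse);
        config.addDataSourceProperty("db", db);
        config.addDataSourceProperty("schema", schema);
        config.setMaximumPoolSize(4); // small pool: one batch writer per consumer instance
        return new HikariDataSource(config);
    }
}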
Phase 4: Automation with Jenkins
The CI/CD pipeline automates the build, test, and deployment of these services. The most valuable part of this pipeline is the integration test stage, which uses Testcontainers to spin up ephemeral TiDB and Redis instances, ensuring the entire data flow works before any code is merged.
// Jenkinsfile
pipeline {
agent any
environment {
// Use a fixed version for reproducibility
TIDB_VERSION = 'v7.1.0'
REDIS_VERSION = '7.0-alpine'
}
stages {
stage('Build Services') {
steps {
script {
// Parallel build of both Quarkus applications
parallel(
writeService: {
dir('write-service') {
sh './mvnw clean package'
}
},
consumerService: {
dir('consumer-service') {
sh './mvnw clean package'
}
}
)
}
}
}
stage('Integration Test') {
steps {
dir('write-service') {
// The magic happens here. The Quarkus test framework with Testcontainers
// will manage the lifecycle of these containers automatically.
// The test properties point to the dynamic ports of the containers.
sh './mvnw verify -Pintegration-tests'
}
}
}
stage('Build Docker Images') {
steps {
script {
// Assuming Dockerfiles exist in each service directory
def registry = "my-docker-registry.io/pipelines"
parallel(
writeService: {
dir('write-service') {
def image = docker.build("${registry}/write-service:${env.BUILD_ID}", ".")
image.push()
}
},
consumerService: {
dir('consumer-service') {
def image = docker.build("${registry}/consumer-service:${env.BUILD_ID}", ".")
image.push()
}
}
)
}
}
}
stage('Deploy to Staging') {
// This stage would contain kubectl apply, Helm, or other deployment commands
steps {
echo "Deploying version ${env.BUILD_ID} to staging environment..."
// Example: sh "kubectl set image deployment/write-service write-service=my-docker-registry.io/pipelines/write-service:${env.BUILD_ID}"
}
}
}
}
A quick look at the integration test setup in the write service:
// File: src/test/java/org/acme/pipeline/PipelineResourceTest.java
package org.acme.pipeline;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
// This annotation links our test to the Testcontainers lifecycle manager
@QuarkusTest
@QuarkusTestResource(PipelineTestResourceLifecycleManager.class)
public class PipelineResourceTest {
@Test
public void testCreateOrderEndpoint() {
// This simple test confirms the write path is working.
// A more complex test would verify the event in Redis.
given()
.when().post("/orders?productId=ABC-123&quantity=5")
.then()
.statusCode(200);
}
}
The PipelineTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager to start and stop GenericContainer instances for TiDB and Redis, passing their connection details back to the Quarkus test application as configuration overrides returned from start(). This provides high-fidelity testing of the entire stack.
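For reference, a minimal sketch of what that lifecycle manager could look like; the image tags and port mappings are assumptions and should be aligned with the versions pinned in the Jenkinsfile.
// File: src/test/java/org/acme/pipeline/PipelineTestResourceLifecycleManager.java (sketch)
package org.acme.pipeline;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.Map;
public class PipelineTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
    // Image tags are assumptions; keep them in sync with TIDB_VERSION / REDIS_VERSION.
    private static final GenericContainer<?> TIDB =
            new GenericContainer<>(DockerImageName.parse("pingcap/tidb:v7.1.0")).withExposedPorts(4000);
    private static final GenericContainer<?> REDIS =
            new GenericContainer<>(DockerImageName.parse("redis:7.0-alpine")).withExposedPorts(6379);
    @Override
    public Map<String, String> start() {
        TIDB.start();
        REDIS.start();
        // Point the Quarkus test application at the ephemeral containers.
        return Map.of(
                "quarkus.datasource.jdbc.url",
                "jdbc:mysql://" + TIDB.getHost() + ":" + TIDB.getMappedPort(4000) + "/test",
                "quarkus.redis.hosts",
                "redis://" + REDIS.getHost() + ":" + REDIS.getMappedPort(6379));
    }
    @Override
    public void stop() {
        REDIS.stop();
        TIDB.stop();
    }
}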
Lingering Issues and Future Outlook
This architecture successfully decoupled our transactional and analytical systems, achieving the sub-minute data freshness goal. However, it’s not without its own set of trade-offs and potential improvements.
The outbox polling mechanism, while robust, introduces a small but measurable latency (bounded by the polling interval). For workloads requiring lower latency, a log-based Change Data Capture (CDC) solution such as TiCDC could be integrated: TiCDC streams committed changes to a downstream sink (Kafka, for example), from which a thin adapter could forward them into Redis Streams, removing the polling interval from the latency budget.
The current implementation guarantees at-least-once delivery. If the consumer crashes after a Snowflake commit but before the Redis XACK, the message will be redelivered, potentially creating a duplicate record in Snowflake. For our use case, downstream deduplication during BI processing was an acceptable solution. Achieving exactly-once semantics would require making the Snowflake write idempotent (e.g., a MERGE keyed on a unique event ID) or implementing a more complex distributed transaction protocol, both of which add significant complexity for diminishing returns in our context.
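For illustration, such an idempotent write could key on the Redis message ID that is already stored as EVENT_ID. The snippet below is only a sketch of that option (we did not deploy it); it assumes the same RAW_ORDER_EVENTS layout as the consumer above and would replace the INSERT string in processBatch, with identical bind-parameter order.
// Sketch of an idempotent Snowflake write: MERGE keyed on EVENT_ID (the Redis message ID).
// A re-delivered message matches an existing row and is skipped instead of duplicated.
private static final String IDEMPOTENT_UPSERT_SQL =
        "MERGE INTO RAW_ORDER_EVENTS t " +
        "USING (SELECT ? AS EVENT_ID, ? AS AGGREGATE_ID, ? AS EVENT_TYPE, PARSE_JSON(?) AS PAYLOAD) s " +
        "ON t.EVENT_ID = s.EVENT_ID " +
        "WHEN NOT MATCHED THEN INSERT (EVENT_ID, AGGREGATE_ID, EVENT_TYPE, PAYLOAD, INGESTED_AT) " +
        "VALUES (s.EVENT_ID, s.AGGREGATE_ID, s.EVENT_TYPE, s.PAYLOAD, CURRENT_TIMESTAMP())";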
Finally, the resilience of the consumer depends on Redis's pending entries list mechanism. Messages that fail processing repeatedly (so-called “poison pills”) remain in the pending entries list and are reclaimed and retried until they are manually dealt with. A more mature implementation would include a dead-letter queue (DLQ) strategy: after a certain number of failed delivery attempts, the message is moved to a separate stream for manual inspection, preventing it from blocking the entire pipeline.