The `product-catalog-v3` index, holding over 2.5 terabytes of data and serving our core Spring Boot search service, was an operational time bomb. Its mappings, designed years ago, were inefficient, leading to slow queries and an inability to add new search features without breaking changes. Every quarter, the business would request a schema modification, triggering a weekend-long, high-risk manual procedure involving SSH, `screen` sessions, and a prayer that the multi-hour reindex script wouldn’t fail. The last attempt caused a 45-minute outage during the alias switch. This was the final catalyst for a fundamental change in how we manage our data infrastructure’s lifecycle.
Our previous method was a collection of brittle shell and Python scripts. It was imperative, state-blind, and required constant human supervision. A failure halfway through the process meant a painful manual rollback. We briefly considered a Jenkins pipeline, but that felt like replacing one form of imperative scripting with another, still lacking the idempotency and declarative state reconciliation we needed for a truly reliable system. The core problem wasn’t just execution; it was state management. We needed a system that understood the desired state of our OpenSearch cluster—indices, mappings, and aliases—and worked tirelessly to make it a reality, all driven from our single source of truth: Git.
This led us to a GitOps approach with Argo CD. While we were already using it to deploy our Spring Boot microservices, we had never extended its reach to manage the state of a stateful dependency like OpenSearch. The vision was simple but ambitious: a developer should be able to update an index mapping by modifying a YAML file in a Git repository and committing the change. Argo CD, observing this change, should then orchestrate the entire zero-downtime reindexing workflow automatically. This meant creating a new index, migrating the data, atomically switching the live alias, and safely cleaning up the old index, all without manual intervention.
The Architectural Blueprint: GitOps for Data Schema
The foundation of this solution rests on treating OpenSearch index definitions and migration tasks as Kubernetes-native resources that Argo CD can understand and manage. A direct `helm install opensearch` is for bootstrapping a cluster; managing its internal state requires a more granular approach. Our architecture coalesced around a few key components:
- Kubernetes `Job`s as Workflow Steps: Each distinct action in the reindexing process (create index, run reindex, switch alias, delete old index) is encapsulated within a separate Kubernetes `Job`. This provides containerization, resource management, and automatic retries for transient failures.
- Argo CD Sync Waves for Orchestration: Kubernetes `Job`s, by themselves, are unordered. To enforce the strict `create -> reindex -> switch -> delete` sequence, we leverage Argo CD’s Sync Waves. By annotating each resource with a `sync-wave` number, we command Argo CD to apply them in ordered phases, only proceeding to the next wave when all resources in the current one report a healthy status (i.e., `Job` completion).
- `ConfigMap`s for Mappings: The new index mapping, the very trigger for the workflow, is stored declaratively as a `ConfigMap` in Git. This makes schema changes transparent and reviewable through pull requests.
- A Resilient Spring Boot Client: The application code must be blissfully unaware of the underlying index versions. It must only ever target a stable alias. Furthermore, it needs robust connection management and retry mechanisms to gracefully handle the sub-second transition during the atomic alias swap.
Here is a visualization of the orchestrated workflow managed entirely by Argo CD:
sequenceDiagram
    participant Developer
    participant GitRepo
    participant ArgoCD
    participant KubernetesAPI
    participant OpenSearch
    Developer->>GitRepo: git push (new index mapping & workflow files)
    ArgoCD->>GitRepo: Detects change
    ArgoCD->>KubernetesAPI: Sync Wave 1: Apply create-index-v4-job.yaml
    KubernetesAPI->>OpenSearch: Job pod runs `curl` to create `product-catalog-v4`
    ArgoCD->>KubernetesAPI: Sync Wave 2: Apply reindex-v3-to-v4-job.yaml
    KubernetesAPI->>OpenSearch: Job pod runs `_reindex` from v3 to v4
    Note right of OpenSearch: This is the long-running task.
    ArgoCD->>KubernetesAPI: Sync Wave 3: Apply switch-alias-job.yaml
    KubernetesAPI->>OpenSearch: Job pod runs atomic alias switch API call
    Note right of OpenSearch: `product-catalog` alias now points to v4.
    ArgoCD->>KubernetesAPI: Sync Wave 4: Apply delete-index-v3-job.yaml
    KubernetesAPI->>OpenSearch: Job pod deletes `product-catalog-v3`
This declarative model transforms a high-stakes manual operation into a routine, audited, and automated CI/CD process.
Implementing the GitOps Workflow
Our Git repository is structured to separate application code from this platform-level automation. We use an Argo CD `ApplicationSet` to automatically discover and manage these workflows.
gitops-repo/
├── apps/
│ └── product-search-service/ # Spring Boot application manifests
└── platform/
└── opensearch-reindexing/
└── product-catalog-v3-to-v4/
├── 01-mapping-configmap.yaml
├── 02-create-index-job.yaml
├── 03-reindex-job.yaml
├── 04-switch-alias-job.yaml
└── 05-cleanup-job.yaml
When a new reindexing task is needed, an engineer creates a new directory (e.g., `product-catalog-v4-to-v5`) and commits the necessary manifest files. Argo CD picks it up and starts the process.
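A minimal sketch of such an `ApplicationSet`, using a Git directory generator so every directory under `platform/opensearch-reindexing/` becomes its own Argo CD Application (the repository URL, project, and sync options here are illustrative, not our exact values):

apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: opensearch-reindexing
  namespace: argocd
spec:
  generators:
    - git:
        repoURL: https://git.example.com/platform/gitops-repo.git   # illustrative URL
        revision: main
        directories:
          - path: platform/opensearch-reindexing/*
  template:
    metadata:
      name: '{{path.basename}}'   # e.g. product-catalog-v3-to-v4
    spec:
      project: default
      source:
        repoURL: https://git.example.com/platform/gitops-repo.git
        targetRevision: main
        path: '{{path}}'
      destination:
        server: https://kubernetes.default.svc
        namespace: data-ops
      syncPolicy:
        automated:
          prune: false   # never auto-delete a half-finished migration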
Step 1: Storing the New Mapping (Sync Wave 0)
The first resource is a `ConfigMap` holding the new index mapping. We place it in an early sync wave to ensure it’s available for the creation job.
01-mapping-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: product-catalog-v4-mapping
  namespace: data-ops
  annotations:
    argocd.argoproj.io/sync-wave: "0"
data:
  mapping.json: |-
    {
      "properties": {
        "productId": { "type": "keyword" },
        "productName": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "description": { "type": "text" },
        "price": { "type": "double" },
        "stock_level": { "type": "integer" },
        "tags": { "type": "keyword" },
        "last_updated": { "type": "date" }
      }
    }
Step 2: Creating the New Index (Sync Wave 1)
This job uses a simple `curlimages/curl` container to make a `PUT` request to OpenSearch, creating the new index with the mapping from our `ConfigMap`. We manage OpenSearch credentials through a pre-existing Kubernetes `Secret`.
02-create-index-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: create-product-catalog-v4
  namespace: data-ops
  annotations:
    argocd.argoproj.io/sync-wave: "1"
spec:
  template:
    spec:
      containers:
        - name: create-index
          image: curlimages/curl:7.84.0
          command:
            - "/bin/sh"
            - "-c"
            - |
              # Read credentials from secret
              OS_USER=$(cat /etc/opensearch-creds/username)
              OS_PASS=$(cat /etc/opensearch-creds/password)
              # Read mapping from ConfigMap
              MAPPING_DATA=$(cat /etc/opensearch-mapping/mapping.json)
              # Use -f to fail on non-2xx status codes
              curl -f -X PUT "https://opensearch-cluster.opensearch.svc.cluster.local:9200/product-catalog-v4" \
                -u "$OS_USER:$OS_PASS" \
                -H 'Content-Type: application/json' \
                -d "$MAPPING_DATA" \
                --insecure # Use this for self-signed certs; prefer config for trusted CAs
          volumeMounts:
            - name: opensearch-creds
              mountPath: "/etc/opensearch-creds"
              readOnly: true
            - name: opensearch-mapping
              mountPath: "/etc/opensearch-mapping"
              readOnly: true
      restartPolicy: OnFailure
      volumes:
        - name: opensearch-creds
          secret:
            secretName: opensearch-credentials
        - name: opensearch-mapping
          configMap:
            name: product-catalog-v4-mapping
  backoffLimit: 3
A common mistake here is not using `-f` with `curl`. Without it, a 4xx or 5xx response from OpenSearch still results in a 0 exit code, and Argo CD would mistakenly believe the job succeeded.
Step 3: The Data Migration Workhorse (Sync Wave 2)
This is the most critical and long-running step. A simple `curl` is insufficient. We need a robust script that calls OpenSearch’s `_reindex` API, waits for the task to complete, and provides meaningful logging. We use a custom Python container for this.
The `Dockerfile` for the reindex-runner:
FROM python:3.9-slim
RUN pip install opensearch-py requests
WORKDIR /app
COPY reindex.py .
ENTRYPOINT ["python", "reindex.py"]
The `reindex.py` script:
import os
import sys
import time
import logging

from opensearchpy import OpenSearch

# --- Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Get configuration from environment variables
OS_HOST = os.environ.get("OS_HOST")
OS_USER = os.environ.get("OS_USER")
OS_PASS = os.environ.get("OS_PASS")
SOURCE_INDEX = os.environ.get("SOURCE_INDEX")
DEST_INDEX = os.environ.get("DEST_INDEX")
# Allow throttling to avoid overloading the cluster. Default is no limit.
REQUESTS_PER_SECOND = int(os.environ.get("REQUESTS_PER_SECOND", -1))

if not all([OS_HOST, OS_USER, OS_PASS, SOURCE_INDEX, DEST_INDEX]):
    logging.error("Missing required environment variables.")
    sys.exit(1)

# --- Main Logic ---
try:
    # Initialize OpenSearch client
    client = OpenSearch(
        hosts=[{'host': OS_HOST, 'port': 9200}],
        http_auth=(OS_USER, OS_PASS),
        use_ssl=True,
        verify_certs=False,  # In production, use True and provide ca_certs path
        ssl_assert_hostname=False,
        ssl_show_warn=False,
    )

    if not client.ping():
        raise ConnectionError("Could not connect to OpenSearch")

    logging.info(f"Starting reindex from '{SOURCE_INDEX}' to '{DEST_INDEX}'")
    reindex_body = {
        "source": {"index": SOURCE_INDEX},
        "dest": {"index": DEST_INDEX}
    }

    # The _reindex API is asynchronous. wait_for_completion=false returns a task_id.
    task_info = client.reindex(
        body=reindex_body,
        wait_for_completion=False,
        requests_per_second=REQUESTS_PER_SECOND
    )
    task_id = task_info['task']
    logging.info(f"Reindex task started with ID: {task_id}")

    # Poll the task API for completion status
    while True:
        task_status = client.tasks.get(task_id=task_id)
        if task_status.get('completed', False):
            logging.info("Reindex task completed successfully.")
            completion_details = task_status.get('response', {})
            logging.info(f"Details: {completion_details}")
            # Check for failures within the completed task
            if completion_details.get('failures') and len(completion_details['failures']) > 0:
                logging.error(f"Reindex completed with failures: {completion_details['failures']}")
                sys.exit(1)
            break

        # Log progress
        status = task_status.get('task', {}).get('status', {})
        total = status.get('total', 0)
        created = status.get('created', 0)
        if total > 0:
            progress = (created / total) * 100
            logging.info(f"Progress: {progress:.2f}% ({created}/{total} documents)")
        time.sleep(30)  # Poll every 30 seconds

except Exception as e:
    logging.error(f"An error occurred during reindex: {e}", exc_info=True)
    sys.exit(1)

sys.exit(0)
03-reindex-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: reindex-product-catalog-v3-to-v4
  namespace: data-ops
  annotations:
    argocd.argoproj.io/sync-wave: "2"
spec:
  template:
    spec:
      containers:
        - name: reindex-runner
          image: your-registry/reindex-runner:1.0.0
          env:
            - name: OS_HOST
              value: "opensearch-cluster.opensearch.svc.cluster.local"
            - name: SOURCE_INDEX
              value: "product-catalog-v3"
            - name: DEST_INDEX
              value: "product-catalog-v4"
            - name: REQUESTS_PER_SECOND
              value: "5000" # Important: throttle to protect the cluster
            - name: OS_USER
              valueFrom:
                secretKeyRef:
                  name: opensearch-credentials
                  key: username
            - name: OS_PASS
              valueFrom:
                secretKeyRef:
                  name: opensearch-credentials
                  key: password
      restartPolicy: OnFailure
  backoffLimit: 1 # We want to investigate failures, not endlessly retry a multi-hour job
This job is the most fragile step: the main pitfalls are network instability and heavy load on the OpenSearch cluster during the copy. The Python script’s error handling and explicit exit codes are vital for signaling success or failure back to Argo CD.
Step 4 & 5: The Atomic Switch and Cleanup (Sync Waves 3 & 4)
These final steps perform the alias switch and delete the old index. The alias update is an atomic operation within OpenSearch, which is the key to the zero-downtime guarantee.
04-switch-alias-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: switch-alias-to-product-catalog-v4
  namespace: data-ops
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  template:
    spec:
      containers:
        - name: switch-alias
          image: curlimages/curl:7.84.0
          command:
            - "/bin/sh"
            - "-c"
            - |
              OS_USER=$(cat /etc/opensearch-creds/username)
              OS_PASS=$(cat /etc/opensearch-creds/password)
              curl -f -X POST "https://opensearch-cluster.opensearch.svc.cluster.local:9200/_aliases" \
                -u "$OS_USER:$OS_PASS" \
                -H 'Content-Type: application/json' \
                -d '{
                  "actions" : [
                    { "remove" : { "index" : "product-catalog-v3", "alias" : "product-catalog" } },
                    { "add" : { "index" : "product-catalog-v4", "alias" : "product-catalog" } }
                  ]
                }' --insecure
      restartPolicy: OnFailure
      # Volume mounts for credentials as before...
  backoffLimit: 3
05-cleanup-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: delete-product-catalog-v3
  namespace: data-ops
  annotations:
    argocd.argoproj.io/sync-wave: "4"
spec:
  template:
    spec:
      containers:
        - name: delete-index
          image: curlimages/curl:7.84.0
          command:
            - "/bin/sh"
            - "-c"
            - |
              OS_USER=$(cat /etc/opensearch-creds/username)
              OS_PASS=$(cat /etc/opensearch-creds/password)
              curl -f -X DELETE "https://opensearch-cluster.opensearch.svc.cluster.local:9200/product-catalog-v3" \
                -u "$OS_USER:$OS_PASS" --insecure
      restartPolicy: OnFailure
      # Volume mounts for credentials as before...
  backoffLimit: 3
With these manifests committed, Argo CD executes the plan flawlessly. What was once a weekend of toil is now a `git push`.
The Resilient Spring Boot Consumer
The automation is useless if the application falters during the switch. The Spring Boot service must be configured for resilience.
First, the configuration must always point to the alias, never a versioned index name.
application.yml
opensearch:
  hostname: opensearch-cluster.opensearch.svc.cluster.local
  port: 9200
  scheme: https
  index:
    alias: product-catalog # The key is to use the alias
The client configuration in Java establishes the connection.
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenSearchConfig {

    @Value("${opensearch.hostname}")
    private String hostname;

    @Value("${opensearch.port}")
    private int port;

    @Value("${opensearch.scheme}")
    private String scheme;

    // Assume credentials are injected securely, e.g. from Vault or k8s secrets
    @Value("${opensearch.username}")
    private String username;

    @Value("${opensearch.password}")
    private String password;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        final var credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        return new RestHighLevelClient(
                RestClient.builder(new HttpHost(hostname, port, scheme))
                        .setHttpClientConfigCallback(httpClientBuilder ->
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                        )
        );
    }
}
Most importantly, we must account for the possibility of a transient error during the exact millisecond of the alias switch. A query might fail if it hits the cluster at this precise moment. Using Spring Retry is a pragmatic way to handle this.
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Repository;

import java.io.IOException;

@Repository
public class ProductSearchRepository {

    private final RestHighLevelClient client;
    private final String indexAlias;

    // Constructor injection
    public ProductSearchRepository(RestHighLevelClient client,
                                   @Value("${opensearch.index.alias}") String indexAlias) {
        this.client = client;
        this.indexAlias = indexAlias;
    }

    // This annotation adds resilience. It allows up to 3 attempts in total on the listed
    // exceptions, with an exponential backoff between attempts (100ms, then 200ms).
    // Requires @EnableRetry on a configuration class.
    @Retryable(
        value = { IOException.class }, // Retry on network or OpenSearch I/O errors
        maxAttempts = 3,
        backoff = @Backoff(delay = 100, multiplier = 2)
    )
    public SearchResponse search(SearchSourceBuilder sourceBuilder) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexAlias);
        searchRequest.source(sourceBuilder);

        // This is the call that might fail during the alias switch
        return client.search(searchRequest, RequestOptions.DEFAULT);
    }
}
This small addition makes the application robust enough to withstand the brief turbulence of the alias transition, making the entire process seamless from the user’s perspective. Unit testing this behavior with Testcontainers and a mock OpenSearch server that throws an exception on the first call is critical to ensure the retry logic is correctly implemented.
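As a rough sketch of such a test (using Mockito to stand in for the OpenSearch server instead of a full Testcontainers setup; the class and bean names are illustrative, and stubbing `RestHighLevelClient#search`, which is final, assumes Mockito’s inline mock maker, the default in Mockito 5):

import java.io.IOException;

import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.junit.jupiter.api.Assertions.assertSame;

// Illustrative test: verifies that a single transient IOException (as seen during the
// alias switch) is absorbed by the @Retryable proxy and the second attempt succeeds.
@SpringJUnitConfig(ProductSearchRepositoryRetryTest.TestConfig.class)
class ProductSearchRepositoryRetryTest {

    @Configuration
    @EnableRetry
    static class TestConfig {
        @Bean
        RestHighLevelClient client() {
            return Mockito.mock(RestHighLevelClient.class);
        }

        @Bean
        ProductSearchRepository repository(RestHighLevelClient client) {
            return new ProductSearchRepository(client, "product-catalog");
        }
    }

    @Autowired
    private RestHighLevelClient client;

    @Autowired
    private ProductSearchRepository repository;

    @Test
    void retriesAfterTransientFailure() throws IOException {
        SearchResponse expected = Mockito.mock(SearchResponse.class);
        Mockito.when(client.search(Mockito.any(SearchRequest.class), Mockito.any(RequestOptions.class)))
                .thenThrow(new IOException("connection reset during alias switch")) // first attempt fails
                .thenReturn(expected);                                              // retry succeeds

        SearchResponse actual = repository.search(new SearchSourceBuilder());

        assertSame(expected, actual);
        Mockito.verify(client, Mockito.times(2))
                .search(Mockito.any(SearchRequest.class), Mockito.any(RequestOptions.class));
    }
}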
While this system has revolutionized our OpenSearch schema management, it is not without its limitations. The current implementation relies on Argo CD Sync Waves, which is a coarse-grained orchestration method. A failure in the long-running reindex `Job` requires manual intervention to diagnose and a fresh `git commit` to restart the process from scratch. For terabyte-scale indices where a reindex can take over 12 hours, this is a significant risk. A more sophisticated approach would involve replacing the sequence of `Job`s with a single Argo Workflow, which provides a DAG-based execution engine with capabilities for checkpointing, retrying specific steps, and more complex conditional logic.
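For illustration, a trimmed Argo Workflow covering the same four steps might look like the sketch below. The template bodies are placeholders standing in for the job specs shown earlier; the point is the DAG structure and the per-step `retryStrategy` on the reindex task, so a failed copy can be retried without re-running the whole pipeline:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: reindex-product-catalog-v3-to-v4-
  namespace: data-ops
spec:
  entrypoint: reindex-pipeline
  templates:
    - name: reindex-pipeline
      dag:
        tasks:
          - name: create-index
            template: create-index
          - name: reindex
            template: reindex
            dependencies: [create-index]
          - name: switch-alias
            template: switch-alias
            dependencies: [reindex]
          - name: cleanup
            template: cleanup
            dependencies: [switch-alias]

    - name: create-index
      container:
        image: curlimages/curl:7.84.0
        command: ["/bin/sh", "-c", "echo 'PUT product-catalog-v4 as in 02-create-index-job.yaml'"]

    - name: reindex
      retryStrategy:
        limit: 2   # retry just this step instead of restarting the whole migration
      container:
        image: your-registry/reindex-runner:1.0.0
        # env vars and credentials as in 03-reindex-job.yaml

    - name: switch-alias
      container:
        image: curlimages/curl:7.84.0
        command: ["/bin/sh", "-c", "echo 'atomic _aliases switch as in 04-switch-alias-job.yaml'"]

    - name: cleanup
      container:
        image: curlimages/curl:7.84.0
        command: ["/bin/sh", "-c", "echo 'DELETE product-catalog-v3 as in 05-cleanup-job.yaml'"]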
Furthermore, the reindex job’s impact on the production OpenSearch cluster is managed by a static `requests_per_second` setting. A truly advanced implementation would integrate with our observability stack. The reindexing container could query Prometheus for cluster health metrics (like CPU utilization, heap pressure, and query latency) and dynamically adjust its own throttling rate, ensuring the migration proceeds as fast as possible without degrading live service performance. This moves from simple automation to an intelligent, self-adapting data management system.
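A sketch of that idea, as an extension to the polling loop in `reindex.py`: the Prometheus address, metric name, and CPU thresholds below are assumptions for illustration, and the re-throttle call assumes the client exposes `reindex_rethrottle`, which maps to the `_reindex/{task_id}/_rethrottle` API.

import logging

import requests  # already installed in the reindex-runner image

PROMETHEUS_URL = "http://prometheus.monitoring.svc.cluster.local:9090"  # assumed address
CPU_QUERY = 'avg(opensearch_os_cpu_percent)'  # assumed metric from the cluster exporter

FAST_RPS = 5000   # rate when the cluster is comfortable
SLOW_RPS = 500    # rate when the cluster is under pressure


def current_cpu_percent():
    """Query Prometheus for average data-node CPU; return None on any error."""
    try:
        resp = requests.get(
            f"{PROMETHEUS_URL}/api/v1/query",
            params={"query": CPU_QUERY},
            timeout=10,
        )
        resp.raise_for_status()
        result = resp.json()["data"]["result"]
        return float(result[0]["value"][1]) if result else None
    except Exception as exc:
        logging.warning(f"Could not read cluster CPU from Prometheus: {exc}")
        return None


def adjust_throttle(client, task_id, current_rps):
    """Re-throttle the running reindex task based on cluster CPU pressure."""
    cpu = current_cpu_percent()
    if cpu is None:
        return current_rps  # no signal, keep the current rate

    desired = SLOW_RPS if cpu > 80 else FAST_RPS
    if desired != current_rps:
        # POST _reindex/{task_id}/_rethrottle?requests_per_second=...
        client.reindex_rethrottle(task_id=task_id, requests_per_second=desired)
        logging.info(f"Cluster CPU at {cpu:.0f}%, re-throttled reindex to {desired} req/s")
    return desired

Calling `adjust_throttle` once per iteration of the existing status-polling loop would let the migration speed up during quiet hours and back off automatically under peak traffic.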