Implementing a Tiered Storage Strategy for Distributed Feed Ingestion Using Java SQLite and Cassandra


The monolithic feed ingestion system buckled at around 10,000 active feeds. The central PostgreSQL database, once a reliable workhorse, became a severe bottleneck of lock contention and I/O saturation. Every new feed added was a direct tax on the entire system’s performance. Horizontal scaling was a fantasy; the architecture was fundamentally unsuited to ingesting hundreds of thousands of disparate, unpredictable RSS/Atom feeds. A complete teardown and rethink was necessary, moving from a centralized, monolithic design to a decentralized, agent-based architecture.

The core concept was to decouple ingestion from central storage. Instead of one large system pulling everything, hundreds or thousands of lightweight Java-based agents would be responsible for subsets of feeds. This immediately solved the scaling problem for fetching and parsing, but created a new, more complex one: how to manage the data flow from thousands of ephemeral agents to a durable, queryable central store without creating a new bottleneck. A single-tier storage solution was out of the question. The answer lay in a tiered storage strategy.

graph TD
    subgraph Distributed Agents
        A1[Agent 1] -->|JDBC| DB1[(SQLite)]
        A2[Agent 2] -->|JDBC| DB2[(SQLite)]
        An[Agent N] -->|JDBC| DBn[(SQLite)]
    end

    subgraph Central Data Plane
        CDB{Cassandra Cluster}
    end

    subgraph Observability
        LOKI[Loki]
    end

    A1 -- Rome Parse --> A1
    A2 -- Rome Parse --> A2
    An -- Rome Parse --> An

    DB1 -- Async Batch Sync --> CDB
    DB2 -- Async Batch Sync --> CDB
    DBn -- Async Batch Sync --> CDB

    A1 -- Log via logback-loki --> LOKI
    A2 -- Log via logback-loki --> LOKI
    An -- Log via logback-loki --> LOKI

This architecture dictates two distinct storage tiers. The first is a “hot” tier, local to each agent, designed for high-speed writes and temporary buffering. It must be lightweight, transactional, and require zero operational overhead. SQLite is the perfect candidate. It allows each agent to operate autonomously, absorbing bursts of new articles and persisting state locally, even if the central system is unavailable.

The second is the “cold” or archival tier. This is the central repository for all data ingested by all agents. It needs to handle an immense write throughput from thousands of concurrent sources, scale horizontally, and be highly available. Apache Cassandra, with its masterless architecture and linear scalability, is the logical choice for this role.

The challenge, then, becomes the implementation of the agent itself: a robust Java application capable of managing this two-tier flow, complete with reliable data synchronization, comprehensive logging, and resilience against network failures.

The Agent’s Foundation: Project Setup and Core Components

The agent is a self-contained Java application built with Maven. The pom.xml defines the core dependencies for this architecture.

<project ...>
    <properties>
        <java.version>17</java.version>
        <slf4j.version>2.0.7</slf4j.version>
        <logback.version>1.4.8</logback.version>
        <cassandra.driver.version>4.17.0</cassandra.driver.version>
        <sqlite.jdbc.version>3.42.0.0</sqlite.jdbc.version>
        <rome.version>1.18.0</rome.version>
        <loki-logback-appender.version>1.4.1</loki-logback-appender.version>
    </properties>

    <dependencies>
        <!-- Core Language & Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- Feed Parsing -->
        <dependency>
            <groupId>com.rometools</groupId>
            <artifactId>rome</artifactId>
            <version>${rome.version}</version>
        </dependency>

        <!-- Hot Tier: SQLite -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>${sqlite.jdbc.version}</version>
        </dependency>

        <!-- Cold Tier: Cassandra -->
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>${cassandra.driver.version}</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-query-builder</artifactId>
            <version>${cassandra.driver.version}</version>
        </dependency>
        
        <!-- Observability: Loki -->
        <dependency>
            <groupId>com.github.loki4j</groupId>
            <artifactId>loki-logback-appender</artifactId>
            <version>${loki-logback-appender.version}</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

The agent’s logic is orchestrated by a main AgentOrchestrator class, which manages three key components: a FeedFetcher, a LocalStore, and a CentralSyncer. Fetching and central synchronization run as separate scheduled tasks on a small thread pool, so a slow feed fetch does not hold up the sync to Cassandra.

Tier 1: The Edge - SQLite for Local, Resilient Storage

The LocalStore service is the agent’s brainstem. It manages the SQLite database file, handling schema creation and all local data operations. In a real-world project, a more robust migration tool like Flyway or Liquibase would be used, but for this context, initialization on startup is sufficient.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LocalStore implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(LocalStore.class);
    private final String dbUrl;
    private Connection connection;

    // A simple representation of an article DTO
    public record Article(String guid, String feedUrl, String title, String content, long publishedAt, Status status) {}
    public enum Status { NEW, SYNCED, FAILED }

    public LocalStore(String agentId) {
        this.dbUrl = "jdbc:sqlite:agent_" + agentId + ".db";
    }

    public void connect() throws SQLException {
        connection = DriverManager.getConnection(dbUrl);
        log.info("Connected to local SQLite database: {}", dbUrl);
        initializeSchema();
    }
    
    // A production system would use a migration tool. For this example, we ensure the table exists.
    private void initializeSchema() {
        String sql = """
            CREATE TABLE IF NOT EXISTS articles (
                guid TEXT PRIMARY KEY,
                feed_url TEXT NOT NULL,
                title TEXT,
                content TEXT,
                published_at INTEGER NOT NULL,
                status TEXT NOT NULL CHECK(status IN ('NEW', 'SYNCED', 'FAILED')),
                created_at INTEGER DEFAULT (strftime('%s','now'))
            );
        """;
        try (Statement stmt = connection.createStatement()) {
            stmt.execute(sql);
            stmt.execute("CREATE INDEX IF NOT EXISTS idx_articles_status ON articles (status);");
        } catch (SQLException e) {
            log.error("Failed to initialize SQLite schema", e);
            throw new RuntimeException(e);
        }
    }

    public boolean articleExists(String guid) throws SQLException {
        String sql = "SELECT 1 FROM articles WHERE guid = ?";
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, guid);
            try (ResultSet rs = pstmt.executeQuery()) {
                return rs.next();
            }
        }
    }
    
    // This operation must be transactional.
    public int saveNewArticles(List<Article> articles) {
        String sql = "INSERT OR IGNORE INTO articles(guid, feed_url, title, content, published_at, status) VALUES(?, ?, ?, ?, ?, ?)";
        int insertedCount = 0;
        try {
            connection.setAutoCommit(false);
            try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
                for (Article article : articles) {
                    pstmt.setString(1, article.guid());
                    pstmt.setString(2, article.feedUrl());
                    pstmt.setString(3, article.title());
                    pstmt.setString(4, article.content());
                    pstmt.setLong(5, article.publishedAt());
                    pstmt.setString(6, Status.NEW.name());
                    pstmt.addBatch();
                }
                int[] results = pstmt.executeBatch();
                for (int result : results) {
                    if (result > 0) insertedCount++;
                }
            }
            connection.commit();
        } catch (SQLException e) {
            log.error("Transaction failed during article save. Rolling back.", e);
            try {
                connection.rollback();
            } catch (SQLException ex) {
                log.error("Failed to rollback transaction.", ex);
            }
            return 0;
        } finally {
            try {
                connection.setAutoCommit(true);
            } catch (SQLException e) {
                log.error("Failed to reset auto-commit.", e);
            }
        }
        return insertedCount;
    }
    
    public List<Article> getUnsyncedArticles(int limit) {
        String sql = "SELECT guid, feed_url, title, content, published_at FROM articles WHERE status = 'NEW' LIMIT ?";
        List<Article> articles = new ArrayList<>();
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setInt(1, limit);
            // Close the ResultSet explicitly instead of relying on the statement to release it.
            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    articles.add(new Article(
                        rs.getString("guid"),
                        rs.getString("feed_url"),
                        rs.getString("title"),
                        rs.getString("content"),
                        rs.getLong("published_at"),
                        Status.NEW
                    ));
                }
            }
        } catch (SQLException e) {
            log.error("Failed to fetch unsynced articles", e);
        }
        return articles;
    }

    public void updateArticleStatus(List<String> guids, Status newStatus) {
        if (guids.isEmpty()) return;
        String sql = "UPDATE articles SET status = ? WHERE guid IN (" +
                     String.join(",", java.util.Collections.nCopies(guids.size(), "?")) + ")";
        
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, newStatus.name());
            for (int i = 0; i < guids.size(); i++) {
                pstmt.setString(i + 2, guids.get(i));
            }
            int updatedRows = pstmt.executeUpdate();
            log.debug("Updated status to {} for {} articles.", newStatus, updatedRows);
        } catch (SQLException e) {
            log.error("Failed to update article status for {} articles.", guids.size(), e);
        }
    }

    @Override
    public void close() throws SQLException {
        if (connection != null && !connection.isClosed()) {
            connection.close();
            log.info("SQLite connection closed.");
        }
    }
}

The critical parts here are the transactional batch insert (saveNewArticles) and the status tracking (status column). The agent fetches feeds using the Rome library, checks for existing GUIDs in SQLite to avoid re-processing, and saves new entries with a NEW status. This local buffer is the key to the agent’s resilience.

Tier 2: The Core - Cassandra for Scalable, Central Archival

The CentralSyncer is responsible for moving data from the local SQLite store to the central Cassandra cluster. This involves careful data modeling in Cassandra to ensure writes are distributed evenly and queries will be efficient.

A common pitfall in Cassandra data modeling is creating hot partitions. If we were to use feed_url as the sole partition key, a very active feed could overload a single node. A better approach is to compound the partition key with a time bucket, such as year and month. This ensures data is spread across the cluster by both feed and time.

Here is the Cassandra schema (CQL):

CREATE KEYSPACE IF NOT EXISTS feed_archive WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'datacenter1': '3' 
};

USE feed_archive;

CREATE TABLE IF NOT EXISTS articles (
    feed_url TEXT,
    year_month TEXT, -- e.g., '2023-10'
    published_at TIMESTAMP,
    guid TEXT,
    title TEXT,
    content TEXT,
    ingested_at TIMESTAMP,
    agent_id TEXT,
    PRIMARY KEY ((feed_url, year_month), published_at, guid)
) WITH CLUSTERING ORDER BY (published_at DESC, guid ASC);

The partition key is (feed_url, year_month), distributing the load. The clustering keys (published_at, guid) ensure that articles within a partition are sorted by publication date, which is a common query pattern.
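To make the time bucket concrete, here is a minimal sketch of how the year_month component could be derived from an article's publication timestamp. The class name PartitionBucket is hypothetical, and the sketch assumes the timestamp is stored as epoch milliseconds. Pinning the formatter to UTC keeps every agent on the same bucket boundary regardless of the host's time zone.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: derives the year_month part of the partition key. */
final class PartitionBucket {
    // UTC so that all agents agree on where one month ends and the next begins.
    private static final DateTimeFormatter YEAR_MONTH =
            DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

    private PartitionBucket() {}

    /** Assumption: publishedAtEpochMillis is milliseconds since the Unix epoch. */
    static String yearMonth(long publishedAtEpochMillis) {
        return YEAR_MONTH.format(Instant.ofEpochMilli(publishedAtEpochMillis));
    }
}
```

An article published one millisecond before midnight UTC on the last day of a month lands in that month's partition; one published at midnight lands in the next.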

The CentralSyncer implementation uses the DataStax Java Driver. It’s crucial to manage the CqlSession as a singleton for the application’s lifetime.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

public class CentralSyncer implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(CentralSyncer.class);
    private final CqlSession session;
    private final PreparedStatement insertStatement;
    private final String agentId;

    private static final DateTimeFormatter YEAR_MONTH_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

    public CentralSyncer(String contactPoint, int port, String localDc, String agentId) {
        this.session = CqlSession.builder()
            .addContactPoint(new InetSocketAddress(contactPoint, port))
            .withLocalDatacenter(localDc)
            .build();
        this.agentId = agentId;
        
        // Pre-compiling statements is a major performance best practice for Cassandra drivers
        this.insertStatement = session.prepare(
            "INSERT INTO feed_archive.articles (feed_url, year_month, published_at, guid, title, content, ingested_at, agent_id) " +
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
        );
        log.info("Cassandra session initialized for agent {}", agentId);
    }

    public CompletionStage<Void> syncArticles(List<LocalStore.Article> articles) {
        if (articles.isEmpty()) {
            return java.util.concurrent.CompletableFuture.completedFuture(null);
        }

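        // An UNLOGGED batch skips the batch log, which is cheap when every statement targets the same partition.
        // A batch taken from the local store can span several feeds (and therefore several partitions);
        // at larger scale, group by (feed_url, year_month) or issue individual async writes instead.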
        BatchStatementBuilder batchBuilder = BatchStatement.builder(DefaultBatchType.UNLOGGED);
        for (LocalStore.Article article : articles) {
            Instant publishedInstant = Instant.ofEpochMilli(article.publishedAt());
            String yearMonth = YEAR_MONTH_FORMATTER.format(publishedInstant);

            batchBuilder.addStatement(insertStatement.bind(
                article.feedUrl(),
                yearMonth,
                publishedInstant,
                article.guid(),
                article.title(),
                article.content(),
                Instant.now(),
                this.agentId
            ));
        }

        CompletionStage<AsyncResultSet> future = session.executeAsync(batchBuilder.build());

        // whenComplete only observes the outcome. Unlike exceptionally(), it does not turn a
        // failure into a normal completion, so the caller's success callback is skipped on error
        // and the local status is left untouched.
        return future.whenComplete((rs, throwable) -> {
            if (throwable != null) {
                log.error("Failed to sync batch of {} articles to Cassandra.", articles.size(), throwable);
            } else {
                log.info("Successfully synced a batch of {} articles to Cassandra.", articles.size());
            }
        }).<Void>thenApply(rs -> null);
    }

    @Override
    public void close() {
        if (session != null && !session.isClosed()) {
            session.close();
            log.info("Cassandra session closed.");
        }
    }
}

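The syncArticles method takes a batch of articles from SQLite and constructs an UNLOGGED batch statement for Cassandra. This is a deliberate performance choice. LOGGED batches guarantee that every statement in the batch is eventually applied, but they pay for it with a batch-log write and extra coordination. Since our sync process is idempotent (re-running a failed batch rewrites the same primary keys without ill effect), that guarantee is unnecessary here. One caveat: an UNLOGGED batch is only cheap when its statements target the same partition. A batch that mixes many feeds forces the coordinator to fan out to many replicas, so large mixed batches are better grouped by partition key or sent as individual asynchronous writes. The operation is asynchronous, returning a CompletionStage, allowing the agent to continue its work without blocking on network I/O.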
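Because unsynced articles are pulled from SQLite without regard to feed, one way to keep each batch on a single partition is to group them by the table's partition key before building statements. The following is a sketch under stated assumptions: PartitionGrouping, its simplified Article record, and PartitionKey are hypothetical stand-ins, not part of the agent code, and publishedAt is assumed to be epoch milliseconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PartitionGrouping {
    /** Simplified stand-in for LocalStore.Article (assumption: only the fields needed here). */
    record Article(String guid, String feedUrl, long publishedAt) {}

    /** Mirrors the Cassandra partition key (feed_url, year_month). */
    record PartitionKey(String feedUrl, String yearMonth) {}

    private static final DateTimeFormatter YEAR_MONTH =
            DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC);

    private PartitionGrouping() {}

    static PartitionKey keyOf(Article article) {
        String bucket = YEAR_MONTH.format(Instant.ofEpochMilli(article.publishedAt()));
        return new PartitionKey(article.feedUrl(), bucket);
    }

    /** Each map value can then be sent as its own single-partition batch. */
    static Map<PartitionKey, List<Article>> groupByPartition(List<Article> articles) {
        return articles.stream().collect(Collectors.groupingBy(PartitionGrouping::keyOf));
    }
}
```

Each group would become one UNLOGGED batch, and the groups can be executed concurrently. The local status should only be updated for the groups whose write succeeded.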

Observability: Centralized Logging with Loki

With potentially thousands of agents running, isolated log files are useless. Centralized logging is not a luxury; it’s a requirement. Loki is an excellent fit due to its cost-effective design. It indexes metadata (labels) rather than the full text of log lines. For our agent, useful labels would be agent_id, hostname, and region.

We configure this using a logback.xml file.

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
        <http>
            <url>http://loki-server:3100/loki/api/v1/push</url>
        </http>
        <format>
            <label>
                <!-- %X{feedUrl} renders the MDC key 'feedUrl' as a Loki label value. -->
                <!-- Feed URLs containing ',' or '=' collide with the default label separators; -->
                <!-- adjust pairSeparator / keyValueSeparator if such URLs are expected. -->
                <pattern>app=feed-agent,agent_id=${agent.id:-unknown},hostname=${HOSTNAME},feedUrl=%X{feedUrl:-none}</pattern>
            </label>
            <message>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n%ex</pattern>
            </message>
        </format>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="LOKI" />
    </root>
</configuration>

To use this configuration, we must set the agent.id system property on startup (e.g., -Dagent.id=agent-007). The real power comes from using SLF4J’s Mapped Diagnostic Context (MDC) to add contextual labels to logs.

import com.rometools.rome.feed.synd.SyndEntry;
import com.rometools.rome.feed.synd.SyndFeed;
import com.rometools.rome.io.SyndFeedInput;
import com.rometools.rome.io.XmlReader;
import org.slf4j.MDC;

import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;

public class FeedFetcher {
    // ...
    public List<LocalStore.Article> fetch(String feedUrl) {
        // try-with-resources removes the MDC key when the block exits. Resources are closed
        // before an attached catch block runs, so errors are handled in an inner try to keep
        // the feedUrl context on the failure log line.
        try (MDC.MDCCloseable closeable = MDC.putCloseable("feedUrl", feedUrl)) {
            log.info("Starting fetch for feed.");
            // ... Rome parsing logic ...
            try (XmlReader reader = new XmlReader(new URL(feedUrl))) {
                SyndFeed feed = new SyndFeedInput().build(reader);

                // Map SyndEntry to our Article DTO
                List<LocalStore.Article> articles = feed.getEntries().stream()
                    .map(entry -> toArticle(entry, feedUrl))
                    .collect(Collectors.toList());

                log.info("Fetched {} entries.", articles.size());
                return articles;
            } catch (Exception e) {
                log.error("Failed to fetch or parse feed.", e);
                return List.of();
            }
        }
    }
    // ...
}

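With this setup, a log message from this method will automatically be tagged in Loki with app=feed-agent, agent_id=agent-007, and feedUrl=http://example.com/rss. This enables powerful operational queries in Grafana like sum(rate({app="feed-agent"} |~ "Failed to fetch" [5m])) by (feedUrl), immediately identifying problematic feeds across the entire fleet of agents. Keep in mind that Loki creates a separate stream for every distinct label combination, so a per-feed label is a trade-off that should be revisited as the total number of feeds grows.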

Orchestrating the Agent’s Lifecycle

The final piece is the AgentOrchestrator, which ties everything together. It uses a ScheduledExecutorService to run the fetch-and-store cycle and the store-and-sync cycle at different intervals. This separation is crucial. Fetching might happen every 5 minutes, while syncing to Cassandra might happen every 30 seconds to create smaller, more manageable batches.

import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AgentOrchestrator {
    private static final Logger log = LoggerFactory.getLogger(AgentOrchestrator.class);
    private final String agentId;
    private final LocalStore localStore;
    private final CentralSyncer centralSyncer;
    private final FeedFetcher feedFetcher;
    private final List<String> feedsToProcess;
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

    public AgentOrchestrator(String agentId, List<String> feeds, CentralSyncer syncer, LocalStore store) {
        this.agentId = agentId;
        this.feedsToProcess = feeds;
        this.centralSyncer = syncer;
        this.localStore = store;
        this.feedFetcher = new FeedFetcher(); // Assuming a default constructor
    }

    public void start() {
        log.info("Starting agent orchestrator with ID: {}", agentId);
        // Task 1: Fetch feeds and save to local store
        executorService.scheduleAtFixedRate(this::fetchAndStore, 0, 5, TimeUnit.MINUTES);
        // Task 2: Sync from local store to central Cassandra
        executorService.scheduleAtFixedRate(this::syncToCentral, 30, 30, TimeUnit.SECONDS);
    }
    
    private void fetchAndStore() {
        log.info("Running fetch-and-store cycle.");
        for (String feedUrl : feedsToProcess) {
            try {
                List<LocalStore.Article> newArticles = feedFetcher.fetch(feedUrl).stream()
                    .filter(article -> {
                        try {
                            return !localStore.articleExists(article.guid());
                        } catch (Exception e) {
                            log.error("Error checking existence for guid {}", article.guid(), e);
                            return false;
                        }
                    })
                    .toList();
                
                if (!newArticles.isEmpty()) {
                    int savedCount = localStore.saveNewArticles(newArticles);
                    log.info("Saved {} new articles from {}", savedCount, feedUrl);
                }
            } catch (Exception e) {
                log.error("Unhandled exception in fetch-and-store loop for {}", feedUrl, e);
            }
        }
    }

    private void syncToCentral() {
        log.debug("Running sync-to-central cycle.");
        List<LocalStore.Article> unsynced = localStore.getUnsyncedArticles(100); // Process in batches of 100
        if (unsynced.isEmpty()) {
            return;
        }

        log.info("Found {} articles to sync to Cassandra.", unsynced.size());
        centralSyncer.syncArticles(unsynced)
            // Run the blocking SQLite update on the agent's own pool rather than on a driver I/O thread.
            .thenAcceptAsync(v -> {
                // This block executes only on successful sync
                List<String> syncedGuids = unsynced.stream().map(LocalStore.Article::guid).toList();
                localStore.updateArticleStatus(syncedGuids, LocalStore.Status.SYNCED);
                log.info("Successfully marked {} articles as SYNCED in local store.", syncedGuids.size());
            }, executorService)
            .exceptionally(ex -> {
                log.error("Sync process failed. Local article status will not be updated. Will retry on next cycle.", ex);
                // Optionally, update status to FAILED after a certain number of retries
                return null;
            });
    }

    public void stop() throws InterruptedException {
        executorService.shutdown();
        executorService.awaitTermination(30, TimeUnit.SECONDS);
        log.info("Agent orchestrator {} stopped.", agentId);
    }

    public static void main(String[] args) throws Exception {
        String agentId = System.getProperty("agent.id", UUID.randomUUID().toString());
        
        try (
            LocalStore localStore = new LocalStore(agentId);
            CentralSyncer centralSyncer = new CentralSyncer("127.0.0.1", 9042, "datacenter1", agentId)
        ) {
            localStore.connect();
            List<String> feeds = List.of("http://rss.cnn.com/rss/cnn_topstories.rss", "http://feeds.bbci.co.uk/news/rss.xml");
            
            AgentOrchestrator orchestrator = new AgentOrchestrator(agentId, feeds, centralSyncer, localStore);
            orchestrator.start();

            // Keep agent running
            Thread.currentThread().join();
        }
    }
}

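The design’s resilience is evident in the syncToCentral method. If the stage returned by syncArticles completes exceptionally (i.e., Cassandra is down or there’s a network error), the success callback is skipped. This holds only as long as syncArticles propagates the failure to its caller; recovering from it internally (for example, with exceptionally) would make the stage look successful and cause unsent articles to be marked SYNCED. When the failure is propagated, the articles remain in SQLite with the NEW status. The next time the sync cycle runs, it will pick them up again and retry. This simple, robust retry mechanism is a direct benefit of the tiered storage architecture.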
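The retry behaviour depends on a detail of CompletionStage chaining that is easy to get wrong, so it is worth isolating. The sketch below uses only the JDK; the class and method names are hypothetical. It compares a stage that recovers from a failure with exceptionally against one that merely observes it with whenComplete, and reports whether a downstream thenAccept runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class FailurePropagationDemo {
    private FailurePropagationDemo() {}

    /** Simulates a failed write whose error is swallowed by exceptionally(). */
    static CompletableFuture<Void> swallowed() {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("cluster unreachable"));
        return failed.exceptionally(ex -> null); // the stage now completes normally
    }

    /** Simulates a failed write whose error is only observed (e.g. logged). */
    static CompletableFuture<Void> propagated() {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("cluster unreachable"));
        return failed.whenComplete((v, ex) -> { /* log here; the failure stays in the chain */ });
    }

    /** Returns true if a thenAccept attached to the given stage was executed. */
    static boolean downstreamRan(CompletableFuture<Void> stage) {
        AtomicBoolean ran = new AtomicBoolean(false);
        stage.thenAccept(v -> ran.set(true))
             .exceptionally(ex -> null) // keep join() from throwing
             .join();
        return ran.get();
    }
}
```

With the swallowed variant the downstream callback runs even though the write failed, which in the agent would mean marking unsent articles as SYNCED. With the propagated variant it is skipped, which is the behaviour the retry loop relies on.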

Limitations and Future Iterations

This architecture, while robust, is not without its own set of challenges that would need to be addressed in a full-scale production deployment. The local SQLite database on each agent will grow indefinitely; a data retention policy must be implemented to periodically purge old, successfully synced records.
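A retention job could be as small as one DELETE statement run on a schedule. The following is a sketch, not the agent's actual code: RetentionPolicy and its methods are hypothetical, and it assumes the articles table defined earlier, where created_at holds epoch seconds and status is 'SYNCED' once a row has been archived.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;

final class RetentionPolicy {
    // Only rows that were already archived centrally are eligible for deletion.
    static final String PURGE_SQL =
            "DELETE FROM articles WHERE status = 'SYNCED' AND created_at < ?";

    private RetentionPolicy() {}

    /** created_at is stored in epoch seconds, so the cutoff uses the same unit. */
    static long cutoffEpochSeconds(Instant now, Duration retention) {
        return now.minus(retention).getEpochSecond();
    }

    /** Deletes synced rows older than the retention window; returns the number of rows removed. */
    static int purge(Connection connection, Instant now, Duration retention) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(PURGE_SQL)) {
            ps.setLong(1, cutoffEpochSeconds(now, retention));
            return ps.executeUpdate();
        }
    }
}
```

One design consideration: the local table also serves as the GUID de-duplication record, so the retention window should be longer than the time an entry typically stays listed in its feed. Otherwise a purged article that is still in the feed would be treated as new and synced again.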

Furthermore, the agent’s configuration, such as the list of feeds to process, is currently hardcoded. A production system would require a dynamic configuration mechanism, perhaps pulling assignments from a central service or a distributed coordinator like ZooKeeper or Consul. This would also enable rebalancing of feeds across agents if one fails.

Finally, while the asynchronous batching to Cassandra is efficient, at extreme scale, it could still overwhelm the central cluster if thousands of agents try to sync simultaneously after a network partition heals. A more advanced implementation might introduce jitter to the sync schedule or even place a proper message queue like Kafka between the agents and the final datastore to smooth out the write load, though this adds significant operational complexity. The current design prioritizes agent simplicity and autonomy over perfect load distribution.
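As a rough illustration of the jitter idea, each agent could add a random offset to its sync delay so that the fleet does not fire in lockstep. This is a minimal sketch; SyncJitter and its parameters are hypothetical, and the jitter fraction is a tuning assumption rather than a recommended value.

```java
import java.util.Random;

final class SyncJitter {
    private SyncJitter() {}

    /**
     * Returns baseMillis plus a random extra delay in [0, baseMillis * jitterFraction].
     * The Random instance is passed in so the behaviour can be seeded in tests.
     */
    static long jitteredDelayMillis(long baseMillis, double jitterFraction, Random random) {
        if (baseMillis < 0 || jitterFraction < 0) {
            throw new IllegalArgumentException("baseMillis and jitterFraction must be non-negative");
        }
        long maxExtra = (long) (baseMillis * jitterFraction);
        // nextDouble() is in [0, 1), so extra falls in [0, maxExtra].
        long extra = (long) (random.nextDouble() * (maxExtra + 1));
        return baseMillis + extra;
    }
}
```

The result could be used as the initial delay of the sync task, or as the delay of a task that reschedules itself after every run, so that each cycle is offset independently.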

