
Decoupling LLM Extraction: Building an Async Ingestion Queue in SQLite

Updated: at 07:00 AM

In the previous post, we built a hybrid retrieval system that fuses BFS graph traversal with SQLite FTS5 BM25 scoring to hit a 92.3% recall rate. Retrieval is fast: low single-digit milliseconds. But getting knowledge into the graph in the first place is a different story.

Every time an AI agent finishes a session and tries to ingest a Markdown handoff document, the pipeline runs: chunk the document, call an LLM to extract triples, embed each triple, insert into SQLite. For a 5 KB handoff file, that can take anywhere from 3 to 30 seconds depending on the model and hardware.

If the agent blocks on this at session end, the user is left staring at a spinner. If you multiply that across a multi-agent system, your orchestrator grinds to a halt.

The fix is to decouple write acceptance from write materialization.

Why Synchronous LLM Extraction is an Anti-Pattern

The naive implementation of an ingestion pipeline looks like this:

user calls membox ingest <file>
  → chunk document (fast, <10ms)
  → call LLM extractor for each chunk (slow, 500ms–5s per chunk)
  → embed each entity and relation (slow, 200ms–1s per entity)
  → INSERT into SQLite (fast, <5ms)
→ return

For a document with 10 chunks, extraction alone can take up to 50 seconds (10 chunks × 5 s each), before any embedding time, and the call blocks throughout. For an AI agent responding to a user mid-conversation, that is unacceptable. The user shouldn’t have to wait for knowledge graph materialization; they want to keep working.

The pattern we needed: accept the raw document in under 100 ms, return a queue ID, and let the expensive LLM work happen in the background.

The Queue Schema: SQLite as a Message Broker

We didn’t add Redis. We didn’t add RabbitMQ. We added one table to the same SQLite file we were already using.

Migration 0004 in membox creates the ingest_queue table:

CREATE TABLE ingest_queue (
    id          INTEGER PRIMARY KEY,
    content     TEXT    NOT NULL,       -- raw document text
    project     TEXT,                   -- captured at enqueue time
    source_path TEXT,
    doc_date    TEXT,
    status      TEXT    NOT NULL DEFAULT 'pending',
                        -- pending | processing | done | failed
    retries     INTEGER NOT NULL DEFAULT 0,
    error       TEXT,
    enqueued_at TEXT    NOT NULL DEFAULT (datetime('now')),
    started_at  TEXT,
    finished_at TEXT
);

The critical insight: all chunking, LLM extraction, and embedding happens in the worker, not at enqueue time. The enqueue path does exactly one thing: insert the raw text.

Here is the entire fast path in QueueOps.enqueue_ingest:

def enqueue_ingest(self, content: str, *, project=None, source_path=None, doc_date=None) -> int:
    with self._cm.transaction() as c:
        cur = c.execute(
            "INSERT INTO ingest_queue (content, project, source_path, doc_date) VALUES (?, ?, ?, ?)",
            (content, project, source_path, doc_date),
        )
        return int(cur.lastrowid)

One INSERT, no LLM calls, and it returns in under 5 ms.
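To see the fast path run end to end, here is a minimal sketch using only the standard-library sqlite3 module. It is not membox's actual code: the plain `conn` argument and the `with conn:` block are assumptions standing in for `self._cm.transaction()`, and the in-memory database is for illustration only.

```python
import sqlite3

# Same columns as migration 0004 above.
SCHEMA = """
CREATE TABLE IF NOT EXISTS ingest_queue (
    id          INTEGER PRIMARY KEY,
    content     TEXT    NOT NULL,
    project     TEXT,
    source_path TEXT,
    doc_date    TEXT,
    status      TEXT    NOT NULL DEFAULT 'pending',
    retries     INTEGER NOT NULL DEFAULT 0,
    error       TEXT,
    enqueued_at TEXT    NOT NULL DEFAULT (datetime('now')),
    started_at  TEXT,
    finished_at TEXT
);
"""


def enqueue_ingest(conn, content, *, project=None, source_path=None, doc_date=None):
    """Insert the raw text and return its queue ID. No chunking, no LLM calls."""
    with conn:  # commits on success, rolls back on error (assumed transaction helper)
        cur = conn.execute(
            "INSERT INTO ingest_queue (content, project, source_path, doc_date) "
            "VALUES (?, ?, ?, ?)",
            (content, project, source_path, doc_date),
        )
    return int(cur.lastrowid)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

queue_id = enqueue_ingest(conn, "# Handoff\nraw text", project="demo", source_path="handoff.md")
row = conn.execute(
    "SELECT status, retries FROM ingest_queue WHERE id = ?", (queue_id,)
).fetchone()
print(queue_id, row)  # 1 ('pending', 0)
```

The column defaults do the bookkeeping: a freshly enqueued row is already 'pending' with zero retries, so the enqueue path never has to set them.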

The Single-Worker Lease: No Daemon, No Coordinator

Without coordination, a flood of membox ingest calls would spawn a flood of workers, each racing to process the same queue. The fix is a lease, a record in the existing meta table that encodes which process currently owns the worker role.

{
  "pid": 12345,
  "hostname": "roys-macbook",
  "heartbeat": "2026-06-14T10:32:01Z"
}

The lease logic in acquire_worker_lease covers three cases:

  1. No lease exists. Take it, write your own PID and timestamp.
  2. A fresh lease exists (heartbeat < 60s ago) owned by another process. Return False, do not spawn.
  3. The lease exists but has expired (heartbeat ≥ 60s ago). Take it over. This is the crash recovery case.

def acquire_worker_lease(self, ttl: float = 60.0) -> bool:
    with self._cm.transaction() as c:
        row = c.execute("SELECT value FROM meta WHERE key = 'worker_lease';").fetchone()
        if row is not None:
            lease = parse_lease(str(row[0]))
            if lease is not None and lease_is_live(lease, ttl) and not lease_is_mine(lease):
                return False  # another live worker holds it
            # Expired or our own: take over and reset any stale rows
            c.execute(
                "UPDATE ingest_queue SET status = 'pending', started_at = NULL "
                "WHERE status = 'processing';"
            )
        c.execute(
            "INSERT OR REPLACE INTO meta(key, value) VALUES ('worker_lease', ?);",
            (render_lease(),),
        )
        return True

The crash recovery line is key: if a worker dies mid-item, its row is stuck in 'processing'. The next worker to take over the expired lease resets those rows to 'pending' before starting its drain loop. No manual intervention required.
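The helpers that acquire_worker_lease calls (parse_lease, lease_is_live, lease_is_mine, render_lease) are not shown in this post. The sketch below is one plausible way to write them against the JSON lease format above. The bodies, the optional `now` parameter, and the choice to treat an unparseable lease as missing are all assumptions, not membox's implementation.

```python
import json
import os
import socket
from datetime import datetime, timezone

HEARTBEAT_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # matches "2026-06-14T10:32:01Z"


def render_lease(now=None):
    """Serialize a lease owned by the current process (hypothetical helper body)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return json.dumps({
        "pid": os.getpid(),
        "hostname": socket.gethostname(),
        "heartbeat": now.strftime(HEARTBEAT_FORMAT),
    })


def parse_lease(raw):
    """Return the lease dict with a parsed heartbeat, or None if it is malformed."""
    try:
        lease = json.loads(raw)
        heartbeat = datetime.strptime(lease["heartbeat"], HEARTBEAT_FORMAT)
        return {
            "pid": int(lease["pid"]),
            "hostname": str(lease["hostname"]),
            "heartbeat": heartbeat.replace(tzinfo=timezone.utc),
        }
    except (ValueError, KeyError, TypeError):
        return None  # assumption: a corrupt lease is treated like no lease


def lease_is_live(lease, ttl, now=None):
    """A lease is live while its heartbeat is less than ttl seconds old."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - lease["heartbeat"]).total_seconds() < ttl


def lease_is_mine(lease):
    """True when the lease was written by this process on this host."""
    return lease["pid"] == os.getpid() and lease["hostname"] == socket.gethostname()
```

Comparing the hostname as well as the PID matters because PIDs are only unique per machine, and they can be reused after a crash. The TTL is the real safety net: a stale lease expires no matter who wrote it.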

The Worker: Transient, Not Permanent

The worker is not a daemon. It does not sit idle waiting for work. It spawns, drains whatever is in the queue, and exits.

def drain_queue(agent: MemoryAgent, *, retry_failed=False) -> dict[str, int]:
    store = agent.store
    if not store.acquire_worker_lease():
        return {"done": 0, "failed": 0, "retried": 0}  # another worker is running

    done = failed = 0
    try:
        while True:
            item = store.claim_next_pending()  # atomic UPDATE ... RETURNING
            if item is None:
                break  # queue empty, exit
            try:
                agent.ingest_content(
                    item["content"],
                    source=item["source_path"] or "",
                    project=item["project"],
                    source_path=item["source_path"],
                    doc_date=item["doc_date"],
                )
                store.mark_done(item["id"])
                done += 1
            except Exception as exc:
                store.mark_failed(item["id"], str(exc))
                failed += 1
            store.refresh_worker_lease()  # keep the heartbeat alive
    finally:
        store.release_worker_lease()  # clean exit

    return {"done": done, "failed": failed, "retried": 0}

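claim_next_pending uses an atomic UPDATE ... WHERE status='pending' ... RETURNING to claim items, so there is no time-of-check-to-time-of-use (TOCTOU) race between finding a pending row and marking it. The lease heartbeat is refreshed after every item, so a long drain across many documents doesn’t let the lease expire. Note that the refresh happens between items: a single document that takes longer than the 60-second TTL could still look expired to another process.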
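The body of claim_next_pending is not shown here, so the following is a sketch of what an atomic claim can look like rather than membox's actual query. It assumes SQLite 3.35 or newer (the first release with RETURNING), oldest-first ordering by id, and a trimmed-down table that keeps only the columns the claim touches.

```python
import sqlite3


def claim_next_pending(conn):
    """Flip the oldest pending row to 'processing' and return it in one statement."""
    with conn:  # a single transaction; commits once the row has been fetched
        row = conn.execute(
            """
            UPDATE ingest_queue
               SET status = 'processing', started_at = datetime('now')
             WHERE id = (SELECT id FROM ingest_queue
                          WHERE status = 'pending'
                          ORDER BY id
                          LIMIT 1)
            RETURNING id, content, project, source_path, doc_date
            """
        ).fetchone()
    return dict(row) if row is not None else None


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets the claimed row be read by column name
conn.execute(
    "CREATE TABLE ingest_queue ("
    " id INTEGER PRIMARY KEY, content TEXT NOT NULL, project TEXT,"
    " source_path TEXT, doc_date TEXT,"
    " status TEXT NOT NULL DEFAULT 'pending', started_at TEXT)"
)
conn.executemany(
    "INSERT INTO ingest_queue (content) VALUES (?)", [("first",), ("second",)]
)
conn.commit()

first = claim_next_pending(conn)
second = claim_next_pending(conn)
third = claim_next_pending(conn)  # queue is empty by now
print(first["content"], second["content"], third)  # first second None
```

Because the row is selected and updated by the same statement, there is no window in which two callers can both see it as 'pending'. When nothing is pending, the subquery yields NULL, no row matches, and the function returns None, which is the signal drain_queue uses to exit.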

The spawning side is equally simple:

import subprocess
import sys


def spawn_worker(db_path: str) -> bool:
    store = KnowledgeStore(db_path)
    if store.worker_is_alive():
        return False  # already running
    log_path = f"{db_path}.worker.log"
    with open(log_path, "a") as log:
        subprocess.Popen(
            [sys.executable, "-m", "membox.cli", "process", "--db", db_path],
            start_new_session=True,  # detach from parent process
            stdout=log,
            stderr=log,
        )
    return True

start_new_session=True runs the worker in a new session (a setsid() call in the child, POSIX only), which detaches it from the enqueueing process’s session. The enqueue call returns in milliseconds. The worker runs independently in the background.

Observability: Knowing What’s In the Queue

The worst failure mode of any async system is silent staleness. You think the knowledge graph is up to date. It isn’t.

In membox, every membox query output appends a coverage footer:

(returned 45/120 triples, ~1,940/2,000 tokens; 3 ingests pending — results may be incomplete)

The “pending” note is never omitted when pending + processing > 0. Silent truncation is explicitly forbidden by the spec.
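The footer rule is easy to state as code. The function below is a hypothetical sketch: the name coverage_footer, its parameters, and the choice to report pending + processing as one number are assumptions. Only the output format is taken from the sample footer above.

```python
def coverage_footer(returned, total, tokens_used, token_budget, pending, processing):
    """Build the coverage footer; the staleness note appears whenever work is in flight."""
    parts = [
        f"returned {returned}/{total} triples, "
        f"~{tokens_used:,}/{token_budget:,} tokens"
    ]
    in_flight = pending + processing  # assumption: both states count as "pending" here
    if in_flight > 0:
        noun = "ingest" if in_flight == 1 else "ingests"
        # \u2014 is the dash used in the sample footer
        parts.append(f"{in_flight} {noun} pending \u2014 results may be incomplete")
    return "(" + "; ".join(parts) + ")"


print(coverage_footer(45, 120, 1940, 2000, pending=3, processing=0))
print(coverage_footer(45, 120, 1940, 2000, pending=0, processing=0))
```

The first call reproduces the sample footer. The second omits the note, which is the only case where the graph is known to reflect everything that has been enqueued.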

You can also inspect the queue directly:

$ membox queue
pending:    0
processing: 0
done:       12
failed:     1

Recent failures:
  #8  handoff.md  "LLM timeout after 30s"  retries=1  2026-06-14 10:31:00

Failed rows with retries < 3 can be reset with membox process --retry-failed. At retries == 3, a row is permanently failed until you manually intervene. The friction is intentional, to surface real problems rather than silently retry forever.
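The retry rule can be sketched as a single UPDATE. This is an illustration under assumptions, not membox's code: the function name reset_failed is hypothetical, and so is the choice to increment retries at reset time (the post does not say when the counter is bumped).

```python
import sqlite3

MAX_RETRIES = 3  # rows that reach this count stay failed


def reset_failed(conn, max_retries=MAX_RETRIES):
    """Move retryable failed rows back to 'pending'; return how many were reset."""
    with conn:
        cur = conn.execute(
            "UPDATE ingest_queue "
            "SET status = 'pending', retries = retries + 1, "
            "    error = NULL, started_at = NULL, finished_at = NULL "
            "WHERE status = 'failed' AND retries < ?",
            (max_retries,),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ingest_queue ("
    " id INTEGER PRIMARY KEY, status TEXT NOT NULL,"
    " retries INTEGER NOT NULL DEFAULT 0, error TEXT,"
    " started_at TEXT, finished_at TEXT)"
)
conn.executemany(
    "INSERT INTO ingest_queue (status, retries, error) VALUES (?, ?, ?)",
    [
        ("failed", 1, "LLM timeout after 30s"),  # retryable
        ("failed", 3, "LLM timeout after 30s"),  # permanently failed
        ("done", 0, None),
    ],
)
conn.commit()

reset = reset_failed(conn)
rows = conn.execute("SELECT status, retries FROM ingest_queue ORDER BY id").fetchall()
print(reset, rows)  # 1 [('pending', 2), ('failed', 3), ('done', 0)]
```

The row at the retry cap is untouched, so it keeps showing up under "Recent failures" until someone looks at it.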

Tradeoffs and What This Design Gets Right

This pattern isn’t new: a SQLite table as a queue, a transient worker, a lease in a meta table. It’s a well-known technique for systems that don’t want to take on a separate message broker dependency. What makes it work here:

The queue survives crashes; pending items aren’t lost in memory. There’s no RabbitMQ container to keep running and no Redis to configure, because the queue lives in the same .db file as the knowledge graph. Crash recovery is automatic: an expired lease triggers processing → pending resets before the next drain starts. And eventual consistency is surfaced rather than hidden, because the query footer tells you when results may be stale. You’re never silently operating on an incomplete graph.

The tradeoff: this design assumes a single machine. The meta table lease is per-database, not distributed. If you want multiple machines hitting the same NFS-mounted SQLite file, you need a real distributed lock, and SQLite’s own file locking is unreliable on many network filesystems in any case. For a local-first AI memory layer running on one developer’s machine, this is not a real constraint.

Up Next

With async ingestion in place, the full pipeline is: accept document in <100ms → worker materializes the graph in the background → hybrid BM25+graph retrieval answers queries against whatever is materialized so far, with honest staleness reporting.

But raw retrieval is only part of the story. Over time, the graph fills with redundant facts, stale decisions, and conflicting beliefs. In the next post, we look at how membox handles the full memory lifecycle: triaging raw traces into structured memory units, promoting the most-corroborated facts into immutable “crystals,” and automatically superseding stale knowledge when new sessions arrive.