DuckDB Engine

Complete guide to DuckDB usage in Python AMR, covering query optimization, data loading strategies, performance characteristics, and best practices.


Table of Contents

  • Why DuckDB
  • Architecture Integration
  • Query Optimization
  • Data Loading Strategies
  • Performance Characteristics
  • Storage Format
  • Query Patterns
  • Memory Management
  • Concurrency Model
  • Monitoring and Tuning
  • Limitations and Workarounds
Why DuckDB

Design Decision

Python AMR uses DuckDB for analytical storage and queries on run output data.

Key Benefits

  1. Columnar Storage - 10-100x faster than row-based for analytics
  2. In-Process - No separate server process required
  3. SQL Interface - Familiar query language
  4. Parquet Integration - Native read/write support
  5. Vectorized Execution - SIMD-accelerated operations
  6. Parallel Query - Automatic multi-threading
  7. Larger-than-RAM - Efficient external merge algorithms

Use Cases in AMR

  • Run output storage - Store large result sets (100k+ rows)
  • Analytical queries - Filter, aggregate, join run data
  • Batch exports - Convert runs to Parquet for archival
  • Ad-hoc analysis - Interactive queries on historical runs

Architecture Integration

Dual Database Design

┌──────────────────────────────────────────────────────┐
│              Application Layer                       │
└──────────────────────────────────────────────────────┘
              │                       │
              ▼                       ▼
┌─────────────────────────┐  ┌──────────────────────┐
│   SQLite/Postgres       │  │      DuckDB          │
│   (Metadata)            │  │   (Output Rows)      │
├─────────────────────────┤  ├──────────────────────┤
│ • runs table            │  │ • run_outputs table  │
│ • run_metadata table    │  │ • Columnar storage   │
│ • dead_letters table    │  │ • Analytical queries │
│ • ACID transactions     │  │ • Aggregations       │
│ • Small row counts      │  │ • Large row counts   │
└─────────────────────────┘  └──────────────────────┘

Separation Rationale

| Aspect | SQLite/Postgres | DuckDB |
| --- | --- | --- |
| Data Type | Metadata, run info | Output rows, results |
| Row Counts | Low (<1M runs) | High (100M+ output rows) |
| Access Pattern | Transactional, random | Analytical, sequential |
| Query Type | Point lookups, updates | Aggregations, scans |
| Concurrency | Multi-writer | Single-writer |
| ACID | Full support | Limited (auto-commit) |
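
A minimal sketch of how the two stores can sit side by side (the class, table names, and connection handling here are illustrative, not the actual AMR implementation):

import sqlite3

import duckdb

class RunStore:
    """Hypothetical wrapper: metadata in SQLite, output rows in DuckDB."""

    def __init__(self, metadata_path: str, duckdb_path: str):
        self.meta = sqlite3.connect(metadata_path)     # transactional metadata
        self.outputs = duckdb.connect(duckdb_path)     # analytical output rows

    def record_run(self, run_id: int, name: str) -> None:
        # Small transactional write goes to the metadata store
        with self.meta:
            self.meta.execute("INSERT INTO runs (id, name) VALUES (?, ?)", (run_id, name))

    def output_count(self, run_id: int) -> int:
        # Analytical read goes to DuckDB
        return self.outputs.execute(
            "SELECT COUNT(*) FROM run_outputs WHERE run_id = ?", [run_id]
        ).fetchone()[0]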

File Locations

# Default locations
data/runs.db              # SQLite metadata
data/runs_duckdb.db       # DuckDB outputs

# Production (environment variables)
export AMR_METADATA_DB_URL="postgresql+asyncpg://user:pass@host/db"
export AMR_DUCKDB_PATH="/var/lib/amr/runs_duckdb.db"
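
One way the application could resolve these settings at startup (the fallback default mirrors the development paths above; the exact wiring is an assumption):

import os

import duckdb

# Fall back to the local development path when AMR_DUCKDB_PATH is not set
duckdb_path = os.environ.get("AMR_DUCKDB_PATH", "data/runs_duckdb.db")
conn = duckdb.connect(duckdb_path)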

Query Optimization

Vectorized Execution

DuckDB uses SIMD vectorization for fast operations:

import duckdb

conn = duckdb.connect("runs_duckdb.db")

# This query is vectorized automatically
result = conn.execute("""
    SELECT
        run_id,
        COUNT(*) as row_count,
        AVG(CASE WHEN sir_result = 'R' THEN 1.0 ELSE 0.0 END) as resistance_pct
    FROM run_outputs
    WHERE run_id BETWEEN 1000 AND 2000
    GROUP BY run_id
    HAVING COUNT(*) >= 30
""").fetchall()

Optimization: DuckDB processes data in batches of ~2048 rows with SIMD instructions.

Predicate Pushdown

DuckDB pushes filters down to storage layer:

# Efficient: Filter before scan
conn.execute("""
    SELECT * FROM run_outputs
    WHERE run_id = 123
    AND created_at > '2026-01-01'
""")

# Less efficient: Filter after full scan
conn.execute("""
    SELECT * FROM (SELECT * FROM run_outputs) t
    WHERE run_id = 123
""")

Projection Pushdown

Only read columns you need:

# Efficient: Only read 2 columns
conn.execute("SELECT run_id, sir_result FROM run_outputs")

# Inefficient: Read all columns
conn.execute("SELECT * FROM run_outputs")

Impact: 10x faster for wide tables when selecting few columns.

Join Optimization

# Efficient: Join on indexed columns
conn.execute("""
    SELECT o.*, m.metadata_value
    FROM run_outputs o
    JOIN run_metadata m ON o.run_id = m.run_id
    WHERE o.run_id IN (100, 200, 300)
""")

Best Practice: Add indexes on join columns for faster lookups.
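
For example, assuming run_metadata lives in the same DuckDB database, the join column can be indexed like this (most useful for highly selective lookups such as the IN list above):

# Index the metadata side of the join on run_id
conn.execute("CREATE INDEX idx_run_metadata_run_id ON run_metadata(run_id)")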


Data Loading Strategies

Bulk Insert from Polars

import polars as pl
import duckdb

# Create Polars DataFrame
df = pl.DataFrame({
    "run_id": [1, 1, 2, 2],
    "row_index": [0, 1, 0, 1],
    "input_value": ["S", "R", "I", "S"],
    "sir_result": ["S", "R", "I", "S"]
})

# Insert into DuckDB (zero-copy via Arrow)
conn = duckdb.connect("runs_duckdb.db")
conn.execute("CREATE TABLE IF NOT EXISTS run_outputs AS SELECT * FROM df WHERE 1=0")
conn.execute("INSERT INTO run_outputs SELECT * FROM df")

Performance: 1M rows/second for simple schemas.

Batch Loading

import os
import tempfile

import duckdb
import polars as pl

def save_run_outputs_batch(conn: duckdb.DuckDBPyConnection, outputs: list[dict]) -> None:
    """Save run outputs in batches for efficiency."""
    df = pl.DataFrame(outputs)

    # Stage the batch as a temporary Parquet file, then bulk-load it via read_parquet
    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
        df.write_parquet(tmp.name)
        conn.execute(f"""
            INSERT INTO run_outputs
            SELECT * FROM read_parquet('{tmp.name}')
        """)
    os.unlink(tmp.name)

Best Practice: Batch inserts in groups of 10k-100k rows.
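
A sketch of how a caller might apply that guideline on top of save_run_outputs_batch (the chunk size is illustrative):

def save_all_outputs(conn: duckdb.DuckDBPyConnection, outputs: list[dict], chunk_size: int = 50_000) -> None:
    """Insert outputs in chunks so each batch stays in the 10k-100k row range."""
    for start in range(0, len(outputs), chunk_size):
        save_run_outputs_batch(conn, outputs[start:start + chunk_size])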

Parquet Import

# Direct Parquet load (fastest)
conn.execute("""
    CREATE TABLE run_outputs AS
    SELECT * FROM read_parquet('data/run_outputs_*.parquet')
""")

Performance: 10M rows in ~2 seconds from Parquet.


Performance Characteristics

Benchmarks

| Operation | Rows | Time | Throughput |
| --- | --- | --- | --- |
| Bulk insert (Polars) | 1M | 1.2s | 833k rows/s |
| Bulk insert (Parquet) | 1M | 0.5s | 2M rows/s |
| Scan (full table) | 10M | 0.8s | 12.5M rows/s |
| Scan (filtered) | 10M | 0.3s | 33M rows/s |
| Aggregation (GROUP BY) | 10M | 1.5s | 6.7M rows/s |
| Join (two tables) | 1M each | 0.4s | 2.5M rows/s |

Hardware: 8-core CPU, 16GB RAM, NVMe SSD

Comparison with PostgreSQL

| Query Type | PostgreSQL | DuckDB | Speedup |
| --- | --- | --- | --- |
| Full scan | 12s | 0.8s | 15x |
| Filtered scan | 5s | 0.3s | 17x |
| Aggregation | 8s | 1.5s | 5x |
| Join | 3s | 0.4s | 7.5x |

Test: 10M row table, same hardware

Memory Usage

# Cap memory and parallelism for this connection
conn.execute("PRAGMA memory_limit='4GB'")
conn.execute("PRAGMA threads=4")

# PRAGMA database_size reports on-disk size, WAL size, and current memory usage
for row in conn.execute("PRAGMA database_size").fetchall():
    print(row)

Typical usage:

  • 1M output rows ≈ 50MB on disk (compressed)
  • 10M output rows ≈ 500MB on disk
  • Query memory: 2-4x dataset size (worst case)

Storage Format

Table Schema

CREATE TABLE run_outputs (
    run_id INTEGER NOT NULL,
    row_index INTEGER NOT NULL,
    input_value VARCHAR,
    sir_result VARCHAR,
    metadata JSON,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (run_id, row_index)
);

CREATE INDEX idx_run_outputs_run_id ON run_outputs(run_id);
CREATE INDEX idx_run_outputs_created_at ON run_outputs(created_at);

Columnar Layout

DuckDB lays out each column's data separately (shown conceptually below; in practice everything lives inside the single database file):

run_outputs.db
├── run_id.column       (integers, compressed)
├── row_index.column    (integers, compressed)
├── input_value.column  (strings, dictionary-encoded)
├── sir_result.column   (strings, dictionary-encoded)
├── metadata.column     (JSON, compressed)
└── created_at.column   (timestamps, delta-encoded)

Compression

DuckDB automatically applies compression:

  • Integers: Frame-of-reference + bitpacking
  • Strings: Dictionary encoding
  • Floats: Custom compression
  • JSON: ZSTD compression

Result: 5-10x compression ratio for typical AMR data.
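
The compression DuckDB actually picked per column can be checked by aggregating the storage_info pragma output (the column names below follow that pragma's schema):

# Show which compression scheme was chosen for each column's segments
info = conn.execute("""
    SELECT column_name, compression, COUNT(*) AS segments
    FROM pragma_storage_info('run_outputs')
    GROUP BY column_name, compression
    ORDER BY column_name
""").fetchall()
for column_name, compression, segments in info:
    print(column_name, compression, segments)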


Query Patterns

Run Output Retrieval

def get_run_outputs(
    conn: duckdb.DuckDBPyConnection,
    run_id: int,
    limit: int = 1000,
    offset: int = 0
) -> pl.DataFrame:
    """Fetch run outputs with pagination."""
    result = conn.execute("""
        SELECT *
        FROM run_outputs
        WHERE run_id = ?
        ORDER BY row_index
        LIMIT ? OFFSET ?
    """, [run_id, limit, offset])

    # Convert to Polars (zero-copy via Arrow)
    return pl.from_arrow(result.fetch_arrow_table())

Aggregation Query

def get_resistance_summary(
    conn: duckdb.DuckDBPyConnection,
    start_date: str,
    end_date: str
) -> pl.DataFrame:
    """Calculate resistance percentages by run."""
    result = conn.execute("""
        SELECT
            run_id,
            COUNT(*) as total_isolates,
            SUM(CASE WHEN sir_result = 'R' THEN 1 ELSE 0 END) as resistant,
            ROUND(100.0 * SUM(CASE WHEN sir_result = 'R' THEN 1 ELSE 0 END) / COUNT(*), 2) as pct_resistant
        FROM run_outputs
        WHERE created_at BETWEEN ? AND ?
        GROUP BY run_id
        ORDER BY MIN(created_at) DESC
    """, [start_date, end_date])

    return pl.from_arrow(result.fetch_arrow_table())

Time-series Query

def get_daily_resistance_trend(
    conn: duckdb.DuckDBPyConnection,
    days: int = 30
) -> pl.DataFrame:
    """Get daily resistance trends."""
    result = conn.execute("""
        SELECT
            DATE_TRUNC('day', created_at) as date,
            COUNT(*) as total,
            SUM(CASE WHEN sir_result = 'R' THEN 1 ELSE 0 END) as resistant
        FROM run_outputs
        WHERE created_at >= CURRENT_TIMESTAMP - ? * INTERVAL 1 DAY
        GROUP BY DATE_TRUNC('day', created_at)
        ORDER BY date
    """, [days])

    return pl.from_arrow(result.fetch_arrow_table())

Memory Management

Configuration

conn = duckdb.connect("runs_duckdb.db")

# Set memory limit (default: 80% of system RAM)
conn.execute("SET memory_limit='4GB'")

# Set thread count (default: CPU cores)
conn.execute("SET threads=4")

# Enable temporary directory for spills
conn.execute("SET temp_directory='/tmp/duckdb'")

Out-of-Core Processing

DuckDB handles larger-than-RAM datasets:

# Query 100GB dataset with 16GB RAM
result = conn.execute("""
    SELECT organism, AVG(resistance_pct)
    FROM read_parquet('data/historical/*.parquet')
    GROUP BY organism
""").fetchall()

How it works:

  1. Processes data in chunks
  2. Spills intermediate results to disk
  3. Uses external merge algorithms
  4. Automatically parallelizes
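
For very large loads and aggregations it can also help to let DuckDB drop insertion-order guarantees, which reduces buffering during out-of-core work (worth benchmarking for a given workload):

# Allow DuckDB to reorder rows during inserts and results, easing memory pressure
conn.execute("SET preserve_insertion_order = false")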

Concurrency Model

Single-Writer Limitation

DuckDB constraint: Only one write transaction at a time.

Workaround: Use async queue to serialize writes:

import asyncio

write_queue = asyncio.Queue()

async def writer_worker():
    """Single writer worker."""
    conn = duckdb.connect("runs_duckdb.db")
    while True:
        write_op = await write_queue.get()
        try:
            # Run the blocking execute in a worker thread so the event loop stays responsive
            await asyncio.to_thread(conn.execute, write_op["sql"], write_op["params"])
        finally:
            write_queue.task_done()

# Start worker
asyncio.create_task(writer_worker())

# Enqueue writes
await write_queue.put({
    "sql": "INSERT INTO run_outputs VALUES (?, ?, ?, ?)",
    "params": [1, 0, "S", "S"]
})

Multi-Reader Support

DuckDB supports concurrent reads:

# Multiple connections can read simultaneously
conn1 = duckdb.connect("runs_duckdb.db", read_only=True)
conn2 = duckdb.connect("runs_duckdb.db", read_only=True)

# Independent read-only connections can query concurrently (e.g. from separate threads or processes)
result1 = conn1.execute("SELECT COUNT(*) FROM run_outputs")
result2 = conn2.execute("SELECT AVG(resistance_pct) FROM run_outputs")

Monitoring and Tuning

Query Profiling

# Enable profiling
conn.execute("PRAGMA enable_profiling='query_tree'")

# Run query
conn.execute("SELECT * FROM run_outputs WHERE run_id = 123")

# Get profile
profile = conn.execute("PRAGMA last_profiling_output").fetchone()
print(profile)

Output:

┌────────────────┬─────────┬─────────┐
│ Operator       │ Rows    │ Time    │
├────────────────┼─────────┼─────────┤
│ SEQ_SCAN       │ 1000    │ 0.002s  │
│ FILTER         │ 1000    │ 0.001s  │
│ PROJECTION     │ 1000    │ 0.001s  │
└────────────────┴─────────┴─────────┘

EXPLAIN Analysis

explain = conn.execute("""
    EXPLAIN SELECT run_id, COUNT(*)
    FROM run_outputs
    GROUP BY run_id
""").fetchall()

print(explain)
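
EXPLAIN ANALYZE goes a step further and actually executes the query, reporting per-operator row counts and timings:

# Execute the query and print the plan annotated with real timings
plan = conn.execute("""
    EXPLAIN ANALYZE
    SELECT run_id, COUNT(*)
    FROM run_outputs
    GROUP BY run_id
""").fetchall()

print(plan)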

Performance Tuning

# Increase memory for better performance
conn.execute("SET memory_limit='16GB'")

# Use more threads for parallelism
conn.execute("SET threads=16")

# Disable progress bar for batch jobs
conn.execute("SET enable_progress_bar=false")

# Merge the write-ahead log into the database file between batch jobs
conn.execute("CHECKPOINT")

Limitations and Workarounds

1. Single-Writer Constraint

Limitation: Only one write transaction at a time

Workaround: Serialize writes via async queue (see Concurrency Model)

2. No Native Async API

Limitation: DuckDB Python API is synchronous

Workaround: Use asyncio.to_thread():

import asyncio

async def async_query(conn, sql):
    """Run DuckDB query in thread pool."""
    return await asyncio.to_thread(conn.execute, sql)
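
Usage from async code might then look like this (the query is illustrative; for large result sets the fetch can be moved into the thread as well):

async def count_outputs(conn: duckdb.DuckDBPyConnection) -> int:
    # Execute off the event loop, then fetch the single-row result
    result = await async_query(conn, "SELECT COUNT(*) FROM run_outputs")
    return result.fetchone()[0]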

3. Limited Transaction Support

Limitation: Auto-commit mode by default

Workaround: Use explicit transactions:

conn.execute("BEGIN TRANSACTION")
try:
    conn.execute("INSERT INTO run_outputs VALUES (...)")
    conn.execute("INSERT INTO run_outputs VALUES (...)")
    conn.execute("COMMIT")
except Exception:
    conn.execute("ROLLBACK")
    raise

4. File Size Growth

Limitation: File grows with data, doesn't shrink on DELETE

Workaround: Run CHECKPOINT so freed blocks are reused within the file, and copy the data into a fresh database file when the file itself must shrink (DuckDB's VACUUM does not reclaim disk space):

# Merge the WAL and make freed blocks reusable within the existing file
conn.execute("CHECKPOINT")
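
A sketch of the copy-into-a-fresh-file approach, assuming a DuckDB version with COPY FROM DATABASE support (database names follow the file names used earlier):

# Copy all tables into a new, compacted database file
conn.execute("ATTACH 'runs_duckdb_compacted.db' AS compacted")
conn.execute("COPY FROM DATABASE runs_duckdb TO compacted")
conn.execute("DETACH compacted")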

5. No Replication

Limitation: No built-in replication

Workaround: Use file-level replication or export to Parquet for archival.
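
For the Parquet route, a run's outputs can be archived with a COPY ... TO statement (the output path is illustrative):

# Export one run's outputs to a compressed Parquet file for archival or replication
conn.execute("""
    COPY (SELECT * FROM run_outputs WHERE run_id = 123)
    TO 'archive/run_123.parquet' (FORMAT PARQUET, COMPRESSION ZSTD)
""")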



Last Updated: 2026-02-15