Skip to content

feat(datafusion): DataFusion metrics query layer#6276

Merged
mattmkim merged 25 commits intoquickwit-oss:mainfrom
alexanderbianchi:bianchi/df-4-serve
Apr 24, 2026
Merged

feat(datafusion): DataFusion metrics query layer#6276
mattmkim merged 25 commits intoquickwit-oss:mainfrom
alexanderbianchi:bianchi/df-4-serve

Conversation

@alexanderbianchi
Copy link
Copy Markdown
Contributor

@alexanderbianchi alexanderbianchi commented Apr 7, 2026

Why this PR exists

Quickwit is growing a second query/storage shape alongside Tantivy:

  • logs and traces: document-oriented workloads on top of Tantivy
  • metrics: columnar, time-series workloads on top of Parquet

#6231 made Parquet metrics a real storage path. #6285 isolated that pipeline so it can evolve without bleeding across the rest of Quickwit. This PR adds the query-side half of that story: a query layer built on Apache DataFusion.

The immediate payoff: Quickwit can execute SQL and Substrait queries over OSS Parquet metrics.

The larger payoff is architectural: Quickwit gets a reusable query substrate with a real optimizer, Arrow-native execution, Substrait interop, and distributed planning hooks.

That bigger direction is deliberate. The same extension points introduced here are also the shape we want for future SQL-over-logs work, as explored in:

In other words: this PR is not just “metrics can be queried with DataFusion.” It is the first in-tree implementation of a more general idea:

Quickwit-specific storage engines stay specialized, while query planning and execution becomes shared.

Why DataFusion

Apache DataFusion is an extensible Rust query engine built on Arrow. It already provides the parts we do not want to reinvent:

  • SQL planning
  • logical and physical optimization
  • vectorized, streaming execution
  • Arrow-native interoperability
  • extensibility around catalogs, table providers, UDFs, optimizer rules, codecs, and Substrait

That makes it a strong fit for Quickwit.

Quickwit still owns the parts that are actually Quickwit-shaped:

  • index discovery and split enumeration
  • storage resolution
  • storage-engine-specific pushdown
  • worker selection and distributed execution across searchers

The boundary is intentional: DataFusion handles relational query execution; Quickwit handles how that execution maps onto Quickwit storage and cluster topology.

Core APIs introduced

QuickwitDataSource

This is the main extension point for plugging a Quickwit-native storage engine into DataFusion.

A source can contribute:

  • additive query/runtime state via contributions()
  • default table resolution via create_default_table_provider()
  • schema enumeration via list_index_names()
  • worker-side setup via register_for_worker()
  • optional Substrait ReadRel handling via try_consume_read_rel()

The important design point is that storage-specific logic stays behind this trait. The generic session, catalog, and worker code does not need to know whether it is serving Parquet metrics, Tantivy logs, or something else later.

DataSourceContributions

This is the additive container for source-provided DataFusion state.

Right now that includes:

  • optimizer rules
  • UDFs
  • codec appliers

The builder merges contributions from all registered sources and checks invariants up front so two sources cannot silently stomp on each other.

QuickwitSchemaProvider

This is the schema bridge between DataFusion and Quickwit.

Resolution order is:

  1. session-local DDL tables
  2. source-backed Quickwit indexes

That gives us normal DataFusion ergonomics for explicit DDL while still allowing Quickwit-native indexes to appear as tables without pre-registering each one manually.

DataFusionSessionBuilder

This is the coordinator-side assembly point.

It owns the shared runtime state and builds DataFusion sessions with:

  • merged source contributions
  • Quickwit schema/catalog wiring
  • distributed planning hooks when a worker resolver is present
  • shared object store/runtime state reused across sessions

QuickwitWorkerSessionBuilder and build_quickwit_worker

These are the worker-side mirrors of the coordinator builder.

They rebuild the same source/runtime shape on worker nodes so distributed physical plans can execute without inventing a separate query runtime just for workers.

DataFusionService

This is the execution entry point.

It exposes:

  • execute_sql
  • execute_substrait
  • execute_substrait_json

and streams results as Arrow RecordBatches. In OSS, this is wired over gRPC as Arrow IPC batches, which is a good low-level transport for both internal execution and future integrations.

QuickwitSubstraitConsumer

This is the glue for Substrait plans that need Quickwit-aware table resolution.

A source can claim a ReadRel, materialize a provider, and still fall back onto normal DataFusion planning behavior for filter and projection handling.

What is wired up in this PR today

This PR wires the first in-tree source: MetricsDataSource.

That includes:

  • MetricsDataSource implementing QuickwitDataSource
  • MetricsTableProvider backed by ParquetSource
  • metric index resolution through the metastore and storage resolver
  • DDL support via CREATE EXTERNAL TABLE ... STORED AS metrics
  • SQL execution
  • Substrait execution
  • distributed execution across workers and searchers
  • gRPC streaming of Arrow IPC record batches

On the Parquet side, the query path is not a toy wrapper. It is already doing real storage-aware work, including:

  • filter pushdown
  • bloom filter and page-index aware pruning
  • split-aware planning
  • metrics-specific predicate extraction for time-range pruning

Example shape

This is the sort of flow this PR enables for metrics today:

CREATE OR REPLACE EXTERNAL TABLE "cpu-prod" (
  metric_name VARCHAR NOT NULL,
  metric_type TINYINT,
  timestamp_secs BIGINT NOT NULL,
  value DOUBLE NOT NULL,
  service VARCHAR
) STORED AS metrics LOCATION 'cpu-prod';

SELECT service, AVG(value) AS avg_cpu
FROM "cpu-prod"
WHERE metric_name = 'cpu.usage'
  AND timestamp_secs BETWEEN 1710000000 AND 1710003600
GROUP BY service
ORDER BY service;

Scope of this PR

This PR is intentionally the first slice, not the final shape.

Current scope:

  • query layer only
  • metrics is the first in-tree source
  • disabled by default unless a datafusion_session_builder is configured
  • gRPC serving path for Arrow IPC batch streaming
  • no claim that all Quickwit queries should immediately move to DataFusion

The point is to land the reusable query substrate and prove it on the storage path that is naturally columnar first: Parquet metrics.

Test coverage

This PR includes coverage for the important end-to-end query behaviors already implemented, including:

  • SQL pruning
  • aggregation
  • time-range filtering
  • GROUP BY
  • distributed task planning without shuffles
  • NULL fill for missing Parquet columns
  • Substrait named-table queries
  • rollup from JSON Substrait plan

Some Quirks

  • metrics source ownership is name-based for now
  • DDL registration is session-local / per-query, not persistent
    catalog state
  • the endpoint is experimental and opt-in only

@alexanderbianchi alexanderbianchi changed the title feat(datafusion): wire DataFusion gRPC service + integration tests feat(datafusion): DataFusion metrics query layer Apr 7, 2026
@alexanderbianchi alexanderbianchi force-pushed the bianchi/df-4-serve branch 4 times, most recently from f57d0b1 to 07f8548 Compare April 8, 2026 20:51
@alexanderbianchi
Copy link
Copy Markdown
Contributor Author

@mattmkim left some comments on this draft PR that I closed #6270 , addressed in 07f8548

Copy link
Copy Markdown
Contributor

@mattmkim mattmkim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disclaimer only really reviewed the metrics specific logic, not a DF expert. made comments here,

what would give us more confidence in merging this? should we feature flag the datafusion code/path, so you have to "opt-in" to compiling an image with it? we could also add an env var to actually enable the endpoint?

Comment thread quickwit/quickwit-serve/src/lib.rs Outdated
// gRPC handler. No downstream-specific code needed here.
let datafusion_session_builder = if node_config
.is_service_enabled(QuickwitService::Searcher)
&& quickwit_common::get_bool_from_env("QW_ENABLE_DATAFUSION_ENDPOINT", false)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattmkim the session is behind a env var already FYI. Open to other rec's for isolating.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, missed that 👍

@@ -0,0 +1,252 @@
// Copyright 2021-Present Datadog, Inc.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guilload would be good to have a quickwit expert review this part

@evance-br
Copy link
Copy Markdown

evance-br commented Apr 15, 2026

Hi @alexanderbianchi, Please can you give more context on this effort. Are you trying to query parquet files in DataFusion via Quickwit? How is this supposed to work for a user? How Quickwit end up with the parquet files being queried.

alexanderbianchi and others added 12 commits April 23, 2026 13:21
Introduces a generic DataFusion execution layer with a pluggable
QuickwitDataSource trait. No data-source-specific code.

- QuickwitDataSource trait + DataSourceContributions (contribution-return pattern)
- DataFusionSessionBuilder with shared RuntimeEnv, check_invariants
- QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables
- QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution
- QuickwitWorkerResolver, QuickwitTaskEstimator
- QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge
- DataFusionService::execute_sql (streaming Arrow IPC responses)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237.

- MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
- MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
- MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
- MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
- MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
- MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
- test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations.
Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel.
ExtensionTable reads (custom protos) can be handled by downstream callers.

- QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer
- execute_substrait_plan / execute_substrait_plan_streaming entry points
- DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path)
- session.rs: DataFusionSessionBuilder::execute_substrait convenience method

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…tegration tests

- Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs)
- Generate codegen and mod.rs for the new proto service
- Wire DataFusionService and WorkerService into quickwit-serve gRPC layer
- Add DataFusionServiceGrpcImpl handler
- Auto-create otel-metrics-v0_9 index on startup alongside logs/traces
- Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits
- Add metrics_distributed_tests: multi-node distributed execution
- Add rollup_substrait.json fixture for Substrait plan testing

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Remove extra schema argument from ParquetWriter::new; the API only accepts
a ParquetWriterConfig. Remove unused ParquetSchema import.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…test

- Remove erroneous ParquetSchema argument from ParquetWriter::new calls
  in integration tests (API takes only ParquetWriterConfig)
- Mark test_rest_ingest_then_in_process_query as #[ignore] until the
  /ingest-metrics REST endpoint is wired in quickwit-serve

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name
  so time-range predicates are correctly classified as Inexact and
  passed to scan(); previously CAST-wrapped filters were silently dropped
- fix: declare parquet sort order (metric_name, timestamp_secs ASC) on
  FileScanConfig so DataFusion avoids redundant sort operators
- fix: get_opts now respects GetOptions.range — dispatches to get_slice
  for Bounded/Suffix ranges instead of always downloading the full file
- fix: to_object_store_error propagates file path on NotFound
- fix: register_for_worker made a no-op; lazy scan-path registration is
  sufficient and avoids O(indexes) metastore RPCs per worker task;
  removes stale comment claiming a non-existent object-store cache
- fix: extract is_index_not_found helper, removing duplicated downcast
  block from try_consume_read_rel and create_default_table_provider
- fix: sort before dedup in QuickwitSchemaProvider::table_names
- fix: empty searcher pool returns Ok(vec![]) for local execution fallback
- fix: remove dead builder methods with_udf_batch, with_codec_applier,
  with_physical_optimizer_rule from DataSourceContributions
- feat: add tracing spans to execute_substrait and execute_sql
- feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
- refactor: extract stream_to_receiver helper in gRPC handler

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Comment thread quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs Outdated
Comment thread quickwit/quickwit-datafusion/tests/common/mod.rs Outdated
@mattmkim mattmkim enabled auto-merge (squash) April 24, 2026 20:22
@mattmkim mattmkim merged commit b0155f8 into quickwit-oss:main Apr 24, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants