feat(datafusion): DataFusion metrics query layer #6276
mattmkim merged 25 commits into quickwit-oss:main
mattmkim left a comment:
Disclaimer: I only really reviewed the metrics-specific logic; I'm not a DF expert. Made comments here.
What would give us more confidence in merging this? Should we feature-flag the DataFusion code path, so you have to "opt in" to compiling an image with it? We could also add an env var to actually enable the endpoint.
    // gRPC handler. No downstream-specific code needed here.
    let datafusion_session_builder = if node_config
        .is_service_enabled(QuickwitService::Searcher)
        && quickwit_common::get_bool_from_env("QW_ENABLE_DATAFUSION_ENDPOINT", false)
@mattmkim the session is behind an env var already, FYI. Open to other recommendations for isolating it.
    @@ -0,0 +1,252 @@
    // Copyright 2021-Present Datadog, Inc.
@guilload it would be good to have a Quickwit expert review this part.
Hi @alexanderbianchi, can you please give more context on this effort? Are you trying to query parquet files in DataFusion via Quickwit? How is this supposed to work for a user? How does Quickwit end up with the parquet files being queried?
Introduces a generic DataFusion execution layer with a pluggable QuickwitDataSource trait. No data-source-specific code.

- QuickwitDataSource trait + DataSourceContributions (contribution-return pattern)
- DataFusionSessionBuilder with shared RuntimeEnv, check_invariants
- QuickwitSchemaProvider backed by DataFusion MemorySchemaProvider for DDL tables
- QuickwitWorkerSessionBuilder + build_quickwit_worker for distributed execution
- QuickwitWorkerResolver, QuickwitTaskEstimator
- QuickwitObjectStore: quickwit_storage::Storage → object_store::ObjectStore bridge
- DataFusionService::execute_sql (streaming Arrow IPC responses)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

Implements QuickwitDataSource for the parquet metrics pipeline from PR quickwit-oss#6237.

- MetricsDataSource: production (metastore-backed) and test (SimpleIndexResolver)
- MetricsTableProvider: filter pushdown with CAST-unwrapping fix for timestamp
- MetastoreSplitProvider: converts MetricsSplitQuery → ListMetricsSplitsQuery
- MetastoreIndexResolver: resolves index URI → QuickwitObjectStore per query
- MetricsSplitQuery + extract_split_filters: predicate extraction for split pruning
- MetricsTableProviderFactory: CREATE EXTERNAL TABLE … STORED AS metrics support
- test_utils: make_batch, TestSplitProvider, MetricsTestbed for integration tests

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

Routes Substrait ReadRel nodes to registered QuickwitDataSource implementations. Standard NamedTable reads resolve via MetricsDataSource::try_consume_read_rel. ExtensionTable reads (custom protos) can be handled by downstream callers.

- QuickwitSubstraitConsumer implementing datafusion-substrait SubstraitConsumer
- execute_substrait_plan / execute_substrait_plan_streaming entry points
- DataFusionService::execute_substrait (bytes) and execute_substrait_json (dev path)
- session.rs: DataFusionSessionBuilder::execute_substrait convenience method

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

…tegration tests

- Add datafusion.proto with DataFusionService (ExecuteSubstrait, ExecuteSql RPCs)
- Generate codegen and mod.rs for the new proto service
- Wire DataFusionService and WorkerService into quickwit-serve gRPC layer
- Add DataFusionServiceGrpcImpl handler
- Auto-create otel-metrics-v0_9 index on startup alongside logs/traces
- Add metrics_datafusion_tests: in-process SQL + Substrait over parquet splits
- Add metrics_distributed_tests: multi-node distributed execution
- Add rollup_substrait.json fixture for Substrait plan testing

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

Remove extra schema argument from ParquetWriter::new; the API only accepts a ParquetWriterConfig. Remove unused ParquetSchema import.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

…test

- Remove erroneous ParquetSchema argument from ParquetWriter::new calls in integration tests (API takes only ParquetWriterConfig)
- Mark test_rest_ingest_then_in_process_query as #[ignore] until the /ingest-metrics REST endpoint is wired in quickwit-serve

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

- fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped
- fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators
- fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file
- fix: to_object_store_error propagates file path on NotFound
- fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache
- fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider
- fix: sort before dedup in QuickwitSchemaProvider::table_names
- fix: empty searcher pool returns Ok(vec![]) for local execution fallback
- fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions
- feat: add tracing spans to execute_substrait and execute_sql
- feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
- refactor: extract stream_to_receiver helper in gRPC handler

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Why this PR exists
Quickwit is growing a second query/storage shape alongside Tantivy:
#6231 made Parquet metrics a real storage path. #6285 isolated that pipeline so it can evolve without bleeding across the rest of Quickwit. This PR adds the query-side half of that story: a query layer built on Apache DataFusion.
The immediate payoff: Quickwit can execute SQL and Substrait queries over OSS Parquet metrics.
The larger payoff is architectural: Quickwit gets a reusable query substrate with a real optimizer, Arrow-native execution, Substrait interop, and distributed planning hooks.
That bigger direction is deliberate. The same extension points introduced here are also the shape we want for future SQL-over-logs work, as explored in:
In other words: this PR is not just “metrics can be queried with DataFusion.” It is the first in-tree implementation of a more general idea:
Quickwit-specific storage engines stay specialized, while query planning and execution become shared.
Why DataFusion
Apache DataFusion is an extensible Rust query engine built on Arrow. It already provides the parts we do not want to reinvent:
That makes it a strong fit for Quickwit.
Quickwit still owns the parts that are actually Quickwit-shaped:
The boundary is intentional: DataFusion handles relational query execution; Quickwit handles how that execution maps onto Quickwit storage and cluster topology.
Core APIs introduced
QuickwitDataSource

This is the main extension point for plugging a Quickwit-native storage engine into DataFusion.

A source can contribute:

- contributions()
- create_default_table_provider()
- list_index_names()
- register_for_worker()
- ReadRel handling via try_consume_read_rel()

The important design point is that storage-specific logic stays behind this trait. The generic session, catalog, and worker code does not need to know whether it is serving Parquet metrics, Tantivy logs, or something else later.
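The trait above can be sketched roughly as follows. This is a hedged, self-contained stand-in: the method names come from the PR description, but the signatures (plain strings instead of DataFusion types) and the ToyMetricsSource implementation are illustrative only.

```rust
/// Stand-in for the DataFusion state a source can contribute
/// (the real container holds UDFs, table factories, etc.).
#[derive(Default)]
struct DataSourceContributions {
    table_factories: Vec<String>, // e.g. "metrics" for STORED AS metrics
}

/// Simplified shape of the QuickwitDataSource extension point.
trait QuickwitDataSource {
    /// Additive DataFusion state this source brings to a session.
    fn contributions(&self) -> DataSourceContributions;
    /// Lazily materialize a table provider for an index name, if owned.
    fn create_default_table_provider(&self, index: &str) -> Option<String>;
    /// Index names this source can surface as tables.
    fn list_index_names(&self) -> Vec<String>;
}

/// Toy source loosely mirroring MetricsDataSource from this PR.
struct ToyMetricsSource;

impl QuickwitDataSource for ToyMetricsSource {
    fn contributions(&self) -> DataSourceContributions {
        DataSourceContributions { table_factories: vec!["metrics".to_string()] }
    }
    fn create_default_table_provider(&self, index: &str) -> Option<String> {
        self.list_index_names()
            .iter()
            .find(|n| n.as_str() == index)
            .map(|n| format!("provider({n})"))
    }
    fn list_index_names(&self) -> Vec<String> {
        vec!["otel-metrics-v0_9".to_string()]
    }
}

fn main() {
    let src = ToyMetricsSource;
    assert!(src.create_default_table_provider("otel-metrics-v0_9").is_some());
    assert!(src.create_default_table_provider("logs").is_none());
    println!("trait shape ok");
}
```

The point of the sketch: generic code only calls the trait, so it never learns whether the source behind it is Parquet metrics or something else.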
DataSourceContributions

This is the additive container for source-provided DataFusion state.
Right now that includes:
The builder merges contributions from all registered sources and checks invariants up front so two sources cannot silently stomp on each other.
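A minimal sketch of that merge-and-check step, assuming a toy DataSourceContributions that only carries table-factory names; merge_contributions and its error handling are hypothetical stand-ins for the real check_invariants logic, not the actual implementation.

```rust
use std::collections::HashSet;

/// Toy stand-in for the additive contributions container.
#[derive(Default)]
struct DataSourceContributions {
    table_factories: Vec<String>,
}

/// Merge contributions from all registered sources, failing loudly on
/// duplicates so two sources cannot silently stomp on each other.
fn merge_contributions(
    all: Vec<DataSourceContributions>,
) -> Result<DataSourceContributions, String> {
    let mut merged = DataSourceContributions::default();
    let mut seen = HashSet::new();
    for contrib in all {
        for factory in contrib.table_factories {
            if !seen.insert(factory.clone()) {
                return Err(format!("duplicate table factory: {factory}"));
            }
            merged.table_factories.push(factory);
        }
    }
    Ok(merged)
}

fn main() {
    // Two sources registering the same "metrics" factory is a conflict.
    let a = DataSourceContributions { table_factories: vec!["metrics".into()] };
    let b = DataSourceContributions { table_factories: vec!["metrics".into()] };
    assert!(merge_contributions(vec![a, b]).is_err());
    println!("invariant check ok");
}
```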
QuickwitSchemaProvider

This is the schema bridge between DataFusion and Quickwit.
Resolution order is:
That gives us normal DataFusion ergonomics for explicit DDL while still allowing Quickwit-native indexes to appear as tables without pre-registering each one manually.
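That lookup order can be illustrated with a toy provider: explicit DDL tables win, and otherwise an index is resolved lazily by name. ToySchemaProvider, its string-valued "providers", and this table_names are illustrative stand-ins for the real MemorySchemaProvider-backed implementation (table_names mirrors the sort-before-dedup fix from the commit log).

```rust
use std::collections::HashMap;

/// Toy schema bridge: DDL tables first, then lazy index resolution.
struct ToySchemaProvider {
    ddl_tables: HashMap<String, String>, // CREATE EXTERNAL TABLE entries
    index_names: Vec<String>,            // Quickwit-native indexes
}

impl ToySchemaProvider {
    fn table(&self, name: &str) -> Option<String> {
        // Explicitly registered DDL tables take precedence.
        if let Some(t) = self.ddl_tables.get(name) {
            return Some(t.clone());
        }
        // Otherwise resolve a Quickwit index lazily, no pre-registration.
        self.index_names
            .iter()
            .find(|n| n.as_str() == name)
            .map(|n| format!("lazy-provider({n})"))
    }

    /// All visible table names: DDL tables plus indexes, sorted then
    /// deduplicated.
    fn table_names(&self) -> Vec<String> {
        let mut names: Vec<String> = self
            .ddl_tables
            .keys()
            .cloned()
            .chain(self.index_names.iter().cloned())
            .collect();
        names.sort();
        names.dedup();
        names
    }
}

fn main() {
    let provider = ToySchemaProvider {
        ddl_tables: HashMap::from([("my_table".to_string(), "ddl".to_string())]),
        index_names: vec!["otel-metrics-v0_9".to_string()],
    };
    assert_eq!(provider.table("my_table").as_deref(), Some("ddl"));
    assert!(provider.table("otel-metrics-v0_9").is_some());
    assert!(provider.table("missing").is_none());
    println!("resolution order ok");
}
```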
DataFusionSessionBuilder

This is the coordinator-side assembly point.
It owns the shared runtime state and builds DataFusion sessions with:
QuickwitWorkerSessionBuilder and build_quickwit_worker

These are the worker-side mirrors of the coordinator builder.
They rebuild the same source/runtime shape on worker nodes so distributed physical plans can execute without inventing a separate query runtime just for workers.
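A sketch of that symmetry, with hypothetical names: both builders funnel through one shared registration step, so a physical plan produced on the coordinator resolves against the same source set on a worker.

```rust
/// Toy registry standing in for the shared source/runtime shape.
#[derive(Clone, PartialEq, Debug)]
struct SourceRegistry {
    sources: Vec<String>,
}

/// Single registration path used by both builders, so coordinator and
/// worker sessions cannot drift apart.
fn register_sources() -> SourceRegistry {
    SourceRegistry { sources: vec!["metrics".to_string()] }
}

fn build_coordinator_session() -> SourceRegistry {
    register_sources()
}

fn build_worker_session() -> SourceRegistry {
    register_sources()
}

fn main() {
    // A distributed plan only executes correctly if both sides agree.
    assert_eq!(build_coordinator_session(), build_worker_session());
    println!("coordinator and worker see the same sources");
}
```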
DataFusionService

This is the execution entry point.
It exposes:

- execute_sql
- execute_substrait
- execute_substrait_json

and streams results as Arrow RecordBatches. In OSS, this is wired over gRPC as Arrow IPC batches, which is a good low-level transport for both internal execution and future integrations.

QuickwitSubstraitConsumer

This is the glue for Substrait plans that need Quickwit-aware table resolution.
A source can claim a ReadRel, materialize a provider, and still fall back onto normal DataFusion planning behavior for filter and projection handling.

What is wired up in this PR today
This PR wires the first in-tree source: MetricsDataSource.

That includes:

- MetricsDataSource implementing QuickwitDataSource
- MetricsTableProvider backed by ParquetSource
- CREATE EXTERNAL TABLE ... STORED AS metrics

On the Parquet side, the query path is not a toy wrapper. It is already doing real storage-aware work, including:

- filter pushdown into the Parquet scan, with CAST-unwrapping so time-range predicates survive classification
- split pruning via predicates extracted into MetricsSplitQuery
- a declared parquet sort order (metric_name, timestamp_secs ASC) so DataFusion avoids redundant sort operators
- ranged object-store reads instead of always downloading full files
Example shape
This is the sort of flow this PR enables for metrics today:
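As an illustration of that flow: the index name below matches the auto-created otel-metrics-v0_9 index and the columns mirror the declared sort order (metric_name, timestamp_secs), but the query itself and the value column are hypothetical examples, not taken from the PR.

```rust
/// An example of the shape of SQL this layer can now run over Parquet
/// metrics. Index and column names are illustrative assumptions.
fn example_query() -> &'static str {
    "SELECT metric_name, AVG(value) AS avg_value \
     FROM \"otel-metrics-v0_9\" \
     WHERE timestamp_secs BETWEEN 1700000000 AND 1700003600 \
     GROUP BY metric_name"
}

fn main() {
    // In the real code a string like this would be passed to
    // DataFusionService::execute_sql, which streams Arrow RecordBatches
    // back over gRPC as Arrow IPC.
    println!("{}", example_query());
}
```

The time-range predicate is the interesting part: it is exactly the kind of filter the CAST-unwrapping fix classifies correctly and the split-pruning path extracts.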
Scope of this PR
This PR is intentionally the first slice, not the final shape.
Current scope:
- the DataFusion path is only active when datafusion_session_builder is configured (opt-in via the QW_ENABLE_DATAFUSION_ENDPOINT env var)

The point is to land the reusable query substrate and prove it on the storage path that is naturally columnar first: Parquet metrics.
Test coverage
This PR includes coverage for the important end-to-end query behaviors already implemented, including:
- GROUP BY

Some Quirks
catalog state