Skip to content

Commit f57d0b1

Browse files
fix(datafusion): address review findings
- fix: CAST unwrapping in classify_filter — reuse predicate::column_name so time-range predicates are correctly classified as Inexact and passed to scan(); previously CAST-wrapped filters were silently dropped
- fix: declare parquet sort order (metric_name, timestamp_secs ASC) on FileScanConfig so DataFusion avoids redundant sort operators
- fix: get_opts now respects GetOptions.range — dispatches to get_slice for Bounded/Suffix ranges instead of always downloading the full file
- fix: to_object_store_error propagates file path on NotFound
- fix: register_for_worker made a no-op; lazy scan-path registration is sufficient and avoids O(indexes) metastore RPCs per worker task; removes stale comment claiming a non-existent object-store cache
- fix: extract is_index_not_found helper, removing duplicated downcast block from try_consume_read_rel and create_default_table_provider
- fix: sort before dedup in QuickwitSchemaProvider::table_names
- fix: empty searcher pool returns Ok(vec![]) for local execution fallback
- fix: remove dead builder methods with_udf_batch, with_codec_applier, with_physical_optimizer_rule from DataSourceContributions
- feat: add tracing spans to execute_substrait and execute_sql
- feat: wire 4 GiB memory limit on DataFusionSessionBuilder in serve
- refactor: extract stream_to_receiver helper in gRPC handler

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
1 parent 0b3831b commit f57d0b1

14 files changed

Lines changed: 202 additions & 279 deletions

File tree

quickwit/quickwit-datafusion/src/catalog.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -88,7 +88,10 @@ impl SchemaProvider for QuickwitSchemaProvider {
8888
names.append(&mut source_names);
8989
}
9090
}
91-
// Deduplicate in case multiple sources claim the same name.
91+
// Sort then deduplicate in case multiple sources claim the same name.
92+
// `dedup()` only removes consecutive duplicates, so sorting first
93+
// is required to remove all duplicates.
94+
names.sort();
9295
names.dedup();
9396
names
9497
})

quickwit/quickwit-datafusion/src/data_source.rs

Lines changed: 0 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -121,38 +121,12 @@ impl Default for DataSourceContributions {
121121
}
122122

123123
impl DataSourceContributions {
124-
/// Add a physical optimizer rule.
125-
pub fn with_physical_optimizer_rule(
126-
mut self,
127-
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
128-
) -> Self {
129-
self.physical_optimizer_rules.push(rule);
130-
self
131-
}
132-
133124
/// Add a scalar UDF.
134125
pub fn with_udf(mut self, udf: Arc<ScalarUDF>) -> Self {
135126
self.udfs.push(udf);
136127
self
137128
}
138129

139-
/// Add multiple scalar UDFs at once.
140-
pub fn with_udf_batch(mut self, udfs: impl IntoIterator<Item = Arc<ScalarUDF>>) -> Self {
141-
self.udfs.extend(udfs);
142-
self
143-
}
144-
145-
/// Add a codec / builder-extension callback.
146-
///
147-
/// Logs uses this to call `.with_distributed_user_codec(TantivyCodec)`.
148-
pub fn with_codec_applier(
149-
mut self,
150-
f: impl FnOnce(SessionStateBuilder) -> SessionStateBuilder + Send + Sync + 'static,
151-
) -> Self {
152-
self.codec_appliers.push(Box::new(f));
153-
self
154-
}
155-
156130
pub(crate) fn udf_names(&self) -> Vec<String> {
157131
self.udfs.iter().map(|udf| udf.name().to_string()).collect()
158132
}

quickwit/quickwit-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ pub(crate) mod substrait;
2323
pub(crate) mod task_estimator;
2424
pub(crate) mod worker;
2525

26+
pub use datafusion::execution::SendableRecordBatchStream;
2627
pub use resolver::QuickwitWorkerResolver;
2728
pub use service::DataFusionService;
2829
pub use session::DataFusionSessionBuilder;

quickwit/quickwit-datafusion/src/resolver.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -51,9 +51,11 @@ impl WorkerResolver for QuickwitWorkerResolver {
5151
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
5252
let addrs: Vec<SocketAddr> = self.searcher_pool.keys();
5353
if addrs.is_empty() {
54-
return Err(DataFusionError::Execution(
55-
"no searcher nodes available in the cluster".to_string(),
56-
));
54+
// Empty pool means no searcher workers are registered (e.g. single-node
55+
// local execution). Return an empty list so the distributed optimizer
56+
// sees zero workers and falls back to local execution rather than
57+
// treating it as a hard error.
58+
return Ok(vec![]);
5759
}
5860
let scheme = if self.use_tls { "https" } else { "http" };
5961
addrs

quickwit/quickwit-datafusion/src/service.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,10 +77,12 @@ impl DataFusionService {
7777
/// decodes the plan, and returns a streaming `RecordBatch` iterator.
7878
/// The caller decides whether to collect, send via gRPC, or pipe to Arrow
7979
/// Flight — no materialization happens inside this method.
80+
#[tracing::instrument(skip(self, plan_bytes), fields(plan_bytes_len = plan_bytes.len()))]
8081
pub async fn execute_substrait(
8182
&self,
8283
plan_bytes: &[u8],
8384
) -> DFResult<SendableRecordBatchStream> {
85+
tracing::info!(plan_bytes_len = plan_bytes.len(), "executing substrait plan");
8486
use datafusion_substrait::substrait::proto::Plan;
8587
use prost::Message;
8688

@@ -128,7 +130,9 @@ impl DataFusionService {
128130
///
129131
/// Returns an error if `sql` is empty after splitting, or if any statement
130132
/// fails to parse or execute.
133+
#[tracing::instrument(skip(self, sql), fields(sql_len = sql.len()))]
131134
pub async fn execute_sql(&self, sql: &str) -> DFResult<SendableRecordBatchStream> {
135+
tracing::info!(sql_len = sql.len(), "executing SQL query");
132136
let ctx = self.builder.build_session()?;
133137

134138
// Split on `;` and discard empty fragments (trailing `;` etc.).

quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs

Lines changed: 0 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -127,17 +127,6 @@ fn to_metastore_query(index_uid: &IndexUid, query: &MetricsSplitQuery) -> ListMe
127127
metastore_query.time_range_end = Some(end as i64);
128128
}
129129

130-
// Push down a tag filter to the metastore only when there is exactly one
131-
// candidate value. Multi-value IN lists cannot be expressed as a single
132-
// `Option<String>` on `ListMetricsSplitsQuery`; passing only the first
133-
// value would silently skip splits that match the other values, producing
134-
// incorrect (incomplete) results. For multi-value lists we pass `None`
135-
// (no metastore pruning) and rely on the parquet-level filter instead.
136-
metastore_query.tag_service = single_value(query.tag_service.as_deref());
137-
metastore_query.tag_env = single_value(query.tag_env.as_deref());
138-
metastore_query.tag_datacenter = single_value(query.tag_datacenter.as_deref());
139-
metastore_query.tag_region = single_value(query.tag_region.as_deref());
140-
metastore_query.tag_host = single_value(query.tag_host.as_deref());
141130

142131
metastore_query
143132
}

quickwit/quickwit-datafusion/src/sources/metrics/mod.rs

Lines changed: 23 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -40,18 +40,32 @@ use datafusion::error::Result as DFResult;
4040
use datafusion::execution::SessionState;
4141
use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
4242
use quickwit_storage::StorageResolver;
43-
use tracing::debug;
4443

4544
use crate::data_source::{DataSourceContributions, QuickwitDataSource};
4645
use self::factory::{MetricsTableProviderFactory, METRICS_FILE_TYPE};
4746
use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver};
4847
use self::table_provider::MetricsTableProvider;
4948

49+
/// Returns `true` when `err` wraps a [`MetastoreError::NotFound`].
50+
///
51+
/// Used to distinguish "this data source does not own that index" (caller
52+
/// should try the next source) from a genuine metastore failure that should
53+
/// be surfaced to the user.
54+
fn is_index_not_found(err: &datafusion::error::DataFusionError) -> bool {
55+
match err {
56+
datafusion::error::DataFusionError::External(boxed) => boxed
57+
.downcast_ref::<MetastoreError>()
58+
.map(|me| matches!(me, MetastoreError::NotFound(_)))
59+
.unwrap_or(false),
60+
_ => false,
61+
}
62+
}
63+
5064
/// `QuickwitDataSource` implementation for OSS parquet metrics.
5165
///
5266
/// Backed by the Quickwit metastore for split discovery and `StorageResolver`
53-
/// for object-store access. Registers object stores on Flight workers via
54-
/// `register_for_worker()`.
67+
/// for object-store access. Object stores are registered lazily inside
68+
/// `MetricsTableProvider::scan()` on each use.
5569
#[derive(Debug)]
5670
pub struct MetricsDataSource {
5771
index_resolver: Arc<dyn MetricsIndexResolver>,
@@ -141,14 +155,7 @@ impl QuickwitDataSource for MetricsDataSource {
141155
}
142156
Err(err) => {
143157
// Not-found means this source doesn't own the index; let others try.
144-
let is_not_found = match &err {
145-
datafusion::error::DataFusionError::External(boxed) => boxed
146-
.downcast_ref::<MetastoreError>()
147-
.map(|me| matches!(me, MetastoreError::NotFound(_)))
148-
.unwrap_or(false),
149-
_ => false,
150-
};
151-
if is_not_found { Ok(None) } else { Err(err) }
158+
if is_index_not_found(&err) { Ok(None) } else { Err(err) }
152159
}
153160
}
154161
}
@@ -177,55 +184,15 @@ impl QuickwitDataSource for MetricsDataSource {
177184
Err(err) => {
178185
// Only swallow "index not found" — propagate everything else so the
179186
// caller gets an actionable error (e.g. metastore unavailable).
180-
let is_not_found = match &err {
181-
datafusion::error::DataFusionError::External(boxed) => boxed
182-
.downcast_ref::<MetastoreError>()
183-
.map(|me| matches!(me, MetastoreError::NotFound(_)))
184-
.unwrap_or(false),
185-
_ => false,
186-
};
187-
if is_not_found {
188-
Ok(None)
189-
} else {
190-
Err(err)
191-
}
187+
if is_index_not_found(&err) { Ok(None) } else { Err(err) }
192188
}
193189
}
194190
}
195191

196-
async fn register_for_worker(&self, state: &SessionState) -> DFResult<()> {
197-
let index_names = self.index_resolver.list_index_names().await?;
198-
199-
// Resolve all indexes concurrently — issuing N sequential `index_metadata`
200-
// RPCs would cost O(N × rtt) wall-clock time; concurrent resolution keeps
201-
// startup latency near O(rtt) regardless of index count.
202-
// The object-store cache in MetastoreIndexResolver ensures storage-resolver
203-
// RPCs are skipped on subsequent registrations.
204-
let resolver = &self.index_resolver;
205-
let results = futures::future::join_all(
206-
index_names
207-
.iter()
208-
.map(|name| resolver.resolve(name.as_str())),
209-
)
210-
.await;
211-
212-
for (index_name, result) in index_names.iter().zip(results) {
213-
match result {
214-
Ok((_, object_store, object_store_url)) => {
215-
state
216-
.runtime_env()
217-
.register_object_store(object_store_url.as_ref(), object_store);
218-
debug!(index_name, "registered object store for metrics worker");
219-
}
220-
Err(err) => {
221-
debug!(
222-
index_name,
223-
error = %err,
224-
"skipping metrics index in worker registration (non-fatal)"
225-
);
226-
}
227-
}
228-
}
192+
async fn register_for_worker(&self, _state: &SessionState) -> DFResult<()> {
193+
// No-op: object stores are registered lazily and idempotently inside
194+
// `MetricsTableProvider::scan()`, so eager pre-registration on every
195+
// worker task startup is unnecessary.
229196
Ok(())
230197
}
231198

0 commit comments

Comments
 (0)