Skip to content

Commit b7d6876

Browse files
upgrade to df 53
1 parent 032ef47 commit b7d6876

13 files changed

Lines changed: 312 additions & 534 deletions

File tree

quickwit/Cargo.lock

Lines changed: 230 additions & 429 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ members = [
4343

4444
# The following list excludes `quickwit-metastore-utils`
4545
# from the default member to ease build/deps.
46+
#
47+
# DataFusion crates are opt-in and covered by their own CI lane. Keep them out
48+
# of the default members so a plain `cargo build` does not pull DataFusion in.
4649
default-members = [
4750
"quickwit-actors",
4851
"quickwit-aws",
@@ -53,9 +56,7 @@ default-members = [
5356
"quickwit-common",
5457
"quickwit-config",
5558
"quickwit-control-plane",
56-
"quickwit-datafusion",
5759
"quickwit-datetime",
58-
"quickwit-df-core",
5960
"quickwit-directories",
6061
"quickwit-doc-mapper",
6162
"quickwit-index-management",

quickwit/license-tool.toml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,3 @@
88

99
# openssl-macros doesn't declare a repository in its crate metadata
1010
"openssl-macros" = { origin = "https://github.com/sfackler/rust-openssl" }
11-
12-
# datafusion-distributed 0.1.0 is pinned as a git dep; the Cargo.toml at that
13-
# rev does not declare a license field (added upstream after our pinned commit).
14-
# The crate is Apache-2.0 — see the crate's current Cargo.toml and
15-
# https://crates.io/crates/datafusion-distributed. Bumping to the crates.io
16-
# release requires a datafusion 52 → 53 workspace bump, tracked separately.
17-
"datafusion-distributed-0.1.0" = { license = "Apache-2.0", origin = "https://github.com/datafusion-contrib/datafusion-distributed" }

quickwit/quickwit-datafusion/Cargo.toml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,19 @@ quickwit-search = { workspace = true }
3535
quickwit-storage = { workspace = true }
3636

3737
arrow = { workspace = true }
38-
datafusion = "52"
39-
datafusion-substrait = "52"
40-
datafusion-datasource = "52"
41-
datafusion-sql = "52"
42-
datafusion-physical-plan = "52"
43-
datafusion-datasource-parquet = "52"
44-
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", rev = "0f2c8be3e148b0bd5c7f17b23f2df8bb1201d5fb" }
45-
object_store = "0.12"
38+
datafusion = "53"
39+
datafusion-substrait = "53"
40+
datafusion-datasource = "53"
41+
datafusion-sql = "53"
42+
datafusion-physical-plan = "53"
43+
datafusion-datasource-parquet = "53"
44+
datafusion-distributed = "1.0"
45+
object_store = "0.13"
4646

4747
[dev-dependencies]
4848
bytesize = { workspace = true }
49-
datafusion = "52"
50-
datafusion-substrait = "52"
49+
datafusion = "53"
50+
datafusion-substrait = "53"
5151
prost = { workspace = true }
5252
serde_json = { workspace = true }
5353
tempfile = { workspace = true }

quickwit/quickwit-datafusion/src/object_store_registry.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@
2626
//! This registry resolves the tension by doing the sync/async split on
2727
//! the correct axis:
2828
//!
29-
//! - `get_store` (sync) builds a [`QuickwitObjectStore`] wrapper on demand.
30-
//! Construction is cheap — just stashes the URI and a clone of the
31-
//! resolver.
29+
//! - `get_store` (sync) builds a [`QuickwitObjectStore`] wrapper on demand. Construction is cheap —
30+
//! just stashes the URI and a clone of the resolver.
3231
//! - `QuickwitObjectStore`'s own methods are already async, so the actual
33-
//! `StorageResolver::resolve(&uri).await` runs the first time
34-
//! DataFusion asks for data — inside the `ObjectStore` method itself.
32+
//! `StorageResolver::resolve(&uri).await` runs the first time DataFusion asks for data — inside
33+
//! the `ObjectStore` method itself.
3534
//!
3635
//! End result: one `StorageResolver` per node, used per-request, no
3736
//! global cache warm-up, no per-query listing. Matches the search-leaf
@@ -48,8 +47,7 @@ use std::collections::HashMap;
4847
use std::str::FromStr;
4948
use std::sync::{Arc, RwLock};
5049

51-
use datafusion::common::DataFusionError;
52-
use datafusion::common::Result as DFResult;
50+
use datafusion::common::{DataFusionError, Result as DFResult};
5351
use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
5452
use object_store::ObjectStore;
5553
use quickwit_common::uri::Uri;
@@ -140,10 +138,8 @@ impl ObjectStoreRegistry for QuickwitObjectStoreRegistry {
140138
"failed to build Quickwit URI from `{key}`: {err}"
141139
))))
142140
})?;
143-
let store: Arc<dyn ObjectStore> = Arc::new(QuickwitObjectStore::new(
144-
uri,
145-
self.storage_resolver.clone(),
146-
));
141+
let store: Arc<dyn ObjectStore> =
142+
Arc::new(QuickwitObjectStore::new(uri, self.storage_resolver.clone()));
147143
let mut write = self
148144
.lazy_stores
149145
.write()

quickwit/quickwit-datafusion/src/sources/metrics/factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
2929
use std::sync::Arc;
3030

31-
use datafusion::arrow as arrow;
3231
use arrow::datatypes::SchemaRef;
3332
use async_trait::async_trait;
33+
use datafusion::arrow;
3434
use datafusion::catalog::{Session, TableProviderFactory};
3535
use datafusion::error::{DataFusionError, Result as DFResult};
3636
use datafusion::logical_expr::CreateExternalTable;

quickwit/quickwit-datafusion/src/sources/metrics/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,8 @@ impl SchemaProvider for MetricsSchemaProvider {
183183
return Ok(Some(provider));
184184
}
185185

186-
resolve_metrics_table_provider(
187-
self.index_resolver.as_ref(),
188-
name,
189-
minimal_base_schema(),
190-
)
191-
.await
186+
resolve_metrics_table_provider(self.index_resolver.as_ref(), name, minimal_base_schema())
187+
.await
192188
}
193189

194190
fn table_exist(&self, name: &str) -> bool {

quickwit/quickwit-datafusion/src/sources/metrics/table_provider.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use std::any::Any;
2121
use std::fmt;
2222
use std::sync::Arc;
2323

24-
use datafusion::arrow as arrow;
2524
use arrow::compute::SortOptions;
2625
use arrow::datatypes::SchemaRef;
2726
use async_trait::async_trait;
27+
use datafusion::arrow;
2828
use datafusion::catalog::Session;
2929
use datafusion::datasource::TableProvider;
3030
use datafusion::datasource::source::DataSourceExec;
@@ -71,9 +71,8 @@ pub trait MetricsSplitProvider: Send + Sync + fmt::Debug {
7171
/// `ObjectStoreUrl` only accepts a scheme + authority, so the full index
7272
/// URI is split:
7373
/// - `scheme://authority` goes into the `ObjectStoreUrl`.
74-
/// - The path component (e.g. the `my-index/` part of
75-
/// `s3://bucket/my-index/`) is prepended to each split's filename in the
76-
/// emitted `PartitionedFile`s.
74+
/// - The path component (e.g. the `my-index/` part of `s3://bucket/my-index/`) is prepended to each
75+
/// split's filename in the emitted `PartitionedFile`s.
7776
///
7877
/// Neither this type nor `scan()` constructs an `Arc<dyn ObjectStore>` —
7978
/// the session's custom

quickwit/quickwit-datafusion/src/sources/metrics/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion::physical_plan::ExecutionPlan;
3333
use datafusion::prelude::SessionContext;
3434
use object_store::memory::InMemory;
3535
use object_store::path::Path as ObjectPath;
36-
use object_store::{ObjectStore, PutPayload};
36+
use object_store::{ObjectStoreExt, PutPayload};
3737
use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
3838
use quickwit_parquet_engine::storage::{ParquetWriter, ParquetWriterConfig};
3939
use quickwit_parquet_engine::table_config::TableConfig;

quickwit/quickwit-datafusion/src/storage_bridge.rs

Lines changed: 44 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,20 @@
2626
//! This mirrors how `quickwit-search` uses the resolver:
2727
//! `storage_resolver.resolve(&uri).await` per request, no global registry.
2828
//!
29-
//! Only read operations (`get_opts`, `get_range`, `head`) are implemented.
30-
//! All write and list operations return `NotSupported` — DataFusion only
31-
//! reads parquet files through this store.
29+
//! Only `get_opts` is implemented. All write, delete, copy, and list
30+
//! operations return `NotSupported` — DataFusion only reads parquet files
31+
//! through this store.
3232
3333
use std::sync::Arc;
3434

3535
use async_trait::async_trait;
3636
use bytes::Bytes;
37+
use futures::StreamExt;
3738
use futures::stream::BoxStream;
3839
use object_store::path::Path as ObjectPath;
3940
use object_store::{
40-
GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
41-
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
41+
CopyOptions, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
42+
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
4243
Result as ObjectStoreResult,
4344
};
4445
use quickwit_common::uri::Uri;
@@ -127,10 +128,13 @@ impl ObjectStore for QuickwitObjectStore {
127128
let location_str = location.as_ref();
128129
let map_err = |err| to_object_store_error(err, location_str);
129130

130-
let (bytes, byte_range) = match options.range {
131+
let (bytes, byte_range) = match &options.range {
131132
Some(GetRange::Bounded(r)) => {
132133
let usize_range = r.start as usize..r.end as usize;
133-
let data = storage.get_slice(&path, usize_range).await.map_err(map_err)?;
134+
let data = storage
135+
.get_slice(&path, usize_range)
136+
.await
137+
.map_err(map_err)?;
134138
let b = Bytes::copy_from_slice(data.as_ref());
135139
let len = b.len() as u64;
136140
// The storage may return fewer bytes than requested if the range
@@ -140,17 +144,24 @@ impl ObjectStore for QuickwitObjectStore {
140144
}
141145
Some(GetRange::Suffix(n)) => {
142146
let file_size = storage.file_num_bytes(&path).await.map_err(map_err)?;
143-
let start = file_size.saturating_sub(n);
147+
let start = file_size.saturating_sub(*n);
144148
let usize_range = start as usize..file_size as usize;
145-
let data = storage.get_slice(&path, usize_range).await.map_err(map_err)?;
149+
let data = storage
150+
.get_slice(&path, usize_range)
151+
.await
152+
.map_err(map_err)?;
146153
let b = Bytes::copy_from_slice(data.as_ref());
147154
let len = b.len() as u64;
148155
(b, start..start + len)
149156
}
150157
Some(GetRange::Offset(start)) => {
151158
let file_size = storage.file_num_bytes(&path).await.map_err(map_err)?;
159+
let start = *start;
152160
let usize_range = start as usize..file_size as usize;
153-
let data = storage.get_slice(&path, usize_range).await.map_err(map_err)?;
161+
let data = storage
162+
.get_slice(&path, usize_range)
163+
.await
164+
.map_err(map_err)?;
154165
let b = Bytes::copy_from_slice(data.as_ref());
155166
let len = b.len() as u64;
156167
(b, start..start + len)
@@ -171,45 +182,22 @@ impl ObjectStore for QuickwitObjectStore {
171182
e_tag: None,
172183
version: None,
173184
};
185+
options.check_preconditions(&meta)?;
186+
187+
let payload = if options.head {
188+
GetResultPayload::Stream(Box::pin(futures::stream::empty()))
189+
} else {
190+
GetResultPayload::Stream(Box::pin(futures::stream::once(async { Ok(bytes) })))
191+
};
192+
174193
Ok(GetResult {
175-
payload: GetResultPayload::Stream(Box::pin(futures::stream::once(async { Ok(bytes) }))),
194+
payload,
176195
meta,
177196
range: byte_range,
178197
attributes: Default::default(),
179198
})
180199
}
181200

182-
async fn get_range(
183-
&self,
184-
location: &ObjectPath,
185-
range: std::ops::Range<u64>,
186-
) -> ObjectStoreResult<Bytes> {
187-
let storage = self.storage().await?;
188-
let path = object_path_to_std(location);
189-
let usize_range = range.start as usize..range.end as usize;
190-
let data = storage
191-
.get_slice(&path, usize_range)
192-
.await
193-
.map_err(|err| to_object_store_error(err, location.as_ref()))?;
194-
Ok(Bytes::copy_from_slice(data.as_ref()))
195-
}
196-
197-
async fn head(&self, location: &ObjectPath) -> ObjectStoreResult<ObjectMeta> {
198-
let storage = self.storage().await?;
199-
let path = object_path_to_std(location);
200-
let size = storage
201-
.file_num_bytes(&path)
202-
.await
203-
.map_err(|err| to_object_store_error(err, location.as_ref()))?;
204-
Ok(ObjectMeta {
205-
location: location.clone(),
206-
last_modified: chrono::Utc::now(),
207-
size,
208-
e_tag: None,
209-
version: None,
210-
})
211-
}
212-
213201
async fn put_opts(
214202
&self,
215203
_location: &ObjectPath,
@@ -231,10 +219,18 @@ impl ObjectStore for QuickwitObjectStore {
231219
})
232220
}
233221

234-
async fn delete(&self, _location: &ObjectPath) -> ObjectStoreResult<()> {
235-
Err(object_store::Error::NotSupported {
236-
source: "QuickwitObjectStore is read-only".into(),
237-
})
222+
fn delete_stream(
223+
&self,
224+
locations: BoxStream<'static, ObjectStoreResult<ObjectPath>>,
225+
) -> BoxStream<'static, ObjectStoreResult<ObjectPath>> {
226+
locations
227+
.map(|location| match location {
228+
Ok(_) => Err(object_store::Error::NotSupported {
229+
source: "QuickwitObjectStore is read-only".into(),
230+
}),
231+
Err(err) => Err(err),
232+
})
233+
.boxed()
238234
}
239235

240236
fn list(
@@ -257,16 +253,11 @@ impl ObjectStore for QuickwitObjectStore {
257253
})
258254
}
259255

260-
async fn copy(&self, _from: &ObjectPath, _to: &ObjectPath) -> ObjectStoreResult<()> {
261-
Err(object_store::Error::NotSupported {
262-
source: "QuickwitObjectStore is read-only".into(),
263-
})
264-
}
265-
266-
async fn copy_if_not_exists(
256+
async fn copy_opts(
267257
&self,
268258
_from: &ObjectPath,
269259
_to: &ObjectPath,
260+
_options: CopyOptions,
270261
) -> ObjectStoreResult<()> {
271262
Err(object_store::Error::NotSupported {
272263
source: "QuickwitObjectStore is read-only".into(),

0 commit comments

Comments
 (0)