Commit ba9c817

g-talbot, mattmkim, and claude authored
[phase-31 1/4] Sort schema foundation (#6242)
* feat: replace fixed MetricDataPoint fields with dynamic tag HashMap
* feat: replace ParquetField enum with constants and dynamic validation
* feat: derive sort order and bloom filters from batch schema
* feat: union schema accumulation and schema-agnostic ingest validation
* feat: dynamic column lookup in split writer
* feat: remove ParquetSchema dependency from indexing actors
* refactor: deduplicate test batch helpers
* lint
* feat(31): sort schema foundation — proto, parser, display, validation, window, TableConfig

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix: rustdoc link errors — use backticks for private items
* Update quickwit/quickwit-parquet-engine/src/table_config.rs

  Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com>
* Update quickwit/quickwit-parquet-engine/src/table_config.rs

  Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com>
* style: rustfmt long match arm in default_sort_fields

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 65d97fc commit ba9c817

26 files changed

Lines changed: 2920 additions & 18 deletions

quickwit/Cargo.lock

Lines changed: 3 additions & 0 deletions

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -68,7 +68,7 @@ pub struct IndexingSchedulerState {
 ///
 /// Scheduling executes the following steps:
 /// 1. Builds a [`PhysicalIndexingPlan`] from the list of logical indexing tasks. See
-/// [`build_physical_indexing_plan`] for the implementation details.
+/// `build_physical_indexing_plan` for the implementation details.
 /// 2. Apply the [`PhysicalIndexingPlan`]: for each indexer, the scheduler send the indexing tasks
 /// by gRPC. An indexer immediately returns an Ok and apply asynchronously the received plan. Any
 /// errors (network) happening in this step are ignored. The scheduler runs a control loop that
@@ -98,7 +98,7 @@ pub struct IndexingSchedulerState {
 /// Concretely, it will send the faulty nodes of the plan they are supposed to follow.
 //
 /// Finally, in order to give the time for each indexer to run their indexing tasks, the control
-/// plane will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired
+/// plane will wait at least `MIN_DURATION_BETWEEN_SCHEDULING` before comparing the desired
 /// plan with the running plan.
 pub struct IndexingScheduler {
     cluster_id: String,

quickwit/quickwit-indexing/src/actors/uploader.rs

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ pub enum UploaderType {
 /// [`SplitsUpdateMailbox`] wraps either a [`Mailbox<Sequencer<P>>`] or [`Mailbox<P>`].
 ///
 /// It makes it possible to send a splits update either to the [`Sequencer`] or directly
-/// to the publisher actor `P`. It is used in combination with [`SplitsUpdateSender`] that
+/// to the publisher actor `P`. It is used in combination with `SplitsUpdateSender` that
 /// will do the send.
 ///
 /// This is useful as we have different requirements between the indexing pipeline and

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -12,8 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//! [`FileBackedIndex`] module. It is public so that the crate `quickwit-backward-compat` can
-//! import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to import
+//! `FileBackedIndex` module. It is public so that the crate `quickwit-backward-compat` can
+//! import `FileBackedIndex` and run backward-compatibility tests. You should not have to import
 //! anything from here directly.

 mod serialize;

quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ use tracing::error;
 use super::file_backed_index::FileBackedIndex;
 use super::store_operations::{METASTORE_FILE_NAME, load_index};

-/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
+/// Lazy `FileBackedIndex`. It loads a `FileBackedIndex` on demand. When the index is first
 /// loaded, it optionally spawns a task to periodically poll the storage and update the index.
 pub(crate) struct LazyFileBackedIndex {
     index_id: IndexId,

quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs

Lines changed: 4 additions & 4 deletions
@@ -13,7 +13,7 @@
 // limitations under the License.

 //! Module for [`FileBackedMetastore`]. It is public so that the crate `quickwit-backward-compat`
-//! can import [`FileBackedIndex`] and run backward-compatibility tests. You should not have to
+//! can import `FileBackedIndex` and run backward-compatibility tests. You should not have to
 //! import anything from here directly.

 pub mod file_backed_index;
@@ -116,9 +116,9 @@ impl From<bool> for MutationOccurred<()> {
 /// into as many files and stores a map of indexes
 /// (index_id, index_status) in a dedicated file `manifest.json`.
 ///
-/// A [`LazyIndexStatus`] describes the lifecycle of an index: [`LazyIndexStatus::Creating`] and
-/// [`LazyIndexStatus::Deleting`] are transitioning states that indicates that the index is not
-/// yet available. On the contrary, the [`LazyIndexStatus::Active`] status indicates the index is
+/// A `LazyIndexStatus` describes the lifecycle of an index: `LazyIndexStatus::Creating` and
+/// `LazyIndexStatus::Deleting` are transitioning states that indicates that the index is not
+/// yet available. On the contrary, the `LazyIndexStatus::Active` status indicates the index is
 /// ready to be fetched and updated.
 ///
 /// Transitioning states are useful to track inconsistencies between the in-memory and on-disk data

quickwit/quickwit-metastore/src/metastore_resolver.rs

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ impl fmt::Debug for MetastoreResolver {
 }

 impl MetastoreResolver {
-    /// Creates an empty [`MetastoreResolverBuilder`].
+    /// Creates an empty `MetastoreResolverBuilder`.
     pub fn builder() -> MetastoreResolverBuilder {
         MetastoreResolverBuilder::default()
     }

quickwit/quickwit-parquet-engine/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -13,8 +13,10 @@ license.workspace = true
 [dependencies]
 anyhow = { workspace = true }
 arrow = { workspace = true }
+chrono = { workspace = true }
 parquet = { workspace = true }
 quickwit-common = { workspace = true }
+quickwit-proto = { workspace = true }
 sea-query = { workspace = true, optional = true }
 serde = { workspace = true }
 serde_json = { workspace = true }

quickwit/quickwit-parquet-engine/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -24,8 +24,10 @@ pub mod index;
 pub mod ingest;
 pub mod metrics;
 pub mod schema;
+pub mod sort_fields;
 pub mod split;
 pub mod storage;
+pub mod table_config;

 #[cfg(any(test, feature = "testsuite"))]
 pub mod test_helpers;
Lines changed: 240 additions & 0 deletions
@@ -0,0 +1,240 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Column type identification from name suffixes and string names.
+//!
+//! Type can be specified via Husky-convention suffixes (`__s`, `__i`, `__nf`)
+//! or inferred from well-known bare names. The discriminant values match
+//! the Go iota exactly for cross-system interoperability.
+
+use std::str::FromStr;
+
+use super::SortFieldsError;
+
+/// Well-known column name for timestamps.
+pub const TIMESTAMP: &str = "timestamp";
+
+/// Well-known column name for tiebreaker.
+pub const TIEBREAKER: &str = "tiebreaker";
+
+/// Well-known column name for timeseries ID hash.
+pub const TIMESERIES_ID: &str = "timeseries_id";
+
+/// Well-known column name for metric value.
+pub const METRIC_VALUE: &str = "metric_value";
+
+/// Column type IDs matching Go `types.TypeID` iota values.
+///
+/// Only the types that appear in sort schemas are included here.
+/// The discriminant values MUST match Go exactly for cross-system interop.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u64)]
+pub enum ColumnTypeId {
+    Int64 = 2,
+    Float64 = 10,
+    String = 14,
+    Sketch = 17,
+    CpcSketch = 20,
+    ItemSketch = 22,
+}
+
+impl ColumnTypeId {
+    /// The Husky-convention suffix for this column type.
+    ///
+    /// Used when serializing back to the string format with explicit types.
+    pub fn suffix(self) -> &'static str {
+        match self {
+            Self::Int64 => "__i",
+            Self::Float64 => "__nf",
+            Self::String => "__s",
+            Self::Sketch => "__sk",
+            Self::CpcSketch => "__cpcsk",
+            Self::ItemSketch => "__isk",
+        }
+    }
+
+    /// Human-readable type name matching Go `TypeID.String()`.
+    pub fn as_str(self) -> &'static str {
+        match self {
+            Self::Int64 => "dense-int64",
+            Self::Float64 => "dense-float64",
+            Self::String => "dense-string",
+            Self::Sketch => "dense-sketch",
+            Self::CpcSketch => "dense-cpc-sketch",
+            Self::ItemSketch => "dense-item-sketch",
+        }
+    }
+
+    /// Resolve column type from a column name, stripping any type suffix.
+    ///
+    /// Returns `(bare_name, type)`. Type resolution order:
+    /// 1. Explicit suffix (`__s`, `__i`, `__nf`, etc.) — stripped, type from suffix
+    /// 2. Well-known bare name defaults:
+    /// - `timestamp`, `tiebreaker`, `timeseries_id` → Int64
+    /// - `metric_value` → Float64
+    /// - everything else → String
+    pub fn from_column_name(name: &str) -> Result<(&str, Self), SortFieldsError> {
+        // Try explicit suffixes first (longest match first to avoid ambiguity).
+        if let Some(bare) = name.strip_suffix("__isk") {
+            return Ok((bare, Self::ItemSketch));
+        }
+        if let Some(bare) = name.strip_suffix("__cpcsk") {
+            return Ok((bare, Self::CpcSketch));
+        }
+        if let Some(bare) = name.strip_suffix("__sk") {
+            return Ok((bare, Self::Sketch));
+        }
+        if let Some(bare) = name.strip_suffix("__nf") {
+            return Ok((bare, Self::Float64));
+        }
+        if let Some(bare) = name.strip_suffix("__i") {
+            return Ok((bare, Self::Int64));
+        }
+        if let Some(bare) = name.strip_suffix("__s") {
+            return Ok((bare, Self::String));
+        }
+
+        // No suffix — use well-known name defaults.
+        Ok((name, default_type_for_name(name)))
+    }
+}
+
+/// Default column type and sort direction for a bare column name.
+///
+/// This is the single source of truth for well-known column defaults.
+/// Used by the parser (type inference, default direction), display
+/// (suffix omission, direction omission), and validation.
+pub struct ColumnDefaults {
+    pub column_type: ColumnTypeId,
+    /// True if the default sort direction is descending.
+    pub descending: bool,
+}
+
+/// Well-known name → default type and sort direction lookup table.
+///
+/// Columns not in this table default to String, ascending.
+static WELL_KNOWN_COLUMNS: &[(&str, ColumnDefaults)] = &[
+    (
+        TIMESTAMP,
+        ColumnDefaults {
+            column_type: ColumnTypeId::Int64,
+            descending: true,
+        },
+    ),
+    (
+        "timestamp_secs",
+        ColumnDefaults {
+            column_type: ColumnTypeId::Int64,
+            descending: true,
+        },
+    ),
+    (
+        TIEBREAKER,
+        ColumnDefaults {
+            column_type: ColumnTypeId::Int64,
+            descending: false,
+        },
+    ),
+    (
+        TIMESERIES_ID,
+        ColumnDefaults {
+            column_type: ColumnTypeId::Int64,
+            descending: false,
+        },
+    ),
+    (
+        METRIC_VALUE,
+        ColumnDefaults {
+            column_type: ColumnTypeId::Float64,
+            descending: false,
+        },
+    ),
+    (
+        "value",
+        ColumnDefaults {
+            column_type: ColumnTypeId::Float64,
+            descending: false,
+        },
+    ),
+];
+
+const DEFAULT_COLUMN: ColumnDefaults = ColumnDefaults {
+    column_type: ColumnTypeId::String,
+    descending: false,
+};
+
+/// Look up default type and direction for a bare column name.
+pub fn column_defaults(name: &str) -> &'static ColumnDefaults {
+    WELL_KNOWN_COLUMNS
+        .iter()
+        .find(|(n, _)| *n == name)
+        .map(|(_, d)| d)
+        .unwrap_or(&DEFAULT_COLUMN)
+}
+
+/// Default column type for a bare name (convenience wrapper).
+pub fn default_type_for_name(name: &str) -> ColumnTypeId {
+    column_defaults(name).column_type
+}
+
+/// Whether this bare name defaults to descending sort.
+pub fn default_is_descending(name: &str) -> bool {
+    column_defaults(name).descending
+}
+
+impl std::fmt::Display for ColumnTypeId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.as_str())
+    }
+}
+
+/// Parse a type name string (e.g., "dense-int64") into a `ColumnTypeId`.
+impl FromStr for ColumnTypeId {
+    type Err = SortFieldsError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "dense-int64" => Ok(Self::Int64),
+            "dense-float64" => Ok(Self::Float64),
+            "dense-string" => Ok(Self::String),
+            "dense-sketch" => Ok(Self::Sketch),
+            "dense-cpc-sketch" => Ok(Self::CpcSketch),
+            "dense-item-sketch" => Ok(Self::ItemSketch),
+            _ => Err(SortFieldsError::UnknownColumnType(format!(
+                "unknown column type '{}'",
+                s
+            ))),
+        }
+    }
+}
+
+/// Convert a proto `column_type` u64 back to a `ColumnTypeId`.
+impl TryFrom<u64> for ColumnTypeId {
+    type Error = SortFieldsError;
+
+    fn try_from(value: u64) -> Result<Self, Self::Error> {
+        match value {
+            2 => Ok(Self::Int64),
+            10 => Ok(Self::Float64),
+            14 => Ok(Self::String),
+            17 => Ok(Self::Sketch),
+            20 => Ok(Self::CpcSketch),
+            22 => Ok(Self::ItemSketch),
+            _ => Err(SortFieldsError::UnknownColumnType(format!(
+                "unknown column type id: {}",
+                value
+            ))),
+        }
+    }
+}
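
Reviewer note: the resolution order documented in the new file (explicit suffix first, then well-known bare-name defaults, then String ascending) can be sketched in isolation as below. This is not the code from the commit. It is a standalone approximation that copies the discriminants, suffixes, and well-known names from the diff and leaves out `SortFieldsError`. The names `SUFFIXES`, `WELL_KNOWN`, `defaults`, `resolve`, and `from_id` are hypothetical, introduced only for illustration.

```rust
/// Standalone mirror of the diff's `ColumnTypeId` (discriminants copied from it).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
enum ColumnTypeId {
    Int64 = 2,
    Float64 = 10,
    String = 14,
    Sketch = 17,
    CpcSketch = 20,
    ItemSketch = 22,
}

/// Hypothetical table form of the suffix checks in `from_column_name`.
const SUFFIXES: &[(&str, ColumnTypeId)] = &[
    ("__cpcsk", ColumnTypeId::CpcSketch),
    ("__isk", ColumnTypeId::ItemSketch),
    ("__sk", ColumnTypeId::Sketch),
    ("__nf", ColumnTypeId::Float64),
    ("__i", ColumnTypeId::Int64),
    ("__s", ColumnTypeId::String),
];

/// Hypothetical flattened copy of `WELL_KNOWN_COLUMNS`: (name, type, descending).
const WELL_KNOWN: &[(&str, ColumnTypeId, bool)] = &[
    ("timestamp", ColumnTypeId::Int64, true),
    ("timestamp_secs", ColumnTypeId::Int64, true),
    ("tiebreaker", ColumnTypeId::Int64, false),
    ("timeseries_id", ColumnTypeId::Int64, false),
    ("metric_value", ColumnTypeId::Float64, false),
    ("value", ColumnTypeId::Float64, false),
];

/// Default (type, descending) for a bare name; unknown names are String, ascending.
fn defaults(name: &str) -> (ColumnTypeId, bool) {
    WELL_KNOWN
        .iter()
        .find(|(n, _, _)| *n == name)
        .map(|&(_, ty, desc)| (ty, desc))
        .unwrap_or((ColumnTypeId::String, false))
}

/// Strip an explicit type suffix if present, otherwise fall back to the defaults.
fn resolve(name: &str) -> (&str, ColumnTypeId) {
    for &(suffix, ty) in SUFFIXES {
        if let Some(bare) = name.strip_suffix(suffix) {
            return (bare, ty);
        }
    }
    (name, defaults(name).0)
}

/// Map a raw discriminant back to a type, returning `None` for unknown IDs.
fn from_id(value: u64) -> Option<ColumnTypeId> {
    SUFFIXES
        .iter()
        .map(|&(_, ty)| ty)
        .find(|&ty| ty as u64 == value)
}

fn main() {
    // An explicit suffix overrides the well-known default for `timestamp`.
    assert_eq!(resolve("timestamp__s"), ("timestamp", ColumnTypeId::String));
    // Bare well-known names pick up their default type and direction.
    assert_eq!(resolve("timestamp"), ("timestamp", ColumnTypeId::Int64));
    assert!(defaults("timestamp").1);
    // Anything else is treated as an ascending String column.
    assert_eq!(resolve("service"), ("service", ColumnTypeId::String));
    assert_eq!(defaults("service"), (ColumnTypeId::String, false));
    // Discriminants round-trip through u64.
    assert_eq!(from_id(ColumnTypeId::Float64 as u64), Some(ColumnTypeId::Float64));
    assert_eq!(from_id(3), None);
}
```

No listed suffix is a suffix of another (for example, `__cpcsk` does not end in `__sk`), so at most one `strip_suffix` check can match a given name and the order of the checks does not affect the result.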
