
Commit 469c606

g-talbot, mattmkim, and claude authored
[phase-31 4/4] PostgreSQL metastore — migration + compaction columns (#6245)
* feat: replace fixed MetricDataPoint fields with dynamic tag HashMap
* feat: replace ParquetField enum with constants and dynamic validation
* feat: derive sort order and bloom filters from batch schema
* feat: union schema accumulation and schema-agnostic ingest validation
* feat: dynamic column lookup in split writer
* feat: remove ParquetSchema dependency from indexing actors
* refactor: deduplicate test batch helpers
* lint
* feat(31): sort schema foundation — proto, parser, display, validation, window, TableConfig
* fix: rustdoc link errors — use backticks for private items
* feat(31): compaction metadata types — extend split metadata, postgres model, field lookup
* feat(31): wire TableConfig into sort path, add compaction KV metadata

  Wire TableConfig-driven sort order into ParquetWriter and add self-describing Parquet file metadata for compaction:
  - ParquetWriter::new() takes &TableConfig and resolves sort fields at construction via parse_sort_fields() + ParquetField::from_name()
  - sort_batch() uses the resolved fields with per-column direction (ASC/DESC)
  - SS-1 debug_assert verification: re-sort and check for the identity permutation
  - build_compaction_key_value_metadata(): embeds sort_fields, window_start, window_duration, num_merge_ops, row_keys (base64) in the Parquet kv_metadata
  - SS-5 verify_ss5_kv_consistency(): kv_metadata matches the source struct
  - write_to_file_with_metadata() replaces write_to_file()
  - prepare_write() shared method for bytes and file paths
  - ParquetWriterConfig gains to_writer_properties_with_metadata()
  - ParquetSplitWriter passes TableConfig through
  - All callers in quickwit-indexing updated with TableConfig::default()
  - 23 storage tests pass, including the META-07 self-describing roundtrip

* feat(31): PostgreSQL migration 27 + compaction columns in stage/list/publish

  Add compaction metadata to the PostgreSQL metastore.

  Migration 27:
  - 6 new columns: window_start, window_duration_secs, sort_fields, num_merge_ops, row_keys, zonemap_regexes
  - Partial index idx_metrics_splits_compaction_scope on (index_uid, sort_fields, window_start) WHERE split_state = 'Published'

  stage_metrics_splits:
  - INSERT extended from 15 to 21 bind parameters to cover the compaction columns
  - ON CONFLICT SET updates all compaction columns

  list_metrics_splits:
  - PgMetricsSplit construction includes the compaction fields (defaults from JSON)

  Also fixes pre-existing compilation errors on upstream-10b-parquet-actors:
  - Missing StageMetricsSplitsRequestExt import
  - index_id vs index_uid type mismatches in publish/mark/delete
  - IndexUid binding (to_string() for sqlx)
  - ListMetricsSplitsResponseExt trait disambiguation

* fix(31): close port gaps — split_writer metadata, compaction scope, publish validation

  Close critical gaps identified during the port review.

  split_writer.rs:
  - Store table_config on ParquetSplitWriter (not just pass it through)
  - Compute window_start from the batch time range using table_config.window_duration_secs
  - Populate sort_fields, window_duration_secs, parquet_files on the metadata before writing
  - Call write_to_file_with_metadata(Some(&metadata)) to embed KV metadata in the Parquet file
  - Update size_bytes after the write completes

  metastore/mod.rs:
  - Add window_start and sort_fields fields to ListMetricsSplitsQuery
  - Add the with_compaction_scope() builder method

  metastore/postgres/metastore.rs:
  - Add compaction scope filters (AND window_start = $N, AND sort_fields = $N) to the list query
  - Add replaced_split_ids count verification in publish_metrics_splits
  - Bind the compaction scope query parameters

  ingest/config.rs:
  - Add a table_config: TableConfig field to ParquetIngestConfig

* fix(31): final gap fixes — file-backed scope filter, META-07 test, dead code removal
  - file_backed_index/mod.rs: add window_start and sort_fields filtering to metrics_split_matches_query() for compaction scope queries
  - writer.rs: add test_meta07_self_describing_parquet_roundtrip (writes compaction metadata to Parquet, reads it back from a cold file, and verifies all fields round-trip correctly)
  - fields.rs: remove the dead sort_order() method (replaced by TableConfig)

* fix(31): correct postgres types for window_duration_secs and zonemap_regexes

  Gap 1: change window_duration_secs from i32 to Option<i32> in both PgMetricsSplit and InsertableMetricsSplit. Pre-Phase-31 splits now correctly map 0 → NULL in PostgreSQL, letting Phase 32 compaction queries use `WHERE window_duration_secs IS NOT NULL` instead of the fragile `WHERE window_duration_secs > 0`.

  Gap 2: change zonemap_regexes from String to serde_json::Value in both structs. This maps directly to JSONB in sqlx and avoids ambiguity when PostgreSQL JSONB operators are used in Phase 34/35 zonemap pruning.

  Gap 3: add two missing tests:
  - test_insertable_from_metadata_with_compaction_fields: verifies all 6 compaction fields round-trip through InsertableMetricsSplit
  - test_insertable_from_metadata_pre_phase31_defaults: verifies that pre-Phase-31 metadata produces window_duration_secs: None and zonemap_regexes: json!({})

* style: rustfmt
* test(31): add metrics split test suite to the shared metastore_test_suite! macro

  11 tests covering the full metrics split lifecycle:
  - stage (happy path + non-existent index error)
  - stage upsert (ON CONFLICT update)
  - list by state, time range, metric name, compaction scope
  - publish (happy path + non-existent split error)
  - mark for deletion
  - delete (happy path + idempotent non-existent)

  The tests are generic and run against both the file-backed and PostgreSQL backends.

* fix(31): read compaction columns in list_metrics_splits, fix cleanup_index FK
* fix(31): correct error types for non-existent metrics splits
  - publish_metrics_splits: return NotFound (not FailedPrecondition) when the staged splits don't exist
  - delete_metrics_splits: succeed silently (idempotent) for non-existent splits instead of returning FailedPrecondition
  - Tests now assert the correct error types on both backends

* style: rustfmt metastore tests and postgres
* fix(31): address PR review — align metrics_splits with the splits table
  - Migration 27: add maturity_timestamp, delete_opstamp, node_id columns and a publish_timestamp trigger to match the splits table (Paul's review)
  - ListMetricsSplitsQuery: adopt FilterRange<i64> for time_range (matching the log-side pattern), use a single time_range field for both the read and compaction paths, and add node_id/delete_opstamp/update_timestamp/create_timestamp/mature filters to close gaps with ListSplitsQuery
  - Use the SplitState enum instead of a stringly-typed Vec<String> for split_states
  - StoredMetricsSplit: add create_timestamp, node_id, delete_opstamp, maturity_timestamp so the file-backed metastore can filter on them locally
  - File-backed filter: use FilterRange::overlaps_with() for time-range and window intersection, and apply all new filters to match the log-side predicate
  - Postgres: intersection semantics for window queries, FilterRange-based SQL generation for all range filters
  - Fix InsertableMetricsSplit.window_duration_secs from Option<i32> to i32
  - Rename two-letter variables (ws, sf, dt) throughout

* style: fix rustfmt nightly formatting
* Update quickwit/quickwit-parquet-engine/src/table_config.rs (co-authored by Matthew Kim)
* Update quickwit/quickwit-parquet-engine/src/table_config.rs (co-authored by Matthew Kim)
* style: rustfmt long match arm in default_sort_fields
* fix: make parquet_file field backward-compatible in MetricsSplitMetadata

  Pre-existing splits were serialized before the parquet_file field was added, so their JSON doesn't contain it. Adding #[serde(default)] makes deserialization fall back to an empty string for old splits.

* fix: handle empty-column batches in accumulator flush

  When the commit timeout fires and the accumulator contains only zero-column batches, union_fields is empty and concat_batches fails with "must either specify a row count or at least one column". flush_internal now treats empty union_fields the same as empty pending_batches: it resets state and returns None.

---------

Co-authored-by: Matthew Kim <matthew.kim@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
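The kv_metadata embedding mentioned in the "wire TableConfig into sort path" commit is not shown in the diffs below, so here is a rough sketch of the idea. The field names are taken from the commit message; the function names, signatures, and exact encoding are assumptions for illustration, and the real implementation is build_compaction_key_value_metadata() in the storage crate.

```rust
// Illustrative sketch only; the actual helper is build_compaction_key_value_metadata().
use base64::Engine as _;
use parquet::file::properties::WriterProperties;
use parquet::format::KeyValue;

// Hypothetical inputs mirroring the fields listed in the commit message.
fn compaction_kv_metadata(
    sort_fields: &str,
    window_start: i64,
    window_duration_secs: u32,
    num_merge_ops: u32,
    row_keys: &[u8],
) -> Vec<KeyValue> {
    vec![
        KeyValue::new("sort_fields".to_string(), sort_fields.to_string()),
        KeyValue::new("window_start".to_string(), window_start.to_string()),
        KeyValue::new("window_duration_secs".to_string(), window_duration_secs.to_string()),
        KeyValue::new("num_merge_ops".to_string(), num_merge_ops.to_string()),
        // row_keys are binary, so the commit embeds them base64-encoded.
        KeyValue::new(
            "row_keys".to_string(),
            base64::engine::general_purpose::STANDARD.encode(row_keys),
        ),
    ]
}

// WriterProperties is how the parquet crate attaches file-level key-value metadata.
fn writer_properties_with(kv: Vec<KeyValue>) -> WriterProperties {
    WriterProperties::builder()
        .set_key_value_metadata(Some(kv))
        .build()
}
```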
1 parent 7645703 commit 469c606

13 files changed

Lines changed: 1567 additions & 125 deletions


quickwit/quickwit-indexing/src/actors/parquet_e2e_test.rs

Lines changed: 11 additions & 8 deletions
@@ -261,7 +261,8 @@ async fn test_file_backed_metastore_metrics_operations() {
     use quickwit_config::IndexConfig;
     use quickwit_metastore::{
         CreateIndexRequestExt, FileBackedMetastore, ListMetricsSplitsQuery,
-        ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, StageMetricsSplitsRequestExt,
+        ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, SplitState,
+        StageMetricsSplitsRequestExt,
     };
     use quickwit_parquet_engine::split::{MetricsSplitMetadata, MetricsSplitRecord, TimeRange};
     use quickwit_proto::metastore::{
@@ -306,7 +307,7 @@ async fn test_file_backed_metastore_metrics_operations() {

     // Verify staged
     let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
-        .with_split_states(vec!["Staged".to_string()]);
+        .with_split_states([SplitState::Staged]);
     let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
     let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
     let staged: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
@@ -327,7 +328,7 @@ async fn test_file_backed_metastore_metrics_operations() {

     // Verify published
     let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
-        .with_split_states(vec!["Published".to_string()]);
+        .with_split_states([SplitState::Published]);
     let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
     let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
     let published: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
@@ -336,24 +337,26 @@ async fn test_file_backed_metastore_metrics_operations() {

     // Time range filtering
     let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
-        .with_split_states(vec!["Published".to_string()])
-        .with_time_range(1000, 1100);
+        .with_split_states([SplitState::Published])
+        .with_time_range_start_gte(1000)
+        .with_time_range_end_lte(1100);
     let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
     let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
     let in_range: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
     assert_eq!(in_range.len(), 1);

     let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
-        .with_split_states(vec!["Published".to_string()])
-        .with_time_range(5000, 5100);
+        .with_split_states([SplitState::Published])
+        .with_time_range_start_gte(5000)
+        .with_time_range_end_lte(5100);
     let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
     let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
     let out_of_range: Vec<MetricsSplitRecord> = list_response.deserialize_splits().unwrap();
     assert_eq!(out_of_range.len(), 0);

     // Metric name filtering
     let query = ListMetricsSplitsQuery::for_index(index_uid.clone())
-        .with_split_states(vec!["Published".to_string()])
+        .with_split_states([SplitState::Published])
         .with_metric_names(vec!["cpu.usage".to_string()]);
     let list_request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query).unwrap();
     let list_response = metastore.list_metrics_splits(list_request).await.unwrap();
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+-- Reverse Phase 31: Remove compaction metadata columns and triggers.
+DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE;
+DROP FUNCTION IF EXISTS set_publish_timestamp_for_metrics_split();
+DROP INDEX IF EXISTS idx_metrics_splits_compaction_scope;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS node_id;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS delete_opstamp;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS maturity_timestamp;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS zonemap_regexes;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS row_keys;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS num_merge_ops;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS sort_fields;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_duration_secs;
+ALTER TABLE metrics_splits DROP COLUMN IF EXISTS window_start;
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+-- Phase 31: Add compaction metadata columns to metrics_splits.
+-- These columns support time-windowed compaction planning and execution.
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_start BIGINT;
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS window_duration_secs INTEGER;
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS sort_fields TEXT NOT NULL DEFAULT '';
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS num_merge_ops INTEGER NOT NULL DEFAULT 0;
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS row_keys BYTEA;
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS zonemap_regexes JSONB NOT NULL DEFAULT '{}';
+
+-- Columns present on the `splits` table that were missing from `metrics_splits`.
+-- maturity_timestamp: compaction planner needs this to restrict candidates to
+-- Published-and-immature splits, matching the logic the log-side merge planner uses.
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS maturity_timestamp TIMESTAMP DEFAULT TO_TIMESTAMP(0);
+-- delete_opstamp: tracks which delete tasks have been applied to a split.
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS delete_opstamp BIGINT CHECK (delete_opstamp >= 0) DEFAULT 0;
+-- node_id: identifies which node produced the split.
+ALTER TABLE metrics_splits ADD COLUMN IF NOT EXISTS node_id VARCHAR(253);
+
+-- Auto-set publish_timestamp when a split transitions Staged → Published,
+-- matching the trigger on the `splits` table (migration 3).
+CREATE OR REPLACE FUNCTION set_publish_timestamp_for_metrics_split() RETURNS trigger AS $$
+BEGIN
+    IF (TG_OP = 'UPDATE') AND (NEW.split_state = 'Published') AND (OLD.split_state = 'Staged') THEN
+        NEW.publish_timestamp := (CURRENT_TIMESTAMP AT TIME ZONE 'UTC');
+    END IF;
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS set_publish_timestamp_on_metrics_split_publish ON metrics_splits CASCADE;
+CREATE TRIGGER set_publish_timestamp_on_metrics_split_publish
+    BEFORE UPDATE ON metrics_splits
+    FOR EACH ROW
+    EXECUTE PROCEDURE set_publish_timestamp_for_metrics_split();
+
+-- Compaction scope index: supports the compaction planner's primary query pattern
+-- "give me all Published splits for a given (index_uid, sort_fields, window_start) triple."
+CREATE INDEX IF NOT EXISTS idx_metrics_splits_compaction_scope
+    ON metrics_splits (index_uid, sort_fields, window_start)
+    WHERE split_state = 'Published';
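
For illustration, the query shape this partial index serves could look like the sqlx sketch below. The table, column, and index names come from the migration above; the function, its signature, and the split_id column name are assumptions made for the example, not code from this PR.

```rust
use sqlx::{PgPool, Row};

// Hypothetical planner helper: fetch Published splits for one compaction scope.
// The predicate matches idx_metrics_splits_compaction_scope exactly:
// (index_uid, sort_fields, window_start) WHERE split_state = 'Published'.
async fn published_splits_in_scope(
    pool: &PgPool,
    index_uid: &str,
    sort_fields: &str,
    window_start: i64,
) -> Result<Vec<String>, sqlx::Error> {
    let rows = sqlx::query(
        "SELECT split_id FROM metrics_splits \
         WHERE index_uid = $1 AND sort_fields = $2 AND window_start = $3 \
           AND split_state = 'Published'",
    )
    .bind(index_uid)
    .bind(sort_fields)
    .bind(window_start)
    .fetch_all(pool)
    .await?;
    Ok(rows.iter().map(|row| row.get("split_id")).collect())
}
```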

quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs

Lines changed: 83 additions & 11 deletions
@@ -56,6 +56,20 @@ pub(crate) struct StoredMetricsSplit {
     pub state: MetricsSplitState,
     /// Update timestamp (Unix epoch seconds).
     pub update_timestamp: i64,
+    /// Create timestamp (Unix epoch seconds).
+    #[serde(default)]
+    pub create_timestamp: i64,
+    /// Node that produced this split.
+    #[serde(default)]
+    pub node_id: String,
+    /// Delete opstamp.
+    #[serde(default)]
+    pub delete_opstamp: u64,
+    /// Maturity timestamp (Unix epoch seconds). Splits with
+    /// maturity_timestamp <= now are considered mature.
+    /// Defaults to 0 (epoch), meaning mature immediately.
+    #[serde(default)]
+    pub maturity_timestamp: i64,
 }

 /// A `FileBackedIndex` object carries an index metadata and its split metadata.
@@ -759,6 +773,10 @@ impl FileBackedIndex {
             metadata,
             state: MetricsSplitState::Staged,
             update_timestamp: now,
+            create_timestamp: now,
+            node_id: String::new(),
+            delete_opstamp: 0,
+            maturity_timestamp: 0,
         };
         self.metrics_splits.insert(split_id, stored);
     }
@@ -907,21 +925,37 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
     // Filter by state
     if !query.split_states.is_empty() {
         let state_str = split.state.as_str();
-        if !query.split_states.iter().any(|s| s == state_str) {
+        if !query.split_states.iter().any(|s| s.as_str() == state_str) {
             return false;
         }
     }

-    // Filter by time range
-    if let Some(start) = query.time_range_start
-        && (split.metadata.time_range.end_secs as i64) < start
-    {
-        return false;
-    }
-    if let Some(end) = query.time_range_end
-        && (split.metadata.time_range.start_secs as i64) > end
-    {
-        return false;
+    // Filter by time range.
+    // When sort_fields is set this is a compaction query and time_range
+    // refers to the compaction window; otherwise it refers to the data
+    // time range. Both use intersection semantics via FilterRange.
+    if !query.time_range.is_unbounded() {
+        if query.sort_fields.is_some() {
+            // Compaction path: intersect against the split's window.
+            let split_start = split.metadata.window_start();
+            let split_duration = split.metadata.window_duration_secs() as i64;
+            match split_start {
+                Some(split_start) if split_duration > 0 => {
+                    let split_end = split_start + split_duration - 1;
+                    if !query.time_range.overlaps_with(split_start..=split_end) {
+                        return false;
+                    }
+                }
+                _ => return false,
+            }
+        } else {
+            // Read path: intersect against the split's data time range.
+            let data_range = split.metadata.time_range.start_secs as i64
+                ..=split.metadata.time_range.end_secs as i64;
+            if !query.time_range.overlaps_with(data_range) {
+                return false;
+            }
+        }
     }

     // Filter by metric names
@@ -979,6 +1013,44 @@ fn metrics_split_matches_query(split: &StoredMetricsSplit, query: &ListMetricsSp
         }
     }

+    if let Some(ref sort_fields) = query.sort_fields
+        && split.metadata.sort_fields != *sort_fields
+    {
+        return false;
+    }
+
+    if let Some(node_id) = &query.node_id
+        && split.node_id != *node_id
+    {
+        return false;
+    }
+
+    if !query.delete_opstamp.contains(&split.delete_opstamp) {
+        return false;
+    }
+
+    if !query.update_timestamp.contains(&split.update_timestamp) {
+        return false;
+    }
+
+    if !query.create_timestamp.contains(&split.create_timestamp) {
+        return false;
+    }
+
+    match &query.mature {
+        Bound::Included(evaluation_datetime) => {
+            if split.maturity_timestamp > evaluation_datetime.unix_timestamp() {
+                return false;
+            }
+        }
+        Bound::Excluded(evaluation_datetime) => {
+            if split.maturity_timestamp <= evaluation_datetime.unix_timestamp() {
+                return false;
+            }
+        }
+        Bound::Unbounded => {}
+    }
+
     true
 }
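FilterRange and overlaps_with() are Quickwit-internal helpers. As a rough standalone illustration of the intersection semantics used in the filter above, here is a simplified stand-in (not the actual FilterRange implementation):

```rust
use std::ops::{Bound, RangeInclusive};

// Simplified stand-in for quickwit's FilterRange<i64>; illustrative only.
struct SimpleFilterRange {
    start: Bound<i64>,
    end: Bound<i64>,
}

impl SimpleFilterRange {
    // Intersection semantics: the query range and the split's inclusive
    // range must share at least one point.
    fn overlaps_with(&self, other: RangeInclusive<i64>) -> bool {
        let starts_before_other_ends = match self.start {
            Bound::Included(start) => start <= *other.end(),
            Bound::Excluded(start) => start < *other.end(),
            Bound::Unbounded => true,
        };
        let ends_after_other_starts = match self.end {
            Bound::Included(end) => end >= *other.start(),
            Bound::Excluded(end) => end > *other.start(),
            Bound::Unbounded => true,
        };
        starts_before_other_ends && ends_after_other_starts
    }
}

fn main() {
    // A compaction window [3600, 7200) intersects a split window 7000..=10599,
    // but not one that starts exactly at the exclusive end bound.
    let query = SimpleFilterRange {
        start: Bound::Included(3600),
        end: Bound::Excluded(7200),
    };
    assert!(query.overlaps_with(7000..=10599));
    assert!(!query.overlaps_with(7200..=10799));
}
```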

quickwit/quickwit-metastore/src/metastore/mod.rs

Lines changed: 66 additions & 13 deletions
@@ -51,17 +51,15 @@ use crate::checkpoint::IndexCheckpointDelta;
 use crate::{Split, SplitMetadata, SplitState};

 /// Query parameters for listing metrics splits.
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ListMetricsSplitsQuery {
     /// Index UID to filter by (required).
     pub index_uid: IndexUid,
     /// Split states to include.
     #[serde(default)]
-    pub split_states: Vec<String>,
-    /// Time range start (inclusive).
-    pub time_range_start: Option<i64>,
-    /// Time range end (inclusive).
-    pub time_range_end: Option<i64>,
+    pub split_states: Vec<SplitState>,
+    /// The time range to filter by.
+    pub time_range: FilterRange<i64>,
     /// Metric names to filter by (any match).
     #[serde(default)]
     pub metric_names: Vec<String>,
@@ -75,30 +73,70 @@ pub struct ListMetricsSplitsQuery {
     pub tag_region: Option<String>,
     /// Host tag filter.
     pub tag_host: Option<String>,
+    /// Sort fields filter for compaction scope queries.
+    pub sort_fields: Option<String>,
+    /// A specific node ID to filter by.
+    pub node_id: Option<NodeId>,
+    /// The delete opstamp range to filter by.
+    pub delete_opstamp: FilterRange<u64>,
+    /// The update timestamp range to filter by.
+    pub update_timestamp: FilterRange<i64>,
+    /// The create timestamp range to filter by.
+    pub create_timestamp: FilterRange<i64>,
+    /// The datetime at which you include or exclude mature splits.
+    pub mature: Bound<OffsetDateTime>,
     /// Limit number of results.
     pub limit: Option<usize>,
 }

+impl Default for ListMetricsSplitsQuery {
+    fn default() -> Self {
+        Self {
+            index_uid: IndexUid::default(),
+            split_states: Vec::new(),
+            time_range: Default::default(),
+            metric_names: Vec::new(),
+            tag_service: None,
+            tag_env: None,
+            tag_datacenter: None,
+            tag_region: None,
+            tag_host: None,
+            sort_fields: None,
+            node_id: None,
+            delete_opstamp: Default::default(),
+            update_timestamp: Default::default(),
+            create_timestamp: Default::default(),
+            mature: Bound::Unbounded,
+            limit: None,
+        }
+    }
+}
+
 impl ListMetricsSplitsQuery {
     /// Creates a query for all splits in an index.
     pub fn for_index(index_uid: impl Into<IndexUid>) -> Self {
         Self {
             index_uid: index_uid.into(),
-            split_states: vec!["Published".to_string()],
+            split_states: vec![SplitState::Published],
             ..Default::default()
         }
     }

     /// Filter by split states.
-    pub fn with_split_states(mut self, states: Vec<String>) -> Self {
-        self.split_states = states;
+    pub fn with_split_states(mut self, states: impl AsRef<[SplitState]>) -> Self {
+        self.split_states = states.as_ref().to_vec();
         self
     }

-    /// Filter by time range.
-    pub fn with_time_range(mut self, start: i64, end: i64) -> Self {
-        self.time_range_start = Some(start);
-        self.time_range_end = Some(end);
+    /// Filter by time range (inclusive on both ends).
+    pub fn with_time_range_start_gte(mut self, v: i64) -> Self {
+        self.time_range.start = Bound::Included(v);
+        self
+    }
+
+    /// Filter by time range (inclusive on both ends).
+    pub fn with_time_range_end_lte(mut self, v: i64) -> Self {
+        self.time_range.end = Bound::Included(v);
         self
     }

@@ -107,6 +145,21 @@ impl ListMetricsSplitsQuery {
         self.metric_names = names;
         self
     }
+
+    /// Filter by compaction scope: splits whose window intersects
+    /// `[window_start, window_start + window_duration_secs)` and whose
+    /// sort_fields match exactly.
+    pub fn with_compaction_scope(
+        mut self,
+        window_start: i64,
+        window_duration_secs: u32,
+        sort_fields: impl Into<String>,
+    ) -> Self {
+        self.time_range.start = Bound::Included(window_start);
+        self.time_range.end = Bound::Excluded(window_start + window_duration_secs as i64);
+        self.sort_fields = Some(sort_fields.into());
+        self
+    }
 }

 /// Splits batch size returned by the stream splits API
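To show how the new builders compose, here is a small usage sketch. The builder calls mirror the ones added above and used in the e2e test; the wrapping function, the window values, the "resource,timestamp" sort spec, and the IndexUid import path are assumptions made for the example.

```rust
use quickwit_metastore::{ListMetricsSplitsQuery, SplitState};
use quickwit_proto::types::IndexUid;

fn example_queries(index_uid: IndexUid) {
    // Read path: Published splits whose data time range intersects [1000, 1100].
    let _read_query = ListMetricsSplitsQuery::for_index(index_uid.clone())
        .with_split_states([SplitState::Published])
        .with_time_range_start_gte(1000)
        .with_time_range_end_lte(1100);

    // Compaction path: splits whose window intersects the half-open range
    // [3600, 7200) and whose sort_fields match "resource,timestamp" exactly.
    let _compaction_query = ListMetricsSplitsQuery::for_index(index_uid)
        .with_compaction_scope(3600, 3600, "resource,timestamp");
}
```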
