Skip to content

Commit c2a0235

Browse files
authored
[metrics] Support DDSketch in the parquet pipeline (#6257)
* generalize metric splits to parquet splits * add sketch schema + processor * sketch split support in metastore * wire sketch pipeline into indexing actors and control plane * drive by fix: pass sort fields to bloom filter config * address comments, lint * address comments, lint * linter * linter * fix unit test
1 parent 5fe18a4 commit c2a0235

46 files changed

Lines changed: 4555 additions & 1152 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

quickwit/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-common/src/metrics_specific.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,22 @@ pub fn is_metrics_index(index_id: &str) -> bool {
2020
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
2121
}
2222

23+
/// Returns whether the given index ID denotes a sketches index (`sketches-` prefix).
24+
///
25+
/// Sketches indexes use the Parquet/DataFusion pipeline with sketch-specific
26+
/// processors and writers.
27+
pub fn is_sketches_index(index_id: &str) -> bool {
28+
index_id.starts_with("sketches-")
29+
}
30+
31+
/// Returns whether the given index ID uses the Parquet/DataFusion pipeline (metrics or sketches).
32+
pub fn is_parquet_pipeline_index(index_id: &str) -> bool {
33+
is_metrics_index(index_id) || is_sketches_index(index_id)
34+
}
35+
2336
#[cfg(test)]
2437
mod tests {
25-
use super::is_metrics_index;
38+
use super::*;
2639

2740
#[test]
2841
fn test_is_metrics_index() {
@@ -44,4 +57,19 @@ mod tests {
4457
assert!(!is_metrics_index("metrics")); // No hyphen after "metrics"
4558
assert!(!is_metrics_index("my-metrics-index")); // Not prefixed
4659
}
60+
61+
#[test]
62+
fn test_is_sketches_index() {
63+
assert!(is_sketches_index("sketches-default"));
64+
assert!(!is_sketches_index("otel-metrics"));
65+
assert!(!is_sketches_index("my-index"));
66+
}
67+
68+
#[test]
69+
fn test_is_parquet_pipeline_index() {
70+
assert!(is_parquet_pipeline_index("otel-metrics"));
71+
assert!(is_parquet_pipeline_index("sketches-default"));
72+
assert!(!is_parquet_pipeline_index("otel-logs-v0_7"));
73+
assert!(!is_parquet_pipeline_index("my-index"));
74+
}
4775
}

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::time::{Duration, Instant};
2323

2424
use fnv::{FnvHashMap, FnvHashSet};
2525
use itertools::Itertools;
26-
use quickwit_common::is_metrics_index;
26+
use quickwit_common::is_parquet_pipeline_index;
2727
use quickwit_common::pretty::PrettySample;
2828
use quickwit_config::{FileSourceParams, SourceParams, indexing_pipeline_params_fingerprint};
2929
use quickwit_proto::indexing::{
@@ -217,7 +217,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
217217
SourceParams::IngestApi => {
218218
// Metrics and sketches indexes should use IngestV2 only, not IngestV1.
219219
// The ParquetSourceLoader doesn't support IngestV1.
220-
if is_metrics_index(&source_uid.index_uid.index_id) {
220+
if is_parquet_pipeline_index(&source_uid.index_uid.index_id) {
221221
continue;
222222
}
223223
// TODO ingest v1 is scheduled differently

quickwit/quickwit-indexing/src/actors/metrics_pipeline/indexing_service_impl.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
// In that case, metrics index will be started as a metrics pipeline.
1919

2020
use quickwit_actors::ActorContext;
21-
use quickwit_common::{is_metrics_index, temp_dir};
21+
use quickwit_common::{is_parquet_pipeline_index, is_sketches_index, temp_dir};
2222
use quickwit_config::{IndexConfig, SourceConfig};
2323
use quickwit_metastore::SplitMetadata;
2424
use quickwit_proto::indexing::{IndexingError, IndexingPipelineId};
@@ -35,6 +35,7 @@ impl IndexingService {
3535
index_config: IndexConfig,
3636
source_config: SourceConfig,
3737
params_fingerprint: u64,
38+
use_sketch_processors: bool,
3839
) -> Result<BoxedPipelineHandle, IndexingError> {
3940
let pipeline_uid_str = indexing_pipeline_id.pipeline_uid.to_string();
4041
let indexing_directory = temp_dir::Builder::default()
@@ -69,6 +70,7 @@ impl IndexingService {
6970
source_storage_resolver: self.storage_resolver.clone(),
7071
params_fingerprint,
7172
event_broker: self.event_broker.clone(),
73+
use_sketch_processors,
7274
};
7375
let pipeline = MetricsPipeline::new(pipeline_params);
7476
let (mailbox, handle) = ctx.spawn_actor().spawn(pipeline);
@@ -88,13 +90,16 @@ impl IndexingService {
8890
immature_splits_opt: Option<Vec<SplitMetadata>>,
8991
params_fingerprint: u64,
9092
) -> Result<BoxedPipelineHandle, IndexingError> {
91-
if is_metrics_index(&indexing_pipeline_id.index_uid.index_id) {
93+
let index_id = &indexing_pipeline_id.index_uid.index_id;
94+
if is_parquet_pipeline_index(index_id) {
95+
let use_sketch_processors = is_sketches_index(index_id);
9296
self.spawn_metrics_pipeline(
9397
ctx,
9498
indexing_pipeline_id.clone(),
9599
index_config,
96100
source_config,
97101
params_fingerprint,
102+
use_sketch_processors,
98103
)
99104
.await
100105
} else {

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_doc_processor.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Qu
2323
use quickwit_common::rate_limited_tracing::rate_limited_warn;
2424
use quickwit_common::runtimes::RuntimeType;
2525
use quickwit_metastore::checkpoint::SourceCheckpointDelta;
26-
use quickwit_parquet_engine::ingest::{IngestError, ParquetIngestProcessor};
26+
use quickwit_parquet_engine::ingest::{
27+
IngestError, ParquetIngestProcessor, SketchParquetIngestProcessor,
28+
};
2729
use quickwit_proto::types::{IndexId, SourceId};
2830
use serde::Serialize;
2931
use tokio::runtime::Handle;
@@ -114,17 +116,32 @@ pub enum ParquetDocProcessorError {
114116
Ingest(#[from] IngestError),
115117
}
116118

117-
/// ParquetDocProcessor actor that routes Arrow IPC batches to the metrics engine.
119+
/// Enum wrapping the ingest processor variant for metrics vs sketches.
120+
pub enum IngestProcessor {
121+
Metrics(ParquetIngestProcessor),
122+
Sketches(SketchParquetIngestProcessor),
123+
}
124+
125+
impl IngestProcessor {
126+
fn process_ipc(&self, ipc_bytes: &[u8]) -> Result<RecordBatch, IngestError> {
127+
match self {
128+
Self::Metrics(p) => p.process_ipc(ipc_bytes),
129+
Self::Sketches(p) => p.process_ipc(ipc_bytes),
130+
}
131+
}
132+
}
133+
134+
/// ParquetDocProcessor actor that routes Arrow IPC batches to the parquet engine.
118135
///
119136
/// This actor receives RawDocBatch messages containing Arrow IPC data and converts
120-
/// them to RecordBatch using ParquetIngestProcessor. The resulting batches are
121-
/// forwarded to ParquetIndexer for accumulation and split production.
137+
/// them to RecordBatch using the configured ingest processor. The resulting batches
138+
/// are forwarded to ParquetIndexer for accumulation and split production.
122139
///
123140
/// Unlike DocProcessor which converts to Tantivy documents, this actor works
124-
/// exclusively with Arrow RecordBatch for high-throughput metrics ingestion.
141+
/// exclusively with Arrow RecordBatch for high-throughput metrics/sketch ingestion.
125142
pub struct ParquetDocProcessor {
126143
/// Processor for converting Arrow IPC to RecordBatch.
127-
processor: ParquetIngestProcessor,
144+
processor: IngestProcessor,
128145
/// Processing counters.
129146
counters: ParquetDocProcessorCounters,
130147
/// Publish lock for coordinating with sources.
@@ -136,11 +153,11 @@ pub struct ParquetDocProcessor {
136153
impl ParquetDocProcessor {
137154
/// Creates a new ParquetDocProcessor.
138155
pub fn new(
156+
processor: IngestProcessor,
139157
index_id: IndexId,
140158
source_id: SourceId,
141159
indexer_mailbox: Mailbox<ParquetIndexer>,
142160
) -> Self {
143-
let processor = ParquetIngestProcessor;
144161
let counters = ParquetDocProcessorCounters::new(index_id.clone(), source_id.clone());
145162

146163
info!(
@@ -401,6 +418,7 @@ mod tests {
401418

402419
let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
403420
let metrics_doc_processor = ParquetDocProcessor::new(
421+
IngestProcessor::Metrics(ParquetIngestProcessor),
404422
"test-metrics-index".to_string(),
405423
"test-source".to_string(),
406424
indexer_mailbox,
@@ -442,6 +460,7 @@ mod tests {
442460

443461
let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
444462
let metrics_doc_processor = ParquetDocProcessor::new(
463+
IngestProcessor::Metrics(ParquetIngestProcessor),
445464
"test-metrics-index".to_string(),
446465
"test-source".to_string(),
447466
indexer_mailbox,
@@ -477,6 +496,7 @@ mod tests {
477496

478497
let (indexer_mailbox, _indexer_inbox) = universe.create_test_mailbox::<ParquetIndexer>();
479498
let metrics_doc_processor = ParquetDocProcessor::new(
499+
IngestProcessor::Metrics(ParquetIngestProcessor),
480500
"test-metrics-index".to_string(),
481501
"test-source".to_string(),
482502
indexer_mailbox,
@@ -547,7 +567,12 @@ mod tests {
547567
// Create ParquetPackager
548568
let writer_config = ParquetWriterConfig::default();
549569
let table_config = quickwit_parquet_engine::table_config::TableConfig::default();
550-
let split_writer = ParquetSplitWriter::new(writer_config, temp_dir.path(), &table_config);
570+
let split_writer = ParquetSplitWriter::new(
571+
quickwit_parquet_engine::split::ParquetSplitKind::Metrics,
572+
writer_config,
573+
temp_dir.path(),
574+
&table_config,
575+
);
551576
let packager = ParquetPackager::new(split_writer, uploader_mailbox);
552577
let (packager_mailbox, packager_handle) = universe.spawn_builder().spawn(packager);
553578

@@ -562,6 +587,7 @@ mod tests {
562587
let (indexer_mailbox, indexer_handle) = universe.spawn_builder().spawn(indexer);
563588

564589
let metrics_doc_processor = ParquetDocProcessor::new(
590+
IngestProcessor::Metrics(ParquetIngestProcessor),
565591
"test-index".to_string(),
566592
"test-source".to_string(),
567593
indexer_mailbox,

0 commit comments

Comments
 (0)