Skip to content

Commit a36f4fe

Browse files
g-talbotclaude
andauthored
feat: add configurable ParquetMergePolicyConfig to index settings (#6362)
* feat: add configurable ParquetMergePolicyConfig to index settings Adds `parquet_merge_policy` section to `IndexingSettings`, making the Parquet merge policy configurable per-index via YAML. Parameters: - merge_factor (default 10): min splits to trigger a merge - max_merge_factor (default 12): max splits per merge - max_merge_ops (default 4): bounds write amplification - target_split_size_bytes (default 256 MiB): target output size - maturation_period (default 48h): split maturity timeout - max_finalize_merge_operations (default 3): cold-window shutdown limit Mirrors the existing merge_policy config pattern for logs/traces. Updates index-config.md documentation with the new section. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add ParquetIndexingConfig with sort_fields and window_duration_secs Adds `parquet_indexing` section to `IndexingSettings` for per-index Parquet pipeline configuration: - `sort_fields`: sort schema override (Husky-style pipe-delimited syntax with /V2 suffix). Controls row ordering, query pruning, compression locality, and compaction scope. When omitted, uses the product-type default. - `window_duration_secs`: time window for split partitioning (default 900s / 15 min). Must divide 3600. Updates docs/configuration/index-config.md with: - "Parquet indexing settings" section explaining both parameters - Full sort schema syntax reference (column types, direction overrides, & LSM cutoff marker) - Examples showing minimal, custom, and advanced configurations Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: update indexing service fingerprint constants and nightly fmt Adding ParquetMergePolicyConfig and ParquetIndexingConfig to IndexingSettings changes the Hash output, which changes the pipeline params fingerprints. Updated the hardcoded test constants. Added a comment explaining how to recompute them when IndexingSettings fields change. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4d6fd21 commit a36f4fe

16 files changed

Lines changed: 727 additions & 470 deletions

File tree

docs/configuration/index-config.md

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,9 @@ This section describes indexing settings for a given index.
594594
| ------------- | ------------- | ------------- |
595595
| `commit_timeout_secs` | Maximum number of seconds before committing a split since its creation. | `60` |
596596
| `split_num_docs_target` | Target number of docs per split. | `10000000` |
597-
| `merge_policy` | Describes the strategy used to trigger split merge operations (see [Merge policies](#merge-policies) section below). |
597+
| `merge_policy` | Describes the strategy used to trigger split merge operations for logs/traces (see [Merge policies](#merge-policies) section below). |
598+
| `parquet_merge_policy` | Describes the merge policy for Parquet (metrics/sketches) splits (see [Parquet merge policy](#parquet-merge-policy) section below). |
599+
| `parquet_indexing` | Parquet-specific indexing settings: sort schema, window duration (see [Parquet indexing settings](#parquet-indexing-settings) section below). |
598600
| `resources.heap_size` | Indexer heap size per source per index. | `2000000000` |
599601
| `docstore_compression_level` | Level of compression used by zstd for the docstore. Lower values may increase ingest speed, at the cost of index size | `8` |
600602
| `docstore_blocksize` | Size of blocks in the docstore, in bytes. Lower values may improve doc retrieval speed, at the cost of index size | `1000000` |
@@ -687,6 +689,86 @@ indexing_settings:
687689
type: "no_merge"
688690
```
689691

692+
### Parquet indexing settings
693+
694+
*For indexes using the Parquet indexing pipeline (metrics, sketches).*
695+
696+
These settings control how the Parquet pipeline sorts, windows, and writes incoming data. They affect both ingest-time performance and downstream query/compaction efficiency.
697+
698+
```yaml
699+
version: 0.7
700+
index_id: "my-metrics-index"
701+
# ...
702+
indexing_settings:
703+
parquet_indexing:
704+
sort_fields: "metric_name|service|env|host|timeseries_id|timestamp_secs/V2"
705+
window_duration_secs: 900
706+
```
707+
708+
| Variable | Description | Default value |
709+
| ------------- | ------------- | ------------- |
710+
| `sort_fields` | Sort schema for row ordering in Parquet files (see syntax below). When omitted, the product-type default is used. | `metric_name\|service\|env\|datacenter\|region\|host\|timeseries_id\|timestamp_secs/V2` |
711+
| `window_duration_secs` | Time window duration in seconds for split partitioning. Must evenly divide 3600. Larger values = fewer splits but coarser time pruning. | `900` (15 minutes) |
712+
713+
#### Sort schema syntax
714+
715+
The sort schema uses pipe-delimited column names with a `/V2` version suffix:
716+
717+
```text
718+
column1|column2|...|timestamp_secs/V2
719+
```
720+
721+
**Column types** are inferred from name suffixes:
722+
- `__s` → string (e.g., `custom_tag__s`)
723+
- `__i` → int64 (e.g., `priority__i`)
724+
- Well-known names like `metric_name`, `service`, `env`, `host`, `timestamp_secs`, and `timeseries_id` have built-in type mappings and don't need suffixes.
725+
726+
**Sort direction** defaults to ascending for most columns and descending for timestamp columns. Override with `+` (ascending) or `-` (descending) as a prefix or suffix on the column name:
727+
728+
```text
729+
# Explicit descending timestamp
730+
metric_name|host|-timestamp_secs/V2
731+
732+
# Ascending host (default), descending timestamp (default)
733+
metric_name|host|timestamp_secs/V2
734+
```
735+
736+
**How the sort schema affects behavior:**
737+
- **Query pruning**: queries filtering on leading columns (e.g., `metric_name`) can skip entire splits whose row key ranges don't match.
738+
- **Compression**: grouping similar values together (e.g., all rows for the same metric name) improves columnar compression ratios.
739+
- **Compaction scope**: splits with different sort schemas are never merged together. Changing the sort schema on an existing index creates a new compaction scope — old splits are not re-sorted.
740+
741+
**The `&` marker** (advanced) sets the LSM comparison cutoff: columns after `&` are used for sort order but not for compaction locality decisions. For example, `metric_name|&host|timestamp_secs/V2` sorts by metric_name then host, but only metric_name determines which splits can be merged.
742+
743+
#### Parquet merge policy
744+
745+
*For indexes using the Parquet indexing pipeline (metrics, sketches).*
746+
747+
The Parquet merge policy controls how Parquet splits within a compaction scope (same time window, partition, and sort schema) are merged. It uses a constant write amplification strategy: splits at the same merge level are greedily accumulated until reaching `max_merge_factor` or `target_split_size_bytes`.
748+
749+
```yaml
750+
version: 0.7
751+
index_id: "my-metrics-index"
752+
# ...
753+
indexing_settings:
754+
parquet_merge_policy:
755+
merge_factor: 10
756+
max_merge_factor: 12
757+
max_merge_ops: 4
758+
target_split_size_bytes: 268435456
759+
maturation_period: 48h
760+
max_finalize_merge_operations: 3
761+
```
762+
763+
764+
| Variable | Description | Default value |
765+
| ------------- | ------------- | ------------- |
766+
| `merge_factor` | Minimum number of splits to trigger a merge. | `10` |
767+
| `max_merge_factor` | Maximum number of splits in a single merge operation. | `12` |
768+
| `max_merge_ops` | Maximum number of merges a split can undergo before becoming mature. Bounds total write amplification. | `4` |
769+
| `target_split_size_bytes` | Target size for merged output splits in bytes. Merges trigger when accumulated bytes reach this threshold, even if `merge_factor` is not reached. | `268435456` (256 MiB) |
770+
| `maturation_period` | Duration after creation when a split becomes mature (never merged again). | `48h` |
771+
| `max_finalize_merge_operations` | *(advanced)* Maximum number of merge operations emitted during cold-window finalization at pipeline shutdown. Set to `0` to disable. | `3` |
690772

691773

692774
### Indexer memory usage

quickwit/quickwit-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ quickwit-common = { workspace = true, features = ["testsuite"] }
4545
quickwit-proto = { workspace = true, features = ["testsuite"] }
4646

4747
[features]
48+
metrics = []
4849
testsuite = []
4950
vrl = ["dep:vrl"]

quickwit/quickwit-config/src/index_config/mod.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use tracing::warn;
3737

3838
use crate::index_config::serialize::VersionedIndexConfig;
3939
use crate::merge_policy_config::MergePolicyConfig;
40+
#[cfg(feature = "metrics")]
41+
use crate::merge_policy_config::ParquetMergePolicyConfig;
4042

4143
#[derive(Clone, Debug, Serialize, Deserialize, utoipa::ToSchema)]
4244
#[serde(deny_unknown_fields)]
@@ -118,15 +120,108 @@ pub struct IndexingSettings {
118120
pub split_num_docs_target: usize,
119121
#[serde(default)]
120122
pub merge_policy: MergePolicyConfig,
123+
/// Merge policy for Parquet (metrics/sketches) splits.
124+
#[cfg(feature = "metrics")]
125+
#[serde(default, skip_serializing_if = "Option::is_none")]
126+
pub parquet_merge_policy: Option<ParquetMergePolicyConfig>,
127+
/// Parquet-specific indexing settings (sort schema, window duration).
128+
#[cfg(feature = "metrics")]
129+
#[serde(default, skip_serializing_if = "Option::is_none")]
130+
pub parquet_indexing: Option<ParquetIndexingConfig>,
121131
#[serde(default)]
122132
pub resources: IndexingResources,
123133
}
124134

135+
/// Configuration for the Parquet indexing pipeline (metrics, sketches).
136+
///
137+
/// Controls how incoming data is sorted, windowed, and compressed before
138+
/// writing to Parquet split files. These settings affect both ingest-time
139+
/// performance and downstream query/compaction efficiency.
140+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Hash, utoipa::ToSchema)]
141+
#[serde(deny_unknown_fields)]
142+
pub struct ParquetIndexingConfig {
143+
/// Sort schema defining the physical sort order of rows in Parquet files.
144+
///
145+
/// Uses Husky-style pipe-delimited syntax with a `/V2` version suffix.
146+
/// Each column is sorted ascending by default; use `+` or `-` prefix/suffix
147+
/// to override. Column types are inferred from well-known suffixes
148+
/// (`__s` = string, `__i` = int64, `_secs` = uint64 timestamp).
149+
///
150+
/// The sort order determines:
151+
/// - **Query pruning**: queries that filter on leading sort columns can skip entire splits
152+
/// whose row key ranges don't match.
153+
/// - **Compression**: columns with good locality (e.g., metric_name first) compress better in
154+
/// Parquet's columnar format.
155+
/// - **Compaction scope**: splits with different sort schemas are never merged together.
156+
///
157+
/// When `None`, the product-type default is used (see below).
158+
///
159+
/// # Default (metrics/sketches)
160+
/// ```text
161+
/// metric_name|service|env|datacenter|region|host|timeseries_id|timestamp_secs/V2
162+
/// ```
163+
///
164+
/// # Examples
165+
/// ```text
166+
/// # Minimal: just metric name and timestamp
167+
/// metric_name|timestamp_secs/V2
168+
///
169+
/// # Custom tags in sort order
170+
/// metric_name|service|cluster|host|timestamp_secs/V2
171+
///
172+
/// # Explicit descending timestamp
173+
/// metric_name|host|-timestamp_secs/V2
174+
/// ```
175+
#[serde(default, skip_serializing_if = "Option::is_none")]
176+
pub sort_fields: Option<String>,
177+
178+
/// Time window duration in seconds for split partitioning.
179+
///
180+
/// Incoming data is partitioned into time windows of this duration.
181+
/// Splits within the same window may be compacted together; splits in
182+
/// different windows are never merged. Must evenly divide 3600 (one hour).
183+
///
184+
/// Larger values produce fewer, larger splits (better for bulk queries)
185+
/// but coarser time-based pruning. Smaller values give finer pruning
186+
/// but more splits to manage.
187+
#[serde(default = "ParquetIndexingConfig::default_window_duration_secs")]
188+
pub window_duration_secs: u32,
189+
}
190+
191+
impl ParquetIndexingConfig {
192+
fn default_window_duration_secs() -> u32 {
193+
900
194+
}
195+
}
196+
197+
impl Default for ParquetIndexingConfig {
198+
fn default() -> Self {
199+
Self {
200+
sort_fields: None,
201+
window_duration_secs: Self::default_window_duration_secs(),
202+
}
203+
}
204+
}
205+
125206
impl IndexingSettings {
126207
pub fn commit_timeout(&self) -> Duration {
127208
Duration::from_secs(self.commit_timeout_secs as u64)
128209
}
129210

211+
/// Returns the Parquet merge policy config, using defaults if not
212+
/// explicitly configured.
213+
#[cfg(feature = "metrics")]
214+
pub fn parquet_merge_policy(&self) -> ParquetMergePolicyConfig {
215+
self.parquet_merge_policy.clone().unwrap_or_default()
216+
}
217+
218+
/// Returns the Parquet indexing config, using defaults if not
219+
/// explicitly configured.
220+
#[cfg(feature = "metrics")]
221+
pub fn parquet_indexing(&self) -> ParquetIndexingConfig {
222+
self.parquet_indexing.clone().unwrap_or_default()
223+
}
224+
130225
fn default_commit_timeout_secs() -> usize {
131226
60
132227
}
@@ -160,6 +255,10 @@ impl Default for IndexingSettings {
160255
docstore_compression_level: Self::default_docstore_compression_level(),
161256
split_num_docs_target: Self::default_split_num_docs_target(),
162257
merge_policy: MergePolicyConfig::default(),
258+
#[cfg(feature = "metrics")]
259+
parquet_merge_policy: None,
260+
#[cfg(feature = "metrics")]
261+
parquet_indexing: None,
163262
resources: IndexingResources::default(),
164263
}
165264
}

quickwit/quickwit-config/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ pub use cluster_config::ClusterConfig;
4545
// See #2048
4646
use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig};
4747
pub use index_config::{
48-
IndexConfig, IndexingResources, IndexingSettings, IngestSettings, RetentionPolicy,
49-
SearchSettings, build_doc_mapper, load_index_config_from_user_config, load_index_config_update,
50-
prepare_doc_mapping_update,
48+
IndexConfig, IndexingResources, IndexingSettings, IngestSettings, ParquetIndexingConfig,
49+
RetentionPolicy, SearchSettings, build_doc_mapper, load_index_config_from_user_config,
50+
load_index_config_update, prepare_doc_mapping_update,
5151
};
5252
pub use quickwit_doc_mapper::DocMapping;
5353
use serde::Serialize;
@@ -67,7 +67,8 @@ use tracing::warn;
6767
use crate::index_template::IndexTemplateV0_8;
6868
pub use crate::index_template::{IndexTemplate, IndexTemplateId, VersionedIndexTemplate};
6969
use crate::merge_policy_config::{
70-
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, StableLogMergePolicyConfig,
70+
ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, ParquetMergePolicyConfig,
71+
StableLogMergePolicyConfig,
7172
};
7273
pub use crate::metastore_config::{
7374
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
@@ -113,6 +114,8 @@ pub fn disable_ingest_v1() -> bool {
113114
KafkaSourceParams,
114115
KinesisSourceParams,
115116
MergePolicyConfig,
117+
ParquetIndexingConfig,
118+
ParquetMergePolicyConfig,
116119
PubSubSourceParams,
117120
PulsarSourceAuth,
118121
PulsarSourceParams,

quickwit/quickwit-config/src/merge_policy_config.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,74 @@ impl Default for StableLogMergePolicyConfig {
119119
}
120120
}
121121

122+
// --- Parquet merge policy config ---
123+
//
124+
// The types are always available (for OpenAPI schema generation in
125+
// quickwit-serve). The IndexingSettings fields that use them are
126+
// gated behind cfg(feature = "metrics").
127+
128+
fn default_target_split_size_bytes() -> u64 {
129+
256 * 1024 * 1024 // 256 MiB
130+
}
131+
132+
fn default_max_finalize_merge_operations() -> usize {
133+
3
134+
}
135+
136+
/// Configuration for the Parquet (metrics/sketches) merge policy.
137+
///
138+
/// Controls how Parquet splits within a compaction scope are merged.
139+
/// Splits at the same `num_merge_ops` level are greedily accumulated
140+
/// until reaching `max_merge_factor` or `target_split_size_bytes`.
141+
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash, utoipa::ToSchema)]
142+
#[serde(deny_unknown_fields)]
143+
pub struct ParquetMergePolicyConfig {
144+
/// Minimum number of splits to trigger a merge.
145+
#[serde(default = "default_merge_factor")]
146+
pub merge_factor: usize,
147+
/// Maximum number of splits in a single merge operation.
148+
#[serde(default = "default_max_merge_factor")]
149+
pub max_merge_factor: usize,
150+
/// Maximum number of merges a split can undergo before becoming mature.
151+
/// Bounds total write amplification.
152+
#[serde(default = "default_parquet_max_merge_ops")]
153+
pub max_merge_ops: u32,
154+
/// Target size for merged output splits in bytes. Merges are triggered
155+
/// when accumulated bytes reach this threshold, even if `merge_factor`
156+
/// is not reached.
157+
#[serde(default = "default_target_split_size_bytes")]
158+
pub target_split_size_bytes: u64,
159+
/// Duration after creation when a split becomes mature regardless of
160+
/// size or merge count. Mature splits are never merged.
161+
#[schema(value_type = String)]
162+
#[serde(default = "default_maturation_period")]
163+
#[serde(deserialize_with = "parse_human_duration")]
164+
#[serde(serialize_with = "serialize_duration")]
165+
pub maturation_period: Duration,
166+
/// Maximum number of merge operations emitted during cold-window
167+
/// finalization at shutdown. Set to 0 to disable.
168+
#[serde(default = "default_max_finalize_merge_operations")]
169+
#[serde(skip_serializing_if = "is_zero")]
170+
pub max_finalize_merge_operations: usize,
171+
}
172+
173+
fn default_parquet_max_merge_ops() -> u32 {
174+
4
175+
}
176+
177+
impl Default for ParquetMergePolicyConfig {
178+
fn default() -> Self {
179+
Self {
180+
merge_factor: default_merge_factor(),
181+
max_merge_factor: default_max_merge_factor(),
182+
max_merge_ops: default_parquet_max_merge_ops(),
183+
target_split_size_bytes: default_target_split_size_bytes(),
184+
maturation_period: default_maturation_period(),
185+
max_finalize_merge_operations: default_max_finalize_merge_operations(),
186+
}
187+
}
188+
}
189+
122190
fn parse_human_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
123191
where D: Deserializer<'de> {
124192
let value: String = Deserialize::deserialize(deserializer)?;

quickwit/quickwit-indexing/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ testsuite = [
105105
"quickwit-proto/testsuite",
106106
"quickwit-storage/testsuite"
107107
]
108-
metrics = ["dep:arrow", "dep:quickwit-parquet-engine", "quickwit-doc-mapper/metrics"]
108+
metrics = ["dep:arrow", "dep:quickwit-parquet-engine", "quickwit-doc-mapper/metrics", "quickwit-config/metrics"]
109109
vrl = ["dep:vrl", "quickwit-config/vrl"]
110110
postgres = ["quickwit-metastore/postgres"]
111111
ci-test = []

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,9 +1250,13 @@ mod tests {
12501250

12511251
#[tokio::test]
12521252
async fn test_indexing_service_apply_plan() {
1253-
const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394;
1254-
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791;
1255-
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428;
1253+
// These fingerprints are hashes of IndexConfig + SourceConfig. They
1254+
// change whenever IndexingSettings fields are added/removed. Recompute
1255+
// by temporarily adding a test that prints
1256+
// `indexing_pipeline_params_fingerprint(&index_config, &source_config)`.
1257+
const PARAMS_FINGERPRINT_INGEST_API: u64 = 7973087274884969148;
1258+
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 9420938500552890840;
1259+
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 16199199787360162635;
12561260

12571261
quickwit_common::setup_logging_for_tests();
12581262
let transport = ChannelTransport::default();

0 commit comments

Comments
 (0)