Skip to content

Commit 0131aef

Browse files
authored
separate index/merge upload semaphores, pass around merge task instead of just the permit (#6376)
1 parent f6768d6 commit 0131aef

6 files changed

Lines changed: 59 additions & 33 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use tracing::{debug, info, info_span, warn};
4040
use ulid::Ulid;
4141

4242
use super::ProcessedParquetBatch;
43+
use super::parquet_merge_messages::ParquetMergeTask;
4344
use super::parquet_packager::{ParquetBatchForPackager, ParquetPackager, PartitionedRecordBatch};
4445
use crate::actors::indexer::OTHER_PARTITION_ID;
4546
use crate::models::{NewPublishLock, NewPublishToken, PublishLock};
@@ -126,10 +127,10 @@ pub struct ParquetSplitBatch {
126127
/// `None` for the ingest path (packager manages its own temp dir).
127128
/// `Some` for the merge path (executor's scratch directory).
128129
pub _scratch_directory_opt: Option<quickwit_common::temp_dir::TempDirectory>,
129-
/// Merge concurrency permit — carried through to the publisher so the
130-
/// semaphore slot isn't released until the upload completes.
130+
/// Merge task — carried through to the publisher so the planner inventory
131+
/// guard and semaphore permit stay alive until publish completes.
131132
/// `None` for the ingest path. `Some` for the merge path.
132-
pub _merge_permit_opt: Option<crate::actors::MergePermit>,
133+
pub _merge_task_opt: Option<ParquetMergeTask>,
133134
}
134135

135136
impl std::fmt::Debug for ParquetSplitBatch {
@@ -139,7 +140,7 @@ impl std::fmt::Debug for ParquetSplitBatch {
139140
.field("num_splits", &self.splits.len())
140141
.field("output_dir", &self.output_dir)
141142
.field("replaced_split_ids", &self.replaced_split_ids)
142-
.field("has_merge_permit", &self._merge_permit_opt.is_some())
143+
.field("has_merge_task", &self._merge_task_opt.is_some())
143144
.finish()
144145
}
145146
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tracing::{info, instrument, warn};
3030

3131
use super::ParquetUploader;
3232
use super::parquet_indexer::ParquetSplitBatch;
33-
use super::parquet_merge_messages::ParquetMergeScratch;
33+
use super::parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask};
3434
use crate::models::PublishLock;
3535

3636
/// Executes Parquet merge operations using the Phase 1 k-way merge engine.
@@ -173,7 +173,10 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
173173
publish_token_opt: None,
174174
replaced_split_ids,
175175
_scratch_directory_opt: Some(scratch.scratch_directory),
176-
_merge_permit_opt: Some(scratch.merge_permit),
176+
_merge_task_opt: Some(ParquetMergeTask {
177+
merge_operation: scratch.merge_operation,
178+
merge_permit: scratch.merge_permit,
179+
}),
177180
};
178181
ctx.send_message(&self.uploader_mailbox, batch).await?;
179182
return Ok(());
@@ -230,12 +233,15 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
230233
publish_token_opt: None,
231234
replaced_split_ids,
232235
_scratch_directory_opt: Some(scratch.scratch_directory),
233-
_merge_permit_opt: Some(scratch.merge_permit),
236+
_merge_task_opt: Some(ParquetMergeTask {
237+
merge_operation: scratch.merge_operation,
238+
merge_permit: scratch.merge_permit,
239+
}),
234240
};
235241

236242
ctx.send_message(&self.uploader_mailbox, batch).await?;
237243

238-
// The merge permit is now carried by the batch — it will be held
244+
// The merge task is now carried by the batch — it will be held
239245
// through the uploader and released when the publisher drops the
240246
// ParquetSplitsUpdate message.
241247
info!(

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ impl Handler<ParquetBatchForPackager> for ParquetPackager {
238238
publish_token_opt,
239239
replaced_split_ids: Vec::new(),
240240
_scratch_directory_opt: None,
241-
_merge_permit_opt: None,
241+
_merge_task_opt: None,
242242
};
243243

244244
ctx.send_message(&self.uploader_mailbox, split_batch)

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_splits_update.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use quickwit_parquet_engine::split::ParquetSplitMetadata;
2222
use quickwit_proto::types::{IndexUid, PublishToken};
2323
use tracing::Span;
2424

25+
use super::parquet_merge_messages::ParquetMergeTask;
2526
use crate::models::PublishLock;
2627

2728
/// Message sent by ParquetUploader to downstream actors after staging and uploading.
@@ -43,10 +44,10 @@ pub struct ParquetSplitsUpdate {
4344
pub publish_token_opt: Option<PublishToken>,
4445
/// Parent span for tracing.
4546
pub parent_span: Span,
46-
/// Merge concurrency permit — held until the publisher drops this message,
47-
/// ensuring the semaphore slot stays occupied while the merge output is
48-
/// in flight. `None` for the ingest path.
49-
pub _merge_permit_opt: Option<crate::actors::MergePermit>,
47+
/// Merge task — held until the publisher drops this message, ensuring the
48+
/// planner inventory guard and semaphore permit stay alive while the merge
49+
/// output is in flight. `None` for the ingest path.
50+
pub _merge_task_opt: Option<ParquetMergeTask>,
5051
}
5152

5253
impl fmt::Debug for ParquetSplitsUpdate {

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ use crate::actors::sequencer::{Sequencer, SequencerCommand};
3838
use crate::actors::{Publisher, UploaderCounters, UploaderType};
3939
use crate::metrics::INDEXER_METRICS;
4040

41-
/// Concurrent upload permits for metrics uploader.
42-
/// Uses same permit pool as indexer uploads.
43-
static CONCURRENT_UPLOAD_PERMITS_METRICS: OnceLock<Semaphore> = OnceLock::new();
41+
/// Concurrent upload permits for metrics ingest uploads.
42+
static CONCURRENT_UPLOAD_PERMITS_METRICS_INDEX: OnceLock<Semaphore> = OnceLock::new();
43+
/// Concurrent upload permits for metrics merge uploads.
44+
static CONCURRENT_UPLOAD_PERMITS_METRICS_MERGE: OnceLock<Semaphore> = OnceLock::new();
4445

4546
/// Stage splits in the metastore, dispatching to the correct RPC based on split kind.
4647
async fn stage_splits(
@@ -120,12 +121,24 @@ impl ParquetUploader {
120121
ctx: &ActorContext<Self>,
121122
) -> anyhow::Result<SemaphorePermit<'static>> {
122123
let _guard = ctx.protect_zone();
123-
let concurrent_upload_permits = CONCURRENT_UPLOAD_PERMITS_METRICS
124+
let (concurrent_upload_permits_once_cell, concurrent_upload_permits_gauge) =
125+
match self.uploader_type {
126+
UploaderType::IndexUploader => (
127+
&CONCURRENT_UPLOAD_PERMITS_METRICS_INDEX,
128+
INDEXER_METRICS
129+
.available_concurrent_upload_permits
130+
.with_label_values(["metrics_indexer"]),
131+
),
132+
UploaderType::MergeUploader | UploaderType::DeleteUploader => (
133+
&CONCURRENT_UPLOAD_PERMITS_METRICS_MERGE,
134+
INDEXER_METRICS
135+
.available_concurrent_upload_permits
136+
.with_label_values(["metrics_merger"]),
137+
),
138+
};
139+
let concurrent_upload_permits = concurrent_upload_permits_once_cell
124140
.get_or_init(|| Semaphore::const_new(self.max_concurrent_uploads));
125-
let gauge = INDEXER_METRICS
126-
.available_concurrent_upload_permits
127-
.with_label_values(["metrics"]);
128-
gauge.set(concurrent_upload_permits.available_permits() as i64);
141+
concurrent_upload_permits_gauge.set(concurrent_upload_permits.available_permits() as i64);
129142
concurrent_upload_permits
130143
.acquire()
131144
.await
@@ -185,7 +198,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
185198
publish_lock: batch.publish_lock,
186199
publish_token_opt: batch.publish_token_opt,
187200
parent_span: tracing::Span::current(),
188-
_merge_permit_opt: batch._merge_permit_opt,
201+
_merge_task_opt: batch._merge_task_opt,
189202
};
190203
if tx.send(SequencerCommand::Proceed(update)).is_err() {
191204
warn!("sequencer receiver dropped for empty batch");
@@ -223,7 +236,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
223236
let publish_token_opt = batch.publish_token_opt;
224237
let splits = batch.splits;
225238
let replaced_split_ids = batch.replaced_split_ids;
226-
let merge_permit_opt = batch._merge_permit_opt;
239+
let merge_task_opt = batch._merge_task_opt;
227240
// Hold the scratch directory alive until the upload task completes.
228241
// For the merge path, this prevents the TempDirectory from being
229242
// cleaned up before the upload task reads the merged files.
@@ -325,8 +338,8 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
325338
}
326339

327340
// Create ParquetSplitsUpdate and send downstream.
328-
// The merge permit (if present) transfers to the update so it
329-
// stays alive until the publisher drops the message.
341+
// The merge task (if present) transfers to the update so the
342+
// planner guard and semaphore permit stay alive until publish.
330343
let update = ParquetSplitsUpdate {
331344
index_uid,
332345
new_splits: splits,
@@ -335,7 +348,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
335348
publish_lock,
336349
publish_token_opt,
337350
parent_span: Span::current(),
338-
_merge_permit_opt: merge_permit_opt,
351+
_merge_task_opt: merge_task_opt,
339352
};
340353

341354
if tx.send(SequencerCommand::Proceed(update)).is_err() {
@@ -441,7 +454,7 @@ mod tests {
441454
publish_token_opt: None,
442455
replaced_split_ids: Vec::new(),
443456
_scratch_directory_opt: None,
444-
_merge_permit_opt: None,
457+
_merge_task_opt: None,
445458
};
446459

447460
uploader_mailbox.send_message(batch).await.unwrap();
@@ -537,7 +550,7 @@ mod tests {
537550
publish_token_opt: None,
538551
replaced_split_ids: Vec::new(),
539552
_scratch_directory_opt: None,
540-
_merge_permit_opt: None,
553+
_merge_task_opt: None,
541554
};
542555

543556
uploader_mailbox.send_message(batch).await.unwrap();
@@ -614,7 +627,7 @@ mod tests {
614627
publish_token_opt: None,
615628
replaced_split_ids: Vec::new(),
616629
_scratch_directory_opt: None,
617-
_merge_permit_opt: None,
630+
_merge_task_opt: None,
618631
};
619632

620633
uploader_mailbox.send_message(batch).await.unwrap();
@@ -687,7 +700,7 @@ mod tests {
687700
publish_token_opt: None,
688701
replaced_split_ids: Vec::new(),
689702
_scratch_directory_opt: None,
690-
_merge_permit_opt: None,
703+
_merge_task_opt: None,
691704
};
692705
uploader_mailbox.send_message(batch).await.unwrap();
693706
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl Handler<ParquetSplitsUpdate> for Publisher {
4545
checkpoint_delta_opt,
4646
publish_lock,
4747
publish_token_opt,
48+
_merge_task_opt,
4849
..
4950
} = split_update;
5051

@@ -108,6 +109,10 @@ impl Handler<ParquetSplitsUpdate> for Publisher {
108109
} else {
109110
self.counters.num_replace_operations += 1;
110111
}
112+
// Keep the merge task alive until after the metastore publish and
113+
// planner feedback have completed. Dropping it releases both the merge
114+
// semaphore permit and the planner's tracked-operation inventory guard.
115+
drop(_merge_task_opt);
111116
Ok(())
112117
}
113118
}
@@ -175,7 +180,7 @@ mod tests {
175180
publish_lock: PublishLock::default(),
176181
publish_token_opt: None,
177182
parent_span: Span::none(),
178-
_merge_permit_opt: None,
183+
_merge_task_opt: None,
179184
};
180185

181186
publisher_mailbox.send_message(update).await.unwrap();
@@ -224,7 +229,7 @@ mod tests {
224229
publish_lock: PublishLock::default(),
225230
publish_token_opt: None,
226231
parent_span: Span::none(),
227-
_merge_permit_opt: None,
232+
_merge_task_opt: None,
228233
};
229234

230235
publisher_mailbox.send_message(update).await.unwrap();
@@ -270,7 +275,7 @@ mod tests {
270275
publish_lock,
271276
publish_token_opt: None,
272277
parent_span: Span::none(),
273-
_merge_permit_opt: None,
278+
_merge_task_opt: None,
274279
};
275280

276281
publisher_mailbox.send_message(update).await.unwrap();

0 commit comments

Comments
 (0)