Skip to content

Commit 7412edf

Browse files
trying to isolate code metrics code (#6285)
* trying to isolate code from metrics * added license * minor refact * Removing AI slop * Making SourceActor's member private, and added a constructor * Renaming ProcessorMailbox SourceSink * Simplification of the dual sequencer/no sequencer logic * AI slop * CI * fixing clippy --------- Co-authored-by: Paul Masurel <paul@quickwit.io>
1 parent 89cc988 commit 7412edf

47 files changed

Lines changed: 2230 additions & 1730 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,17 @@ jobs:
102102
uses: taiki-e/install-action@aba36d755ec7ca22d38b12111787c26115943952
103103
with:
104104
tool: cargo-nextest
105-
- name: cargo build
105+
- name: cargo check without metrics
106106
if: always() && steps.modified.outputs.rust_src == 'true'
107-
run: cargo build --features=postgres --tests --bin quickwit
107+
run: cargo check --features=postgres --tests --bin quickwit
108+
working-directory: ./quickwit
109+
- name: cargo check with metrics
110+
if: always() && steps.modified.outputs.rust_src == 'true'
111+
run: cargo check --features=postgres,metrics --tests --bin quickwit
108112
working-directory: ./quickwit
109113
- name: cargo nextest
110114
if: always() && steps.modified.outputs.rust_src == 'true'
111-
run: cargo nextest run --features=postgres --retries 1
115+
run: cargo nextest run --features=postgres,metrics --retries 1
112116
working-directory: ./quickwit
113117
- name: Install python packages
114118
if: always() && steps.modified.outputs.rust_src == 'true'

quickwit/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-cli/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ jemalloc-profiled = [
9595
ci-test = []
9696
pprof = ["quickwit-serve/pprof"]
9797
openssl-support = ["openssl-probe"]
98+
# `metrics` here refers to adding support for metrics ingestion within Quickwit.
99+
# (This is not about Quickwit's own metrics.)
100+
metrics = [ "quickwit-indexing/metrics" ]
98101
# Requires enabling tokio unstable via RUSTFLAGS="--cfg tokio_unstable"
99102
tokio-console = ["console-subscriber", "quickwit-common/named_tasks"]
100103
release-feature-set = [
@@ -116,6 +119,7 @@ release-feature-vendored-set = [
116119
"openssl-support",
117120
"pprof",
118121
"quickwit-indexing/kinesis",
122+
"quickwit-indexing/metrics",
119123
"quickwit-indexing/pulsar",
120124
"quickwit-indexing/sqs",
121125
"quickwit-indexing/vrl",
@@ -129,6 +133,7 @@ release-macos-feature-vendored-set = [
129133
"jemalloc",
130134
"openssl-support",
131135
"quickwit-indexing/kinesis",
136+
"quickwit-indexing/metrics",
132137
"quickwit-indexing/pulsar",
133138
"quickwit-indexing/sqs",
134139
"quickwit-indexing/vrl",

quickwit/quickwit-cli/src/tool.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use quickwit_config::{
3737
TransformConfig, VecSourceParams,
3838
};
3939
use quickwit_index_management::{IndexService, clear_cache_directory};
40-
use quickwit_indexing::IndexingPipeline;
40+
use quickwit_indexing::BoxedPipelineHandle;
4141
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
4242
use quickwit_indexing::models::{
4343
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
@@ -751,7 +751,7 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> {
751751
/// Starts a tokio task that displays the indexing statistics
752752
/// every once in a while.
753753
pub async fn start_statistics_reporting_loop(
754-
pipeline_handle: ActorHandle<IndexingPipeline>,
754+
pipeline_handle: BoxedPipelineHandle,
755755
is_stdin: bool,
756756
) -> anyhow::Result<IndexingStatistics> {
757757
let mut stdout_handle = stdout();

quickwit/quickwit-common/src/lib.rs

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub mod tower;
4949
pub mod type_map;
5050
pub mod uri;
5151

52+
mod metrics_specific;
53+
pub use metrics_specific::*;
54+
5255
mod socket_addr_legacy_hash;
5356

5457
use std::env;
@@ -215,14 +218,6 @@ macro_rules! assert_eventually {
215218
};
216219
}
217220

218-
/// Returns whether the given index ID corresponds to a metrics index.
219-
///
220-
/// Metrics indexes use the Parquet/DataFusion pipeline instead of the Tantivy pipeline.
221-
/// An index is considered a metrics index if it starts with "otel-metrics" or "metrics-".
222-
pub fn is_metrics_index(index_id: &str) -> bool {
223-
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
224-
}
225-
226221
#[macro_export]
227222
macro_rules! ignore_error_kind {
228223
($kind:path, $expr:expr) => {
@@ -423,27 +418,6 @@ mod tests {
423418
assert_eq!(div_ceil_u32(0, 3), 0);
424419
}
425420

426-
#[test]
427-
fn test_is_metrics_index() {
428-
// OpenTelemetry metrics indexes
429-
assert!(is_metrics_index("otel-metrics-v0_7"));
430-
assert!(is_metrics_index("otel-metrics"));
431-
assert!(is_metrics_index("otel-metrics-custom"));
432-
433-
// Generic metrics indexes
434-
assert!(is_metrics_index("metrics-default"));
435-
assert!(is_metrics_index("metrics-"));
436-
assert!(is_metrics_index("metrics-my-app"));
437-
438-
// Non-metrics indexes
439-
assert!(!is_metrics_index("otel-logs-v0_7"));
440-
assert!(!is_metrics_index("otel-traces-v0_7"));
441-
assert!(!is_metrics_index("my-index"));
442-
assert!(!is_metrics_index("logs-default"));
443-
assert!(!is_metrics_index("metrics")); // No hyphen after "metrics"
444-
assert!(!is_metrics_index("my-metrics-index")); // Not prefixed
445-
}
446-
447421
#[test]
448422
fn test_parse_bool_lenient() {
449423
assert_eq!(parse_bool_lenient("true"), Some(true));
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/// Returns whether the given index ID corresponds to a metrics index.
16+
///
17+
/// Metrics indexes use the Parquet/DataFusion pipeline instead of the Tantivy pipeline.
18+
/// An index is considered a metrics index if it starts with "otel-metrics" or "metrics-".
19+
pub fn is_metrics_index(index_id: &str) -> bool {
20+
index_id.starts_with("otel-metrics") || index_id.starts_with("metrics-")
21+
}
22+
23+
#[cfg(test)]
24+
mod tests {
25+
use super::is_metrics_index;
26+
27+
#[test]
28+
fn test_is_metrics_index() {
29+
// OpenTelemetry metrics indexes
30+
assert!(is_metrics_index("otel-metrics-v0_7"));
31+
assert!(is_metrics_index("otel-metrics"));
32+
assert!(is_metrics_index("otel-metrics-custom"));
33+
34+
// Generic metrics indexes
35+
assert!(is_metrics_index("metrics-default"));
36+
assert!(is_metrics_index("metrics-"));
37+
assert!(is_metrics_index("metrics-my-app"));
38+
39+
// Non-metrics indexes
40+
assert!(!is_metrics_index("otel-logs-v0_7"));
41+
assert!(!is_metrics_index("otel-traces-v0_7"));
42+
assert!(!is_metrics_index("my-index"));
43+
assert!(!is_metrics_index("logs-default"));
44+
assert!(!is_metrics_index("metrics")); // No hyphen after "metrics"
45+
assert!(!is_metrics_index("my-metrics-index")); // Not prefixed
46+
}
47+
}

quickwit/quickwit-indexing/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ utoipa = { workspace = true }
4949
vrl = { workspace = true, optional = true }
5050
warp = { workspace = true, optional = true }
5151

52-
arrow = { workspace = true }
52+
arrow = { workspace = true, optional = true }
5353

5454
quickwit-actors = { workspace = true }
5555
quickwit-aws = { workspace = true }
@@ -61,7 +61,7 @@ quickwit-doc-mapper = { workspace = true }
6161
quickwit-ingest = { workspace = true }
6262
quickwit-metastore = { workspace = true }
6363
quickwit-opentelemetry = { workspace = true }
64-
quickwit-parquet-engine = { workspace = true }
64+
quickwit-parquet-engine = { workspace = true, optional = true }
6565
quickwit-proto = { workspace = true }
6666
quickwit-storage = { workspace = true }
6767

@@ -105,6 +105,7 @@ testsuite = [
105105
"quickwit-proto/testsuite",
106106
"quickwit-storage/testsuite"
107107
]
108+
metrics = ["dep:arrow", "dep:quickwit-parquet-engine"]
108109
vrl = ["dep:vrl", "quickwit-config/vrl"]
109110
postgres = ["quickwit-metastore/postgres"]
110111
ci-test = []
@@ -113,7 +114,6 @@ ci-test = []
113114
bytes = { workspace = true }
114115
criterion = { workspace = true, features = ["async_tokio"] }
115116
mockall = { workspace = true }
116-
parquet = { workspace = true }
117117
proptest = { workspace = true }
118118
prost = { workspace = true }
119119
rand = { workspace = true }
@@ -122,8 +122,8 @@ sqlx = { workspace = true, features = ["runtime-tokio", "postgres"] }
122122
tempfile = { workspace = true }
123123

124124
quickwit-actors = { workspace = true, features = ["testsuite"] }
125-
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
126125
quickwit-cluster = { workspace = true, features = ["testsuite"] }
126+
quickwit-parquet-engine = { workspace = true, features = ["testsuite"] }
127127
quickwit-common = { workspace = true, features = ["testsuite"] }
128128
quickwit-config = { workspace = true, features = ["testsuite"] }
129129
quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }

quickwit/quickwit-indexing/src/actors/indexer.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ use tokio::sync::Semaphore;
4949
use tracing::{Span, info, info_span, warn};
5050
use ulid::Ulid;
5151

52-
use crate::actors::IndexSerializer;
53-
use crate::actors::cooperative_indexing::{CooperativeIndexingCycle, CooperativeIndexingPeriod};
52+
use super::IndexSerializer;
53+
use super::cooperative_indexing::{CooperativeIndexingCycle, CooperativeIndexingPeriod};
5454
use crate::models::{
5555
CommitTrigger, EmptySplit, IndexedSplitBatchBuilder, IndexedSplitBuilder, NewPublishLock,
5656
NewPublishToken, ProcessedDoc, ProcessedDocBatch, PublishLock,
@@ -715,8 +715,7 @@ mod tests {
715715
use quickwit_proto::types::{IndexUid, NodeId, PipelineUid};
716716
use tantivy::{DateTime, doc};
717717

718-
use super::*;
719-
use crate::actors::indexer::{IndexerCounters, record_timestamp};
718+
use super::{IndexerCounters, record_timestamp, *};
720719

721720
#[test]
722721
fn test_record_timestamp() {

0 commit comments

Comments
 (0)