Skip to content

Commit 65d97fc

Browse files
congx4claude
andauthored
refactor: extract ES-compat handler logic into reusable pub(crate) functions (#6271)
Extract JSON construction from warp handlers into standalone functions that can be called directly without HTTP, enabling non-HTTP consumers (e.g., gRPC passthrough) to reuse the ES-compatible logic. Extracted functions: - es_compat_cluster_info, es_compat_nodes_info, es_compat_search_shards, es_compat_aliases, es_compat_cluster_health_check, es_compat_delete_scroll Made pub(crate): es_compat_index_mapping, es_compat_index_search, es_compat_index_count, es_compat_index_stats, es_compat_cat_indices, es_compat_index_cat_indices, es_compat_resolve_index, es_compat_index_field_capabilities, es_compat_index_multi_search, es_scroll Also made model and rest_handler modules pub(crate) in mod.rs. Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 050a3b4 commit 65d97fc

2 files changed

Lines changed: 91 additions & 71 deletions

File tree

quickwit/quickwit-serve/src/elasticsearch_api/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
mod bulk;
1616
mod bulk_v2;
1717
mod filter;
18-
mod model;
19-
mod rest_handler;
18+
pub(crate) mod model;
19+
pub(crate) mod rest_handler;
2020

2121
use std::sync::Arc;
2222

quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs

Lines changed: 89 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,28 @@ use crate::rest::recover_fn;
7070
use crate::rest_api_response::{RestApiError, RestApiResponse};
7171
use crate::{BuildInfo, with_arg};
7272

73+
pub(crate) fn es_compat_cluster_info(
74+
config: Arc<NodeConfig>,
75+
build_info: &'static BuildInfo,
76+
) -> Value {
77+
json!({
78+
"name" : config.node_id,
79+
"cluster_name" : config.cluster_id,
80+
"cluster_uuid" : config.cluster_id,
81+
"tagline" : "You Know, for Search",
82+
"version" : {
83+
"distribution" : "quickwit",
84+
"number" : "7.17.0",
85+
"build_hash" : build_info.commit_hash,
86+
"build_date" : build_info.build_date,
87+
"build_snapshot" : false,
88+
"lucene_version" : "8.11.1",
89+
"minimum_wire_compatibility_version" : "6.8.0",
90+
"minimum_index_compatibility_version" : "6.0.0-beta1",
91+
}
92+
})
93+
}
94+
7395
/// Elastic compatible cluster info handler.
7496
pub fn es_compat_cluster_info_handler(
7597
node_config: Arc<NodeConfig>,
@@ -80,76 +102,73 @@ pub fn es_compat_cluster_info_handler(
80102
.and(with_arg(build_info))
81103
.then(
82104
|config: Arc<NodeConfig>, build_info: &'static BuildInfo| async move {
83-
warp::reply::json(&json!({
84-
"name" : config.node_id,
85-
"cluster_name" : config.cluster_id,
86-
"cluster_uuid" : config.cluster_id,
87-
"tagline" : "You Know, for Search",
88-
"version" : {
89-
"distribution" : "quickwit",
90-
"number" : "7.17.0",
91-
"build_hash" : build_info.commit_hash,
92-
"build_date" : build_info.build_date,
93-
"build_snapshot" : false,
94-
"lucene_version" : "8.11.1",
95-
"minimum_wire_compatibility_version" : "6.8.0",
96-
"minimum_index_compatibility_version" : "6.0.0-beta1",
97-
}
98-
}))
105+
warp::reply::json(&es_compat_cluster_info(config, build_info))
99106
},
100107
)
101108
.boxed()
102109
}
103110

111+
pub(crate) fn es_compat_nodes_info(config: Arc<NodeConfig>) -> Value {
112+
let advertise_addr = std::net::SocketAddr::new(
113+
config.grpc_advertise_addr.ip(),
114+
config.rest_config.listen_addr.port(),
115+
);
116+
json!({
117+
"nodes": {
118+
config.node_id.as_str(): {
119+
"roles": ["data", "ingest"],
120+
"http": {
121+
"publish_address": advertise_addr.to_string()
122+
}
123+
}
124+
}
125+
})
126+
}
127+
104128
/// GET _elastic/_nodes/http
105129
pub fn es_compat_nodes_handler(
106130
node_config: Arc<NodeConfig>,
107131
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
108132
elastic_nodes_filter()
109133
.and(with_arg(node_config))
110134
.then(|config: Arc<NodeConfig>| async move {
111-
let advertise_addr = std::net::SocketAddr::new(
112-
config.grpc_advertise_addr.ip(),
113-
config.rest_config.listen_addr.port(),
114-
);
115-
warp::reply::json(&json!({
116-
"nodes": {
117-
config.node_id.as_str(): {
118-
"roles": ["data", "ingest"],
119-
"http": {
120-
"publish_address": advertise_addr.to_string()
121-
}
122-
}
123-
}
124-
}))
135+
warp::reply::json(&es_compat_nodes_info(config))
125136
})
126137
.boxed()
127138
}
128139

140+
pub(crate) fn es_compat_search_shards(index_id: String, config: Arc<NodeConfig>) -> Value {
141+
json!({
142+
"shards": [[{
143+
"index": index_id,
144+
"shard": 0,
145+
"primary": true,
146+
"node": config.node_id.as_str()
147+
}]]
148+
})
149+
}
150+
129151
/// GET _elastic/{index}/_search_shards
130152
pub fn es_compat_search_shards_handler(
131153
node_config: Arc<NodeConfig>,
132154
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
133155
elastic_search_shards_filter()
134156
.and(with_arg(node_config))
135157
.then(|index_id: String, config: Arc<NodeConfig>| async move {
136-
warp::reply::json(&json!({
137-
"shards": [[{
138-
"index": index_id,
139-
"shard": 0,
140-
"primary": true,
141-
"node": config.node_id.as_str()
142-
}]]
143-
}))
158+
warp::reply::json(&es_compat_search_shards(index_id, config))
144159
})
145160
.boxed()
146161
}
147162

163+
pub(crate) fn es_compat_aliases() -> Value {
164+
Value::Object(Map::new())
165+
}
166+
148167
/// GET _elastic/_aliases
149168
pub fn es_compat_aliases_handler()
150169
-> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
151170
elastic_aliases_filter()
152-
.then(|| async { Ok(Value::Object(Map::new())) })
171+
.then(|| async { Ok(es_compat_aliases()) })
153172
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
154173
.recover(recover_fn)
155174
.boxed()
@@ -180,7 +199,7 @@ async fn get_index_metadata(
180199
Ok(index_metadata)
181200
}
182201

183-
async fn es_compat_index_mapping(
202+
pub(crate) async fn es_compat_index_mapping(
184203
index_id: String,
185204
mut metastore: MetastoreServiceClient,
186205
search_service: Arc<dyn SearchService>,
@@ -296,6 +315,15 @@ pub fn es_compat_cluster_health_handler(
296315
(status = 503, description = "The cluster is unhealthy.", body = bool),
297316
),
298317
)]
318+
pub(crate) async fn es_compat_cluster_health_check(cluster: &Cluster) -> (Value, StatusCode) {
319+
let is_ready = cluster.is_self_node_ready().await;
320+
if is_ready {
321+
(json!({"status": "green"}), StatusCode::OK)
322+
} else {
323+
(json!({"status": "red"}), StatusCode::SERVICE_UNAVAILABLE)
324+
}
325+
}
326+
299327
/// Get Node Liveliness
300328
async fn es_compat_cluster_health(
301329
query_params: HashMap<String, String>,
@@ -308,18 +336,8 @@ async fn es_compat_cluster_health(
308336
}));
309337
return with_status(error_body, StatusCode::BAD_REQUEST);
310338
}
311-
let is_ready = cluster.is_self_node_ready().await;
312-
if is_ready {
313-
with_status(
314-
warp::reply::json(&json!({"status": "green"})),
315-
StatusCode::OK,
316-
)
317-
} else {
318-
with_status(
319-
warp::reply::json(&json!({"status": "red"})),
320-
StatusCode::SERVICE_UNAVAILABLE,
321-
)
322-
}
339+
let (body, status) = es_compat_cluster_health_check(&cluster).await;
340+
with_status(warp::reply::json(&body), status)
323341
}
324342

325343
/// GET _elastic/{index}/_stats
@@ -423,19 +441,21 @@ pub fn es_compat_scroll_handler(
423441
.boxed()
424442
}
425443

444+
pub(crate) fn es_compat_delete_scroll() -> Value {
445+
json!({
446+
"succeeded": true,
447+
"num_freed": 0
448+
})
449+
}
450+
426451
/// DELETE _elastic/_search/scroll
427452
///
428453
/// Clears a scroll context. Quickwit manages scroll lifetime via TTL,
429454
/// so this is a no-op that returns success.
430455
pub fn es_compat_delete_scroll_handler()
431456
-> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
432457
elastic_delete_scroll_filter()
433-
.then(|| async {
434-
Ok::<_, ElasticsearchError>(json!({
435-
"succeeded": true,
436-
"num_freed": 0
437-
}))
438-
})
458+
.then(|| async { Ok::<_, ElasticsearchError>(es_compat_delete_scroll()) })
439459
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
440460
.recover(recover_fn)
441461
.boxed()
@@ -621,11 +641,11 @@ fn partial_hit_from_search_after_param(
621641
}
622642

623643
#[derive(Debug, Serialize, Deserialize)]
624-
struct ElasticsearchCountResponse {
625-
count: u64,
644+
pub(crate) struct ElasticsearchCountResponse {
645+
pub(crate) count: u64,
626646
}
627647

628-
async fn es_compat_index_count(
648+
pub(crate) async fn es_compat_index_count(
629649
index_id_patterns: Vec<String>,
630650
search_params: SearchQueryParamsCount,
631651
search_body: SearchBody,
@@ -642,7 +662,7 @@ async fn es_compat_index_count(
642662
Ok(search_response_rest)
643663
}
644664

645-
async fn es_compat_index_search(
665+
pub(crate) async fn es_compat_index_search(
646666
index_id_patterns: Vec<String>,
647667
search_params: SearchQueryParams,
648668
search_body: SearchBody,
@@ -704,7 +724,7 @@ async fn es_compat_stats(
704724
es_compat_index_stats(vec!["*".to_string()], metastore).await
705725
}
706726

707-
async fn es_compat_index_stats(
727+
pub(crate) async fn es_compat_index_stats(
708728
index_id_patterns: Vec<String>,
709729
mut metastore: MetastoreServiceClient,
710730
) -> Result<ElasticsearchStatsResponse, ElasticsearchError> {
@@ -729,14 +749,14 @@ async fn es_compat_index_stats(
729749
Ok(search_response_rest)
730750
}
731751

732-
async fn es_compat_cat_indices(
752+
pub(crate) async fn es_compat_cat_indices(
733753
query_params: CatIndexQueryParams,
734754
metastore: MetastoreServiceClient,
735755
) -> Result<Vec<serde_json::Value>, ElasticsearchError> {
736756
es_compat_index_cat_indices(vec!["*".to_string()], query_params, metastore).await
737757
}
738758

739-
async fn es_compat_index_cat_indices(
759+
pub(crate) async fn es_compat_index_cat_indices(
740760
index_id_patterns: Vec<String>,
741761
query_params: CatIndexQueryParams,
742762
mut metastore: MetastoreServiceClient,
@@ -783,7 +803,7 @@ async fn es_compat_index_cat_indices(
783803
Ok(search_response_rest)
784804
}
785805

786-
async fn es_compat_resolve_index(
806+
pub(crate) async fn es_compat_resolve_index(
787807
index_id_patterns: Vec<String>,
788808
mut metastore: MetastoreServiceClient,
789809
) -> Result<ElasticsearchResolveIndexResponse, ElasticsearchError> {
@@ -801,7 +821,7 @@ async fn es_compat_resolve_index(
801821
})
802822
}
803823

804-
async fn es_compat_index_field_capabilities(
824+
pub(crate) async fn es_compat_index_field_capabilities(
805825
index_id_patterns: Vec<String>,
806826
search_params: FieldCapabilityQueryParams,
807827
search_body: FieldCapabilityRequestBody,
@@ -946,7 +966,7 @@ fn convert_hit(
946966
}
947967
}
948968

949-
async fn es_compat_index_multi_search(
969+
pub(crate) async fn es_compat_index_multi_search(
950970
payload: Bytes,
951971
multi_search_params: MultiSearchQueryParams,
952972
search_service: Arc<dyn SearchService>,
@@ -1050,7 +1070,7 @@ async fn es_compat_index_multi_search(
10501070
Ok(multi_search_response)
10511071
}
10521072

1053-
async fn es_scroll(
1073+
pub(crate) async fn es_scroll(
10541074
scroll_query_params: ScrollQueryParams,
10551075
search_service: Arc<dyn SearchService>,
10561076
) -> Result<ElasticsearchResponse, ElasticsearchError> {

0 commit comments

Comments
 (0)