Skip to content

Commit 516b8d6

Browse files
Expose Substrait execution metadata (#6364)
* Expose unified DataFusion execution metadata * Expose DataFusion execution statistics
1 parent 41cfe4a commit 516b8d6

5 files changed

Lines changed: 912 additions & 179 deletions

File tree

quickwit/quickwit-datafusion/tests/metrics.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,15 @@ async fn test_multi_statement_sql_with_semicolons_in_literals_and_comments() {
278278
SELECT COUNT(*) AS cnt FROM "test-semi"
279279
"#;
280280

281-
let stream = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder))
282-
.execute_sql(sql, &std::collections::HashMap::new())
281+
let properties = std::collections::HashMap::new();
282+
let execution = quickwit_datafusion::DataFusionService::new(Arc::clone(&builder))
283+
.execute(quickwit_datafusion::DataFusionRequest::records(
284+
quickwit_datafusion::DataFusionInput::Sql(sql),
285+
&properties,
286+
))
283287
.await
284288
.unwrap();
285-
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
289+
let batches: Vec<RecordBatch> = execution.stream.try_collect().await.unwrap();
286290
assert_eq!(total_rows(&batches), 1);
287291
let cnt = batches[0]
288292
.column_by_name("cnt")

quickwit/quickwit-df-core/src/grpc.rs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::proto::{
4848
ExecuteSqlRequest, ExecuteSqlResponse, ExecuteSubstraitRequest, ExecuteSubstraitResponse,
4949
data_fusion_service_server,
5050
};
51-
use crate::service::DataFusionService;
51+
use crate::service::{DataFusionInput, DataFusionOutput, DataFusionRequest, DataFusionService};
5252

5353
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
5454
enum GrpcErrorKind {
@@ -167,42 +167,37 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl
167167
let req = request.into_inner();
168168
let service = Arc::clone(&self.service);
169169

170-
// Route to the appropriate DataFusionService method:
171-
// - substrait_plan_bytes: production path (pre-encoded protobuf)
172-
// - substrait_plan_json: dev/tooling path (grpcurl, rollup JSON files)
173-
// When `explain` is set, the server returns the EXPLAIN output
174-
// (no storage I/O) instead of executing the plan.
175-
let stream = match (
170+
let input = match (
176171
!req.substrait_plan_bytes.is_empty(),
177172
!req.substrait_plan_json.is_empty(),
178-
req.explain,
179173
) {
180-
(true, _, false) => service
181-
.execute_substrait(&req.substrait_plan_bytes, &req.properties)
182-
.await
183-
.map_err(|err| df_error_to_status(&err))?,
184-
(true, _, true) => service
185-
.explain_substrait(&req.substrait_plan_bytes, &req.properties)
186-
.await
187-
.map_err(|err| df_error_to_status(&err))?,
188-
(false, true, false) => service
189-
.execute_substrait_json(&req.substrait_plan_json, &req.properties)
190-
.await
191-
.map_err(|err| df_error_to_status(&err))?,
192-
(false, true, true) => service
193-
.explain_substrait_json(&req.substrait_plan_json, &req.properties)
194-
.await
195-
.map_err(|err| df_error_to_status(&err))?,
196-
_ => {
174+
(true, _) => DataFusionInput::SubstraitBytes(&req.substrait_plan_bytes),
175+
(false, true) => DataFusionInput::SubstraitJson(&req.substrait_plan_json),
176+
(false, false) => {
197177
return Err(tonic::Status::invalid_argument(
198178
"either substrait_plan_bytes or substrait_plan_json must be set",
199179
));
200180
}
201181
};
182+
let output = if req.explain {
183+
DataFusionOutput::Explain
184+
} else {
185+
DataFusionOutput::Records
186+
};
202187

203-
let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSubstraitResponse {
204-
arrow_ipc_bytes: ipc_bytes,
205-
});
188+
let execution = service
189+
.execute(DataFusionRequest {
190+
input,
191+
output,
192+
properties: &req.properties,
193+
})
194+
.await
195+
.map_err(|err| df_error_to_status(&err))?;
196+
197+
let response_stream =
198+
map_batch_stream(execution.stream, |ipc_bytes| ExecuteSubstraitResponse {
199+
arrow_ipc_bytes: ipc_bytes,
200+
});
206201
Ok(tonic::Response::new(response_stream))
207202
}
208203

@@ -213,15 +208,18 @@ impl data_fusion_service_server::DataFusionService for DataFusionServiceGrpcImpl
213208
let req = request.into_inner();
214209
let service = Arc::clone(&self.service);
215210

216-
let stream = service
217-
.execute_sql(&req.sql, &req.properties)
211+
let execution = service
212+
.execute(DataFusionRequest::records(
213+
DataFusionInput::Sql(&req.sql),
214+
&req.properties,
215+
))
218216
.await
219217
.map_err(|err| {
220218
warn!(error = %err, "DataFusion SQL execution error");
221219
df_error_to_status(&err)
222220
})?;
223221

224-
let response_stream = map_batch_stream(stream, |ipc_bytes| ExecuteSqlResponse {
222+
let response_stream = map_batch_stream(execution.stream, |ipc_bytes| ExecuteSqlResponse {
225223
arrow_ipc_bytes: ipc_bytes,
226224
});
227225
Ok(tonic::Response::new(response_stream))

quickwit/quickwit-df-core/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@ pub use data_source::{
4343
};
4444
pub use datafusion::execution::SendableRecordBatchStream;
4545
pub use datafusion_distributed::{Worker, WorkerResolver};
46-
pub use service::DataFusionService;
46+
pub use service::{
47+
DataFusionExecution, DataFusionExecutionMetadata, DataFusionExecutionStatistics,
48+
DataFusionInput, DataFusionInputMetadata, DataFusionMetricStatistics, DataFusionOutput,
49+
DataFusionPhysicalPlanMetadata, DataFusionPlanMetricStatistics, DataFusionPruningStatistics,
50+
DataFusionRatioStatistics, DataFusionRequest, DataFusionService, SubstraitExecution,
51+
SubstraitExecutionMetadata,
52+
};
4753
pub use session::DataFusionSessionBuilder;
4854
pub use task_estimator::DataSourceExecPartitionEstimator;
4955
pub use worker::build_worker;

0 commit comments

Comments
 (0)