Skip to content

Instantly share code, notes, and snippets.

@UBarney
Created June 22, 2025 09:12
Show Gist options
  • Save UBarney/23fdb597f43bfcffe4f781fb6b99e579 to your computer and use it in GitHub Desktop.
Save UBarney/23fdb597f43bfcffe4f781fb6b99e579 to your computer and use it in GitHub Desktop.
bench.rs
use std::cmp::min;
use std::sync::Arc;
use std::time::Duration;
use arrow::array::{
UInt32Array, UInt64Array,
};
use arrow::datatypes::Schema;
use arrow_schema::{DataType, Field};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_common::{
JoinSide, ScalarValue,
};
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::joins::nested_loop_join::build_join_indices;
use datafusion_physical_plan::joins::utils::{
build_batch_from_indices, build_join_schema, ColumnIndex, JoinFilter,
};
use datafusion_physical_plan::test::build_table_i32;
use futures::StreamExt;
/// Builds the join filter used by the benchmark: `left.b != 8 AND right.b != 10`.
///
/// The filter expression is evaluated against a two-column intermediate schema:
/// intermediate column 0 is column index 1 of the left input, and intermediate
/// column 1 is column index 1 of the right input. With the tables built in
/// `bench_new_groups` (columns `a`, `b`, `c`), index 1 is presumably column `b`.
fn prepare_join_filter() -> JoinFilter {
    // Map each intermediate column back to its source side and column index.
    let column_indices = vec![
        ColumnIndex {
            index: 1,
            side: JoinSide::Left,
        },
        ColumnIndex {
            index: 1,
            side: JoinSide::Right,
        },
    ];

    // Distinct field names keep the schema unambiguous for any name-based
    // lookup and make the filter's display form readable.
    let intermediate_schema = Schema::new(vec![
        Field::new("left_b", DataType::Int32, true),
        Field::new("right_b", DataType::Int32, true),
    ]);

    // left.b != 8
    let left_filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("left_b", 0)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
    )) as Arc<dyn PhysicalExpr>;

    // right.b != 10
    let right_filter = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("right_b", 1)),
        Operator::NotEq,
        Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
    )) as Arc<dyn PhysicalExpr>;

    // left.b != 8 AND right.b != 10
    let filter_expression =
        Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
            as Arc<dyn PhysicalExpr>;

    JoinFilter::new(
        filter_expression,
        column_indices,
        Arc::new(intermediate_schema),
    )
}
/// Registers the `nlj` benchmark group.
///
/// Each benchmark times one `build_join_indices` call on two identical
/// `NUM_ROWS`-row tables (columns `a`, `b`, `c`, each holding `0..NUM_ROWS`).
/// Every call uses the filter from `prepare_join_filter`. The benchmarks differ
/// only in the batch-size argument passed to `build_join_indices`.
fn bench_new_groups(c: &mut Criterion) {
    /// Number of rows in each input table.
    const NUM_ROWS: i32 = 8192;

    let mut group = c.benchmark_group("nlj");
    // Criterion's measurement time is a group-wide setting; configure it once.
    group.measurement_time(Duration::from_secs(120));

    let data: Vec<i32> = (0..NUM_ROWS).collect();
    let left = build_table_i32(("a", &data), ("b", &data), ("c", &data));
    let right = build_table_i32(("a", &data), ("b", &data), ("c", &data));
    let filter = prepare_join_filter();

    // (benchmark id, batch-size argument for `build_join_indices`)
    let cases = [
        ("filter/batch_size==8192", 8192),
        ("filter/batch_size==8192*8192", 8192 * 8192),
    ];

    for (name, batch_size) in cases {
        group.bench_function(name, |b| {
            // One cache per benchmark, handed to every measured iteration.
            let mut indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0));
            b.iter(|| {
                black_box(
                    build_join_indices(
                        &left,
                        &right,
                        Some(&filter),
                        &mut indices_cache,
                        batch_size,
                    )
                    .unwrap(),
                );
            });
        });
    }

    group.finish();
}
// Group every benchmark function under the default Criterion configuration,
// then generate the benchmark binary's entry point for that group.
criterion_group! {
    name = benches;
    config = Criterion::default();
    targets = bench_new_groups
}
criterion_main!(benches);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment