Commit e55e8f7

Partitions work

m09526 committed Apr 24, 2024
1 parent 4cf2ffc
Showing 4 changed files with 77 additions and 26 deletions.
61 changes: 54 additions & 7 deletions rust/compaction/src/datafusion.rs
@@ -1,5 +1,3 @@
use std::sync::Arc;

/// `DataFusion` contains the implementation for performing Sleeper compactions
/// using Apache `DataFusion`.
///
@@ -21,7 +19,7 @@ use std::sync::Arc;
*/
use crate::{
aws_s3::{CountingObjectStore, ObjectStoreFactory},
CompactionInput, CompactionResult,
ColRange, CompactionInput, CompactionResult, PartitionBound,
};
use arrow::{array::RecordBatch, util::pretty::pretty_format_batches};
use datafusion::{
@@ -33,6 +31,7 @@ use datafusion::{
};
use log::{error, info};
use num_format::{Locale, ToFormattedString};
use std::{collections::HashMap, sync::Arc};
use url::Url;

/// Starts a Sleeper compaction.
@@ -47,7 +46,7 @@ pub async fn compact(
output_path: &Url,
) -> Result<CompactionResult, DataFusionError> {
info!("DataFusion compaction of {input_paths:?}");

info!("Compaction region {:?}", input_data.region);
let sf = create_session_cfg(input_data);
let ctx = SessionContext::new_with_config(sf);

@@ -59,7 +58,7 @@ pub async fn compact(
info!("Row key and sort column order {sort_order:?}");

let po = ParquetReadOptions::default().file_sort_order(vec![sort_order.clone()]);
let frame = ctx.read_parquet(input_paths.to_owned(), po).await?;
let mut frame = ctx.read_parquet(input_paths.to_owned(), po).await?;

// Extract all column names
let col_names = frame.schema().clone().strip_qualifiers().field_names();
@@ -74,7 +73,12 @@ pub async fn compact(
.iter()
.map(col)
.collect::<Vec<_>>();
let frame = frame.sort(sort_order)?.select(col_names_expr)?;

// Build the plan: filter to the partition region before sorting and projecting
if let Some(expr) = region_filter(&input_data.region) {
frame = frame.filter(expr)?;
}
frame = frame.sort(sort_order)?.select(col_names_expr)?;

// Show explanation of plan
let explained = frame.clone().explain(false, false)?.collect().await?;
@@ -119,7 +123,50 @@ pub async fn compact(
rows_read: rows_written,
rows_written,
})
// TODO: Sketches
}

fn region_filter(region: &HashMap<String, ColRange>) -> Option<Expr> {
let mut col_exprs = vec![];
for (name, range) in region {
let min_bound = match range.lower {
PartitionBound::Int32(val) => lit(val),
PartitionBound::Int64(val) => lit(val),
PartitionBound::String(val) => lit(val.to_owned()),
PartitionBound::ByteArray(val) => lit(val.to_owned()),
};
let lower_expr = if range.lower_inclusive {
col(name).gt_eq(min_bound)
} else {
col(name).gt(min_bound)
};

let max_bound = match range.upper {
PartitionBound::Int32(val) => lit(val),
PartitionBound::Int64(val) => lit(val),
PartitionBound::String(val) => lit(val.to_owned()),
PartitionBound::ByteArray(val) => lit(val.to_owned()),
};
let upper_expr = if range.upper_inclusive {
col(name).lt_eq(max_bound)
} else {
col(name).lt(max_bound)
};

let expr = lower_expr.and(upper_expr);
col_exprs.push(expr);
}

// join them together
// TODO: write this more Rust like
if !col_exprs.is_empty() {
let mut expr = col_exprs[0].clone();
for idx in 1..col_exprs.len() {
expr = expr.and(col_exprs[idx].clone());
}
Some(expr)
} else {
None
}
}
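
A possible answer to the TODO above (a sketch, not the committed code): `Iterator::reduce` folds the per-column predicates into a single conjunction and already yields `None` for an empty region, which matches this function's return type.

use datafusion::logical_expr::Expr;

/// Combine per-column predicates into one AND-chained filter expression.
fn combine(col_exprs: Vec<Expr>) -> Option<Expr> {
    // `Expr::and` consumes two expressions and returns their conjunction,
    // so `reduce` chains them left-to-right; an empty vector gives `None`.
    col_exprs.into_iter().reduce(Expr::and)
}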

/// Convert a Sleeper compression codec string to one `DataFusion` understands.
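The function body is collapsed in this view. As a rough illustration only (hypothetical name and mappings, not the committed code), such a conversion typically lower-cases the Sleeper codec name and attaches a level where DataFusion's Parquet options expect one:

fn convert_codec(sleeper_codec: &str) -> String {
    // Hypothetical sketch: DataFusion's parquet compression strings are
    // lower-case, and ZSTD takes an explicit compression level.
    match sleeper_codec.to_uppercase().as_str() {
        "ZSTD" => "zstd(3)".to_owned(),
        other => other.to_lowercase(),
    }
}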
8 changes: 4 additions & 4 deletions rust/compaction/src/details.rs
@@ -26,10 +26,10 @@ use url::Url;
/// Type-safe variant for a Sleeper partition boundary
#[derive(Debug, Copy, Clone)]
pub enum PartitionBound {
Int32 { val: i32 },
Int64 { val: i64 },
String { val: &'static str },
ByteArray { val: &'static [i8] },
Int32(i32),
Int64(i64),
String(&'static str),
ByteArray(&'static [u8]),
}
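
The move from struct variants to tuple variants shortens construction sites; for example (illustrative values only):

// Before this commit: PartitionBound::Int32 { val: 42 }
let lower = PartitionBound::Int32(42);
// Byte arrays are now plain unsigned bytes rather than `i8`.
let upper = PartitionBound::ByteArray(&[0x68, 0x6D]);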

/// All the information for a Sleeper compaction.
20 changes: 7 additions & 13 deletions rust/compaction/src/lib.rs
@@ -375,12 +375,8 @@ fn unpack_variant_array(
})
.zip(schema_types.iter())
.map(|(&bptr, type_id)| match type_id {
1 => Ok(PartitionBound::Int32 {
val: unsafe { *bptr.cast::<i32>() },
}),
2 => Ok(PartitionBound::Int64 {
val: unsafe { *bptr.cast::<i64>() },
}),
1 => Ok(PartitionBound::Int32(unsafe { *bptr.cast::<i32>() })),
2 => Ok(PartitionBound::Int64(unsafe { *bptr.cast::<i64>() })),
3 => {
//unpack length (signed because it's from Java)
let str_len = unsafe { *bptr.cast::<i32>() };
@@ -392,7 +388,7 @@
#[allow(clippy::cast_sign_loss)]
slice::from_raw_parts(bptr.byte_add(4).cast::<u8>(), str_len as usize)
})
.map(|v| PartitionBound::String { val: v })
.map(PartitionBound::String)
}
4 => {
//unpack length (signed because it's from Java)
@@ -401,12 +397,10 @@
error!("Illegal byte array length in FFI array: {byte_len}");
panic!("Illegal byte array length in FFI array: {byte_len}");
}
Ok(PartitionBound::ByteArray {
val: unsafe {
#[allow(clippy::cast_sign_loss)]
slice::from_raw_parts(bptr.byte_add(4).cast::<i8>(), byte_len as usize)
},
})
Ok(PartitionBound::ByteArray(unsafe {
#[allow(clippy::cast_sign_loss)]
slice::from_raw_parts(bptr.byte_add(4).cast::<u8>(), byte_len as usize)
}))
}
x => {
error!("Unexpected type id {x}");
14 changes: 12 additions & 2 deletions rust/compactor/src/bin/main.rs
@@ -16,7 +16,7 @@

use chrono::Local;
use clap::Parser;
use compaction::{merge_sorted_files, CompactionInput};
use compaction::{merge_sorted_files, ColRange, CompactionInput};
use human_panic::setup_panic;
use log::info;
use num_format::{Locale, ToFormattedString};
@@ -91,6 +91,16 @@ async fn main() -> color_eyre::Result<()> {
let output_url = Url::parse(&args.output)
.or_else(|_e| Url::parse(&("file://".to_owned() + &args.output)))?;

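// Demonstration region (hard-coded for now): keep only rows whose
// "key" column lies in the half-open range ["h", "m").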
let mut map = HashMap::new();
map.insert(
"key".into(),
ColRange {
lower: compaction::PartitionBound::String("h"),
lower_inclusive: true,
upper: compaction::PartitionBound::String("m"),
upper_inclusive: false,
},
);
let details = CompactionInput {
input_files: input_urls,
output_file: output_url,
@@ -103,7 +113,7 @@
dict_enc_row_keys: true,
dict_enc_sort_keys: true,
dict_enc_values: true,
region: HashMap::default(),
region: map,
row_key_cols: args.row_keys,
sort_key_cols: args.sort_column,
};
