Skip to content

Commit

Permalink
refactor: Convert IPCWriter metrics from u64 to usize (apache#10278)
Browse files Browse the repository at this point in the history
  • Loading branch information
erenavsarogullari authored Apr 29, 2024
1 parent 7d3e6c7 commit acd9865
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ pub struct IPCWriter {
/// inner writer
pub writer: FileWriter<File>,
/// batches written
pub num_batches: u64,
pub num_batches: usize,
/// rows written
pub num_rows: u64,
pub num_rows: usize,
/// bytes written
pub num_bytes: u64,
pub num_bytes: usize,
}

impl IPCWriter {
Expand Down Expand Up @@ -306,9 +306,9 @@ impl IPCWriter {
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
self.writer.write(batch)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
self.num_rows += batch.num_rows();
let num_bytes: usize = batch.get_array_memory_size();
self.num_bytes += num_bytes as u64;
self.num_bytes += num_bytes;
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl ExternalSorter {
let used = self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(used);
self.metrics.spilled_rows.add(spilled_rows as usize);
self.metrics.spilled_rows.add(spilled_rows);
self.spills.push(spill_file);
Ok(used)
}
Expand Down Expand Up @@ -674,7 +674,7 @@ async fn spill_sorted_batches(
batches: Vec<RecordBatch>,
path: &Path,
schema: SchemaRef,
) -> Result<u64> {
) -> Result<usize> {
let path: PathBuf = path.into();
let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path, schema));
match task.join().await {
Expand Down Expand Up @@ -705,7 +705,7 @@ fn write_sorted(
batches: Vec<RecordBatch>,
path: PathBuf,
schema: SchemaRef,
) -> Result<u64> {
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
for batch in batches {
writer.write(&batch)?;
Expand All @@ -715,7 +715,7 @@ fn write_sorted(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches,
writer.num_rows,
human_readable_size(writer.num_bytes as usize),
human_readable_size(writer.num_bytes),
);
Ok(writer.num_rows)
}
Expand Down

0 comments on commit acd9865

Please sign in to comment.