Skip to content

Commit 1553267

Browse files
authored
Add RecordBatch::schema_ref (#5474)
* Add RecordBatch::schema_ref
* Fix Clippy errors

Co-authored-by: Clide Stefani <[email protected]>
1 parent ace6d90 commit 1553267

File tree

8 files changed

+29
-24
lines changed

8 files changed

+29
-24
lines changed

arrow-array/src/record_batch.rs

+5
Original file line number | Diff line number | Diff line change
@@ -236,6 +236,11 @@ impl RecordBatch {
236236
self.schema.clone()
237237
}
238238

239+
/// Returns a reference to the [`Schema`] of the record batch.
240+
pub fn schema_ref(&self) -> &SchemaRef {
241+
&self.schema
242+
}
243+
239244
/// Projects the schema onto the specified columns
240245
pub fn project(&self, indices: &[usize]) -> Result<RecordBatch, ArrowError> {
241246
let projected_schema = self.schema.project(indices)?;

arrow-flight/examples/flight_sql_server.rs

+7-7
Original file line number | Diff line number | Diff line change
@@ -193,9 +193,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
193193
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
194194
self.check_token(&request)?;
195195
let batch = Self::fake_result().map_err(|e| status!("Could not fake a result", e))?;
196-
let schema = batch.schema();
197-
let batches = vec![batch];
198-
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
196+
let schema = batch.schema_ref();
197+
let batches = vec![batch.clone()];
198+
let flight_data = batches_to_flight_data(schema, batches)
199199
.map_err(|e| status!("Could not convert batches", e))?
200200
.into_iter()
201201
.map(Ok);
@@ -641,10 +641,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
641641
request: Request<Action>,
642642
) -> Result<ActionCreatePreparedStatementResult, Status> {
643643
self.check_token(&request)?;
644-
let schema = Self::fake_result()
645-
.map_err(|e| status!("Error getting result schema", e))?
646-
.schema();
647-
let message = SchemaAsIpc::new(&schema, &IpcWriteOptions::default())
644+
let record_batch =
645+
Self::fake_result().map_err(|e| status!("Error getting result schema", e))?;
646+
let schema = record_batch.schema_ref();
647+
let message = SchemaAsIpc::new(schema, &IpcWriteOptions::default())
648648
.try_into()
649649
.map_err(|e| status!("Unable to serialize schema", e))?;
650650
let IpcMessage(schema_bytes) = message;

arrow-flight/src/encode.rs

+4-4
Original file line number | Diff line number | Diff line change
@@ -320,7 +320,7 @@ impl FlightDataEncoder {
320320
let schema = match &self.schema {
321321
Some(schema) => schema.clone(),
322322
// encode the schema if this is the first time we have seen it
323-
None => self.encode_schema(&batch.schema()),
323+
None => self.encode_schema(batch.schema_ref()),
324324
};
325325

326326
// encode the batch
@@ -565,12 +565,12 @@ mod tests {
565565

566566
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c1) as ArrayRef)])
567567
.expect("cannot create record batch");
568-
let schema = batch.schema();
568+
let schema = batch.schema_ref();
569569

570570
let (_, baseline_flight_batch) = make_flight_data(&batch, &options);
571571

572572
let big_batch = batch.slice(0, batch.num_rows() - 1);
573-
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(&schema), false)
573+
let optimized_big_batch = prepare_batch_for_flight(&big_batch, Arc::clone(schema), false)
574574
.expect("failed to optimize");
575575
let (_, optimized_big_flight_batch) = make_flight_data(&optimized_big_batch, &options);
576576

@@ -581,7 +581,7 @@ mod tests {
581581

582582
let small_batch = batch.slice(0, 1);
583583
let optimized_small_batch =
584-
prepare_batch_for_flight(&small_batch, Arc::clone(&schema), false)
584+
prepare_batch_for_flight(&small_batch, Arc::clone(schema), false)
585585
.expect("failed to optimize");
586586
let (_, optimized_small_flight_batch) = make_flight_data(&optimized_small_batch, &options);
587587

arrow-flight/tests/encode_decode.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -465,7 +465,7 @@ async fn roundtrip(input: Vec<RecordBatch>) {
465465
/// When <https://github.com/apache/arrow-rs/issues/3389> is resolved,
466466
/// it should be possible to use `roundtrip`
467467
async fn roundtrip_dictionary(input: Vec<RecordBatch>) {
468-
let schema = Arc::new(prepare_schema_for_flight(&input[0].schema()));
468+
let schema = Arc::new(prepare_schema_for_flight(input[0].schema_ref()));
469469
let expected_output: Vec<_> = input
470470
.iter()
471471
.map(|batch| prepare_batch_for_flight(batch, schema.clone()).unwrap())

arrow-flight/tests/flight_sql_client_cli.rs

+4-4
Original file line number | Diff line number | Diff line change
@@ -189,7 +189,7 @@ impl FlightSqlServiceImpl {
189189
let batch = Self::fake_result()?;
190190

191191
Ok(FlightInfo::new()
192-
.try_with_schema(&batch.schema())
192+
.try_with_schema(batch.schema_ref())
193193
.expect("encoding schema")
194194
.with_endpoint(
195195
FlightEndpoint::new().with_ticket(Ticket::new(
@@ -245,9 +245,9 @@ impl FlightSqlService for FlightSqlServiceImpl {
245245
"part_2" => batch.slice(2, 1),
246246
ticket => panic!("Invalid ticket: {ticket:?}"),
247247
};
248-
let schema = batch.schema();
249-
let batches = vec![batch];
250-
let flight_data = batches_to_flight_data(schema.as_ref(), batches)
248+
let schema = batch.schema_ref();
249+
let batches = vec![batch.clone()];
250+
let flight_data = batches_to_flight_data(schema, batches)
251251
.unwrap()
252252
.into_iter()
253253
.map(Ok);

arrow-ipc/src/reader.rs

+4-4
Original file line number | Diff line number | Diff line change
@@ -1429,7 +1429,7 @@ mod tests {
14291429

14301430
fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
14311431
let mut buf = Vec::new();
1432-
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
1432+
let mut writer = crate::writer::FileWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
14331433
writer.write(rb).unwrap();
14341434
writer.finish().unwrap();
14351435
drop(writer);
@@ -1440,7 +1440,7 @@ mod tests {
14401440

14411441
fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
14421442
let mut buf = Vec::new();
1443-
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
1443+
let mut writer = crate::writer::StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
14441444
writer.write(rb).unwrap();
14451445
writer.finish().unwrap();
14461446
drop(writer);
@@ -1815,7 +1815,7 @@ mod tests {
18151815
let batch = RecordBatch::new_empty(schema);
18161816

18171817
let mut buf = Vec::new();
1818-
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
1818+
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
18191819
writer.write(&batch).unwrap();
18201820
writer.finish().unwrap();
18211821
drop(writer);
@@ -1842,7 +1842,7 @@ mod tests {
18421842
let batch = RecordBatch::new_empty(schema);
18431843

18441844
let mut buf = Vec::new();
1845-
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
1845+
let mut writer = crate::writer::FileWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
18461846
writer.write(&batch).unwrap();
18471847
writer.finish().unwrap();
18481848
drop(writer);

arrow-ipc/src/writer.rs

+3-3
Original file line number | Diff line number | Diff line change
@@ -1436,7 +1436,7 @@ mod tests {
14361436
use super::*;
14371437

14381438
fn serialize_file(rb: &RecordBatch) -> Vec<u8> {
1439-
let mut writer = FileWriter::try_new(vec![], &rb.schema()).unwrap();
1439+
let mut writer = FileWriter::try_new(vec![], rb.schema_ref()).unwrap();
14401440
writer.write(rb).unwrap();
14411441
writer.finish().unwrap();
14421442
writer.into_inner().unwrap()
@@ -1448,7 +1448,7 @@ mod tests {
14481448
}
14491449

14501450
fn serialize_stream(record: &RecordBatch) -> Vec<u8> {
1451-
let mut stream_writer = StreamWriter::try_new(vec![], &record.schema()).unwrap();
1451+
let mut stream_writer = StreamWriter::try_new(vec![], record.schema_ref()).unwrap();
14521452
stream_writer.write(record).unwrap();
14531453
stream_writer.finish().unwrap();
14541454
stream_writer.into_inner().unwrap()
@@ -1982,7 +1982,7 @@ mod tests {
19821982
)
19831983
.expect("new batch");
19841984

1985-
let mut writer = StreamWriter::try_new(vec![], &batch.schema()).expect("new writer");
1985+
let mut writer = StreamWriter::try_new(vec![], batch.schema_ref()).expect("new writer");
19861986
writer.write(&batch).expect("write");
19871987
let outbuf = writer.into_inner().expect("inner");
19881988

parquet/src/arrow/arrow_reader/mod.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -3082,7 +3082,7 @@ mod tests {
30823082
.unwrap();
30833083

30843084
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3085-
let actual = concat_batches(&batch.schema(), &batches).unwrap();
3085+
let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
30863086
assert_eq!(actual.num_rows(), selection.row_count());
30873087

30883088
let mut batch_offset = 0;

0 commit comments

Comments (0)