
Commit

fix merge conflict
ugoa committed Nov 16, 2023
1 parent bc349c5 commit 61a388c
Showing 2 changed files with 37 additions and 9 deletions.
8 changes: 2 additions & 6 deletions crates/polars-io/Cargo.toml
@@ -84,17 +84,13 @@ ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
 
 
 
-<<<<<<< HEAD
-csv = ["lexical", "polars-core/rows", "itoa", "ryu", "fast-float", "simdutf8"]
-=======
-
->>>>>>> main
-decompress = ["flate2/rust_backend", "zstd"]avro = ["arrow/io_avro", "arrow/io_avro_compression"]
+decompress = ["flate2/rust_backend", "zstd"]
+avro = ["arrow/io_avro", "arrow/io_avro_compression"]
 # support for arrows streaming ipc file parsing
 ipc_streaming = ["arrow/io_ipc", "arrow/io_ipc_compression"]
 # support for arrow avro parsing
 odbc = ["arrow/io_odbc"]
 csv = ["atoi_simd", "polars-core/rows", "itoa", "ryu", "fast-float", "simdutf8"]
 decompress-fast = ["flate2/zlib-ng", "zstd"]
 dtype-categorical = ["polars-core/dtype-categorical"]
 dtype-date = ["polars-core/dtype-date", "polars-time/dtype-date"]
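The resolution deletes the leftover conflict markers, drops the stale HEAD-side `csv` entry (which still pulled in `lexical`; the surviving `csv` feature below uses `atoi_simd`), and splits the line that had fused the `decompress` and `avro` definitions together. For context, here is a minimal sketch of how Cargo features like these typically gate optional modules at compile time; the module names are illustrative, not necessarily polars-io's actual layout:

// Illustrative only: each optional reader exists only when its Cargo
// feature is enabled, so `avro`, `odbc`, etc. stay out of the build
// unless a downstream crate asks for them.
#[cfg(feature = "avro")]
pub mod avro;

#[cfg(feature = "odbc")]
pub mod odbc;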
38 changes: 35 additions & 3 deletions crates/polars-io/src/odbc/read.rs
@@ -4,11 +4,13 @@ use arrow::error::Result;
 use arrow::io::odbc::api::Cursor;
 use arrow::io::odbc::{api, read, write};
 
-use super::*;
+use super::{DataFrame, MmapBytesReader, PolarsResult, SerReader};
 
 pub struct OdbcReader<R> {
     reader: R,
-    connection_string: AsRef<str>,
+    connector: AsRef<str>,
+    max_batch_size: i32,
+    query: AsRef<str>,
 }
 
 impl<'a, R> SerReader<R> for OdbcReader<'a, R>
@@ -19,12 +19,42 @@ where
         OdbcReader { reader }
     }
 
-    fn set_connection_string(mut self, connection_string: AsRef<str>) -> Self {
+    fn set_connector(mut self, connector: AsRef<str>) -> Self {
         self.connection_string = connection_string.as_ref();
         self
     }
 
     fn set_query(mut self, query: AsRef<str>) -> Self {
         self.query = query.as_ref();
         self
     }
 
+    fn set_max_batch_size(mut self, max_batch_size: i32) -> Self {
+        self.max_batch_size = max_batch_size
+        self
+    }
+
+    fn finish(mut self) -> PolarsResult<DataFrame> {
+        let env = api::Environment::new()?;
+        let connection = env.connect_with_connection_string(connector)?;
+
+        let mut a = connection.prepare(query)?;
+        let fields = read::infer_schema(&a)?;
+
+        let buffer = read::buffer_from_metadata(&a, self.max_batch_size)?;
+
+        let mut chunks = vec![];
+        while let Some(batch) = cursor.fetch()? {
+            let arrays = (0..batch.num_cols())
+                .zip(fields.iter())
+                .map(|(index, field)| {
+                    let column_view = batch.column(index);
+                    read::deserialize(column_view, field.data_type.clone())
+                })
+                .collect::<Vec<_>>();
+            chunks.push(Chunk::new(arrays));
+        }
+
+        todo!()
+    }
+}
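As committed, this file does not compile: `AsRef<str>` is a trait, not a type, so the struct fields need something concrete like `String`; `set_connector` still assigns through the removed `connection_string` name; `set_max_batch_size` drops a semicolon; and `finish` refers to `connector`, `query`, and a `cursor` that are never bound. The body closely follows arrow2's ODBC read example, so a compiling shape might look like the sketch below. The `arrow::chunk::Chunk` path, the `usize` batch size, and the `execute`/`bind_buffer` step that produces the missing cursor are assumptions carried over from that example rather than from this commit, and the `Chunk`-to-`DataFrame` conversion that the commit leaves as `todo!()` stays open here too.

// Hedged sketch of a compiling version, modeled on arrow2's ODBC read
// example; not the committed code.
use arrow::array::Array;
use arrow::chunk::Chunk; // assumed path for the vendored arrow2 crate
use arrow::error::Result;
use arrow::io::odbc::api::Cursor; // trait providing bind_buffer
use arrow::io::odbc::{api, read};

pub struct OdbcReader {
    connector: String,     // concrete types instead of the bare `AsRef<str>` trait
    query: String,
    max_batch_size: usize, // buffer_from_metadata takes a usize, not an i32
}

impl OdbcReader {
    fn finish(self) -> Result<Vec<Chunk<Box<dyn Array>>>> {
        let env = api::Environment::new()?;
        let connection = env.connect_with_connection_string(&self.connector)?;

        // Prepare the statement and infer an Arrow schema from its metadata.
        let mut prepared = connection.prepare(&self.query)?;
        let fields = read::infer_schema(&prepared)?;

        // Allocate row-set buffers sized from the same statement metadata.
        let buffer = read::buffer_from_metadata(&prepared, self.max_batch_size)?;

        // Execute and bind the buffers; this is the `cursor` that the
        // committed code uses without ever defining it.
        let cursor = prepared.execute(())?.expect("SELECT yields a cursor");
        let mut cursor = cursor.bind_buffer(buffer)?;

        // Fetch batches and deserialize each column into an Arrow array.
        let mut chunks = vec![];
        while let Some(batch) = cursor.fetch()? {
            let arrays = (0..batch.num_cols())
                .zip(fields.iter())
                .map(|(index, field)| {
                    let column_view = batch.column(index);
                    read::deserialize(column_view, field.data_type.clone())
                })
                .collect::<Vec<_>>();
            chunks.push(Chunk::new(arrays));
        }

        // Turning these chunks into a polars DataFrame is exactly what the
        // commit still leaves as todo!().
        Ok(chunks)
    }
}

Returning the raw chunks keeps the sketch honest about what the commit actually implements; wiring them into a `DataFrame` would still need the Arrow-to-polars schema conversion used by the crate's other readers.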
