From 15089713ac24a3aa54fe144d3fae0f1b78c6e6c2 Mon Sep 17 00:00:00 2001
From: Andrew Korzhuev
Date: Mon, 28 Aug 2023 16:03:10 +0200
Subject: [PATCH] snowflake-api: GET query support

---
 snowflake-api/Cargo.toml               |  7 +-
 snowflake-api/examples/filetransfer.rs | 36 ++++++++++
 snowflake-api/src/lib.rs               | 95 ++++++++++++++++++++++++--
 snowflake-api/src/responses.rs         |  9 +--
 4 files changed, 136 insertions(+), 11 deletions(-)

diff --git a/snowflake-api/Cargo.toml b/snowflake-api/Cargo.toml
index 5d1f76f..f140980 100644
--- a/snowflake-api/Cargo.toml
+++ b/snowflake-api/Cargo.toml
@@ -22,15 +22,16 @@ serde_json = "1"
 serde = { version = "1", features = ["derive"] }
 url = "2"
 uuid = { version = "1.4", features = ["v4"] }
-arrow = "42"
+arrow = "45"
 base64 = "0.21"
 regex = "1"
-object_store = { version = "0.6", features = ["aws"] }
+object_store = { version = "0.7", features = ["aws"] }
 async-trait = "0.1"
 
 [dev-dependencies]
 anyhow = "1"
 pretty_env_logger = "0.5.0"
 clap = { version = "4", features = ["derive"] }
-arrow = { version = "42", features = ["prettyprint"] }
 tokio = { version = "1", features=["macros", "rt-multi-thread"] }
+parquet = { version = "45", features = ["arrow", "snap"] }
+arrow = { version = "45", features = ["prettyprint"] }
diff --git a/snowflake-api/examples/filetransfer.rs b/snowflake-api/examples/filetransfer.rs
index 811c49e..1099cd8 100644
--- a/snowflake-api/examples/filetransfer.rs
+++ b/snowflake-api/examples/filetransfer.rs
@@ -3,6 +3,8 @@ use arrow::util::pretty::pretty_format_batches;
 use clap::Parser;
 use snowflake_api::{QueryResult, SnowflakeApi};
 use std::fs;
+use std::fs::File;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 
 extern crate snowflake_api;
 
@@ -43,6 +45,9 @@ struct Args {
 
     #[arg(long)]
     csv_path: String,
+
+    #[arg(long)]
+    output_path: String,
 }
 
 #[tokio::main]
@@ -111,7 +116,38 @@ async fn main() -> Result<()> {
         }
     }
 
+    log::info!("Copy table contents into a stage");
+    api.exec(
+        "COPY INTO @%OSCAR_AGE_MALE/output/ FROM OSCAR_AGE_MALE FILE_FORMAT = (TYPE = parquet COMPRESSION = NONE) HEADER = TRUE OVERWRITE = TRUE SINGLE = TRUE;"
+    ).await?;
+
+    log::info!("Downloading Parquet files");
+    api.exec(&format!(
+        "GET @%OSCAR_AGE_MALE/output/ file://{}",
+        &args.output_path
+    ))
+    .await?;
+
+    log::info!("Closing Snowflake session");
     api.close_session().await?;
 
+    log::info!("Reading downloaded files");
+    let parquet_dir = format!("{}output", &args.output_path);
+    let paths = fs::read_dir(&parquet_dir).unwrap();
+
+    for path in paths {
+        let path = path?.path();
+        log::info!("Reading {:?}", path);
+        let file = File::open(path)?;
+
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+        let reader = builder.build()?;
+        let mut batches = Vec::default();
+        for batch in reader {
+            batches.push(batch?);
+        }
+        println!("{}", pretty_format_batches(batches.as_slice()).unwrap());
+    }
+
     Ok(())
 }
diff --git a/snowflake-api/src/lib.rs b/snowflake-api/src/lib.rs
index 25e81a6..e22ac71 100644
--- a/snowflake-api/src/lib.rs
+++ b/snowflake-api/src/lib.rs
@@ -2,7 +2,7 @@
     issue_tracker_base_url = "https://github.com/mycelial/snowflake-rs/issues",
     test(no_crate_inject)
 )]
-#![doc = include_str ! ("../README.md")]
+#![doc = include_str!("../README.md")]
 
 use std::io;
 use std::path::Path;
@@ -169,24 +169,109 @@ impl SnowflakeApi {
     /// Execute a single query against API.
     /// If statement is PUT, then file will be uploaded to the Snowflake-managed storage
     pub async fn exec(&mut self, sql: &str) -> Result<QueryResult, SnowflakeApiError> {
+        // fixme: can go without regex? but needs different accept-mime for those still
         let put_re = Regex::new(r"(?i)^(?:/\*.*\*/\s*)*put\s+").unwrap();
+        let get_re = Regex::new(r"(?i)^(?:/\*.*\*/\s*)*get\s+").unwrap();
 
-        // put commands go through a different flow and result is side-effect
+        // put/get commands go through a different flow and result is side-effect
         if put_re.is_match(sql) {
             log::info!("Detected PUT query");
 
             self.exec_put(sql).await.map(|_| QueryResult::Empty)
+        } else if get_re.is_match(sql) {
+            log::info!("Detected GET query");
+
+            self.exec_get(sql).await.map(|_| QueryResult::Empty)
         } else {
             self.exec_arrow(sql).await
         }
     }
 
+    async fn exec_get(&mut self, sql: &str) -> Result<(), SnowflakeApiError> {
+        let resp = self
+            .run_sql::<ExecResponse>(sql, QueryType::JsonQuery)
+            .await?;
+        log::debug!("Got GET response: {:?}", resp);
+
+        match resp {
+            ExecResponse::Query(_) => Err(SnowflakeApiError::UnexpectedResponse),
+            ExecResponse::PutGet(pg) => self.get(pg).await,
+            ExecResponse::Error(e) => Err(SnowflakeApiError::ApiError(
+                e.data.error_code,
+                e.message.unwrap_or_default(),
+            )),
+        }
+    }
+
+    async fn get(&self, resp: PutGetExecResponse) -> Result<(), SnowflakeApiError> {
+        match resp.data.stage_info {
+            PutGetStageInfo::Aws(info) => {
+                self.get_from_s3(
+                    resp.data
+                        .local_location
+                        .ok_or(SnowflakeApiError::BrokenResponse)?,
+                    &resp.data.src_locations,
+                    info,
+                )
+                .await
+            }
+            PutGetStageInfo::Azure(_) => Err(SnowflakeApiError::Unimplemented(
+                "GET local file requests for Azure".to_string(),
+            )),
+            PutGetStageInfo::Gcs(_) => Err(SnowflakeApiError::Unimplemented(
+                "GET local file requests for GCS".to_string(),
+            )),
+        }
+    }
+
+    // fixme: refactor s3 put/get into a single function?
+    async fn get_from_s3(
+        &self,
+        local_location: String,
+        src_locations: &[String],
+        info: AwsPutGetStageInfo,
+    ) -> Result<(), SnowflakeApiError> {
+        // todo: use path parser?
+        let (bucket_name, bucket_path) = info
+            .location
+            .split_once('/')
+            .ok_or(SnowflakeApiError::InvalidBucketPath(info.location.clone()))?;
+
+        let s3 = AmazonS3Builder::new()
+            .with_region(info.region)
+            .with_bucket_name(bucket_name)
+            .with_access_key_id(info.creds.aws_key_id)
+            .with_secret_access_key(info.creds.aws_secret_key)
+            .with_token(info.creds.aws_token)
+            .build()?;
+
+        // todo: implement parallelism for small files
+        // todo: security vulnerability, external system tells you which local files to upload
+        for src_path in src_locations {
+            let dest_path = format!("{}{}", local_location, src_path);
+            let dest_path = object_store::path::Path::parse(dest_path)?;
+
+            let src_path = format!("{}{}", bucket_path, src_path);
+            let src_path = object_store::path::Path::parse(src_path)?;
+
+            // fixme: can we stream the thing or multipart?
+            let bytes = s3.get(&src_path).await?;
+            LocalFileSystem::new()
+                .put(&dest_path, bytes.bytes().await?)
+                .await?;
+        }
+
+        Ok(())
+    }
+
     async fn exec_put(&mut self, sql: &str) -> Result<(), SnowflakeApiError> {
         let resp = self
             .run_sql::<ExecResponse>(sql, QueryType::JsonQuery)
             .await?;
+        // fixme: don't log secrets maybe?
         log::debug!("Got PUT response: {:?}", resp);
 
+        // fixme: support PUT for external stage
         match resp {
             ExecResponse::Query(_) => Err(SnowflakeApiError::UnexpectedResponse),
             ExecResponse::PutGet(pg) => self.put(pg).await,
@@ -227,6 +312,7 @@ impl SnowflakeApi {
             .with_token(info.creds.aws_token)
             .build()?;
 
+        // todo: implement parallelism for small files
         // todo: security vulnerability, external system tells you which local files to upload
         for src_path in src_locations {
             let path = Path::new(src_path);
@@ -234,14 +320,15 @@ impl SnowflakeApi {
                 .file_name()
                 .ok_or(SnowflakeApiError::InvalidLocalPath(src_path.clone()))?;
 
+            // fixme: nicer way to join paths?
             // fixme: unwrap
             let dest_path = format!("{}{}", bucket_path, filename.to_str().unwrap());
             let dest_path = object_store::path::Path::parse(dest_path)?;
 
             let src_path = object_store::path::Path::parse(src_path)?;
 
+            // fixme: can we stream the thing or multipart?
             let fs = LocalFileSystem::new().get(&src_path).await?;
-
             s3.put(&dest_path, fs.bytes().await?).await?;
         }
 
@@ -276,7 +363,7 @@ impl SnowflakeApi {
                 return Err(SnowflakeApiError::ApiError(
                     e.data.error_code,
                     e.message.unwrap_or_default(),
-                ))
+                ));
             }
         };
 
diff --git a/snowflake-api/src/responses.rs b/snowflake-api/src/responses.rs
index ea81720..d45e8ce 100644
--- a/snowflake-api/src/responses.rs
+++ b/snowflake-api/src/responses.rs
@@ -198,18 +198,19 @@ pub struct PutGetResponseData {
     // file upload parallelism
     pub parallel: i32,
     // file size threshold, small ones are should be uploaded with given parallelism
-    pub threshold: i64,
+    pub threshold: Option<i64>,
     // doesn't need compression if source is already compressed
-    pub auto_compress: bool,
+    pub auto_compress: Option<bool>,
     pub overwrite: bool,
     // maps to one of the predefined compression algos
     // todo: support different compression formats?
-    pub source_compression: String,
+    pub source_compression: Option<String>,
     pub stage_info: PutGetStageInfo,
     pub encryption_material: EncryptionMaterialVariant,
     // GCS specific. If you request multiple files?
+    // might return a [ null ] for AWS responses
     #[serde(default)]
-    pub presigned_urls: Vec<String>,
+    pub presigned_urls: Vec<Option<String>>,
     #[serde(default)]
     pub parameters: Vec<NameValueParameter>,
     pub statement_type_id: Option<i64>,