From 4381206ad5b8704de89679565154c45c9f42440f Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:15:01 +0800 Subject: [PATCH 01/10] feat(source): column pruning for parquet file source (#18967) --- .../opendal_source/opendal_reader.rs | 76 ++++++++++++++++++- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 1cfc9c1355167..69308a092e2dd 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -19,8 +19,12 @@ use async_compression::tokio::bufread::GzipDecoder; use async_trait::async_trait; use futures::TryStreamExt; use futures_async_stream::try_stream; +use itertools::Itertools; use opendal::Operator; -use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::FileMetaData; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::io::{AsyncRead, BufReader}; @@ -46,6 +50,7 @@ pub struct OpendalReader { splits: Vec>, parser_config: ParserConfig, source_ctx: SourceContextRef, + columns: Option>, } #[async_trait] impl SplitReader for OpendalReader { @@ -57,7 +62,7 @@ impl SplitReader for OpendalReader { splits: Vec>, parser_config: ParserConfig, source_ctx: SourceContextRef, - _columns: Option>, + columns: Option>, ) -> ConnectorResult { let connector = Src::new_enumerator(properties)?; let opendal_reader = OpendalReader { @@ -65,6 +70,7 @@ impl SplitReader for OpendalReader { splits, parser_config, source_ctx, + columns, }; Ok(opendal_reader) } @@ -86,7 +92,7 @@ impl OpendalReader { if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config { // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk. - let reader: tokio_util::compat::Compat = self + let mut reader: tokio_util::compat::Compat = self .connector .op .reader_with(&object_name) @@ -95,11 +101,19 @@ impl OpendalReader { .into_futures_async_read(..) .await? .compat(); + let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; + + let file_metadata = parquet_metadata.file_metadata(); + let column_indices = + extract_valid_column_indices(self.columns.clone(), file_metadata)?; + let projection_mask = + ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); // For the Parquet format, we directly convert from a record batch to a stream chunk. // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) .await? .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size) + .with_projection(projection_mask) .with_offset(split.offset) .build()?; @@ -215,3 +229,59 @@ impl OpendalReader { } } } + +/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. +/// +/// This function is used for column pruning of Parquet files. It calculates the intersection +/// between the columns in the currently read Parquet file and the schema provided by the user. 
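+/// For example, if the Parquet file contains columns `a`, `b` and `c` while the requested
+/// schema asks for `a`, `c` and `d` (with matching types), the returned indices are `[0, 2]`;
+/// the missing column `d` is simply skipped.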
+/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that +/// only the necessary columns are read. +/// +/// # Parameters +/// - `columns`: A vector of `Column` representing the user's requested schema. +/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. +/// +/// # Returns +/// - A `ConnectorResult>`, which contains the indices of the valid columns in the +/// Parquet file schema that match the requested schema. If an error occurs during processing, +/// it returns an appropriate error. +pub fn extract_valid_column_indices( + columns: Option>, + metadata: &FileMetaData, +) -> ConnectorResult> { + match columns { + Some(rw_columns) => { + let parquet_column_names = metadata + .schema_descr() + .columns() + .iter() + .map(|c| c.name()) + .collect_vec(); + + let converted_arrow_schema = + parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) + .map_err(anyhow::Error::from)?; + + let valid_column_indices: Vec = rw_columns + .iter() + .filter_map(|column| { + parquet_column_names + .iter() + .position(|&name| name == column.name) + .and_then(|pos| { + let arrow_field = IcebergArrowConvert + .to_arrow_field(&column.name, &column.data_type) + .ok()?; + if &arrow_field == converted_arrow_schema.field(pos) { + Some(pos) + } else { + None + } + }) + }) + .collect(); + Ok(valid_column_indices) + } + None => Ok(vec![]), + } +} From c7df0cf7180b41a9c7ec25e333c4ca8b25c9547d Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:16:30 +0800 Subject: [PATCH 02/10] cherry pick 19230 and 18967 --- ci/workflows/main-cron.yml | 94 +---------- ...arquet_source_and_sink.py => file_sink.py} | 133 ++++++++++++++- .../s3/{fs_source_batch.py => file_source.py} | 129 +++++++++++--- e2e_test/s3/fs_source_v2.py | 158 ------------------ e2e_test/s3/fs_source_v2_new_file.py | 90 ---------- e2e_test/s3/gcs_source.py | 130 -------------- e2e_test/s3/posix_fs_source.py | 136 --------------- 7 files changed, 246 insertions(+), 624 deletions(-) rename e2e_test/s3/{fs_parquet_source_and_sink.py => file_sink.py} (63%) rename e2e_test/s3/{fs_source_batch.py => file_source.py} (60%) delete mode 100644 e2e_test/s3/fs_source_v2.py delete mode 100644 e2e_test/s3/fs_source_v2_new_file.py delete mode 100644 e2e_test/s3/gcs_source.py delete mode 100644 e2e_test/s3/posix_fs_source.py diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index 107b3e06afb12..bf5d60e3cdf32 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -486,7 +486,7 @@ steps: - label: "S3 source check on AWS (json parser)" key: "s3-v2-source-check-aws-json-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t json" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t json" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -506,53 +506,9 @@ steps: timeout_in_minutes: 25 retry: *auto-retry - - label: "S3 source new file check on AWS (json)" - key: "s3-v2-source-new-file-check-aws" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2_new_file.py" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ 
/(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - S3_SOURCE_TEST_CONF - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - - label: "S3 source and sink on parquet file" - key: "s3-v2-source-check-parquet-file" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_parquet_source_and_sink.py" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - seek-oss/aws-sm#v2.3.1: - env: - S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - environment: - - S3_SOURCE_TEST_CONF - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - - label: "S3 source batch read on AWS (json parser)" - key: "s3-v2-source-batch-read-check-aws-json-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_batch.py -t json" + - label: "S3 sink on parquet and json file" + key: "s3-sink-parquet-and-json-encode" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_sink.py" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -574,7 +530,7 @@ steps: - label: "S3 source check on AWS (csv parser)" key: "s3-v2-source-check-aws-csv-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s fs_source_v2.py -t csv_without_header" + command: "ci/scripts/s3-source-test.sh -p ci-release -s file_source.py -t csv_without_header" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-s3-source-tests" @@ -594,46 +550,6 @@ steps: timeout_in_minutes: 25 retry: *auto-retry - - label: "PosixFs source on OpenDAL fs engine (csv parser)" - key: "s3-source-test-for-opendal-fs-engine-csv-parser" - command: "ci/scripts/s3-source-test.sh -p ci-release -s posix_fs_source.py -t csv_without_header" - if: | - !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - || build.pull_request.labels includes "ci/run-s3-source-tests" - || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - depends_on: build - plugins: - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 - retry: *auto-retry - - # TODO(Kexiang): Enable this test after we have a GCS_SOURCE_TEST_CONF. 
- # - label: "GCS source on OpenDAL fs engine" - # key: "s3-source-test-for-opendal-fs-engine" - # command: "ci/scripts/s3-source-test-for-opendal-fs-engine.sh -p ci-release -s gcs_source.py" - # if: | - # !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null - # || build.pull_request.labels includes "ci/run-s3-source-tests" - # || build.env("CI_STEPS") =~ /(^|,)s3-source-tests?(,|$$)/ - # depends_on: build - # plugins: - # - seek-oss/aws-sm#v2.3.1: - # env: - # S3_SOURCE_TEST_CONF: ci_s3_source_test_aws - # - docker-compose#v5.1.0: - # run: rw-build-env - # config: ci/docker-compose.yml - # mount-buildkite-agent: true - # environment: - # - S3_SOURCE_TEST_CONF - # - ./ci/plugins/upload-failure-logs - # timeout_in_minutes: 20 - # retry: *auto-retry - - label: "pulsar source check" key: "pulsar-source-tests" command: "ci/scripts/pulsar-source-test.sh -p ci-release" diff --git a/e2e_test/s3/fs_parquet_source_and_sink.py b/e2e_test/s3/file_sink.py similarity index 63% rename from e2e_test/s3/fs_parquet_source_and_sink.py rename to e2e_test/s3/file_sink.py index 033cb73ffbe70..320fc90c41474 100644 --- a/e2e_test/s3/fs_parquet_source_and_sink.py +++ b/e2e_test/s3/file_sink.py @@ -10,6 +10,8 @@ from time import sleep from minio import Minio from random import uniform +from time import sleep +import time def gen_data(file_num, item_num_per_file): assert item_num_per_file % 2 == 0, \ @@ -194,11 +196,138 @@ def _assert_eq(field, got, expect): _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) print('File sink test pass!') +<<<<<<<< HEAD:e2e_test/s3/fs_parquet_source_and_sink.py cur.execute(f'drop sink test_file_sink') cur.execute(f'drop table test_sink_table') +======== + cur.execute(f'drop sink test_file_sink_parquet') + cur.execute(f'drop table test_parquet_sink_table') + cur.execute(f'drop sink test_file_sink_json') + cur.execute(f'drop table test_json_sink_table') + cur.execute(f'drop table s3_test_parquet') +>>>>>>>> f3e9a3be19 (refactor(test): reorganize file connector CI tests (#19230)):e2e_test/s3/file_sink.py cur.close() conn.close() +def test_file_sink_batching(): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE t (v1 int, v2 int);''') + + print('test file sink batching...\n') + cur.execute(f'''CREATE sink test_file_sink_batching as select + v1, v2 from t WITH ( + connector = 's3', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_file_sink_batching/', + s3.file_type = 'parquet', + type = 'append-only', + rollover_seconds = 5, + max_row_count = 5, + force_append_only='true' + ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') + + cur.execute(f'''CREATE TABLE test_file_sink_batching_table( + v1 int, + v2 int, + ) WITH ( + connector = 's3', + match_pattern = 'test_file_sink_batching/*.parquet', + refresh.interval.sec = 1, + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + ) FORMAT PLAIN ENCODE PARQUET;''') + + cur.execute(f'''ALTER SINK test_file_sink_batching SET PARALLELISM = 2;''') + + cur.execute(f'''INSERT INTO t 
VALUES (10, 10);''') + + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # no item will be selectedpsq + result = cur.fetchone() + + def _assert_eq(field, got, expect): + assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' + def _assert_greater(field, got, expect): + assert got > expect, f'{field} assertion failed: got {got}, expect {expect}.' + + _assert_eq('count(*)', result[0], 0) + print('the rollover_seconds has not reached, count(*) = 0') + + + time.sleep(11) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (20, 20); + INSERT INTO t VALUES (30, 30); + INSERT INTO t VALUES (40, 40); + INSERT INTO t VALUES (50, 10); + ''') + + cur.execute(f'select count(*) from test_file_sink_batching_table') + # count(*) = 1 + result = cur.fetchone() + _assert_eq('count(*)', result[0], 1) + print('the max row count has not reached, count(*) = ', result[0]) + + cur.execute(f''' + INSERT INTO t VALUES (60, 20); + INSERT INTO t VALUES (70, 30); + INSERT INTO t VALUES (80, 10); + INSERT INTO t VALUES (90, 20); + INSERT INTO t VALUES (100, 30); + INSERT INTO t VALUES (100, 10); + ''') + + time.sleep(10) + + cur.execute(f'select count(*) from test_file_sink_batching_table') + result = cur.fetchone() + _assert_greater('count(*)', result[0], 1) + print('the rollover_seconds has reached, count(*) = ', result[0]) + + cur.execute(f'drop sink test_file_sink_batching;') + cur.execute(f'drop table t;') + cur.execute(f'drop table test_file_sink_batching_table;') + cur.close() + conn.close() + # delete objects + + client = Minio( + "127.0.0.1:9301", + "hummockadmin", + "hummockadmin", + secure=False, + ) + objects = client.list_objects("hummock001", prefix="test_file_sink_batching/", recursive=True) + + for obj in objects: + client.remove_object("hummock001", obj.object_name) + print(f"Deleted: {obj.object_name}") + if __name__ == "__main__": @@ -237,7 +366,9 @@ def _assert_eq(field, got, expect): do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) - # clean up s3 files + # clean up s3 files for idx, _ in enumerate(data): client.remove_object(config["S3_BUCKET"], _s3(idx)) + # test file sink batching + test_file_sink_batching() \ No newline at end of file diff --git a/e2e_test/s3/fs_source_batch.py b/e2e_test/s3/file_source.py similarity index 60% rename from e2e_test/s3/fs_source_batch.py rename to e2e_test/s3/file_source.py index fc09b0ef4b516..4fdd2b475aeb9 100644 --- a/e2e_test/s3/fs_source_batch.py +++ b/e2e_test/s3/file_source.py @@ -5,7 +5,9 @@ import random import psycopg2 +# from handle_incremental_file import upload_to_s3_bucket, check_for_new_files from time import sleep +import time from io import StringIO from minio import Minio from functools import partial @@ -29,6 +31,7 @@ def format_json(data): for file in data ] + def format_csv(data, with_header): csv_files = [] @@ -42,7 +45,7 @@ def format_csv(data, with_header): csv_files.append(ostream.getvalue()) return csv_files -def do_test(config, file_num, item_num_per_file, prefix, fmt): +def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True): conn = psycopg2.connect( host="localhost", port="4566", @@ -53,7 +56,7 @@ def do_test(config, file_num, item_num_per_file, prefix, fmt): # Open a cursor to execute SQL statements cur = conn.cursor() - def _source(): + def _table(): return 
f's3_test_{fmt}' def _encode(): @@ -63,7 +66,7 @@ def _encode(): return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" # Execute a SELECT statement - cur.execute(f'''CREATE SOURCE {_source()}( + cur.execute(f'''CREATE TABLE {_table()}( id int, name TEXT, sex int, @@ -75,20 +78,21 @@ def _encode(): s3.bucket_name = '{config['S3_BUCKET']}', s3.credentials.access = '{config['S3_ACCESS_KEY']}', s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', + refresh.interval.sec = 1 ) FORMAT PLAIN ENCODE {_encode()};''') total_rows = file_num * item_num_per_file MAX_RETRIES = 40 for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_source()}') + cur.execute(f'select count(*) from {_table()}') result = cur.fetchone() if result[0] == total_rows: break - print(f"[retry {retry_no}] Now got {result[0]} rows in source, {total_rows} expected, wait 30s") + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") sleep(30) - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}' + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' print(f'Execute {stmt}') cur.execute(stmt) result = cur.fetchone() @@ -105,11 +109,19 @@ def _assert_eq(field, got, expect): print('Test pass') - cur.execute(f'drop source {_source()}') + if need_drop_table: + cur.execute(f'drop table {_table()}') cur.close() conn.close() -def test_empty_source(config, prefix, fmt): +FORMATTER = { + 'json': format_json, + 'csv_with_header': partial(format_csv, with_header=True), + 'csv_without_header': partial(format_csv, with_header=False), + } + + +def test_batch_read(config, file_num, item_num_per_file, prefix, fmt): conn = psycopg2.connect( host="localhost", port="4566", @@ -121,7 +133,7 @@ def test_empty_source(config, prefix, fmt): cur = conn.cursor() def _source(): - return f's3_test_empty_{fmt}' + return f's3_test_{fmt}' def _encode(): if fmt == 'json': @@ -145,6 +157,16 @@ def _encode(): s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' ) FORMAT PLAIN ENCODE {_encode()};''') + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from {_source()}') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in source, {total_rows} expected, wait 30s") + sleep(30) + stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_source()}' print(f'Execute {stmt}') cur.execute(stmt) @@ -155,25 +177,61 @@ def _encode(): def _assert_eq(field, got, expect): assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
- _assert_eq('count(*)', result[0], 0) + _assert_eq('count(*)', result[0], total_rows) + _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) + _assert_eq('sum(sex)', result[2], total_rows / 2) + _assert_eq('sum(mark)', result[3], 0) - print('Empty source test pass') + print('Test batch read pass') cur.execute(f'drop source {_source()}') cur.close() conn.close() + +def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias): + _local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}" + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + for idx, file_str in enumerate(files): + with open(_local(idx, start_bias), "w") as f: + f.write(file_str) + os.fsync(f.fileno()) + + minio_client.fput_object( + config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias) + ) + + +def check_for_new_files(file_num, item_num_per_file, fmt): + conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev") + + # Open a cursor to execute SQL statements + cur = conn.cursor() + + def _table(): + return f"s3_test_{fmt}" + + total_rows = file_num * item_num_per_file + + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f"select count(*) from {_table()}") + result = cur.fetchone() + if result[0] == total_rows: + return True + print( + f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s" + ) + time.sleep(10) + return False + + if __name__ == "__main__": FILE_NUM = 4001 ITEM_NUM_PER_FILE = 2 data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) fmt = sys.argv[1] - FORMATTER = { - 'json': format_json, - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } assert fmt in FORMATTER, f"Unsupported format: {fmt}" formatted_files = FORMATTER[fmt](data) @@ -201,10 +259,41 @@ def _assert_eq(field, got, expect): ) # do test + print("Test streaming file source...\n") do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + print("Test batch read file source...\n") + test_batch_read(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) + + # test file source handle incremental files + data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) + fmt = "json" + + split_idx = 51 + data_batch1 = data[:split_idx] + data_batch2 = data[split_idx:] + run_id = str(random.randint(1000, 9999)) + print(f"S3 Source New File Test: run ID: {run_id} to buckek") + + formatted_batch1 = FORMATTER[fmt](data_batch1) + upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) + + # config in do_test that fs source's list interval is 1s + do_test( + config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False + ) + + formatted_batch2 = FORMATTER[fmt](data_batch2) + upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx) + + success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt) + if success_flag: + print("Test(add new file) pass") + else: + print("Test(add new file) fail") + + _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" + # clean up s3 files for idx, _ in enumerate(formatted_files): - client.remove_object(config["S3_BUCKET"], _s3(idx)) - - test_empty_source(config, run_id, fmt) \ No newline at end of file + client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) diff --git a/e2e_test/s3/fs_source_v2.py b/e2e_test/s3/fs_source_v2.py deleted file mode 100644 index eaef004dd600a..0000000000000 --- a/e2e_test/s3/fs_source_v2.py +++ /dev/null @@ -1,158 +0,0 @@ -import os -import sys 
-import csv -import json -import random -import psycopg2 - -from time import sleep -from io import StringIO -from minio import Minio -from functools import partial - -def gen_data(file_num, item_num_per_file): - assert item_num_per_file % 2 == 0, \ - f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' - return [ - [{ - 'id': file_id * item_num_per_file + item_id, - 'name': f'{file_id}_{item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - } for item_id in range(item_num_per_file)] - for file_id in range(file_num) - ] - -def format_json(data): - return [ - '\n'.join([json.dumps(item) for item in file]) - for file in data - ] - - -def format_csv(data, with_header): - csv_files = [] - - for file_data in data: - ostream = StringIO() - writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) - if with_header: - writer.writeheader() - for item_data in file_data: - writer.writerow(item_data) - csv_files.append(ostream.getvalue()) - return csv_files - -def do_test(config, file_num, item_num_per_file, prefix, fmt, need_drop_table=True): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f's3_test_{fmt}' - - def _encode(): - if fmt == 'json': - return 'JSON' - else: - return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE {_table()}( - id int, - name TEXT, - sex int, - mark int, - ) WITH ( - connector = 's3', - match_pattern = '{prefix}*.{fmt}', - s3.region_name = '{config['S3_REGION']}', - s3.bucket_name = '{config['S3_BUCKET']}', - s3.credentials.access = '{config['S3_ACCESS_KEY']}', - s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', - refresh.interval.sec = 1 - ) FORMAT PLAIN ENCODE {_encode()};''') - - total_rows = file_num * item_num_per_file - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_table()}') - result = cur.fetchone() - if result[0] == total_rows: - break - print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") - sleep(30) - - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' - print(f'Execute {stmt}') - cur.execute(stmt) - result = cur.fetchone() - - print('Got:', result) - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
- - _assert_eq('count(*)', result[0], total_rows) - _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - _assert_eq('sum(sex)', result[2], total_rows / 2) - _assert_eq('sum(mark)', result[3], 0) - - print('Test pass') - - if need_drop_table: - cur.execute(f'drop table {_table()}') - cur.close() - conn.close() - -FORMATTER = { - 'json': format_json, - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } - -if __name__ == "__main__": - FILE_NUM = 4001 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - - fmt = sys.argv[1] - assert fmt in FORMATTER, f"Unsupported format: {fmt}" - formatted_files = FORMATTER[fmt](data) - - config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) - client = Minio( - config["S3_ENDPOINT"], - access_key=config["S3_ACCESS_KEY"], - secret_key=config["S3_SECRET_KEY"], - secure=True, - ) - run_id = str(random.randint(1000, 9999)) - _local = lambda idx: f'data_{idx}.{fmt}' - _s3 = lambda idx: f"{run_id}_data_{idx}.{fmt}" - - # put s3 files - for idx, file_str in enumerate(formatted_files): - with open(_local(idx), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - - client.fput_object( - config["S3_BUCKET"], - _s3(idx), - _local(idx) - ) - - # do test - do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) - - # clean up s3 files - for idx, _ in enumerate(formatted_files): - client.remove_object(config["S3_BUCKET"], _s3(idx)) diff --git a/e2e_test/s3/fs_source_v2_new_file.py b/e2e_test/s3/fs_source_v2_new_file.py deleted file mode 100644 index c90103e15c127..0000000000000 --- a/e2e_test/s3/fs_source_v2_new_file.py +++ /dev/null @@ -1,90 +0,0 @@ -from fs_source_v2 import gen_data, FORMATTER, do_test -import json -import os -import random -import psycopg2 -import time -from minio import Minio - - -def upload_to_s3_bucket(config, minio_client, run_id, files, start_bias): - _local = lambda idx, start_bias: f"data_{idx + start_bias}.{fmt}" - _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" - for idx, file_str in enumerate(files): - with open(_local(idx, start_bias), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - - minio_client.fput_object( - config["S3_BUCKET"], _s3(idx, start_bias), _local(idx, start_bias) - ) - - -def check_for_new_files(file_num, item_num_per_file, fmt): - conn = psycopg2.connect(host="localhost", port="4566", user="root", database="dev") - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f"s3_test_{fmt}" - - total_rows = file_num * item_num_per_file - - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f"select count(*) from {_table()}") - result = cur.fetchone() - if result[0] == total_rows: - return True - print( - f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s" - ) - time.sleep(10) - return False - - -if __name__ == "__main__": - FILE_NUM = 101 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - fmt = "json" - - split_idx = 51 - data_batch1 = data[:split_idx] - data_batch2 = data[split_idx:] - - config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) - client = Minio( - config["S3_ENDPOINT"], - access_key=config["S3_ACCESS_KEY"], - secret_key=config["S3_SECRET_KEY"], - secure=True, - ) - run_id = str(random.randint(1000, 9999)) - print(f"S3 Source New File Test: run ID: {run_id} to bucket {config['S3_BUCKET']}") - - formatted_batch1 = FORMATTER[fmt](data_batch1) - 
upload_to_s3_bucket(config, client, run_id, formatted_batch1, 0) - - # config in do_test that fs source's list interval is 1s - do_test( - config, len(data_batch1), ITEM_NUM_PER_FILE, run_id, fmt, need_drop_table=False - ) - - formatted_batch2 = FORMATTER[fmt](data_batch2) - upload_to_s3_bucket(config, client, run_id, formatted_batch2, split_idx) - - success_flag = check_for_new_files(FILE_NUM, ITEM_NUM_PER_FILE, fmt) - if success_flag: - print("Test(add new file) pass") - else: - print("Test(add new file) fail") - - _s3 = lambda idx, start_bias: f"{run_id}_data_{idx + start_bias}.{fmt}" - # clean up s3 files - for idx, _ in enumerate(data): - client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) - - if success_flag == False: - exit(1) diff --git a/e2e_test/s3/gcs_source.py b/e2e_test/s3/gcs_source.py deleted file mode 100644 index 5e1144266fb23..0000000000000 --- a/e2e_test/s3/gcs_source.py +++ /dev/null @@ -1,130 +0,0 @@ -import os -import sys -import csv -import json -import random -import psycopg2 -import opendal - -from time import sleep -from io import StringIO -from functools import partial - -def gen_data(file_num, item_num_per_file): - assert item_num_per_file % 2 == 0, \ - f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' - return [ - [{ - 'id': file_id * item_num_per_file + item_id, - 'name': f'{file_id}_{item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - } for item_id in range(item_num_per_file)] - for file_id in range(file_num) - ] - -def format_json(data): - return [ - '\n'.join([json.dumps(item) for item in file]) - for file in data - ] - - -def do_test(config, file_num, item_num_per_file, prefix, fmt, credential): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f'gcs_test_{fmt}' - - def _encode(): - return 'JSON' - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE {_table()}( - id int, - name TEXT, - sex int, - mark int, - ) WITH ( - connector = 'gcs', - match_pattern = '{prefix}*.{fmt}', - gcs.bucket_name = '{config['GCS_BUCKET']}', - gcs.credential = '{credential}', - ) FORMAT PLAIN ENCODE {_encode()};''') - - total_rows = file_num * item_num_per_file - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_table()}') - result = cur.fetchone() - if result[0] == total_rows: - break - print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") - sleep(30) - - stmt = f'select count(*), sum(id), sum(sex), sum(mark) from {_table()}' - print(f'Execute {stmt}') - cur.execute(stmt) - result = cur.fetchone() - - print('Got:', result) - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' 
- - _assert_eq('count(*)', result[0], total_rows) - _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - _assert_eq('sum(sex)', result[2], total_rows / 2) - _assert_eq('sum(mark)', result[3], 0) - - print('Test pass') - - cur.execute(f'drop table {_table()}') - cur.close() - conn.close() - - -if __name__ == "__main__": - FILE_NUM = 4001 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - - fmt = sys.argv[1] - FORMATTER = { - 'json': format_json, - } - assert fmt in FORMATTER, f"Unsupported format: {fmt}" - formatted_files = FORMATTER[fmt](data) - - config = json.loads(os.environ["GCS_SOURCE_TEST_CONF"]) - run_id = str(random.randint(1000, 9999)) - _local = lambda idx: f'data_{idx}.{fmt}' - _gcs = lambda idx: f"{run_id}_data_{idx}.{fmt}" - credential_str = json.dumps(config["GOOGLE_APPLICATION_CREDENTIALS"]) - # put gcs files - op = opendal.Operator("gcs", root="/", bucket=config["GCS_BUCKET"], credential=credential_str) - - print("upload file to gcs") - for idx, file_str in enumerate(formatted_files): - with open(_local(idx), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - file_bytes = file_str.encode('utf-8') - op.write(_gcs(idx), file_bytes) - - # do test - print("do test") - do_test(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt, credential_str) - - # clean up gcs files - print("clean up gcs files") - for idx, _ in enumerate(formatted_files): - op.delete(_gcs(idx)) diff --git a/e2e_test/s3/posix_fs_source.py b/e2e_test/s3/posix_fs_source.py deleted file mode 100644 index a7cea46fa496a..0000000000000 --- a/e2e_test/s3/posix_fs_source.py +++ /dev/null @@ -1,136 +0,0 @@ -import os -import sys -import csv -import random -import psycopg2 -import opendal - -from time import sleep -from io import StringIO -from functools import partial - -def gen_data(file_num, item_num_per_file): - assert item_num_per_file % 2 == 0, \ - f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}' - return [ - [{ - 'id': file_id * item_num_per_file + item_id, - 'name': f'{file_id}_{item_id}', - 'sex': item_id % 2, - 'mark': (-1) ** (item_id % 2), - } for item_id in range(item_num_per_file)] - for file_id in range(file_num) - ] - -def format_csv(data, with_header): - csv_files = [] - - for file_data in data: - ostream = StringIO() - writer = csv.DictWriter(ostream, fieldnames=file_data[0].keys()) - if with_header: - writer.writeheader() - for item_data in file_data: - writer.writerow(item_data) - csv_files.append(ostream.getvalue()) - return csv_files - - -def do_test(file_num, item_num_per_file, prefix, fmt): - conn = psycopg2.connect( - host="localhost", - port="4566", - user="root", - database="dev" - ) - - # Open a cursor to execute SQL statements - cur = conn.cursor() - - def _table(): - return f'posix_fs_test_{fmt}' - - def _encode(): - return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" - - # Execute a SELECT statement - cur.execute(f'''CREATE TABLE {_table()}( - id int, - name TEXT, - sex int, - mark int, - ) WITH ( - connector = 'posix_fs', - match_pattern = '{prefix}*.{fmt}', - posix_fs.root = '/tmp', - ) FORMAT PLAIN ENCODE {_encode()};''') - - total_rows = file_num * item_num_per_file - MAX_RETRIES = 40 - for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from {_table()}') - result = cur.fetchone() - if result[0] == total_rows: - break - print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 30s") - sleep(30) - - stmt = f'select count(*), sum(id), 
sum(sex), sum(mark) from {_table()}' - print(f'Execute {stmt}') - cur.execute(stmt) - result = cur.fetchone() - - print('Got:', result) - - def _assert_eq(field, got, expect): - assert got == expect, f'{field} assertion failed: got {got}, expect {expect}.' - - _assert_eq('count(*)', result[0], total_rows) - _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) - _assert_eq('sum(sex)', result[2], total_rows / 2) - _assert_eq('sum(mark)', result[3], 0) - - print('Test pass') - - cur.execute(f'drop table {_table()}') - cur.close() - conn.close() - - -if __name__ == "__main__": - FILE_NUM = 4001 - ITEM_NUM_PER_FILE = 2 - data = gen_data(FILE_NUM, ITEM_NUM_PER_FILE) - - fmt = sys.argv[1] - FORMATTER = { - 'csv_with_header': partial(format_csv, with_header=True), - 'csv_without_header': partial(format_csv, with_header=False), - } - assert fmt in FORMATTER, f"Unsupported format: {fmt}" - formatted_files = FORMATTER[fmt](data) - - run_id = str(random.randint(1000, 9999)) - _local = lambda idx: f'data_{idx}.{fmt}' - _posix = lambda idx: f"{run_id}_data_{idx}.{fmt}" - # put local files - op = opendal.Operator("fs", root="/tmp") - - print("write file to /tmp") - for idx, file_str in enumerate(formatted_files): - with open(_local(idx), "w") as f: - f.write(file_str) - os.fsync(f.fileno()) - file_name = _posix(idx) - file_bytes = file_str.encode('utf-8') - op.write(file_name, file_bytes) - - # do test - print("do test") - do_test(FILE_NUM, ITEM_NUM_PER_FILE, run_id, fmt) - - # clean up local files - print("clean up local files in /tmp") - for idx, _ in enumerate(formatted_files): - file_name = _posix(idx) - op.delete(file_name) From 5c3698f291227dc97e59cce120c3bf1615ee6b86 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 12 Nov 2024 11:45:16 +0800 Subject: [PATCH 03/10] cherry pick 19221 --- e2e_test/s3/file_sink.py | 127 ++++++++++-- src/common/src/array/arrow/arrow_impl.rs | 185 ++++++++++++++++-- src/common/src/types/timestamptz.rs | 10 + src/connector/src/parser/parquet_parser.rs | 28 +-- .../opendal_source/opendal_reader.rs | 8 +- 5 files changed, 303 insertions(+), 55 deletions(-) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index 320fc90c41474..e959ebc02e7ba 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -29,8 +29,14 @@ def gen_data(file_num, item_num_per_file): 'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()), 'test_date': pa.scalar(datetime.now().date(), type=pa.date32()), 'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')), - 'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), - 'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')), + 'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')), + 'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')), + 'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')), + 'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')), + 'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')), + 'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')), + 'test_timestamptz_ns': 
pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')), } for item_id in range(item_num_per_file)] for file_id in range(file_num) ] @@ -62,8 +68,15 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz + ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -130,8 +143,14 @@ def _table(): test_bytea, test_date, test_time, - test_timestamp, - test_timestamptz + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', @@ -148,8 +167,8 @@ def _table(): print('Sink into s3...') # Execute a SELECT statement - cur.execute(f'''CREATE TABLE test_sink_table( - id bigint primary key, + cur.execute(f'''CREATE TABLE test_parquet_sink_table( + id bigint primary key,\ name TEXT, sex bigint, mark bigint, @@ -160,8 +179,14 @@ def _table(): test_bytea bytea, test_date date, test_time time, - test_timestamp timestamp, - test_timestamptz timestamptz, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz ) WITH ( connector = 's3', match_pattern = '*.parquet', @@ -182,7 +207,87 @@ def _table(): print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") sleep(10) - stmt = f'select count(*), sum(id) from test_sink_table' + stmt = f'select count(*), sum(id) from test_parquet_sink_table' + print(f'Execute reading sink files: {stmt}') + + print(f'Create snowflake s3 sink ') + # Execute a SELECT statement + cur.execute(f'''CREATE sink test_file_sink_json as select + id, + name, + sex, + mark, + test_int, + test_real, + test_double_precision, + test_varchar, + test_bytea, + test_date, + test_time, + test_timestamp_s, + test_timestamp_ms, + test_timestamp_us, + test_timestamp_ns, + test_timestamptz_s, + test_timestamptz_ms, + test_timestamptz_us, + test_timestamptz_ns + from {_table()} WITH ( + connector = 'snowflake', + match_pattern = '*.parquet', + snowflake.aws_region = 'custom', + snowflake.s3_bucket = 'hummock001', + snowflake.aws_access_key_id = 'hummockadmin', + snowflake.aws_secret_access_key = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_json_sink/', + type = 'append-only', + force_append_only='true' + ) FORMAT PLAIN ENCODE JSON(force_append_only='true');''') + + print('Sink into s3 in json encode...') + # Execute a SELECT statement + cur.execute(f'''CREATE TABLE test_json_sink_table( + id bigint primary key, + name TEXT, + sex bigint, + mark bigint, + test_int int, + test_real real, + test_double_precision double precision, + test_varchar varchar, + test_bytea bytea, + test_date date, + test_time time, + test_timestamp_s timestamp, + test_timestamp_ms timestamp, + test_timestamp_us timestamp, + test_timestamp_ns timestamp, + test_timestamptz_s timestamptz, + test_timestamptz_ms timestamptz, + test_timestamptz_us timestamptz, + test_timestamptz_ns timestamptz + ) WITH ( + 
connector = 's3', + match_pattern = 'test_json_sink/*.json', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + ) FORMAT PLAIN ENCODE JSON;''') + + total_rows = file_num * item_num_per_file + MAX_RETRIES = 40 + for retry_no in range(MAX_RETRIES): + cur.execute(f'select count(*) from test_json_sink_table') + result = cur.fetchone() + if result[0] == total_rows: + break + print(f"[retry {retry_no}] Now got {result[0]} rows in table, {total_rows} expected, wait 10s") + sleep(10) + + stmt = f'select count(*), sum(id) from test_json_sink_table' print(f'Execute reading sink files: {stmt}') cur.execute(stmt) result = cur.fetchone() diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 8fa3e2abb6b5f..45e092597c356 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,6 +42,8 @@ use std::fmt::Write; +use arrow_53_schema::TimeUnit; +use arrow_array::array; use arrow_array::cast::AsArray; use arrow_array_iceberg::array; use arrow_buffer::OffsetBuffer; @@ -512,6 +514,12 @@ pub trait FromArrow { Time64(Microsecond) => DataType::Time, Timestamp(Microsecond, None) => DataType::Timestamp, Timestamp(Microsecond, Some(_)) => DataType::Timestamptz, + Timestamp(Second, None) => DataType::Timestamp, + Timestamp(Second, Some(_)) => DataType::Timestamptz, + Timestamp(Millisecond, None) => DataType::Timestamp, + Timestamp(Millisecond, Some(_)) => DataType::Timestamptz, + Timestamp(Nanosecond, None) => DataType::Timestamp, + Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz, Interval(MonthDayNano) => DataType::Interval, Utf8 => DataType::Varchar, Binary => DataType::Bytea, @@ -572,7 +580,6 @@ pub trait FromArrow { if let Some(type_name) = field.metadata().get("ARROW:extension:name") { return self.from_extension_array(type_name, array); } - match array.data_type() { Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()), Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()), @@ -584,12 +591,30 @@ pub trait FromArrow { Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()), Time64(Microsecond) => self.from_time64us_array(array.as_any().downcast_ref().unwrap()), + Timestamp(Second, None) => { + self.from_timestampsecond_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Second, Some(_)) => { + self.from_timestampsecond_some_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, None) => { + self.from_timestampms_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Millisecond, Some(_)) => { + self.from_timestampms_some_array(array.as_any().downcast_ref().unwrap()) + } Timestamp(Microsecond, None) => { self.from_timestampus_array(array.as_any().downcast_ref().unwrap()) } Timestamp(Microsecond, Some(_)) => { self.from_timestampus_some_array(array.as_any().downcast_ref().unwrap()) } + Timestamp(Nanosecond, None) => { + self.from_timestampns_array(array.as_any().downcast_ref().unwrap()) + } + Timestamp(Nanosecond, Some(_)) => { + self.from_timestampns_some_array(array.as_any().downcast_ref().unwrap()) + } Interval(MonthDayNano) => { self.from_interval_array(array.as_any().downcast_ref().unwrap()) } @@ -692,6 +717,33 @@ pub trait FromArrow { Ok(ArrayImpl::Time(array.into())) } + fn 
from_timestampsecond_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + fn from_timestampsecond_some_array( + &self, + array: &arrow_array::TimestampSecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + + fn from_timestampms_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampms_some_array( + &self, + array: &arrow_array::TimestampMillisecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_timestampus_array( &self, array: &arrow_array::TimestampMicrosecondArray, @@ -706,6 +758,20 @@ pub trait FromArrow { Ok(ArrayImpl::Timestamptz(array.into())) } + fn from_timestampns_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamp(array.into())) + } + + fn from_timestampns_some_array( + &self, + array: &arrow_array::TimestampNanosecondArray, + ) -> Result { + Ok(ArrayImpl::Timestamptz(array.into())) + } + fn from_interval_array( &self, array: &arrow_array::IntervalMonthDayNanoArray, @@ -842,6 +908,44 @@ macro_rules! converts { } }; } + +macro_rules! converts_with_timeunit { + ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { + + impl From<&$ArrayType> for $ArrowType { + fn from(array: &$ArrayType) -> Self { + array.iter().map(|o| o.map(|v| v.into_arrow_with_unit($time_unit))).collect() + } + } + + impl From<&$ArrowType> for $ArrayType { + fn from(array: &$ArrowType) -> Self { + array.iter().map(|o| { + o.map(|v| { + let timestamp = <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit); + timestamp + }) + }).collect() + } + } + + impl From<&[$ArrowType]> for $ArrayType { + fn from(arrays: &[$ArrowType]) -> Self { + arrays + .iter() + .flat_map(|a| a.iter()) + .map(|o| { + o.map(|v| { + <<$ArrayType as Array>::RefItem<'_> as FromIntoArrowWithUnit>::from_arrow_with_unit(v, $time_unit) + }) + }) + .collect() + } + } + + }; +} + converts!(BoolArray, arrow_array::BooleanArray); converts!(I16Array, arrow_array::Int16Array); converts!(I32Array, arrow_array::Int32Array); @@ -854,11 +958,19 @@ converts!(Utf8Array, arrow_array::StringArray); converts!(Utf8Array, arrow_array::LargeStringArray); converts!(DateArray, arrow_array::Date32Array, @map); converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); -converts!(TimestampArray, arrow_array::TimestampMicrosecondArray, @map); -converts!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, @map); converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestampArray, arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMillisecondArray,TimeUnit::Millisecond, @map); +converts_with_timeunit!(TimestamptzArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); +converts_with_timeunit!(TimestamptzArray, 
arrow_array::TimestampNanosecondArray, TimeUnit::Nanosecond, @map); + /// Converts RisingWave value from and into Arrow value. trait FromIntoArrow { /// The corresponding element type in the Arrow array. @@ -867,6 +979,16 @@ trait FromIntoArrow { fn into_arrow(self) -> Self::ArrowType; } +/// Converts RisingWave value from and into Arrow value. +/// Specifically used for converting timestamp types according to timeunit. +trait FromIntoArrowWithUnit { + type ArrowType; + /// The timestamp type used to distinguish different time units, only utilized when the Arrow type is a timestamp. + type TimestampType; + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self; + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType; +} + impl FromIntoArrow for Serial { type ArrowType = i64; @@ -936,34 +1058,55 @@ impl FromIntoArrow for Time { } } -impl FromIntoArrow for Timestamp { +impl FromIntoArrowWithUnit for Timestamp { type ArrowType = i64; + type TimestampType = TimeUnit; - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamp( - DateTime::from_timestamp((value / 1_000_000) as _, (value % 1_000_000 * 1000) as _) - .unwrap() - .naive_utc(), - ) + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => { + Timestamp(DateTime::from_timestamp(value as _, 0).unwrap().naive_utc()) + } + TimeUnit::Millisecond => { + Timestamp(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) + } + TimeUnit::Microsecond => { + Timestamp(DateTime::from_timestamp_micros(value).unwrap().naive_utc()) + } + TimeUnit::Nanosecond => Timestamp(DateTime::from_timestamp_nanos(value).naive_utc()), + } } - fn into_arrow(self) -> Self::ArrowType { - self.0 - .signed_duration_since(NaiveDateTime::default()) - .num_microseconds() - .unwrap() + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.0.and_utc().timestamp(), + TimeUnit::Millisecond => self.0.and_utc().timestamp_millis(), + TimeUnit::Microsecond => self.0.and_utc().timestamp_micros(), + TimeUnit::Nanosecond => self.0.and_utc().timestamp_nanos_opt().unwrap(), + } } } -impl FromIntoArrow for Timestamptz { +impl FromIntoArrowWithUnit for Timestamptz { type ArrowType = i64; - - fn from_arrow(value: Self::ArrowType) -> Self { - Timestamptz::from_micros(value) + type TimestampType = TimeUnit; + + fn from_arrow_with_unit(value: Self::ArrowType, time_unit: Self::TimestampType) -> Self { + match time_unit { + TimeUnit::Second => Timestamptz::from_secs(value).unwrap_or_default(), + TimeUnit::Millisecond => Timestamptz::from_millis(value).unwrap_or_default(), + TimeUnit::Microsecond => Timestamptz::from_micros(value), + TimeUnit::Nanosecond => Timestamptz::from_nanos(value).unwrap_or_default(), + } } - fn into_arrow(self) -> Self::ArrowType { - self.timestamp_micros() + fn into_arrow_with_unit(self, time_unit: Self::TimestampType) -> Self::ArrowType { + match time_unit { + TimeUnit::Second => self.timestamp(), + TimeUnit::Millisecond => self.timestamp_millis(), + TimeUnit::Microsecond => self.timestamp_micros(), + TimeUnit::Nanosecond => self.timestamp_nanos().unwrap(), + } } } diff --git a/src/common/src/types/timestamptz.rs b/src/common/src/types/timestamptz.rs index 34a6bd2465271..11ea22154e97d 100644 --- a/src/common/src/types/timestamptz.rs +++ b/src/common/src/types/timestamptz.rs @@ -102,6 +102,11 @@ impl Timestamptz { Self(timestamp_micros) } + /// 
Creates a `Timestamptz` from microseconds. + pub fn from_nanos(timestamp_nanos: i64) -> Option { + timestamp_nanos.checked_div(1_000).map(Self) + } + /// Returns the number of non-leap-microseconds since January 1, 1970 UTC. pub fn timestamp_micros(&self) -> i64 { self.0 @@ -112,6 +117,11 @@ impl Timestamptz { self.0.div_euclid(1_000) } + /// Returns the number of non-leap-nanosseconds since January 1, 1970 UTC. + pub fn timestamp_nanos(&self) -> Option { + self.0.checked_mul(1_000) + } + /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC (aka "UNIX /// timestamp"). pub fn timestamp(&self) -> i64 { diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs index 4f1e720bc47fb..78ee4aeb6b568 100644 --- a/src/connector/src/parser/parquet_parser.rs +++ b/src/connector/src/parser/parquet_parser.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use arrow_array_iceberg::RecordBatch; use deltalake::parquet::arrow::async_reader::AsyncFileReader; use futures_async_stream::try_stream; +use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk}; use risingwave_common::bail; @@ -104,32 +105,19 @@ impl ParquetParser { crate::source::SourceColumnType::Normal => { match source_column.is_hidden_addition_col { false => { - let rw_data_type = &source_column.data_type; + let rw_data_type: &risingwave_common::types::DataType = + &source_column.data_type; let rw_column_name = &source_column.name; + if let Some(parquet_column) = record_batch.column_by_name(rw_column_name) { let arrow_field = IcebergArrowConvert .to_arrow_field(rw_column_name, rw_data_type)?; - let converted_arrow_data_type: &arrow_schema_iceberg::DataType = - arrow_field.data_type(); - if converted_arrow_data_type == parquet_column.data_type() { - let array_impl = IcebergArrowConvert - .array_from_arrow_array(&arrow_field, parquet_column)?; - let column = Arc::new(array_impl); - chunk_columns.push(column); - } else { - // data type mismatch, this column is set to null. - let mut array_builder = ArrayBuilderImpl::with_type( - column_size, - rw_data_type.clone(), - ); - - array_builder.append_n_null(record_batch.num_rows()); - let res = array_builder.finish(); - let column = Arc::new(res); - chunk_columns.push(column); - } + let array_impl = IcebergArrowConvert + .array_from_arrow_array(&arrow_field, parquet_column)?; + let column = Arc::new(array_impl); + chunk_columns.push(column); } else { // For columns defined in the source schema but not present in the Parquet file, null values are filled in. let mut array_builder = diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 69308a092e2dd..14258d8924658 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -269,10 +269,12 @@ pub fn extract_valid_column_indices( .iter() .position(|&name| name == column.name) .and_then(|pos| { - let arrow_field = IcebergArrowConvert - .to_arrow_field(&column.name, &column.data_type) + // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison. + // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw. 
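+                            // For example, an Arrow `Timestamp` column with second, millisecond, microsecond or nanosecond unit all map to the rw `Timestamp`/`Timestamptz` type, so comparing on the rw side accepts any supported time unit instead of requiring an exact Arrow type match.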
+ let arrow_filed_to_rw_data_type = IcebergArrowConvert + .type_from_field(converted_arrow_schema.field(pos)) .ok()?; - if &arrow_field == converted_arrow_schema.field(pos) { + if arrow_filed_to_rw_data_type == column.data_type { Some(pos) } else { None From b9f337d661024def6d73f5e09e5705edb628effb Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:56:59 +0800 Subject: [PATCH 04/10] cherry pick 19398 --- src/batch/src/executor/s3_file_scan.rs | 37 +- .../opendal_source/opendal_reader.rs | 104 +----- src/connector/src/source/iceberg/mod.rs | 4 +- .../source/iceberg/parquet_file_handler.rs | 320 ++++++++++++++++++ .../src/source/iceberg/parquet_file_reader.rs | 132 -------- src/frontend/src/expr/table_function.rs | 15 +- 6 files changed, 352 insertions(+), 260 deletions(-) create mode 100644 src/connector/src/source/iceberg/parquet_file_handler.rs delete mode 100644 src/connector/src/source/iceberg/parquet_file_reader.rs diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs index 5096d1d625fa1..38907c63f8416 100644 --- a/src/batch/src/executor/s3_file_scan.rs +++ b/src/batch/src/executor/s3_file_scan.rs @@ -12,13 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::anyhow; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; -use parquet::arrow::ProjectionMask; -use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::catalog::{Field, Schema}; -use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder; +use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file}; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -85,34 +82,18 @@ impl S3FileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { - let mut batch_stream_builder = create_parquet_stream_builder( + let op = new_s3_operator( self.s3_region.clone(), self.s3_access_key.clone(), self.s3_secret_key.clone(), - file, - ) - .await?; - - let arrow_schema = batch_stream_builder.schema(); - assert_eq!(arrow_schema.fields.len(), self.schema.fields.len()); - for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) { - assert_eq!(*field.name, *arrow_field.name()); - } - - batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all()); - - batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size); - - let record_batch_stream = batch_stream_builder - .build() - .map_err(|e| anyhow!(e).context("fail to build arrow stream builder"))?; - + file.clone(), + )?; + let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?; #[for_await] - for record_batch in record_batch_stream { - let record_batch = record_batch.map_err(BatchError::Parquet)?; - let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - debug_assert_eq!(chunk.data_types(), self.schema.data_types()); - yield chunk; + for stream_chunk in chunk_stream { + let stream_chunk = stream_chunk?; + let (data_chunk, _) = stream_chunk.into_parts(); + yield data_chunk; } } } diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index 
14258d8924658..ca8ee1ae486b2 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -19,24 +19,19 @@ use async_compression::tokio::bufread::GzipDecoder; use async_trait::async_trait; use futures::TryStreamExt; use futures_async_stream::try_stream; -use itertools::Itertools; use opendal::Operator; -use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::FileMetaData; -use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; -use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::io::{AsyncRead, BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::error::ConnectorResult; -use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParquetParser, ParserConfig}; +use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig}; use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; +use crate::source::iceberg::read_parquet_file; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, SplitReader, @@ -91,38 +86,15 @@ impl OpendalReader { let msg_stream; if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config { - // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk. - let mut reader: tokio_util::compat::Compat = self - .connector - .op - .reader_with(&object_name) - .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. - .await? - .into_futures_async_read(..) - .await? - .compat(); - let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; - - let file_metadata = parquet_metadata.file_metadata(); - let column_indices = - extract_valid_column_indices(self.columns.clone(), file_metadata)?; - let projection_mask = - ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); - // For the Parquet format, we directly convert from a record batch to a stream chunk. - // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. - let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) - .await? - .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size) - .with_projection(projection_mask) - .with_offset(split.offset) - .build()?; - - let parquet_parser = ParquetParser::new( - self.parser_config.common.rw_columns.clone(), + msg_stream = read_parquet_file( + self.connector.op.clone(), object_name, + self.columns.clone(), + Some(self.parser_config.common.rw_columns.clone()), + self.source_ctx.source_ctrl_opts.chunk_size, split.offset, - )?; - msg_stream = parquet_parser.into_stream(record_batch_stream); + ) + .await?; } else { let data_stream = Self::stream_read_object( self.connector.op.clone(), @@ -229,61 +201,3 @@ impl OpendalReader { } } } - -/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. -/// -/// This function is used for column pruning of Parquet files. 
It calculates the intersection -/// between the columns in the currently read Parquet file and the schema provided by the user. -/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that -/// only the necessary columns are read. -/// -/// # Parameters -/// - `columns`: A vector of `Column` representing the user's requested schema. -/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. -/// -/// # Returns -/// - A `ConnectorResult>`, which contains the indices of the valid columns in the -/// Parquet file schema that match the requested schema. If an error occurs during processing, -/// it returns an appropriate error. -pub fn extract_valid_column_indices( - columns: Option>, - metadata: &FileMetaData, -) -> ConnectorResult> { - match columns { - Some(rw_columns) => { - let parquet_column_names = metadata - .schema_descr() - .columns() - .iter() - .map(|c| c.name()) - .collect_vec(); - - let converted_arrow_schema = - parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) - .map_err(anyhow::Error::from)?; - - let valid_column_indices: Vec = rw_columns - .iter() - .filter_map(|column| { - parquet_column_names - .iter() - .position(|&name| name == column.name) - .and_then(|pos| { - // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison. - // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw. - let arrow_filed_to_rw_data_type = IcebergArrowConvert - .type_from_field(converted_arrow_schema.field(pos)) - .ok()?; - if arrow_filed_to_rw_data_type == column.data_type { - Some(pos) - } else { - None - } - }) - }) - .collect(); - Ok(valid_column_indices) - } - None => Ok(vec![]), - } -} diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index f101ff9ed6d4b..c3f0ed1dce660 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod parquet_file_reader; +pub mod parquet_file_handler; use std::collections::HashMap; @@ -22,7 +22,7 @@ use futures_async_stream::for_await; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use itertools::Itertools; -pub use parquet_file_reader::*; +pub use parquet_file_handler::*; use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::types::JsonbVal; diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs new file mode 100644 index 0000000000000..146348545fbc0 --- /dev/null +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -0,0 +1,320 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::future::IntoFuture; +use std::ops::Range; +use std::pin::Pin; +use std::sync::Arc; + +use anyhow::anyhow; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, Stream, TryFutureExt}; +use iceberg::io::{ + FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, +}; +use iceberg::{Error, ErrorKind}; +use itertools::Itertools; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; +use opendal::Operator; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader}; +use risingwave_common::array::arrow::IcebergArrowConvert; +use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::ColumnId; +use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; +use url::Url; + +use crate::error::ConnectorResult; +use crate::parser::ParquetParser; +use crate::source::{Column, SourceColumnDesc}; + +pub struct ParquetFileReader { + meta: FileMetadata, + r: R, +} + +impl ParquetFileReader { + pub fn new(meta: FileMetadata, r: R) -> Self { + Self { meta, r } + } +} + +impl AsyncFileReader for ParquetFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin( + self.r + .read(range.start as _..range.end as _) + .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), + ) + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + let reader = ParquetMetaDataReader::new(); + let size = self.meta.size as usize; + let meta = reader.load_and_finish(self, size).await?; + + Ok(Arc::new(meta)) + } + .boxed() + } +} + +pub async fn create_parquet_stream_builder( + s3_region: String, + s3_access_key: String, + s3_secret_key: String, + location: String, +) -> Result>, anyhow::Error> { + let mut props = HashMap::new(); + props.insert(S3_REGION, s3_region.clone()); + props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone()); + props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone()); + + let file_io_builder = FileIOBuilder::new("s3"); + let file_io = file_io_builder + .with_props(props.into_iter()) + .build() + .map_err(|e| anyhow!(e))?; + let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?; + + let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?; + let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?; + let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader); + + ParquetRecordBatchStreamBuilder::new(parquet_file_reader) + .await + .map_err(|e| anyhow!(e)) +} + +pub fn new_s3_operator( + s3_region: String, + s3_access_key: String, + s3_secret_key: String, + location: String, +) -> ConnectorResult { + // Create s3 builder. + let bucket = extract_bucket(&location); + let mut builder = S3::default().bucket(&bucket).region(&s3_region); + builder = builder.secret_access_key(&s3_access_key); + builder = builder.secret_access_key(&s3_secret_key); + builder = builder.endpoint(&format!( + "https://{}.s3.{}.amazonaws.com", + bucket, s3_region + )); + + builder = builder.disable_config_load(); + + let op: Operator = Operator::new(builder)? 
+ .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + Ok(op) +} + +fn extract_bucket(location: &str) -> String { + let prefix = "s3://"; + let start = prefix.len(); + let end = location[start..] + .find('/') + .unwrap_or(location.len() - start); + location[start..start + end].to_string() +} + +pub async fn list_s3_directory( + s3_region: String, + s3_access_key: String, + s3_secret_key: String, + dir: String, +) -> Result, anyhow::Error> { + let url = Url::parse(&dir)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {}, missing bucket", dir), + ) + })?; + + let prefix = format!("s3://{}/", bucket); + if dir.starts_with(&prefix) { + let mut builder = S3::default(); + builder = builder + .region(&s3_region) + .access_key_id(&s3_access_key) + .secret_access_key(&s3_secret_key) + .bucket(bucket); + let op = Operator::new(builder)? + .layer(RetryLayer::default()) + .finish(); + + op.list(&dir[prefix.len()..]) + .await + .map_err(|e| anyhow!(e)) + .map(|list| { + list.into_iter() + .map(|entry| prefix.to_string() + entry.path()) + .collect() + }) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {}, should start with {}", dir, prefix), + ))? + } +} + +/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. +/// +/// This function is used for column pruning of Parquet files. It calculates the intersection +/// between the columns in the currently read Parquet file and the schema provided by the user. +/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that +/// only the necessary columns are read. +/// +/// # Parameters +/// - `columns`: A vector of `Column` representing the user's requested schema. +/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. +/// +/// # Returns +/// - A `ConnectorResult>`, which contains the indices of the valid columns in the +/// Parquet file schema that match the requested schema. If an error occurs during processing, +/// it returns an appropriate error. +pub fn extract_valid_column_indices( + columns: Option>, + metadata: &FileMetaData, +) -> ConnectorResult> { + match columns { + Some(rw_columns) => { + let parquet_column_names = metadata + .schema_descr() + .columns() + .iter() + .map(|c| c.name()) + .collect_vec(); + + let converted_arrow_schema = + parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) + .map_err(anyhow::Error::from)?; + + let valid_column_indices: Vec = rw_columns + .iter() + .filter_map(|column| { + parquet_column_names + .iter() + .position(|&name| name == column.name) + .and_then(|pos| { + let arrow_field = IcebergArrowConvert + .to_arrow_field(&column.name, &column.data_type) + .ok()?; + if &arrow_field == converted_arrow_schema.field(pos) { + Some(pos) + } else { + None + } + }) + }) + .collect(); + Ok(valid_column_indices) + } + None => Ok(vec![]), + } +} + +/// Reads a specified Parquet file and converts its content into a stream of chunks. +pub async fn read_parquet_file( + op: Operator, + file_name: String, + rw_columns: Option>, + parser_columns: Option>, + batch_size: usize, + offset: usize, +) -> ConnectorResult< + Pin> + Send>>, +> { + let mut reader: tokio_util::compat::Compat = op + .reader_with(&file_name) + .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. + .await? + .into_futures_async_read(..) + .await? 
+ .compat(); + let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; + + let file_metadata = parquet_metadata.file_metadata(); + let column_indices = extract_valid_column_indices(rw_columns, file_metadata)?; + let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); + // For the Parquet format, we directly convert from a record batch to a stream chunk. + // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. + let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) + .await? + .with_batch_size(batch_size) + .with_projection(projection_mask) + .with_offset(offset) + .build()?; + let converted_arrow_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .map_err(anyhow::Error::from)?; + let columns = match parser_columns { + Some(columns) => columns, + None => converted_arrow_schema + .fields + .iter() + .enumerate() + .map(|(index, field_ref)| { + let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap(); + SourceColumnDesc::simple( + field_ref.name().clone(), + data_type, + ColumnId::new(index as i32), + ) + }) + .collect(), + }; + + let parquet_parser = ParquetParser::new(columns, file_name, offset)?; + let msg_stream: Pin< + Box> + Send>, + > = parquet_parser.into_stream(record_batch_stream); + Ok(msg_stream) +} + +pub async fn get_parquet_fields( + op: Operator, + file_name: String, +) -> ConnectorResult { + let mut reader: tokio_util::compat::Compat = op + .reader_with(&file_name) + .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. + .await? + .into_futures_async_read(..) + .await? + .compat(); + let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; + + let file_metadata = parquet_metadata.file_metadata(); + let converted_arrow_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .map_err(anyhow::Error::from)?; + let fields: risingwave_common::array::arrow::arrow_schema_udf::Fields = + converted_arrow_schema.fields; + Ok(fields) +} diff --git a/src/connector/src/source/iceberg/parquet_file_reader.rs b/src/connector/src/source/iceberg/parquet_file_reader.rs deleted file mode 100644 index 6e323f4aec9ba..0000000000000 --- a/src/connector/src/source/iceberg/parquet_file_reader.rs +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::collections::HashMap; -use std::ops::Range; -use std::sync::Arc; - -use anyhow::anyhow; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::TryFutureExt; -use iceberg::io::{ - FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, -}; -use iceberg::{Error, ErrorKind}; -use opendal::layers::RetryLayer; -use opendal::services::S3; -use opendal::Operator; -use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; -use parquet::arrow::ParquetRecordBatchStreamBuilder; -use parquet::file::metadata::ParquetMetaData; -use url::Url; - -pub struct ParquetFileReader { - meta: FileMetadata, - r: R, -} - -impl ParquetFileReader { - pub fn new(meta: FileMetadata, r: R) -> Self { - Self { meta, r } - } -} - -impl AsyncFileReader for ParquetFileReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - Box::pin( - self.r - .read(range.start as _..range.end as _) - .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), - ) - } - - fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { - Box::pin(async move { - let file_size = self.meta.size; - let mut loader = MetadataLoader::load(self, file_size as usize, None).await?; - loader.load_page_index(false, false).await?; - Ok(Arc::new(loader.finish())) - }) - } -} - -pub async fn create_parquet_stream_builder( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - location: String, -) -> Result>, anyhow::Error> { - let mut props = HashMap::new(); - props.insert(S3_REGION, s3_region.clone()); - props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone()); - props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone()); - - let file_io_builder = FileIOBuilder::new("s3"); - let file_io = file_io_builder - .with_props(props.into_iter()) - .build() - .map_err(|e| anyhow!(e))?; - let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?; - - let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?; - let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?; - let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader); - - ParquetRecordBatchStreamBuilder::new(parquet_file_reader) - .await - .map_err(|e| anyhow!(e)) -} - -pub async fn list_s3_directory( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - dir: String, -) -> Result, anyhow::Error> { - let url = Url::parse(&dir)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, missing bucket", dir), - ) - })?; - - let prefix = format!("s3://{}/", bucket); - if dir.starts_with(&prefix) { - let mut builder = S3::default(); - builder - .region(&s3_region) - .access_key_id(&s3_access_key) - .secret_access_key(&s3_secret_key) - .bucket(bucket); - let op = Operator::new(builder)? - .layer(RetryLayer::default()) - .finish(); - - op.list(&dir[prefix.len()..]) - .await - .map_err(|e| anyhow!(e)) - .map(|list| { - list.into_iter() - .map(|entry| prefix.to_string() + entry.path()) - .collect() - }) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, should start with {}", dir, prefix), - ))? 
- } -} diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 5806eea792904..4853d1332b810 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -17,7 +17,9 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; -use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory}; +use risingwave_connector::source::iceberg::{ + get_parquet_fields, list_s3_directory, new_s3_operator, +}; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; use tokio::runtime::Runtime; @@ -178,7 +180,7 @@ impl TableFunction { let schema = tokio::task::block_in_place(|| { RUNTIME.block_on(async { - let parquet_stream_builder = create_parquet_stream_builder( + let op = new_s3_operator( eval_args[2].clone(), eval_args[3].clone(), eval_args[4].clone(), @@ -186,11 +188,18 @@ impl TableFunction { Some(files) => files[0].clone(), None => eval_args[5].clone(), }, + )?; + let fields = get_parquet_fields( + op, + match files.as_ref() { + Some(files) => files[0].clone(), + None => eval_args[5].clone(), + }, ) .await?; let mut rw_types = vec![]; - for field in parquet_stream_builder.schema().fields() { + for field in &fields { rw_types.push(( field.name().to_string(), IcebergArrowConvert.type_from_field(field)?, From 01567a587513850a9b68e1c441a4c1cb5eccb2d0 Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Wed, 27 Nov 2024 09:41:13 +0800 Subject: [PATCH 05/10] feat(connector): support more parquet data types (#19561) --- e2e_test/s3/file_sink.py | 39 ++++- src/common/src/array/arrow/arrow_impl.rs | 153 ++++++++++++++++++ .../source/iceberg/parquet_file_handler.rs | 77 ++++++++- 3 files changed, 264 insertions(+), 5 deletions(-) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index e959ebc02e7ba..2232dc8534b2a 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -11,6 +11,7 @@ from minio import Minio from random import uniform from time import sleep +import numpy as np import time def gen_data(file_num, item_num_per_file): @@ -23,6 +24,12 @@ def gen_data(file_num, item_num_per_file): 'sex': item_id % 2, 'mark': (-1) ** (item_id % 2), 'test_int': pa.scalar(1, type=pa.int32()), + 'test_int8': pa.scalar(1, type=pa.int8()), + 'test_uint8': pa.scalar(item_id % 256, type=pa.uint8()), # UInt8 + 'test_uint16': pa.scalar(item_id % 65536, type=pa.uint16()), # UInt16 + 'test_uint32': pa.scalar(item_id % (2**32), type=pa.uint32()), # UInt32 + 'test_uint64': pa.scalar(item_id % (2**64), type=pa.uint64()), # UInt64 + 'test_float_16': pa.scalar(np.float16(4.0), type=pa.float16()), 'test_real': pa.scalar(4.0, type=pa.float32()), 'test_double_precision': pa.scalar(5.0, type=pa.float64()), 'test_varchar': pa.scalar('7', type=pa.string()), @@ -62,6 +69,12 @@ def _table(): sex bigint, mark bigint, test_int int, + test_int8 smallint, + test_uint8 smallint, + test_uint16 int, + test_uint32 bigint, + test_uint64 decimal, + test_float_16 real, test_real real, test_double_precision double precision, test_varchar varchar, @@ -137,6 +150,12 @@ def _table(): sex, mark, test_int, + test_int8, + test_uint8, + test_uint16, + test_uint32, + test_uint64, + test_float_16, test_real, test_double_precision, test_varchar, @@ -173,6 +192,12 @@ def _table(): sex bigint, mark 
bigint, test_int int, + test_int8 smallint, + test_uint8 smallint, + test_uint16 int, + test_uint32 bigint, + test_uint64 decimal, + test_float_16 real, test_real real, test_double_precision double precision, test_varchar varchar, @@ -210,7 +235,7 @@ def _table(): stmt = f'select count(*), sum(id) from test_parquet_sink_table' print(f'Execute reading sink files: {stmt}') - print(f'Create snowflake s3 sink ') + print(f'Create s3 sink json format') # Execute a SELECT statement cur.execute(f'''CREATE sink test_file_sink_json as select id, @@ -218,6 +243,12 @@ def _table(): sex, mark, test_int, + test_int8, + test_uint8, + test_uint16, + test_uint32, + test_uint64, + test_float_16, test_real, test_double_precision, test_varchar, @@ -253,6 +284,12 @@ def _table(): sex bigint, mark bigint, test_int int, + test_int8 smallint, + test_uint8 smallint, + test_uint16 int, + test_uint32 bigint, + test_uint64 decimal, + test_float_16 real, test_real real, test_double_precision double precision, test_varchar varchar, diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 45e092597c356..e7f6d938a3714 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -506,6 +506,12 @@ pub trait FromArrow { Int16 => DataType::Int16, Int32 => DataType::Int32, Int64 => DataType::Int64, + Int8 => DataType::Int16, + UInt8 => DataType::Int16, + UInt16 => DataType::Int32, + UInt32 => DataType::Int64, + UInt64 => DataType::Decimal, + Float16 => DataType::Float32, Float32 => DataType::Float32, Float64 => DataType::Float64, Decimal128(_, _) => DataType::Decimal, @@ -582,11 +588,18 @@ pub trait FromArrow { } match array.data_type() { Boolean => self.from_bool_array(array.as_any().downcast_ref().unwrap()), + Int8 => self.from_int8_array(array.as_any().downcast_ref().unwrap()), Int16 => self.from_int16_array(array.as_any().downcast_ref().unwrap()), Int32 => self.from_int32_array(array.as_any().downcast_ref().unwrap()), Int64 => self.from_int64_array(array.as_any().downcast_ref().unwrap()), + UInt8 => self.from_uint8_array(array.as_any().downcast_ref().unwrap()), + UInt16 => self.from_uint16_array(array.as_any().downcast_ref().unwrap()), + UInt32 => self.from_uint32_array(array.as_any().downcast_ref().unwrap()), + + UInt64 => self.from_uint64_array(array.as_any().downcast_ref().unwrap()), Decimal128(_, _) => self.from_decimal128_array(array.as_any().downcast_ref().unwrap()), Decimal256(_, _) => self.from_int256_array(array.as_any().downcast_ref().unwrap()), + Float16 => self.from_float16_array(array.as_any().downcast_ref().unwrap()), Float32 => self.from_float32_array(array.as_any().downcast_ref().unwrap()), Float64 => self.from_float64_array(array.as_any().downcast_ref().unwrap()), Date32 => self.from_date32_array(array.as_any().downcast_ref().unwrap()), @@ -670,6 +683,22 @@ pub trait FromArrow { Ok(ArrayImpl::Int16(array.into())) } + fn from_int8_array(&self, array: &arrow_array::Int8Array) -> Result { + Ok(ArrayImpl::Int16(array.into())) + } + + fn from_uint8_array(&self, array: &arrow_array::UInt8Array) -> Result { + Ok(ArrayImpl::Int16(array.into())) + } + + fn from_uint16_array(&self, array: &arrow_array::UInt16Array) -> Result { + Ok(ArrayImpl::Int32(array.into())) + } + + fn from_uint32_array(&self, array: &arrow_array::UInt32Array) -> Result { + Ok(ArrayImpl::Int64(array.into())) + } + fn from_int32_array(&self, array: &arrow_array::Int32Array) -> Result { Ok(ArrayImpl::Int32(array.into())) } @@ -692,6 +721,17 @@ pub trait 
FromArrow { Ok(ArrayImpl::Decimal(array.try_into()?)) } + fn from_uint64_array(&self, array: &arrow_array::UInt64Array) -> Result { + Ok(ArrayImpl::Decimal(array.try_into()?)) + } + + fn from_float16_array( + &self, + array: &arrow_array::Float16Array, + ) -> Result { + Ok(ArrayImpl::Float32(array.try_into()?)) + } + fn from_float32_array( &self, array: &arrow_array::Float32Array, @@ -909,6 +949,37 @@ macro_rules! converts { }; } +/// Used to convert different types. +macro_rules! converts_with_type { + ($ArrayType:ty, $ArrowType:ty, $FromType:ty, $ToType:ty) => { + impl From<&$ArrayType> for $ArrowType { + fn from(array: &$ArrayType) -> Self { + let values: Vec> = + array.iter().map(|x| x.map(|v| v as $ToType)).collect(); + <$ArrowType>::from_iter(values) + } + } + + impl From<&$ArrowType> for $ArrayType { + fn from(array: &$ArrowType) -> Self { + let values: Vec> = + array.iter().map(|x| x.map(|v| v as $FromType)).collect(); + <$ArrayType>::from_iter(values) + } + } + + impl From<&[$ArrowType]> for $ArrayType { + fn from(arrays: &[$ArrowType]) -> Self { + let values: Vec> = arrays + .iter() + .flat_map(|a| a.iter().map(|x| x.map(|v| v as $FromType))) + .collect(); + <$ArrayType>::from_iter(values) + } + } + }; +} + macro_rules! converts_with_timeunit { ($ArrayType:ty, $ArrowType:ty, $time_unit:expr, @map) => { @@ -961,6 +1032,11 @@ converts!(TimeArray, arrow_array::Time64MicrosecondArray, @map); converts!(IntervalArray, arrow_array::IntervalMonthDayNanoArray, @map); converts!(SerialArray, arrow_array::Int64Array, @map); +converts_with_type!(I16Array, arrow_array::Int8Array, i16, i8); +converts_with_type!(I16Array, arrow_array::UInt8Array, i16, u8); +converts_with_type!(I32Array, arrow_array::UInt16Array, i32, u16); +converts_with_type!(I64Array, arrow_array::UInt32Array, i64, u32); + converts_with_timeunit!(TimestampArray, arrow_array::TimestampSecondArray, TimeUnit::Second, @map); converts_with_timeunit!(TimestampArray, arrow_array::TimestampMillisecondArray, TimeUnit::Millisecond, @map); converts_with_timeunit!(TimestampArray, arrow_array::TimestampMicrosecondArray, TimeUnit::Microsecond, @map); @@ -1172,6 +1248,38 @@ impl TryFrom<&arrow_array::Decimal128Array> for DecimalArray { } } +// Since RisingWave does not support UInt type, convert UInt64Array to Decimal. 
+impl TryFrom<&arrow_array::UInt64Array> for DecimalArray { + type Error = ArrayError; + + fn try_from(array: &arrow_array::UInt64Array) -> Result { + let from_arrow = |value| { + // Convert the value to a Decimal with scale 0 + let res = Decimal::from(value); + Ok(res) + }; + + // Map over the array and convert each value + array + .iter() + .map(|o| o.map(from_arrow).transpose()) + .collect::>() + } +} + +impl TryFrom<&arrow_array::Float16Array> for F32Array { + type Error = ArrayError; + + fn try_from(array: &arrow_array::Float16Array) -> Result { + let from_arrow = |value| Ok(f32::from(value)); + + array + .iter() + .map(|o| o.map(from_arrow).transpose()) + .collect::>() + } +} + impl TryFrom<&arrow_array::LargeBinaryArray> for DecimalArray { type Error = ArrayError; @@ -1312,6 +1420,7 @@ impl From<&arrow_array::Decimal256Array> for Int256Array { #[cfg(test)] mod tests { + use super::*; #[test] @@ -1356,6 +1465,50 @@ mod tests { assert_eq!(F64Array::from(&arrow), array); } + #[test] + fn int8() { + let array: PrimitiveArray = I16Array::from_iter([None, Some(-128), Some(127)]); + let arr = arrow_array::Int8Array::from(vec![None, Some(-128), Some(127)]); + let converted: PrimitiveArray = (&arr).into(); + assert_eq!(converted, array); + } + + #[test] + fn uint8() { + let array: PrimitiveArray = I16Array::from_iter([None, Some(7), Some(25)]); + let arr = arrow_array::UInt8Array::from(vec![None, Some(7), Some(25)]); + let converted: PrimitiveArray = (&arr).into(); + assert_eq!(converted, array); + } + + #[test] + fn uint16() { + let array: PrimitiveArray = I32Array::from_iter([None, Some(7), Some(65535)]); + let arr = arrow_array::UInt16Array::from(vec![None, Some(7), Some(65535)]); + let converted: PrimitiveArray = (&arr).into(); + assert_eq!(converted, array); + } + + #[test] + fn uint32() { + let array: PrimitiveArray = I64Array::from_iter([None, Some(7), Some(4294967295)]); + let arr = arrow_array::UInt32Array::from(vec![None, Some(7), Some(4294967295)]); + let converted: PrimitiveArray = (&arr).into(); + assert_eq!(converted, array); + } + + #[test] + fn uint64() { + let array: PrimitiveArray = DecimalArray::from_iter([ + None, + Some(Decimal::Normalized("7".parse().unwrap())), + Some(Decimal::Normalized("18446744073709551615".parse().unwrap())), + ]); + let arr = arrow_array::UInt64Array::from(vec![None, Some(7), Some(18446744073709551615)]); + let converted: PrimitiveArray = (&arr).try_into().unwrap(); + assert_eq!(converted, array); + } + #[test] fn date() { let array = DateArray::from_iter([ diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs index 146348545fbc0..c4bfceb1ed897 100644 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ b/src/connector/src/source/iceberg/parquet_file_handler.rs @@ -33,9 +33,11 @@ use opendal::Operator; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader}; +use risingwave_common::array::arrow::arrow_schema_udf::{DataType as ArrowDateType, IntervalUnit}; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::ColumnId; +use risingwave_common::types::DataType as RwDataType; use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use url::Url; @@ -218,10 +220,10 @@ pub fn 
extract_valid_column_indices( .iter() .position(|&name| name == column.name) .and_then(|pos| { - let arrow_field = IcebergArrowConvert - .to_arrow_field(&column.name, &column.data_type) - .ok()?; - if &arrow_field == converted_arrow_schema.field(pos) { + let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type(); + let rw_data_type: &risingwave_common::types::DataType = &column.data_type; + + if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { Some(pos) } else { None @@ -318,3 +320,70 @@ pub async fn get_parquet_fields( converted_arrow_schema.fields; Ok(fields) } + +/// This function checks whether the schema of a Parquet file matches the user defined schema. +/// It handles the following special cases: +/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `TimeStamp` type. +/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `TimeStamptz` type. +/// - Since RisingWave does not have an `UInt` type: +/// - Arrow's `UInt8` matches with RisingWave's `Int16`. +/// - Arrow's `UInt16` matches with RisingWave's `Int32`. +/// - Arrow's `UInt32` matches with RisingWave's `Int64`. +/// - Arrow's `UInt64` matches with RisingWave's `Decimal`. +/// - Arrow's `Float16` matches with RisingWave's `Float32`. +fn is_parquet_schema_match_source_schema( + arrow_data_type: &ArrowDateType, + rw_data_type: &RwDataType, +) -> bool { + matches!( + (arrow_data_type, rw_data_type), + (ArrowDateType::Boolean, RwDataType::Boolean) + | ( + ArrowDateType::Int8 | ArrowDateType::Int16 | ArrowDateType::UInt8, + RwDataType::Int16 + ) + | ( + ArrowDateType::Int32 | ArrowDateType::UInt16, + RwDataType::Int32 + ) + | ( + ArrowDateType::Int64 | ArrowDateType::UInt32, + RwDataType::Int64 + ) + | ( + ArrowDateType::UInt64 | ArrowDateType::Decimal128(_, _), + RwDataType::Decimal + ) + | (ArrowDateType::Decimal256(_, _), RwDataType::Int256) + | ( + ArrowDateType::Float16 | ArrowDateType::Float32, + RwDataType::Float32 + ) + | (ArrowDateType::Float64, RwDataType::Float64) + | (ArrowDateType::Timestamp(_, None), RwDataType::Timestamp) + | ( + ArrowDateType::Timestamp(_, Some(_)), + RwDataType::Timestamptz + ) + | (ArrowDateType::Date32, RwDataType::Date) + | ( + ArrowDateType::Time32(_) | ArrowDateType::Time64(_), + RwDataType::Time + ) + | ( + ArrowDateType::Interval(IntervalUnit::MonthDayNano), + RwDataType::Interval + ) + | ( + ArrowDateType::Utf8 | ArrowDateType::LargeUtf8, + RwDataType::Varchar + ) + | ( + ArrowDateType::Binary | ArrowDateType::LargeBinary, + RwDataType::Bytea + ) + | (ArrowDateType::List(_), RwDataType::List(_)) + | (ArrowDateType::Struct(_), RwDataType::Struct(_)) + | (ArrowDateType::Map(_, _), RwDataType::Map(_)) + ) +} From 4839f530e5c6c312232ee914a77395e7d738ab24 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 28 Nov 2024 16:31:33 +0800 Subject: [PATCH 06/10] fix --- src/common/src/array/arrow/arrow_impl.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index e7f6d938a3714..05ecb6b4898c8 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,8 +42,7 @@ use std::fmt::Write; -use arrow_53_schema::TimeUnit; -use arrow_array::array; +use arrow_schema::TimeUnit; use arrow_array::cast::AsArray; use arrow_array_iceberg::array; use arrow_buffer::OffsetBuffer; From 6c9386c176657ec0bbb83d7c31ef1015f01b1ec3 
Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 28 Nov 2024 16:42:22 +0800 Subject: [PATCH 07/10] fmt --- src/common/src/array/arrow/arrow_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/array/arrow/arrow_impl.rs b/src/common/src/array/arrow/arrow_impl.rs index 05ecb6b4898c8..2695cde8a2b5c 100644 --- a/src/common/src/array/arrow/arrow_impl.rs +++ b/src/common/src/array/arrow/arrow_impl.rs @@ -42,10 +42,10 @@ use std::fmt::Write; -use arrow_schema::TimeUnit; use arrow_array::cast::AsArray; use arrow_array_iceberg::array; use arrow_buffer::OffsetBuffer; +use arrow_schema::TimeUnit; use chrono::{DateTime, NaiveDateTime, NaiveTime}; use itertools::Itertools; From 09b2309b60ea06852a022b294a3d6a18ad508bc8 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 28 Nov 2024 19:55:24 +0800 Subject: [PATCH 08/10] cargo build pass --- src/batch/src/executor/s3_file_scan.rs | 37 +- src/connector/src/parser/parquet_parser.rs | 10 +- .../opendal_source/opendal_reader.rs | 103 ++++- src/connector/src/source/iceberg/mod.rs | 4 +- .../source/iceberg/parquet_file_handler.rs | 389 ------------------ .../src/source/iceberg/parquet_file_reader.rs | 132 ++++++ src/frontend/src/expr/table_function.rs | 15 +- 7 files changed, 264 insertions(+), 426 deletions(-) delete mode 100644 src/connector/src/source/iceberg/parquet_file_handler.rs create mode 100644 src/connector/src/source/iceberg/parquet_file_reader.rs diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs index 38907c63f8416..a7b0d1bacd791 100644 --- a/src/batch/src/executor/s3_file_scan.rs +++ b/src/batch/src/executor/s3_file_scan.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use anyhow::anyhow; use futures_async_stream::try_stream; use futures_util::stream::StreamExt; +use parquet::arrow::ProjectionMask; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::catalog::{Field, Schema}; -use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file}; +use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder; use risingwave_pb::batch_plan::file_scan_node; use risingwave_pb::batch_plan::file_scan_node::StorageType; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -82,18 +85,34 @@ impl S3FileScanExecutor { async fn do_execute(self: Box) { assert_eq!(self.file_format, FileFormat::Parquet); for file in self.file_location { - let op = new_s3_operator( + let mut batch_stream_builder = create_parquet_stream_builder( self.s3_region.clone(), self.s3_access_key.clone(), self.s3_secret_key.clone(), - file.clone(), - )?; - let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?; + file, + ) + .await?; + + let arrow_schema = batch_stream_builder.schema(); + assert_eq!(arrow_schema.fields.len(), self.schema.fields.len()); + for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) { + assert_eq!(*field.name, *arrow_field.name()); + } + + batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all()); + + batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size); + + let record_batch_stream = batch_stream_builder + .build() + .map_err(|e| anyhow!(e).context("fail to build arrow stream builder"))?; + #[for_await] - for stream_chunk in chunk_stream { - let stream_chunk = stream_chunk?; - let (data_chunk, _) = stream_chunk.into_parts(); - yield data_chunk; + for record_batch in record_batch_stream { + let record_batch = record_batch?; + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; + debug_assert_eq!(chunk.data_types(), self.schema.data_types()); + yield chunk; } } } diff --git a/src/connector/src/parser/parquet_parser.rs b/src/connector/src/parser/parquet_parser.rs index 78ee4aeb6b568..815f5891b9e66 100644 --- a/src/connector/src/parser/parquet_parser.rs +++ b/src/connector/src/parser/parquet_parser.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use arrow_array_iceberg::RecordBatch; use deltalake::parquet::arrow::async_reader::AsyncFileReader; use futures_async_stream::try_stream; -use risingwave_common::array::arrow::arrow_array_iceberg::RecordBatch; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::{ArrayBuilderImpl, DataChunk, StreamChunk}; use risingwave_common::bail; @@ -88,11 +87,10 @@ impl ParquetParser { /// # Returns /// /// A `StreamChunk` containing the converted data from the `RecordBatch`. - - // The hidden columns that must be included here are _rw_file and _rw_offset. - // Depending on whether the user specifies a primary key (pk), there may be an additional hidden column row_id. - // Therefore, the maximum number of hidden columns is three. - + /// + /// The hidden columns that must be included here are `_rw_file` and `_rw_offset`. + /// Depending on whether the user specifies a primary key (pk), there may be an additional hidden column `row_id`. + /// Therefore, the maximum number of hidden columns is three. 
fn convert_record_batch_to_stream_chunk( &mut self, record_batch: RecordBatch, diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs index ca8ee1ae486b2..0b455e8349573 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs @@ -19,19 +19,25 @@ use async_compression::tokio::bufread::GzipDecoder; use async_trait::async_trait; use futures::TryStreamExt; use futures_async_stream::try_stream; +use itertools::Itertools; use opendal::Operator; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::FileMetaData; +use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::array::StreamChunk; +use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; use tokio::io::{AsyncRead, BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; use super::opendal_enumerator::OpendalEnumerator; use super::OpendalSource; use crate::error::ConnectorResult; +use crate::parser::parquet_parser::ParquetParser; use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig}; use crate::source::filesystem::file_common::CompressionFormat; use crate::source::filesystem::nd_streaming::need_nd_streaming; use crate::source::filesystem::{nd_streaming, OpendalFsSplit}; -use crate::source::iceberg::read_parquet_file; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData, SplitReader, @@ -86,15 +92,38 @@ impl OpendalReader { let msg_stream; if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config { - msg_stream = read_parquet_file( - self.connector.op.clone(), + // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk. + let mut reader: tokio_util::compat::Compat = self + .connector + .op + .reader_with(&object_name) + .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. + .await? + .into_futures_async_read(..) + .await? + .compat(); + let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; + + let file_metadata = parquet_metadata.file_metadata(); + let column_indices = + extract_valid_column_indices(self.columns.clone(), file_metadata)?; + let projection_mask = + ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); + // For the Parquet format, we directly convert from a record batch to a stream chunk. + // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. + let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) + .await? 
+ .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size) + .with_projection(projection_mask) + .with_offset(split.offset) + .build()?; + + let parquet_parser = ParquetParser::new( + self.parser_config.common.rw_columns.clone(), object_name, - self.columns.clone(), - Some(self.parser_config.common.rw_columns.clone()), - self.source_ctx.source_ctrl_opts.chunk_size, split.offset, - ) - .await?; + )?; + msg_stream = parquet_parser.into_stream(record_batch_stream); } else { let data_stream = Self::stream_read_object( self.connector.op.clone(), @@ -201,3 +230,61 @@ impl OpendalReader { } } } + +/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. +/// +/// This function is used for column pruning of Parquet files. It calculates the intersection +/// between the columns in the currently read Parquet file and the schema provided by the user. +/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that +/// only the necessary columns are read. +/// +/// # Parameters +/// - `columns`: A vector of `Column` representing the user's requested schema. +/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. +/// +/// # Returns +/// - A `ConnectorResult>`, which contains the indices of the valid columns in the +/// Parquet file schema that match the requested schema. If an error occurs during processing, +/// it returns an appropriate error. +pub fn extract_valid_column_indices( + columns: Option>, + metadata: &FileMetaData, +) -> ConnectorResult> { + match columns { + Some(rw_columns) => { + let parquet_column_names = metadata + .schema_descr() + .columns() + .iter() + .map(|c| c.name()) + .collect_vec(); + + let converted_arrow_schema = + parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) + .map_err(anyhow::Error::from)?; + + let valid_column_indices: Vec = rw_columns + .iter() + .filter_map(|column| { + parquet_column_names + .iter() + .position(|&name| name == column.name) + .and_then(|pos| { + // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison. + // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw. + let arrow_filed_to_rw_data_type = IcebergArrowConvert + .type_from_field(converted_arrow_schema.field(pos)) + .ok()?; + if arrow_filed_to_rw_data_type == column.data_type { + Some(pos) + } else { + None + } + }) + }) + .collect(); + Ok(valid_column_indices) + } + None => Ok(vec![]), + } +} diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index c3f0ed1dce660..f101ff9ed6d4b 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub mod parquet_file_handler; +pub mod parquet_file_reader; use std::collections::HashMap; @@ -22,7 +22,7 @@ use futures_async_stream::for_await; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; use itertools::Itertools; -pub use parquet_file_handler::*; +pub use parquet_file_reader::*; use risingwave_common::bail; use risingwave_common::catalog::Schema; use risingwave_common::types::JsonbVal; diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs deleted file mode 100644 index c4bfceb1ed897..0000000000000 --- a/src/connector/src/source/iceberg/parquet_file_handler.rs +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::future::IntoFuture; -use std::ops::Range; -use std::pin::Pin; -use std::sync::Arc; - -use anyhow::anyhow; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::{FutureExt, Stream, TryFutureExt}; -use iceberg::io::{ - FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, -}; -use iceberg::{Error, ErrorKind}; -use itertools::Itertools; -use opendal::layers::{LoggingLayer, RetryLayer}; -use opendal::services::S3; -use opendal::Operator; -use parquet::arrow::async_reader::AsyncFileReader; -use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader}; -use risingwave_common::array::arrow::arrow_schema_udf::{DataType as ArrowDateType, IntervalUnit}; -use risingwave_common::array::arrow::IcebergArrowConvert; -use risingwave_common::array::StreamChunk; -use risingwave_common::catalog::ColumnId; -use risingwave_common::types::DataType as RwDataType; -use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt; -use url::Url; - -use crate::error::ConnectorResult; -use crate::parser::ParquetParser; -use crate::source::{Column, SourceColumnDesc}; - -pub struct ParquetFileReader { - meta: FileMetadata, - r: R, -} - -impl ParquetFileReader { - pub fn new(meta: FileMetadata, r: R) -> Self { - Self { meta, r } - } -} - -impl AsyncFileReader for ParquetFileReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - Box::pin( - self.r - .read(range.start as _..range.end as _) - .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), - ) - } - - fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { - async move { - let reader = ParquetMetaDataReader::new(); - let size = self.meta.size as usize; - let meta = reader.load_and_finish(self, size).await?; - - Ok(Arc::new(meta)) - } - .boxed() - } -} - -pub async fn create_parquet_stream_builder( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - location: String, -) -> Result>, anyhow::Error> { - let mut props = HashMap::new(); - props.insert(S3_REGION, s3_region.clone()); - 
props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone()); - props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone()); - - let file_io_builder = FileIOBuilder::new("s3"); - let file_io = file_io_builder - .with_props(props.into_iter()) - .build() - .map_err(|e| anyhow!(e))?; - let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?; - - let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?; - let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?; - let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader); - - ParquetRecordBatchStreamBuilder::new(parquet_file_reader) - .await - .map_err(|e| anyhow!(e)) -} - -pub fn new_s3_operator( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - location: String, -) -> ConnectorResult { - // Create s3 builder. - let bucket = extract_bucket(&location); - let mut builder = S3::default().bucket(&bucket).region(&s3_region); - builder = builder.secret_access_key(&s3_access_key); - builder = builder.secret_access_key(&s3_secret_key); - builder = builder.endpoint(&format!( - "https://{}.s3.{}.amazonaws.com", - bucket, s3_region - )); - - builder = builder.disable_config_load(); - - let op: Operator = Operator::new(builder)? - .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) - .finish(); - - Ok(op) -} - -fn extract_bucket(location: &str) -> String { - let prefix = "s3://"; - let start = prefix.len(); - let end = location[start..] - .find('/') - .unwrap_or(location.len() - start); - location[start..start + end].to_string() -} - -pub async fn list_s3_directory( - s3_region: String, - s3_access_key: String, - s3_secret_key: String, - dir: String, -) -> Result, anyhow::Error> { - let url = Url::parse(&dir)?; - let bucket = url.host_str().ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, missing bucket", dir), - ) - })?; - - let prefix = format!("s3://{}/", bucket); - if dir.starts_with(&prefix) { - let mut builder = S3::default(); - builder = builder - .region(&s3_region) - .access_key_id(&s3_access_key) - .secret_access_key(&s3_secret_key) - .bucket(bucket); - let op = Operator::new(builder)? - .layer(RetryLayer::default()) - .finish(); - - op.list(&dir[prefix.len()..]) - .await - .map_err(|e| anyhow!(e)) - .map(|list| { - list.into_iter() - .map(|entry| prefix.to_string() + entry.path()) - .collect() - }) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {}, should start with {}", dir, prefix), - ))? - } -} - -/// Extracts valid column indices from a Parquet file schema based on the user's requested schema. -/// -/// This function is used for column pruning of Parquet files. It calculates the intersection -/// between the columns in the currently read Parquet file and the schema provided by the user. -/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that -/// only the necessary columns are read. -/// -/// # Parameters -/// - `columns`: A vector of `Column` representing the user's requested schema. -/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file. -/// -/// # Returns -/// - A `ConnectorResult>`, which contains the indices of the valid columns in the -/// Parquet file schema that match the requested schema. If an error occurs during processing, -/// it returns an appropriate error. 
-pub fn extract_valid_column_indices( - columns: Option>, - metadata: &FileMetaData, -) -> ConnectorResult> { - match columns { - Some(rw_columns) => { - let parquet_column_names = metadata - .schema_descr() - .columns() - .iter() - .map(|c| c.name()) - .collect_vec(); - - let converted_arrow_schema = - parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata()) - .map_err(anyhow::Error::from)?; - - let valid_column_indices: Vec = rw_columns - .iter() - .filter_map(|column| { - parquet_column_names - .iter() - .position(|&name| name == column.name) - .and_then(|pos| { - let arrow_data_type: &risingwave_common::array::arrow::arrow_schema_udf::DataType = converted_arrow_schema.field(pos).data_type(); - let rw_data_type: &risingwave_common::types::DataType = &column.data_type; - - if is_parquet_schema_match_source_schema(arrow_data_type, rw_data_type) { - Some(pos) - } else { - None - } - }) - }) - .collect(); - Ok(valid_column_indices) - } - None => Ok(vec![]), - } -} - -/// Reads a specified Parquet file and converts its content into a stream of chunks. -pub async fn read_parquet_file( - op: Operator, - file_name: String, - rw_columns: Option>, - parser_columns: Option>, - batch_size: usize, - offset: usize, -) -> ConnectorResult< - Pin> + Send>>, -> { - let mut reader: tokio_util::compat::Compat = op - .reader_with(&file_name) - .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. - .await? - .into_futures_async_read(..) - .await? - .compat(); - let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; - - let file_metadata = parquet_metadata.file_metadata(); - let column_indices = extract_valid_column_indices(rw_columns, file_metadata)?; - let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices); - // For the Parquet format, we directly convert from a record batch to a stream chunk. - // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file. - let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader) - .await? - .with_batch_size(batch_size) - .with_projection(projection_mask) - .with_offset(offset) - .build()?; - let converted_arrow_schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - ) - .map_err(anyhow::Error::from)?; - let columns = match parser_columns { - Some(columns) => columns, - None => converted_arrow_schema - .fields - .iter() - .enumerate() - .map(|(index, field_ref)| { - let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap(); - SourceColumnDesc::simple( - field_ref.name().clone(), - data_type, - ColumnId::new(index as i32), - ) - }) - .collect(), - }; - - let parquet_parser = ParquetParser::new(columns, file_name, offset)?; - let msg_stream: Pin< - Box> + Send>, - > = parquet_parser.into_stream(record_batch_stream); - Ok(msg_stream) -} - -pub async fn get_parquet_fields( - op: Operator, - file_name: String, -) -> ConnectorResult { - let mut reader: tokio_util::compat::Compat = op - .reader_with(&file_name) - .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`. - .await? - .into_futures_async_read(..) - .await? 
- .compat(); - let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?; - - let file_metadata = parquet_metadata.file_metadata(); - let converted_arrow_schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - ) - .map_err(anyhow::Error::from)?; - let fields: risingwave_common::array::arrow::arrow_schema_udf::Fields = - converted_arrow_schema.fields; - Ok(fields) -} - -/// This function checks whether the schema of a Parquet file matches the user defined schema. -/// It handles the following special cases: -/// - Arrow's `timestamp(_, None)` types (all four time units) match with RisingWave's `TimeStamp` type. -/// - Arrow's `timestamp(_, Some)` matches with RisingWave's `TimeStamptz` type. -/// - Since RisingWave does not have an `UInt` type: -/// - Arrow's `UInt8` matches with RisingWave's `Int16`. -/// - Arrow's `UInt16` matches with RisingWave's `Int32`. -/// - Arrow's `UInt32` matches with RisingWave's `Int64`. -/// - Arrow's `UInt64` matches with RisingWave's `Decimal`. -/// - Arrow's `Float16` matches with RisingWave's `Float32`. -fn is_parquet_schema_match_source_schema( - arrow_data_type: &ArrowDateType, - rw_data_type: &RwDataType, -) -> bool { - matches!( - (arrow_data_type, rw_data_type), - (ArrowDateType::Boolean, RwDataType::Boolean) - | ( - ArrowDateType::Int8 | ArrowDateType::Int16 | ArrowDateType::UInt8, - RwDataType::Int16 - ) - | ( - ArrowDateType::Int32 | ArrowDateType::UInt16, - RwDataType::Int32 - ) - | ( - ArrowDateType::Int64 | ArrowDateType::UInt32, - RwDataType::Int64 - ) - | ( - ArrowDateType::UInt64 | ArrowDateType::Decimal128(_, _), - RwDataType::Decimal - ) - | (ArrowDateType::Decimal256(_, _), RwDataType::Int256) - | ( - ArrowDateType::Float16 | ArrowDateType::Float32, - RwDataType::Float32 - ) - | (ArrowDateType::Float64, RwDataType::Float64) - | (ArrowDateType::Timestamp(_, None), RwDataType::Timestamp) - | ( - ArrowDateType::Timestamp(_, Some(_)), - RwDataType::Timestamptz - ) - | (ArrowDateType::Date32, RwDataType::Date) - | ( - ArrowDateType::Time32(_) | ArrowDateType::Time64(_), - RwDataType::Time - ) - | ( - ArrowDateType::Interval(IntervalUnit::MonthDayNano), - RwDataType::Interval - ) - | ( - ArrowDateType::Utf8 | ArrowDateType::LargeUtf8, - RwDataType::Varchar - ) - | ( - ArrowDateType::Binary | ArrowDateType::LargeBinary, - RwDataType::Bytea - ) - | (ArrowDateType::List(_), RwDataType::List(_)) - | (ArrowDateType::Struct(_), RwDataType::Struct(_)) - | (ArrowDateType::Map(_, _), RwDataType::Map(_)) - ) -} diff --git a/src/connector/src/source/iceberg/parquet_file_reader.rs b/src/connector/src/source/iceberg/parquet_file_reader.rs new file mode 100644 index 0000000000000..bbd5b0a64b7fe --- /dev/null +++ b/src/connector/src/source/iceberg/parquet_file_reader.rs @@ -0,0 +1,132 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
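+
+//! Async Parquet reading helpers backed by S3: `ParquetFileReader` adapts an
+//! Iceberg `FileRead` into a `parquet` `AsyncFileReader`, and
+//! `create_parquet_stream_builder` / `list_s3_directory` are the entry points
+//! consumed by the frontend table function (`src/frontend/src/expr/table_function.rs`).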
+ +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +use anyhow::anyhow; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::TryFutureExt; +use iceberg::io::{ + FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY, +}; +use iceberg::{Error, ErrorKind}; +use opendal::layers::RetryLayer; +use opendal::services::S3; +use opendal::Operator; +use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::file::metadata::ParquetMetaData; +use url::Url; + +pub struct ParquetFileReader { + meta: FileMetadata, + r: R, +} + +impl ParquetFileReader { + pub fn new(meta: FileMetadata, r: R) -> Self { + Self { meta, r } + } +} + +impl AsyncFileReader for ParquetFileReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + Box::pin( + self.r + .read(range.start as _..range.end as _) + .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))), + ) + } + + fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result>> { + Box::pin(async move { + let file_size = self.meta.size; + let mut loader = MetadataLoader::load(self, file_size as usize, None).await?; + loader.load_page_index(false, false).await?; + Ok(Arc::new(loader.finish())) + }) + } +} + +pub async fn create_parquet_stream_builder( + s3_region: String, + s3_access_key: String, + s3_secret_key: String, + location: String, +) -> Result>, anyhow::Error> { + let mut props = HashMap::new(); + props.insert(S3_REGION, s3_region.clone()); + props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone()); + props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone()); + + let file_io_builder = FileIOBuilder::new("s3"); + let file_io = file_io_builder + .with_props(props.into_iter()) + .build() + .map_err(|e| anyhow!(e))?; + let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?; + + let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?; + let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?; + let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader); + + ParquetRecordBatchStreamBuilder::new(parquet_file_reader) + .await + .map_err(|e| anyhow!(e)) +} + +pub async fn list_s3_directory( + s3_region: String, + s3_access_key: String, + s3_secret_key: String, + dir: String, +) -> Result, anyhow::Error> { + let url = Url::parse(&dir)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {}, missing bucket", dir), + ) + })?; + + let prefix = format!("s3://{}/", bucket); + if dir.starts_with(&prefix) { + let mut builder = S3::default(); + + builder.region(&s3_region); + builder.access_key_id(&s3_access_key); + builder.secret_access_key(&s3_secret_key); + builder.bucket(bucket); + let op = Operator::new(builder)? + .layer(RetryLayer::default()) + .finish(); + + op.list(&dir[prefix.len()..]) + .await + .map_err(|e| anyhow!(e)) + .map(|list| { + list.into_iter() + .map(|entry| prefix.to_string() + entry.path()) + .collect() + }) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {}, should start with {}", dir, prefix), + ))? 
+ } +} diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 4853d1332b810..5806eea792904 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -17,9 +17,7 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::types::{DataType, ScalarImpl, StructType}; -use risingwave_connector::source::iceberg::{ - get_parquet_fields, list_s3_directory, new_s3_operator, -}; +use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory}; pub use risingwave_pb::expr::table_function::PbType as TableFunctionType; use risingwave_pb::expr::PbTableFunction; use tokio::runtime::Runtime; @@ -180,7 +178,7 @@ impl TableFunction { let schema = tokio::task::block_in_place(|| { RUNTIME.block_on(async { - let op = new_s3_operator( + let parquet_stream_builder = create_parquet_stream_builder( eval_args[2].clone(), eval_args[3].clone(), eval_args[4].clone(), @@ -188,18 +186,11 @@ impl TableFunction { Some(files) => files[0].clone(), None => eval_args[5].clone(), }, - )?; - let fields = get_parquet_fields( - op, - match files.as_ref() { - Some(files) => files[0].clone(), - None => eval_args[5].clone(), - }, ) .await?; let mut rw_types = vec![]; - for field in &fields { + for field in parquet_stream_builder.schema().fields() { rw_types.push(( field.name().to_string(), IcebergArrowConvert.type_from_field(field)?, From a8b3736eec3da09e8bffcee6242a16b3b099c2e8 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 28 Nov 2024 22:02:32 +0800 Subject: [PATCH 09/10] resolve conflict --- e2e_test/s3/file_sink.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index 2232dc8534b2a..cca36bc413f2c 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -338,16 +338,13 @@ def _assert_eq(field, got, expect): _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) print('File sink test pass!') -<<<<<<<< HEAD:e2e_test/s3/fs_parquet_source_and_sink.py cur.execute(f'drop sink test_file_sink') cur.execute(f'drop table test_sink_table') -======== - cur.execute(f'drop sink test_file_sink_parquet') cur.execute(f'drop table test_parquet_sink_table') cur.execute(f'drop sink test_file_sink_json') cur.execute(f'drop table test_json_sink_table') cur.execute(f'drop table s3_test_parquet') ->>>>>>>> f3e9a3be19 (refactor(test): reorganize file connector CI tests (#19230)):e2e_test/s3/file_sink.py + cur.close() conn.close() From ed93f0dccbc702f0cca8fc0eb5f93e55f598509f Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Fri, 29 Nov 2024 12:51:41 +0800 Subject: [PATCH 10/10] update test file --- e2e_test/s3/file_sink.py | 60 ++++++++++++++++++-------------------- e2e_test/s3/file_source.py | 28 ++++++++++++++++-- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py index cca36bc413f2c..a64f40d0692df 100644 --- a/e2e_test/s3/file_sink.py +++ b/e2e_test/s3/file_sink.py @@ -93,11 +93,11 @@ def _table(): ) WITH ( connector = 's3', match_pattern = '*.parquet', - s3.region_name = '{config['S3_REGION']}', - s3.bucket_name = '{config['S3_BUCKET']}', - s3.credentials.access = '{config['S3_ACCESS_KEY']}', - s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', + s3.region_name = 'custom', + s3.bucket_name = 
'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', refresh.interval.sec = 1, ) FORMAT PLAIN ENCODE PARQUET;''') @@ -144,7 +144,7 @@ def _table(): return 's3_test_parquet' # Execute a SELECT statement - cur.execute(f'''CREATE sink test_file_sink as select + cur.execute(f'''CREATE sink test_file_sink_parquet as select id, name, sex, @@ -173,18 +173,17 @@ def _table(): from {_table()} WITH ( connector = 's3', match_pattern = '*.parquet', - s3.region_name = '{config['S3_REGION']}', - s3.bucket_name = '{config['S3_BUCKET']}', - s3.credentials.access = '{config['S3_ACCESS_KEY']}', - s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}', - s3.path = '', - s3.file_type = 'parquet', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', + s3.path = 'test_parquet_sink/', type = 'append-only', force_append_only='true' ) FORMAT PLAIN ENCODE PARQUET(force_append_only='true');''') - print('Sink into s3...') + print('Sink into s3 in parquet encode...') # Execute a SELECT statement cur.execute(f'''CREATE TABLE test_parquet_sink_table( id bigint primary key,\ @@ -214,18 +213,18 @@ def _table(): test_timestamptz_ns timestamptz ) WITH ( connector = 's3', - match_pattern = '*.parquet', - s3.region_name = '{config['S3_REGION']}', - s3.bucket_name = '{config['S3_BUCKET']}', - s3.credentials.access = '{config['S3_ACCESS_KEY']}', - s3.credentials.secret = '{config['S3_SECRET_KEY']}', - s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + match_pattern = 'test_parquet_sink/*.parquet', + s3.region_name = 'custom', + s3.bucket_name = 'hummock001', + s3.credentials.access = 'hummockadmin', + s3.credentials.secret = 'hummockadmin', + s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', ) FORMAT PLAIN ENCODE PARQUET;''') total_rows = file_num * item_num_per_file MAX_RETRIES = 40 for retry_no in range(MAX_RETRIES): - cur.execute(f'select count(*) from test_sink_table') + cur.execute(f'select count(*) from test_parquet_sink_table') result = cur.fetchone() if result[0] == total_rows: break @@ -338,13 +337,11 @@ def _assert_eq(field, got, expect): _assert_eq('sum(id)', result[1], (total_rows - 1) * total_rows / 2) print('File sink test pass!') - cur.execute(f'drop sink test_file_sink') - cur.execute(f'drop table test_sink_table') + cur.execute(f'drop sink test_file_sink_parquet') cur.execute(f'drop table test_parquet_sink_table') cur.execute(f'drop sink test_file_sink_json') cur.execute(f'drop table test_json_sink_table') cur.execute(f'drop table s3_test_parquet') - cur.close() conn.close() @@ -373,7 +370,6 @@ def test_file_sink_batching(): s3.credentials.secret = 'hummockadmin', s3.endpoint_url = 'http://hummock001.127.0.0.1:9301', s3.path = 'test_file_sink_batching/', - s3.file_type = 'parquet', type = 'append-only', rollover_seconds = 5, max_row_count = 5, @@ -476,10 +472,10 @@ def _assert_greater(field, got, expect): config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) client = Minio( - config["S3_ENDPOINT"], - access_key=config["S3_ACCESS_KEY"], - secret_key=config["S3_SECRET_KEY"], - secure=True, + "127.0.0.1:9301", + "hummockadmin", + "hummockadmin", + secure=False, ) run_id = str(random.randint(1000, 9999)) _local = lambda idx: f'data_{idx}.parquet' @@ -491,7 +487,7 @@ def _assert_greater(field, got, expect): 
pq.write_table(table, _local(idx)) client.fput_object( - config["S3_BUCKET"], + "hummock001", _s3(idx), _local(idx) ) @@ -501,13 +497,13 @@ def _assert_greater(field, got, expect): # clean up s3 files for idx, _ in enumerate(data): - client.remove_object(config["S3_BUCKET"], _s3(idx)) + client.remove_object("hummock001", _s3(idx)) do_sink(config, FILE_NUM, ITEM_NUM_PER_FILE, run_id) # clean up s3 files for idx, _ in enumerate(data): - client.remove_object(config["S3_BUCKET"], _s3(idx)) + client.remove_object("hummock001", _s3(idx)) # test file sink batching test_file_sink_batching() \ No newline at end of file diff --git a/e2e_test/s3/file_source.py b/e2e_test/s3/file_source.py index 4fdd2b475aeb9..50eb60b1cef5b 100644 --- a/e2e_test/s3/file_source.py +++ b/e2e_test/s3/file_source.py @@ -65,13 +65,21 @@ def _encode(): else: return f"CSV (delimiter = ',', without_header = {str('without' in fmt).lower()})" + def _include_clause(): + if fmt == 'json': + return 'INCLUDE payload as rw_payload' + else: + return '' + # Execute a SELECT statement cur.execute(f'''CREATE TABLE {_table()}( id int, name TEXT, sex int, mark int, - ) WITH ( + ) + {_include_clause()} + WITH ( connector = 's3', match_pattern = '{prefix}*.{fmt}', s3.region_name = '{config['S3_REGION']}', @@ -107,7 +115,21 @@ def _assert_eq(field, got, expect): _assert_eq('sum(sex)', result[2], total_rows / 2) _assert_eq('sum(mark)', result[3], 0) - print('Test pass') + # only do payload check for json format, which enables INCLUDE CLAUSE + if fmt == 'json': + # check rw_payload + print('Check rw_payload') + stmt = f"select id, name, sex, mark, rw_payload from {_table()} limit 1;" + cur.execute(stmt) + result = cur.fetchone() + print("Got one line with rw_payload: ", result) + payload = result[4] + _assert_eq('id', payload['id'], result[0]) + _assert_eq('name', payload['name'], result[1]) + _assert_eq('sex', payload['sex'], result[2]) + _assert_eq('mark', payload['mark'], result[3]) + + print('Test pass') if need_drop_table: cur.execute(f'drop table {_table()}') @@ -296,4 +318,4 @@ def _table(): # clean up s3 files for idx, _ in enumerate(formatted_files): - client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) + client.remove_object(config["S3_BUCKET"], _s3(idx, 0)) \ No newline at end of file
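
Editor's note — a minimal usage sketch, not part of the patch series: the src/frontend/src/expr/table_function.rs hunk above consumes the refactored reader roughly as below. The wrapper function and its argument values are hypothetical; only create_parquet_stream_builder, ParquetRecordBatchStreamBuilder::schema, and IcebergArrowConvert::type_from_field come from the patch itself.

use anyhow::Result;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::DataType;
use risingwave_connector::source::iceberg::create_parquet_stream_builder;

// Hypothetical helper: infer (column name, RisingWave type) pairs for one
// Parquet object on S3, mirroring what the frontend table function does above.
async fn infer_rw_schema(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    location: String,
) -> Result<Vec<(String, DataType)>> {
    // ParquetRecordBatchStreamBuilder::new() drives ParquetFileReader, which
    // fetches the Parquet footer through MetadataLoader.
    let builder =
        create_parquet_stream_builder(s3_region, s3_access_key, s3_secret_key, location).await?;

    // Map each Arrow field of the file's schema to a RisingWave DataType.
    let mut rw_types = Vec::with_capacity(builder.schema().fields().len());
    for field in builder.schema().fields() {
        rw_types.push((
            field.name().to_string(),
            IcebergArrowConvert.type_from_field(field)?,
        ));
    }
    Ok(rw_types)
}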