diff --git a/build.sbt b/build.sbt index 4d585b933..a79156c86 100644 --- a/build.sbt +++ b/build.sbt @@ -55,9 +55,6 @@ lazy val testScalastyle = taskKey[Unit]("testScalastyle") // - .inAll applies the rule to all dependencies, not just direct dependencies val packagesToShade = Seq( "com.amazonaws.cloudwatch.**", - "com.fasterxml.jackson.core.**", - "com.fasterxml.jackson.dataformat.**", - "com.fasterxml.jackson.databind.**", "com.google.**", "com.sun.jna.**", "com.thoughtworks.paranamer.**", @@ -325,6 +322,28 @@ lazy val integtest = (project in file("integ-test")) lazy val integration = taskKey[Unit]("Run integration tests") lazy val awsIntegration = taskKey[Unit]("Run AWS integration tests") +lazy val e2etest = (project in file("e2e-test")) + .dependsOn(flintCommons % "test->package", flintSparkIntegration % "test->package", pplSparkIntegration % "test->package", sparkSqlApplication % "test->package") + .settings( + commonSettings, + name := "e2e-test", + scalaVersion := scala212, + libraryDependencies ++= Seq( + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.3" % "test", + "com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "test", + "com.softwaremill.sttp.client3" %% "core" % "3.10.2" % "test", + "com.softwaremill.sttp.client3" %% "play2-json" % "3.10.2", + "com.typesafe.play" %% "play-json" % "2.9.2" % "test", + ), + libraryDependencies ++= deps(sparkVersion), + javaOptions ++= Seq( + s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}", + s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}", + s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}", + ) + ) + lazy val standaloneCosmetic = project .settings( name := "opensearch-spark-standalone", diff --git a/docker/integ-test/configuration-updater/apply-configuration.sh b/docker/integ-test/configuration-updater/apply-configuration.sh index 7946c75cd..4d2150b5f 100644 --- a/docker/integ-test/configuration-updater/apply-configuration.sh +++ b/docker/integ-test/configuration-updater/apply-configuration.sh @@ -20,13 +20,26 @@ curl -q \ -H 'Content-Type: application/json' \ -d '{"name": "integ-test", "versioning": {"enabled": true, "excludePrefixes": [], "excludeFolders": false}, "locking": true}' \ http://minio-S3:9001/api/v1/buckets -# Create the access key +# Create the test-resources bucket curl -q \ -b /tmp/minio-cookies.txt \ -X POST \ -H 'Content-Type: application/json' \ - -d "{\"policy\": \"\", \"accessKey\": \"${S3_ACCESS_KEY}\", \"secretKey\": \"${S3_SECRET_KEY}\", \"description\": \"\", \"comment\": \"\", \"name\": \"\", \"expiry\": null}" \ - http://minio-S3:9001/api/v1/service-account-credentials + -d '{"name": "test-resources", "versioning": {"enabled": false, "excludePrefixes": [], "excludeFolders": false}, "locking": true}' \ + http://minio-S3:9001/api/v1/buckets +# Create the access key +curl -q \ + -b /tmp/minio-cookies.txt \ + -X GET + "http://minio-S3:9001/api/v1/service-accounts/${S3_ACCESS_KEY}" +if [ "$?" -ne "0" ]; then + curl -q \ + -b /tmp/minio-cookies.txt \ + -X POST \ + -H 'Content-Type: application/json' \ + -d "{\"policy\": \"\", \"accessKey\": \"${S3_ACCESS_KEY}\", \"secretKey\": \"${S3_SECRET_KEY}\", \"description\": \"\", \"comment\": \"\", \"name\": \"\", \"expiry\": null}" \ + http://minio-S3:9001/api/v1/service-account-credentials +fi # Login to OpenSearch Dashboards echo ">>> Login to OpenSearch dashboards" @@ -43,31 +56,38 @@ if [ "$?" 
-eq "0" ]; then else echo " >>> Login failed" fi + # Create the S3/Glue datasource -echo ">>> Creating datasource" curl -q \ -b /tmp/opensearch-cookies.txt \ - -X POST \ - -H 'Content-Type: application/json' \ - -H 'Osd-Version: 2.18.0' \ - -H 'Osd-Xsrf: fetch' \ - -d "{\"name\": \"mys3\", \"allowedRoles\": [], \"connector\": \"s3glue\", \"properties\": {\"glue.auth.type\": \"iam_role\", \"glue.auth.role_arn\": \"arn:aws:iam::123456789012:role/S3Access\", \"glue.indexstore.opensearch.uri\": \"http://opensearch:9200\", \"glue.indexstore.opensearch.auth\": \"basicauth\", \"glue.indexstore.opensearch.auth.username\": \"admin\", \"glue.indexstore.opensearch.auth.password\": \"${OPENSEARCH_ADMIN_PASSWORD}\"}}" \ - http://opensearch-dashboards:5601/api/directquery/dataconnections -if [ "$?" -eq "0" ]; then - echo " >>> S3 datasource created" -else - echo " >>> Failed to create S3 datasource" -fi + -X GET \ + http://localhost:5601/api/directquery/dataconnections/mys3 +if [ "$?" -ne "0" ]; then + echo ">>> Creating datasource" + curl -q \ + -b /tmp/opensearch-cookies.txt \ + -X POST \ + -H 'Content-Type: application/json' \ + -H 'Osd-Version: 2.18.0' \ + -H 'Osd-Xsrf: fetch' \ + -d "{\"name\": \"mys3\", \"allowedRoles\": [], \"connector\": \"s3glue\", \"properties\": {\"glue.auth.type\": \"iam_role\", \"glue.auth.role_arn\": \"arn:aws:iam::123456789012:role/S3Access\", \"glue.indexstore.opensearch.uri\": \"http://opensearch:9200\", \"glue.indexstore.opensearch.auth\": \"basicauth\", \"glue.indexstore.opensearch.auth.username\": \"admin\", \"glue.indexstore.opensearch.auth.password\": \"${OPENSEARCH_ADMIN_PASSWORD}\"}}" \ + http://opensearch-dashboards:5601/api/directquery/dataconnections + if [ "$?" -eq "0" ]; then + echo " >>> S3 datasource created" + else + echo " >>> Failed to create S3 datasource" + fi -echo ">>> Setting cluster settings" -curl -v \ - -u "admin:${OPENSEARCH_ADMIN_PASSWORD}" \ - -X PUT \ - -H 'Content-Type: application/json' \ - -d '{"persistent": {"plugins.query.executionengine.spark.config": "{\"applicationId\":\"integ-test\",\"executionRoleARN\":\"arn:aws:iam::xxxxx:role/emr-job-execution-role\",\"region\":\"us-west-2\", \"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\"}"}}' \ - http://opensearch:9200/_cluster/settings -if [ "$?" -eq "0" ]; then - echo " >>> Successfully set cluster settings" -else - echo " >>> Failed to set cluster settings" + echo ">>> Setting cluster settings" + curl -v \ + -u "admin:${OPENSEARCH_ADMIN_PASSWORD}" \ + -X PUT \ + -H 'Content-Type: application/json' \ + -d '{"persistent": {"plugins.query.executionengine.spark.config": "{\"applicationId\":\"integ-test\",\"executionRoleARN\":\"arn:aws:iam::xxxxx:role/emr-job-execution-role\",\"region\":\"us-west-2\", \"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\"}"}}' \ + http://opensearch:9200/_cluster/settings + if [ "$?" 
-eq "0" ]; then + echo " >>> Successfully set cluster settings" + else + echo " >>> Failed to set cluster settings" + fi fi diff --git a/docker/integ-test/docker-compose.yml b/docker/integ-test/docker-compose.yml index 9fe79dc22..0a110f2fd 100644 --- a/docker/integ-test/docker-compose.yml +++ b/docker/integ-test/docker-compose.yml @@ -103,9 +103,8 @@ services: FLINT_JAR: ${FLINT_JAR} PPL_JAR: ${PPL_JAR} SQL_APP_JAR: ${SQL_APP_JAR} - depends_on: - metastore: - condition: service_completed_successfully + entrypoint: /bin/bash + command: exit opensearch: build: ./opensearch diff --git a/docs/docker/integ-test/images/datasource-browser.png b/docs/docker/integ-test/images/datasource-browser.png new file mode 100644 index 000000000..4b594c80a Binary files /dev/null and b/docs/docker/integ-test/images/datasource-browser.png differ diff --git a/docs/docker/integ-test/images/datasource-drop-down.png b/docs/docker/integ-test/images/datasource-drop-down.png new file mode 100644 index 000000000..d474c870c Binary files /dev/null and b/docs/docker/integ-test/images/datasource-drop-down.png differ diff --git a/docs/docker/integ-test/images/datasource-selector.png b/docs/docker/integ-test/images/datasource-selector.png new file mode 100644 index 000000000..f2a401636 Binary files /dev/null and b/docs/docker/integ-test/images/datasource-selector.png differ diff --git a/docs/docker/integ-test/images/queries-for-async-api.png b/docs/docker/integ-test/images/queries-for-async-api.png new file mode 100644 index 000000000..a59a0351f Binary files /dev/null and b/docs/docker/integ-test/images/queries-for-async-api.png differ diff --git a/docs/docker/integ-test/images/queries-for-spark-master.png b/docs/docker/integ-test/images/queries-for-spark-master.png new file mode 100644 index 000000000..1cf684b49 Binary files /dev/null and b/docs/docker/integ-test/images/queries-for-spark-master.png differ diff --git a/docs/docker/integ-test/images/query-workbench-query.png b/docs/docker/integ-test/images/query-workbench-query.png new file mode 100644 index 000000000..a10366438 Binary files /dev/null and b/docs/docker/integ-test/images/query-workbench-query.png differ diff --git a/docs/docker/integ-test/query-execution.md b/docs/docker/integ-test/query-execution.md new file mode 100644 index 000000000..e3b4e33be --- /dev/null +++ b/docs/docker/integ-test/query-execution.md @@ -0,0 +1,112 @@ +# Query Execution with the Integration Test Docker Cluster + +The integration test docker cluster can be used for the following tests: +* SQL/PPL queries on Spark using local tables +* SQL/PPL queries on Spark using external tables with data stored in MinIO(S3) +* SQL/PPL queries on OpenSearch of OpenSearch indices +* SQL/PPL async queries on OpenSearch of data stored in S3 + +In all cases, SQL or PPL queries be used and the processing is very similar. At most there may be a minor +difference in the query request. + +## SQL/PPL Queries on Spark Using Local Tables + +Connect directly to the Spark master node and execute a query. Could connect using Spark Connect, submitting +a job or even running `spark-shell` on the Docker container. Execute `sql()` calls on the SparkSession object. + +Local tables are tables that were created in Spark that are not external tables. The metadata and data is stored +in the Spark master container. + +Spark will begin query processing by assuming that the query is a PPL query. If it fails to parse in PPL, then +it will fall back to parsing it as a SQL query. 
+ +After parsing the query, Spark will lookup the metadata for the table(s) and perform the query. The only other +container that may be involved in processing the request is the Spark worker container. + +## SQL/PPL Queries on Spark Using External Tables with Data Stored in MinIO(S3) + +Connect directly to the Spark master node and execute a query. Could connect using Spark Connect, submitting +a job or even running `spark-shell` on the Docker container. Execute `sql()` calls on the SparkSession object. + +External tables are tables that were created in Spark that have an `s3a://` location. The metadata is stored in +Hive and the data is stored in MinIO(S3). + +Spark will begin query processing by assuming that the query is a PPL query. If it fails to parse in PPL, then +it will fall back to parsing it as a SQL query. + +After parsing the query, Spark will lookup the metadata for the table(s) from Hive and perform the query. It +will retrieve table data from MinIO(S3). + +![Queries for Spark Master](images/queries-for-spark-master.png "Queries for Spark Master") + +## SQL/PPL Queries on OpenSearch of OpenSearch Indices + +Connect directly to the OpenSearch container to submit queries. Use the +[SQL and PPL API](https://opensearch.org/docs/latest/search-plugins/sql/sql-ppl-api/). + +The indices are stored in the OpenSearch container. + +## SQL/PPL Async Queries on OpenSearch of Data Stored in S3 + +Connect directly to the OpenSearch container to submit queries. Use the +[Async Query Interface](https://github.com/opensearch-project/sql/blob/main/docs/user/interfaces/asyncqueryinterface.rst). +This type of query simulates querying an S3/Glue datasource in OpenSearch. + +The table metadata is stored in Hive and the table data is stored in MinIO(S3). + +There are three phases to query processing: +1. Setup +2. Processing +3. Results Retrieval + +OpenSearch will use two special indices. +1. `.query_execution_request_[DATASOURCE_NAME]` - In the integration test Docker cluster, the datasource is + named `mys3`. When an Async Query request is received, an entry is added to this index. The entry contains + the query as well as its state. The state is updated as the request is processed. +2. `query_execution_result_[DATASOURCE_NAME]` - In the integration test Docker cluster, the datasource is + named `mys3`. An entry is added to this index when the results are ready. The entry contains the results of + the query. + +Temporary Docker containers are used. They are Apache Spark containers and run a jobs locally. + +![Queries for Async Query API](images/queries-for-async-api.png "Queries for Async Query API") + +### Setup + +The setup phase is started when OpenSearch receives an Async Query API request and continues until the query +ID and session ID are returned to the client. + +1. Check if the index `.query_execution_request_[DATASOURCE_NAME]` exists. +2. If `.query_execution_request_[DATASOURCE_NAME]` does not exist, then create it. +3. Insert the request into `.query_execution_request_[DATASOURCE_NAME]` +4. Return the query ID and session ID + +### Processing + +The processing phase started when checking if there is a container running for the request's session and +continues until the results are added to the `query_execution_result_[DATASOURCE_NAME]`. + +1. Check if there is a Spark container already running for the request's session +2. If a Spark container is not running for the request's session, then use Docker to start one. + 1. 
Docker initializes and starts the Spark container for the session +3. Spark container checks if the index `query_execution_result_[DATASOURCE_NAME]` exists. +4. If the index `query_execution_result_[DATASOURCE_NAME]` does not exist, then create it. +5. Spark container searches the `.query_execution_request_[DATASOURCE_NAME]` index for the next request + in the session to process. +6. Spark container identifies the tables in the query and get their metadata from the Hive container +7. Spark container retrieves the table data from the MinIO(S3) container +8. Spark container writes the results to the index `query_execution_result_[DATASOURCE_NAME]` + +The Spark container will keep looping from steps 5-8 until it reaches its timeout (currently set to 180 seconds). +Once the timeout is received, the Spark container will shutdown. + +### Results Retrieval + +The results retrieval phase can happen any time after the results for the query have been added to the index +`query_execution_result_[DATASOURCE_NAME]`. + +1. Client request the results of a previously submitted query from the OpenSearch container using the query ID + received earlier. +2. OpenSearch container searches the index `query_execution_result_[DATASOURCE_NAME]` for the results of the + query. +3. OpenSearch container returns the query results to the client. \ No newline at end of file diff --git a/docs/docker/integ-test/using-query-workbench.md b/docs/docker/integ-test/using-query-workbench.md new file mode 100644 index 000000000..95f36f4ac --- /dev/null +++ b/docs/docker/integ-test/using-query-workbench.md @@ -0,0 +1,33 @@ +# Using the Query Workbench in OpenSearch Dashboards + +The integration test Docker cluster contains an OpenSearch Dashboards container. This container can be used +as a web interface for querying data in the cluster. + +[Query Workbench Documentation](https://opensearch.org/docs/latest/dashboards/query-workbench/) + +## Logging in to OpenSearch Dashboards + +* URL - `http://localhsot:5601` +* Username: `admin` +* Password: The password is in the file `docker/integ-test/.env`. It is the value of `OPENSEARCH_ADMIN_PASSWORD`. + +## Querying the S3/Glue Datasource + +1. Navigate to the Query Workbench +2. Choose `Data source Connections` in the top left + + ![Data source Connections](images/datasource-selector.png "Data source Connections") +3. In the drop-down below `Data source Connections`, select the S3/Glue datasource. It is named `mys3`. + + ![Data source Drop-down](images/datasource-drop-down.png "Data source Drop-down") +4. It may take some time to load the namespaces in the datasource. `mys3` only contains the namespace `default`. +5. If you like, you can browse the tables in the `default` namespace by clicking on `default`. + + ![Data source Browser](images/datasource-browser.png "Data source Browser") +6. Execute a Query + + ![Query Interface](images/query-workbench-query.png "Query Interface") + 1. Choose the query language by clicking on `SQL` or `PPL` + 2. Enter a query in the text box + 3. Click `Run` to execute the query + 4. The results are displayed in the bottom right part of the page diff --git a/e2e-test/README.md b/e2e-test/README.md new file mode 100644 index 000000000..63c97a05a --- /dev/null +++ b/e2e-test/README.md @@ -0,0 +1,124 @@ +# End-to-End Tests + +## Overview + +The end-to-end tests start the integration test docker cluster and execute queries against it. Queries can be +sent to the Spark master or to OpenSearch server (using async query). 
+ +The tests will run a query and compare the results to an expected results file. + +There are four types of tests: +1. SQL queries sent to the Spark master +2. PPL queries sent to the Spark master +3. SQL queries sent to the OpenSearch server as an async query +4. PPL queries sent to the OpenSearch server as an async query + +## Running the End-to-End Tests + +The tests can be run using SBT: + +```shell +sbt e2etest/test +``` + +## Test Structure + +### SQL Queries for Spark Master + +Create two files: +* `e2e-test/src/test/resources/spark/queries/sql/[NAME].sql` +* `e2e-test/src/test/resources/spark/queries/sql/[NAME].results` + +The `*.sql` file contains only the SQL query on one line. + +The `*.results` file contains the results in CSV format with a header (column names). + +### PPL Queries for Spark Master + +Create two files: +* `e2e-test/src/test/resources/spark/queries/ppl/[NAME].ppl` +* `e2e-test/src/test/resources/spark/queries/ppl/[NAME].results` + +The `*.ppl` file contains only the PPL query on one line. + +The `*.results` file contains the results in CSV format with a header (column names). + +### SQL Queries for OpenSearch Async API + +Create two files: +* `e2e-test/src/test/resources/opensearch/queries/sql/[NAME].sql` +* `e2e-test/src/test/resources/opensearch/queries/sql/[NAME].results` + +The `*.sql` file contains only the SQL query on one line. + +The `*.results` file contains the results in JSON format. The format is the exact output from the REST call +to get the async query results (`_plugins/_async_query/[QUERY_ID]`). + +Results example: +```json +{ + "status": "SUCCESS", + "schema": [ + { + "name": "id", + "type": "integer" + }, + { + "name": "name", + "type": "string" + } + ], + "datarows": [ + [ + 1, + "Foo" + ], + [ + 2, + "Bar" + ] + ], + "total": 2, + "size": 2 +} +``` + +### PPL Queries for OpenSearch Async API + +Create two files: +* `e2e-test/src/test/resources/opensearch/queries/ppl/[NAME].ppl` +* `e2e-test/src/test/resources/opensearch/queries/ppl/[NAME].results` + +The `*.ppl` file contains only the PPL query on one line. + +The `*.results` file contains the results in JSON format. The format is the exact output from the REST call +to get the async query results (`_plugins/_async_query/[QUERY_ID]`). 
+ +Results example: +```json +{ + "status": "SUCCESS", + "schema": [ + { + "name": "id", + "type": "integer" + }, + { + "name": "name", + "type": "string" + } + ], + "datarows": [ + [ + 1, + "Foo" + ], + [ + 2, + "Bar" + ] + ], + "total": 2, + "size": 2 +} +``` \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/customer.mapping.json b/e2e-test/src/test/resources/opensearch/indices/customer.mapping.json new file mode 100644 index 000000000..a98d473a2 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/customer.mapping.json @@ -0,0 +1,30 @@ +{ + "mappings": { + "properties": { + "c_custkey": { + "type": "integer" + }, + "c_name": { + "type": "text" + }, + "c_address": { + "type": "text" + }, + "c_nationkey": { + "type": "integer" + }, + "c_phone": { + "type": "text" + }, + "c_acctbal": { + "type": "double" + }, + "c_mktsegment": { + "type": "text" + }, + "c_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/http_logs.json b/e2e-test/src/test/resources/opensearch/indices/http_logs.json new file mode 100644 index 000000000..ff2aa2fca --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/http_logs.json @@ -0,0 +1,12 @@ +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696154400000, "year": 2023, "month": 10, "day": 1, "clientip": "40.135.0.0", "request": "GET /images/hm_bg.jpg HTTP/1.0", "status": 200, "size": 24736} +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696154700000, "year": 2023, "month": 10, "day": 1, "clientip": "232.0.0.0", "request": "GET /images/hm_bg.jpg HTTP/1.0", "status": 200, "size": 24736} +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696155000000, "year": 2023, "month": 10, "day": 1, "clientip": "26.1.0.0", "request": "GET /images/hm_bg.jpg HTTP/1.0", "status": 200, "size": 24736} +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696155300000, "year": 2023, "month": 10, "day": 1, "clientip": "247.37.0.0", "request": "GET /french/splash_inet.html HTTP/1.0", "status": 200, "size": 3781} +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696155600000, "year": 2023, "month": 10, "day": 1, "clientip": "247.37.0.0", "request": "GET /images/hm_nbg.jpg HTTP/1.0", "status": 304, "size": 0} +{"index": {"_index": "http_logs"}} +{"@timestamp": 1696155900000, "year": 2023, "month": 10, "day": 1, "clientip": "252.0.0.0", "request": "GET /images/hm_bg.jpg HTTP/1.0", "status": 200, "size": 24736} diff --git a/e2e-test/src/test/resources/opensearch/indices/http_logs.mapping.json b/e2e-test/src/test/resources/opensearch/indices/http_logs.mapping.json new file mode 100644 index 000000000..b944fbd4b --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/http_logs.mapping.json @@ -0,0 +1,30 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "year": { + "type": "integer" + }, + "month": { + "type": "integer" + }, + "day": { + "type": "integer" + }, + "clientip": { + "type": "keyword" + }, + "request": { + "type": "text" + }, + "status": { + "type": "integer" + }, + "size": { + "type": "integer" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/lineitem.mapping.json b/e2e-test/src/test/resources/opensearch/indices/lineitem.mapping.json new file mode 100644 index 000000000..2fb1cdb40 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/lineitem.mapping.json @@ -0,0 +1,54 @@ +{ + "mappings": { + "properties": { + 
"l_orderkey": { + "type": "integer" + }, + "l_partkey": { + "type": "text" + }, + "l_suppkey": { + "type": "integer" + }, + "l_linenumber": { + "type": "integer" + }, + "l_quantity": { + "type": "double" + }, + "l_extendedprice": { + "type": "double" + }, + "l_discount": { + "type": "double" + }, + "l_tax": { + "type": "double" + }, + "l_returnflag": { + "type": "text" + }, + "l_linestatus": { + "type": "text" + }, + "l_shipdate": { + "type": "date" + }, + "l_commitdate": { + "type": "date" + }, + "l_receiptdate": { + "type": "date" + }, + "l_shipinstruct": { + "type": "text" + }, + "l_shipmode": { + "type": "text" + }, + "l_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/nation.mapping.json b/e2e-test/src/test/resources/opensearch/indices/nation.mapping.json new file mode 100644 index 000000000..d0e82e559 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/nation.mapping.json @@ -0,0 +1,18 @@ +{ + "mappings": { + "properties": { + "n_nationkey": { + "type": "integer" + }, + "n_name": { + "type": "text" + }, + "n_regionkey": { + "type": "integer" + }, + "n_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/nested.json b/e2e-test/src/test/resources/opensearch/indices/nested.json new file mode 100644 index 000000000..eb8af683b --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/nested.json @@ -0,0 +1,10 @@ +{"index": {"_index": "nested"}} +{"int_col": 30, "struct_col": {"field1": {"subfield": "value1"}, "field2": 123}, "struct_col2": {"field1": {"subfield": "valueA"}, "field2": 23}} +{"index": {"_index": "nested"}} +{"int_col": 40, "struct_col": {"field1": {"subfield": "value5"}, "field2": 123}, "struct_col2": {"field1": {"subfield": "valueB"}, "field2": 33}} +{"index": {"_index": "nested"}} +{"int_col": 30, "struct_col": {"field1": {"subfield": "value4"}, "field2": 823}, "struct_col2": {"field1": {"subfield": "valueC"}, "field2": 83}} +{"index": {"_index": "nested"}} +{"int_col": 40, "struct_col": {"field1": {"subfield": "value2"}, "field2": 456}, "struct_col2": {"field1": {"subfield": "valueD"}, "field2": 46}} +{"index": {"_index": "nested"}} +{"int_col": 50, "struct_col": {"field1": {"subfield": "value3"}, "field2": 789}, "struct_col2": {"field1": {"subfield": "valueE"}, "field2": 89}} diff --git a/e2e-test/src/test/resources/opensearch/indices/nested.mapping.json b/e2e-test/src/test/resources/opensearch/indices/nested.mapping.json new file mode 100644 index 000000000..1aa189415 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/nested.mapping.json @@ -0,0 +1,37 @@ +{ + "mappings": { + "properties": { + "int_col": { + "type": "integer" + }, + "struct_col": { + "properties": { + "field1": { + "properties": { + "subfield": { + "type": "text" + } + } + }, + "field2": { + "type": "integer" + } + } + }, + "struct_col2": { + "properties": { + "field1": { + "properties": { + "subfield": { + "type": "text" + } + } + }, + "field2": { + "type": "integer" + } + } + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/orders.mapping.json b/e2e-test/src/test/resources/opensearch/indices/orders.mapping.json new file mode 100644 index 000000000..59b3cecdd --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/orders.mapping.json @@ -0,0 +1,33 @@ +{ + "mappings": { + "properties": { + "o_orderkey": { + "type": "integer" + }, + 
"o_custkey": { + "type": "integer" + }, + "o_orderstatus": { + "type": "text" + }, + "o_totalprice": { + "type": "double" + }, + "o_orderdate": { + "type": "date" + }, + "o_orderpriority": { + "type": "text" + }, + "o_clerk": { + "type": "text" + }, + "o_shippriority": { + "type": "integer" + }, + "o_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/part.mapping.json b/e2e-test/src/test/resources/opensearch/indices/part.mapping.json new file mode 100644 index 000000000..8be7e9aa0 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/part.mapping.json @@ -0,0 +1,33 @@ +{ + "mappings": { + "properties": { + "p_partkey": { + "type": "integer" + }, + "p_name": { + "type": "text" + }, + "p_mfgr": { + "type": "text" + }, + "p_brand": { + "type": "text" + }, + "p_type": { + "type": "text" + }, + "p_size": { + "type": "integer" + }, + "p_container": { + "type": "text" + }, + "p_retailprice": { + "type": "double" + }, + "p_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/partsupp.mapping.json b/e2e-test/src/test/resources/opensearch/indices/partsupp.mapping.json new file mode 100644 index 000000000..13509ad46 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/partsupp.mapping.json @@ -0,0 +1,21 @@ +{ + "mappings": { + "properties": { + "ps_partkey": { + "type": "integer" + }, + "ps_suppkey": { + "type": "integer" + }, + "ps_availqty": { + "type": "integer" + }, + "ps_supplycost": { + "type": "double" + }, + "ps_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/people.json b/e2e-test/src/test/resources/opensearch/indices/people.json new file mode 100644 index 000000000..4563a2c4b --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/people.json @@ -0,0 +1,12 @@ +{"index": {"_index": "people"}} +{"@timestamp": 1718458823000, "id": 1000, "name": "Jake", "occupation": "Engineer", "country": "England", "salary": 100000} +{"index": {"_index": "people"}} +{"@timestamp": 1718458833000, "id": 1001, "name": "Hello", "occupation": "Artist", "country": "USA", "salary": 70000} +{"index": {"_index": "people"}} +{"@timestamp": 1718458843000, "id": 1002, "name": "John", "occupation": "Doctor", "country": "Canada", "salary": 120000} +{"index": {"_index": "people"}} +{"@timestamp": 1718458853000, "id": 1003, "name": "David", "occupation": "Doctor", "country": null, "salary": 120000} +{"index": {"_index": "people"}} +{"@timestamp": 1718458863000, "id": 1004, "name": "David", "occupation": null, "country": "Canada", "salary": 0} +{"index": {"_index": "people"}} +{"@timestamp": 1718458873000, "id": 1005, "name": "Jane", "occupation": "Scientist", "country": "Canada", "salary": 90000} diff --git a/e2e-test/src/test/resources/opensearch/indices/people.mapping.json b/e2e-test/src/test/resources/opensearch/indices/people.mapping.json new file mode 100644 index 000000000..b5dde8ff6 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/people.mapping.json @@ -0,0 +1,24 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "id": { + "type": "integer" + }, + "name": { + "type": "text" + }, + "occupation": { + "type": "text" + }, + "country": { + "type": "text" + }, + "salary": { + "type": "integer" + } + } + } +} \ No newline at end of file diff --git 
a/e2e-test/src/test/resources/opensearch/indices/region.mapping.json b/e2e-test/src/test/resources/opensearch/indices/region.mapping.json new file mode 100644 index 000000000..3dddbc580 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/region.mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "r_regionkey": { + "type": "integer" + }, + "r_name": { + "type": "text" + }, + "r_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/supplier.mapping.json b/e2e-test/src/test/resources/opensearch/indices/supplier.mapping.json new file mode 100644 index 000000000..bdcb933b6 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/supplier.mapping.json @@ -0,0 +1,27 @@ +{ + "mappings": { + "properties": { + "s_suppkey": { + "type": "integer" + }, + "s_name": { + "type": "text" + }, + "s_address": { + "type": "text" + }, + "s_nationkey": { + "type": "integer" + }, + "s_phone": { + "type": "text" + }, + "s_acctbal": { + "type": "double" + }, + "s_comment": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/indices/work_info.json b/e2e-test/src/test/resources/opensearch/indices/work_info.json new file mode 100644 index 000000000..64802bdad --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/work_info.json @@ -0,0 +1,10 @@ +{"index": {"_index": "work_info"}} +{"uid": 1000, "name": "Jake", "department": "IT", "occupation": "Engineer"} +{"index": {"_index": "work_info"}} +{"uid": 1002, "name": "John", "department": "DATA", "occupation": "Scientist"} +{"index": {"_index": "work_info"}} +{"uid": 1003, "name": "David", "department": "HR", "occupation": "Doctor"} +{"index": {"_index": "work_info"}} +{"uid": 1005, "name": "Jane", "department": "DATA", "occupation": "Engineer"} +{"index": {"_index": "work_info"}} +{"uid": 1006, "name": "Tom", "department": "SALES", "occupation": "Artist"} diff --git a/e2e-test/src/test/resources/opensearch/indices/work_info.mapping.json b/e2e-test/src/test/resources/opensearch/indices/work_info.mapping.json new file mode 100644 index 000000000..3fb5e2c28 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/indices/work_info.mapping.json @@ -0,0 +1,18 @@ +{ + "mappings": { + "properties": { + "uid": { + "type": "integer" + }, + "name": { + "type": "text" + }, + "department": { + "type": "text" + }, + "occupation": { + "type": "text" + } + } + } +} \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.ppl b/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.ppl new file mode 100644 index 000000000..527716652 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.ppl @@ -0,0 +1 @@ +SOURCE=mys3.default.foo \ No newline at end of file diff --git a/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.results b/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.results new file mode 100644 index 000000000..ec4968348 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/queries/ppl/select-all-from-s3.results @@ -0,0 +1,25 @@ +{ + "status": "SUCCESS", + "schema": [ + { + "name": "id", + "type": "integer" + }, + { + "name": "name", + "type": "string" + } + ], + "datarows": [ + [ + 1, + "Foo" + ], + [ + 2, + "Bar" + ] + ], + "total": 2, + "size": 2 +} diff --git 
a/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.results b/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.results new file mode 100644 index 000000000..ec4968348 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.results @@ -0,0 +1,25 @@ +{ + "status": "SUCCESS", + "schema": [ + { + "name": "id", + "type": "integer" + }, + { + "name": "name", + "type": "string" + } + ], + "datarows": [ + [ + 1, + "Foo" + ], + [ + 2, + "Bar" + ] + ], + "total": 2, + "size": 2 +} diff --git a/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.sql b/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.sql new file mode 100644 index 000000000..c149918c0 --- /dev/null +++ b/e2e-test/src/test/resources/opensearch/queries/sql/select-all-from-s3.sql @@ -0,0 +1 @@ +SELECT * FROM mys3.default.foo \ No newline at end of file diff --git a/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.ppl b/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.ppl new file mode 100644 index 000000000..d32a87ace --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.ppl @@ -0,0 +1 @@ +SOURCE=dev.default.people \ No newline at end of file diff --git a/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.results b/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.results new file mode 100644 index 000000000..07f1692cf --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/ppl/select-all-from-os.results @@ -0,0 +1,7 @@ +@timestamp,name,country,salary,id,occupation +2024-06-15T13:40:23.000Z,Jake,England,100000,1000,Engineer +2024-06-15T13:40:33.000Z,Hello,USA,70000,1001,Artist +2024-06-15T13:40:43.000Z,John,Canada,120000,1002,Doctor +2024-06-15T13:40:53.000Z,David,,120000,1003,Doctor +2024-06-15T13:41:03.000Z,David,Canada,0,1004, +2024-06-15T13:41:13.000Z,Jane,Canada,90000,1005,Scientist diff --git a/e2e-test/src/test/resources/spark/queries/ppl/select-all.ppl b/e2e-test/src/test/resources/spark/queries/ppl/select-all.ppl new file mode 100644 index 000000000..31bbfe228 --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/ppl/select-all.ppl @@ -0,0 +1 @@ +SOURCE=foo \ No newline at end of file diff --git a/e2e-test/src/test/resources/spark/queries/ppl/select-all.results b/e2e-test/src/test/resources/spark/queries/ppl/select-all.results new file mode 100644 index 000000000..962a5a93f --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/ppl/select-all.results @@ -0,0 +1,3 @@ +id,name +1,Foo +2,Bar diff --git a/e2e-test/src/test/resources/spark/queries/sql/select-all.results b/e2e-test/src/test/resources/spark/queries/sql/select-all.results new file mode 100644 index 000000000..962a5a93f --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/sql/select-all.results @@ -0,0 +1,3 @@ +id,name +1,Foo +2,Bar diff --git a/e2e-test/src/test/resources/spark/queries/sql/select-all.sql b/e2e-test/src/test/resources/spark/queries/sql/select-all.sql new file mode 100644 index 000000000..1ed904d48 --- /dev/null +++ b/e2e-test/src/test/resources/spark/queries/sql/select-all.sql @@ -0,0 +1 @@ +SELECT * FROM foo \ No newline at end of file diff --git a/e2e-test/src/test/resources/spark/tables/foo.parquet b/e2e-test/src/test/resources/spark/tables/foo.parquet new file mode 100644 index 000000000..fbcd2e207 Binary files /dev/null and b/e2e-test/src/test/resources/spark/tables/foo.parquet differ diff --git 
a/e2e-test/src/test/scala/org/opensearch/spark/e2e/EndToEndITSuite.scala b/e2e-test/src/test/scala/org/opensearch/spark/e2e/EndToEndITSuite.scala new file mode 100644 index 000000000..4f07cad8b --- /dev/null +++ b/e2e-test/src/test/scala/org/opensearch/spark/e2e/EndToEndITSuite.scala @@ -0,0 +1,381 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.spark.e2e + +import java.io.{BufferedReader, File, FileInputStream, InputStreamReader} +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.util.regex.Pattern + +import scala.collection.mutable.ListBuffer +import scala.io.Source.fromFile + +import org.scalatest.{Assertions, BeforeAndAfterAll, Suite} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.prop.TableDrivenPropertyChecks +import play.api.libs.json.{JsError, Json, JsValue} +import sttp.client3.{basicRequest, HttpClientSyncBackend, Identity, Response, ResponseException, SttpBackend, UriContext} +import sttp.client3.playJson.asJson + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, Row} + +/** + * Tests requiring the should extend OpenSearchSuite. + */ +class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with BeforeAndAfterAll with SparkTrait with S3ClientTrait with Assertions with Logging { + self: Suite => + + val DOCKER_INTEG_DIR: String = "docker/integ-test" + val DOCKER_VOLUMES: Array[String] = Array("integ-test_metastore-data", "integ-test_minio-data", "integ-test_opensearch-data") + var OPENSEARCH_URL: String = null + val OPENSEARCH_USERNAME: String = "admin" + var OPENSEARCH_PASSWORD: String = null + var SPARK_CONNECT_PORT: Int = 0 + var S3_ACCESS_KEY: String = null + var S3_SECRET_KEY: String = null + val INTEG_TEST_BUCKET: String = "integ-test" + val TEST_RESOURCES_BUCKET: String = "test-resources" + val S3_DATASOURCE: String = "mys3" + + override def getSparkConnectPort(): Int = { + SPARK_CONNECT_PORT + } + + override def getS3AccessKey(): String = { + S3_ACCESS_KEY + } + + override def getS3SecretKey(): String = { + S3_SECRET_KEY + } + + override def beforeAll(): Unit = { + logInfo("Starting docker cluster") + + val dockerEnv = new Properties() + dockerEnv.load(new FileInputStream(new File(DOCKER_INTEG_DIR, ".env"))) + OPENSEARCH_URL = "http://localhost:" + dockerEnv.getProperty("OPENSEARCH_PORT") + OPENSEARCH_PASSWORD = dockerEnv.getProperty("OPENSEARCH_ADMIN_PASSWORD") + SPARK_CONNECT_PORT = Integer.parseInt(dockerEnv.getProperty("SPARK_CONNECT_PORT")) + S3_ACCESS_KEY = dockerEnv.getProperty("S3_ACCESS_KEY") + S3_SECRET_KEY = dockerEnv.getProperty("S3_SECRET_KEY") + + val cmdWithArgs = List("docker", "volume", "rm") ++ DOCKER_VOLUMES + val deleteDockerVolumesProcess = new ProcessBuilder(cmdWithArgs.toArray: _*).start() + deleteDockerVolumesProcess.waitFor(10, TimeUnit.SECONDS) + + val dockerProcess = new ProcessBuilder("docker", "compose", "up", "-d") + .directory(new File(DOCKER_INTEG_DIR)) + .start() + dockerProcess.waitFor(5, TimeUnit.MINUTES) + + if (dockerProcess.exitValue() != 0) { + logError("Unable to start docker cluster") + } + + logInfo("Started docker cluster") + + createTables() + createIndices() + } + + override def afterAll(): Unit = { + logInfo("Stopping docker cluster") + waitForSparkSubmitCompletion() + + val dockerProcess = new ProcessBuilder("docker", "compose", "down") + .directory(new File(DOCKER_INTEG_DIR)) + .start() + dockerProcess.waitFor(2, TimeUnit.MINUTES) + + if (dockerProcess.exitValue() 
!= 0) { + logError("Unable to stop docker cluster") + } + + logInfo("Stopped docker cluster") + } + + def waitForSparkSubmitCompletion(): Unit = { + val endTime = System.currentTimeMillis() + 300000 + while (System.currentTimeMillis() < endTime) { + val dockerProcess = new ProcessBuilder("docker", "ps").start() + val outputReader = new BufferedReader(new InputStreamReader(dockerProcess.getInputStream)) + + // Ignore the header + outputReader.readLine() + var line = outputReader.readLine() + val pattern = Pattern.compile("^[^ ]+ +integ-test-spark-submit:latest +.*") + var matched = false + while (line != null) { + if (pattern.matcher(line).matches()) { + matched = true + } + line = outputReader.readLine() + } + + if (matched) { + outputReader.close() + dockerProcess.waitFor(2, TimeUnit.SECONDS) + Thread.sleep(5000) + } else { + return + } + } + } + + def createTables(): Unit = { + try { + val tablesDir = new File("e2e-test/src/test/resources/spark/tables") + tablesDir.listFiles((_, name) => name.endsWith(".parquet")).foreach(f => { + val tableName = f.getName.substring(0, f.getName.length() - 8) + getS3Client().putObject(TEST_RESOURCES_BUCKET, "spark/tables/" + f.getName, f) + + try { + val df = getSparkSession().read.parquet(s"s3a://$TEST_RESOURCES_BUCKET/spark/tables/" + f.getName) + df.write.option("path", s"s3a://$INTEG_TEST_BUCKET/$tableName").saveAsTable(tableName) + } catch { + case e: Exception => logError("Unable to create table", e) + } + }) + } catch { + case e: Exception => logError("Failure", e) + } + } + + def createIndices(): Unit = { + val indicesDir = new File("e2e-test/src/test/resources/opensearch/indices") + val backend = HttpClientSyncBackend() + + indicesDir.listFiles((_, name) => name.endsWith(".mapping.json")).foreach(f => { + val indexName = f.getName.substring(0, f.getName.length() - 13) + + val checkIndexRequest = basicRequest.get(uri"$OPENSEARCH_URL/$indexName") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + val response = checkIndexRequest.send(backend) + if (response.isSuccess) { + val deleteIndexRequest = basicRequest.delete(uri"$OPENSEARCH_URL/$indexName") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + deleteIndexRequest.send(backend) + } + + val createIndexRequest = basicRequest.put(uri"$OPENSEARCH_URL/$indexName") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + .contentType("application/json") + .body(new FileInputStream(f)) + createIndexRequest.send(backend) + + val dataFile = new File(f.getParent, indexName + ".json") + if (dataFile.exists()) { + val bulkInsertRequest = basicRequest.post(uri"$OPENSEARCH_URL/$indexName/_bulk") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + .contentType("application/x-ndjson") + .body(new FileInputStream(new File(f.getParent, indexName + ".json"))) + bulkInsertRequest.send(backend) + } + }) + } + + it should "SQL Queries" in { + val queriesDir = new File("e2e-test/src/test/resources/spark/queries/sql") + val queriesTableData : ListBuffer[(String, String)] = new ListBuffer() + + queriesDir.listFiles((_, name) => name.endsWith(".sql")).foreach(f => { + val querySource = fromFile(f) + val query = querySource.mkString + querySource.close() + + val baseName = f.getName.substring(0, f.getName.length - 4) + queriesTableData += ((query, baseName)) + }) + + forEvery(Table(("Query", "Base Filename"), queriesTableData: _*)) { (query, baseName) => + logInfo(s">>> Testing query [$baseName]: $query") + val results : Dataset[Row] = getSparkSession().sql(query).coalesce(1) + + val s3Folder = 
s"spark/query-results/sql/$baseName" + results.write.format("csv").option("header", "true").save(s"s3a://$TEST_RESOURCES_BUCKET/$s3Folder") + + val actualResults = getActualResults(s"$s3Folder/") + val expectedFile = new File(s"e2e-test/src/test/resources/spark/queries/sql/$baseName.results") + val expectedSource = fromFile(expectedFile) + val expectedResults = expectedSource.mkString.stripTrailing() + expectedSource.close() + + assert(expectedResults == actualResults) + } + } + + it should "PPL Queries" in { + val queriesDir = new File("e2e-test/src/test/resources/spark/queries/ppl") + val queriesTableData : ListBuffer[(String, String)] = new ListBuffer() + + queriesDir.listFiles((_, name) => name.endsWith(".ppl")).foreach(f => { + val querySource = fromFile(f) + val query = querySource.mkString + querySource.close() + + val baseName = f.getName.substring(0, f.getName.length - 4) + queriesTableData += ((query, baseName)) + }) + + forEvery(Table(("Query", "Base Filename"), queriesTableData: _*)) { (query, baseName) => + logInfo(s">>> Testing query [$baseName]: $query") + val results : Dataset[Row] = getSparkSession().sql(query).coalesce(1) + + val s3Folder = s"spark/query-results/ppl/$baseName" + results.write.format("csv").option("header", "true").save(s"s3a://$TEST_RESOURCES_BUCKET/$s3Folder") + + val actualResults = getActualResults(s"$s3Folder/") + val expectedFile = new File(s"e2e-test/src/test/resources/spark/queries/ppl/$baseName.results") + val expectedSource = fromFile(expectedFile) + val expectedResults = expectedSource.mkString.stripTrailing() + expectedSource.close() + + assert(expectedResults == actualResults) + } + } + + it should "Async SQL Queries" in { + var sessionId : String = null + val backend = HttpClientSyncBackend() + + val queriesDir = new File("e2e-test/src/test/resources/opensearch/queries/sql") + val queriesTableData : ListBuffer[(String, String)] = new ListBuffer() + + queriesDir.listFiles((_, name) => name.endsWith(".sql")).foreach(f => { + val querySource = fromFile(f) + val query = querySource.mkString + querySource.close() + + val baseName = f.getName.substring(0, f.getName.length - 4) + queriesTableData += ((query, baseName)) + }) + + forEvery(Table(("Query", "Base Filename"), queriesTableData: _*)) { (query: String, baseName: String) => + logInfo(s">>> Testing query [$baseName]: $query") + val queryResponse = executeAsyncQuery("sql", query, sessionId, backend) + + if (queryResponse.isSuccess) { + val responseData = queryResponse.body.right.get + val queryId = (responseData \ "queryId").as[String] + if (sessionId == null) { + sessionId = (responseData \ "sessionId").as[String] + } + + val actualResults = getAsyncResults(queryId, backend) + val expectedResults = Json.parse(new FileInputStream(new File(queriesDir, baseName + ".results"))) + + assert(expectedResults == actualResults) + } + } + } + + it should "Async PPL Queries" in { + var sessionId : String = null + val backend = HttpClientSyncBackend() + + val queriesDir = new File("e2e-test/src/test/resources/opensearch/queries/ppl") + val queriesTableData : ListBuffer[(String, String)] = new ListBuffer() + + queriesDir.listFiles((_, name) => name.endsWith(".ppl")).foreach(f => { + val querySource = fromFile(f) + val query = querySource.mkString + querySource.close() + + val baseName = f.getName.substring(0, f.getName.length - 4) + queriesTableData += ((query, baseName)) + }) + + forEvery(Table(("Query", "Base Filename"), queriesTableData: _*)) { (query: String, baseName: String) => + logInfo(s">>> Testing 
query [$baseName]: $query") + val queryResponse = executeAsyncQuery("ppl", query, sessionId, backend) + + if (queryResponse.isSuccess) { + val responseData = queryResponse.body.right.get + val queryId = (responseData \ "queryId").as[String] + if (sessionId == null) { + sessionId = (responseData \ "sessionId").as[String] + } + + val actualResults = getAsyncResults(queryId, backend) + val expectedResults = Json.parse(new FileInputStream(new File(queriesDir, baseName + ".results"))) + + assert(expectedResults == actualResults) + } + } + } + + def getActualResults(s3Path : String): String = { + val objectSummaries = getS3Client().listObjects("test-resources", s3Path).getObjectSummaries + var jsonKey : String = null + for (i <- 0 until objectSummaries.size()) { + val objectSummary = objectSummaries.get(i) + if (jsonKey == null && objectSummary.getKey.endsWith(".csv")) { + jsonKey = objectSummary.getKey + } + } + + val results = new ListBuffer[String] + if (jsonKey != null) { + val s3Object = getS3Client().getObject("test-resources", jsonKey) + val reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent)) + + var line = reader.readLine() + while (line != null) { + results += line + line = reader.readLine() + } + reader.close() + + return results.mkString("\n").stripTrailing() + } + + throw new Exception("Object not found") + } + + def executeAsyncQuery(language: String, query: String, sessionId: String, backend: SttpBackend[Identity, Any]) : Identity[Response[Either[ResponseException[String, JsError], JsValue]]] = { + var queryBody : String = null + if (sessionId == null) { + queryBody = "{\"datasource\": \"" + S3_DATASOURCE + "\", \"lang\": \"" + language + "\", \"query\": \"" + query + "\"}" + } else { + queryBody = "{\"datasource\": \"" + S3_DATASOURCE + "\", \"lang\": \"" + language + "\", \"query\": \"" + query + "\", \"sessionId\": \"" + sessionId + "\"}" + } + basicRequest.post(uri"$OPENSEARCH_URL/_plugins/_async_query") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + .contentType("application/json") + .body(queryBody.getBytes) + .response(asJson[JsValue]) + .send(backend) + } + + def getAsyncResults(queryId: String, backend: SttpBackend[Identity, Any]): JsValue = { + val endTime = System.currentTimeMillis() + 30000 + + while (System.currentTimeMillis() < endTime) { + val response = basicRequest.get(uri"$OPENSEARCH_URL/_plugins/_async_query/$queryId") + .auth.basic(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD) + .contentType("application/json") + .response(asJson[JsValue]) + .send(backend) + + if (response.isSuccess) { + val responseBody = response.body.right.get + val status = (responseBody \ "status").asOpt[String] + + if (status.isDefined) { + if (status.get == "SUCCESS") { + return responseBody + } + } + } + + Thread.sleep(500) + } + + throw new IllegalStateException("Unable to get async query results") + } +} diff --git a/e2e-test/src/test/scala/org/opensearch/spark/e2e/S3ClientTrait.scala b/e2e-test/src/test/scala/org/opensearch/spark/e2e/S3ClientTrait.scala new file mode 100644 index 000000000..dcd535f29 --- /dev/null +++ b/e2e-test/src/test/scala/org/opensearch/spark/e2e/S3ClientTrait.scala @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.spark.e2e + +import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} +import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration +import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} + 
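+/**
+ * Supplies a lazily created S3 client for the MinIO container in the integration test
+ * docker cluster (path-style access against http://localhost:9000, region us-east-1).
+ * Mixing classes provide the credentials through getS3AccessKey()/getS3SecretKey();
+ * EndToEndITSuite reads them from S3_ACCESS_KEY and S3_SECRET_KEY in docker/integ-test/.env.
+ */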
+trait S3ClientTrait { + var s3Client : AmazonS3 = null + + def getS3AccessKey(): String + + def getS3SecretKey(): String + + def getS3Client(): AmazonS3 = { + this.synchronized { + if (s3Client == null) { + s3Client = AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(new EndpointConfiguration("http://localhost:9000", "us-east-1")) + .withCredentials(new AWSStaticCredentialsProvider( + new BasicAWSCredentials(getS3AccessKey(), getS3SecretKey()) + )) + .withPathStyleAccessEnabled(true) + .build() + } + + s3Client + } + } +} diff --git a/e2e-test/src/test/scala/org/opensearch/spark/e2e/SparkTrait.scala b/e2e-test/src/test/scala/org/opensearch/spark/e2e/SparkTrait.scala new file mode 100644 index 000000000..467519a69 --- /dev/null +++ b/e2e-test/src/test/scala/org/opensearch/spark/e2e/SparkTrait.scala @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.spark.e2e + +import org.apache.spark.sql.SparkSession + +trait SparkTrait { + var spark : SparkSession = null + + def getSparkConnectPort(): Int + + def getSparkSession(): SparkSession = { + this.synchronized { + if (spark == null) { + spark = SparkSession.builder().remote("sc://localhost:" + getSparkConnectPort()).build() + } + spark + } + } +}
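+
+// Both traits are mixed into EndToEndITSuite, which implements getSparkConnectPort(),
+// getS3AccessKey() and getS3SecretKey() from the SPARK_CONNECT_PORT, S3_ACCESS_KEY and
+// S3_SECRET_KEY values in docker/integ-test/.env.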