From 8bedc8fb8b8bd37c7e5e8b1876dec8c83a9977ee Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 10:09:42 +0800 Subject: [PATCH 01/26] Add testers for fileset fs --- .../filesystem-fuse/src/gravitino_client.rs | 24 +++++++++++++++++++ clients/filesystem-fuse/src/utils.rs | 14 +++++++++++ 2 files changed, 38 insertions(+) diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs index 1e1cd411eac..e78e4986f50 100644 --- a/clients/filesystem-fuse/src/gravitino_client.rs +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -35,6 +35,18 @@ pub(crate) struct Fileset { properties: HashMap, } +impl Fileset { + pub fn new(name: &str, storage_location: &str) -> Fileset { + Self { + name: name.to_string(), + fileset_type: "managed".to_string(), + comment: "".to_string(), + storage_location: storage_location.to_string(), + properties: HashMap::default(), + } + } +} + #[derive(Debug, Deserialize)] struct FilesetResponse { code: u32, @@ -58,6 +70,18 @@ pub(crate) struct Catalog { pub(crate) properties: HashMap, } +impl Catalog { + pub fn new(name: &str, properties: HashMap) -> Catalog { + Self { + name: name.to_string(), + catalog_type: "fileset".to_string(), + provider: "s3".to_string(), + comment: "".to_string(), + properties: properties, + } + } +} + #[derive(Debug, Deserialize)] struct CatalogResponse { code: u32, diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index 53eb9179d71..73af044ee4c 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -18,6 +18,7 @@ */ use crate::error::ErrorCode::InvalidConfig; use crate::error::GvfsError; +use opendal::Operator; use reqwest::Url; use std::path::PathBuf; @@ -36,6 +37,19 @@ pub(crate) fn extract_root_path(location: &str) -> GvfsResult { Ok(PathBuf::from(url.path())) } +pub(crate) async fn delete_dir(op: &Operator, dir_name: &str) { + let childs = op.list(dir_name).await.expect("list dir failed"); + for child in childs { + let child_name = dir_name.to_string() + child.name(); + if child.metadata().is_dir() { + Box::pin(delete_dir(op, &child_name)).await; + } else { + op.delete(&child_name).await.expect("delete file failed"); + } + } + op.delete(dir_name).await.expect("delete dir failed"); +} + #[cfg(test)] mod tests { use crate::utils::extract_root_path; From 0b6cf88ee8a1b143d09e90c79599a442e10ff776 Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 15:03:20 +0800 Subject: [PATCH 02/26] Add integration test script --- .../filesystem-fuse/tests/start_backend.sh | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100755 clients/filesystem-fuse/tests/start_backend.sh diff --git a/clients/filesystem-fuse/tests/start_backend.sh b/clients/filesystem-fuse/tests/start_backend.sh new file mode 100755 index 00000000000..a5d55671ae0 --- /dev/null +++ b/clients/filesystem-fuse/tests/start_backend.sh @@ -0,0 +1,104 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + set -euo pipefail + +s3-access_key_id=${s3-access_key_id:-} +s3-secret_access=${s3-secret_access:-} +s3-region=${s3-region:-} +s3-bucket=${s3-bucket:-} + +# Check required environment variables +if [[ -z "$s3-access_key_id" || -z "$s3-secret_access" || -z "$s3-region" || -z "$s3-bucket" ]]; then + echo "Error: One or more required S3 environment variables are not set." + echo "Please set: s3-access_key_id, s3-secret_access, s3-region, s3-bucket." + exit 1 +fi + +GRAVITINO_SERVER_HOME=../../.. +GRAVITINO_SERVER_DIR=$GRAVITINO_SERVER_HOME/distribution/package +CLIENT_FUSE_DIR=$GRAVITINO_SERVER_HOME/clients/filesystem-fuse + +echo "Start the Gravitino server" +$GRAVITINO_SERVER_DIR/bin/start_gravitino_server.sh + +GRAVITINO_SERVER_URL=http://localhost:8090 + +curl $GRAVITINO_SERVER_URL/api/metalakes + +# create metalake +curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +-H "Content-Type: application/json" -d '{ + "name":"test","comment":"comment","properties":{} +}' $GRAVITINO_SERVER_URL/api/metalakes + +# create catalog +curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +-H "Content-Type: application/json" -d '{ + "name": "c1", + "type": "FILESET", + "comment": "comment", + "provider": "hadoop", + "properties": { + "location": "s3a://'"$s3-bucket"'", + "s3-access-key-id": "'"$s3-access_key_id"'", + "s3-secret-access-key": "'"$s3-secret_access"'", + "s3-endpoint": "http://s3.'"$s3-region"'.amazonaws.com", + "filesystem-providers": "s3" + } +}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs + +# create schema +curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +-H "Content-Type: application/json" -d '{ + "name":"s1","comment":"comment","properties":{} +}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/catalog/schemas + +# create fileset +curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +-H "Content-Type: application/json" -d '{ + "name":"fileset1","comment":"comment","properties":{} +}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas/s1/filesets + + +echo "Start the Gvfs fuse client" + +mount_dir=$CLIENT_FUSE_DIR/target/gvfs +if [ -d "$mount_dir" ]; then + echo "Unmount the existing mount point" + fusermount -u $mount_dir +else + echo "Create the mount point" + mkdir -p $mount_dir +fi + +fileset=gvfs://fileset/test/c1/s1/fileset1 + +config_file=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml +cp $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml $config_file + + +sed -i 's|s3-access_key_id = ".*"|s3-access_key_id = "$s3-access_key_id"|' "$config_file" +sed -i 's|s3-secret_access_key = ".*"|s3-secret_access_key = "$s3-secret_access"|' "$config_file" +sed -i 's|s3-region = ".*"|s3-region = "$s3-region"|' "$config_file" +sed -i 's|s3-bucket = ".*"|s3-bucket = "$s3-bucket"|' "$config_file" + +$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $mount_dir $fileset $config_file + + From 90bd355323ebfad3d1ec013d520fe3590c6bff18 Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 15:04:59 +0800 Subject: [PATCH 03/26] Add integration testers script --- clients/filesystem-fuse/tests/{ => bin}/start_backend.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename clients/filesystem-fuse/tests/{ => bin}/start_backend.sh (100%) diff --git a/clients/filesystem-fuse/tests/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh similarity index 100% rename from clients/filesystem-fuse/tests/start_backend.sh rename to clients/filesystem-fuse/tests/bin/start_backend.sh From c0d8d10f8c70a2ebf1b60ea6241f258192c76aee Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 15:17:17 +0800 Subject: [PATCH 04/26] Fix --- .../tests/bin/start_backend.sh | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index a5d55671ae0..624d7a8fa45 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -19,15 +19,15 @@ set -euo pipefail -s3-access_key_id=${s3-access_key_id:-} -s3-secret_access=${s3-secret_access:-} -s3-region=${s3-region:-} -s3-bucket=${s3-bucket:-} +S3-ACCESS_KEY_ID=${S3-ACCESS_KEY_ID:-} +S3-SECRET_ACCESS=${S3-SECRET_ACCESS:-} +S3-REGION=${S3-REGION:-} +S3-BUCKET=${S3-BUCKET:-} # Check required environment variables -if [[ -z "$s3-access_key_id" || -z "$s3-secret_access" || -z "$s3-region" || -z "$s3-bucket" ]]; then +if [[ -z "$S3-ACCESS_KEY_ID" || -z "$S3-SECRET_ACCESS" || -z "$S3-REGION" || -z "$S3-BUCKET" ]]; then echo "Error: One or more required S3 environment variables are not set." - echo "Please set: s3-access_key_id, s3-secret_access, s3-region, s3-bucket." + echo "Please set: S3-ACCESS_KEY_ID, S3-SECRET_ACCESS, S3-REGION, S3-BUCKET." exit 1 fi @@ -40,7 +40,7 @@ $GRAVITINO_SERVER_DIR/bin/start_gravitino_server.sh GRAVITINO_SERVER_URL=http://localhost:8090 -curl $GRAVITINO_SERVER_URL/api/metalakes +check_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" # create metalake curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ @@ -56,10 +56,10 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ "comment": "comment", "provider": "hadoop", "properties": { - "location": "s3a://'"$s3-bucket"'", - "s3-access-key-id": "'"$s3-access_key_id"'", - "s3-secret-access-key": "'"$s3-secret_access"'", - "s3-endpoint": "http://s3.'"$s3-region"'.amazonaws.com", + "location": "s3a://'"$S3-BUCKET"'", + "s3-access-key-id": "'"$S3-ACCESS_KEY_ID"'", + "s3-secret-access-key": "'"$S3-SECRET_ACCESS"'", + "s3-endpoint": "http://s3.'"$S3-REGION"'.amazonaws.com", "filesystem-providers": "s3" } }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs @@ -70,7 +70,7 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ "name":"s1","comment":"comment","properties":{} }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/catalog/schemas -# create fileset +# create FILESET curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name":"fileset1","comment":"comment","properties":{} @@ -79,26 +79,44 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ echo "Start the Gvfs fuse client" -mount_dir=$CLIENT_FUSE_DIR/target/gvfs -if [ -d "$mount_dir" ]; then +MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs +if [ -d "$MOUNT_DIR" ]; then echo "Unmount the existing mount point" - fusermount -u $mount_dir + fusermount -u $MOUNT_DIR else echo "Create the mount point" - mkdir -p $mount_dir + mkdir -p $MOUNT_DIR fi -fileset=gvfs://fileset/test/c1/s1/fileset1 +FILESET=gvfs://fileset/test/c1/s1/fileset1 -config_file=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml -cp $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml $config_file +CONF_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml +cp $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml $CONF_FILE -sed -i 's|s3-access_key_id = ".*"|s3-access_key_id = "$s3-access_key_id"|' "$config_file" -sed -i 's|s3-secret_access_key = ".*"|s3-secret_access_key = "$s3-secret_access"|' "$config_file" -sed -i 's|s3-region = ".*"|s3-region = "$s3-region"|' "$config_file" -sed -i 's|s3-bucket = ".*"|s3-bucket = "$s3-bucket"|' "$config_file" +sed -i 's|S3-ACCESS_KEY_ID = ".*"|S3-ACCESS_KEY_ID = "$S3-ACCESS_KEY_ID"|' "$CONF_FILE" +sed -i 's|S3-SECRET_ACCESS_key = ".*"|S3-SECRET_ACCESS_key = "$S3-SECRET_ACCESS"|' "$CONF_FILE" +sed -i 's|S3-REGION = ".*"|S3-REGION = "$S3-REGION"|' "$CONF_FILE" +sed -i 's|S3-BUCKET = ".*"|S3-BUCKET = "$S3-BUCKET"|' "$CONF_FILE" -$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $mount_dir $fileset $config_file +$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE +check_server_ready() { + local url=$1 + local retries=10 # Number of retries + local wait_time=3 # Wait time between retries (seconds) + + for ((i=1; i<=retries; i++)); do + if curl --silent --head --fail "$url" >/dev/null; then + echo "Gravitino server is ready." + return 0 + else + echo "Attempt $i/$retries: Server not ready. Retrying in $wait_time seconds..." + sleep "$wait_time" + fi + done + + echo "Error: Gravitino server did not become ready after $((retries * wait_time)) seconds." + exit 1 +} From c1bd3cd486808fe1986807725c59ccc9b2bb13b9 Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 15:23:15 +0800 Subject: [PATCH 05/26] Fix --- clients/filesystem-fuse/tests/bin/start_backend.sh | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index 624d7a8fa45..1ab188bb87c 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -93,11 +93,13 @@ FILESET=gvfs://fileset/test/c1/s1/fileset1 CONF_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml cp $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml $CONF_FILE - -sed -i 's|S3-ACCESS_KEY_ID = ".*"|S3-ACCESS_KEY_ID = "$S3-ACCESS_KEY_ID"|' "$CONF_FILE" -sed -i 's|S3-SECRET_ACCESS_key = ".*"|S3-SECRET_ACCESS_key = "$S3-SECRET_ACCESS"|' "$CONF_FILE" -sed -i 's|S3-REGION = ".*"|S3-REGION = "$S3-REGION"|' "$CONF_FILE" -sed -i 's|S3-BUCKET = ".*"|S3-BUCKET = "$S3-BUCKET"|' "$CONF_FILE" +awk '{ + if ($0 ~ /S3-ACCESS_KEY_ID/) $0 = "S3-ACCESS_KEY_ID = \"" ENVIRON["S3_ACCESS_KEY_ID"] "\""; + if ($0 ~ /S3-SECRET_ACCESS_KEY/) $0 = "S3-SECRET_ACCESS_KEY = \"" ENVIRON["S3_SECRET_ACCESS"] "\""; + if ($0 ~ /S3-REGION/) $0 = "S3-REGION = \"" ENVIRON["S3_REGION"] "\""; + if ($0 ~ /S3-BUCKET/) $0 = "S3-BUCKET = \"" ENVIRON["S3_BUCKET"] "\""; + print +}' $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml > "$CONF_FILE" $CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE From 6dff6cd092e81e690fdd8d9495f5b12a38038687 Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 17:08:31 +0800 Subject: [PATCH 06/26] Fix --- .../tests/bin/start_backend.sh | 101 ++++++++++-------- 1 file changed, 57 insertions(+), 44 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index 1ab188bb87c..e1f8245aff6 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -17,28 +17,56 @@ # specific language governing permissions and limitations # under the License. - set -euo pipefail +set -euo pipefail -S3-ACCESS_KEY_ID=${S3-ACCESS_KEY_ID:-} -S3-SECRET_ACCESS=${S3-SECRET_ACCESS:-} -S3-REGION=${S3-REGION:-} -S3-BUCKET=${S3-BUCKET:-} +S3_ACCESS_KEY_ID=${S3_ACCESS_KEY_ID:-} +S3_SECRET_ACCESS=${S3_SECRET_ACCESS:-} +S3_REGION=${S3_REGION:-} +S3_BUCKET=${S3_BUCKET:-} # Check required environment variables -if [[ -z "$S3-ACCESS_KEY_ID" || -z "$S3-SECRET_ACCESS" || -z "$S3-REGION" || -z "$S3-BUCKET" ]]; then +if [[ -z "$S3_ACCESS_KEY_ID" || -z "$S3_SECRET_ACCESS" || -z "$S3_REGION" || -z "$S3_BUCKET" ]]; then echo "Error: One or more required S3 environment variables are not set." - echo "Please set: S3-ACCESS_KEY_ID, S3-SECRET_ACCESS, S3-REGION, S3-BUCKET." + echo "Please set: S3_ACCESS_KEY_ID, S3_SECRET_ACCESS, S3_REGION, S3_BUCKET." exit 1 fi -GRAVITINO_SERVER_HOME=../../.. -GRAVITINO_SERVER_DIR=$GRAVITINO_SERVER_HOME/distribution/package -CLIENT_FUSE_DIR=$GRAVITINO_SERVER_HOME/clients/filesystem-fuse +GRAVITINO_HOME=../../../.. +GRAVITINO_SERVER_DIR=$GRAVITINO_HOME/distribution/package +CLIENT_FUSE_DIR=$GRAVITINO_HOME/clients/filesystem-fuse +GRAVITINO_SERVER_URL=http://localhost:8090 + +# copy the aws-bundle to the server +if ls $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar 1>/dev/null 2>&1; then + echo "File exists, skipping copy." +else + cp $GRAVITINO_HOME/bundles/aws-bundle/build/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar \ + $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs +fi + echo "Start the Gravitino server" -$GRAVITINO_SERVER_DIR/bin/start_gravitino_server.sh +rm -rf $GRAVITINO_SERVER_DIR/data +$GRAVITINO_SERVER_DIR/bin/gravitino.sh restart -GRAVITINO_SERVER_URL=http://localhost:8090 +check_server_ready() { + local url=$1 + local retries=10 # Number of retries + local wait_time=1 # Wait time between retries (seconds) + + for ((i=1; i<=retries; i++)); do + if curl --silent --head --fail "$url" >/dev/null; then + echo "Gravitino server is ready." + return 0 + else + echo "Attempt $i/$retries: Server not ready. Retrying in $wait_time seconds..." + sleep "$wait_time" + fi + done + + echo "Error: Gravitino server did not become ready after $((retries * wait_time)) seconds." + exit 1 +} check_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" @@ -56,10 +84,10 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ "comment": "comment", "provider": "hadoop", "properties": { - "location": "s3a://'"$S3-BUCKET"'", - "s3-access-key-id": "'"$S3-ACCESS_KEY_ID"'", - "s3-secret-access-key": "'"$S3-SECRET_ACCESS"'", - "s3-endpoint": "http://s3.'"$S3-REGION"'.amazonaws.com", + "location": "s3a://'"$S3_BUCKET"'", + "s3-access-key-id": "'"$S3_ACCESS_KEY_ID"'", + "s3-secret-access-key": "'"$S3_SECRET_ACCESS"'", + "s3-endpoint": "http://s3.'"$S3_REGION"'.amazonaws.com", "filesystem-providers": "s3" } }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs @@ -68,7 +96,7 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name":"s1","comment":"comment","properties":{} -}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/catalog/schemas +}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas # create FILESET curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ @@ -82,7 +110,7 @@ echo "Start the Gvfs fuse client" MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs if [ -d "$MOUNT_DIR" ]; then echo "Unmount the existing mount point" - fusermount -u $MOUNT_DIR + umount -l $MOUNT_DIR > /dev/null 2>&1 || true else echo "Create the mount point" mkdir -p $MOUNT_DIR @@ -91,34 +119,19 @@ fi FILESET=gvfs://fileset/test/c1/s1/fileset1 CONF_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml -cp $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml $CONF_FILE -awk '{ - if ($0 ~ /S3-ACCESS_KEY_ID/) $0 = "S3-ACCESS_KEY_ID = \"" ENVIRON["S3_ACCESS_KEY_ID"] "\""; - if ($0 ~ /S3-SECRET_ACCESS_KEY/) $0 = "S3-SECRET_ACCESS_KEY = \"" ENVIRON["S3_SECRET_ACCESS"] "\""; - if ($0 ~ /S3-REGION/) $0 = "S3-REGION = \"" ENVIRON["S3_REGION"] "\""; - if ($0 ~ /S3-BUCKET/) $0 = "S3-BUCKET = \"" ENVIRON["S3_BUCKET"] "\""; - print -}' $CLIENT_FUSE_DIR/test/conf/gvfs_fuse-s3.toml > "$CONF_FILE" +awk -v access_key="$S3_ACCESS_KEY_ID" \ + -v secret_key="$S3_SECRET_ACCESS" \ + -v region="$S3_REGION" \ + -v bucket="$S3_BUCKET" \ + 'BEGIN { in_extend_config = 0 } + /^\[extend_config\]/ { in_extend_config = 1 } + in_extend_config && /s3-access_key_id/ { $0 = "s3-access_key_id = \"" access_key "\"" } + in_extend_config && /s3-secret_access_key/ { $0 = "s3-secret_access_key = \"" secret_key "\"" } + in_extend_config && /s3-region/ { $0 = "s3-region = \"" region "\"" } + in_extend_config && /s3-bucket/ { $0 = "s3-bucket = \"" bucket "\"" } + { print }' $CLIENT_FUSE_DIR/tests/conf/gvfs_fuse_s3.toml > "$CONF_FILE" $CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE -check_server_ready() { - local url=$1 - local retries=10 # Number of retries - local wait_time=3 # Wait time between retries (seconds) - - for ((i=1; i<=retries; i++)); do - if curl --silent --head --fail "$url" >/dev/null; then - echo "Gravitino server is ready." - return 0 - else - echo "Attempt $i/$retries: Server not ready. Retrying in $wait_time seconds..." - sleep "$wait_time" - fi - done - - echo "Error: Gravitino server did not become ready after $((retries * wait_time)) seconds." - exit 1 -} From 6ba8391ef0632f818823a52d79c56e7bfc489544 Mon Sep 17 00:00:00 2001 From: yuhui Date: Mon, 6 Jan 2025 17:24:18 +0800 Subject: [PATCH 07/26] Fix --- clients/filesystem-fuse/tests/bin/start_backend.sh | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index e1f8245aff6..115777e3d2c 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -108,10 +108,9 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ echo "Start the Gvfs fuse client" MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs -if [ -d "$MOUNT_DIR" ]; then - echo "Unmount the existing mount point" - umount -l $MOUNT_DIR > /dev/null 2>&1 || true -else + +umount $MOUNT_DIR > /dev/null 2>&1 || true +if [ ! -d "$MOUNT_DIR" ]; then echo "Create the mount point" mkdir -p $MOUNT_DIR fi From 9611a3c0ef3a7dc6a3d1ffa2653d63aac62a7837 Mon Sep 17 00:00:00 2001 From: yuhui Date: Tue, 7 Jan 2025 09:58:41 +0800 Subject: [PATCH 08/26] Update test script --- clients/filesystem-fuse/tests/bin/start_backend.sh | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index 115777e3d2c..b12b0a56333 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -131,6 +131,18 @@ awk -v access_key="$S3_ACCESS_KEY_ID" \ in_extend_config && /s3-bucket/ { $0 = "s3-bucket = \"" bucket "\"" } { print }' $CLIENT_FUSE_DIR/tests/conf/gvfs_fuse_s3.toml > "$CONF_FILE" -$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE +# Start the gvfs-fuse process in the background +$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE & +FUSE_PID=$! +# run the integration test +cd $CLIENT_FUSE_DIR +make integration_test +cd - +# Stop the gvfs-fuse process after the test completes +echo "Stopping the Gvfs fuse client..." +kill -INT $FUSE_PID + +# Stop the Gravitino server +$GRAVITINO_SERVER_DIR/bin/gravitino.sh stop From 9459887bc24eeffff4bd9991865a47b6c5373840 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 10:20:41 +0800 Subject: [PATCH 09/26] Add integration test --- .../tests/bin/start_backend.sh | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index b12b0a56333..1c472665d58 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -32,6 +32,7 @@ if [[ -z "$S3_ACCESS_KEY_ID" || -z "$S3_SECRET_ACCESS" || -z "$S3_REGION" || -z fi GRAVITINO_HOME=../../../.. +GRAVITINO_HOME=$(cd $GRAVITINO_HOME && pwd) GRAVITINO_SERVER_DIR=$GRAVITINO_HOME/distribution/package CLIENT_FUSE_DIR=$GRAVITINO_HOME/clients/filesystem-fuse GRAVITINO_SERVER_URL=http://localhost:8090 @@ -49,7 +50,7 @@ echo "Start the Gravitino server" rm -rf $GRAVITINO_SERVER_DIR/data $GRAVITINO_SERVER_DIR/bin/gravitino.sh restart -check_server_ready() { +check_gravitino_server_ready() { local url=$1 local retries=10 # Number of retries local wait_time=1 # Wait time between retries (seconds) @@ -68,7 +69,7 @@ check_server_ready() { exit 1 } -check_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" +check_gravitino_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" # create metalake curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ @@ -115,7 +116,7 @@ if [ ! -d "$MOUNT_DIR" ]; then mkdir -p $MOUNT_DIR fi -FILESET=gvfs://fileset/test/c1/s1/fileset1 +MOUNT_FROM_LOCATION=gvfs://fileset/test/c1/s1/fileset1 CONF_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml @@ -131,18 +132,27 @@ awk -v access_key="$S3_ACCESS_KEY_ID" \ in_extend_config && /s3-bucket/ { $0 = "s3-bucket = \"" bucket "\"" } { print }' $CLIENT_FUSE_DIR/tests/conf/gvfs_fuse_s3.toml > "$CONF_FILE" +cleanup() { + # Stop the gvfs-fuse process if it's running + if [ -n "$FUSE_PID" ] && ps -p $FUSE_PID > /dev/null; then + echo "Stopping gvfs-fuse..." + kill -INT $FUSE_PID + else + echo "gvfs-fuse process not found or already stopped." + fi + + # Stop the Gravitino server + echo "Stopping Gravitino server..." + $GRAVITINO_SERVER_DIR/bin/gravitino.sh stop || echo "Failed to stop Gravitino server." +} +trap cleanup EXIT + # Start the gvfs-fuse process in the background -$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $FILESET $CONF_FILE & +$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $MOUNT_FROM_LOCATION $CONF_FILE > $CLIENT_FUSE_DIR/target/debug/fuse.log 2>&1 & FUSE_PID=$! +echo "Gvfs fuse started with PID: $FUSE_PID" # run the integration test cd $CLIENT_FUSE_DIR -make integration_test -cd - - -# Stop the gvfs-fuse process after the test completes -echo "Stopping the Gvfs fuse client..." -kill -INT $FUSE_PID - -# Stop the Gravitino server -$GRAVITINO_SERVER_DIR/bin/gravitino.sh stop +export RUN_TEST_WITH_BACKGROUND=1 +cargo test --test fuse_test test_fuse_system_with_manual -- --exact From ccac1be5b13aebf13a604742b8e32f75c40942a4 Mon Sep 17 00:00:00 2001 From: yuhui Date: Tue, 7 Jan 2025 16:10:40 +0800 Subject: [PATCH 10/26] fix some logs --- .../tests/bin/start_backend.sh | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/start_backend.sh index 1c472665d58..7118f8e6496 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/start_backend.sh @@ -71,14 +71,15 @@ check_gravitino_server_ready() { check_gravitino_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" +echo "Create the metalake, catalog, schema, and fileset" # create metalake -curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name":"test","comment":"comment","properties":{} }' $GRAVITINO_SERVER_URL/api/metalakes # create catalog -curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name": "c1", "type": "FILESET", @@ -94,18 +95,17 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs # create schema -curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name":"s1","comment":"comment","properties":{} }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas # create FILESET -curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \ +curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ -H "Content-Type: application/json" -d '{ "name":"fileset1","comment":"comment","properties":{} }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas/s1/filesets - echo "Start the Gvfs fuse client" MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs @@ -152,7 +152,31 @@ $CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $MOUNT_FROM_LOCATION $CONF_FI FUSE_PID=$! echo "Gvfs fuse started with PID: $FUSE_PID" +#check the gvfs-fuse is ready +check_gvfs_fuse_ready() { + local retries=10 + local wait_time=1 + + for ((i=1; i<=retries; i++)); do + # check the $MOUNT_DIR/.gvfs_meta is exist + if [ -f "$MOUNT_DIR/.gvfs_meta" ]; then + echo "Gvfs fuse is ready." + return 0 + else + echo "Attempt $i/$retries: Gvfs fuse not ready. Retrying in $wait_time seconds..." + sleep "$wait_time" + fi + done + + echo "Error: Gvfs fuse did not become ready after $((retries * wait_time)) seconds." + exit 1 +} + +check_gvfs_fuse_ready + # run the integration test cd $CLIENT_FUSE_DIR export RUN_TEST_WITH_BACKGROUND=1 cargo test --test fuse_test test_fuse_system_with_manual -- --exact + +sleep 3 From d56c8db4e802d20f90c32b16002f167af5dc51f1 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 10:22:02 +0800 Subject: [PATCH 11/26] Test s3 --- .../bin/{start_backend.sh => s3_fileset_it.sh} | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) rename clients/filesystem-fuse/tests/bin/{start_backend.sh => s3_fileset_it.sh} (97%) diff --git a/clients/filesystem-fuse/tests/bin/start_backend.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh similarity index 97% rename from clients/filesystem-fuse/tests/bin/start_backend.sh rename to clients/filesystem-fuse/tests/bin/s3_fileset_it.sh index 7118f8e6496..82ffef807ab 100755 --- a/clients/filesystem-fuse/tests/bin/start_backend.sh +++ b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh @@ -19,6 +19,9 @@ set -euo pipefail +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + S3_ACCESS_KEY_ID=${S3_ACCESS_KEY_ID:-} S3_SECRET_ACCESS=${S3_SECRET_ACCESS:-} S3_REGION=${S3_REGION:-} @@ -147,7 +150,13 @@ cleanup() { } trap cleanup EXIT +cd $CLIENT_FUSE_DIR + # Start the gvfs-fuse process in the background +echo "Starting gvfs-fuse" +# Build the gvfs-fuse +make build + $CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $MOUNT_FROM_LOCATION $CONF_FILE > $CLIENT_FUSE_DIR/target/debug/fuse.log 2>&1 & FUSE_PID=$! echo "Gvfs fuse started with PID: $FUSE_PID" @@ -175,8 +184,6 @@ check_gvfs_fuse_ready() { check_gvfs_fuse_ready # run the integration test -cd $CLIENT_FUSE_DIR + export RUN_TEST_WITH_BACKGROUND=1 cargo test --test fuse_test test_fuse_system_with_manual -- --exact - -sleep 3 From 32d5a863a9984457edad5fef457dd799c13902e0 Mon Sep 17 00:00:00 2001 From: yuhui Date: Tue, 7 Jan 2025 20:03:16 +0800 Subject: [PATCH 12/26] Update --- clients/filesystem-fuse/src/utils.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index 73af044ee4c..53eb9179d71 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -18,7 +18,6 @@ */ use crate::error::ErrorCode::InvalidConfig; use crate::error::GvfsError; -use opendal::Operator; use reqwest::Url; use std::path::PathBuf; @@ -37,19 +36,6 @@ pub(crate) fn extract_root_path(location: &str) -> GvfsResult { Ok(PathBuf::from(url.path())) } -pub(crate) async fn delete_dir(op: &Operator, dir_name: &str) { - let childs = op.list(dir_name).await.expect("list dir failed"); - for child in childs { - let child_name = dir_name.to_string() + child.name(); - if child.metadata().is_dir() { - Box::pin(delete_dir(op, &child_name)).await; - } else { - op.delete(&child_name).await.expect("delete file failed"); - } - } - op.delete(dir_name).await.expect("delete dir failed"); -} - #[cfg(test)] mod tests { use crate::utils::extract_root_path; From da1832eac9ae353ff8c43200ae3ca502ca84f106 Mon Sep 17 00:00:00 2001 From: yuhui Date: Wed, 8 Jan 2025 16:25:08 +0800 Subject: [PATCH 13/26] refector s3 testcases --- clients/filesystem-fuse/tests/fuse_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index 41e385c49f1..97f189df793 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -86,7 +86,7 @@ impl Drop for FuseTest { } #[test] -fn test_fuse_with_memory_fs() { +fn test_fuse_system_with_auto() { tracing_subscriber::fmt().init(); panic::set_hook(Box::new(|info| { From cfde267af8145e9a7b122acbb77c42cd69b0cada Mon Sep 17 00:00:00 2001 From: yuhui Date: Wed, 8 Jan 2025 16:37:52 +0800 Subject: [PATCH 14/26] Fix --- clients/filesystem-fuse/tests/bin/s3_fileset_it.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh index 82ffef807ab..0cef0544336 100755 --- a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh +++ b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh @@ -92,7 +92,7 @@ curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ "location": "s3a://'"$S3_BUCKET"'", "s3-access-key-id": "'"$S3_ACCESS_KEY_ID"'", "s3-secret-access-key": "'"$S3_SECRET_ACCESS"'", - "s3-endpoint": "http://s3.'"$S3_REGION"'.amazonaws.com", + "s3-endpoint": "https://s3.amazonaws.com", "filesystem-providers": "s3" } }' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs @@ -185,5 +185,5 @@ check_gvfs_fuse_ready # run the integration test -export RUN_TEST_WITH_BACKGROUND=1 +export RUN_TEST_WITH_BACKEND=1 cargo test --test fuse_test test_fuse_system_with_manual -- --exact From 08b84365fc01b4269a41a64ebed7dfe617a12acf Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 9 Jan 2025 11:28:32 +0800 Subject: [PATCH 15/26] Integration with localstack --- .../tests/bin/s3_fileset_it.sh | 190 +++--------------- clients/filesystem-fuse/tests/bin/s3_test.sh | 62 ++++++ clients/filesystem-fuse/tests/fuse_test.rs | 2 +- 3 files changed, 96 insertions(+), 158 deletions(-) create mode 100644 clients/filesystem-fuse/tests/bin/s3_test.sh diff --git a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh index 0cef0544336..f72ca95f07d 100755 --- a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh +++ b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh @@ -1,5 +1,4 @@ #!/bin/bash - # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -22,168 +21,45 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -S3_ACCESS_KEY_ID=${S3_ACCESS_KEY_ID:-} -S3_SECRET_ACCESS=${S3_SECRET_ACCESS:-} -S3_REGION=${S3_REGION:-} -S3_BUCKET=${S3_BUCKET:-} - -# Check required environment variables -if [[ -z "$S3_ACCESS_KEY_ID" || -z "$S3_SECRET_ACCESS" || -z "$S3_REGION" || -z "$S3_BUCKET" ]]; then - echo "Error: One or more required S3 environment variables are not set." - echo "Please set: S3_ACCESS_KEY_ID, S3_SECRET_ACCESS, S3_REGION, S3_BUCKET." - exit 1 -fi - -GRAVITINO_HOME=../../../.. -GRAVITINO_HOME=$(cd $GRAVITINO_HOME && pwd) -GRAVITINO_SERVER_DIR=$GRAVITINO_HOME/distribution/package -CLIENT_FUSE_DIR=$GRAVITINO_HOME/clients/filesystem-fuse -GRAVITINO_SERVER_URL=http://localhost:8090 - -# copy the aws-bundle to the server -if ls $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar 1>/dev/null 2>&1; then - echo "File exists, skipping copy." -else - cp $GRAVITINO_HOME/bundles/aws-bundle/build/libs/gravitino-aws-bundle-*-incubating-SNAPSHOT.jar \ - $GRAVITINO_SERVER_DIR/catalogs/hadoop/libs -fi - +source ./env.sh -echo "Start the Gravitino server" -rm -rf $GRAVITINO_SERVER_DIR/data -$GRAVITINO_SERVER_DIR/bin/gravitino.sh restart +source ./gravitino_server.sh +source ./gvfs_fuse.sh +source ./localstatck.sh -check_gravitino_server_ready() { - local url=$1 - local retries=10 # Number of retries - local wait_time=1 # Wait time between retries (seconds) - - for ((i=1; i<=retries; i++)); do - if curl --silent --head --fail "$url" >/dev/null; then - echo "Gravitino server is ready." - return 0 - else - echo "Attempt $i/$retries: Server not ready. Retrying in $wait_time seconds..." - sleep "$wait_time" - fi - done - - echo "Error: Gravitino server did not become ready after $((retries * wait_time)) seconds." - exit 1 +start_servers() { + start_localstack + start_gravitino_server + start_gvfs_fuse } -check_gravitino_server_ready "$GRAVITINO_SERVER_URL/api/metalakes" - -echo "Create the metalake, catalog, schema, and fileset" -# create metalake -curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ --H "Content-Type: application/json" -d '{ - "name":"test","comment":"comment","properties":{} -}' $GRAVITINO_SERVER_URL/api/metalakes - -# create catalog -curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ --H "Content-Type: application/json" -d '{ - "name": "c1", - "type": "FILESET", - "comment": "comment", - "provider": "hadoop", - "properties": { - "location": "s3a://'"$S3_BUCKET"'", - "s3-access-key-id": "'"$S3_ACCESS_KEY_ID"'", - "s3-secret-access-key": "'"$S3_SECRET_ACCESS"'", - "s3-endpoint": "https://s3.amazonaws.com", - "filesystem-providers": "s3" - } -}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs - -# create schema -curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ --H "Content-Type: application/json" -d '{ - "name":"s1","comment":"comment","properties":{} -}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas - -# create FILESET -curl -s -o /dev/null -X POST -H "Accept: application/vnd.gravitino.v1+json" \ --H "Content-Type: application/json" -d '{ - "name":"fileset1","comment":"comment","properties":{} -}' $GRAVITINO_SERVER_URL/api/metalakes/test/catalogs/c1/schemas/s1/filesets - -echo "Start the Gvfs fuse client" - -MOUNT_DIR=$CLIENT_FUSE_DIR/target/gvfs - -umount $MOUNT_DIR > /dev/null 2>&1 || true -if [ ! -d "$MOUNT_DIR" ]; then - echo "Create the mount point" - mkdir -p $MOUNT_DIR -fi - -MOUNT_FROM_LOCATION=gvfs://fileset/test/c1/s1/fileset1 - -CONF_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml - -awk -v access_key="$S3_ACCESS_KEY_ID" \ - -v secret_key="$S3_SECRET_ACCESS" \ - -v region="$S3_REGION" \ - -v bucket="$S3_BUCKET" \ - 'BEGIN { in_extend_config = 0 } - /^\[extend_config\]/ { in_extend_config = 1 } - in_extend_config && /s3-access_key_id/ { $0 = "s3-access_key_id = \"" access_key "\"" } - in_extend_config && /s3-secret_access_key/ { $0 = "s3-secret_access_key = \"" secret_key "\"" } - in_extend_config && /s3-region/ { $0 = "s3-region = \"" region "\"" } - in_extend_config && /s3-bucket/ { $0 = "s3-bucket = \"" bucket "\"" } - { print }' $CLIENT_FUSE_DIR/tests/conf/gvfs_fuse_s3.toml > "$CONF_FILE" - -cleanup() { - # Stop the gvfs-fuse process if it's running - if [ -n "$FUSE_PID" ] && ps -p $FUSE_PID > /dev/null; then - echo "Stopping gvfs-fuse..." - kill -INT $FUSE_PID - else - echo "gvfs-fuse process not found or already stopped." - fi - - # Stop the Gravitino server - echo "Stopping Gravitino server..." - $GRAVITINO_SERVER_DIR/bin/gravitino.sh stop || echo "Failed to stop Gravitino server." +stop_servers() { + set +e + stop_gvfs_fuse + stop_gravitino_server + stop_localstack } -trap cleanup EXIT - -cd $CLIENT_FUSE_DIR -# Start the gvfs-fuse process in the background -echo "Starting gvfs-fuse" -# Build the gvfs-fuse -make build - -$CLIENT_FUSE_DIR/target/debug/gvfs-fuse $MOUNT_DIR $MOUNT_FROM_LOCATION $CONF_FILE > $CLIENT_FUSE_DIR/target/debug/fuse.log 2>&1 & -FUSE_PID=$! -echo "Gvfs fuse started with PID: $FUSE_PID" - -#check the gvfs-fuse is ready -check_gvfs_fuse_ready() { - local retries=10 - local wait_time=1 - - for ((i=1; i<=retries; i++)); do - # check the $MOUNT_DIR/.gvfs_meta is exist - if [ -f "$MOUNT_DIR/.gvfs_meta" ]; then - echo "Gvfs fuse is ready." - return 0 - else - echo "Attempt $i/$retries: Gvfs fuse not ready. Retrying in $wait_time seconds..." - sleep "$wait_time" - fi - done - - echo "Error: Gvfs fuse did not become ready after $((retries * wait_time)) seconds." +# Main logic based on parameters +if [ "$1" == "test" ]; then + trap stop_servers EXIT + start_servers + # Run the integration test + echo "Running tests..." + cd $CLIENT_FUSE_DIR + export RUN_TEST_WITH_FUSE=1 + cargo test --test fuse_test fuse_it_ +elif [ "$1" == "start" ]; then + # Start the servers + echo "Starting servers..." + start_servers +elif [ "$1" == "stop" ]; then + # Stop the servers + echo "Stopping servers..." + stop_servers +else + echo "Usage: $0 {test|start|stop}" exit 1 -} - -check_gvfs_fuse_ready +fi -# run the integration test -export RUN_TEST_WITH_BACKEND=1 -cargo test --test fuse_test test_fuse_system_with_manual -- --exact diff --git a/clients/filesystem-fuse/tests/bin/s3_test.sh b/clients/filesystem-fuse/tests/bin/s3_test.sh new file mode 100644 index 00000000000..2948aa9ba64 --- /dev/null +++ b/clients/filesystem-fuse/tests/bin/s3_test.sh @@ -0,0 +1,62 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$SCRIPT_DIR" + +source ./env.sh + +source ./localstatck.sh + +start_servers() { + start_localstack +} + +stop_servers() { + set +e + stop_localstack +} + +# Main logic based on parameters +if [ "$1" == "test" ]; then + trap stop_servers EXIT + start_servers + # Run the integration test + echo "Running tests..." + cd $CLIENT_FUSE_DIR + export RUN_TEST_WITH_S3=1 + cargo test s3_ut_ --lib + +elif [ "$1" == "start" ]; then + # Start the servers + echo "Starting servers..." + start_servers + +elif [ "$1" == "stop" ]; then + # Stop the servers + echo "Stopping servers..." + stop_servers + +else + echo "Usage: $0 {test|start|stop}" + exit 1 +fi + + diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index 97f189df793..41e385c49f1 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -86,7 +86,7 @@ impl Drop for FuseTest { } #[test] -fn test_fuse_system_with_auto() { +fn test_fuse_with_memory_fs() { tracing_subscriber::fmt().init(); panic::set_hook(Box::new(|info| { From b94ecd802b31b1d9a85a6c9e31cbcafca9d04f0b Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 10:32:50 +0800 Subject: [PATCH 16/26] Add pipline  Conflicts:  .github/workflows/gvfs-fuse-build-test.yml  clients/filesystem-fuse/tests/bin/gvfs_fuse.sh --- clients/filesystem-fuse/tests/bin/s3_fileset_it.sh | 7 ++++++- clients/filesystem-fuse/tests/bin/s3_test.sh | 4 +++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh index f72ca95f07d..20e10f6ef47 100755 --- a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh +++ b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh @@ -22,15 +22,17 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" source ./env.sh - source ./gravitino_server.sh source ./gvfs_fuse.sh source ./localstatck.sh +TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml + start_servers() { start_localstack start_gravitino_server start_gvfs_fuse + generate_test_config } stop_servers() { @@ -49,14 +51,17 @@ if [ "$1" == "test" ]; then cd $CLIENT_FUSE_DIR export RUN_TEST_WITH_FUSE=1 cargo test --test fuse_test fuse_it_ + elif [ "$1" == "start" ]; then # Start the servers echo "Starting servers..." start_servers + elif [ "$1" == "stop" ]; then # Stop the servers echo "Stopping servers..." stop_servers + else echo "Usage: $0 {test|start|stop}" exit 1 diff --git a/clients/filesystem-fuse/tests/bin/s3_test.sh b/clients/filesystem-fuse/tests/bin/s3_test.sh index 2948aa9ba64..ea232fe2fc7 100644 --- a/clients/filesystem-fuse/tests/bin/s3_test.sh +++ b/clients/filesystem-fuse/tests/bin/s3_test.sh @@ -21,12 +21,14 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -source ./env.sh +TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/conf/gvfs_fuse_s3.toml +source ./env.sh source ./localstatck.sh start_servers() { start_localstack + generate_test_config } stop_servers() { From 52238fbd8bd2c3a936e9bdb9d0c29d72006aaa0f Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 9 Jan 2025 15:53:26 +0800 Subject: [PATCH 17/26] Fix --- clients/filesystem-fuse/tests/bin/s3_test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/filesystem-fuse/tests/bin/s3_test.sh b/clients/filesystem-fuse/tests/bin/s3_test.sh index ea232fe2fc7..ac5f9812c93 100644 --- a/clients/filesystem-fuse/tests/bin/s3_test.sh +++ b/clients/filesystem-fuse/tests/bin/s3_test.sh @@ -21,11 +21,11 @@ set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$SCRIPT_DIR" -TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/conf/gvfs_fuse_s3.toml - source ./env.sh source ./localstatck.sh +TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/conf/gvfs_fuse_s3.toml + start_servers() { start_localstack generate_test_config From f2c43eabb2e6f1de4db377683326c9cb6d37806b Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 9 Jan 2025 16:30:49 +0800 Subject: [PATCH 18/26] Fix ci error --- clients/filesystem-fuse/tests/bin/s3_fileset_it.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh index 20e10f6ef47..6dc38c48f07 100755 --- a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh +++ b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh @@ -31,8 +31,8 @@ TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml start_servers() { start_localstack start_gravitino_server - start_gvfs_fuse generate_test_config + start_gvfs_fuse } stop_servers() { From ae61ac6319b174505913e60dcf19cc9a64e7762d Mon Sep 17 00:00:00 2001 From: yuhui Date: Fri, 10 Jan 2025 10:08:33 +0800 Subject: [PATCH 19/26] Fix --- clients/filesystem-fuse/src/gravitino_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs index e78e4986f50..1c09c600276 100644 --- a/clients/filesystem-fuse/src/gravitino_client.rs +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -223,7 +223,7 @@ impl GravitinoClient { } #[cfg(test)] -pub(crate) mod tests { +mod tests { use super::*; use mockito::mock; From bef5e4902e56ecb2949a6dcbcc25d6d401c6031d Mon Sep 17 00:00:00 2001 From: yuhui Date: Fri, 10 Jan 2025 10:21:56 +0800 Subject: [PATCH 20/26] Fix --- .../tests/bin/s3_fileset_it.sh | 70 ------------------- clients/filesystem-fuse/tests/bin/s3_test.sh | 64 ----------------- 2 files changed, 134 deletions(-) delete mode 100755 clients/filesystem-fuse/tests/bin/s3_fileset_it.sh delete mode 100644 clients/filesystem-fuse/tests/bin/s3_test.sh diff --git a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh b/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh deleted file mode 100755 index 6dc38c48f07..00000000000 --- a/clients/filesystem-fuse/tests/bin/s3_fileset_it.sh +++ /dev/null @@ -1,70 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -cd "$SCRIPT_DIR" - -source ./env.sh -source ./gravitino_server.sh -source ./gvfs_fuse.sh -source ./localstatck.sh - -TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/debug/gvfs-fuse.toml - -start_servers() { - start_localstack - start_gravitino_server - generate_test_config - start_gvfs_fuse -} - -stop_servers() { - set +e - stop_gvfs_fuse - stop_gravitino_server - stop_localstack -} - -# Main logic based on parameters -if [ "$1" == "test" ]; then - trap stop_servers EXIT - start_servers - # Run the integration test - echo "Running tests..." - cd $CLIENT_FUSE_DIR - export RUN_TEST_WITH_FUSE=1 - cargo test --test fuse_test fuse_it_ - -elif [ "$1" == "start" ]; then - # Start the servers - echo "Starting servers..." - start_servers - -elif [ "$1" == "stop" ]; then - # Stop the servers - echo "Stopping servers..." - stop_servers - -else - echo "Usage: $0 {test|start|stop}" - exit 1 -fi - - diff --git a/clients/filesystem-fuse/tests/bin/s3_test.sh b/clients/filesystem-fuse/tests/bin/s3_test.sh deleted file mode 100644 index ac5f9812c93..00000000000 --- a/clients/filesystem-fuse/tests/bin/s3_test.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -cd "$SCRIPT_DIR" - -source ./env.sh -source ./localstatck.sh - -TEST_CONFIG_FILE=$CLIENT_FUSE_DIR/target/conf/gvfs_fuse_s3.toml - -start_servers() { - start_localstack - generate_test_config -} - -stop_servers() { - set +e - stop_localstack -} - -# Main logic based on parameters -if [ "$1" == "test" ]; then - trap stop_servers EXIT - start_servers - # Run the integration test - echo "Running tests..." - cd $CLIENT_FUSE_DIR - export RUN_TEST_WITH_S3=1 - cargo test s3_ut_ --lib - -elif [ "$1" == "start" ]; then - # Start the servers - echo "Starting servers..." - start_servers - -elif [ "$1" == "stop" ]; then - # Stop the servers - echo "Stopping servers..." - stop_servers - -else - echo "Usage: $0 {test|start|stop}" - exit 1 -fi - - From c8c35c963f0b3bc875e3f97cbfae1e25103b40d4 Mon Sep 17 00:00:00 2001 From: yuhui Date: Tue, 14 Jan 2025 09:47:00 +0800 Subject: [PATCH 21/26] Add bigfile and openfile flag testcases --- clients/filesystem-fuse/Makefile | 3 + clients/filesystem-fuse/src/filesystem.rs | 4 +- .../filesystem-fuse/src/memory_filesystem.rs | 36 ++-- .../src/open_dal_filesystem.rs | 6 +- clients/filesystem-fuse/tests/fuse_test.rs | 154 +++++++++++++++--- 5 files changed, 164 insertions(+), 39 deletions(-) diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile index 86dd2f22152..23ebd9e7004 100644 --- a/clients/filesystem-fuse/Makefile +++ b/clients/filesystem-fuse/Makefile @@ -62,6 +62,9 @@ doc-test: unit-test: doc-test cargo test --no-fail-fast --lib --all-features --workspace +test-it: + cargo test --test fuse_test + test-fuse-it: @bash ./tests/bin/run_fuse_testers.sh test diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index dcf35f8ebca..94d27c7074d 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send { #[cfg(test)] pub(crate) mod tests { use super::*; - use libc::{O_APPEND, O_CREAT, O_RDONLY}; + use libc::{O_CREAT, O_RDONLY, O_RDWR}; use std::collections::HashMap; use std::path::Component; @@ -548,7 +548,7 @@ pub(crate) mod tests { async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) -> FileHandle { let file = self .fs - .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32) + .create_file(root_file_id, name, (O_CREAT | O_RDWR) as u32) .await; assert!(file.is_ok()); let file = file.unwrap(); diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index f56e65ea33a..84ff162a8bd 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -91,7 +91,7 @@ impl PathFileSystem for MemoryFileSystem { Ok(results) } - async fn open_file(&self, path: &Path, _flags: OpenFileFlags) -> Result { + async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result { let file_stat = self.stat(path).await?; let mut opened_file = OpenedFile::new(file_stat); match opened_file.file_stat.kind { @@ -105,8 +105,18 @@ impl PathFileSystem for MemoryFileSystem { .unwrap() .data .clone(); - opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); - opened_file.writer = Some(Box::new(MemoryFileWriter { data: data })); + if flags.is_read() { + + opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); + } + if flags.is_write() || flags.is_append() { + opened_file.writer = Some(Box::new(MemoryFileWriter { data: data.clone() })); + } + + if flags.is_truncate() { + let mut data = data.lock().unwrap(); + data.clear(); + } Ok(opened_file) } _ => Err(Errno::from(libc::EBADF)), @@ -117,27 +127,19 @@ impl PathFileSystem for MemoryFileSystem { self.open_file(path, flags).await } - async fn create_file(&self, path: &Path, _flags: OpenFileFlags) -> Result { - let mut file_map = self.file_map.write().unwrap(); - if file_map.contains_key(path) { + async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { + if self.file_map.read().unwrap().contains_key(path) && flags.is_exclusive() { return Err(Errno::from(libc::EEXIST)); } - let mut opened_file = OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0)); - - let data = Arc::new(Mutex::new(Vec::new())); - file_map.insert( - opened_file.file_stat.path.clone(), + self.file_map.write().unwrap().insert( + path.to_path_buf(), MemoryFile { kind: RegularFile, - data: data.clone(), + data: Arc::new(Mutex::new(Vec::new())), }, ); - - opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); - opened_file.writer = Some(Box::new(MemoryFileWriter { data: data })); - - Ok(opened_file) + self.open_file(path, flags).await } async fn create_dir(&self, path: &Path) -> Result { diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs index d32b014d1f0..92202c968f5 100644 --- a/clients/filesystem-fuse/src/open_dal_filesystem.rs +++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs @@ -120,7 +120,11 @@ impl PathFileSystem for OpenDalFileSystem { .map_err(opendal_error_to_errno)?; file.reader = Some(Box::new(FileReaderImpl { reader })); } - if flags.is_write() || flags.is_create() || flags.is_append() || flags.is_truncate() { + if !flags.is_create() && flags.is_append() { + return Err(Errno::from(libc::EBADF)); + } + + if flags.is_write() || flags.is_truncate() { let writer = self .op .writer_with(&file_name) diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index 41e385c49f1..37800f4388d 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -22,7 +22,8 @@ use gvfs_fuse::config::AppConfig; use gvfs_fuse::RUN_TEST_WITH_FUSE; use gvfs_fuse::{gvfs_mount, gvfs_unmount, test_enable_with}; use log::{error, info}; -use std::fs::File; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Seek, SeekFrom, Write}; use std::path::Path; use std::sync::Arc; use std::thread::sleep; @@ -89,11 +90,6 @@ impl Drop for FuseTest { fn test_fuse_with_memory_fs() { tracing_subscriber::fmt().init(); - panic::set_hook(Box::new(|info| { - error!("A panic occurred: {:?}", info); - process::exit(1); - })); - let mount_point = "target/gvfs"; let _ = fs::create_dir_all(mount_point); @@ -104,26 +100,31 @@ fn test_fuse_with_memory_fs() { }; test.setup(); - test_fuse_filesystem(mount_point); + + let test_dir = Path::new(&test.mount_point).join("test_dir"); + run_tests(&test_dir); +} + +fn run_tests(test_dir: &Path) { + fs::create_dir_all(test_dir).expect("Failed to create test dir"); + test_fuse_filesystem(test_dir); + test_big_file(test_dir); + test_open_file_flag(test_dir); } #[test] fn fuse_it_test_fuse() { test_enable_with!(RUN_TEST_WITH_FUSE); + let mount_point = Path::new("target/gvfs"); + let test_dir = mount_point.join("test_dir"); - test_fuse_filesystem("target/gvfs/gvfs_test"); + run_tests(&test_dir); } -fn test_fuse_filesystem(mount_point: &str) { +fn test_fuse_filesystem(test_path: &Path) { info!("Test startup"); - let base_path = Path::new(mount_point); - - if !file_exists(base_path) { - fs::create_dir_all(base_path).expect("Failed to create test dir"); - } - //test create file - let test_file = base_path.join("test_create"); + let test_file = test_path.join("test_create"); let file = File::create(&test_file).expect("Failed to create file"); assert!(file.metadata().is_ok(), "Failed to get file metadata"); assert!(file_exists(&test_file)); @@ -140,16 +141,16 @@ fn test_fuse_filesystem(mount_point: &str) { assert!(!file_exists(&test_file)); //test create directory - let test_dir = base_path.join("test_dir"); + let test_dir = test_path.join("test_dir"); fs::create_dir(&test_dir).expect("Failed to create directory"); //test create file in directory - let test_file = base_path.join("test_dir/test_file"); + let test_file = test_path.join("test_dir/test_file"); let file = File::create(&test_file).expect("Failed to create file"); assert!(file.metadata().is_ok(), "Failed to get file metadata"); //test write file in directory - let test_file = base_path.join("test_dir/test_read"); + let test_file = test_path.join("test_dir/test_read"); fs::write(&test_file, "read test").expect("Failed to write file"); //test read file in directory @@ -167,6 +168,121 @@ fn test_fuse_filesystem(mount_point: &str) { info!("Success test"); } +#[allow(clippy::needless_range_loop)] +fn test_big_file(test_dir: &Path) { + if !file_exists(test_dir) { + fs::create_dir_all(test_dir).expect("Failed to create test dir"); + } + + let test_file = test_dir.join("test_big_file"); + let mut file = File::create(&test_file).expect("Failed to create file"); + assert!(file.metadata().is_ok(), "Failed to get file metadata"); + assert!(file_exists(&test_file)); + + let round_size: usize = 1024 * 1024; + let round: u8 = 10; + + for i in 0..round { + let mut content = vec![0; round_size]; + for j in 0..round_size { + content[j] = (i as usize + j) as u8; + } + + file.write_all(&content).expect("Failed to write file"); + } + file.flush().expect("Failed to flush file"); + + file = File::open(&test_file).expect("Failed to open file"); + for i in 0..round { + let mut buffer = vec![0; round_size]; + file.read_exact(&mut buffer).unwrap(); + + for j in 0..round_size { + assert_eq!(buffer[j], (i as usize + j) as u8, "File content mismatch"); + } + } + + fs::remove_file(&test_file).expect("Failed to delete file"); + assert!(!file_exists(&test_file)); +} + +fn test_open_file_flag(test_dir: &Path) { + // test open file with read and write create flag + let file_path = test_dir.join("test_open_file"); + let mut file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(&file_path) + .expect("Failed to open file"); + // test file offset is 0 + let offset = file.stream_position().expect("Failed to seek file"); + assert_eq!(offset, 0, "File offset mismatch"); + + // test write can be done + let write_content = "write content"; + file.write_all(write_content.as_bytes()) + .expect("Failed to write file"); + let mut content = vec![0; write_content.len()]; + + // test read can be done + file.seek(SeekFrom::Start(0)).expect("Failed to seek file"); + file.read_exact(&mut content).expect("Failed to read file"); + assert_eq!(content, write_content.as_bytes(), "File content mismatch"); + + // test open file with read flag + let mut file = OpenOptions::new() + .read(true) + .open(&file_path) + .expect("Failed to open file"); + // test reaad can be done + let mut content = vec![0; write_content.len()]; + file.read_exact(&mut content).expect("Failed to read file"); + assert_eq!(content, write_content.as_bytes(), "File content mismatch"); + + // test write can be have error + let result = file.write_all(write_content.as_bytes()); + if let Err(e) = result { + assert_eq!(e.to_string(), "Bad file descriptor (os error 9)"); + } + + // test open file with truncate file + // test file size is not 0 + let old_file_size = file.metadata().expect("Failed to get file metadata").len(); + assert_eq!(old_file_size, write_content.len() as u64); + + let mut file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&file_path) + .expect("Failed to open file"); + // validate file size is 0 + let file_size = file.metadata().expect("Failed to get file metadata").len(); + assert_eq!(file_size, 0, "File size mismatch"); + // validate file offset is 0 + let offset = file.stream_position().expect("Failed to seek file"); + assert_eq!(offset, 0, "File offset mismatch"); + + file.write_all(write_content.as_bytes()) + .expect("Failed to write file"); + + // test open file with append flag + let mut file = OpenOptions::new() + .append(true) + .open(&file_path) + .expect("Failed to open file"); + // test append + file.write_all(write_content.as_bytes()) + .expect("Failed to write file"); + let file_len = file.metadata().expect("Failed to get file metadata").len(); + // validate file size is 2 * write_content.len() + assert_eq!( + file_len, + 2 * write_content.len() as u64, + "File size mismatch" + ); +} + fn file_exists>(path: P) -> bool { fs::metadata(path).is_ok() } From 702616e444dfa328b7087ff2947930f5347c64a2 Mon Sep 17 00:00:00 2001 From: yuhui Date: Wed, 15 Jan 2025 23:32:25 +0800 Subject: [PATCH 22/26] Add more testcases --- clients/filesystem-fuse/src/main.rs | 2 +- .../filesystem-fuse/src/memory_filesystem.rs | 1 - .../src/open_dal_filesystem.rs | 104 +++- .../tests/bin/run_fuse_testers.sh | 9 +- clients/filesystem-fuse/tests/fuse_test.rs | 577 ++++++++++++------ 5 files changed, 494 insertions(+), 199 deletions(-) diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 3534e033465..4e517c76b37 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -24,7 +24,7 @@ use tokio::signal; #[tokio::main] async fn main() -> fuse3::Result<()> { - tracing_subscriber::fmt().init(); + tracing_subscriber::fmt::init(); // todo need inmprove the args parsing let args: Vec = std::env::args().collect(); diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index 84ff162a8bd..9f2b031f4aa 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -106,7 +106,6 @@ impl PathFileSystem for MemoryFileSystem { .data .clone(); if flags.is_read() { - opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() })); } if flags.is_write() || flags.is_append() { diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs index 92202c968f5..deae343a302 100644 --- a/clients/filesystem-fuse/src/open_dal_filesystem.rs +++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs @@ -26,7 +26,7 @@ use bytes::Bytes; use fuse3::FileType::{Directory, RegularFile}; use fuse3::{Errno, FileType, Timestamp}; use log::error; -use opendal::{EntryMode, ErrorKind, Metadata, Operator}; +use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator}; use std::path::{Path, PathBuf}; use std::time::SystemTime; @@ -37,6 +37,8 @@ pub(crate) struct OpenDalFileSystem { impl OpenDalFileSystem {} impl OpenDalFileSystem { + const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024; + pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self { Self { op: op } } @@ -121,17 +123,29 @@ impl PathFileSystem for OpenDalFileSystem { file.reader = Some(Box::new(FileReaderImpl { reader })); } if !flags.is_create() && flags.is_append() { + error!("The file system does not support open a exists file with the append mode "); return Err(Errno::from(libc::EBADF)); } - if flags.is_write() || flags.is_truncate() { + if flags.is_truncate() { + self.op + .write(&file_name, Buffer::new()) + .await + .map_err(opendal_error_to_errno)?; + } + + if flags.is_write() || flags.is_append() || flags.is_truncate() { let writer = self .op .writer_with(&file_name) .await .map_err(opendal_error_to_errno)?; - file.writer = Some(Box::new(FileWriterImpl { writer })); + file.writer = Some(Box::new(FileWriterImpl { + writer, + buffer: Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE), + })); } + Ok(file) } @@ -146,14 +160,10 @@ impl PathFileSystem for OpenDalFileSystem { async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { let file_name = path.to_string_lossy().to_string(); - let mut writer = self - .op - .writer_with(&file_name) + self.op + .write(&file_name, Buffer::new()) .await .map_err(opendal_error_to_errno)?; - - writer.close().await.map_err(opendal_error_to_errno)?; - let file = self.open_file(path, flags).await?; Ok(file) } @@ -214,19 +224,34 @@ impl FileReader for FileReaderImpl { struct FileWriterImpl { writer: opendal::Writer, + buffer: Vec, } #[async_trait] impl FileWriter for FileWriterImpl { async fn write(&mut self, _offset: u64, data: &[u8]) -> Result { - self.writer - .write(data.to_vec()) - .await - .map_err(opendal_error_to_errno)?; + if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE { + let mut new_buffer: Vec = Vec::with_capacity(OpenDalFileSystem::WRITE_BUFFER_SIZE); + new_buffer.append(&mut self.buffer); + + self.writer + .write(new_buffer) + .await + .map_err(opendal_error_to_errno)?; + } + self.buffer.extend(data); Ok(data.len() as u32) } async fn close(&mut self) -> Result<()> { + if !self.buffer.is_empty() { + let mut new_buffer: Vec = vec![]; + new_buffer.append(&mut self.buffer); + self.writer + .write(new_buffer) + .await + .map_err(opendal_error_to_errno)?; + } self.writer.close().await.map_err(opendal_error_to_errno)?; Ok(()) } @@ -268,6 +293,7 @@ mod test { use crate::s3_filesystem::tests::s3_test_config; use crate::test_enable_with; use crate::RUN_TEST_WITH_S3; + use bytes::Buf; use opendal::layers::LoggingLayer; use opendal::{services, Builder, Operator}; @@ -331,4 +357,56 @@ mod test { } } } + + #[tokio::test] + async fn s3_ut_test_s3_write() { + test_enable_with!(RUN_TEST_WITH_S3); + let config = s3_test_config(); + + let op = create_opendal(&config); + let path = "/s1/fileset1/gvfs_test/test_dir/test_file"; + let mut writer = op.writer_with(path).await.unwrap(); + + let mut buffer: Vec = vec![]; + for i in 0..10 * 1024 { + let data = vec![i as u8; 1024]; + buffer.extend(&data); + + if buffer.len() > 5 * 1024 * 1024 { + writer.write(buffer).await.unwrap(); + buffer = vec![]; + }; + } + + if !buffer.is_empty() { + writer.write(buffer).await.unwrap(); + } + writer.close().await.unwrap(); + } + + #[tokio::test] + async fn s3_ut_test_s3_read() { + test_enable_with!(RUN_TEST_WITH_S3); + let config = s3_test_config(); + + let op = create_opendal(&config); + let path = "/s1/fileset1/test_dir/test_big_file"; + let reader = op.reader(path).await.unwrap(); + + let mut buffer = Vec::new(); + + let mut start = 0; + let mut end = 1024; + loop { + let buf = reader.read(start..end).await.unwrap(); + if buf.is_empty() { + break; + } + buffer.extend_from_slice(buf.chunk()); + start = end; + end += 1024; + } + + println!("Read {} bytes.", buffer.len()); + } } diff --git a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh index 6dc38c48f07..dfdd5e6855e 100755 --- a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh +++ b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh @@ -50,13 +50,20 @@ if [ "$1" == "test" ]; then echo "Running tests..." cd $CLIENT_FUSE_DIR export RUN_TEST_WITH_FUSE=1 - cargo test --test fuse_test fuse_it_ + cargo test --test fuse_test fuse_it_ -- weak_consistency elif [ "$1" == "start" ]; then # Start the servers echo "Starting servers..." start_servers +elif [ "$1" == "restart" ]; then + echo "Stopping servers..." + stop_servers + # Start the servers + echo "Starting servers..." + start_servers + elif [ "$1" == "stop" ]; then # Stop the servers echo "Stopping servers..." diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index 37800f4388d..1538e5d925a 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -22,30 +22,32 @@ use gvfs_fuse::config::AppConfig; use gvfs_fuse::RUN_TEST_WITH_FUSE; use gvfs_fuse::{gvfs_mount, gvfs_unmount, test_enable_with}; use log::{error, info}; +use once_cell::sync::Lazy; +use std::collections::HashSet; use std::fs::{File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::Path; -use std::sync::Arc; -use std::thread::sleep; +use std::io::{Read, Write}; +use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; -use std::{fs, panic, process}; +use std::{env, fs}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; +use tokio::time::interval; -struct FuseTest { - runtime: Arc, +static ASYNC_RUNTIME: Lazy = Lazy::new(|| Runtime::new().unwrap()); + +struct FuseTestEnv { mount_point: String, gvfs_mount: Option>>, } -impl FuseTest { +impl FuseTestEnv { pub fn setup(&mut self) { info!("Start gvfs fuse server"); let mount_point = self.mount_point.clone(); let config = AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml")) .expect("Failed to load config"); - self.runtime.spawn(async move { + ASYNC_RUNTIME.spawn(async move { let result = gvfs_mount(&mount_point, "", &config).await; if let Err(e) = result { error!("Failed to mount gvfs: {:?}", e); @@ -58,231 +60,440 @@ impl FuseTest { } pub fn shutdown(&mut self) { - self.runtime.block_on(async { + ASYNC_RUNTIME.block_on(async { let _ = gvfs_unmount().await; }); } fn wait_for_fuse_server_ready(path: &str, timeout: Duration) -> bool { let test_file = format!("{}/.gvfs_meta", path); - let start_time = Instant::now(); + AwaitUtil::wait(timeout, Duration::from_millis(500), || { + file_exists(&test_file) + }) + } +} - while start_time.elapsed() < timeout { - if file_exists(&test_file) { - info!("Fuse server is ready",); - return true; +struct AwaitUtil(); + +impl AwaitUtil { + pub(crate) fn wait( + max_wait: Duration, + poll_interval: Duration, + check_fn: impl Fn() -> bool + Send, + ) -> bool { + ASYNC_RUNTIME.block_on(async { + let start = Instant::now(); + let mut interval = interval(poll_interval); + + while start.elapsed() < max_wait { + interval.tick().await; + if check_fn() { + return true; + } } - info!("Wait for fuse server ready",); - sleep(Duration::from_secs(1)); - } - false + false + }) } } -impl Drop for FuseTest { +impl Drop for FuseTestEnv { fn drop(&mut self) { info!("Shutdown fuse server"); self.shutdown(); } } -#[test] -fn test_fuse_with_memory_fs() { - tracing_subscriber::fmt().init(); +struct SequenceFileOperationTest { + test_dir: PathBuf, + weak_consistency: bool, +} - let mount_point = "target/gvfs"; - let _ = fs::create_dir_all(mount_point); +impl SequenceFileOperationTest { + fn new(test_dir: &Path) -> Self { + let args: Vec = env::args().collect(); + let weak_consistency = args.contains(&"weak_consistency".to_string()); - let mut test = FuseTest { - runtime: Arc::new(Runtime::new().unwrap()), - mount_point: mount_point.to_string(), - gvfs_mount: None, - }; + SequenceFileOperationTest { + test_dir: test_dir.to_path_buf(), + weak_consistency: weak_consistency, + } + } + fn test_create_file(&self, name: &str, open_options: Option<&OpenOptions>) -> File { + let path = self.test_dir.join(name); + let file = { + match open_options { + None => File::create(&path) + .unwrap_or_else(|_| panic!("Failed to create file: {:?}", path)), + Some(options) => options.open(&path).unwrap_or_else(|_| { + panic!( + "Failed to create file: {:?}, + options {:?}", + path, options + ) + }), + } + }; + let file_metadata = file + .metadata() + .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}", path)); + assert!(file_exists(path)); + if !self.weak_consistency { + assert_eq!(file_metadata.len(), 0); + } + file + } - test.setup(); + fn test_open_file(&self, name: &str, open_options: Option<&OpenOptions>) -> File { + let path = self.test_dir.join(name); + let file = { + match open_options { + None => { + File::open(&path).unwrap_or_else(|_| panic!("Failed to open file: {:?}", path)) + } + Some(options) => options.open(&path).unwrap_or_else(|_| { + panic!( + "Failed to open file: {:?}, + options {:?}", + path, options + ) + }), + } + }; + let file_metadata = file + .metadata() + .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}", path)); + assert!(file_metadata.is_file()); + assert!(file_exists(path)); + file + } - let test_dir = Path::new(&test.mount_point).join("test_dir"); - run_tests(&test_dir); -} + fn test_read_file(&self, file: &mut File, expect: &[u8]) { + let mut content = vec![0; expect.len()]; + file.read_exact(&mut content).expect("Failed to read file"); + assert_eq!(content, *expect, "File content mismatch"); + } -fn run_tests(test_dir: &Path) { - fs::create_dir_all(test_dir).expect("Failed to create test dir"); - test_fuse_filesystem(test_dir); - test_big_file(test_dir); - test_open_file_flag(test_dir); -} + fn test_read_data(&self, file: &mut File, len: usize) -> Vec { + let mut content = vec![0; len]; + file.read_exact(&mut content).expect("Failed to read file"); + content + } -#[test] -fn fuse_it_test_fuse() { - test_enable_with!(RUN_TEST_WITH_FUSE); - let mount_point = Path::new("target/gvfs"); - let test_dir = mount_point.join("test_dir"); + fn test_append_file(&self, file: &mut File, content: &[u8]) { + let old_len = file.metadata().unwrap().len(); + let size = content.len(); + file.write_all(content).expect("Failed to write file"); - run_tests(&test_dir); -} + if !self.weak_consistency { + let new_len = file.metadata().unwrap().len(); + assert_eq!(new_len, old_len + size as u64, "File size mismatch"); + } + } -fn test_fuse_filesystem(test_path: &Path) { - info!("Test startup"); - //test create file - let test_file = test_path.join("test_create"); - let file = File::create(&test_file).expect("Failed to create file"); - assert!(file.metadata().is_ok(), "Failed to get file metadata"); - assert!(file_exists(&test_file)); + fn test_remove_file(&self, name: &str) { + let path = self.test_dir.join(name); + fs::remove_file(&path).unwrap_or_else(|_| panic!("Failed to remove file: {:?}", path)); + assert!(!file_exists(path)); + } - //test write file - fs::write(&test_file, "read test").expect("Failed to write file"); + fn test_create_dir(&self, name: &str) { + let path = self.test_dir.join(name); + fs::create_dir(&path).unwrap_or_else(|_| panic!("Failed to create directory: {:?}", path)); + assert!(file_exists(path)); + } - //test read file - let content = fs::read_to_string(&test_file).expect("Failed to read file"); - assert_eq!(content, "read test", "File content mismatch"); + fn test_list_dir_with_expect(&self, name: &str, expect_childs: &Vec<&str>) { + self.test_list_dir(name, expect_childs, &vec![]); + } - //test delete file - fs::remove_file(&test_file).expect("Failed to delete file"); - assert!(!file_exists(&test_file)); + fn test_list_dir_with_unexpected(&self, name: &str, unexpected_childs: &Vec<&str>) { + self.test_list_dir(name, &vec![], unexpected_childs); + } - //test create directory - let test_dir = test_path.join("test_dir"); - fs::create_dir(&test_dir).expect("Failed to create directory"); + fn test_list_dir(&self, name: &str, expect_childs: &Vec<&str>, unexpected_childs: &Vec<&str>) { + let path = self.test_dir.join(name); + let dir_childs = + fs::read_dir(&path).unwrap_or_else(|_| panic!("Failed to list directory: {:?}", path)); + let mut childs_set: HashSet = HashSet::default(); + for child in dir_childs { + let entry = child.expect("Failed to get entry"); + childs_set.insert(entry.file_name().to_string_lossy().to_string()); + } + for expect_child in expect_childs { + assert!( + childs_set.contains(*expect_child), + "Expect child not found: {}", + expect_child + ); + } - //test create file in directory - let test_file = test_path.join("test_dir/test_file"); - let file = File::create(&test_file).expect("Failed to create file"); - assert!(file.metadata().is_ok(), "Failed to get file metadata"); + for unexpected_child in unexpected_childs { + assert!( + !childs_set.contains(*unexpected_child), + "Unexpected child found: {}", + unexpected_child + ); + } + } - //test write file in directory - let test_file = test_path.join("test_dir/test_read"); - fs::write(&test_file, "read test").expect("Failed to write file"); + fn test_remove_dir(&self, name: &str) { + let path = self.test_dir.join(name); + fs::remove_dir(&path).unwrap_or_else(|_| panic!("Failed to remove directory: {:?}", path)); + assert!(!file_exists(path)); + } - //test read file in directory - let content = fs::read_to_string(&test_file).expect("Failed to read file"); - assert_eq!(content, "read test", "File content mismatch"); + // some file storage can't sync file immediately, so we need to sync file to make sure the file is written to disk + fn sync_file(&self, file: File, name: &str, expect_len: u64) -> Result<(), ()> { + if !self.weak_consistency { + return Ok(()); + } + drop(file); + + let path = self.test_dir.join(name); + let success = AwaitUtil::wait(Duration::from_secs(3), Duration::from_millis(200), || { + let file = + File::open(&path).unwrap_or_else(|_| panic!("Failed to open file: {:?}", path)); + let file_len = file.metadata().unwrap().len(); + file_len >= expect_len + }); + if !success { + return Err(()); + } + Ok(()) + } - //test delete file in directory - fs::remove_file(&test_file).expect("Failed to delete file"); - assert!(!file_exists(&test_file)); + fn test_basic_filesystem(fs_test: &SequenceFileOperationTest) { + let file_name1 = "test_create"; + //test create file + let mut file1 = fs_test.test_create_file(file_name1, None); - //test delete directory - fs::remove_dir_all(&test_dir).expect("Failed to delete directory"); - assert!(!file_exists(&test_dir)); + //test write file + let content = "write test".as_bytes(); + fs_test.test_append_file(&mut file1, content); + fs_test + .sync_file(file1, file_name1, content.len() as u64) + .expect("Failed to sync file"); - info!("Success test"); -} + //test read file + let mut file1 = fs_test.test_open_file(file_name1, None); + fs_test.test_read_file(&mut file1, content); + + //test delete file + fs_test.test_remove_file(file_name1); + + //test create directory + let dir_name1 = "test_dir"; + fs_test.test_create_dir(dir_name1); + + //test create file in directory + let test_file2 = "test_dir/test_file"; + let mut file2 = fs_test.test_create_file(test_file2, None); + + //test write file in directory + fs_test.test_append_file(&mut file2, content); + fs_test + .sync_file(file2, test_file2, content.len() as u64) + .expect("Failed to sync file"); + + //test read file in directory + let mut file2 = fs_test.test_open_file(test_file2, None); + fs_test.test_read_file(&mut file2, content); + + //test list directory + fs_test.test_list_dir_with_expect(dir_name1, &vec!["test_file"]); -#[allow(clippy::needless_range_loop)] -fn test_big_file(test_dir: &Path) { - if !file_exists(test_dir) { - fs::create_dir_all(test_dir).expect("Failed to create test dir"); + //test delete file in directory + fs_test.test_remove_file(test_file2); + + //test list directory after delete file + fs_test.test_list_dir_with_unexpected(dir_name1, &vec!["test_file"]); + + //test delete directory + fs_test.test_remove_dir(dir_name1); } - let test_file = test_dir.join("test_big_file"); - let mut file = File::create(&test_file).expect("Failed to create file"); - assert!(file.metadata().is_ok(), "Failed to get file metadata"); - assert!(file_exists(&test_file)); + #[allow(clippy::needless_range_loop)] + fn test_big_file(fs_test: &SequenceFileOperationTest) { + let test_file = "test_big_file"; + let round_size: usize = 1024 * 1024; + let round: u8 = 1; + + //test write big file + { + let mut file = fs_test.test_create_file(test_file, None); + + for i in 0..round { + let mut content = vec![0; round_size]; + for j in 0..round_size { + content[j] = (i as usize + j) as u8; + } - let round_size: usize = 1024 * 1024; - let round: u8 = 10; + fs_test.test_append_file(&mut file, &content); + } + fs_test + .sync_file(file, test_file, round_size as u64 * round as u64) + .expect("Failed to sync file"); + } - for i in 0..round { - let mut content = vec![0; round_size]; - for j in 0..round_size { - content[j] = (i as usize + j) as u8; + //test read big file + { + let mut file = fs_test.test_open_file(test_file, None); + for i in 0..round { + let buffer = fs_test.test_read_data(&mut file, round_size); + + for j in 0..round_size { + assert_eq!(buffer[j], (i as usize + j) as u8, "File content mismatch"); + } + } } - file.write_all(&content).expect("Failed to write file"); + fs_test.test_remove_file(test_file); } - file.flush().expect("Failed to flush file"); - file = File::open(&test_file).expect("Failed to open file"); - for i in 0..round { - let mut buffer = vec![0; round_size]; - file.read_exact(&mut buffer).unwrap(); + fn test_open_file_flag(fs_test: &SequenceFileOperationTest) { + let write_content = "write content"; + { + // test open file with read and write create flag + let file_name = "test_open_file"; + let mut file = fs_test.test_create_file( + file_name, + Some(OpenOptions::new().read(true).write(true).create(true)), + ); + + // test write can be done + fs_test.test_append_file(&mut file, write_content.as_bytes()); + + // test read end of file + let result = file.read_exact(&mut [1]); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!(e.to_string(), "failed to fill whole buffer"); + } + } + + { + // test open file with write flag + let file_name = "test_open_file2"; + let mut file = fs_test + .test_create_file(file_name, Some(OpenOptions::new().write(true).create(true))); + + // test write can be done + fs_test.test_append_file(&mut file, write_content.as_bytes()); - for j in 0..round_size { - assert_eq!(buffer[j], (i as usize + j) as u8, "File content mismatch"); + // test read can be have error + let result = file.read(&mut [0; 10]); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!(e.to_string(), "Bad file descriptor (os error 9)"); + } } - } - fs::remove_file(&test_file).expect("Failed to delete file"); - assert!(!file_exists(&test_file)); -} + { + // test open file with read flag + let file_name = "test_open_file2"; + let mut file = fs_test.test_open_file(file_name, Some(OpenOptions::new().read(true))); -fn test_open_file_flag(test_dir: &Path) { - // test open file with read and write create flag - let file_path = test_dir.join("test_open_file"); - let mut file = OpenOptions::new() - .write(true) - .read(true) - .create(true) - .open(&file_path) - .expect("Failed to open file"); - // test file offset is 0 - let offset = file.stream_position().expect("Failed to seek file"); - assert_eq!(offset, 0, "File offset mismatch"); - - // test write can be done - let write_content = "write content"; - file.write_all(write_content.as_bytes()) - .expect("Failed to write file"); - let mut content = vec![0; write_content.len()]; - - // test read can be done - file.seek(SeekFrom::Start(0)).expect("Failed to seek file"); - file.read_exact(&mut content).expect("Failed to read file"); - assert_eq!(content, write_content.as_bytes(), "File content mismatch"); - - // test open file with read flag - let mut file = OpenOptions::new() - .read(true) - .open(&file_path) - .expect("Failed to open file"); - // test reaad can be done - let mut content = vec![0; write_content.len()]; - file.read_exact(&mut content).expect("Failed to read file"); - assert_eq!(content, write_content.as_bytes(), "File content mismatch"); - - // test write can be have error - let result = file.write_all(write_content.as_bytes()); - if let Err(e) = result { - assert_eq!(e.to_string(), "Bad file descriptor (os error 9)"); - } + // test read can be done + fs_test.test_read_file(&mut file, write_content.as_bytes()); + + // test write can be have error + let result = file.write_all(write_content.as_bytes()); + assert!(result.is_err()); + if let Err(e) = result { + assert_eq!(e.to_string(), "Bad file descriptor (os error 9)"); + } + } + + { + // test open file with truncate file + let file_name = "test_open_file2"; + let file = fs_test.test_open_file( + file_name, + Some(OpenOptions::new().write(true).truncate(true)), + ); + + // test file size is 0 + assert_eq!(file.metadata().unwrap().len(), 0); + } - // test open file with truncate file - // test file size is not 0 - let old_file_size = file.metadata().expect("Failed to get file metadata").len(); - assert_eq!(old_file_size, write_content.len() as u64); - - let mut file = OpenOptions::new() - .write(true) - .truncate(true) - .open(&file_path) - .expect("Failed to open file"); - // validate file size is 0 - let file_size = file.metadata().expect("Failed to get file metadata").len(); - assert_eq!(file_size, 0, "File size mismatch"); - // validate file offset is 0 - let offset = file.stream_position().expect("Failed to seek file"); - assert_eq!(offset, 0, "File offset mismatch"); - - file.write_all(write_content.as_bytes()) - .expect("Failed to write file"); - - // test open file with append flag - let mut file = OpenOptions::new() - .append(true) - .open(&file_path) - .expect("Failed to open file"); - // test append - file.write_all(write_content.as_bytes()) - .expect("Failed to write file"); - let file_len = file.metadata().expect("Failed to get file metadata").len(); - // validate file size is 2 * write_content.len() - assert_eq!( - file_len, - 2 * write_content.len() as u64, - "File size mismatch" - ); + { + // test open file with append flag + let file_name = "test_open_file"; + + // opendal_fs does not support open and appand + let result = OpenOptions::new() + .append(true) + .open(fs_test.test_dir.join(file_name)); + if let Err(e) = result { + assert_eq!(e.to_string(), "Bad file descriptor (os error 9)"); + return; + } + + let mut file = fs_test.test_open_file(file_name, Some(OpenOptions::new().append(true))); + + assert_eq!(file.metadata().unwrap().len(), write_content.len() as u64); + + // test append + fs_test.test_append_file(&mut file, write_content.as_bytes()); + let file_len = file.metadata().unwrap().len(); + assert_eq!(file_len, 2 * write_content.len() as u64); + } + } } fn file_exists>(path: P) -> bool { fs::metadata(path).is_ok() } + +fn run_tests(test_dir: &Path) { + fs::create_dir_all(test_dir).expect("Failed to create test dir"); + + let fs_test = SequenceFileOperationTest::new(test_dir); + + info!("test_fuse_filesystem started"); + SequenceFileOperationTest::test_basic_filesystem(&fs_test); + info!("testtest_fuse_filesystem finished"); + + info!("test_big_file started"); + SequenceFileOperationTest::test_big_file(&fs_test); + info!("test_big_file finished"); + + info!("test_open_file_flag started"); + SequenceFileOperationTest::test_open_file_flag(&fs_test); + info!("test_open_file_flag finished"); +} + +fn test_manually() { + let mount_point = Path::new("target/gvfs"); + let test_dir = mount_point.join("test_dir"); + run_tests(&test_dir); +} + +#[test] +fn fuse_it_test_fuse() { + test_enable_with!(RUN_TEST_WITH_FUSE); + tracing_subscriber::fmt().init(); + + let mount_point = Path::new("target/gvfs"); + let test_dir = mount_point.join("test_dir"); + + run_tests(&test_dir); +} + +#[test] +fn test_fuse_with_memory_fs() { + tracing_subscriber::fmt().init(); + + let mount_point = "target/gvfs"; + let _ = fs::create_dir_all(mount_point); + + let mut test = FuseTestEnv { + mount_point: mount_point.to_string(), + gvfs_mount: None, + }; + + test.setup(); + + let test_dir = Path::new(&test.mount_point).join("test_dir"); + run_tests(&test_dir); +} From 00c644c2605d51c24a6c3293b5a80c462f04ea39 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 10:43:38 +0800 Subject: [PATCH 23/26] Fix rebase error --- .../filesystem-fuse/src/gravitino_client.rs | 26 +------------------ .../src/open_dal_filesystem.rs | 6 +++++ 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs index 1c09c600276..1e1cd411eac 100644 --- a/clients/filesystem-fuse/src/gravitino_client.rs +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -35,18 +35,6 @@ pub(crate) struct Fileset { properties: HashMap, } -impl Fileset { - pub fn new(name: &str, storage_location: &str) -> Fileset { - Self { - name: name.to_string(), - fileset_type: "managed".to_string(), - comment: "".to_string(), - storage_location: storage_location.to_string(), - properties: HashMap::default(), - } - } -} - #[derive(Debug, Deserialize)] struct FilesetResponse { code: u32, @@ -70,18 +58,6 @@ pub(crate) struct Catalog { pub(crate) properties: HashMap, } -impl Catalog { - pub fn new(name: &str, properties: HashMap) -> Catalog { - Self { - name: name.to_string(), - catalog_type: "fileset".to_string(), - provider: "s3".to_string(), - comment: "".to_string(), - properties: properties, - } - } -} - #[derive(Debug, Deserialize)] struct CatalogResponse { code: u32, @@ -223,7 +199,7 @@ impl GravitinoClient { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::*; use mockito::mock; diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs index deae343a302..e400d67638e 100644 --- a/clients/filesystem-fuse/src/open_dal_filesystem.rs +++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs @@ -159,6 +159,12 @@ impl PathFileSystem for OpenDalFileSystem { async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result { let file_name = path.to_string_lossy().to_string(); + if flags.is_exclusive() { + let meta = self.op.stat(&file_name).await; + if meta.is_ok() { + return Err(Errno::from(libc::EEXIST)); + } + } self.op .write(&file_name, Buffer::new()) From c586f87901edd074360be4b1e2df45c860d456d7 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 11:16:14 +0800 Subject: [PATCH 24/26] Fix some errors --- clients/filesystem-fuse/Makefile | 2 ++ clients/filesystem-fuse/src/open_dal_filesystem.rs | 5 +++++ clients/filesystem-fuse/tests/bin/run_fuse_testers.sh | 2 ++ clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh | 9 +++++++++ 4 files changed, 18 insertions(+) diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile index 23ebd9e7004..aede9bb0fe9 100644 --- a/clients/filesystem-fuse/Makefile +++ b/clients/filesystem-fuse/Makefile @@ -74,5 +74,7 @@ test-s3: test: doc-test cargo test --no-fail-fast --all-targets --all-features --workspace +test-all: test test-s3 test-fuse-it + clean: cargo clean diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs b/clients/filesystem-fuse/src/open_dal_filesystem.rs index e400d67638e..1bef3efaeec 100644 --- a/clients/filesystem-fuse/src/open_dal_filesystem.rs +++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs @@ -397,6 +397,11 @@ mod test { let op = create_opendal(&config); let path = "/s1/fileset1/test_dir/test_big_file"; + let meta = op.stat(path).await; + if meta.is_err() { + println!("stat error: {:?}", meta.err()); + return; + } let reader = op.reader(path).await.unwrap(); let mut buffer = Vec::new(); diff --git a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh index dfdd5e6855e..7088a310b50 100755 --- a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh +++ b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh @@ -58,8 +58,10 @@ elif [ "$1" == "start" ]; then start_servers elif [ "$1" == "restart" ]; then + # Stop the servers echo "Stopping servers..." stop_servers + # Start the servers echo "Starting servers..." start_servers diff --git a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh index ac5f9812c93..8f25c0b3954 100644 --- a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh +++ b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh @@ -51,6 +51,15 @@ elif [ "$1" == "start" ]; then echo "Starting servers..." start_servers +elif [ "$1" == "restart" ]; then + # Stop the servers + echo "Stopping servers..." + stop_servers + + # Start the servers + echo "Starting servers..." + start_servers + elif [ "$1" == "stop" ]; then # Stop the servers echo "Stopping servers..." From 4093a6cf418da2ab6eb81c7e3b4dddb6458def53 Mon Sep 17 00:00:00 2001 From: yuhui Date: Thu, 16 Jan 2025 11:23:51 +0800 Subject: [PATCH 25/26] add flag to TestRawFs::create_file --- clients/filesystem-fuse/src/filesystem.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index 94d27c7074d..a263b726591 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send { #[cfg(test)] pub(crate) mod tests { use super::*; - use libc::{O_CREAT, O_RDONLY, O_RDWR}; + use libc::{O_CREAT, O_RDONLY, O_WRONLY}; use std::collections::HashMap; use std::path::Component; @@ -461,7 +461,7 @@ pub(crate) mod tests { // Test create file let file_handle = self - .test_create_file(parent_file_id, "file1.txt".as_ref()) + .test_create_file(parent_file_id, "file1.txt".as_ref(), (O_CREAT | O_WRONLY) as u32) .await; // Test write file @@ -545,10 +545,10 @@ pub(crate) mod tests { self.files.insert(file_stat.file_id, file_stat); } - async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) -> FileHandle { + async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr, flags: u32) -> FileHandle { let file = self .fs - .create_file(root_file_id, name, (O_CREAT | O_RDWR) as u32) + .create_file(root_file_id, name, flags) .await; assert!(file.is_ok()); let file = file.unwrap(); From b654b36eeabc48bccebe39a23ce22a22b61ee623 Mon Sep 17 00:00:00 2001 From: yuhui Date: Fri, 17 Jan 2025 10:34:12 +0800 Subject: [PATCH 26/26] Fix format error --- clients/filesystem-fuse/src/filesystem.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index a263b726591..c0c27a5fbe2 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -461,7 +461,11 @@ pub(crate) mod tests { // Test create file let file_handle = self - .test_create_file(parent_file_id, "file1.txt".as_ref(), (O_CREAT | O_WRONLY) as u32) + .test_create_file( + parent_file_id, + "file1.txt".as_ref(), + (O_CREAT | O_WRONLY) as u32, + ) .await; // Test write file @@ -545,11 +549,13 @@ pub(crate) mod tests { self.files.insert(file_stat.file_id, file_stat); } - async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr, flags: u32) -> FileHandle { - let file = self - .fs - .create_file(root_file_id, name, flags) - .await; + async fn test_create_file( + &mut self, + root_file_id: u64, + name: &OsStr, + flags: u32, + ) -> FileHandle { + let file = self.fs.create_file(root_file_id, name, flags).await; assert!(file.is_ok()); let file = file.unwrap(); assert!(file.handle_id > 0);