diff --git a/.github/services/hdfs/hdfs_cluster/action.yml b/.github/services/hdfs/hdfs_cluster/action.yml new file mode 100644 index 00000000000..f54624264b0 --- /dev/null +++ b/.github/services/hdfs/hdfs_cluster/action.yml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: hdfs_cluster +description: 'Behavior test for hdfs cluster' + +runs: + using: "composite" + steps: + - name: Setup HDFS cluster + shell: bash + working-directory: fixtures/hdfs + run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup hadoop env + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + export HADOOP_HOME=/home/runner/hadoop-3.3.5 + echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV + echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV + echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV + cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + - name: Setup opendal env + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020 + OPENDAL_HDFS_ENABLE_APPEND=true + EOF \ No newline at end of file diff --git a/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml new file mode 100644 index 00000000000..860b6137a14 --- /dev/null +++ b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: hdfs_cluster_with_atomic_write_dir +description: 'Behavior test for hdfs cluster with atomic write dir' + +runs: + using: "composite" + steps: + - name: Setup HDFS cluster + shell: bash + working-directory: fixtures/hdfs + run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup hadoop env + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + export HADOOP_HOME=/home/runner/hadoop-3.3.5 + echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV + echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV + echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV + cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + - name: Setup opendal env + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/ + OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020 + OPENDAL_HDFS_ENABLE_APPEND=false + EOF \ No newline at end of file diff --git a/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml new file mode 100644 index 00000000000..b8de8671611 --- /dev/null +++ b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: hdfs_default_with_atomic_write_dir +description: 'Behavior test for hdfs default with atomic write dir' + +runs: + using: "composite" + steps: + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + + export HADOOP_HOME="/home/runner/hadoop-3.3.5" + export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) + + cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + + cat << EOF >> $GITHUB_ENV + HADOOP_HOME=${HADOOP_HOME} + CLASSPATH=${CLASSPATH} + LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/ + OPENDAL_HDFS_NAME_NODE=default + OPENDAL_HDFS_ENABLE_APPEND=false + EOF \ No newline at end of file diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml deleted file mode 100644 index c4447c2d459..00000000000 --- a/.github/workflows/service_test_hdfs.yml +++ /dev/null @@ -1,139 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -name: Service Test HDFS - -on: - push: - branches: - - main - pull_request: - branches: - - main - paths: - - "core/src/**" - - "core/tests/**" - - "!core/src/docs/**" - - "!core/src/services/**" - - "core/src/services/hdfs/**" - - ".github/workflows/service_test_hdfs.yml" - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} - cancel-in-progress: true - -jobs: - hdfs-cluster: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Configure Hdfs - # namenode will use ports: 9870, 9000, 8020 - # datanode will use ports: 9864 - run: | - docker run -d \ - --name namenode \ - --network host \ - -e CLUSTER_NAME=test \ - -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \ - -e CORE_CONF_hadoop_http_staticuser_user=root \ - -e HDFS_CONF_dfs_permissions_enabled=false \ - -e HDFS_CONF_dfs_support_append=true \ - -e HDFS_CONF_dfs_replication=1 \ - bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 - - docker run -d \ - --name datanode \ - --network host \ - -e CLUSTER_NAME=test \ - -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \ - -e CORE_CONF_hadoop_http_staticuser_user=root \ - -e HDFS_CONF_dfs_permissions_enabled=false \ - -e HDFS_CONF_dfs_support_append=true \ - -e HDFS_CONF_dfs_replication=1 \ - bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 - - curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870 - - - name: Setup Rust toolchain - uses: ./.github/actions/setup - with: - need-nextest: true - - - name: Setup java env - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "11" - - name: Setup hadoop env - shell: bash - run: | - curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz | tar zxf - -C /home/runner - - - name: Test - shell: bash - working-directory: core - run: | - export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) - export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native - cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml - - cargo test behavior --features tests,services-hdfs - env: - HADOOP_HOME: "/home/runner/hadoop-3.2.4" - OPENDAL_TEST: hdfs - OPENDAL_HDFS_ROOT: /tmp/opendal/ - OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020 - OPENDAL_HDFS_ENABLE_APPEND: true - - hdfs-default-with-atomic-write-dir: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Setup Rust toolchain - uses: ./.github/actions/setup - with: - need-nextest: true - - - name: Setup java env - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "11" - - name: Setup hadoop env - shell: bash - run: | - curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner - - - name: Test - shell: bash - working-directory: core - run: | - export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) - export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native - cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml - - cargo test behavior --features tests,services-hdfs - env: - HADOOP_HOME: "/home/runner/hadoop-3.3.5" - OPENDAL_TEST: hdfs - OPENDAL_HDFS_ROOT: /tmp/opendal/ - OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/ - OPENDAL_HDFS_NAME_NODE: default - OPENDAL_HDFS_ENABLE_APPEND: false diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index d5c15eccbec..4d87d24a108 100644 --- 
a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -22,7 +22,6 @@ use std::io::SeekFrom; use std::path::PathBuf; use std::sync::Arc; -use futures::AsyncWriteExt; use log::debug; use uuid::Uuid; @@ -290,45 +289,35 @@ impl Access for HdfsBackend { } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { - let target_path = build_rooted_abs_path(&self.root, path); - let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)); - - // If the target file exists, we should append to the end of it directly. - if op.append() && self.client.metadata(&target_path).is_ok() { - (target_path, None) - } else { - (target_path, Some(tmp_path)) + let target_path = build_rooted_abs_path(&self.root, path); + let target_exists = match self.client.metadata(&target_path) { + Ok(_) => true, + Err(err) => { + if err.kind() != io::ErrorKind::NotFound { + return Err(new_std_io_error(err)); + } + false } - } else { - let p = build_rooted_abs_path(&self.root, path); - (p, None) }; - if let Err(err) = self.client.metadata(&target_path) { - // Early return if other error happened. - if err.kind() != io::ErrorKind::NotFound { - return Err(new_std_io_error(err)); + let should_append = op.append() && target_exists; + let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| { + // If the target file exists, we should append to the end of it directly. + if should_append { + None + } else { + Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path))) } + }); + if !target_exists { let parent = get_parent(&target_path); - self.client.create_dir(parent).map_err(new_std_io_error)?; - - let mut f = self - .client - .open_file() - .create(true) - .write(true) - .async_open(&target_path) - .await - .map_err(new_std_io_error)?; - f.close().await.map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); open_options.create(true); - if op.append() { + if should_append { open_options.append(true); } else { open_options.write(true); @@ -341,7 +330,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } @@ -489,43 +484,35 @@ impl Access for HdfsBackend { } fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { - let target_path = build_rooted_abs_path(&self.root, path); - let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)); - - // If the target file exists, we should append to the end of it directly. - if op.append() && self.client.metadata(&target_path).is_ok() { - (target_path, None) - } else { - (target_path, Some(tmp_path)) + let target_path = build_rooted_abs_path(&self.root, path); + let target_exists = match self.client.metadata(&target_path) { + Ok(_) => true, + Err(err) => { + if err.kind() != io::ErrorKind::NotFound { + return Err(new_std_io_error(err)); + } + false } - } else { - let p = build_rooted_abs_path(&self.root, path); - - (p, None) }; - if let Err(err) = self.client.metadata(&target_path) { - // Early return if other error happened. 
- if err.kind() != io::ErrorKind::NotFound { - return Err(new_std_io_error(err)); + let should_append = op.append() && target_exists; + let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| { + // If the target file exists, we should append to the end of it directly. + if should_append { + None + } else { + Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path))) } + }); + if !target_exists { let parent = get_parent(&target_path); - self.client.create_dir(parent).map_err(new_std_io_error)?; - - self.client - .open_file() - .create(true) - .write(true) - .open(&target_path) - .map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); open_options.create(true); - if op.append() { + if should_append { open_options.append(true); } else { open_options.write(true); @@ -537,7 +524,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 0f60014f410..e4123861a0f 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -29,6 +29,7 @@ pub struct HdfsWriter { tmp_path: Option, f: Option, client: Arc, + target_path_exists: bool, } /// # Safety @@ -42,12 +43,14 @@ impl HdfsWriter { tmp_path: Option, f: F, client: Arc, + target_path_exists: bool, ) -> Self { Self { target_path, tmp_path, f: Some(f), client, + target_path_exists, } } } @@ -70,6 +73,12 @@ impl oio::Write for HdfsWriter { // TODO: we need to make rename async. if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)? @@ -102,6 +111,12 @@ impl oio::BlockingWrite for HdfsWriter { f.flush().map_err(new_std_io_error)?; if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)?; diff --git a/fixtures/hdfs/docker-compose-hdfs-cluster.yml b/fixtures/hdfs/docker-compose-hdfs-cluster.yml new file mode 100644 index 00000000000..376e8ed4868 --- /dev/null +++ b/fixtures/hdfs/docker-compose-hdfs-cluster.yml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +version: '3.8' + +services: + namenode: + image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 + container_name: namenode + network_mode: "host" + environment: + CLUSTER_NAME: test + WEBHDFS_CONF_dfs_webhdfs_enabled: true + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_support_append: true + HDFS_CONF_dfs_replication: 1 + datanode: + image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 + container_name: datanode + network_mode: "host" + environment: + CLUSTER_NAME: test + WEBHDFS_CONF_dfs_webhdfs_enabled: true + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_support_append: true + HDFS_CONF_dfs_replication: 1 +
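
For context on what the new `OPENDAL_HDFS_*` variables and the writer change exercise, here is a minimal sketch (not part of this PR) of the write path from the user side. It assumes the `services-hdfs` feature, a tokio runtime, a name node reachable at `hdfs://localhost:8020`, and `HADOOP_HOME`/`CLASSPATH`/`LD_LIBRARY_PATH` set as in the actions above; the builder method names (`name_node`, `root`, `atomic_write_dir`) and the file name `demo.txt` are illustrative and may differ slightly across OpenDAL releases.

```rust
// Hedged sketch: drive the atomic-write path that HdfsWriter now handles by
// removing an existing target before calling rename_file.
use opendal::services::Hdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Mirror the env vars set by the hdfs_cluster_with_atomic_write_dir action.
    let mut builder = Hdfs::default();
    builder.name_node("hdfs://localhost:8020");
    builder.root("/tmp/opendal/");
    // With an atomic write dir, data goes to a tmp file first and is renamed
    // over the target when the writer closes.
    builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");

    let op = Operator::new(builder)?.finish();

    // Writing the same path twice makes the second close hit the new
    // "remove existing target, then rename" branch in writer.rs.
    op.write("demo.txt", "first version".as_bytes().to_vec()).await?;
    op.write("demo.txt", "second version".as_bytes().to_vec()).await?;

    Ok(())
}
```

The reason the target is removed first is stated in the writer.rs comments: the underlying `rename_file` call fails when the destination already exists, so `HdfsWriter` now carries a `target_path_exists` flag and deletes the old target before renaming the tmp file into place.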