ci: Bring hdfs test back (#2114)
* ci: Bring hdfs test back

Signed-off-by: Xuanwo <[email protected]>

* Fix hdfs

Signed-off-by: Xuanwo <[email protected]>

* fix cmd

Signed-off-by: Xuanwo <[email protected]>

* Fix cmd

Signed-off-by: Xuanwo <[email protected]>

* -e should be added before the container image

Signed-off-by: Xuanwo <[email protected]>

* Fix ld lib path

Signed-off-by: Xuanwo <[email protected]>

* Fix ld path

Signed-off-by: Xuanwo <[email protected]>

* Try build behavior with old Ubuntu

Signed-off-by: Xuanwo <[email protected]>

* try setup hadoop locally

Signed-off-by: Xuanwo <[email protected]>

* try again

Signed-off-by: Xuanwo <[email protected]>

* Fix hdfs

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Running test!

Signed-off-by: Xuanwo <[email protected]>

* fix typo

Signed-off-by: Xuanwo <[email protected]>

* avoid matching webhdfs

Signed-off-by: Xuanwo <[email protected]>

* Fix hadoop home

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Fix hdfs write

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Use the same version with cluster

Signed-off-by: Xuanwo <[email protected]>

* Default port should be 8020

Signed-off-by: Xuanwo <[email protected]>

* Add comments

Signed-off-by: Xuanwo <[email protected]>

* write into tmp

Signed-off-by: Xuanwo <[email protected]>

* Disable permissions on hdfs

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
Xuanwo authored Apr 25, 2023
1 parent d6ce049 commit 8fc4f89
Showing 3 changed files with 93 additions and 42 deletions.
90 changes: 72 additions & 18 deletions .github/workflows/service_test_hdfs.yml
@@ -37,39 +37,93 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  hdfs:
+  hdfs-default:
     runs-on: ubuntu-latest
-    strategy:
-      matrix:
-        hdfs-version: ["2.10.1", "3.2.3", "3.3.2"]
     steps:
       - uses: actions/checkout@v3
+
-      - name: Checkout python env
-        uses: actions/setup-python@v4
-        with:
-          python-version: "3.8"
-      - name: Checkout java env
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Setup java env
         uses: actions/setup-java@v3
         with:
           distribution: temurin
           java-version: "11"
-      - name: Setup-hdfs env
-        uses: beyondstorage/setup-hdfs@master
-        with:
-          hdfs-version: ${{ matrix.hdfs-version }}
+      - name: Setup hadoop env
+        shell: bash
+        run: |
+          curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: |
+          export CLASSPATH=$(find $HADOOP_HOME -iname "*.jar" | xargs echo | tr ' ' ':')
+          cargo test services_hdfs --features services-hdfs -- --show-output
+        env:
+          RUST_BACKTRACE: full
+          RUST_LOG: debug
+          HADOOP_HOME: "/home/runner/hadoop-3.3.5"
+          LD_LIBRARY_PATH: ${{ env.JAVA_HOME }}/lib/server:${{ env.LD_LIBRARY_PATH }}
+          OPENDAL_HDFS_TEST: on
+          OPENDAL_HDFS_ROOT: /tmp/opendal/
+          OPENDAL_HDFS_NAME_NODE: default
+
+  hdfs-cluster:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Configure Hdfs
+        # namenode will use ports: 9870, 9000, 8020
+        # datanode will use ports: 9864
+        run: |
+          docker run -d \
+            --name namenode \
+            --network host \
+            -e CLUSTER_NAME=test \
+            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
+            -e CORE_CONF_hadoop_http_staticuser_user=root \
+            -e HDFS_CONF_dfs_permissions_enabled=false \
+            bde2020/hadoop-namenode:2.0.0-hadoop3.1.3-java8
+          docker run -d \
+            --name datanode \
+            --network host \
+            -e CLUSTER_NAME=test \
+            -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \
+            -e CORE_CONF_hadoop_http_staticuser_user=root \
+            -e HDFS_CONF_dfs_permissions_enabled=false \
+            bde2020/hadoop-datanode:2.0.0-hadoop3.1.3-java8
+          curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Setup java env
+        uses: actions/setup-java@v3
+        with:
+          distribution: temurin
+          java-version: "11"
+      - name: Setup hadoop env
+        shell: bash
+        run: |
+          curl -LsSf https://archive.apache.org/dist/hadoop/common/hadoop-3.1.3/hadoop-3.1.3.tar.gz | tar zxf - -C /home/runner
       - name: Test
         shell: bash
         working-directory: core
-        continue-on-error: true
-        run: cargo test hdfs --features services-hdfs -- --show-output
+        run: |
+          export CLASSPATH=$(find $HADOOP_HOME -iname "*.jar" | xargs echo | tr ' ' ':')
+          cargo test services_hdfs --features services-hdfs -- --show-output
         env:
           RUST_BACKTRACE: full
           RUST_LOG: debug
-          OPENDAL_HDFS_TEST: on
-          OPENDAL_HDFS_ROOT: /
-          OPENDAL_HDFS_NAME_NODE: hdfs://${{ env.HDFS_NAMENODE_ADDR }}
+          HADOOP_HOME: "/home/runner/hadoop-3.1.3"
+          LD_LIBRARY_PATH: ${{ env.JAVA_HOME }}/lib/server:${{ env.LD_LIBRARY_PATH }}
+          OPENDAL_HDFS_TEST: on
+          OPENDAL_HDFS_ROOT: /tmp/opendal/
+          OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020
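For context on how the Test steps consume this configuration: OPENDAL_HDFS_NAME_NODE and OPENDAL_HDFS_ROOT configure the HDFS service under test, while CLASSPATH (every Hadoop jar) and LD_LIBRARY_PATH (the JVM's libjvm.so) are required by libhdfs, which the hdrs crate drives through JNI. Below is a minimal sketch of wiring the same values into an operator by hand. It assumes an OpenDAL release contemporary with this commit; builder method names have changed in later versions, and the file name is illustrative.

```rust
use opendal::services::Hdfs;
use opendal::Operator;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    // Mirrors the workflow env: OPENDAL_HDFS_NAME_NODE / OPENDAL_HDFS_ROOT.
    // The hdfs-default job passes "default" instead, which tells libhdfs to
    // read fs.defaultFS from the Hadoop config found on CLASSPATH; the
    // cluster job targets the namenode's RPC port 8020 directly.
    let mut builder = Hdfs::default();
    builder.name_node("hdfs://localhost:8020");
    builder.root("/tmp/opendal/");

    // CLASSPATH and LD_LIBRARY_PATH must already be exported, exactly as
    // the Test steps above arrange before running cargo test.
    let op = Operator::new(builder)?.finish();

    op.write("hello.txt", "Hello, HDFS!").await?;
    let content = op.read("hello.txt").await?;
    assert_eq!(content, b"Hello, HDFS!".to_vec());
    Ok(())
}
```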
2 changes: 1 addition & 1 deletion core/src/layers/logging.rs
@@ -754,7 +754,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
             .map(|(rp, w)| {
                 debug!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> written",
+                    "service={} operation={} path={} -> start writing",
                     self.scheme,
                     Operation::BlockingWrite,
                     path,
43 changes: 20 additions & 23 deletions core/src/services/hdfs/writer.rs
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::Seek;
-use std::io::SeekFrom;
 use std::io::Write;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::AsyncSeekExt;
 use futures::AsyncWriteExt;
 
 use super::error::parse_io_error;
@@ -30,7 +27,10 @@ use crate::*;
 
 pub struct HdfsWriter<F> {
     f: F,
-    pos: u64,
+    /// The position of current written bytes in the buffer.
+    ///
+    /// We will maintain the position in pos to make sure the buffer is written correctly.
+    pos: usize,
 }
 
 impl<F> HdfsWriter<F> {
@@ -41,17 +41,17 @@ impl<F> HdfsWriter<F> {
 
 #[async_trait]
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .await
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).await.map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
+        while self.pos < bs.len() {
+            let n = self
+                .f
+                .write(&bs[self.pos..])
+                .await
+                .map_err(parse_io_error)?;
+            self.pos += n;
+        }
+        // Reset pos to 0 for next write.
+        self.pos = 0;
 
         Ok(())
     }
@@ -71,16 +71,13 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 }
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
-    /// # Notes
-    ///
-    /// File could be partial written, so we will seek to start to make sure
-    /// we write the same content.
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.f
-            .seek(SeekFrom::Start(self.pos))
-            .map_err(parse_io_error)?;
-        self.f.write_all(&bs).map_err(parse_io_error)?;
-        self.pos += bs.len() as u64;
+        while self.pos < bs.len() {
+            let n = self.f.write(&bs[self.pos..]).map_err(parse_io_error)?;
+            self.pos += n;
+        }
+        // Reset pos to 0 for next write.
+        self.pos = 0;
 
         Ok(())
     }
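The substance of the writer fix, for readers skimming the hunks above: the old code seeked back to a saved file offset and rewrote the whole buffer with write_all, while the new code handles short writes directly. Since write() may accept fewer bytes than offered, the caller advances pos by the returned count until the buffer is drained, then resets pos for the next buffer. Here is a self-contained sketch of the same pattern over any std::io::Write; the names are illustrative rather than taken from the patch, and the WriteZero guard is an extra safeguard the patch itself does not include:

```rust
use std::io::{self, Write};

/// Write all of `buf`, looping on short writes, as the new HdfsWriter does.
fn write_all_looped<W: Write>(w: &mut W, buf: &[u8]) -> io::Result<()> {
    let mut pos = 0;
    while pos < buf.len() {
        // `write` may consume only a prefix of the buffer; `n` reports how much.
        let n = w.write(&buf[pos..])?;
        if n == 0 {
            // A zero-length write would loop forever; surface it as an error.
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "writer accepted zero bytes",
            ));
        }
        pos += n;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut sink = Vec::new();
    write_all_looped(&mut sink, b"hello hdfs")?;
    assert_eq!(sink, b"hello hdfs");
    Ok(())
}
```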
