Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/hdfs_native): Add read,write,list implementation for hdfs_native #4505

Open
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

shbhmrzd
Copy link
Contributor

For #3144
Kindly review @Xuanwo

@shbhmrzd shbhmrzd force-pushed the native_hdfs_read_write branch 4 times, most recently from 06417f6 to e9e417b Compare April 19, 2024 23:01
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be placed in hdfs_native/hdfs.

HADOOP_HOME=${HADOOP_HOME}
CLASSPATH=${CLASSPATH}
LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
OPENDAL_HDFS_ROOT=/tmp/opendal/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They should be OPENDAL_HDFS_NATIVE_ROOT

}
}
}

impl oio::List for HdfsNativeLister {
async fn next(&mut self) -> Result<Option<Entry>> {
todo!()
let de: FileStatus = match self.lsi.next().await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use code like the following for better reading:

let Ok(de) = self
    .lsi
    .next()
    .await
    .transpose()
    .map_err(parse_hdfs_error)?
else {
    return Ok(None);
};


let entry = if !de.isdir {
let odt = DateTime::from_timestamp(de.modification_time as i64, 0);
let dt = match odt {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't match an Option if the None case returns an error, use let-else instead.

todo!()
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
// Check for offset being too large for usize on 32-bit systems
if offset > usize::MAX as u64 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't run checks in services.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where can we put this check ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where can we put this check ?

We don't need this check for current. It's more like an upstream issue that can't range from i32::MAX..i64::MAX. You can create an opendal issue to track this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, created issue #4506

.f
.read_range(offset as usize, limit)
.await
.map_err(parse_hdfs_error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ? instead of matching an Result.

let bytes = bs.to_bytes();
let total_bytes = bytes.len();
self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(total_bytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does self.f.write() make sure that all bytes are written?

Copy link
Contributor Author

@shbhmrzd shbhmrzd Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, based on the definition, it continues to write until complete buf has been written.

    pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
        let bytes_to_write = buf.len();
        // Create a shallow copy of the bytes instance to mutate and track what's been read
        while !buf.is_empty() {
            let block_writer = self.get_block_writer().await?;

            block_writer.write(&mut buf).await?;
        }

        self.bytes_written += bytes_to_write;

        Ok(bytes_to_write)
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't rely this behavior. Return written like this:

let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(n)

@shbhmrzd
Copy link
Contributor Author

shbhmrzd commented Apr 22, 2024

@Xuanwo
Implemented review comments. Please check now.
Could you also please help with the failure in behavior tests? Thank you!

@shbhmrzd shbhmrzd requested a review from tisonkun as a code owner April 22, 2024 16:44
@@ -221,9 +221,10 @@ def generate_language_binding_cases(
) -> list[dict[str, str]]:
cases = unique_cases(cases)

# Remove hdfs cases for java.
# Remove specified services cases for java.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't touch those files, hdfs_native is pure rust which should be fine on other bindings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure
Due to failures in behaviour tests I was trying a bunch of stuff to figure them out. Could you please help with the failures ?

core/src/types/scheme.rs Outdated Show resolved Hide resolved
core/src/types/scheme.rs Outdated Show resolved Hide resolved
CLASSPATH=${CLASSPATH}
LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
OPENDAL_HDFS_NATIVE_ROOT=/tmp/opendal/
OPENDAL_HDFS_NATIVE_URL=http://127.0.0.1:9000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only hdfs and viewfs schemes are supported

url should be hdfs://127.0.0.1:9000. It's better to have them tested locally before debugging github actions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, missed this one. Thank you for pointing this.

let bytes = bs.to_bytes();
let total_bytes = bytes.len();
self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(total_bytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't rely this behavior. Return written like this:

let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(n)

@tisonkun tisonkun removed their request for review May 26, 2024 23:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants