-
Notifications
You must be signed in to change notification settings - Fork 481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(services/hdfs_native): Add read,write,list implementation for hdfs_native #4505
base: main
Are you sure you want to change the base?
Conversation
06417f6
to
e9e417b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test should be placed in hdfs_native/hdfs
.
HADOOP_HOME=${HADOOP_HOME} | ||
CLASSPATH=${CLASSPATH} | ||
LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native | ||
OPENDAL_HDFS_ROOT=/tmp/opendal/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They should be OPENDAL_HDFS_NATIVE_ROOT
} | ||
} | ||
} | ||
|
||
impl oio::List for HdfsNativeLister { | ||
async fn next(&mut self) -> Result<Option<Entry>> { | ||
todo!() | ||
let de: FileStatus = match self.lsi.next().await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use code like the following for better reading:
let Ok(de) = self
.lsi
.next()
.await
.transpose()
.map_err(parse_hdfs_error)?
else {
return Ok(None);
};
|
||
let entry = if !de.isdir { | ||
let odt = DateTime::from_timestamp(de.modification_time as i64, 0); | ||
let dt = match odt { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't match
an Option
if the None
case returns an error, use let-else
instead.
todo!() | ||
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> { | ||
// Check for offset being too large for usize on 32-bit systems | ||
if offset > usize::MAX as u64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't run checks in services.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where can we put this check ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where can we put this check ?
We don't need this check for current. It's more like an upstream issue that can't range from i32::MAX..i64::MAX
. You can create an opendal issue to track this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, created issue #4506
.f | ||
.read_range(offset as usize, limit) | ||
.await | ||
.map_err(parse_hdfs_error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use ?
instead of matching an Result.
let bytes = bs.to_bytes(); | ||
let total_bytes = bytes.len(); | ||
self.f.write(bytes).await.map_err(parse_hdfs_error)?; | ||
Ok(total_bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does self.f.write()
make sure that all bytes are written?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, based on the definition, it continues to write until complete buf
has been written.
pub async fn write(&mut self, mut buf: Bytes) -> Result<usize> {
let bytes_to_write = buf.len();
// Create a shallow copy of the bytes instance to mutate and track what's been read
while !buf.is_empty() {
let block_writer = self.get_block_writer().await?;
block_writer.write(&mut buf).await?;
}
self.bytes_written += bytes_to_write;
Ok(bytes_to_write)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't rely this behavior. Return written like this:
let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(n)
@Xuanwo |
5f3bb5b
to
a8a6d17
Compare
@@ -221,9 +221,10 @@ def generate_language_binding_cases( | |||
) -> list[dict[str, str]]: | |||
cases = unique_cases(cases) | |||
|
|||
# Remove hdfs cases for java. | |||
# Remove specified services cases for java. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't touch those files, hdfs_native is pure rust which should be fine on other bindings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
Due to failures in behaviour tests I was trying a bunch of stuff to figure them out. Could you please help with the failures ?
CLASSPATH=${CLASSPATH} | ||
LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native | ||
OPENDAL_HDFS_NATIVE_ROOT=/tmp/opendal/ | ||
OPENDAL_HDFS_NATIVE_URL=http://127.0.0.1:9000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only
hdfs
andviewfs
schemes are supported
url should be hdfs://127.0.0.1:9000
. It's better to have them tested locally before debugging github actions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, missed this one. Thank you for pointing this.
let bytes = bs.to_bytes(); | ||
let total_bytes = bytes.len(); | ||
self.f.write(bytes).await.map_err(parse_hdfs_error)?; | ||
Ok(total_bytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't rely this behavior. Return written like this:
let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
Ok(n)
a7d2563
to
77acd20
Compare
For #3144
Kindly review @Xuanwo