diff --git a/.github/services/hdfs/hdfs_cluster/action.yml b/.github/services/hdfs/hdfs_cluster/action.yml new file mode 100644 index 00000000000..f54624264b0 --- /dev/null +++ b/.github/services/hdfs/hdfs_cluster/action.yml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: hdfs_cluster +description: 'Behavior test for hdfs cluster' + +runs: + using: "composite" + steps: + - name: Setup HDFS cluster + shell: bash + working-directory: fixtures/hdfs + run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup hadoop env + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + export HADOOP_HOME=/home/runner/hadoop-3.3.5 + echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV + echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV + echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV + cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + - name: Setup opendal env + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020 + OPENDAL_HDFS_ENABLE_APPEND=true + EOF \ No newline at end of file diff --git a/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml new file mode 100644 index 00000000000..860b6137a14 --- /dev/null +++ b/.github/services/hdfs/hdfs_cluster_with_atomic_write_dir/action.yml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: hdfs_cluster_with_atomic_write_dir +description: 'Behavior test for hdfs cluster with atomic write dir' + +runs: + using: "composite" + steps: + - name: Setup HDFS cluster + shell: bash + working-directory: fixtures/hdfs + run: docker compose -f docker-compose-hdfs-cluster.yml up -d --wait + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup hadoop env + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + export HADOOP_HOME=/home/runner/hadoop-3.3.5 + echo "HADOOP_HOME=${HADOOP_HOME}" >> $GITHUB_ENV + echo "CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)" >> $GITHUB_ENV + echo "LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${HADOOP_HOME}/lib/native" >> $GITHUB_ENV + cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + - name: Setup opendal env + shell: bash + run: | + cat << EOF >> $GITHUB_ENV + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/ + OPENDAL_HDFS_NAME_NODE=hdfs://localhost:8020 + OPENDAL_HDFS_ENABLE_APPEND=false + EOF \ No newline at end of file diff --git a/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml new file mode 100644 index 00000000000..b8de8671611 --- /dev/null +++ b/.github/services/hdfs/hdfs_default_with_atomic_write_dir/action.yml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +name: hdfs_default_with_atomic_write_dir +description: 'Behavior test for hdfs default with atomic write dir' + +runs: + using: "composite" + steps: + - name: Setup java env + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "11" + - name: Setup + shell: bash + run: | + curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner + + export HADOOP_HOME="/home/runner/hadoop-3.3.5" + export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) + + cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml + + cat << EOF >> $GITHUB_ENV + HADOOP_HOME=${HADOOP_HOME} + CLASSPATH=${CLASSPATH} + LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native + OPENDAL_HDFS_ROOT=/tmp/opendal/ + OPENDAL_HDFS_ATOMIC_WRITE_DIR=/tmp/atomic_write_dir/opendal/ + OPENDAL_HDFS_NAME_NODE=default + OPENDAL_HDFS_ENABLE_APPEND=false + EOF \ No newline at end of file diff --git a/.github/workflows/service_test_hdfs.yml b/.github/workflows/service_test_hdfs.yml deleted file mode 100644 index c4447c2d459..00000000000 --- a/.github/workflows/service_test_hdfs.yml +++ /dev/null @@ -1,139 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -name: Service Test HDFS - -on: - push: - branches: - - main - pull_request: - branches: - - main - paths: - - "core/src/**" - - "core/tests/**" - - "!core/src/docs/**" - - "!core/src/services/**" - - "core/src/services/hdfs/**" - - ".github/workflows/service_test_hdfs.yml" - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} - cancel-in-progress: true - -jobs: - hdfs-cluster: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Configure Hdfs - # namenode will use ports: 9870, 9000, 8020 - # datanode will use ports: 9864 - run: | - docker run -d \ - --name namenode \ - --network host \ - -e CLUSTER_NAME=test \ - -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \ - -e CORE_CONF_hadoop_http_staticuser_user=root \ - -e HDFS_CONF_dfs_permissions_enabled=false \ - -e HDFS_CONF_dfs_support_append=true \ - -e HDFS_CONF_dfs_replication=1 \ - bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 - - docker run -d \ - --name datanode \ - --network host \ - -e CLUSTER_NAME=test \ - -e WEBHDFS_CONF_dfs_webhdfs_enabled=true \ - -e CORE_CONF_hadoop_http_staticuser_user=root \ - -e HDFS_CONF_dfs_permissions_enabled=false \ - -e HDFS_CONF_dfs_support_append=true \ - -e HDFS_CONF_dfs_replication=1 \ - bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 - - curl --retry 30 --retry-delay 1 --retry-connrefused http://localhost:9870 - - - name: Setup Rust toolchain - uses: ./.github/actions/setup - with: - need-nextest: true - - - name: Setup java env - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "11" - - name: Setup hadoop env - shell: bash - run: | - curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz | tar zxf - -C /home/runner - - - name: Test - shell: bash - working-directory: core - run: | - export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) - export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native - cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml - - cargo test behavior --features tests,services-hdfs - env: - HADOOP_HOME: "/home/runner/hadoop-3.2.4" - OPENDAL_TEST: hdfs - OPENDAL_HDFS_ROOT: /tmp/opendal/ - OPENDAL_HDFS_NAME_NODE: hdfs://localhost:8020 - OPENDAL_HDFS_ENABLE_APPEND: true - - hdfs-default-with-atomic-write-dir: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Setup Rust toolchain - uses: ./.github/actions/setup - with: - need-nextest: true - - - name: Setup java env - uses: actions/setup-java@v4 - with: - distribution: temurin - java-version: "11" - - name: Setup hadoop env - shell: bash - run: | - curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner - - - name: Test - shell: bash - working-directory: core - run: | - export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob) - export LD_LIBRARY_PATH=${{ env.JAVA_HOME }}/lib/server:${{ env.HADOOP_HOME }}/lib/native - cp ${{ github.workspace }}/fixtures/hdfs/hdfs-site.xml ${{ env.HADOOP_HOME }}/etc/hadoop/hdfs-site.xml - - cargo test behavior --features tests,services-hdfs - env: - HADOOP_HOME: "/home/runner/hadoop-3.3.5" - OPENDAL_TEST: hdfs - OPENDAL_HDFS_ROOT: /tmp/opendal/ - OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/atomic_write_dir/opendal/ - OPENDAL_HDFS_NAME_NODE: default - OPENDAL_HDFS_ENABLE_APPEND: false diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 11a4f399357..cb76475bc3b 100644 --- a/core/src/layers/mod.rs +++ 
b/core/src/layers/mod.rs @@ -62,7 +62,7 @@ pub use self::mime_guess::MimeGuessLayer; #[cfg(feature = "layers-prometheus")] mod prometheus; #[cfg(feature = "layers-prometheus")] -pub use self::prometheus::PrometheusLayer; +pub use self::prometheus::{PrometheusLayer, PrometheusLayerBuilder}; #[cfg(feature = "layers-prometheus-client")] mod prometheus_client; diff --git a/core/src/layers/observe/mod.rs index 6f4586d155b..c7a9c2dd288 100644 --- a/core/src/layers/observe/mod.rs +++ b/core/src/layers/observe/mod.rs @@ -18,6 +18,17 @@ //! OpenDAL Observability Layer //! //! This module offers essential components to facilitate the implementation of observability in OpenDAL. +//! +//! # Prometheus Metrics +//! +//! These metrics describe the behavior and performance of OpenDAL operations. +//! +//! | Metric Name | Type | Description | Labels | +//! |------------------------------|-----------|--------------------------------------------------------------|--------------------------------------------------| +//! | operation_duration_seconds | Histogram | Histogram of time spent during opendal operations | scheme, namespace, root, operation, path | +//! | operation_bytes | Histogram | Histogram of the bytes transferred during opendal operations | scheme, namespace, root, operation, path | +//! | operation_errors_total | Counter | Error counter during opendal operations | scheme, namespace, root, operation, path, error | +//! mod metrics; @@ -40,16 +51,16 @@ pub use metrics::METRIC_OPERATION_ERRORS_TOTAL; /// - level = 0: return `None`, which means we ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, /// if n=1 and input path is "abc/def/ghi", and then we'll use "abc/" as the path label. -pub fn path_label_value(path: &str, path_level: usize) -> Option<&str> { +pub fn path_label_value(path: &str, level: usize) -> Option<&str> { if path.is_empty() { return None; } - if path_level > 0 { + if level > 0 { let label_value = path .char_indices() .filter(|&(_, c)| c == '/') - .nth(path_level - 1) + .nth(level - 1) .map_or(path, |(i, _)| &path[..i]); Some(label_value) } else { diff --git a/core/src/layers/prometheus.rs index 92055ff6bcd..85bb29edcdd 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -22,9 +22,8 @@ use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; use prometheus::exponential_buckets; use prometheus::histogram_opts; -use prometheus::register_histogram_vec_with_registry; -use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; +use prometheus::Opts; use prometheus::Registry; use crate::layers::observe; @@ -36,43 +35,28 @@ use crate::*; /// /// # Prometheus Metrics /// -/// In this section, we will introduce three metrics that are currently being exported by our project. These metrics are essential for understanding the behavior and performance of our applications.
-/// -/// -/// | Metric Name | Type | Description | Labels | -/// |-------------------------|----------|---------------------------------------------------|---------------------| -/// | requests_total | Counter | Total times of 'create' operation being called | scheme, operation | -/// | requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation | -/// | bytes_total | Histogram | Total size | scheme, operation | -/// +/// We provide several metrics, please see the documentation of [`observe`] module. /// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/). /// -/// # Histogram Configuration -/// -/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration. -/// /// # Examples /// -/// ```no_build -/// use log::debug; -/// use log::info; -/// use opendal::layers::PrometheusLayer; -/// use opendal::services; -/// use opendal::Operator; -/// use opendal::Result; -/// use prometheus::Encoder; +/// ```no_run +/// # use log::debug; +/// # use log::info; +/// # use opendal::layers::PrometheusLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// # use prometheus::Encoder; /// -/// /// Visit [`opendal::services`] for more service related config. -/// /// Visit [`opendal::Operator`] for more operator level APIs. -/// #[tokio::main] -/// async fn main() -> Result<()> { +/// # #[tokio::main] +/// # async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// -/// let op = Operator::new(builder) -/// .expect("must init") -/// .layer(PrometheusLayer::new(registry.clone())) +/// let op = Operator::new(builder)? +/// .layer(PrometheusLayer::builder().register(registry).expect("register metrics successfully")) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -93,32 +77,125 @@ use crate::*; /// println!("## Prometheus Metrics"); /// println!("{}", String::from_utf8(buffer.clone()).unwrap()); /// Ok(()) -/// } +/// # } /// ``` #[derive(Clone, Debug)] pub struct PrometheusLayer { - registry: Registry, + interceptor: PrometheusInterceptor, +} + +impl PrometheusLayer { + /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics. + /// + /// # Default Configuration + /// + /// - `operation_duration_seconds_buckets`: `exponential_buckets(0.01, 2.0, 16)` + /// - `operation_bytes_buckets`: `exponential_buckets(1.0, 2.0, 16)` + /// - `path_label`: `0` + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(duration_seconds_buckets) + /// .operation_bytes_buckets(bytes_buckets) + /// .path_label(0) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn builder() -> PrometheusLayerBuilder { + let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).unwrap(); + let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).unwrap(); + let path_label_level = 0; + PrometheusLayerBuilder::new( + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, + ) + } +} + +impl Layer for PrometheusLayer { + type LayeredAccess = observe::MetricsAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + } +} + +/// [`PrometheusLayerBuilder`] is a config builder to build a [`PrometheusLayer`]. +pub struct PrometheusLayerBuilder { operation_duration_seconds_buckets: Vec, operation_bytes_buckets: Vec, path_label_level: usize, } -impl PrometheusLayer { - /// Create a [`PrometheusLayer`] while registering itself to this registry. - pub fn new(registry: Registry) -> Self { +impl PrometheusLayerBuilder { + fn new( + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, + ) -> Self { Self { - registry, - operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), - path_label_level: 0, + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, } } /// Set buckets for `operation_duration_seconds` histogram. /// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(buckets) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_duration_seconds_buckets = buckets; @@ -128,9 +205,35 @@ impl PrometheusLayer { /// Set buckets for `operation_bytes` histogram. /// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. 
+ /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_bytes_buckets(buckets) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_bytes_buckets = buckets; @@ -143,81 +246,167 @@ impl PrometheusLayer { /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. - pub fn enable_path_label(mut self, level: usize) -> Self { + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .path_label(1) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn path_label(mut self, level: usize) -> Self { self.path_label_level = level; self } -} -impl Layer for PrometheusLayer { - type LayeredAccess = observe::MetricsAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - let interceptor = PrometheusInterceptor::register( - self.registry.clone(), - self.operation_duration_seconds_buckets.clone(), - self.operation_bytes_buckets.clone(), - self.path_label_level, - ); - observe::MetricsLayer::new(interceptor).layer(inner) - } -} - -#[derive(Clone, Debug)] -pub struct PrometheusInterceptor { - operation_duration_seconds: HistogramVec, - operation_bytes: HistogramVec, - operation_errors_total: GenericCounterVec, - path_label_level: usize, -} - -impl PrometheusInterceptor { - fn register( - registry: Registry, - operation_duration_seconds_buckets: Vec, - operation_bytes_buckets: Vec, - path_label_level: usize, - ) -> Self { - let labels = OperationLabels::names(false, path_label_level); - let operation_duration_seconds = register_histogram_vec_with_registry!( + /// Register the metrics into the given registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusLayer::builder() + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register(self, registry: &Registry) -> Result { + let labels = OperationLabels::names(false, self.path_label_level); + let operation_duration_seconds = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_DURATION_SECONDS.name(), observe::METRIC_OPERATION_DURATION_SECONDS.help(), - operation_duration_seconds_buckets + self.operation_duration_seconds_buckets ), &labels, - registry ) - .unwrap(); - let operation_bytes = register_histogram_vec_with_registry!( + .map_err(parse_prometheus_error)?; + let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), observe::METRIC_OPERATION_BYTES.help(), - operation_bytes_buckets + self.operation_bytes_buckets ), &labels, - registry ) - .unwrap(); + .map_err(parse_prometheus_error)?; - let labels = OperationLabels::names(true, path_label_level); - let operation_errors_total = register_int_counter_vec_with_registry!( - observe::METRIC_OPERATION_ERRORS_TOTAL.name(), - observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + let labels = OperationLabels::names(true, self.path_label_level); + let operation_errors_total = GenericCounterVec::new( + Opts::new( + observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + ), &labels, - registry ) - .unwrap(); + .map_err(parse_prometheus_error)?; - Self { - operation_duration_seconds, - operation_bytes, - operation_errors_total, - path_label_level, - } + registry + .register(Box::new(operation_duration_seconds.clone())) + .map_err(parse_prometheus_error)?; + registry + .register(Box::new(operation_bytes.clone())) + .map_err(parse_prometheus_error)?; + registry + .register(Box::new(operation_errors_total.clone())) + .map_err(parse_prometheus_error)?; + + Ok(PrometheusLayer { + interceptor: PrometheusInterceptor { + operation_duration_seconds, + operation_bytes, + operation_errors_total, + path_label_level: self.path_label_level, + }, + }) + } + + /// Register the metrics into the default registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .register_default() + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register_default(self) -> Result { + let registry = prometheus::default_registry(); + self.register(registry) } } +/// Convert the [`prometheus::Error`] to [`Error`]. 
+fn parse_prometheus_error(err: prometheus::Error) -> Error { + Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err) +} + +#[derive(Clone, Debug)] +pub struct PrometheusInterceptor { + operation_duration_seconds: HistogramVec, + operation_bytes: HistogramVec, + operation_errors_total: GenericCounterVec, + path_label_level: usize, +} + impl observe::MetricsIntercept for PrometheusInterceptor { fn observe_operation_duration_seconds( &self, @@ -232,11 +421,11 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_duration_seconds .with_label_values(&labels) @@ -256,11 +445,11 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_bytes .with_label_values(&labels) @@ -280,11 +469,11 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: Some(error), path, } - .into_values(self.path_label_level); + .into_values(self.path_label_level); self.operation_errors_total.with_label_values(&labels).inc(); } @@ -294,7 +483,7 @@ struct OperationLabels<'a> { scheme: Scheme, namespace: &'a str, root: &'a str, - op: Operation, + operation: Operation, path: &'a str, error: Option, } @@ -334,11 +523,11 @@ impl<'a> OperationLabels<'a> { self.scheme.into_static(), self.namespace, self.root, - self.op.into_static(), + self.operation.into_static(), ]); - if let Some(path_label) = observe::path_label_value(self.path, path_label_level) { - labels.push(path_label); + if let Some(path) = observe::path_label_value(self.path, path_label_level) { + labels.push(path); } if let Some(error) = self.error { diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index d5c15eccbec..4d87d24a108 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -22,7 +22,6 @@ use std::io::SeekFrom; use std::path::PathBuf; use std::sync::Arc; -use futures::AsyncWriteExt; use log::debug; use uuid::Uuid; @@ -290,45 +289,35 @@ impl Access for HdfsBackend { } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { - let target_path = build_rooted_abs_path(&self.root, path); - let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)); - - // If the target file exists, we should append to the end of it directly. - if op.append() && self.client.metadata(&target_path).is_ok() { - (target_path, None) - } else { - (target_path, Some(tmp_path)) + let target_path = build_rooted_abs_path(&self.root, path); + let target_exists = match self.client.metadata(&target_path) { + Ok(_) => true, + Err(err) => { + if err.kind() != io::ErrorKind::NotFound { + return Err(new_std_io_error(err)); + } + false } - } else { - let p = build_rooted_abs_path(&self.root, path); - (p, None) }; - if let Err(err) = self.client.metadata(&target_path) { - // Early return if other error happened. 
- if err.kind() != io::ErrorKind::NotFound { - return Err(new_std_io_error(err)); + let should_append = op.append() && target_exists; + let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| { + // If the target file exists, we should append to the end of it directly. + if should_append { + None + } else { + Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path))) } + }); + if !target_exists { let parent = get_parent(&target_path); - self.client.create_dir(parent).map_err(new_std_io_error)?; - - let mut f = self - .client - .open_file() - .create(true) - .write(true) - .async_open(&target_path) - .await - .map_err(new_std_io_error)?; - f.close().await.map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); open_options.create(true); - if op.append() { + if should_append { open_options.append(true); } else { open_options.write(true); @@ -341,7 +330,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } @@ -489,43 +484,35 @@ impl Access for HdfsBackend { } fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { - let target_path = build_rooted_abs_path(&self.root, path); - let tmp_path = build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path)); - - // If the target file exists, we should append to the end of it directly. - if op.append() && self.client.metadata(&target_path).is_ok() { - (target_path, None) - } else { - (target_path, Some(tmp_path)) + let target_path = build_rooted_abs_path(&self.root, path); + let target_exists = match self.client.metadata(&target_path) { + Ok(_) => true, + Err(err) => { + if err.kind() != io::ErrorKind::NotFound { + return Err(new_std_io_error(err)); + } + false } - } else { - let p = build_rooted_abs_path(&self.root, path); - - (p, None) }; - if let Err(err) = self.client.metadata(&target_path) { - // Early return if other error happened. - if err.kind() != io::ErrorKind::NotFound { - return Err(new_std_io_error(err)); + let should_append = op.append() && target_exists; + let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| { + // If the target file exists, we should append to the end of it directly. 
+ if should_append { + None + } else { + Some(build_rooted_abs_path(atomic_write_dir, &tmp_file_of(path))) } + }); + if !target_exists { let parent = get_parent(&target_path); - self.client.create_dir(parent).map_err(new_std_io_error)?; - - self.client - .open_file() - .create(true) - .write(true) - .open(&target_path) - .map_err(new_std_io_error)?; } let mut open_options = self.client.open_file(); open_options.create(true); - if op.append() { + if should_append { open_options.append(true); } else { open_options.write(true); @@ -537,7 +524,13 @@ impl Access for HdfsBackend { Ok(( RpWrite::new(), - HdfsWriter::new(target_path, tmp_path, f, Arc::clone(&self.client)), + HdfsWriter::new( + target_path, + tmp_path, + f, + Arc::clone(&self.client), + target_exists, + ), )) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 0f60014f410..e4123861a0f 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -29,6 +29,7 @@ pub struct HdfsWriter { tmp_path: Option, f: Option, client: Arc, + target_path_exists: bool, } /// # Safety @@ -42,12 +43,14 @@ impl HdfsWriter { tmp_path: Option, f: F, client: Arc, + target_path_exists: bool, ) -> Self { Self { target_path, tmp_path, f: Some(f), client, + target_path_exists, } } } @@ -70,6 +73,12 @@ impl oio::Write for HdfsWriter { // TODO: we need to make rename async. if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)? @@ -102,6 +111,12 @@ impl oio::BlockingWrite for HdfsWriter { f.flush().map_err(new_std_io_error)?; if let Some(tmp_path) = &self.tmp_path { + // we must delete the target_path, otherwise the rename_file operation will fail + if self.target_path_exists { + self.client + .remove_file(&self.target_path) + .map_err(new_std_io_error)?; + } self.client .rename_file(tmp_path, &self.target_path) .map_err(new_std_io_error)?; diff --git a/core/src/services/icloud/core.rs b/core/src/services/icloud/core.rs index c62ba60330d..306c0ad206b 100644 --- a/core/src/services/icloud/core.rs +++ b/core/src/services/icloud/core.rs @@ -526,11 +526,11 @@ impl PathQuery for IcloudPathQuery { let node = &root[0]; - let id = match node.items.iter().find(|it| it.name == name) { - Some(it) => Ok(Some(it.drivewsid.clone())), - None => Ok(None), - }?; - Ok(id) + Ok(node + .items + .iter() + .find(|it| it.name == name) + .map(|it| it.drivewsid.clone())) } async fn create_dir(&self, parent_id: &str, name: &str) -> Result { diff --git a/fixtures/hdfs/docker-compose-hdfs-cluster.yml b/fixtures/hdfs/docker-compose-hdfs-cluster.yml new file mode 100644 index 00000000000..376e8ed4868 --- /dev/null +++ b/fixtures/hdfs/docker-compose-hdfs-cluster.yml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3.8' + +services: + namenode: + image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 + container_name: namenode + network_mode: "host" + environment: + CLUSTER_NAME: test + WEBHDFS_CONF_dfs_webhdfs_enabled: true + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_support_append: true + HDFS_CONF_dfs_replication: 1 + datanode: + image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 + container_name: datanode + network_mode: "host" + environment: + CLUSTER_NAME: test + WEBHDFS_CONF_dfs_webhdfs_enabled: true + CORE_CONF_hadoop_http_staticuser_user: root + HDFS_CONF_dfs_permissions_enabled: false + HDFS_CONF_dfs_support_append: true + HDFS_CONF_dfs_replication: 1 +
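For quick reference, the prometheus.rs change above replaces `PrometheusLayer::new(registry)` with a builder that is registered explicitly. The sketch below consolidates the doc examples from this diff into one program; it assumes the `layers-prometheus` feature, the `prometheus` crate, and `tokio` as the async runtime, and uses the in-memory service purely for illustration.

```rust
use opendal::layers::PrometheusLayer;
use opendal::services;
use opendal::Operator;
use opendal::Result;
use prometheus::Encoder;

#[tokio::main]
async fn main() -> Result<()> {
    // Any service works; Memory keeps the sketch self-contained.
    let builder = services::Memory::default();
    let registry = prometheus::default_registry();

    // The buckets and path label below are the documented defaults made explicit;
    // calling `.register(registry)` alone yields the same configuration.
    let duration_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap();
    let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap();

    let op = Operator::new(builder)?
        .layer(
            PrometheusLayer::builder()
                .operation_duration_seconds_buckets(duration_buckets)
                .operation_bytes_buckets(bytes_buckets)
                .path_label(0)
                .register(registry)
                .expect("register metrics successfully"),
        )
        .finish();

    // Generate some traffic so the histograms and counters have samples.
    op.write("test", "Hello, World!").await?;
    let _ = op.read("test").await?;

    // Export the gathered metrics in the Prometheus text format.
    let mut buffer = Vec::new();
    let encoder = prometheus::TextEncoder::new();
    encoder
        .encode(&prometheus::gather(), &mut buffer)
        .expect("encode metrics");
    println!("{}", String::from_utf8(buffer).unwrap());

    Ok(())
}
```

`register_default()` is a shortcut for the same flow against `prometheus::default_registry()`.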
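The `path_label_value` helper whose parameter is renamed in the observe module hunk decides how much of a path ends up in the `path` label. Below is a standalone copy of that logic for illustration, with assertions showing the label produced for a few inputs; the free function and `main` wrapper are not part of the public API.

```rust
/// Standalone copy of the truncation logic from `observe::path_label_value`.
fn path_label_value(path: &str, level: usize) -> Option<&str> {
    if path.is_empty() {
        return None;
    }

    if level > 0 {
        // Keep everything up to (but not including) the `level`-th '/'.
        let label_value = path
            .char_indices()
            .filter(|&(_, c)| c == '/')
            .nth(level - 1)
            .map_or(path, |(i, _)| &path[..i]);
        Some(label_value)
    } else {
        None
    }
}

fn main() {
    // level = 0 disables the path label entirely.
    assert_eq!(path_label_value("abc/def/ghi", 0), None);
    // level = 1 keeps the first path segment.
    assert_eq!(path_label_value("abc/def/ghi", 1), Some("abc"));
    // level = 2 keeps the first two segments.
    assert_eq!(path_label_value("abc/def/ghi", 2), Some("abc/def"));
    // A level deeper than the path falls back to the whole path.
    assert_eq!(path_label_value("abc/def/ghi", 5), Some("abc/def/ghi"));
    // Empty paths never get a label.
    assert_eq!(path_label_value("", 1), None);
}
```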
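In the HDFS backend changes, `write` and `blocking_write` now compute `target_exists` once and derive from it whether to append in place or stage the write through the atomic write dir, and on close the writer removes an existing target before renaming the tmp file over it. The sketch below models only that decision table; `WritePlan` and `plan_write` are illustrative names, not OpenDAL APIs, and the behavior encoded is the one shown in this diff.

```rust
/// Simplified model of the write-path decision in the HDFS backend above.
#[derive(Debug, PartialEq)]
struct WritePlan {
    /// Open the file in append mode?
    append: bool,
    /// Stage into a temporary file and rename on close?
    use_tmp_file: bool,
    /// Remove the existing target before the rename, since the rename fails otherwise.
    remove_target_before_rename: bool,
}

fn plan_write(op_append: bool, target_exists: bool, has_atomic_write_dir: bool) -> WritePlan {
    // Appending only applies when the target already exists.
    let should_append = op_append && target_exists;
    // Appends go straight to the target; otherwise a configured atomic write dir
    // routes the write through a tmp file plus rename.
    let use_tmp_file = has_atomic_write_dir && !should_append;
    WritePlan {
        append: should_append,
        use_tmp_file,
        remove_target_before_rename: use_tmp_file && target_exists,
    }
}

fn main() {
    // Fresh file with an atomic write dir: stage via tmp file, nothing to remove.
    assert_eq!(
        plan_write(false, false, true),
        WritePlan { append: false, use_tmp_file: true, remove_target_before_rename: false }
    );
    // Overwrite of an existing file: tmp file, then remove-and-rename on close.
    assert_eq!(
        plan_write(false, true, true),
        WritePlan { append: false, use_tmp_file: true, remove_target_before_rename: true }
    );
    // Append to an existing file: write to the target directly.
    assert_eq!(
        plan_write(true, true, true),
        WritePlan { append: true, use_tmp_file: false, remove_target_before_rename: false }
    );
}
```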
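The three composite actions and the docker-compose file all feed the behavior tests through `OPENDAL_HDFS_*` environment variables. As a rough sketch of how those settings correspond to operator construction in Rust, the snippet below configures `services::Hdfs` directly. It assumes the `services-hdfs` feature; the setter names (`name_node`, `root`, `atomic_write_dir`) follow OpenDAL's usual one-setter-per-config-field pattern and are assumptions to check against the `services::Hdfs` docs for your version, and the process still needs the `HADOOP_HOME`/`CLASSPATH`/`LD_LIBRARY_PATH` environment that the actions export.

```rust
use opendal::services;
use opendal::Operator;
use opendal::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Values mirror OPENDAL_HDFS_NAME_NODE, OPENDAL_HDFS_ROOT and
    // OPENDAL_HDFS_ATOMIC_WRITE_DIR from the composite actions above.
    // Setter names are assumptions, not taken from this diff.
    let mut builder = services::Hdfs::default();
    builder.name_node("hdfs://localhost:8020");
    builder.root("/tmp/opendal/");
    builder.atomic_write_dir("/tmp/atomic_write_dir/opendal/");
    // OPENDAL_HDFS_ENABLE_APPEND toggles append support in CI; the matching
    // builder switch is omitted here to keep the sketch to the paths above.

    let op = Operator::new(builder)?.finish();
    op.write("hello.txt", "Hello, HDFS!").await?;
    Ok(())
}
```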