diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
index 73e358179b..6cf15b2a52 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java
@@ -19,16 +19,42 @@
package org.apache.tez.common.counters;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
@Private
public enum FileSystemCounter {
- BYTES_READ,
- BYTES_WRITTEN,
- READ_OPS,
- LARGE_READ_OPS,
- WRITE_OPS,
- HDFS_BYTES_READ,
- HDFS_BYTES_WRITTEN,
- FILE_BYTES_READ,
- FILE_BYTES_WRITTEN
+ BYTES_READ("bytesRead"),
+ BYTES_WRITTEN("bytesWritten"),
+ READ_OPS("readOps"),
+ LARGE_READ_OPS("largeReadOps"),
+ WRITE_OPS("writeOps"),
+ HDFS_BYTES_READ("hdfsBytesRead"),
+ HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
+ FILE_BYTES_READ("fileBytesRead"),
+ FILE_BYTES_WRITTEN("fileBytesWritten"),
+
+ // Additional counters from HADOOP-13305
+ OP_APPEND(CommonStatisticNames.OP_APPEND),
+ OP_CREATE(CommonStatisticNames.OP_CREATE),
+ OP_DELETE(CommonStatisticNames.OP_DELETE),
+ OP_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS),
+ OP_LIST_FILES(CommonStatisticNames.OP_LIST_FILES),
+ OP_LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
+ OP_MKDIRS(CommonStatisticNames.OP_MKDIRS),
+ OP_OPEN(CommonStatisticNames.OP_OPEN),
+ OP_RENAME(CommonStatisticNames.OP_RENAME),
+ OP_SET_ACL(CommonStatisticNames.OP_SET_ACL),
+ OP_SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
+ OP_SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
+ OP_GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations");
+
+ private final String opName;
+
+ FileSystemCounter(String opName) {
+ this.opName = opName;
+ }
+
+ public String getOpName() {
+ return opName;
+ }
}
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index 503bf2b103..e235e933cd 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -49,6 +49,23 @@
org.apache.hadoop
hadoop-common
+
+ org.apache.hadoop
+ hadoop-common
+ test
+ test-jar
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ test
+ test-jar
+
org.apache.hadoop
hadoop-yarn-api
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
index bb15ef159f..4347340d7c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,9 +17,7 @@
package org.apache.tez.runtime.metrics;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -30,50 +28,22 @@
*/
public class FileSystemStatisticUpdater {
- private List stats;
- private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter,
- writeOpsCounter;
- private String scheme;
+ private StorageStatistics stats;
private TezCounters counters;
- FileSystemStatisticUpdater(TezCounters counters, List stats, String scheme) {
- this.stats = stats;
- this.scheme = scheme;
+ FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) {
+ this.stats = storageStatistics;
this.counters = counters;
}
void updateCounters() {
- if (readBytesCounter == null) {
- readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
- }
- if (writeBytesCounter == null) {
- writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
- }
- if (readOpsCounter == null) {
- readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
- }
- if (largeReadOpsCounter == null) {
- largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
- }
- if (writeOpsCounter == null) {
- writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
- }
- long readBytes = 0;
- long writeBytes = 0;
- long readOps = 0;
- long largeReadOps = 0;
- long writeOps = 0;
- for (FileSystem.Statistics stat : stats) {
- readBytes = readBytes + stat.getBytesRead();
- writeBytes = writeBytes + stat.getBytesWritten();
- readOps = readOps + stat.getReadOps();
- largeReadOps = largeReadOps + stat.getLargeReadOps();
- writeOps = writeOps + stat.getWriteOps();
+ // loop through FileSystemCounter enums as it is a smaller set
+ for (FileSystemCounter fsCounter : FileSystemCounter.values()) {
+ Long val = stats.getLong(fsCounter.getOpName());
+ if (val != null && val != 0) {
+ TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter);
+ counter.setValue(val);
+ }
}
- readBytesCounter.setValue(readBytes);
- writeBytesCounter.setValue(writeBytes);
- readOpsCounter.setValue(readOps);
- largeReadOpsCounter.setValue(largeReadOps);
- writeOpsCounter.setValue(writeOps);
}
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
index 48676e225b..d0ab041b50 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java
@@ -18,17 +18,18 @@
package org.apache.tez.runtime.metrics;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
+import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.tez.common.GcTimeUpdater;
import org.apache.tez.common.counters.TaskCounter;
@@ -48,11 +49,16 @@ public class TaskCounterUpdater {
private final TezCounters tezCounters;
private final Configuration conf;
+// /**
+// * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+// */
+// private Map statisticUpdaters =
+// new HashMap<>();
/**
- * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
+ * A Map where Key-> URIScheme and value->Map
*/
- private Map statisticUpdaters =
- new HashMap();
+ private Map> statisticUpdaters =
+ new HashMap<>();
protected final GcTimeUpdater gcUpdater;
private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0;
@@ -69,32 +75,16 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid)
public void updateCounters() {
- // FileSystemStatistics are reset each time a new task is seen by the
- // container.
- // This doesn't remove the fileSystem, and does not clear all statistics -
- // so there is a potential of an unused FileSystem showing up for a
- // Container, and strange values for READ_OPS etc.
- Map> map = new
- HashMap>();
- for(Statistics stat: FileSystem.getAllStatistics()) {
- String uriScheme = stat.getScheme();
- if (map.containsKey(uriScheme)) {
- List list = map.get(uriScheme);
- list.add(stat);
- } else {
- List list = new ArrayList();
- list.add(stat);
- map.put(uriScheme, list);
- }
- }
- for (Map.Entry> entry: map.entrySet()) {
- FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
- if(updater==null) {//new FileSystem has been found in the cache
- updater =
- new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
- entry.getKey());
- statisticUpdaters.put(entry.getKey(), updater);
+ GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics();
+ Iterator iter = globalStorageStatistics.iterator();
+ while (iter.hasNext()) {
+ StorageStatistics stats = iter.next();
+ if (!statisticUpdaters.containsKey(stats.getScheme())) {
+ Map updaterSet = new TreeMap<>();
+ statisticUpdaters.put(stats.getScheme(), updaterSet);
}
+ FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme())
+ .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats));
updater.updateCounters();
}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java
new file mode 100644
index 0000000000..040960907c
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+ .build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ remoteFs.mkdirs(new Path("/tmp/foo/"));
+ FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
+ out.writeBytes("xyz");
+ out.close();
+
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_MKDIRS);
+ TezCounter createCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_CREATE);
+ Assert.assertNotNull(mkdirCounter);
+ Assert.assertNotNull(createCounter);
+ Assert.assertEquals(1, mkdirCounter.getValue());
+ Assert.assertEquals(1, createCounter.getValue());
+
+ FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt"));
+ out1.writeBytes("xyz");
+ out1.close();
+
+ long oldCreateVal = createCounter.getValue();
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldCreateVal
+ + ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal);
+
+ oldCreateVal = createCounter.getValue();
+ // Ensure all numbers are reset
+ remoteFs.clearStatistics();
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ Assert.assertEquals(oldCreateVal, createCounter.getValue());
+
+ }
+
+
+
+}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java
new file mode 100644
index 0000000000..88b0941fe7
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
+ private static Configuration conf = new Configuration();
+
+ @Test
+ public void basicTest() {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+ TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+
+ Assert.assertNotNull(gcCounter);
+ Assert.assertNotNull(cpuCounter);
+ long oldVal = cpuCounter.getValue();
+ Assert.assertTrue(cpuCounter.getValue() > 0);
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldVal
+ + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal);
+
+ }
+
+
+}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index aeaec53124..9c26a321ca 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.tez.common.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -181,7 +180,9 @@ public void testMultipleSuccessfulTasks() throws IOException, InterruptedExcepti
assertFalse(TestProcessor.wasAborted());
umbilical.resetTrackedEvents();
TezCounters tezCounters = runtimeTask.getCounters();
- verifySysCounters(tezCounters, 5, 5);
+ // with TEZ-3331, fs counters are not set if the value is 0 (see FileSystemStatisticUpdater2), so there can be
+ // a mismatch in task counter count and fs counter count
+ verifySysCounters(tezCounters, 5, 0);
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, false);
@@ -693,10 +694,6 @@ public void testClusterStorageCapacityFatalError() throws IOException {
private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {
- Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
- (minTaskCounterCount <= 0 && minFsCounterCount <= 0),
- "Both targetCounter counts should be postitive or negative. A mix is not expected");
-
int numTaskCounters = 0;
int numFsCounters = 0;
for (CounterGroup counterGroup : tezCounters) {