diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java index 73e358179b..4a5d41e221 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java @@ -19,16 +19,43 @@ package org.apache.tez.common.counters; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; @Private public enum FileSystemCounter { - BYTES_READ, - BYTES_WRITTEN, - READ_OPS, - LARGE_READ_OPS, - WRITE_OPS, - HDFS_BYTES_READ, - HDFS_BYTES_WRITTEN, - FILE_BYTES_READ, - FILE_BYTES_WRITTEN -} + BYTES_READ ("bytesRead"), + BYTES_WRITTEN ("bytesWritten"), + READ_OPS ("readOps"), + LARGE_READ_OPS ("largeReadOps"), + WRITE_OPS ("writeOps"), + HDFS_BYTES_READ ("hdfsBytesRead"), + HDFS_BYTES_WRITTEN ("hdfsBytesWritten"), + FILE_BYTES_READ ("fileBytesRead"), + FILE_BYTES_WRITTEN ("fileBytesWritten"), + + // Additional counters from HADOOP-13305 + OP_APPEND (CommonStatisticNames.OP_APPEND), + OP_CREATE (CommonStatisticNames.OP_CREATE), + OP_DELETE (CommonStatisticNames.OP_DELETE), + OP_GET_FILE_STATUS (CommonStatisticNames.OP_GET_FILE_STATUS), + OP_LIST_FILES (CommonStatisticNames.OP_LIST_FILES), + OP_LIST_LOCATED_STATUS (CommonStatisticNames.OP_LIST_LOCATED_STATUS), + OP_MKDIRS (CommonStatisticNames.OP_MKDIRS), + OP_OPEN (CommonStatisticNames.OP_OPEN), + OP_RENAME (CommonStatisticNames.OP_RENAME), + OP_SET_ACL (CommonStatisticNames.OP_SET_ACL), + OP_SET_OWNER (CommonStatisticNames.OP_SET_OWNER), + OP_SET_PERMISSION (CommonStatisticNames.OP_SET_PERMISSION), + OP_GET_FILE_BLOCK_LOCATIONS ("op_get_file_block_locations"); + + private final String opName; + + FileSystemCounter(String opName) { + this.opName = opName; + } + + public String getOpName() { + return opName; + } + + } diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml index 503bf2b103..e235e933cd 100644 --- a/tez-runtime-internals/pom.xml +++ b/tez-runtime-internals/pom.xml @@ -49,6 +49,23 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-common + test + test-jar + + + org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + org.apache.hadoop hadoop-yarn-api diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java index bb15ef159f..4347340d7c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/FileSystemStatisticUpdater.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -17,9 +17,7 @@ package org.apache.tez.runtime.metrics; -import java.util.List; - -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -30,50 +28,22 @@ */ public class FileSystemStatisticUpdater { - private List stats; - private TezCounter readBytesCounter, writeBytesCounter, readOpsCounter, largeReadOpsCounter, - writeOpsCounter; - private String scheme; + private StorageStatistics stats; private TezCounters counters; - FileSystemStatisticUpdater(TezCounters counters, List stats, String scheme) { - this.stats = stats; - this.scheme = scheme; + FileSystemStatisticUpdater(TezCounters counters, StorageStatistics storageStatistics) { + this.stats = storageStatistics; this.counters = counters; } void updateCounters() { - if (readBytesCounter == null) { - readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ); - } - if (writeBytesCounter == null) { - writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN); - } - if (readOpsCounter == null) { - readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS); - } - if (largeReadOpsCounter == null) { - largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS); - } - if (writeOpsCounter == null) { - writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS); - } - long readBytes = 0; - long writeBytes = 0; - long readOps = 0; - long largeReadOps = 0; - long writeOps = 0; - for (FileSystem.Statistics stat : stats) { - readBytes = readBytes + stat.getBytesRead(); - writeBytes = writeBytes + stat.getBytesWritten(); - readOps = readOps + stat.getReadOps(); - largeReadOps = largeReadOps + stat.getLargeReadOps(); - writeOps = writeOps + stat.getWriteOps(); + // loop through FileSystemCounter enums as it is a smaller set + for (FileSystemCounter fsCounter : FileSystemCounter.values()) { + Long val = stats.getLong(fsCounter.getOpName()); + if (val != null && val != 0) { + TezCounter counter = counters.findCounter(stats.getScheme(), fsCounter); + counter.setValue(val); + } } - readBytesCounter.setValue(readBytes); - writeBytesCounter.setValue(writeBytes); - readOpsCounter.setValue(readOps); - largeReadOpsCounter.setValue(largeReadOps); - writeOpsCounter.setValue(writeOps); } } diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index 48676e225b..854fbd2bbc 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -20,9 +20,13 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +52,16 @@ public class TaskCounterUpdater { private final TezCounters tezCounters; private final Configuration conf; +// /** +// * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater +// */ +// private Map statisticUpdaters = +// new HashMap<>(); /** - * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater + * A Map where Key-> URIScheme and value->Map */ - private Map statisticUpdaters = - new HashMap(); + private Map> statisticUpdaters = + new HashMap<>(); protected final GcTimeUpdater gcUpdater; private ResourceCalculatorProcessTree pTree; private long initCpuCumulativeTime = 0; @@ -69,32 +78,16 @@ public TaskCounterUpdater(TezCounters counters, Configuration conf, String pid) public void updateCounters() { - // FileSystemStatistics are reset each time a new task is seen by the - // container. - // This doesn't remove the fileSystem, and does not clear all statistics - - // so there is a potential of an unused FileSystem showing up for a - // Container, and strange values for READ_OPS etc. - Map> map = new - HashMap>(); - for(Statistics stat: FileSystem.getAllStatistics()) { - String uriScheme = stat.getScheme(); - if (map.containsKey(uriScheme)) { - List list = map.get(uriScheme); - list.add(stat); - } else { - List list = new ArrayList(); - list.add(stat); - map.put(uriScheme, list); - } - } - for (Map.Entry> entry: map.entrySet()) { - FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey()); - if(updater==null) {//new FileSystem has been found in the cache - updater = - new FileSystemStatisticUpdater(tezCounters, entry.getValue(), - entry.getKey()); - statisticUpdaters.put(entry.getKey(), updater); + GlobalStorageStatistics globalStorageStatistics = FileSystem.getGlobalStorageStatistics(); + Iterator iter = globalStorageStatistics.iterator(); + while (iter.hasNext()) { + StorageStatistics stats = iter.next(); + if (!statisticUpdaters.containsKey(stats.getScheme())) { + Map updaterSet = new TreeMap<>(); + statisticUpdaters.put(stats.getScheme(), updaterSet); } + FileSystemStatisticUpdater updater = statisticUpdaters.get(stats.getScheme()) + .computeIfAbsent(stats.getName(), k -> new FileSystemStatisticUpdater(tezCounters, stats)); updater.updateCounters(); } diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java new file mode 100644 index 0000000000..0277eaddf6 --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.metrics; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tez.common.counters.FileSystemCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestFileSystemStatisticUpdater { + + private static final Logger LOG = LoggerFactory.getLogger( + TestFileSystemStatisticUpdater.class); + + protected static MiniDFSCluster dfsCluster; + + private static Configuration conf = new Configuration(); + private static FileSystem remoteFs; + + private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + + TestFileSystemStatisticUpdater.class.getName() + "-tmpDir"; + + @BeforeClass + public static void setup() throws IOException { + try { + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null) + .build(); + remoteFs = dfsCluster.getFileSystem(); + } catch (IOException io) { + throw new RuntimeException("problem starting mini dfs cluster", io); + } + } + + @AfterClass + public static void tearDown() { + if (dfsCluster != null) { + dfsCluster.shutdown(); + dfsCluster = null; + } + } + + @Test + public void basicTest() throws IOException { + TezCounters counters = new TezCounters(); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + + remoteFs.mkdirs(new Path("/tmp/foo/")); + FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt")); + out.writeBytes("xyz"); + out.close(); + + updater.updateCounters(); + + LOG.info("Counters: " + counters); + TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(), + FileSystemCounter.OP_MKDIRS); + TezCounter createCounter = counters.findCounter(remoteFs.getScheme(), + FileSystemCounter.OP_CREATE); + Assert.assertNotNull(mkdirCounter); + Assert.assertNotNull(createCounter); + Assert.assertEquals(1, mkdirCounter.getValue()); + Assert.assertEquals(1, createCounter.getValue()); + + FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt")); + out1.writeBytes("xyz"); + out1.close(); + + long oldCreateVal = createCounter.getValue(); + updater.updateCounters(); + + LOG.info("Counters: " + counters); + Assert.assertTrue("Counter not updated, old=" + oldCreateVal + + ", new=" + createCounter.getValue(), createCounter.getValue() > oldCreateVal); + + oldCreateVal = createCounter.getValue(); + // Ensure all numbers are reset + remoteFs.clearStatistics(); + updater.updateCounters(); + LOG.info("Counters: " + counters); + Assert.assertEquals(oldCreateVal, createCounter.getValue()); + + } + + + +} diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java new file mode 100644 index 0000000000..88b0941fe7 --- /dev/null +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.counters.TaskCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestTaskCounterUpdater { + + private static final Logger LOG = LoggerFactory.getLogger( + TestTaskCounterUpdater.class); + private static Configuration conf = new Configuration(); + + @Test + public void basicTest() { + TezCounters counters = new TezCounters(); + TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid"); + + updater.updateCounters(); + LOG.info("Counters: " + counters); + TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS); + TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS); + + Assert.assertNotNull(gcCounter); + Assert.assertNotNull(cpuCounter); + long oldVal = cpuCounter.getValue(); + Assert.assertTrue(cpuCounter.getValue() > 0); + + updater.updateCounters(); + LOG.info("Counters: " + counters); + Assert.assertTrue("Counter not updated, old=" + oldVal + + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal); + + } + + +} diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index aeaec53124..7246f61268 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -181,7 +181,9 @@ public void testMultipleSuccessfulTasks() throws IOException, InterruptedExcepti assertFalse(TestProcessor.wasAborted()); umbilical.resetTrackedEvents(); TezCounters tezCounters = runtimeTask.getCounters(); - verifySysCounters(tezCounters, 5, 5); + // with TEZ-3331, fs counters are not set if the value is 0 (see FileSystemStatisticUpdater2), so there can be + // a mismatch in task counter count and fs counter count + verifySysCounters(tezCounters, 5, 0); taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.CONF_EMPTY, false); @@ -693,10 +695,6 @@ public void testClusterStorageCapacityFatalError() throws IOException { private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) { - Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) || - (minTaskCounterCount <= 0 && minFsCounterCount <= 0), - "Both targetCounter counts should be postitive or negative. A mix is not expected"); - int numTaskCounters = 0; int numFsCounters = 0; for (CounterGroup counterGroup : tezCounters) {