From 8d17a99733ad5070bf8f733ad4c8775280ce8c2e Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Fri, 17 Jan 2025 12:16:51 -0500 Subject: [PATCH] Improve BulkSplitOptimizationIT (#5179) * Improve BulkSplitOptimizationIT * Add timing to test logging --- .../functional/BulkSplitOptimizationIT.java | 94 ++++++++++++++----- 1 file changed, 71 insertions(+), 23 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index de2ed8475f8..bd29f6e10e5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@ -18,49 +18,81 @@ */ package org.apache.accumulo.test.functional; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This test verifies that when a lot of files are bulk imported into a table with one tablet and * then splits that not all data files go to the children tablets. */ public class BulkSplitOptimizationIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class); + + Path testDir; @Override protected Duration defaultTimeout() { - return Duration.ofMinutes(2); + return Duration.ofMinutes(5); } @BeforeEach public void alterConfig() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + final int initialTserverCount = + client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size(); + log.info("Tserver count: {}", initialTserverCount); + Timer timer = Timer.startNew(); getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(), + 120_000); + log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS)); + timer.restart(); getClusterControl().startAllServers(ServerType.TABLET_SERVER); + Wait.waitFor(() -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() + < initialTserverCount, 120_000); + log.info("Took {} ms to start all tservers", timer.elapsed(MILLISECONDS)); + + FileSystem fs = cluster.getFileSystem(); + testDir = new Path(cluster.getTemporaryPath(), "testmf"); + fs.deleteOnExit(testDir); + + timer.restart(); + FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, SPLITS, 8); + long elapsed = timer.elapsed(MILLISECONDS); + FileStatus[] stats = fs.listStatus(testDir); + log.info("Generated {} files in {} ms", stats.length, elapsed); } } @AfterEach public void resetConfig() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); } private static final int ROWS = 100000; @@ -68,35 +100,51 @@ public void resetConfig() throws Exception { @Test public void testBulkSplitOptimization() throws Exception { + log.info("Starting BulkSplitOptimizationIT test"); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); - FileSystem fs = cluster.getFileSystem(); - Path testDir = new Path(cluster.getTemporaryPath(), "testmf"); - fs.deleteOnExit(testDir); - FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8); - FileStatus[] stats = fs.listStatus(testDir); - System.out.println("Number of generated files: " + stats.length); + final String tableName = getUniqueNames(1)[0]; + Map tableProps = new HashMap<>(); + tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000"); + tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000"); + tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); + + log.info("Creating table {}", tableName); + Timer timer = Timer.startNew(); + c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps) + .withInitialTabletAvailability(TabletAvailability.HOSTED)); + log.info("Created table in {} ms. Starting bulk import", timer.elapsed(MILLISECONDS)); + + timer.restart(); c.tableOperations().importDirectory(testDir.toString()).to(tableName).load(); + log.info("Imported into table {} in {} ms", tableName, timer.elapsed(MILLISECONDS)); + timer.restart(); FunctionalTestUtils.checkSplits(c, tableName, 0, 0); FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100); + log.info("Checked splits and rfiles in {} ms", timer.elapsed(MILLISECONDS)); - // initiate splits + log.info("Lowering split threshold to 100K to initiate splits"); c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K"); - Thread.sleep(SECONDS.toMillis(2)); + timer.restart(); // wait until over split threshold -- should be 78 splits - while (c.tableOperations().listSplits(tableName).size() < 50) { - Thread.sleep(500); - } + Wait.waitFor(() -> { + try { + FunctionalTestUtils.checkSplits(c, tableName, 50, 100); + } catch (Exception e) { + if (e.getMessage().contains("splits points out of range")) { + return false; + } else { + throw e; + } + } + return true; + }); + + log.info("Took {} ms for split count to reach expected range", timer.elapsed(MILLISECONDS)); - FunctionalTestUtils.checkSplits(c, tableName, 50, 100); VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS); params.timestamp = 1; params.dataSize = 50;