From b8b5880dca7e1f7d318cdb5baa04c4fea07392a8 Mon Sep 17 00:00:00 2001 From: Usener Date: Fri, 26 Aug 2022 15:12:28 +0200 Subject: [PATCH 1/6] reduce class fan out complexity from 20 to 19 (20 is upper boundary) --- src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java index f28f6e2b..3fd1dc98 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java @@ -39,7 +39,6 @@ import io.aiven.connect.jdbc.dialect.DatabaseDialect; import io.aiven.connect.jdbc.dialect.DatabaseDialects; -import io.aiven.connect.jdbc.util.CachedConnectionProvider; import io.aiven.connect.jdbc.util.ColumnDefinition; import io.aiven.connect.jdbc.util.ColumnId; import io.aiven.connect.jdbc.util.TableId; @@ -59,7 +58,7 @@ public class JdbcSourceTask extends SourceTask { private Time time; private JdbcSourceTaskConfig config; private DatabaseDialect dialect; - private CachedConnectionProvider cachedConnectionProvider; + private SourceConnectionProvider cachedConnectionProvider; private PriorityQueue tableQueue = new PriorityQueue(); private final AtomicBoolean running = new AtomicBoolean(false); From 7025f75cfed7e945a80692f810168a795571a1a5 Mon Sep 17 00:00:00 2001 From: Usener Date: Fri, 26 Aug 2022 15:18:24 +0200 Subject: [PATCH 2/6] introduce tTask config INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG --- .../aiven/connect/jdbc/source/JdbcSourceTask.java | 3 +++ .../connect/jdbc/source/JdbcSourceTaskConfig.java | 13 ++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java index 3fd1dc98..c46abfc0 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java @@ -155,6 +155,9 @@ public void start(final Map properties) { final boolean validateNonNulls = config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); + if (config.getBoolean(JdbcSourceTaskConfig.INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG)) { + } + for (final String tableOrQuery : tablesOrQuery) { final List> tablePartitionsToCheck; final Map partition; diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTaskConfig.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTaskConfig.java index b705ad31..0b0664e4 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTaskConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTaskConfig.java @@ -32,8 +32,19 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig { public static final String TABLES_CONFIG = "tables"; private static final String TABLES_DOC = "List of tables for this task to watch for changes."; + public static final String INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG = "sourceTask.initialMessageCount.enabled"; + + private static final String INITIAL_MESSAGE_COUNT_METRIC_ENABLED_DOC = "Enables a custom metric to determine the " + + "number of messages/records that will be published into the Kafka topic. To reduce database load, the " + + "corresponding query is executed at startup only. The metric is published via JMX under " + + "io.aiven.connect.jdbc.initialImportCount props) { super(config, props); From 5213a942febd3588b2e0e4159e04303da6a8fac3 Mon Sep 17 00:00:00 2001 From: Usener Date: Fri, 26 Aug 2022 15:19:04 +0200 Subject: [PATCH 3/6] provide StartupMetric as JMX MBean --- .../connect/jdbc/util/StartupMetric.java | 49 +++++++++++++++++++ .../connect/jdbc/util/StartupMetricMBean.java | 27 ++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/main/java/io/aiven/connect/jdbc/util/StartupMetric.java create mode 100644 src/main/java/io/aiven/connect/jdbc/util/StartupMetricMBean.java diff --git a/src/main/java/io/aiven/connect/jdbc/util/StartupMetric.java b/src/main/java/io/aiven/connect/jdbc/util/StartupMetric.java new file mode 100644 index 00000000..5e0a20fe --- /dev/null +++ b/src/main/java/io/aiven/connect/jdbc/util/StartupMetric.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et + * dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. + */ +public class StartupMetric implements StartupMetricMBean { + + private static final Logger log = LoggerFactory.getLogger(StartupMetric.class); + + private long counter = -1; + + public StartupMetric() { + super(); + } + + @Override + public Long getCounter() { + log.debug("getCounter: " + counter); + return counter; + } + + public void updateCounter(final Long counter) { + log.info("setCounter: " + counter); + if (counter != null) { + this.counter = counter.intValue(); + } + } +} diff --git a/src/main/java/io/aiven/connect/jdbc/util/StartupMetricMBean.java b/src/main/java/io/aiven/connect/jdbc/util/StartupMetricMBean.java new file mode 100644 index 00000000..85dceca3 --- /dev/null +++ b/src/main/java/io/aiven/connect/jdbc/util/StartupMetricMBean.java @@ -0,0 +1,27 @@ +/* + * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc.util; + +public interface StartupMetricMBean { + + /** + * Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et + * dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. + */ + Long getCounter(); +} From c1c1bfcda3a6d5b14b8e1afb7fc2b4d452a0e7d6 Mon Sep 17 00:00:00 2001 From: Usener Date: Fri, 26 Aug 2022 15:20:15 +0200 Subject: [PATCH 4/6] Introduce CountQuerier to count number of rows in a table or number of selected rows in a query --- .../connect/jdbc/source/CountQuerier.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 src/main/java/io/aiven/connect/jdbc/source/CountQuerier.java diff --git a/src/main/java/io/aiven/connect/jdbc/source/CountQuerier.java b/src/main/java/io/aiven/connect/jdbc/source/CountQuerier.java new file mode 100644 index 00000000..f48f9e91 --- /dev/null +++ b/src/main/java/io/aiven/connect/jdbc/source/CountQuerier.java @@ -0,0 +1,79 @@ +/* + * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * Copyright 2018 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc.source; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; + +import io.aiven.connect.jdbc.dialect.DatabaseDialect; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CountQuerier extends TableQuerier { + private static final Logger log = LoggerFactory.getLogger(CountQuerier.class); + + public CountQuerier(final DatabaseDialect dialect, final QueryMode mode, final String nameOrQuery) { + super(dialect, mode, nameOrQuery, null); + } + + @Override + protected void createPreparedStatement(final Connection db) throws SQLException { + final String queryStr; + switch (mode) { + case TABLE: + queryStr = dialect.expressionBuilder().append("SELECT count(*) FROM ") + .append(tableId).toString(); + break; + case QUERY: + queryStr = dialect.expressionBuilder().append("SELECT count(*) FROM ") + .append("(") + .append(query) + .append(") as count_query") + .toString(); + break; + default: + throw new ConnectException("Unknown mode: " + mode); + } + log.debug("{} prepared SQL query: {}", this, queryStr); + stmt = dialect.createPreparedStatement(db, queryStr); + } + + @Override + protected ResultSet executeQuery() throws SQLException { + return stmt.executeQuery(); + } + + @Override + public SourceRecord extractRecord() throws SQLException { + throw new UnsupportedOperationException("CountQuerier does not support extracting records"); + } + + public Long count() throws SQLException { + final ResultSet resultSet = this.executeQuery(); + if (resultSet.next()) { + return resultSet.getLong(1); + } + return null; + } + +} From a333471bc8892d80abe4531ee80455585e17ac33 Mon Sep 17 00:00:00 2001 From: Usener Date: Fri, 26 Aug 2022 15:22:28 +0200 Subject: [PATCH 5/6] Introduce StartupMetricUpdater as service in JdbcSourceTask as configurable service --- .../connect/jdbc/source/JdbcSourceTask.java | 2 + .../jdbc/source/StartupMetricUpdater.java | 143 ++++++++++++++++++ .../source/JdbcSourceTaskLifecycleTest.java | 80 ++++++++++ 3 files changed, 225 insertions(+) create mode 100644 src/main/java/io/aiven/connect/jdbc/source/StartupMetricUpdater.java diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java index c46abfc0..c77bea83 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java @@ -156,6 +156,8 @@ public void start(final Map properties) { = config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG); if (config.getBoolean(JdbcSourceTaskConfig.INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG)) { + new StartupMetricUpdater(dialect, cachedConnectionProvider, config) + .initializeAndExecuteMetric(properties, query); } for (final String tableOrQuery : tablesOrQuery) { diff --git a/src/main/java/io/aiven/connect/jdbc/source/StartupMetricUpdater.java b/src/main/java/io/aiven/connect/jdbc/source/StartupMetricUpdater.java new file mode 100644 index 00000000..823593db --- /dev/null +++ b/src/main/java/io/aiven/connect/jdbc/source/StartupMetricUpdater.java @@ -0,0 +1,143 @@ +/* + * Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.connect.jdbc.source; + +import javax.management.InstanceAlreadyExistsException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; + +import java.lang.management.ManagementFactory; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.aiven.connect.jdbc.dialect.DatabaseDialect; +import io.aiven.connect.jdbc.util.CachedConnectionProvider; +import io.aiven.connect.jdbc.util.StartupMetric; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.aiven.connect.jdbc.source.JdbcSourceTaskConfig.TABLES_CONFIG; + +public class StartupMetricUpdater { + + private static final Logger log = LoggerFactory.getLogger(StartupMetricUpdater.class); + + private final DatabaseDialect dialect; + private final CachedConnectionProvider cachedConnectionProvider; + private final JdbcSourceConnectorConfig config; + + Map startupMetricByQueryOrTable; + + Map countQuerierByTableOrQuery; + + public StartupMetricUpdater(final DatabaseDialect dialect, final CachedConnectionProvider cachedConnectionProvider, + final JdbcSourceConnectorConfig config) { + this.dialect = dialect; + this.cachedConnectionProvider = cachedConnectionProvider; + this.config = config; + startupMetricByQueryOrTable = new HashMap<>(); + countQuerierByTableOrQuery = new HashMap<>(); + } + + public void initializeAndExecuteMetric(final Map properties, final String query) { + final CountQuerier.QueryMode queryMode = !query.isEmpty() + ? CountQuerier.QueryMode.QUERY : CountQuerier.QueryMode.TABLE; + final List tablesOrQuery = queryMode == CountQuerier.QueryMode.QUERY + ? Collections.singletonList(query) : config.getList(TABLES_CONFIG); + + final String taskName = properties.get("name"); + + for (final String tableOrQuery : tablesOrQuery) { + getOrCreateStartupMetric(taskName, queryMode, tableOrQuery); + getOrCreateQuryCounter(queryMode, tableOrQuery); + updateMetric(tableOrQuery); + } + } + + private CountQuerier getOrCreateQuryCounter(final CountQuerier.QueryMode queryMode, final String tableOrQuery) { + if (countQuerierByTableOrQuery.containsKey(tableOrQuery)) { + return countQuerierByTableOrQuery.get(tableOrQuery); + } + final CountQuerier countQuerier = new CountQuerier(dialect, queryMode, tableOrQuery); + countQuerierByTableOrQuery.put(tableOrQuery, countQuerier); + return countQuerier; + } + + public Long updateMetric(final String tableOrQuery) { + final StartupMetric metricsProvider = startupMetricByQueryOrTable.get(tableOrQuery); + final CountQuerier countQuerier = countQuerierByTableOrQuery.get(tableOrQuery); + try { + countQuerier.getOrCreatePreparedStatement(cachedConnectionProvider.getConnection()); + final Long counter = countQuerier.count(); + metricsProvider.updateCounter(counter); + log.info("Update StartupMetric for {}. Set Counter to {}", + tableOrQuery.substring(0, Math.min(10, tableOrQuery.length())), counter); + return counter; + } catch (final SQLException e) { + log.error("Exception while querying for number of possible source records", e); + } + return -1L; + } + + private StartupMetric getOrCreateStartupMetric(final String taskName, + final CountQuerier.QueryMode queryMode, final String tableOrQuery) { + if (startupMetricByQueryOrTable.containsKey(tableOrQuery)) { + return startupMetricByQueryOrTable.get(tableOrQuery); + } + + final StartupMetric metricsProvider = new StartupMetric(); + startupMetricByQueryOrTable.put(tableOrQuery, metricsProvider); + + final String objectNameValue = createObjectName(taskName, queryMode, tableOrQuery); + + registerJmxBean(metricsProvider, objectNameValue); + + return metricsProvider; + } + + private void registerJmxBean(final StartupMetric metricsProvider, final String objectNameValue) { + try { + final ObjectName objectName = new ObjectName(objectNameValue); + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + server.registerMBean(metricsProvider, objectName); + } catch (MalformedObjectNameException | NotCompliantMBeanException | InstanceAlreadyExistsException + | MBeanRegistrationException e) { + log.error(e.getMessage(), e); + } + } + + private String createObjectName(final String taskName, final CountQuerier.QueryMode queryMode, + final String tableOrQuery) { + final String topicName = config.getString(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG); + + final String identifier = "io.aiven.connect.jdbc.initialImportCount"; + String objectNameValue = String.format("%s:task=\"%s\",topic=\"%s\"", identifier, taskName, topicName); + if (queryMode == CountQuerier.QueryMode.TABLE) { + objectNameValue += String.format(",table=\"%s\"", tableOrQuery); + } + + log.info("CounterMetric Name: {}", objectNameValue); + return objectNameValue; + } +} diff --git a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java index 8649df3b..ccd71c8a 100644 --- a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java +++ b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java @@ -17,9 +17,15 @@ package io.aiven.connect.jdbc.source; +import javax.management.ObjectName; + +import java.lang.management.ManagementFactory; import java.sql.Connection; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; @@ -37,6 +43,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @PrepareForTest({JdbcSourceTask.class}) @@ -172,7 +179,80 @@ public void testMultipleTables() throws Exception { assertEquals(startTime + JdbcSourceConnectorConfig.POLL_INTERVAL_MS_DEFAULT, time.milliseconds()); validatePollResultTable(records, 1, SECOND_TABLE_NAME); + } + + @Test + public void testStartupMetricMultipleTables() throws Exception { + db.createTable(SINGLE_TABLE_NAME, "id", "INT"); + db.createTable(SECOND_TABLE_NAME, "id", "INT"); + + final int mbeanCountBefore = ManagementFactory.getPlatformMBeanServer().getMBeanCount(); + + final Map properties = twoTableConfig(); + properties.put(JdbcSourceTaskConfig.INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG, Boolean.TRUE.toString()); + + db.insert(SINGLE_TABLE_NAME, "id", 1); + db.insert(SECOND_TABLE_NAME, "id", 2); + db.insert(SECOND_TABLE_NAME, "id", 3); + + task.start(properties); + // wait for task to execute + task.poll(); + + assertEquals((Integer) (mbeanCountBefore + 2), ManagementFactory.getPlatformMBeanServer().getMBeanCount()); + + final ObjectName mbeanQuery = new ObjectName("io.aiven.connect.jdbc.initialImportCount:" + "*"); + final Set objectNames = ManagementFactory.getPlatformMBeanServer().queryNames(mbeanQuery, null); + + assertEquals(2, objectNames.size()); + + Optional objectName = objectNames.stream().filter(filterByTableName(SINGLE_TABLE_NAME)).findFirst(); + assertTrue(objectName.isPresent()); + assertEquals(1L, ManagementFactory.getPlatformMBeanServer().getAttribute(objectName.get(), "Counter")); + + objectName = objectNames.stream().filter(filterByTableName(SECOND_TABLE_NAME)).findFirst(); + assertTrue(objectName.isPresent()); + assertEquals(2L, ManagementFactory.getPlatformMBeanServer().getAttribute(objectName.get(), "Counter")); + } + + @Test + public void testStartupMetricQuery() throws Exception { + db.createTable(SINGLE_TABLE_NAME, "id", "INT"); + db.createTable(SECOND_TABLE_NAME, "id", "INT"); + + final int mbeanCountBefore = ManagementFactory.getPlatformMBeanServer().getMBeanCount(); + + final Map properties = twoTableConfig(); + properties.put(JdbcSourceTaskConfig.TABLES_CONFIG, ""); + properties.put(JdbcSourceTaskConfig.QUERY_CONFIG, + "SELECT * FROM \"" + SINGLE_TABLE_NAME + "\" WHERE \"id\" > 0"); + properties.put("name", "soneTaskName"); + properties.put(JdbcSourceTaskConfig.INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG, Boolean.TRUE.toString()); + + db.insert(SINGLE_TABLE_NAME, "id", 1); + db.insert(SINGLE_TABLE_NAME, "id", 2); + db.insert(SINGLE_TABLE_NAME, "id", 3); + + task.start(properties); + // wait for task to execute + task.poll(); + + assertEquals((Integer) (mbeanCountBefore + 1), ManagementFactory.getPlatformMBeanServer().getMBeanCount()); + + final ObjectName mbeanQuery = new ObjectName("io.aiven.connect.jdbc.initialImportCount:" + "*"); + final Set objectNames = ManagementFactory.getPlatformMBeanServer().queryNames(mbeanQuery, null); + + assertEquals(1, objectNames.size()); + + final Optional objectName = objectNames.stream().findFirst(); + assertEquals("io.aiven.connect.jdbc.initialImportCount:task=\"soneTaskName\",topic=\"test-\"", + objectName.get().getCanonicalName()); + + assertEquals(3L, ManagementFactory.getPlatformMBeanServer().getAttribute(objectName.get(), "Counter")); + } + private static Predicate filterByTableName(final String tableName) { + return objectName -> ("\"" + tableName + "\"").equals(objectName.getKeyProperty("table")); } @Test From 16d3686f32e2a2e5dfba0ada3d6e97d0d6c9a477 Mon Sep 17 00:00:00 2001 From: cl-a-us Date: Tue, 6 Sep 2022 08:50:27 +0200 Subject: [PATCH 6/6] fix tests by cleaning up jmx beans in @Before --- .../jdbc/source/JdbcSourceTaskLifecycleTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java index ccd71c8a..2ef73804 100644 --- a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java +++ b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskLifecycleTest.java @@ -34,6 +34,7 @@ import io.aiven.connect.jdbc.dialect.DatabaseDialect; import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -56,6 +57,18 @@ public class JdbcSourceTaskLifecycleTest extends JdbcSourceTaskTestBase { @Mock private Connection conn; + @Before + public void beforeEach() throws Exception { + super.setup(); + // cleanup JMX beans + final ObjectName mbeanQuery = new ObjectName("io.aiven.connect.jdbc.initialImportCount:*"); + final Set objectNamesToClean = ManagementFactory.getPlatformMBeanServer() + .queryNames(mbeanQuery, null); + for (final ObjectName objectName : objectNamesToClean) { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(objectName); + } + } + @Test(expected = ConnectException.class) public void testMissingParentConfig() { final Map props = singleTableConfig();