Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Startup metric solves feature request #179 #180

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
79 changes: 79 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/source/CountQuerier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.source;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CountQuerier extends TableQuerier {
private static final Logger log = LoggerFactory.getLogger(CountQuerier.class);

public CountQuerier(final DatabaseDialect dialect, final QueryMode mode, final String nameOrQuery) {
super(dialect, mode, nameOrQuery, null);
}

@Override
protected void createPreparedStatement(final Connection db) throws SQLException {
final String queryStr;
switch (mode) {
case TABLE:
queryStr = dialect.expressionBuilder().append("SELECT count(*) FROM ")
.append(tableId).toString();
break;
case QUERY:
queryStr = dialect.expressionBuilder().append("SELECT count(*) FROM ")
.append("(")
.append(query)
.append(") as count_query")
.toString();
break;
default:
throw new ConnectException("Unknown mode: " + mode);
}
log.debug("{} prepared SQL query: {}", this, queryStr);
stmt = dialect.createPreparedStatement(db, queryStr);
}

@Override
protected ResultSet executeQuery() throws SQLException {
return stmt.executeQuery();
}

@Override
public SourceRecord extractRecord() throws SQLException {
throw new UnsupportedOperationException("CountQuerier does not support extracting records");
}

public Long count() throws SQLException {
final ResultSet resultSet = this.executeQuery();
if (resultSet.next()) {
return resultSet.getLong(1);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
import io.aiven.connect.jdbc.dialect.DatabaseDialects;
import io.aiven.connect.jdbc.util.CachedConnectionProvider;
import io.aiven.connect.jdbc.util.ColumnDefinition;
import io.aiven.connect.jdbc.util.ColumnId;
import io.aiven.connect.jdbc.util.TableId;
Expand All @@ -59,7 +58,7 @@ public class JdbcSourceTask extends SourceTask {
private Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
private CachedConnectionProvider cachedConnectionProvider;
private SourceConnectionProvider cachedConnectionProvider;
private PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<TableQuerier>();
private final AtomicBoolean running = new AtomicBoolean(false);

Expand Down Expand Up @@ -156,6 +155,11 @@ public void start(final Map<String, String> properties) {
final boolean validateNonNulls
= config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG);

if (config.getBoolean(JdbcSourceTaskConfig.INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG)) {
new StartupMetricUpdater(dialect, cachedConnectionProvider, config)
.initializeAndExecuteMetric(properties, query);
}

for (final String tableOrQuery : tablesOrQuery) {
final List<Map<String, String>> tablePartitionsToCheck;
final Map<String, String> partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,19 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig {
public static final String TABLES_CONFIG = "tables";
private static final String TABLES_DOC = "List of tables for this task to watch for changes.";

public static final String INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG = "sourceTask.initialMessageCount.enabled";

private static final String INITIAL_MESSAGE_COUNT_METRIC_ENABLED_DOC = "Enables a custom metric to determine the "
+ "number of messages/records that will be published into the Kafka topic. To reduce database load, the "
+ "corresponding query is executed at startup only. The metric is published via JMX under "
+ "io.aiven.connect.jdbc.initialImportCount<task=\"{connector/task_name}\", topic=\"{topic_name}\", "
+ "[table=\"{table_name}\","
+ "The attribute 'table' is only present in mode 'table', not in 'query' mode.";

static ConfigDef config = baseConfigDef()
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC);
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC)
.define(INITIAL_MESSAGE_COUNT_METRIC_ENABLED_CONFIG, Type.BOOLEAN, Boolean.FALSE, Importance.MEDIUM,
INITIAL_MESSAGE_COUNT_METRIC_ENABLED_DOC);

public JdbcSourceTaskConfig(final Map<String, String> props) {
super(config, props);
Expand Down
143 changes: 143 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/source/StartupMetricUpdater.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.source;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;

import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.aiven.connect.jdbc.dialect.DatabaseDialect;
import io.aiven.connect.jdbc.util.CachedConnectionProvider;
import io.aiven.connect.jdbc.util.StartupMetric;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.aiven.connect.jdbc.source.JdbcSourceTaskConfig.TABLES_CONFIG;

public class StartupMetricUpdater {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add proper documentation here. What does this class do. Are there any design constraints that future developers should know about, etc.


private static final Logger log = LoggerFactory.getLogger(StartupMetricUpdater.class);

private final DatabaseDialect dialect;
private final CachedConnectionProvider cachedConnectionProvider;
private final JdbcSourceConnectorConfig config;

Map<String, StartupMetric> startupMetricByQueryOrTable;

Map<String, CountQuerier> countQuerierByTableOrQuery;

public StartupMetricUpdater(final DatabaseDialect dialect, final CachedConnectionProvider cachedConnectionProvider,
final JdbcSourceConnectorConfig config) {
this.dialect = dialect;
this.cachedConnectionProvider = cachedConnectionProvider;
this.config = config;
startupMetricByQueryOrTable = new HashMap<>();
countQuerierByTableOrQuery = new HashMap<>();
}

public void initializeAndExecuteMetric(final Map<String, String> properties, final String query) {
final CountQuerier.QueryMode queryMode = !query.isEmpty()
? CountQuerier.QueryMode.QUERY : CountQuerier.QueryMode.TABLE;
final List<String> tablesOrQuery = queryMode == CountQuerier.QueryMode.QUERY
? Collections.singletonList(query) : config.getList(TABLES_CONFIG);

final String taskName = properties.get("name");

for (final String tableOrQuery : tablesOrQuery) {
getOrCreateStartupMetric(taskName, queryMode, tableOrQuery);
getOrCreateQuryCounter(queryMode, tableOrQuery);
updateMetric(tableOrQuery);
}
}

private CountQuerier getOrCreateQuryCounter(final CountQuerier.QueryMode queryMode, final String tableOrQuery) {
if (countQuerierByTableOrQuery.containsKey(tableOrQuery)) {
return countQuerierByTableOrQuery.get(tableOrQuery);
}
final CountQuerier countQuerier = new CountQuerier(dialect, queryMode, tableOrQuery);
countQuerierByTableOrQuery.put(tableOrQuery, countQuerier);
return countQuerier;
}

public Long updateMetric(final String tableOrQuery) {
final StartupMetric metricsProvider = startupMetricByQueryOrTable.get(tableOrQuery);
final CountQuerier countQuerier = countQuerierByTableOrQuery.get(tableOrQuery);
try {
countQuerier.getOrCreatePreparedStatement(cachedConnectionProvider.getConnection());
final Long counter = countQuerier.count();
metricsProvider.updateCounter(counter);
log.info("Update StartupMetric for {}. Set Counter to {}",
tableOrQuery.substring(0, Math.min(10, tableOrQuery.length())), counter);
return counter;
} catch (final SQLException e) {
log.error("Exception while querying for number of possible source records", e);
}
return -1L;
}

private StartupMetric getOrCreateStartupMetric(final String taskName,
final CountQuerier.QueryMode queryMode, final String tableOrQuery) {
if (startupMetricByQueryOrTable.containsKey(tableOrQuery)) {
return startupMetricByQueryOrTable.get(tableOrQuery);
}

final StartupMetric metricsProvider = new StartupMetric();
startupMetricByQueryOrTable.put(tableOrQuery, metricsProvider);

final String objectNameValue = createObjectName(taskName, queryMode, tableOrQuery);

registerJmxBean(metricsProvider, objectNameValue);

return metricsProvider;
}

private void registerJmxBean(final StartupMetric metricsProvider, final String objectNameValue) {
try {
final ObjectName objectName = new ObjectName(objectNameValue);
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
server.registerMBean(metricsProvider, objectName);
} catch (MalformedObjectNameException | NotCompliantMBeanException | InstanceAlreadyExistsException
| MBeanRegistrationException e) {
log.error(e.getMessage(), e);
}
}

private String createObjectName(final String taskName, final CountQuerier.QueryMode queryMode,
final String tableOrQuery) {
final String topicName = config.getString(JdbcSourceConnectorConfig.TOPIC_PREFIX_CONFIG);

final String identifier = "io.aiven.connect.jdbc.initialImportCount";
String objectNameValue = String.format("%s:task=\"%s\",topic=\"%s\"", identifier, taskName, topicName);
if (queryMode == CountQuerier.QueryMode.TABLE) {
objectNameValue += String.format(",table=\"%s\"", tableOrQuery);
}

log.info("CounterMetric Name: {}", objectNameValue);
return objectNameValue;
}
}
49 changes: 49 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/util/StartupMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add proper documentation here.

* dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum.
*/
public class StartupMetric implements StartupMetricMBean {

private static final Logger log = LoggerFactory.getLogger(StartupMetric.class);

private long counter = -1;

public StartupMetric() {
super();
}

@Override
public Long getCounter() {
log.debug("getCounter: " + counter);
return counter;
}

public void updateCounter(final Long counter) {
log.info("setCounter: " + counter);
if (counter != null) {
this.counter = counter.intValue();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2019 Aiven Oy and jdbc-connector-for-apache-kafka project contributors
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.connect.jdbc.util;

public interface StartupMetricMBean {

/**
* Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et
* dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum.
*/
Long getCounter();
}
Loading