Merge branch 'apache:dev' into dev
chaorongzhi authored Aug 10, 2023
2 parents be95cd6 + c0422db commit f045fc0
Showing 8 changed files with 193 additions and 5 deletions.
@@ -26,9 +26,11 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -72,6 +74,10 @@ public JobMetrics merge(JobMetrics jobMetrics) {
}
Map<String, List<Measurement>> metricsMap = new HashMap<>();
metrics.forEach((key, value) -> metricsMap.put(key, new ArrayList<>(value)));
// Because if a job is restarted, the running node might change, so we need to
// remove the node information.
Set<String> keysToExclude =
new HashSet<>(Arrays.asList(MetricTags.MEMBER, MetricTags.ADDRESS));
jobMetrics.metrics.forEach(
(key, value) ->
metricsMap.merge(
@@ -82,7 +88,11 @@ public JobMetrics merge(JobMetrics jobMetrics) {
for (Measurement m1 : v1) {
if (v2.stream()
.noneMatch(
- m2 -> m2.getTags().equals(m1.getTags()))) {
+ m2 ->
+ areMapsEqualExcludingKeys(
+ m2.getTags(),
+ m1.getTags(),
+ keysToExclude))) {
ms.add(m1);
}
}
@@ -91,6 +101,40 @@ public JobMetrics merge(JobMetrics jobMetrics) {
return new JobMetrics(metricsMap);
}

/**
* Compares two Map objects excluding certain keys.
*
* @param map1 the first map
* @param map2 the second map
* @param keysToExclude the keys to be excluded during comparison
* @return true if the maps are equal excluding the specified keys, false otherwise
*/
public static boolean areMapsEqualExcludingKeys(
Map<String, String> map1, Map<String, String> map2, Set<String> keysToExclude) {
// Return false if either of the maps is null
if (map1 == null || map2 == null) {
return false;
}

// Return false if the sizes of the maps are different
if (map1.size() != map2.size()) {
return false;
}

// Create copies of the maps to avoid modifying the original maps
Map<String, String> map1Copy = new HashMap<>(map1);
Map<String, String> map2Copy = new HashMap<>(map2);

// Remove specific keys from the copies
for (String key : keysToExclude) {
map1Copy.remove(key);
map2Copy.remove(key);
}

// Return whether the copies are equal
return map1Copy.equals(map2Copy);
}
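
For illustration, here is a minimal standalone sketch of what the exclusion changes (the helper is reproduced from above so the sketch compiles on its own; the literal tag names "member" and "address" are illustrative stand-ins for whatever MetricTags.MEMBER and MetricTags.ADDRESS actually resolve to):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TagComparisonSketch {

    // Same logic as areMapsEqualExcludingKeys above, copied so this compiles standalone.
    static boolean areMapsEqualExcludingKeys(
            Map<String, String> map1, Map<String, String> map2, Set<String> keysToExclude) {
        if (map1 == null || map2 == null || map1.size() != map2.size()) {
            return false;
        }
        Map<String, String> copy1 = new HashMap<>(map1);
        Map<String, String> copy2 = new HashMap<>(map2);
        for (String key : keysToExclude) {
            copy1.remove(key);
            copy2.remove(key);
        }
        return copy1.equals(copy2);
    }

    public static void main(String[] args) {
        // Tags of the same logical measurement before and after a job restart:
        // only the node-specific tags differ.
        Map<String, String> before = new HashMap<>();
        before.put("jobId", "42");
        before.put("member", "worker-1");
        before.put("address", "10.0.0.1:5801");

        Map<String, String> after = new HashMap<>();
        after.put("jobId", "42");
        after.put("member", "worker-2");
        after.put("address", "10.0.0.2:5801");

        Set<String> nodeTags = new HashSet<>(Arrays.asList("member", "address"));

        System.out.println(before.equals(after));                               // false
        System.out.println(areMapsEqualExcludingKeys(before, after, nodeTags)); // true
    }
}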

/** Returns all metrics present. */
public Set<String> metrics() {
return Collections.unmodifiableSet(metrics.keySet());
@@ -112,6 +112,19 @@ private List<ChunkRange> splitTableIntoChunks(
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();

log.info(
"Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
+ "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
tableId,
splitColumnName,
min,
max,
chunkSize,
distributionFactorUpper,
distributionFactorLower,
sampleShardingThreshold);

if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
@@ -130,7 +143,7 @@ private List<ChunkRange> splitTableIntoChunks(
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
- if (sourceConfig.getSampleShardingThreshold() < shardCount) {
+ if (sampleShardingThreshold < shardCount) {
// It is necessary to ensure that the number of data rows sampled by the
// sampling rate is greater than the number of shards.
// Otherwise, if the sampling rate is too low, it may result in an insufficient
@@ -144,9 +157,17 @@
chunkSize);
inverseSamplingRate = chunkSize;
}
log.info(
"Use sampling sharding for table {}, the sampling rate is {}",
tableId,
inverseSamplingRate);
Object[] sample =
sampleDataFromColumn(
jdbc, tableId, splitColumnName, inverseSamplingRate);
log.info(
"Sample data from table {} end, the sample size is {}",
tableId,
sample.length);
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
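
For intuition on when the sampling path is taken, a small worked example of the shard-count check (all numbers are illustrative, not SeaTunnel defaults):

public class SampleShardingDecisionSketch {
    public static void main(String[] args) {
        long approximateRowCnt = 100_000_000L; // queryApproximateRowCnt(...) result, illustrative
        int chunkSize = 8096;                  // sourceConfig.getSplitSize(), illustrative
        int sampleShardingThreshold = 1000;    // sourceConfig.getSampleShardingThreshold(), illustrative

        // Mirrors the branch above: sampling-based sharding kicks in only when the
        // table would otherwise be split into more shards than the threshold allows.
        int shardCount = (int) (approximateRowCnt / chunkSize); // 12351
        if (sampleShardingThreshold < shardCount) {
            System.out.println("use sampling sharding, shardCount=" + shardCount);
        } else {
            System.out.println("split evenly by chunk size");
        }
    }
}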
@@ -68,7 +68,8 @@ public OptionRule optionRule() {
JdbcSourceOptions.CONNECTION_POOL_SIZE,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
- JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
+ JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
+ JdbcSourceOptions.INVERSE_SAMPLING_RATE)
.optional(MySqlSourceOptions.STARTUP_MODE, MySqlSourceOptions.STOP_MODE)
.conditional(
MySqlSourceOptions.STARTUP_MODE,
@@ -28,10 +28,12 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;

/** The {@code ChunkSplitter} used to split table into a set of chunks for JDBC data source. */
@Slf4j
public class MySqlChunkSplitter extends AbstractJdbcSourceChunkSplitter {

public MySqlChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
@@ -55,7 +57,7 @@ public Object queryMin(
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
- return MySqlUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
+ return MySqlUtils.skipReadAndSortSampleData(jdbc, tableId, columnName, inverseSamplingRate);
}

@Override
@@ -36,11 +36,13 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +54,7 @@
import static org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.rowToArray;

/** Utils to prepare MySQL SQL statement. */
@Slf4j
public class MySqlUtils {

private MySqlUtils() {}
@@ -142,6 +145,56 @@ public static Object[] sampleDataFromColumn(
});
}

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Statement stmt = null;
ResultSet rs = null;

List<Object> results = new ArrayList<>();
try {
stmt =
jdbc.connection()
.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

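// With MySQL Connector/J, a fetch size of Integer.MIN_VALUE switches the driver
// to row-by-row streaming, so the table is never fully buffered in memory.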
stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery(sampleQuery);

int count = 0;
while (rs.next()) {
count++;
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
}
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.error("Failed to close ResultSet", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error("Failed to close Statement", e);
}
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
return resultsArray;
}

public static Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
@@ -57,7 +57,8 @@ public Object queryMin(
public Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
- return SqlServerUtils.sampleDataFromColumn(jdbc, tableId, columnName, inverseSamplingRate);
+ return SqlServerUtils.skipReadAndSortSampleData(
+ jdbc, tableId, columnName, inverseSamplingRate);
}

@Override
@@ -39,10 +39,13 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -52,6 +55,7 @@
import java.util.Optional;

/** The utils for SqlServer data source. */
@Slf4j
public class SqlServerUtils {

public SqlServerUtils() {}
@@ -145,6 +149,56 @@ public static Object[] sampleDataFromColumn(
});
}

public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate)
throws SQLException {
final String sampleQuery =
String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));

Statement stmt = null;
ResultSet rs = null;

List<Object> results = new ArrayList<>();
try {
stmt =
jdbc.connection()
.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

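// Note: a negative fetch size as a streaming hint is a MySQL Connector/J
// convention; its effect with the SQL Server JDBC driver is an unverified
// assumption here.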
stmt.setFetchSize(Integer.MIN_VALUE);
rs = stmt.executeQuery(sampleQuery);

int count = 0;
while (rs.next()) {
count++;
if (count % 100000 == 0) {
log.info("Processing row index: {}", count);
}
if (count % inverseSamplingRate == 0) {
results.add(rs.getObject(1));
}
}
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.error("Failed to close ResultSet", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error("Failed to close Statement", e);
}
}
}
Object[] resultsArray = results.toArray();
Arrays.sort(resultsArray);
return resultsArray;
}

/**
* Returns the next LSN to be read from the database. This is the LSN of the last record that
* was read from the database.
@@ -425,6 +425,18 @@ public ResourceManager getResourceManager() {
/** call by client to submit job */
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

// Check if the given jobId is already running. If so, complete the submission
// successfully. This avoids potential issues such as redundant job restores or
// other anomalies.
if (getJobMaster(jobId) != null) {
logger.warning(
String.format(
"The job %s is currently running; no need to submit again.", jobId));
jobSubmitFuture.complete(null);
return new PassiveCompletableFuture<>(jobSubmitFuture);
}

JobMaster jobMaster =
new JobMaster(
jobImmutableInformation,
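
The duplicate-submission guard in submitJob can be modeled with a small standalone toy; none of these names are SeaTunnel's, it only mirrors the "already running? then complete immediately" check:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentSubmitSketch {
    private final Map<Long, Object> runningJobMasters = new ConcurrentHashMap<>();

    public CompletableFuture<Void> submitJob(long jobId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Already running: succeed immediately instead of restoring the job again.
        if (runningJobMasters.containsKey(jobId)) {
            future.complete(null);
            return future;
        }
        runningJobMasters.put(jobId, new Object()); // stands in for a real JobMaster
        future.complete(null);
        return future;
    }

    public static void main(String[] args) {
        IdempotentSubmitSketch service = new IdempotentSubmitSketch();
        service.submitJob(42L); // first submission registers the job
        service.submitJob(42L); // duplicate completes immediately, no second job
        System.out.println("running jobs: " + service.runningJobMasters.size()); // 1
    }
}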