From 43708134a4d2e8c45b5df176f2bfc3bfdab369b1 Mon Sep 17 00:00:00 2001 From: wchen11 Date: Tue, 20 Jun 2023 16:52:12 +0800 Subject: [PATCH 1/4] [BAHIR-322] Add streaming source flink kudu connector --- .../kudu/connector/IncrementalCPState.java | 99 +++++ .../kudu/connector/KuduDataSplit.java | 42 +++ .../connector/KuduStreamingRunningMode.java | 25 ++ .../kudu/connector/KuduStreamingSource.java | 354 ++++++++++++++++++ .../KuduStreamingSourceConfiguration.java | 132 +++++++ .../StreamingLocalEventsManager.java | 167 +++++++++ .../assigner/KuduDataSplitsAssigner.java | 41 ++ .../configuration/ReflectionTypeDetail.java | 44 +++ .../configuration/StreamingColumn.java | 62 +++ .../configuration/StreamingKeySorter.java | 46 +++ .../UserTableDataQueryDetail.java | 70 ++++ .../UserTableDataTypeDetail.java | 56 +++ .../configuration/type/FilterOp.java | 41 ++ .../type/UserTableDataQueryFilter.java | 90 +++++ .../type/annotation/ColumnDetail.java | 31 ++ .../type/annotation/FilterDetail.java | 36 ++ .../type/annotation/StreamingKey.java | 35 ++ .../UserTableDataRowResultConvertor.java | 105 ++++++ .../builder/UserTableDataTypeBuilder.java | 42 +++ .../parser/UserTableDataTypeParser.java | 90 +++++ .../discover/KuduDataSplitsDiscoverer.java | 89 +++++ ...UserTableDataQueryFilterValueResolver.java | 32 ++ ...UserTableDataQueryFilterValueResolver.java | 24 ++ ...UserTableDataQueryFilterValueResolver.java | 23 ++ .../connector/KuduStreamingSourceTest.java | 142 +++++++ .../StreamingKeyOffsetManagerTest.java | 82 ++++ .../connector/StreamingKeySorterTest.java | 70 ++++ .../UserTableDataTypeBuilderTest.java | 41 ++ .../UserTableDataTypeParserTest.java | 49 +++ .../connectors/kudu/connector/UserType.java | 58 +++ 30 files changed, 2218 insertions(+) create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/IncrementalCPState.java create mode 100644 
flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduDataSplit.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingRunningMode.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceConfiguration.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/StreamingLocalEventsManager.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/assigner/KuduDataSplitsAssigner.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/ReflectionTypeDetail.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingColumn.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataQueryDetail.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataTypeDetail.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/FilterOp.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/UserTableDataQueryFilter.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/ColumnDetail.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/FilterDetail.java 
create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/StreamingKey.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/UserTableDataRowResultConvertor.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/builder/UserTableDataTypeBuilder.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/parser/UserTableDataTypeParser.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/discover/KuduDataSplitsDiscoverer.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/ConstantUserTableDataQueryFilterValueResolver.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/SnapshotStateUserTableDataQueryFilterValueResolver.java create mode 100644 flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/UserTableDataQueryFilterValueResolver.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java create mode 100644 flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/IncrementalCPState.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/IncrementalCPState.java new file mode 100644 index 00000000..c8a91b3e --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/IncrementalCPState.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + + +import org.apache.flink.annotation.Internal; + +import java.io.Serializable; + +@Internal +public class IncrementalCPState implements Serializable { + private static final long serialVersionUID = 1L; + + private int subTaskId; + private long checkpointId; + private String offset; + private boolean committed; + + public IncrementalCPState(int subTaskId, long checkpointId, String offset, boolean committed) { + this.subTaskId = subTaskId; + this.checkpointId = checkpointId; + this.offset = offset; + this.committed = committed; + } + + public int getSubTaskId() { + return subTaskId; + } + + public String getOffset() { + return offset; + } + + public boolean isCommitted() { + return committed; + } + + public void setCommitted(boolean committed) { + this.committed = committed; + } + + public static IncrementalCPState.Builder builder() { + return new IncrementalCPState.Builder(); + } + + @Override + public String toString() { + return "IncrementalCPState{" + + "subTaskId=" + subTaskId + + ", checkpointId=" + checkpointId + + ", offset='" + offset + '\'' + + ", committed=" + committed + + '}'; + } + + public static class Builder { + private int subTaskId; + private long checkpointId; + private String offset; + private boolean committed; + public Builder subTaskId(int suTaskId) { + this.subTaskId = suTaskId; + return this; + } + + public Builder checkpointId(long checkpointId) { + this.checkpointId = checkpointId; + return this; + } + + public Builder offset(String offset) { + this.offset = offset; + return this; + } + + public Builder committed(boolean committed) { + this.committed = committed; + return this; + } + + public IncrementalCPState build() { + return new IncrementalCPState(subTaskId, checkpointId, offset, committed); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduDataSplit.java 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduDataSplit.java new file mode 100644 index 00000000..7460b1f3 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduDataSplit.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + + +import org.apache.flink.annotation.Internal; + +@Internal +public class KuduDataSplit { + private byte[] scanToken; + private String tabletId; + + public byte[] getScanToken() { + return scanToken; + } + + public void setScanToken(byte[] scanToken) { + this.scanToken = scanToken; + } + + public String getTabletId() { + return tabletId; + } + + public void setTabletId(String tabletId) { + this.tabletId = tabletId; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingRunningMode.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingRunningMode.java new file mode 100644 index 00000000..44e405a1 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingRunningMode.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.annotation.Internal; + +@Internal +public enum KuduStreamingRunningMode { + CUSTOM_QUERY, + INCREMENTAL, +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java new file mode 100644 index 00000000..706d51c4 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.kudu.connector.assigner.KuduDataSplitsAssigner; +import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataQueryDetail; +import org.apache.flink.connectors.kudu.connector.configuration.type.UserTableDataQueryFilter; +import org.apache.flink.connectors.kudu.connector.convertor.UserTableDataRowResultConvertor; +import org.apache.flink.connectors.kudu.connector.discover.KuduDataSplitsDiscoverer; +import org.apache.flink.connectors.kudu.connector.reader.KuduReader; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.kudu.shaded.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Kudu source connector which provides the data continuously in the kudu table. 
+ * The connector runs in one of two modes: + * + * 1. {@link KuduStreamingRunningMode#CUSTOM_QUERY}: the source connector fetches the records + * in the kudu table using the query logic provided by the user. + * + * 2. {@link KuduStreamingRunningMode#INCREMENTAL}: the source connector fetches the records + * in the kudu table in the order of the fields annotated {@link StreamingColumn}. This assumes + * that the fields are unique and increase monotonically, e.g. a db auto-incremented identifier. + * To guarantee the order, only one subtask will be running and it sorts the records before emitting. + * + * @param <T> The mapped Java type against the Kudu table. + */ +@PublicEvolving +public class KuduStreamingSource extends RichParallelSourceFunction + implements CheckpointListener, CheckpointedFunction { + + private static final long serialVersionUID = -2527403358494874319L; + + private final KuduStreamingSourceConfiguration kuduStreamingSourceConfiguration; + + private transient KuduTableInfo tableInfo; + + private transient Long batchRunningInterval = 10000L; + + private transient UserTableDataRowResultConvertor rowResultConvertor; + + private transient StreamingLocalEventsManager streamingKeyOffsetManager; + + private volatile boolean running = true; + + private transient ListState snapshotOffsetStates; + private static final String SNAPSHOT_OFFSET_STATES_KEY = "snapshot-offset-states"; + + private LinkedMap inflightOffsets = new LinkedMap(); + + private static final Logger LOGGER = LoggerFactory.getLogger(KuduStreamingSource.class); + + public KuduStreamingSource(KuduStreamingSourceConfiguration kuduStreamingSourceConfiguration) { + this.kuduStreamingSourceConfiguration = kuduStreamingSourceConfiguration; + } + + private Object getTypeValue(Class clz, String value) { + if (clz == Long.class) { + return Long.valueOf(value); + } else if (clz == Integer.class) { + return Integer.valueOf(value); + } else if (clz == Short.class) { + return Short.valueOf(value); + } else if 
(clz == Byte.class) { + return Byte.valueOf(value); + } else if (clz == Timestamp.class) { + return new Timestamp(Long.valueOf(value)); + } else { + return value; + } + } + + @Override + public void run(SourceContext sourceContext) { + while (running) { + LOGGER.info("Running the kudu source connector ..."); + KuduReader kuduReader = null; + try { + KuduReaderConfig.Builder kuduReaderConfigBuilder = KuduReaderConfig.Builder + .setMasters(kuduStreamingSourceConfiguration.getMasterAddresses()); + KuduReaderConfig readerConfig = kuduReaderConfigBuilder.build(); + + kuduReader = new KuduReader<>(tableInfo, readerConfig, rowResultConvertor); + + List filterInfoList = Lists.newArrayList(); + List projectedColumnList = null; + if (CollectionUtils.isNotEmpty(kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList())) { + + List allUserTableDataQueryDetails = + kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList(); + UserTableDataQueryDetail userTableDataQueryDetail = allUserTableDataQueryDetails.get(0); + + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + String[] streamingLowerKey = streamingKeyOffsetManager.getCurrentHWM(); + String[] streamingUpperKey = streamingKeyOffsetManager.getUserConfiguredUpperKey(); + List streamingColumns = rowResultConvertor.getUserTableDataTypeDetail().getStreamingCols(); + // Build the lower and upper bound + for (int i = 0; i < streamingColumns.size(); i++) { + LOGGER.info("STREAMING_LOWER_KEY={}, STREAMING_UPPER_KEY={}", Arrays.toString(streamingLowerKey), Arrays.toString(streamingUpperKey)); + StreamingColumn streamingColumn = streamingColumns.get(i); + if (streamingLowerKey != null) { + KuduFilterInfo lowerFilterInfo = KuduFilterInfo.Builder + .create(streamingColumn.getColName()).filter( + i == 0 ? 
KuduFilterInfo.FilterType.GREATER : KuduFilterInfo.FilterType.GREATER_EQUAL, + getTypeValue(streamingColumn.getFieldType(), streamingLowerKey[i])).build(); + + filterInfoList.add(lowerFilterInfo); + } + + if (streamingUpperKey != null) { + KuduFilterInfo upperFilterInfo = KuduFilterInfo.Builder + .create(streamingColumn.getColName()).filter( + KuduFilterInfo.FilterType.LESS_EQUAL, + getTypeValue(streamingColumn.getFieldType(), streamingUpperKey[i])).build(); + + filterInfoList.add(upperFilterInfo); + } + } + } else { + if (CollectionUtils.isNotEmpty(userTableDataQueryDetail.getUserTableDataQueryFilters())) { + List tableFilters = Lists.newArrayList(); + for (UserTableDataQueryFilter filterDetail : + userTableDataQueryDetail.getUserTableDataQueryFilters()) { + KuduFilterInfo filterInfo = KuduFilterInfo.Builder + .create(filterDetail.getColName()).filter( + filterDetail.getFilterOp().getKuduFilterType(), + filterDetail.getFilterValueResolver().resolve()) + .build(); + + tableFilters.add(filterInfo); + } + + filterInfoList = tableFilters; + } + } + + kuduReader.setTableFilters(filterInfoList); + + if (CollectionUtils.isNotEmpty(userTableDataQueryDetail.getProjectedColumns())) { + projectedColumnList = userTableDataQueryDetail.getProjectedColumns(); + } + + kuduReader.setTableProjections(projectedColumnList); + } + + KuduDataSplitsDiscoverer kuduDataSplitsDiscoverer = KuduDataSplitsDiscoverer.builder() + .reader(kuduReader) + .filterInfoList(filterInfoList) + .projectedColumnList(projectedColumnList) + .build(); + + List dataSplits = kuduDataSplitsDiscoverer.getAllKuduDataSplits(); + List assignedSplits = Lists.newArrayList(); + int thisSubTaskId = getRuntimeContext().getIndexOfThisSubtask(); + int totalSubTask = getRuntimeContext().getNumberOfParallelSubtasks(); + for (KuduDataSplit split : dataSplits) { + int assignedSubtaskId = KuduDataSplitsAssigner.assign( + split, totalSubTask, + kuduStreamingSourceConfiguration.getRunningMode()); + LOGGER.info("TASK_ASSIGNED, 
totalSubTask={}, assignedSubTaskId={}, thisSubTaskId={}", + totalSubTask, assignedSubtaskId, thisSubTaskId); + if (assignedSubtaskId == thisSubTaskId) { + assignedSplits.add(split); + } + } + + for (KuduDataSplit split : assignedSplits) { + KuduReaderIterator resultIterator = kuduReader.scanner(split.getScanToken()); + while (resultIterator.hasNext()) { + T row = resultIterator.next(); + if (row != null) { + /** For the running mode == KuduStreamingRunningMode.INCREMENTAL, we need to manage the offsets of the table. + * The data will be in the local buffer and sorted before emitting. + */ + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + streamingKeyOffsetManager.update(row); + } else { + sourceContext.collect(row); + } + } + } + } + + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + Iterator eventItr = streamingKeyOffsetManager.getSortedLocalEvents().iterator(); + while (eventItr.hasNext()) { + sourceContext.collect(eventItr.next()); + } + + streamingKeyOffsetManager.next(); + } + + Thread.sleep(batchRunningInterval); + } catch (Exception e) { + LOGGER.error("Exception happened when reading records", e); + throw new RuntimeException(e); + } finally { + try { + if (kuduReader != null) { + kuduReader.close(); + } + } catch (Exception e) { + LOGGER.error("Error on closing kuduReader", e); + } + } + } + } + + @Override + public void open(Configuration parameters) throws Exception { + tableInfo = KuduTableInfo.forTable(kuduStreamingSourceConfiguration.getTableName()); + + if (rowResultConvertor == null) { + rowResultConvertor = + new UserTableDataRowResultConvertor<>(kuduStreamingSourceConfiguration.getTargetKuduRowClz()); + } + + if (streamingKeyOffsetManager == null) { + String userConfiguredLowerKey = + CollectionUtils.isNotEmpty(kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList()) ? 
+ kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList().get(0).getLowerBoundKey() : null; + String userConfiguredUpperKey = + CollectionUtils.isNotEmpty(kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList()) ? + kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList().get(0).getUpperBoundKey() : null; + + streamingKeyOffsetManager = new StreamingLocalEventsManager<>( + rowResultConvertor.getUserTableDataTypeDetail().getStreamingCols(), userConfiguredLowerKey, userConfiguredUpperKey); + } + + if (kuduStreamingSourceConfiguration.getBatchRunningInterval() != null) { + batchRunningInterval = kuduStreamingSourceConfiguration.getBatchRunningInterval(); + } + } + + @Override + public void cancel() { + running = false; + } + + @Override + public void notifyCheckpointComplete(long l) { + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + // Mark the state as committed + IncrementalCPState incrementalCPState = (IncrementalCPState) inflightOffsets.get(l); + incrementalCPState.setCommitted(true); + + int position = inflightOffsets.indexOf(l); + + LOGGER.info("NOTIFY_CHECKPOINT, checkpointId={}, subTaskId={}, position={}", + l, getRuntimeContext().getIndexOfThisSubtask(), position); + // Remove the stale entries + if (position != -1) { + for (int i = 0; i < position; i++) { + inflightOffsets.remove(0); + } + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + snapshotOffsetStates.clear(); + + IncrementalCPState incrementalCPState = IncrementalCPState.builder() + .subTaskId(getRuntimeContext().getIndexOfThisSubtask()) + .checkpointId(functionSnapshotContext.getCheckpointId()) + .offset(streamingKeyOffsetManager.getCurrentHWMStr()) + .build(); + + LOGGER.info("SNAPSHOT_STATE for checkpointId={}, subTask={}, with state={}", + 
functionSnapshotContext.getCheckpointId(), getRuntimeContext().getIndexOfThisSubtask(), incrementalCPState); + + inflightOffsets.put(functionSnapshotContext.getCheckpointId(), incrementalCPState); + + snapshotOffsetStates.add(inflightOffsets); + } + } + + @Override + public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception { + if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { + snapshotOffsetStates = functionInitializationContext.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + SNAPSHOT_OFFSET_STATES_KEY, TypeInformation.of(new TypeHint() { + }))); + LOGGER.info("INIT_STATE for subTask={}, with state={}", getRuntimeContext().getIndexOfThisSubtask(), snapshotOffsetStates); + if (functionInitializationContext.isRestored()) { + rowResultConvertor = + new UserTableDataRowResultConvertor<>(kuduStreamingSourceConfiguration.getTargetKuduRowClz()); + String userConfiguredLowerKey = + CollectionUtils.isNotEmpty(kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList()) ? + kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList().get(0).getLowerBoundKey() : null; + String userConfiguredUpperKey = + CollectionUtils.isNotEmpty(kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList()) ? 
+ kuduStreamingSourceConfiguration.getUserTableDataQueryDetailList().get(0).getUpperBoundKey() : null; + streamingKeyOffsetManager = new StreamingLocalEventsManager<>( + rowResultConvertor.getUserTableDataTypeDetail().getStreamingCols(), userConfiguredLowerKey, userConfiguredUpperKey); + + for (LinkedMap state : snapshotOffsetStates.get()) { + LOGGER.info("RESTORE_STATE for subTask={}, with state={}", getRuntimeContext().getIndexOfThisSubtask(), state); + // Find the latest committed offset for the current subTask + for (int i = 0; i < state.size(); i++) { + IncrementalCPState incrementalCPState = (IncrementalCPState)state.getValue(i); + if (incrementalCPState.isCommitted()) { + LOGGER.info("RESTORE_STATE for subTask={}, with incrementalCPState={}", getRuntimeContext().getIndexOfThisSubtask(), incrementalCPState); + if (getRuntimeContext().getIndexOfThisSubtask() == incrementalCPState.getSubTaskId()) { + LOGGER.info("SET_INIT_HWM={}", incrementalCPState.getOffset()); + streamingKeyOffsetManager.setInitialHWM(incrementalCPState.getOffset()); + } + } + } + } + LOGGER.info("FINAL_INIT_HWM={}", streamingKeyOffsetManager.getCurrentHWMStr()); + } + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceConfiguration.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceConfiguration.java new file mode 100644 index 00000000..9659b81a --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceConfiguration.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataQueryDetail; + +import java.io.Serializable; +import java.util.List; + +/** + * Kudu source connector options. + * + * @param <T> The mapped Java type against the Kudu table. + */ +@PublicEvolving +public class KuduStreamingSourceConfiguration implements Serializable { + private static final long serialVersionUID = 529352279313155517L; + + private String masterAddresses; + private String tableName; + private List userTableDataQueryDetailList; + private Long batchRunningInterval; + private KuduStreamingRunningMode runningMode; + private Class targetKuduRowClz; + + KuduStreamingSourceConfiguration(String masterAddresses, + String tableName, + List userTableDataQueryDetailList, + Long batchRunningInterval, + KuduStreamingRunningMode runningMode, + Class targetKuduRowClz) { + this.masterAddresses = masterAddresses; + this.tableName = tableName; + this.userTableDataQueryDetailList = userTableDataQueryDetailList; + this.batchRunningInterval = batchRunningInterval; + this.runningMode = runningMode; + this.targetKuduRowClz = targetKuduRowClz; + } + + public String getMasterAddresses() { + return masterAddresses; + } + + + public String getTableName() { + return tableName; + } + + public List 
getUserTableDataQueryDetailList() { + return userTableDataQueryDetailList; + } + + public Long getBatchRunningInterval() { + return batchRunningInterval; + } + + public KuduStreamingRunningMode getRunningMode() { + return runningMode; + } + + public Class getTargetKuduRowClz() { + return targetKuduRowClz; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String masterAddresses; + private String tableName; + private List userTableDataQueryDetailList; + private Long batchRunningInterval; + private KuduStreamingRunningMode runningMode; + private Class targetKuduRowClz; + + public Builder masterAddresses(String masterAddresses) { + this.masterAddresses = masterAddresses; + return this; + } + + public Builder tableName(String tableName) { + this.tableName = tableName; + return this; + } + + public Builder userTableDataQueryDetailList(List userTableDataQueryDetailList) { + this.userTableDataQueryDetailList = userTableDataQueryDetailList; + return this; + } + + public Builder batchRunningInterval(Long batchRunningInterval) { + this.batchRunningInterval = batchRunningInterval; + return this; + } + + public Builder runningMode(KuduStreamingRunningMode runningMode) { + this.runningMode = runningMode; + return this; + } + + public Builder targetKuduRowClz(Class targetKuduRowClz) { + this.targetKuduRowClz = targetKuduRowClz; + return this; + } + + public KuduStreamingSourceConfiguration build() { + return new KuduStreamingSourceConfiguration(this.masterAddresses, + this.tableName, + this.userTableDataQueryDetailList, + this.batchRunningInterval, + this.runningMode, + this.targetKuduRowClz); + } + + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/StreamingLocalEventsManager.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/StreamingLocalEventsManager.java new file mode 100644 index 00000000..4fec56a0 --- /dev/null +++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/StreamingLocalEventsManager.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn;
+import org.apache.flink.connectors.kudu.connector.configuration.StreamingKeySorter;
+import org.apache.kudu.shaded.com.google.common.base.Joiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.TreeSet;
+
+/**
+ * This is to manage the local buffered events in sorted order for {@link KuduStreamingRunningMode#INCREMENTAL}
+ *
+ * <p>Events are kept in a {@link TreeSet} ordered by their streaming key (the values of the
+ * configured streaming columns, read through the row's getters). The largest buffered key is
+ * the current high-water mark (HWM).
+ *
+ * @param <T> The mapped Java type against the Kudu table.
+ */
+@Internal
+public class StreamingLocalEventsManager<T> implements Serializable {
+    private static final String STREAMING_KEY_DELIMITER_RE = "\\|";
+    private static final String STREAMING_KEY_DELIMITER = "|";
+
+    private static final String GET_METHOD_PREFIX = "get";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingLocalEventsManager.class);
+
+    /** HWM reported while nothing is buffered; set by {@link #setInitialHWM(String)} and {@link #next()}. */
+    private String initialHWM;
+
+    /**
+     * The manager will be initialized with the user configured lowerKey and upperKey;
+     */
+    private String userConfiguredLowerKey;
+    private String userConfiguredUpperKey;
+
+    private List<StreamingColumn> streamingColumns;
+
+    // NOTE(review): a TreeSet drops elements that compare equal, so two rows with the same
+    // streaming key are buffered only once - confirm this is intended.
+    // NOTE(review): the class is declared Serializable but EventComparator is not - confirm
+    // whether instances are ever Java-serialized.
+    private TreeSet<T> localEvents = new TreeSet<>(new EventComparator());
+
+    /** Orders rows by their streaming key parts, column by column. */
+    private class EventComparator implements Comparator<T> {
+        @Override
+        public int compare(T o1, T o2) {
+            String[] part1 = getStreamingKeyParts(o1);
+            String[] part2 = getStreamingKeyParts(o2);
+            return StreamingKeySorter.compareOffsets(part1, part2, streamingColumns);
+        }
+    }
+
+    /**
+     * Reads the streaming key of a row: for every streaming column, the no-arg getter
+     * {@code get<FieldName>} declared on the row's class is invoked reflectively and its
+     * result is converted with {@link String#valueOf(Object)}.
+     *
+     * @param row the row to read the key from
+     * @return one string per streaming column, in streaming-column order
+     * @throws IllegalArgumentException if a getter is missing or its invocation fails
+     */
+    private String[] getStreamingKeyParts(T row) {
+        String[] newParts = new String[streamingColumns.size()];
+        for (int i = 0; i < streamingColumns.size(); i++) {
+            String fieldName = streamingColumns.get(i).getFieldName();
+            // Locale.ROOT keeps the derived getter name stable regardless of the JVM default locale
+            // (e.g. "id" must become "getId", not "getİd" under a Turkish locale).
+            String methodName = GET_METHOD_PREFIX
+                    + fieldName.substring(0, 1).toUpperCase(Locale.ROOT)
+                    + fieldName.substring(1);
+            try {
+                newParts[i] = String.valueOf(row.getClass().getDeclaredMethod(methodName).invoke(row));
+            } catch (Exception e) {
+                LOGGER.error("Fail to get the streaming key dynamically.", e);
+                throw new IllegalArgumentException(e);
+            }
+        }
+        return newParts;
+    }
+
+    /**
+     * Builds a '|'-delimited key with one placeholder per streaming column: {@code longTypeKey}
+     * for Long/Integer/Short/Byte columns, {@code stringTypeKey} for String columns, and nothing
+     * for any other type. Currently only used by the (unused) default-key helpers below.
+     */
+    private String buildRangeKeyTemplate(String longTypeKey, String stringTypeKey) {
+        StringBuilder key = new StringBuilder();
+        for (int i = 0; i < streamingColumns.size(); i++) {
+            StreamingColumn sc = streamingColumns.get(i);
+            if (sc.getFieldType() == Long.class ||
+                    sc.getFieldType() == Integer.class ||
+                    sc.getFieldType() == Short.class ||
+                    sc.getFieldType() == Byte.class) {
+                key.append(longTypeKey);
+            } else if (sc.getFieldType() == String.class) {
+                key.append(stringTypeKey);
+            }
+            if (i < streamingColumns.size() - 1) {
+                key.append(STREAMING_KEY_DELIMITER);
+            }
+        }
+        return key.toString();
+    }
+
+    private String buildDefaultLowerStreamingKey() {
+        return buildRangeKeyTemplate(String.valueOf(Long.MIN_VALUE), "0");
+    }
+
+    private String buildDefaultUpperStreamingKey() {
+        return buildRangeKeyTemplate(String.valueOf(Long.MAX_VALUE), "z");
+    }
+
+    /**
+     * @param streamingColumns       the columns forming the streaming key, in key order
+     * @param userConfiguredLowerKey '|'-delimited lower key, or {@code null} when not configured
+     * @param userConfiguredUpperKey '|'-delimited upper key, or {@code null} when not configured
+     */
+    public StreamingLocalEventsManager(List<StreamingColumn> streamingColumns,
+                                       String userConfiguredLowerKey,
+                                       String userConfiguredUpperKey
+    ) {
+        this.streamingColumns = streamingColumns;
+        // Unconfigured bounds stay null; no default key is substituted.
+        this.userConfiguredLowerKey = userConfiguredLowerKey;
+        this.userConfiguredUpperKey = userConfiguredUpperKey;
+    }
+
+    /** Buffers a row; it is placed according to its streaming key. */
+    public void update(T row) {
+        localEvents.add(row);
+    }
+
+    public void setInitialHWM(String initialHWM) {
+        this.initialHWM = initialHWM;
+    }
+
+    /**
+     * @return the key parts of the largest buffered row; when nothing is buffered, the parts of
+     *         the initial HWM, or {@code null} if that is blank
+     */
+    public String[] getCurrentHWM() {
+        if (localEvents.isEmpty()) {
+            return StringUtils.isNotBlank(initialHWM) ? initialHWM.split(STREAMING_KEY_DELIMITER_RE) : null;
+        } else {
+            T lastOne = localEvents.last();
+            return getStreamingKeyParts(lastOne);
+        }
+    }
+
+    /**
+     * @return {@link #getCurrentHWM()} joined with '|', or the empty string when there is no HWM
+     */
+    public String getCurrentHWMStr() {
+        // Compute the HWM once: it involves one reflective getter call per streaming column.
+        String[] hwm = getCurrentHWM();
+        return hwm != null ? Joiner.on(STREAMING_KEY_DELIMITER).join(hwm) : StringUtils.EMPTY;
+    }
+
+    /** @return the live, sorted buffer (not a copy) */
+    public TreeSet<T> getSortedLocalEvents() {
+        return localEvents;
+    }
+
+    /** Carries the current HWM over as the new initial HWM, then empties the buffer. */
+    public void next() {
+        initialHWM = getCurrentHWMStr();
+        localEvents.clear();
+    }
+
+    /** @return the parts of the configured upper key, or {@code null} if it is blank */
+    public String[] getUserConfiguredUpperKey() {
+        return StringUtils.isNotBlank(userConfiguredUpperKey) ? userConfiguredUpperKey.split(STREAMING_KEY_DELIMITER_RE) : null;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/assigner/KuduDataSplitsAssigner.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/assigner/KuduDataSplitsAssigner.java
new file mode 100644
index 00000000..1e94d619
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/assigner/KuduDataSplitsAssigner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduDataSplit;
+import org.apache.flink.connectors.kudu.connector.KuduStreamingRunningMode;
+
+@Internal
+public class KuduDataSplitsAssigner {
+    /**
+     * Assign the data split to the flink TM subTask. The data splits within the same tablet will be handled
+     * by the same subTask.
+     *
+     * @param dataSplit           The allocated data split for the specific subtask of kudu source connector
+     * @param numParallelSubtasks Total number of subtasks for kudu source connector
+     * @param runningMode         Running mode of the current kudu source connector
+     * @return the calculated value which will be compared with the subTask index, {@link KuduStreamingRunningMode}
+     */
+    public static int assign(KuduDataSplit dataSplit, int numParallelSubtasks, KuduStreamingRunningMode runningMode) {
+        if (runningMode != KuduStreamingRunningMode.INCREMENTAL) {
+            // Clear the sign bit so the modulo result is never negative.
+            int scrambledHash = dataSplit.getTabletId().hashCode() * 31;
+            return (scrambledHash & 0x7FFFFFFF) % numParallelSubtasks;
+        }
+        // Always pick up the first subtask to handle
+        return 0;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/ReflectionTypeDetail.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/ReflectionTypeDetail.java
new file mode 100644
index 00000000..615b14fa
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/ReflectionTypeDetail.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+/**
+ * Mutable holder pairing a reflected {@link Field} with a reflected {@link Method}.
+ * Both references start as {@code null} until set.
+ */
+// NOTE(review): presumably the method is the setter belonging to the field - confirm against the parser that fills this in.
+@Internal
+public class ReflectionTypeDetail {
+    private Field field;
+    private Method method;
+
+    /** @return the stored field, or {@code null} if none was set */
+    public Field getField() {
+        return field;
+    }
+
+    public void setField(Field field) {
+        this.field = field;
+    }
+
+    /** @return the stored method, or {@code null} if none was set */
+    public Method getMethod() {
+        return method;
+    }
+
+    public void setMethod(Method method) {
+        this.method = method;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingColumn.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingColumn.java
new file mode 100644
index 00000000..0deda72a
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingColumn.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration;
+
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Locale;
+
+/**
+ * Describes one column of the streaming key: its column name, the Java field it maps to,
+ * the field type, the locale carried for it and its position ({@code order}) within the key.
+ *
+ * <p>The natural ordering is by {@code order} only, so it is not consistent with
+ * {@code equals}: two distinct columns with the same order compare as equal.
+ */
+@Internal
+public class StreamingColumn implements Comparable<StreamingColumn> {
+    private String colName;
+    private String fieldName;
+    private Class<?> fieldType;
+
+    private Locale locale;
+    private int order;
+
+    public StreamingColumn(String colName, String fieldName, Class<?> fieldType, int order, Locale locale) {
+        this.colName = colName;
+        this.fieldName = fieldName;
+        this.fieldType = fieldType;
+        this.order = order;
+        this.locale = locale;
+    }
+
+    public String getColName() {
+        return colName;
+    }
+
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public Class<?> getFieldType() {
+        return fieldType;
+    }
+
+    public Locale getLocale() {
+        return locale;
+    }
+
+    /**
+     * Compares by {@code order}, ascending.
+     *
+     * <p>The previous form ({@code order <= o.order ? -1 : 1}) never returned 0 and answered
+     * -1 in both directions for equal orders, which breaks the {@link Comparable} contract
+     * that sorting and sorted collections rely on.
+     *
+     * @return negative, zero or positive as this column's order is less than, equal to or
+     *         greater than the other's
+     */
+    @Override
+    public int compareTo(StreamingColumn o) {
+        return Integer.compare(order, o.order);
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java
new file mode 100644
index 00000000..bbd65cef
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import java.text.Collator;
+import java.util.List;
+
+/**
+ * Compares two streaming keys that are given as arrays of string parts.
+ */
+@Internal
+public class StreamingKeySorter {
+    /**
+     * Compares two keys part by part; the first part that differs decides the result.
+     *
+     * <p>Part {@code i} is interpreted according to the field type of {@code streamingColumns.get(i)}:
+     * Long/Integer/Short/Byte parts are parsed as {@code long} and compared numerically,
+     * String parts are compared with a {@link Collator} for the column's locale.
+     *
+     * @param one              the parts of the first key; its length drives the comparison
+     * @param two              the parts of the second key; expected to have at least as many parts as {@code one}
+     * @param streamingColumns the column descriptions, index-aligned with the key parts
+     * @return negative, zero or positive as {@code one} sorts before, equal to or after {@code two}
+     * @throws NumberFormatException    if a numeric part cannot be parsed as a long
+     * @throws IllegalArgumentException if a column has a field type other than the ones listed above
+     */
+    public static int compareOffsets(String[] one, String[] two, List<StreamingColumn> streamingColumns) {
+        for (int i = 0; i < one.length; i++) {
+            StreamingColumn columnDetail = streamingColumns.get(i);
+            Class<?> fieldType = columnDetail.getFieldType();
+            int r;
+            if (isIntegralType(fieldType)) {
+                r = Long.compare(Long.parseLong(one[i]), Long.parseLong(two[i]));
+            } else if (fieldType == String.class) {
+                r = Collator.getInstance(columnDetail.getLocale()).compare(one[i], two[i]);
+            } else {
+                throw new IllegalArgumentException("Unsupported field type: " + columnDetail.getFieldType() + " for field: " + columnDetail.getFieldName());
+            }
+
+            if (r != 0) {
+                return r;
+            }
+        }
+        return 0;
+    }
+
+    /** True for the boxed integral types whose string form always fits a {@code long}. */
+    private static boolean isIntegralType(Class<?> fieldType) {
+        return fieldType == Long.class
+                || fieldType == Integer.class
+                || fieldType == Short.class
+                || fieldType == Byte.class;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataQueryDetail.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataQueryDetail.java
new file mode 100644
index 00000000..30eb822c
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataQueryDetail.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.configuration.type.UserTableDataQueryFilter;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Describes one query: the projected columns, the filters and, for the incremental mode,
+ * the key bounds. Both lists start out empty and mutable.
+ */
+@Internal
+public class UserTableDataQueryDetail implements Serializable {
+    private static final long serialVersionUID = -5260450483244281444L;
+    // NOTE(review): element type restored as String (column names) - confirm against the callers.
+    private List<String> projectedColumns = Lists.newArrayList();
+    private List<UserTableDataQueryFilter> userTableDataQueryFilters = Lists.newArrayList();
+
+    /**
+     * Lower and upper bound keys for query in the incremental mode, the query range will be (lowerBoundKey, upperBoundKey).
+     * If lowerBoundKey or upperBoundKey is null, the value will be replaced with -inf and +inf.
+     */
+    private String lowerBoundKey;
+    private String upperBoundKey;
+
+    public List<String> getProjectedColumns() {
+        return projectedColumns;
+    }
+
+    public void setProjectedColumns(List<String> projectedColumns) {
+        this.projectedColumns = projectedColumns;
+    }
+
+    public List<UserTableDataQueryFilter> getUserTableDataQueryFilters() {
+        return userTableDataQueryFilters;
+    }
+
+    public void setUserTableDataQueryFilters(List<UserTableDataQueryFilter> userTableDataQueryFilters) {
+        this.userTableDataQueryFilters = userTableDataQueryFilters;
+    }
+
+    /** @return the lower bound key, or {@code null} when unset */
+    public String getLowerBoundKey() {
+        return lowerBoundKey;
+    }
+
+    public void setLowerBoundKey(String lowerBoundKey) {
+        this.lowerBoundKey = lowerBoundKey;
+    }
+
+    /** @return the upper bound key, or {@code null} when unset */
+    public String getUpperBoundKey() {
+        return upperBoundKey;
+    }
+
+    public void setUpperBoundKey(String upperBoundKey) {
+        this.upperBoundKey = upperBoundKey;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataTypeDetail.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataTypeDetail.java
new file mode 100644
index 00000000..c1331b55
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/UserTableDataTypeDetail.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.kudu.shaded.com.google.common.collect.Lists;
+import org.apache.kudu.shaded.com.google.common.collect.Maps;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reflection metadata of a user table data type: its streaming columns, the per-column
+ * {@link ReflectionTypeDetail} keyed by column name, and the constructor used to create instances.
+ * The list and the map start out empty and mutable; the constructor starts as {@code null}.
+ */
+@Internal
+public class UserTableDataTypeDetail {
+    private List<StreamingColumn> streamingCols = Lists.newArrayList();
+    private Map<String, ReflectionTypeDetail> reflectionTypeDetailByColNames = Maps.newHashMap();
+    private Constructor<?> userTableDataTypeConstructor;
+
+    public List<StreamingColumn> getStreamingCols() {
+        return streamingCols;
+    }
+
+    public void setStreamingCols(List<StreamingColumn> streamingCols) {
+        this.streamingCols = streamingCols;
+    }
+
+    public Map<String, ReflectionTypeDetail> getReflectionTypeDetailByColNames() {
+        return reflectionTypeDetailByColNames;
+    }
+
+    public void setReflectionTypeDetailByColNames(Map<String, ReflectionTypeDetail> reflectionTypeDetailByColNames) {
+        this.reflectionTypeDetailByColNames = reflectionTypeDetailByColNames;
+    }
+
+    /** @return the constructor of the user type, or {@code null} if none was set */
+    public Constructor<?> getUserTableDataTypeConstructor() {
+        return userTableDataTypeConstructor;
+    }
+
+    public void setUserTableDataTypeConstructor(Constructor<?> userTableDataTypeConstructor) {
+        this.userTableDataTypeConstructor = userTableDataTypeConstructor;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/FilterOp.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/FilterOp.java
new file mode 100644
index 00000000..7b7dd7a8
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/FilterOp.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration.type;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
+
+/**
+ * Comparison operators available for query filters. Each constant wraps the
+ * {@link KuduFilterInfo.FilterType} of the same name.
+ */
+@Internal
+public enum FilterOp {
+
+    GREATER(KuduFilterInfo.FilterType.GREATER),
+    GREATER_EQUAL(KuduFilterInfo.FilterType.GREATER_EQUAL),
+    EQUAL(KuduFilterInfo.FilterType.EQUAL),
+    LESS(KuduFilterInfo.FilterType.LESS),
+    LESS_EQUAL(KuduFilterInfo.FilterType.LESS_EQUAL),
+    ;
+
+    // The Kudu filter type this operator translates to.
+    private KuduFilterInfo.FilterType kuduFilterType;
+
+    FilterOp(KuduFilterInfo.FilterType kuduFilterType) {
+        this.kuduFilterType = kuduFilterType;
+    }
+
+    /** @return the wrapped {@link KuduFilterInfo.FilterType} */
+    public KuduFilterInfo.FilterType getKuduFilterType() {
+        return kuduFilterType;
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/UserTableDataQueryFilter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/UserTableDataQueryFilter.java
new file mode 100644
index 00000000..a7110d79
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/UserTableDataQueryFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration.type;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.query.UserTableDataQueryFilterValueResolver;
+
+import java.io.Serializable;
+
+/**
+ * One query filter: a column name, a {@link FilterOp} and a resolver that supplies the value
+ * to compare against. Instances can be created directly or through {@link #builder()}.
+ */
+@Internal
+public class UserTableDataQueryFilter implements Serializable {
+    private static final long serialVersionUID = -8244400106392939137L;
+
+    private String colName;
+    private FilterOp filterOp;
+    // NOTE(review): when the resolver is evaluated is not visible here - see the query code that consumes it.
+    private UserTableDataQueryFilterValueResolver filterValueResolver;
+
+    public UserTableDataQueryFilter(String colName, FilterOp filterOp, UserTableDataQueryFilterValueResolver filterValueResolver) {
+        this.colName = colName;
+        this.filterOp = filterOp;
+        this.filterValueResolver = filterValueResolver;
+    }
+
+    public String getColName() {
+        return colName;
+    }
+
+    public void setColName(String colName) {
+        this.colName = colName;
+    }
+
+    public FilterOp getFilterOp() {
+        return filterOp;
+    }
+
+    public void setFilterOp(FilterOp filterOp) {
+        this.filterOp = filterOp;
+    }
+
+    public UserTableDataQueryFilterValueResolver getFilterValueResolver() {
+        return filterValueResolver;
+    }
+
+    public void setFilterValueResolver(UserTableDataQueryFilterValueResolver filterValueResolver) {
+        this.filterValueResolver = filterValueResolver;
+    }
+
+    /** @return a new builder with every property unset ({@code null}) */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Fluent builder; properties that are never set are passed to the filter as {@code null}. */
+    public static class Builder {
+        private String colName;
+        private FilterOp filterOp;
+        private UserTableDataQueryFilterValueResolver filterValueResolver;
+
+        public Builder colName(String colName) {
+            this.colName = colName;
+            return this;
+        }
+
+        public Builder filterOp(FilterOp filterOp) {
+            this.filterOp = filterOp;
+            return this;
+        }
+
+        public Builder filterValueResolver(UserTableDataQueryFilterValueResolver filterValueResolver) {
+            this.filterValueResolver = filterValueResolver;
+            return this;
+        }
+
+        public UserTableDataQueryFilter build() {
+            return new UserTableDataQueryFilter(colName, filterOp, filterValueResolver);
+        }
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/ColumnDetail.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/ColumnDetail.java
new file mode 100644
index 00000000..6ee115f6
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/ColumnDetail.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration.type.annotation;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that attaches a name to a field.
+ */
+// NOTE(review): presumably the name is the Kudu column the annotated field maps to - confirm against the type parser.
+@Internal
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ColumnDetail {
+    /** The name attached to the annotated field (required). */
+    String name();
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/FilterDetail.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/FilterDetail.java
new file mode 100644
index 00000000..b821540f
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/FilterDetail.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration.type.annotation;
+
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.kudu.connector.configuration.type.FilterOp;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that declares a filter on the annotated field:
+ * a category key, a {@link FilterOp}, a value given as a string and an optional order.
+ */
+// NOTE(review): how categoryKey groups filters and how order is used is not visible here - confirm against the consumer.
+@Internal
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface FilterDetail {
+    String categoryKey();
+    FilterOp op();
+    String value();
+    // Optional; defaults to 0.
+    int order() default 0;
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/StreamingKey.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/StreamingKey.java
new file mode 100644
index 00000000..2a667b95
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/type/annotation/StreamingKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.configuration.type.annotation;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation, retained at runtime, that marks a field as part of the streaming key.
+ * All attributes are optional.
+ */
+// NOTE(review): presumably lang/region feed the Locale used for comparing String keys - confirm against the type parser.
+@Internal
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface StreamingKey {
+    // Position of the field within the streaming key; defaults to 0.
+    int order() default 0;
+
+    // Defaults to the empty string.
+    String lang() default "";
+
+    // Defaults to the empty string.
+    String region() default "";
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/UserTableDataRowResultConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/UserTableDataRowResultConvertor.java
new file mode 100644
index 00000000..5854673e
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/UserTableDataRowResultConvertor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector.convertor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail;
+import org.apache.flink.connectors.kudu.connector.convertor.builder.UserTableDataTypeBuilder;
+import org.apache.flink.connectors.kudu.connector.convertor.parser.UserTableDataTypeParser;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.RowResult;
+
+import java.sql.Timestamp;
+
+/**
+ * Generic table data convertor which will convert the Kudu Row to the user defined Java type
+ *
+ * <p>The user type is analysed once, in the constructor; every row is then mapped by creating
+ * a new instance and pushing each non-null projected column into it.
+ *
+ * @param <T> The mapped Java type against the Kudu table.
+ */
+@PublicEvolving
+public class UserTableDataRowResultConvertor<T> implements RowResultConvertor<T> {
+    private UserTableDataTypeDetail userTableDataTypeDetail;
+
+    /**
+     * @param userTableDataType the user type to map rows to
+     * @throws Exception whatever the type parser raises while analysing {@code userTableDataType}
+     */
+    public UserTableDataRowResultConvertor(Class<T> userTableDataType) throws Exception {
+        this.userTableDataTypeDetail = UserTableDataTypeParser.getInstance().parse(userTableDataType);
+    }
+
+    public UserTableDataTypeDetail getUserTableDataTypeDetail() {
+        return userTableDataTypeDetail;
+    }
+
+    public void setUserTableDataTypeDetail(UserTableDataTypeDetail userTableDataTypeDetail) {
+        this.userTableDataTypeDetail = userTableDataTypeDetail;
+    }
+
+    /**
+     * Converts one Kudu row into a new instance of the user type.
+     *
+     * <p>Columns whose value is null are skipped, so the corresponding property keeps whatever
+     * the no-arg constructor left in it.
+     *
+     * @param row the Kudu row to convert
+     * @return the populated user type instance
+     * @throws IllegalArgumentException if the instance cannot be created, a column value cannot
+     *                                  be applied, or a projected column has an unsupported type
+     */
+    @Override
+    public T convertor(RowResult row) {
+        T newUserTypeInst = newUserTypeInstance();
+
+        UserTableDataTypeBuilder<T> userTableDataTypeBuilder = new UserTableDataTypeBuilder<>(userTableDataTypeDetail);
+        Schema schema = row.getColumnProjection();
+        for (ColumnSchema columnSchema : schema.getColumns()) {
+            String colName = columnSchema.getName();
+            Type colType = columnSchema.getType();
+
+            if (row.isNull(colName)) {
+                continue;
+            }
+
+            // Each value is boxed into a one-element array: it is the argument list of the reflective call.
+            switch (colType) {
+                case INT64:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Long[]{row.getLong(colName)});
+                    break;
+                case INT8:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Byte[]{row.getByte(colName)});
+                    break;
+                case INT16:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Short[]{row.getShort(colName)});
+                    break;
+                case INT32:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Integer[]{row.getInt(colName)});
+                    break;
+                case STRING:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new String[]{row.getString(colName)});
+                    break;
+                case UNIXTIME_MICROS:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Timestamp[]{row.getTimestamp(colName)});
+                    break;
+                case DOUBLE:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Double[]{row.getDouble(colName)});
+                    break;
+                case FLOAT:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Float[]{row.getFloat(colName)});
+                    break;
+                case BOOL:
+                    userTableDataTypeBuilder.build(newUserTypeInst, colName, new Boolean[]{row.getBoolean(colName)});
+                    break;
+                default:
+                    // This branch is reached for a column *type* outside the list above, not for a bad name.
+                    throw new IllegalArgumentException(
+                            "Unsupported column type: " + colType + " for column: " + colName);
+            }
+        }
+
+        return newUserTypeInst;
+    }
+
+    /** Creates an empty user type instance through the constructor recorded in the type detail. */
+    @SuppressWarnings("unchecked") // the recorded constructor belongs to the class this convertor was built for
+    private T newUserTypeInstance() {
+        try {
+            return (T) userTableDataTypeDetail.getUserTableDataTypeConstructor().newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Fail to initialize the UserTableData instance: ", e);
+        }
+    }
+}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/builder/UserTableDataTypeBuilder.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/builder/UserTableDataTypeBuilder.java
new file mode 100644
index 00000000..8d129856
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/builder/UserTableDataTypeBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.convertor.builder; + + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; + +@Internal +public class UserTableDataTypeBuilder { + private UserTableDataTypeDetail userTableDataTypeDetail; + + public UserTableDataTypeBuilder(UserTableDataTypeDetail userTableDataTypeDetail) { + this.userTableDataTypeDetail = userTableDataTypeDetail; + } + + public void build(T userTableDataInstance, String colName, Object[] params) { + try { + userTableDataTypeDetail + .getReflectionTypeDetailByColNames() + .get(colName) + .getMethod() + .invoke(userTableDataInstance, params); + } catch (Exception e) { + throw new IllegalArgumentException("Fail to invoke by column name: " + colName); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/parser/UserTableDataTypeParser.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/parser/UserTableDataTypeParser.java new file mode 100644 index 00000000..76040f6f --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/parser/UserTableDataTypeParser.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.convertor.parser; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.kudu.connector.configuration.ReflectionTypeDetail; +import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; +import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.ColumnDetail; +import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.StreamingKey; +import org.apache.kudu.shaded.com.google.common.base.Joiner; +import org.apache.kudu.shaded.com.google.common.collect.Maps; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Locale; +import java.util.Map; + +@Internal +public class UserTableDataTypeParser { + private static final String SET_METHOD_PREFIX = "set"; + + private static final UserTableDataTypeParser INST = new UserTableDataTypeParser(); + + private UserTableDataTypeParser() {} + + public static UserTableDataTypeParser getInstance() { + return INST; + } + + public UserTableDataTypeDetail parse(Class userTableDataType) throws Exception { + 
UserTableDataTypeDetail typeDetail = new UserTableDataTypeDetail(); + + Constructor ctor = userTableDataType.getConstructor(); + typeDetail.setUserTableDataTypeConstructor(ctor); + + Map methodsByName = Maps.newHashMap(); + for (Method method : userTableDataType.getMethods()) { + methodsByName.put(method.getName(), method); + } + for (Field field : userTableDataType.getDeclaredFields()) { + StreamingKey streamingKey = field.getAnnotation(StreamingKey.class); + ColumnDetail columnDetail = field.getAnnotation(ColumnDetail.class); + + if (streamingKey != null) { + typeDetail.getStreamingCols().add( + new StreamingColumn( + columnDetail.name(), + field.getName(), + field.getType(), + streamingKey.order(), + new Locale(streamingKey.lang(), streamingKey.region())) + ); + } + + String fieldName = field.getName(); + String methodName = Joiner.on("").join(SET_METHOD_PREFIX, + fieldName.substring(0, 1).toUpperCase(), + fieldName.substring(1)); + Method method = methodsByName.get(methodName); + + ReflectionTypeDetail reflectionTypeDetail = new ReflectionTypeDetail(); + reflectionTypeDetail.setField(field); + reflectionTypeDetail.setMethod(method); + + typeDetail.getReflectionTypeDetailByColNames().put(columnDetail.name(), reflectionTypeDetail); + } + + // Sort the streaming columns by order in ascending order + Collections.sort(typeDetail.getStreamingCols()); + + return typeDetail; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/discover/KuduDataSplitsDiscoverer.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/discover/KuduDataSplitsDiscoverer.java new file mode 100644 index 00000000..6f641840 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/discover/KuduDataSplitsDiscoverer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.discover; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.kudu.connector.KuduDataSplit; +import org.apache.flink.connectors.kudu.connector.KuduFilterInfo; +import org.apache.flink.connectors.kudu.connector.reader.KuduReader; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.shaded.com.google.common.collect.Lists; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Internal +public class KuduDataSplitsDiscoverer { + private KuduReader reader; + private List filterInfoList; + private List projectedColumnList; + + public KuduDataSplitsDiscoverer(KuduReader reader, List filterInfoList, List projectedColumnList) { + this.reader = reader; + this.filterInfoList = filterInfoList; + this.projectedColumnList = projectedColumnList; + } + + /** + * Get all the data splits against the filterInfoList + * + * @return all the data splits against the filterInfoList + * @throws Exception + */ + public List getAllKuduDataSplits() throws Exception { + List kuduScanTokenList = reader.scanTokens(filterInfoList, projectedColumnList, null); + + List splits = Lists.newArrayList(); + for (KuduScanToken kuduScanToken : kuduScanTokenList) { + KuduDataSplit split = new 
KuduDataSplit(); + split.setScanToken(kuduScanToken.serialize()); + split.setTabletId(new String(kuduScanToken.getTablet().getTabletId(), StandardCharsets.UTF_8)); + + splits.add(split); + } + return splits; + } + + public static KuduDataSplitsDiscoverer.Builder builder() { + return new KuduDataSplitsDiscoverer.Builder(); + } + + public static class Builder { + private KuduReader reader; + private List filterInfoList; + private List projectedColumnList; + + public Builder reader(KuduReader reader) { + this.reader = reader; + return this; + } + + public Builder filterInfoList(List filterInfoList) { + this.filterInfoList = filterInfoList; + return this; + } + + public Builder projectedColumnList(List projectedColumnList) { + this.projectedColumnList = projectedColumnList; + return this; + } + + public KuduDataSplitsDiscoverer build() { + return new KuduDataSplitsDiscoverer(reader, filterInfoList, projectedColumnList); + } + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/ConstantUserTableDataQueryFilterValueResolver.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/ConstantUserTableDataQueryFilterValueResolver.java new file mode 100644 index 00000000..56a3a50c --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/ConstantUserTableDataQueryFilterValueResolver.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector.query; + +public class ConstantUserTableDataQueryFilterValueResolver implements UserTableDataQueryFilterValueResolver { + + private static final long serialVersionUID = 6078695386593882525L; + private Object constantVal; + + public ConstantUserTableDataQueryFilterValueResolver(Object constantVal) { + this.constantVal = constantVal; + } + + @Override + public Object resolve() { + return constantVal; + } +} diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/SnapshotStateUserTableDataQueryFilterValueResolver.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/SnapshotStateUserTableDataQueryFilterValueResolver.java new file mode 100644 index 00000000..2e721ee2 --- /dev/null +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/SnapshotStateUserTableDataQueryFilterValueResolver.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.connectors.kudu.connector.query;

/**
 * Value resolver whose {@link #resolve()} always returns {@code null}.
 *
 * <p>NOTE(review): judging by the name, the effective value presumably comes from snapshot
 * (checkpoint) state handled elsewhere and this class only acts as a marker - confirm against
 * the code that consumes the resolvers.
 */
public class SnapshotStateUserTableDataQueryFilterValueResolver implements UserTableDataQueryFilterValueResolver {
    /** @return always {@code null} */
    @Override
    public Object resolve() {
        return null;
    }
}
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/UserTableDataQueryFilterValueResolver.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/UserTableDataQueryFilterValueResolver.java
new file mode 100644
index 00000000..99579abd
--- /dev/null
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/query/UserTableDataQueryFilterValueResolver.java
@@ -0,0 +1,23 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.connectors.kudu.connector.query;

import java.io.Serializable;

/**
 * Serializable strategy that yields a single value on demand.
 *
 * <p>NOTE(review): judging by the name, the value is presumably the comparison operand of a
 * user table data query filter - confirm against the filter class that holds the resolver.
 */
public interface UserTableDataQueryFilterValueResolver extends Serializable {
    /**
     * @return the resolved value; implementations may return {@code null}
     */
    Object resolve();
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java
new file mode 100644
index 00000000..f7f8d8ef
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java
@@ -0,0 +1,142 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.kudu.connector.convertor.RowResultRowConvertor; +import org.apache.flink.connectors.kudu.connector.reader.KuduInputSplit; +import org.apache.flink.connectors.kudu.connector.reader.KuduReader; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig; +import org.apache.flink.connectors.kudu.connector.reader.KuduReaderIterator; +import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; +import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; +import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper; +import org.apache.flink.connectors.kudu.streaming.KuduSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.types.Row; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.shaded.com.google.common.collect.Lists; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +public class KuduStreamingSourceTest { + private static final String[] columns = new String[]{"name_col", "id_col", "age_col"}; + + private static StreamingRuntimeContext context; + + private static final String masterAddresses = "192.168.0.68:7051,192.168.0.68:7151,192.168.0.68:7251"; + + @BeforeAll + static void start() { + context = Mockito.mock(StreamingRuntimeContext.class); + Mockito.when(context.isCheckpointingEnabled()).thenReturn(true); + } + + private void prepareData() throws Exception { + KuduTableInfo tableInfo = KuduTableInfo + .forTable("users") + .createTableIfNotExists( + () -> + Lists.newArrayList( + new ColumnSchema + .ColumnSchemaBuilder("name_col", Type.STRING) + .key(true) + .build(), + new ColumnSchema + 
.ColumnSchemaBuilder("id_col", Type.INT64) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("age_col", Type.INT32) + .key(false) + .build() + ), + () -> new CreateTableOptions() + .setNumReplicas(3) + .addHashPartitions(Lists.newArrayList("name_col", "id_col"), 6)); + + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setEventualConsistency() + .build(); + KuduSink sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); + + sink.setRuntimeContext(context); + sink.open(new Configuration()); + + + Row kuduRow = new Row(3); + kuduRow.setField(0, "Tom"); + kuduRow.setField(1, 1000L); + kuduRow.setField(2, 10); + + Row kuduRow2 = new Row(3); + kuduRow2.setField(0, "Mary"); + kuduRow2.setField(1, 2000L); + kuduRow2.setField(2, 20); + + Row kuduRow3 = new Row(3); + kuduRow3.setField(0, "Jack"); + kuduRow3.setField(1, 3000L); + kuduRow3.setField(2, 30); + + sink.invoke(kuduRow); + sink.invoke(kuduRow2); + sink.invoke(kuduRow3); + + + // sleep to allow eventual consistency to finish + Thread.sleep(1000); + + sink.close(); + + List rows = readRows(tableInfo); + for (Row row : rows) { + System.out.println(row.getField(0) + " " + row.getField(1) + " " + row.getField(2)); + } + } + + protected List readRows(KuduTableInfo tableInfo) throws Exception { + KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); + KuduReader reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowConvertor()); + + KuduInputSplit[] splits = reader.createInputSplits(1); + List rows = new ArrayList<>(); + for (KuduInputSplit split : splits) { + KuduReaderIterator resultIterator = reader.scanner(split.getScanToken()); + while (resultIterator.hasNext()) { + Row row = resultIterator.next(); + if (row != null) { + rows.add(row); + } + } + } + reader.close(); + + return rows; + } + @Test + void testCustomQuery() throws 
Exception { + prepareData(); + } +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java new file mode 100644 index 00000000..4c645069 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + + +import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; +import org.apache.kudu.shaded.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; + +public class StreamingKeyOffsetManagerTest { + @Test + public void testUpdate() throws Exception { + List streamingColumns = Lists.newArrayList(); + + StreamingColumn one = new StreamingColumn("name_col", "name", String.class, 1, Locale.US); + StreamingColumn two = new StreamingColumn("id_col", "id", Long.class, 2, Locale.US); + + streamingColumns.add(one); + streamingColumns.add(two); + + UserType row = new UserType(); + row.setId(1L); + row.setName("hello_world"); + row.setAge(1); + + UserType row2 = new UserType(); + row2.setId(2L); + row2.setName("hello_world"); + row2.setAge(1); + + UserType row3 = new UserType(); + row3.setId(1L); + row3.setName("hello_world2"); + row3.setAge(1); + + StreamingLocalEventsManager streamingLocalEventsManager = + new StreamingLocalEventsManager<>(Arrays.asList(one, two), null, null); + + Assert.assertEquals("0|" + Long.MIN_VALUE, streamingLocalEventsManager.getCurrentHWMStr()); + + streamingLocalEventsManager.update(row); + Assert.assertEquals("hello_world|1", streamingLocalEventsManager.getCurrentHWMStr()); + + streamingLocalEventsManager.update(row2); + Assert.assertEquals("hello_world|2", streamingLocalEventsManager.getCurrentHWMStr()); + + Assert.assertEquals(2, streamingLocalEventsManager.getSortedLocalEvents().size()); + Iterator itr = streamingLocalEventsManager.getSortedLocalEvents().iterator(); + long id = 1L; + while (itr.hasNext()) { + Assert.assertEquals(Long.valueOf(id++), itr.next().getId()); + } + + streamingLocalEventsManager.update(row3); + Assert.assertEquals("hello_world2|1", streamingLocalEventsManager.getCurrentHWMStr()); + + String[] upperBoundKey 
= streamingLocalEventsManager.getUserConfiguredUpperKey(); + Assert.assertEquals(2, upperBoundKey.length); + Assert.assertEquals("z", upperBoundKey[0]); + Assert.assertEquals(String.valueOf(Long.MAX_VALUE), upperBoundKey[1]); + } +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java new file mode 100644 index 00000000..d4b13b17 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; +import org.apache.flink.connectors.kudu.connector.configuration.StreamingKeySorter; +import org.apache.kudu.shaded.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Locale; + +public class StreamingKeySorterTest { + @Test + public void testCompareOffsets() { + List streamingColumns = Lists.newArrayList(); + + StreamingColumn one = new StreamingColumn("col_1", "col1", Long.class, 1, Locale.US); + StreamingColumn two = new StreamingColumn("col_2", "col2", String.class, 2, Locale.US); + + streamingColumns.add(one); + streamingColumns.add(two); + + String[] key1 = {"-1", "b"}; + String[] key2 = {"1", "a"}; + Assert.assertTrue(StreamingKeySorter.compareOffsets(key1, key2, streamingColumns) < 0); + + String[] key3 = {"1", "b"}; + String[] key4 = {"0", "a"}; + Assert.assertTrue(StreamingKeySorter.compareOffsets(key3, key4, streamingColumns) > 0); + + String[] key5 = {"1", "b"}; + String[] key6 = {"1", "a"}; + Assert.assertTrue(StreamingKeySorter.compareOffsets(key5, key6, streamingColumns) > 0); + + String[] key7 = {"1", "a"}; + String[] key8 = {"1", "a"}; + Assert.assertTrue(StreamingKeySorter.compareOffsets(key7, key8, streamingColumns) == 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testCompareOffsetsWithIllegalType() { + List streamingColumns = Lists.newArrayList(); + + StreamingColumn one = new StreamingColumn("col_1", "col1", Long.class, 1, Locale.US); + StreamingColumn two = new StreamingColumn("col_2", "col2", UserType.class, 2, Locale.US); + + streamingColumns.add(one); + streamingColumns.add(two); + + String[] key1 = {"1", "{}"}; + String[] key2 = {"1", "{}"}; + StreamingKeySorter.compareOffsets(key1, key2, streamingColumns); + } +} diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java new file mode 100644 index 00000000..82926572 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; +import org.apache.flink.connectors.kudu.connector.convertor.builder.UserTableDataTypeBuilder; +import org.apache.flink.connectors.kudu.connector.convertor.parser.UserTableDataTypeParser; +import org.junit.Assert; +import org.junit.Test; + +public class UserTableDataTypeBuilderTest { + @Test + public void build() throws Exception { + UserTableDataTypeDetail detail = UserTableDataTypeParser.getInstance().parse(UserType.class); + + UserTableDataTypeBuilder builder = new UserTableDataTypeBuilder(detail); + + UserType userType = (UserType)detail.getUserTableDataTypeConstructor().newInstance(); + builder.build(userType, "id_col", new Long[]{123L}); + builder.build(userType, "name_col", new String[]{"hello_world"}); + builder.build(userType, "age_col", new Integer[]{100}); + + Assert.assertEquals(Long.valueOf(123L), userType.getId()); + Assert.assertEquals("hello_world", userType.getName()); + Assert.assertEquals(Integer.valueOf(100), userType.getAge()); + } +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java new file mode 100644 index 00000000..c1d14365 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.connectors.kudu.connector.configuration.ReflectionTypeDetail; +import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; +import org.apache.flink.connectors.kudu.connector.convertor.parser.UserTableDataTypeParser; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class UserTableDataTypeParserTest { + @Test + public void testParse() throws Exception { + UserTableDataTypeDetail detail = UserTableDataTypeParser.getInstance().parse(UserType.class); + + Assert.assertEquals(2, detail.getStreamingCols().size()); + Assert.assertEquals("name_col", detail.getStreamingCols().get(0).getColName()); + Assert.assertEquals("id_col", detail.getStreamingCols().get(1).getColName()); + + Map reflectionTypeDetailMap = detail.getReflectionTypeDetailByColNames(); + + Assert.assertEquals("id", reflectionTypeDetailMap.get("id_col").getField().getName()); + Assert.assertEquals("name", reflectionTypeDetailMap.get("name_col").getField().getName()); + Assert.assertEquals("age", reflectionTypeDetailMap.get("age_col").getField().getName()); + + Assert.assertEquals("setId", reflectionTypeDetailMap.get("id_col").getMethod().getName()); + Assert.assertEquals("setName", reflectionTypeDetailMap.get("name_col").getMethod().getName()); + Assert.assertEquals("setAge", reflectionTypeDetailMap.get("age_col").getMethod().getName()); + + Assert.assertEquals("org.apache.flink.connectors.kudu.connector.UserType", + 
detail.getUserTableDataTypeConstructor().getName()); + } +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java new file mode 100644 index 00000000..25dafece --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.connectors.kudu.connector; + + +import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.ColumnDetail; +import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.StreamingKey; + +public class UserType { + @StreamingKey(order = 2) + @ColumnDetail(name = "id_col") + private Long id; + + @StreamingKey(order = 1) + @ColumnDetail(name = "name_col") + private String name; + + @ColumnDetail(name = "age_col") + private Integer age; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} From 40f1b695262389191391b1a4c1f4cb21e9a4d172 Mon Sep 17 00:00:00 2001 From: wchen11 Date: Sun, 25 Jun 2023 11:05:06 +0800 Subject: [PATCH 2/4] [BAHIR-322] Add unit tests --- .../kudu/connector/KuduStreamingSource.java | 8 +- .../configuration/StreamingKeySorter.java | 5 +- .../connector/KuduStreamingSourceTest.java | 358 ++++++++++++++++-- .../StreamingKeyOffsetManagerTest.java | 19 +- .../connector/StreamingKeySorterTest.java | 10 +- .../UserTableDataTypeBuilderTest.java | 8 +- .../UserTableDataTypeParserTest.java | 22 +- .../connectors/kudu/connector/UserType.java | 34 ++ 8 files changed, 396 insertions(+), 68 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java index 706d51c4..8d4b9d35 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java @@ -99,10 +99,10 @@ private Object getTypeValue(Class clz, String 
value) { return Short.valueOf(value); } else if (clz == Byte.class) { return Byte.valueOf(value); - } else if (clz == Timestamp.class) { - return new Timestamp(Long.valueOf(value)); - } else { + } else if (clz == String.class){ return value; + } else { + throw new IllegalArgumentException("Unsupported type: " + clz + " for streaming key."); } } @@ -205,7 +205,7 @@ public void run(SourceContext sourceContext) { while (resultIterator.hasNext()) { T row = resultIterator.next(); if (row != null) { - /** For the running mode == KuduStreamingRunningMode.INCREMENTAL, we need to manage the offsets of the table. + /** For the running mode == KuduStreamingRunningMode#INCREMENTAL, we need to manage the offsets of the table. * The data will be in the local buffer and sorted before emitting. */ if (kuduStreamingSourceConfiguration.getRunningMode() == KuduStreamingRunningMode.INCREMENTAL) { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java index bbd65cef..6fe087c3 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/configuration/StreamingKeySorter.java @@ -27,7 +27,10 @@ public static int compareOffsets(String[] one, String[] two, List new CreateTableOptions() .setNumReplicas(3) - .addHashPartitions(Lists.newArrayList("name_col", "id_col"), 6)); + .addHashPartitions(Lists.newArrayList("name_col", "id_col"), 3)); KuduWriterConfig writerConfig = KuduWriterConfig.Builder .setMasters(masterAddresses) @@ -109,34 +118,317 @@ private void prepareData() throws Exception { Thread.sleep(1000); sink.close(); + } + + @Test + void testCustomQuery() throws Exception { + String masterAddresses = getMasterAddress(); + + prepareData(); + + 
UserTableDataQueryDetail detail = new UserTableDataQueryDetail(); + detail.setProjectedColumns(Arrays.asList("name_col", "id_col", "age_col")); - List rows = readRows(tableInfo); - for (Row row : rows) { - System.out.println(row.getField(0) + " " + row.getField(1) + " " + row.getField(2)); + UserTableDataQueryFilter nameFilter = UserTableDataQueryFilter.builder() + .colName("name_col") + .filterOp(FilterOp.GREATER_EQUAL) + .filterValueResolver(() -> "Jack").build(); + + UserTableDataQueryFilter idFilter = UserTableDataQueryFilter.builder() + .colName("id_col") + .filterOp(FilterOp.LESS_EQUAL) + .filterValueResolver(() -> 2000L).build(); + detail.setUserTableDataQueryFilters(Arrays.asList(nameFilter, idFilter)); + + KuduStreamingSourceConfiguration configuration = + KuduStreamingSourceConfiguration.builder() + .masterAddresses(masterAddresses) + .tableName("users") + .batchRunningInterval(1000L) + .runningMode(KuduStreamingRunningMode.CUSTOM_QUERY) + .targetKuduRowClz(UserType.class) + .userTableDataQueryDetailList(Arrays.asList(detail)) + .build(); + + KuduStreamingSource sourceFunction = new KuduStreamingSource<>(configuration); + + EchoSink echoSink = new EchoSink(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.addSource(sourceFunction).setParallelism(1) + .returns(TypeInformation.of(new TypeHint() {})) + .addSink(echoSink).setParallelism(1); + + + CompletableFuture completableFuture = CompletableFuture.runAsync(() -> { + try { + env.execute(); + } catch (Exception e) {} + }); + + try { + completableFuture.get(3, TimeUnit.SECONDS); + } catch (TimeoutException e) { + completableFuture.cancel(true); } - } - protected List readRows(KuduTableInfo tableInfo) throws Exception { - KuduReaderConfig readerConfig = KuduReaderConfig.Builder.setMasters(masterAddresses).build(); - KuduReader reader = new KuduReader<>(tableInfo, readerConfig, new RowResultRowConvertor()); - - KuduInputSplit[] splits = 
reader.createInputSplits(1); - List rows = new ArrayList<>(); - for (KuduInputSplit split : splits) { - KuduReaderIterator resultIterator = reader.scanner(split.getScanToken()); - while (resultIterator.hasNext()) { - Row row = resultIterator.next(); - if (row != null) { - rows.add(row); - } + int matched = 0; + for (UserType user : echoSink.getCollectedData()) { + if (user.equals(new UserType(1000L, "Tom", 10))) { + matched++; + } else if (user.equals(new UserType(2000L, "Mary", 20))) { + matched++; } } - reader.close(); - return rows; + Assertions.assertEquals(2, matched); } + @Test - void testCustomQuery() throws Exception { - prepareData(); + public void testIncremental() throws Exception { + String masterAddresses = getMasterAddress(); + String tableName = "streaming_key_types"; + String[] columns = {"long_val", "int_val", "short_val", "byte_val", "str_val"}; + KuduTableInfo tableInfo = KuduTableInfo + .forTable(tableName) + .createTableIfNotExists( + () -> + Lists.newArrayList( + new ColumnSchema + .ColumnSchemaBuilder("long_val", Type.INT64) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("int_val", Type.INT32) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("short_val", Type.INT16) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("byte_val", Type.INT8) + .key(true) + .build(), + new ColumnSchema + .ColumnSchemaBuilder("str_val", Type.STRING) + .key(true) + .build() + ), + () -> new CreateTableOptions() + .setNumReplicas(3) + .addHashPartitions( + Lists.newArrayList(columns), + 3)); + + KuduWriterConfig writerConfig = KuduWriterConfig.Builder + .setMasters(masterAddresses) + .setEventualConsistency() + .build(); + KuduSink sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); + + sink.setRuntimeContext(context); + sink.open(new Configuration()); + + Row kuduRow = new Row(5); + kuduRow.setField(0, 1L); + kuduRow.setField(1, 1); 
+ kuduRow.setField(2, (short)1); + kuduRow.setField(3, (byte)1); + kuduRow.setField(4, "a"); + + Row kuduRow2 = new Row(5); + kuduRow2.setField(0, 1L); + kuduRow2.setField(1, 2); + kuduRow2.setField(2, (short)1); + kuduRow2.setField(3, (byte)1); + kuduRow2.setField(4, "a"); + + Row kuduRow3 = new Row(5); + kuduRow3.setField(0, 1L); + kuduRow3.setField(1, 2); + kuduRow3.setField(2, (short)3); + kuduRow3.setField(3, (byte)1); + kuduRow3.setField(4, "a"); + + Row kuduRow4 = new Row(5); + kuduRow4.setField(0, 1L); + kuduRow4.setField(1, 2); + kuduRow4.setField(2, (short)3); + kuduRow4.setField(3, (byte)4); + kuduRow4.setField(4, "a"); + + Row kuduRow5 = new Row(5); + kuduRow5.setField(0, 1L); + kuduRow5.setField(1, 2); + kuduRow5.setField(2, (short)3); + kuduRow5.setField(3, (byte)4); + kuduRow5.setField(4, "b"); + + sink.invoke(kuduRow); + sink.invoke(kuduRow2); + sink.invoke(kuduRow3); + sink.invoke(kuduRow4); + sink.invoke(kuduRow5); + + Thread.sleep(1000); + + sink.close(); + + UserTableDataQueryDetail detail = new UserTableDataQueryDetail(); + detail.setProjectedColumns(Arrays.asList(columns)); + + KuduStreamingSourceConfiguration configuration = + KuduStreamingSourceConfiguration.builder() + .masterAddresses(masterAddresses) + .tableName(tableName) + .batchRunningInterval(1000L) + .runningMode(KuduStreamingRunningMode.INCREMENTAL) + .targetKuduRowClz(StreamingKeyType.class) + .userTableDataQueryDetailList(Arrays.asList(detail)) + .build(); + + KuduStreamingSource sourceFunction = new KuduStreamingSource<>(configuration); + + OrderedEchoSink orderedEchoSink = new OrderedEchoSink(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(3000); + env.addSource(sourceFunction).setParallelism(1) + .returns(TypeInformation.of(new TypeHint() {})) + .addSink(orderedEchoSink).setParallelism(1); + + + + CompletableFuture completableFuture = CompletableFuture.runAsync(() -> { + try { + env.execute(); + } catch 
(Exception e) {} + }); + + try { + completableFuture.get(10, TimeUnit.SECONDS); + } catch (TimeoutException e) { + completableFuture.cancel(true); + } + + List typeList = orderedEchoSink.getOrderedData(); + Assertions.assertEquals(new StreamingKeyType(1L, 1, (short)1, (byte)1, "a"), typeList.get(0)); + Assertions.assertEquals(new StreamingKeyType(1L, 2, (short)1, (byte)1, "a"), typeList.get(1)); + Assertions.assertEquals(new StreamingKeyType(1L, 2, (short)3, (byte)1, "a"), typeList.get(2)); + Assertions.assertEquals(new StreamingKeyType(1L, 2, (short)3, (byte)4, "a"), typeList.get(3)); + Assertions.assertEquals(new StreamingKeyType(1L, 2, (short)3, (byte)4, "b"), typeList.get(4)); + } + + public static class StreamingKeyType { + @StreamingKey(order = 1) + @ColumnDetail(name = "long_val") + private Long longVal; + + @StreamingKey(order = 2) + @ColumnDetail(name = "int_val") + private Integer intVal; + + @StreamingKey(order = 3) + @ColumnDetail(name = "short_val") + private Short shortVal; + + @StreamingKey(order = 4) + @ColumnDetail(name = "byte_val") + private Byte byteVal; + + @StreamingKey(order = 5) + @ColumnDetail(name = "str_val") + private String strVal; + + public StreamingKeyType() { + } + + public StreamingKeyType(Long longVal, Integer intVal, Short shortVal, Byte byteVal, String strVal) { + this.longVal = longVal; + this.intVal = intVal; + this.shortVal = shortVal; + this.byteVal = byteVal; + this.strVal = strVal; + } + + public Long getLongVal() { + return longVal; + } + + public void setLongVal(Long longVal) { + this.longVal = longVal; + } + + public Integer getIntVal() { + return intVal; + } + + public void setIntVal(Integer intVal) { + this.intVal = intVal; + } + + public Short getShortVal() { + return shortVal; + } + + public void setShortVal(Short shortVal) { + this.shortVal = shortVal; + } + + public Byte getByteVal() { + return byteVal; + } + + public void setByteVal(Byte byteVal) { + this.byteVal = byteVal; + } + + public String getStrVal() { + 
return strVal; + } + + public void setStrVal(String strVal) { + this.strVal = strVal; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamingKeyType that = (StreamingKeyType) o; + return Objects.equals(longVal, that.longVal) && Objects.equals(intVal, that.intVal) && Objects.equals(shortVal, that.shortVal) && Objects.equals(byteVal, that.byteVal) && Objects.equals(strVal, that.strVal); + } + + @Override + public int hashCode() { + return Objects.hash(longVal, intVal, shortVal, byteVal, strVal); + } + } + + private static class OrderedEchoSink extends RichSinkFunction { + + private static final List orderedData = Lists.newArrayList(); + + @Override + public void invoke(StreamingKeyType value, SinkFunction.Context ctx) throws Exception { + orderedData.add(value); + } + + + public List getOrderedData() { + return orderedData; + } + } + + private static class EchoSink extends RichSinkFunction { + private static final Set collectedData = Sets.newHashSet(); + + @Override + public void invoke(UserType value, SinkFunction.Context ctx) throws Exception { + collectedData.add(value); + } + + public Set getCollectedData() { + return collectedData; + } } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java index 4c645069..c37054fe 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeyOffsetManagerTest.java @@ -17,10 +17,11 @@ package org.apache.flink.connectors.kudu.connector; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; import 
org.apache.kudu.shaded.com.google.common.collect.Lists; -import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.Arrays; import java.util.Iterator; @@ -56,27 +57,25 @@ public void testUpdate() throws Exception { StreamingLocalEventsManager streamingLocalEventsManager = new StreamingLocalEventsManager<>(Arrays.asList(one, two), null, null); - Assert.assertEquals("0|" + Long.MIN_VALUE, streamingLocalEventsManager.getCurrentHWMStr()); + Assertions.assertEquals(StringUtils.EMPTY, streamingLocalEventsManager.getCurrentHWMStr()); streamingLocalEventsManager.update(row); - Assert.assertEquals("hello_world|1", streamingLocalEventsManager.getCurrentHWMStr()); + Assertions.assertEquals("hello_world|1", streamingLocalEventsManager.getCurrentHWMStr()); streamingLocalEventsManager.update(row2); - Assert.assertEquals("hello_world|2", streamingLocalEventsManager.getCurrentHWMStr()); + Assertions.assertEquals("hello_world|2", streamingLocalEventsManager.getCurrentHWMStr()); - Assert.assertEquals(2, streamingLocalEventsManager.getSortedLocalEvents().size()); + Assertions.assertEquals(2, streamingLocalEventsManager.getSortedLocalEvents().size()); Iterator itr = streamingLocalEventsManager.getSortedLocalEvents().iterator(); long id = 1L; while (itr.hasNext()) { - Assert.assertEquals(Long.valueOf(id++), itr.next().getId()); + Assertions.assertEquals(Long.valueOf(id++), itr.next().getId()); } streamingLocalEventsManager.update(row3); - Assert.assertEquals("hello_world2|1", streamingLocalEventsManager.getCurrentHWMStr()); + Assertions.assertEquals("hello_world2|1", streamingLocalEventsManager.getCurrentHWMStr()); String[] upperBoundKey = streamingLocalEventsManager.getUserConfiguredUpperKey(); - Assert.assertEquals(2, upperBoundKey.length); - Assert.assertEquals("z", upperBoundKey[0]); - Assert.assertEquals(String.valueOf(Long.MAX_VALUE), upperBoundKey[1]); + Assertions.assertNull(upperBoundKey); } } diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java index d4b13b17..f1e2735e 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/StreamingKeySorterTest.java @@ -19,8 +19,8 @@ import org.apache.flink.connectors.kudu.connector.configuration.StreamingColumn; import org.apache.flink.connectors.kudu.connector.configuration.StreamingKeySorter; import org.apache.kudu.shaded.com.google.common.collect.Lists; -import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.List; import java.util.Locale; @@ -38,19 +38,19 @@ public void testCompareOffsets() { String[] key1 = {"-1", "b"}; String[] key2 = {"1", "a"}; - Assert.assertTrue(StreamingKeySorter.compareOffsets(key1, key2, streamingColumns) < 0); + Assertions.assertTrue(StreamingKeySorter.compareOffsets(key1, key2, streamingColumns) < 0); String[] key3 = {"1", "b"}; String[] key4 = {"0", "a"}; - Assert.assertTrue(StreamingKeySorter.compareOffsets(key3, key4, streamingColumns) > 0); + Assertions.assertTrue(StreamingKeySorter.compareOffsets(key3, key4, streamingColumns) > 0); String[] key5 = {"1", "b"}; String[] key6 = {"1", "a"}; - Assert.assertTrue(StreamingKeySorter.compareOffsets(key5, key6, streamingColumns) > 0); + Assertions.assertTrue(StreamingKeySorter.compareOffsets(key5, key6, streamingColumns) > 0); String[] key7 = {"1", "a"}; String[] key8 = {"1", "a"}; - Assert.assertTrue(StreamingKeySorter.compareOffsets(key7, key8, streamingColumns) == 0); + Assertions.assertTrue(StreamingKeySorter.compareOffsets(key7, key8, streamingColumns) == 0); } @Test(expected = IllegalArgumentException.class) diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java index 82926572..888e2db5 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeBuilderTest.java @@ -19,8 +19,8 @@ import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; import org.apache.flink.connectors.kudu.connector.convertor.builder.UserTableDataTypeBuilder; import org.apache.flink.connectors.kudu.connector.convertor.parser.UserTableDataTypeParser; -import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; public class UserTableDataTypeBuilderTest { @Test @@ -34,8 +34,8 @@ public void build() throws Exception { builder.build(userType, "name_col", new String[]{"hello_world"}); builder.build(userType, "age_col", new Integer[]{100}); - Assert.assertEquals(Long.valueOf(123L), userType.getId()); - Assert.assertEquals("hello_world", userType.getName()); - Assert.assertEquals(Integer.valueOf(100), userType.getAge()); + Assertions.assertEquals(Long.valueOf(123L), userType.getId()); + Assertions.assertEquals("hello_world", userType.getName()); + Assertions.assertEquals(Integer.valueOf(100), userType.getAge()); } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java index c1d14365..de12b56a 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserTableDataTypeParserTest.java @@ 
-19,8 +19,8 @@ import org.apache.flink.connectors.kudu.connector.configuration.ReflectionTypeDetail; import org.apache.flink.connectors.kudu.connector.configuration.UserTableDataTypeDetail; import org.apache.flink.connectors.kudu.connector.convertor.parser.UserTableDataTypeParser; -import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import java.util.Map; @@ -29,21 +29,21 @@ public class UserTableDataTypeParserTest { public void testParse() throws Exception { UserTableDataTypeDetail detail = UserTableDataTypeParser.getInstance().parse(UserType.class); - Assert.assertEquals(2, detail.getStreamingCols().size()); - Assert.assertEquals("name_col", detail.getStreamingCols().get(0).getColName()); - Assert.assertEquals("id_col", detail.getStreamingCols().get(1).getColName()); + Assertions.assertEquals(2, detail.getStreamingCols().size()); + Assertions.assertEquals("name_col", detail.getStreamingCols().get(0).getColName()); + Assertions.assertEquals("id_col", detail.getStreamingCols().get(1).getColName()); Map reflectionTypeDetailMap = detail.getReflectionTypeDetailByColNames(); - Assert.assertEquals("id", reflectionTypeDetailMap.get("id_col").getField().getName()); - Assert.assertEquals("name", reflectionTypeDetailMap.get("name_col").getField().getName()); - Assert.assertEquals("age", reflectionTypeDetailMap.get("age_col").getField().getName()); + Assertions.assertEquals("id", reflectionTypeDetailMap.get("id_col").getField().getName()); + Assertions.assertEquals("name", reflectionTypeDetailMap.get("name_col").getField().getName()); + Assertions.assertEquals("age", reflectionTypeDetailMap.get("age_col").getField().getName()); - Assert.assertEquals("setId", reflectionTypeDetailMap.get("id_col").getMethod().getName()); - Assert.assertEquals("setName", reflectionTypeDetailMap.get("name_col").getMethod().getName()); - Assert.assertEquals("setAge", reflectionTypeDetailMap.get("age_col").getMethod().getName()); + 
Assertions.assertEquals("setId", reflectionTypeDetailMap.get("id_col").getMethod().getName()); + Assertions.assertEquals("setName", reflectionTypeDetailMap.get("name_col").getMethod().getName()); + Assertions.assertEquals("setAge", reflectionTypeDetailMap.get("age_col").getMethod().getName()); - Assert.assertEquals("org.apache.flink.connectors.kudu.connector.UserType", + Assertions.assertEquals("org.apache.flink.connectors.kudu.connector.UserType", detail.getUserTableDataTypeConstructor().getName()); } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java index 25dafece..2bcbd0ef 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/UserType.java @@ -20,6 +20,8 @@ import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.ColumnDetail; import org.apache.flink.connectors.kudu.connector.configuration.type.annotation.StreamingKey; +import java.util.Objects; + public class UserType { @StreamingKey(order = 2) @ColumnDetail(name = "id_col") @@ -32,6 +34,15 @@ public class UserType { @ColumnDetail(name = "age_col") private Integer age; + public UserType() { + } + + public UserType(Long id, String name, Integer age) { + this.id = id; + this.name = name; + this.age = age; + } + public Long getId() { return id; } @@ -55,4 +66,27 @@ public Integer getAge() { public void setAge(Integer age) { this.age = age; } + + @Override + public String toString() { + return "[name=" + name + ", id=" + id + ", age=" + age + "]"; + } + + @Override + public int hashCode() { + return Objects.hash(name, id, age); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof UserType)) { + return false; + } else { + UserType other = (UserType) obj; + 
return ((name == null && other.name == null) || (name != null && name.equals(other.name))) && + ((id == null && other.id == null) || (id != null && id.equals(other.id))) && + ((age == null && other.age == null) || (age != null && age.equals(other.age))); + } + } } From d87ae63d26c85a294622a904e4529bf9d63d3213 Mon Sep 17 00:00:00 2001 From: wchen11 Date: Sun, 25 Jun 2023 11:27:38 +0800 Subject: [PATCH 3/4] [BAHIR-322] Refactor the codes --- .../connectors/kudu/connector/KuduStreamingSource.java | 1 - .../kudu/connector/KuduStreamingSourceTest.java | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java index 8d4b9d35..0e904e75 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSource.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Timestamp; import java.util.Arrays; import java.util.Iterator; import java.util.List; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java index c04ae24a..4151a7de 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduStreamingSourceTest.java @@ -162,7 +162,9 @@ void testCustomQuery() throws Exception { CompletableFuture completableFuture = CompletableFuture.runAsync(() -> { try { env.execute(); - } catch (Exception e) {} + } catch (Exception e) { + Assertions.fail("Executing job 
failed."); + } }); try { @@ -301,7 +303,9 @@ public void testIncremental() throws Exception { CompletableFuture completableFuture = CompletableFuture.runAsync(() -> { try { env.execute(); - } catch (Exception e) {} + } catch (Exception e) { + Assertions.fail("Executing job failed"); + } }); try { From 8068bd548f4950fa389da75fb8b96aa9c8f8f490 Mon Sep 17 00:00:00 2001 From: wchen11 Date: Tue, 4 Jul 2023 12:14:40 +0800 Subject: [PATCH 4/4] add new workflow --- .github/workflows/maven-ci-local.yml | 63 ++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 .github/workflows/maven-ci-local.yml diff --git a/.github/workflows/maven-ci-local.yml b/.github/workflows/maven-ci-local.yml new file mode 100644 index 00000000..4b1039af --- /dev/null +++ b/.github/workflows/maven-ci-local.yml @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name: Local Build + +on: + workflow_dispatch: + inputs: + logLevel: + description: 'Log level' + required: true + default: 'debug' + type: choice + options: + - info + - warning + - debug + tags: + description: 'Local Build' + required: false + type: boolean + environment: + description: 'Environment to run local build against ' + type: environment + required: true + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + java: ['8', '11'] + flink-version: ['1.17.0'] + connector: [ + 'flink-connector-pinot' + ] + + steps: + - uses: actions/checkout@v3 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'zulu' + cache: maven + - name: Build with flink ${{ matrix.flink-version }} + run: mvn -B clean verify -pl ${{ matrix.connector }} -am -Dscala-2.12 -Dflink.version=${{ matrix.flink-version }}