Skip to content

Commit

Permalink
[hotfix] Remove redundant and unused test code (apache#3066)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored and zhu3pang committed Mar 29, 2024
1 parent e38916e commit df25ef2
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,6 @@ public void before() throws IOException {
prepareEnv();
}

protected Table getPaimonTable(String tableName) {
FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
try {
return flinkCatalog
.catalog()
.getTable(new Identifier(tEnv.getCurrentDatabase(), tableName));
} catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
}

protected Map<String, String> catalogOptions() {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testQueryServiceLookup() throws Exception {
sql(
"CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')");
CloseableIterator<Row> service = streamSqlIter("CALL sys.query_service('default.DIM', 2)");
RemoteTableQuery query = new RemoteTableQuery(getPaimonTable("DIM"));
RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIM"));

sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)");
Thread.sleep(2000);
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testLookupRemoteTable() throws Throwable {
}

private ServiceProxy launchQueryServer(String tableName) throws Throwable {
FileStoreTable table = (FileStoreTable) getPaimonTable(tableName);
FileStoreTable table = (FileStoreTable) paimonTable(tableName);
LocalTableQuery query = table.newLocalTableQuery().withIOManager(IOManager.create(path));
KvQueryServer server =
new KvQueryServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,17 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand Down Expand Up @@ -139,61 +134,4 @@ private FileStoreTable createFileStoreTable() throws Exception {
options,
new CatalogEnvironment(Lock.emptyFactory(), null, null));
}

private OneInputStreamOperatorTestHarness<InternalRow, Committable> createTestHarness(
OneInputStreamOperator<InternalRow, Committable> operator) throws Exception {
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new ExecutionConfig());
OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
new OneInputStreamOperatorTestHarness<>(operator);
harness.setup(serializer);
return harness;
}

private OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable>
createDynamicBucketTestHarness(
OneInputStreamOperator<Tuple2<InternalRow, Integer>, Committable> operator)
throws Exception {
TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new ExecutionConfig());
OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, Committable> harness =
new OneInputStreamOperatorTestHarness<>(operator);
harness.setup(serializer);
return harness;
}

protected RowDataStoreWriteOperator createWriteOperator(FileStoreTable table) {
return new RowDataStoreWriteOperator(
table,
null,
(t, commitUser, state, ioManager, memoryPool, metricGroup) ->
new StoreSinkWriteImpl(
t,
commitUser,
state,
ioManager,
false,
false,
true,
memoryPool,
metricGroup),
"test");
}

protected DynamicBucketRowWriteOperator createDynamicBucketWriteOperator(FileStoreTable table) {
return new DynamicBucketRowWriteOperator(
table,
(t, commitUser, state, ioManager, memoryPool, metricGroup) ->
new StoreSinkWriteImpl(
t,
commitUser,
state,
ioManager,
false,
false,
true,
memoryPool,
metricGroup),
"test");
}
}

0 comments on commit df25ef2

Please sign in to comment.