Skip to content

Commit

Permalink
Intercept VtDriver splitTable + setting sequence for each split table. (
Browse files Browse the repository at this point in the history
  • Loading branch information
wangweicugw authored Sep 9, 2023
1 parent c9e5d17 commit 0d35e65
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 12 deletions.
23 changes: 15 additions & 8 deletions src/main/java/com/jd/jdbc/planbuilder/InsertPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@
import com.jd.jdbc.sqltypes.VtPlanValue;
import com.jd.jdbc.tindexes.LogicTable;
import com.jd.jdbc.vindexes.VKeyspace;
import static com.jd.jdbc.vindexes.Vschema.TYPE_PINNED_TABLE;
import io.netty.util.internal.StringUtil;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -58,8 +60,6 @@
import lombok.Getter;
import vschema.Vschema;

import static com.jd.jdbc.vindexes.Vschema.TYPE_PINNED_TABLE;

public class InsertPlan {
public static PrimitiveEngine newBuildInsertPlan(MySqlInsertReplaceStatement stmt, VSchemaManager vm, String defaultKeyspace) throws SQLException {
PrimitiveBuilder pb = new PrimitiveBuilder(vm, defaultKeyspace, Jointab.newJointab(SqlParser.getBindVars(stmt)));
Expand Down Expand Up @@ -333,7 +333,7 @@ private static PrimitiveEngine buildPartitionTableInsetPlan(MySqlInsertReplaceSt
List<SQLExpr> duplicateKeyUpdateExprList = stmt.getDuplicateKeyUpdate();
if (duplicateKeyUpdateExprList != null && !duplicateKeyUpdateExprList.isEmpty()) {
if (isTindexChangine(duplicateKeyUpdateExprList, insertEngine.getTable())) {
throw new SQLException("unsupported: DML cannot change tindex column");
throw new SQLFeatureNotSupportedException("unsupported: DML cannot change tindex column");
}
insertEngine.setInsertOpcode(Engine.InsertOpcode.InsertShardedIgnore);
}
Expand All @@ -343,10 +343,10 @@ private static PrimitiveEngine buildPartitionTableInsetPlan(MySqlInsertReplaceSt

List<SQLInsertStatement.ValuesClause> valuesList = stmt.getValuesList();
if (stmt.getSelectQuery().getSubQuery() != null) {
throw new SQLException("unsupported: insert into select");
throw new SQLFeatureNotSupportedException("unsupported: insert into select");
} else if (valuesList != null && !valuesList.isEmpty()) {
if (valuesHasQuery(valuesList)) {
throw new SQLException("unsupported: subquery in insert values");
throw new SQLFeatureNotSupportedException("unsupported: subquery in insert values");
}
} else {
throw new SQLException("BUG: unexpected construct in insert: " + valuesList);
Expand Down Expand Up @@ -392,9 +392,16 @@ private static PrimitiveEngine buildPartitionTableInsetPlan(MySqlInsertReplaceSt
insertEngine.setInsertReplaceStmt(stmt);

MySqlInsertReplaceStatement stmtClone = (MySqlInsertReplaceStatement) stmt.clone();
SwitchTableVisitor visitor = new SwitchTableVisitor(new HashMap<String, String>() {{
put(ltb.getLogicTable(), ltb.getFirstActualTableName());
}});
Map<String, String> map = new HashMap<>();
map.put(ltb.getLogicTable(), ltb.getFirstActualTableName());
SwitchTableVisitor visitor = new SwitchTableVisitor(map);

// check use seq for each table on split table
Vschema.Table table = vm.getTable(defaultKeyspace, ltb.getFirstActualTableName());
if (table != null && table.hasAutoIncrement()) {
throw new SQLFeatureNotSupportedException("Unsupported to use seq for each table on split table");
}

stmtClone.accept(visitor);
InsertEngine innerInsertEigine = (InsertEngine) newBuildInsertPlan(stmtClone, vm, defaultKeyspace);

Expand Down
103 changes: 99 additions & 4 deletions src/test/java/com/jd/jdbc/table/engine/TableSequenceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@

import com.jd.jdbc.table.TableTestUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -56,8 +62,7 @@ public void close() throws SQLException {
if (stmt2 != null) {
stmt2.close();
}
closeConnection(conn);
closeConnection(conn2);
closeConnection(conn, conn2);
TableTestUtil.setDefaultTableConfig();
}

Expand All @@ -75,6 +80,88 @@ public void testSequenceAsTindex() throws SQLException {
testSequence(stmt2);
}

@Ignore
@Test
public void testEachTableSequence() {
List<TestCase> testCaseList = new ArrayList<>();
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (null, '11')", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (null, ?)", 22));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (101, '11')", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (101, ?)", 22));
testCaseList.add(new TestCase("insert into table_seq_test (f_key, f_tinyint) values ('22' , 1)", null));
testCaseList.add(new TestCase("insert into table_seq_test (f_key, f_tinyint) values ('22' , ?)", 55));

testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (null, '11')", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (null, ?)", 22));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (101, '11')", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (101, ?)", 22));
testCaseList.add(new TestCase("insert into table_split_seq_test (f_key, f_tinyint) values ('22' , 1)", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (f_key, f_tinyint) values ('22' , ?)", 55));

TableTestUtil.setSplitTableConfig("engine/tableengine/split-table-seq.yml");
for (TestCase testCase : testCaseList) {
try {
if (testCase.var != null) {
PreparedStatement prepareStatement = conn2.prepareStatement(testCase.sql);
prepareStatement.setInt(1, testCase.var);
prepareStatement.execute();
} else {
Statement statement = conn2.createStatement();
statement.execute(testCase.sql);
}
} catch (SQLException e) {
if (e instanceof SQLFeatureNotSupportedException && e.getMessage().equals("Unsupported to use seq for each table on split table")) {
printOk(testCase.toString());
} else {
Assert.fail();
}
}
}
}

@Test
public void testEachTableSequenceAsTindex() {
List<TestCase> testCaseList = new ArrayList<>();
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (null, '11')", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (null, ?)", 22));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (101, '11')", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key) values (101, ?)", 22));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key, f_tinyint) values (null, '22', 1)", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key, f_tinyint) values (null, '22', ?)", 55));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key, f_tinyint) values (101, '22', 1)", null));
testCaseList.add(new TestCase("insert into table_seq_test (id, f_key, f_tinyint) values (101, '22', ?)", 55));

testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (null, '11')", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (null, ?)", 22));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (101, '11')", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key) values (101, ?)", 22));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key, f_tinyint) values (null, '22', 1)", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key, f_tinyint) values (null, '22', ?)", 55));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key, f_tinyint) values (101, '22', 1)", null));
testCaseList.add(new TestCase("insert into table_split_seq_test (id, f_key, f_tinyint) values (101, '22', ?)", 55));

TableTestUtil.setSplitTableConfig("engine/tableengine/split-table-seq-tindex.yml");
for (TestCase testCase : testCaseList) {
try {
if (testCase.var != null) {
PreparedStatement prepareStatement = conn2.prepareStatement(testCase.sql);
prepareStatement.setInt(1, testCase.var);
prepareStatement.execute();
} else {
Statement statement = conn2.createStatement();
statement.execute(testCase.sql);
}
} catch (SQLException e) {
if (e instanceof SQLFeatureNotSupportedException && e.getMessage().equals("Unsupported to use seq for each table on split table")) {
printOk(testCase.toString());
} else {
e.printStackTrace();
Assert.fail();
}
}
}
}

private void testSequence(Statement stmt) throws SQLException {
int num = 120;
stmt.executeUpdate("delete from table_engine_test");
Expand All @@ -94,19 +181,27 @@ private void testSequence(Statement stmt) throws SQLException {
checkByCount(stmt, num, "table_engine_test");
}

public void checkByCountDistinct(Statement stmt, final int expected, String tableName, String column) throws SQLException {
private void checkByCountDistinct(Statement stmt, final int expected, String tableName, String column) throws SQLException {
ResultSet resultSet = stmt.executeQuery("select count(distinct(" + column + ")) from " + tableName);
Assert.assertTrue(resultSet.next());
int actual = resultSet.getInt(1);
Assert.assertEquals(printFail("[FAIL]"), expected, actual);
Assert.assertFalse(resultSet.next());
}

public void checkByCount(Statement stmt, final int expected, String tableName) throws SQLException {
private void checkByCount(Statement stmt, final int expected, String tableName) throws SQLException {
ResultSet resultSet = stmt.executeQuery("select count(*) from " + tableName);
Assert.assertTrue(resultSet.next());
int actual = resultSet.getInt(1);
Assert.assertEquals(printFail("[FAIL]"), expected, actual);
Assert.assertFalse(resultSet.next());
}

@AllArgsConstructor
@ToString
class TestCase {
private String sql;

private Integer var;
}
}
22 changes: 22 additions & 0 deletions src/test/resources/engine/tableengine/split-table-seq-tindex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ schemas:
shardingColumnName: id,
shardingColumnType: INT64,
sequenceColumnName: id }
- { actualTableExprs: 'table_seq_test_${1..2}',
logicTable: table_seq_test,
shardingAlgorithms: ShardTableByMurmur,
shardingColumnName: id,
shardingColumnType: INT64 }
- { actualTableExprs: 'table_split_seq_test_${1..2}',
logicTable: table_split_seq_test,
shardingAlgorithms: ShardTableByMurmur,
shardingColumnName: id,
shardingColumnType: INT64,
sequenceColumnName: id }

- schema: customer
logicTables:
Expand All @@ -16,4 +27,15 @@ schemas:
shardingAlgorithms: ShardTableByMurmur,
shardingColumnName: id,
shardingColumnType: INT64,
sequenceColumnName: id }
- { actualTableExprs: 'table_seq_test_${1..2}',
logicTable: table_seq_test,
shardingAlgorithms: ShardTableByMurmur,
shardingColumnName: id,
shardingColumnType: INT64 }
- { actualTableExprs: 'table_split_seq_test_${1..2}',
logicTable: table_split_seq_test,
shardingAlgorithms: ShardTableByMurmur,
shardingColumnName: id,
shardingColumnType: INT64,
sequenceColumnName: id }
22 changes: 22 additions & 0 deletions src/test/resources/engine/tableengine/split-table-seq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ schemas:
shardingColumnName: f_key,
shardingColumnType: INT32,
sequenceColumnName: id }
- { actualTableExprs: 'table_seq_test_${1..2}',
logicTable: table_seq_test,
shardingAlgorithms: TableRuleMod,
shardingColumnName: f_key,
shardingColumnType: INT32 }
- { actualTableExprs: 'table_split_seq_test_${1..2}',
logicTable: table_split_seq_test,
shardingAlgorithms: TableRuleMod,
shardingColumnName: f_key,
shardingColumnType: INT32,
sequenceColumnName: id }

- schema: customer
logicTables:
Expand All @@ -16,4 +27,15 @@ schemas:
shardingAlgorithms: TableRuleMod,
shardingColumnName: f_key,
shardingColumnType: INT32,
sequenceColumnName: id }
- { actualTableExprs: 'table_seq_test_${1..2}',
logicTable: table_seq_test,
shardingAlgorithms: TableRuleMod,
shardingColumnName: f_key,
shardingColumnType: INT32 }
- { actualTableExprs: 'table_split_seq_test_${1..2}',
logicTable: table_split_seq_test,
shardingAlgorithms: TableRuleMod,
shardingColumnName: f_key,
shardingColumnType: INT32,
sequenceColumnName: id }

0 comments on commit 0d35e65

Please sign in to comment.