Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the error of the globally unique ID returned by the split table. #113

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/main/java/com/jd/jdbc/engine/table/TableInsertEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ public class TableInsertEngine implements PrimitiveEngine, TableShardQuery {
*/
private Boolean multiShardAutocommit;

private long insertId;

private InsertEngine insertEngine;

public TableInsertEngine(final Engine.InsertOpcode insertOpcode, final VKeyspace keyspace, final LogicTable table) {
Expand Down Expand Up @@ -138,14 +136,17 @@ public IExecute.ExecuteMultiShardResponse execute(final IContext ctx, final Vcur
if (RoleUtils.notMaster(ctx)) {
throw new SQLException("insert is not allowed for read only connection");
}
long insertId;
PrimitiveEngine primitiveEngine;
switch (this.insertOpcode) {
case InsertByDestination:
case InsertUnsharded:
insertId = Generate.processGenerate(vcursor, generate, bindVariableMap);
primitiveEngine = getInsertUnshardedEngine(ctx, vcursor, bindVariableMap);
break;
case InsertSharded:
case InsertShardedIgnore:
insertId = Generate.processGenerate(vcursor, generate, bindVariableMap);
primitiveEngine = getInsertShardedEngine(ctx, vcursor, bindVariableMap);
break;
default:
Expand All @@ -163,7 +164,6 @@ public IExecute.ExecuteMultiShardResponse execute(final IContext ctx, final Vcur
}

private PrimitiveEngine getInsertUnshardedEngine(final IContext ctx, final Vcursor vcursor, final Map<String, BindVariable> bindVariableMap) throws SQLException {
insertId = Generate.processGenerate(vcursor, generate, bindVariableMap);
List<ActualTable> actualTables = new ArrayList<>();
List<List<Query.Value>> indexesPerTable = new ArrayList<>();
buildActualTables(bindVariableMap, actualTables, indexesPerTable);
Expand All @@ -173,7 +173,6 @@ private PrimitiveEngine getInsertUnshardedEngine(final IContext ctx, final Vcurs
}

private PrimitiveEngine getInsertShardedEngine(final IContext ctx, final Vcursor vcursor, final Map<String, BindVariable> bindVariableMap) throws SQLException {
insertId = Generate.processGenerate(vcursor, generate, bindVariableMap);
List<ActualTable> actualTables = new ArrayList<>();
List<List<Query.Value>> indexesPerTable = new ArrayList<>();
buildActualTables(bindVariableMap, actualTables, indexesPerTable);
Expand Down
91 changes: 91 additions & 0 deletions src/test/java/com/jd/jdbc/table/engine/InsertTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.jd.jdbc.table.engine;

import com.jd.jdbc.table.TableTestUtil;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -30,6 +32,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -43,6 +47,7 @@
import testsuite.internal.testcase.TestSuiteCase;

public class InsertTest extends TestSuite {

protected Connection conn;

protected List<InsertTest.TestCase> testCaseList;
Expand All @@ -56,6 +61,14 @@ protected String getUrl() {
return getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)) + "&useAffectedRows=false";
}

protected String getUser() {
return getUser(Driver.of(TestSuiteShardSpec.TWO_SHARDS));
}

protected String getPassword() {
return getPassword(Driver.of(TestSuiteShardSpec.TWO_SHARDS));
}

private void initCase() throws IOException, SQLException {
testCaseList = initCase("src/test/resources/engine/tableengine/insert_case.json", TestCase.class, conn.getCatalog());
testCaseList.addAll(initCase("src/test/resources/engine/tableengine/insert_case_upperCase.json", TestCase.class, conn.getCatalog()));
Expand Down Expand Up @@ -102,6 +115,84 @@ public void testSameKeySequence() throws Exception {
insert();
}

@Test
@Ignore
public void testConcurrentSequence() throws Exception {
TableTestUtil.setSplitTableConfig("engine/tableengine/split-table-seq.yml");
ExecutorService executorService = getThreadPool(10, 10);

try (Statement stmt = conn.createStatement()) {
stmt.execute("delete from table_engine_test");
}

HikariConfig config = new HikariConfig();
config.setDriverClassName("com.jd.jdbc.vitess.VitessDriver");
config.setJdbcUrl(getUrl());
config.setMinimumIdle(10);
config.setMaximumPoolSize(10);
config.setUsername(getUser());
config.setPassword(getPassword());

HikariDataSource hikariDataSource = new HikariDataSource(config);

AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < 20; i++) {
int finalI = i;
executorService.execute(() -> {
try (Connection connection = hikariDataSource.getConnection()) {
// insert split table sequence return generatedKey
PreparedStatement prepareStatement = connection.prepareStatement("insert into table_engine_test(f_key) values (?)", Statement.RETURN_GENERATED_KEYS);
prepareStatement.setInt(1, finalI);
Assert.assertFalse(prepareStatement.execute());
Assert.assertEquals(1, prepareStatement.getUpdateCount());

// check getGeneratedKeys
ResultSet generatedKeys = prepareStatement.getGeneratedKeys();
Assert.assertTrue(generatedKeys.next());
long id = generatedKeys.getLong(1);
Assert.assertFalse(generatedKeys.next());

// check last_insert_id
final ResultSet lastInsertIdResultSet = connection.createStatement().executeQuery("select last_insert_id()");
Assert.assertTrue(lastInsertIdResultSet.next());
Assert.assertEquals(id, lastInsertIdResultSet.getLong(1));
Assert.assertFalse(lastInsertIdResultSet.next());

// check last_insert_id
final ResultSet identityResultSet = connection.createStatement().executeQuery("select @@identity");
Assert.assertTrue(identityResultSet.next());
Assert.assertEquals(id, identityResultSet.getLong(1));
Assert.assertFalse(identityResultSet.next());

// select by id
PreparedStatement selectPrepareStatement = connection.prepareStatement("select f_key,id from table_engine_test where f_key = ? and id = ?");
selectPrepareStatement.setInt(1, finalI);
selectPrepareStatement.setLong(2, id);
Assert.assertTrue(selectPrepareStatement.execute());
ResultSet resultSet = selectPrepareStatement.getResultSet();
Assert.assertTrue(resultSet.next());
Assert.assertEquals(finalI, resultSet.getInt(1));
Assert.assertEquals(id, resultSet.getLong(2));
} catch (Exception e) {
e.printStackTrace();
atomicBoolean.set(false);
}
}
);
}
final long start = System.currentTimeMillis();
while (true) {
if (!atomicBoolean.get()) {
Assert.fail();
break;
}
if (System.currentTimeMillis() - start > 15 * 1000) {
break;
}
}
executorService.shutdownNow();
}

protected void insert() throws SQLException, NoSuchFieldException, IllegalAccessException {
insert(false, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ protected String getUrl() {
return getConnectionUrl(Driver.of(TestSuiteShardSpec.NO_SHARDS)) + "&useAffectedRows=false";
}

@Override
protected String getUser() {
return getUser(Driver.of(TestSuiteShardSpec.NO_SHARDS));
}

@Override
protected String getPassword() {
return getPassword(Driver.of(TestSuiteShardSpec.NO_SHARDS));
}

@Override
protected void insert() throws SQLException, NoSuchFieldException, IllegalAccessException {
insert(false, false);
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/testsuite/TestSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static void closeConnection(Connection conn) {

protected static ExecutorService getThreadPool(int num, int max) {
ExecutorService pool = new ThreadPoolExecutor(num, max,
0L, TimeUnit.MILLISECONDS,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Loading