diff --git a/src/main/java/com/jd/jdbc/Executor.java b/src/main/java/com/jd/jdbc/Executor.java index ed52caa..7e670c2 100644 --- a/src/main/java/com/jd/jdbc/Executor.java +++ b/src/main/java/com/jd/jdbc/Executor.java @@ -33,6 +33,7 @@ import com.jd.jdbc.queryservice.StreamIterator; import com.jd.jdbc.queryservice.util.RoleUtils; import com.jd.jdbc.session.SafeSession; +import com.jd.jdbc.session.VitessSession; import com.jd.jdbc.sqlparser.Comment; import com.jd.jdbc.sqlparser.SQLUtils; import com.jd.jdbc.sqlparser.SqlParser; @@ -66,6 +67,7 @@ import io.vitess.proto.Query; import io.vitess.proto.Topodata; import io.vitess.proto.Vtgate; +import java.math.BigInteger; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; @@ -404,9 +406,11 @@ private void saveSessionStats(SafeSession safeSession, VtSqlStatementType stmtTy if (e != null) { return; } + + VitessSession sessionNew = safeSession.getVitessConnection().getSessionNew(); Vtgate.Session.Builder sessionBuilder = safeSession.getVitessConnection().getSession().toBuilder().setFoundRows(resultSet.getRowsAffected()); - if (resultSet.getInsertID() > 0) { - sessionBuilder.setLastInsertId(resultSet.getInsertID()); + if (resultSet.getInsertID().compareTo(BigInteger.ZERO) > 0) { + sessionNew.setLastInsertId(resultSet.getInsertID()); } switch (stmtType) { case StmtInsert: @@ -425,6 +429,7 @@ private void saveSessionStats(SafeSession safeSession, VtSqlStatementType stmtTy default: break; } + safeSession.getVitessConnection().setSessionNew(sessionNew); safeSession.getVitessConnection().setSession(sessionBuilder.build()); } diff --git a/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java b/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java index 18c9bbf..e56e621 100644 --- a/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java +++ b/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java @@ -43,6 +43,7 @@ import io.prometheus.client.Histogram; import io.vitess.proto.Query; import io.vitess.proto.Topodata; +import java.math.BigInteger; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -490,7 +491,7 @@ private VtResultSet toVtResultSet(final boolean isQuery, final Statement stateme ret.setRowsAffected(statement.getUpdateCount()); ResultSet resultSet = statement.getGeneratedKeys(); if (resultSet.next()) { - ret.setInsertID(resultSet.getLong(1)); + ret.setInsertID(resultSet.getObject(1, BigInteger.class)); } } return ret; diff --git a/src/main/java/com/jd/jdbc/session/SafeSession.java b/src/main/java/com/jd/jdbc/session/SafeSession.java index 1a4bf36..77b8651 100644 --- a/src/main/java/com/jd/jdbc/session/SafeSession.java +++ b/src/main/java/com/jd/jdbc/session/SafeSession.java @@ -64,6 +64,7 @@ public static SafeSession newSafeSession(VitessConnection vitessConnection, Vtga } SafeSession safeSession = new SafeSession(); safeSession.setVitessConnection(new VitessConnection(vitessConnection.getResolver(), sessn)); + safeSession.vitessConnection.setSessionNew(vitessConnection.getSessionNew()); return safeSession; } @@ -85,6 +86,7 @@ public static SafeSession newAutoCommitSession(VitessConnection vitessConnection public void resetTx() { this.lock.lock(); try { + VitessSession currentSessionNew = this.vitessConnection.getSessionNew(); Vtgate.Session.Builder resetTxSessionBuilder = this.vitessConnection.getSession().toBuilder(); this.mustRollback = false; this.autocommitState = AutocommitState.NOT_AUTO_COMMITTABLE; @@ -97,6 +99,7 @@ public void resetTx() { resetTxSessionBuilder.clearPostSessions(); } this.vitessConnection.setSession(resetTxSessionBuilder.build()); + this.vitessConnection.setSessionNew(currentSessionNew); } finally { this.lock.unlock(); } @@ -108,6 +111,7 @@ public void resetTx() { public void reset() { this.lock.lock(); try { + VitessSession currentSessionNew = this.vitessConnection.getSessionNew(); Vtgate.Session.Builder resetSessionBuilder = this.vitessConnection.getSession().toBuilder(); this.mustRollback = false; this.autocommitState = AutocommitState.NOT_AUTO_COMMITTABLE; @@ -118,6 +122,7 @@ public void reset() { resetSessionBuilder.clearPreSessions(); resetSessionBuilder.clearPostSessions(); this.vitessConnection.setSession(resetSessionBuilder.build()); + this.vitessConnection.setSessionNew(currentSessionNew); } finally { this.lock.unlock(); } @@ -267,6 +272,7 @@ public void appendOrUpdate(Vtgate.Session.ShardSession shardSession, Vtgate.Tran case NORMAL: shardSessionList = buildSession(shardSession, this.vitessConnection.getSession().getShardSessionsList()); this.vitessConnection.setSession(this.vitessConnection.getSession().toBuilder().clearShardSessions().addAllShardSessions(shardSessionList).build()); + this.vitessConnection.setSessionNew(this.vitessConnection.getSessionNew()); // isSingle is enforced only for normmal commit order operations. if (this.isSingleDb(txMode) && this.vitessConnection.getSession().getShardSessionsCount() > 1) { @@ -277,10 +283,12 @@ public void appendOrUpdate(Vtgate.Session.ShardSession shardSession, Vtgate.Tran case PRE: shardSessionList = buildSession(shardSession, this.vitessConnection.getSession().getPreSessionsList()); this.vitessConnection.setSession(this.vitessConnection.getSession().toBuilder().clearPreSessions().addAllPreSessions(shardSessionList).build()); + this.vitessConnection.setSessionNew(this.vitessConnection.getSessionNew()); break; case POST: shardSessionList = buildSession(shardSession, this.vitessConnection.getSession().getPostSessionsList()); this.vitessConnection.setSession(this.vitessConnection.getSession().toBuilder().clearPostSessions().addAllPostSessions(shardSessionList).build()); + this.vitessConnection.setSessionNew(this.vitessConnection.getSessionNew()); break; default: throw new SQLException("BUG: SafeSession.AppendOrUpdate: unexpected commitOrder"); diff --git a/src/main/java/com/jd/jdbc/session/VitessSession.java b/src/main/java/com/jd/jdbc/session/VitessSession.java new file mode 100644 index 0000000..49cabc5 --- /dev/null +++ b/src/main/java/com/jd/jdbc/session/VitessSession.java @@ -0,0 +1,41 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.session; + +import java.math.BigInteger; +import lombok.Getter; + +public class VitessSession { + + @Getter + private BigInteger lastInsertId; + + public VitessSession() { + this.lastInsertId = BigInteger.ZERO; + } + + public void setLastInsertId(long setId) { + this.lastInsertId = BigInteger.valueOf(setId); + } + + public void setLastInsertId(BigInteger setId) { + this.lastInsertId = setId; + } + +} diff --git a/src/main/java/com/jd/jdbc/sqltypes/VtResultSet.java b/src/main/java/com/jd/jdbc/sqltypes/VtResultSet.java index 06f7817..c2e735e 100644 --- a/src/main/java/com/jd/jdbc/sqltypes/VtResultSet.java +++ b/src/main/java/com/jd/jdbc/sqltypes/VtResultSet.java @@ -20,6 +20,7 @@ import com.jd.jdbc.vitess.VitessStatement; import io.vitess.proto.Query; +import java.math.BigInteger; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -34,7 +35,7 @@ public class VtResultSet implements VtRowList { private long rowsAffected; - private long insertID; + private BigInteger insertID; private List> rows; @@ -48,9 +49,10 @@ public class VtResultSet implements VtRowList { public VtResultSet() { this.rows = new ArrayList<>(); + this.insertID = BigInteger.valueOf(0); } - public VtResultSet(long insertID, long rowsAffected) { + public VtResultSet(BigInteger insertID, long rowsAffected) { this.insertID = insertID; this.rowsAffected = rowsAffected; } @@ -58,11 +60,13 @@ public VtResultSet(long insertID, long rowsAffected) { public VtResultSet(Query.Field[] fields, List> rows) { this.fields = fields; this.rows = rows; + this.insertID = BigInteger.valueOf(0); } public VtResultSet(long rowsAffected, List> rows) { this.rowsAffected = rowsAffected; this.rows = rows; + this.insertID = BigInteger.valueOf(0); } @Override @@ -107,7 +111,7 @@ private void appendResult(VtResultSet src, boolean ignoreTable) throws SQLExcept } } this.rowsAffected += src.rowsAffected; - if (src.insertID != 0) { + if (!src.insertID.equals(BigInteger.valueOf(0))) { this.insertID = src.insertID; } if (this.rows == null || this.rows.isEmpty()) { @@ -227,4 +231,17 @@ public VtRowList clone() { VtRowList vtRowList = new VtResultSet(this.fields, this.rows); return vtRowList; } + + public BigInteger getInsertID() { + return this.insertID; + } + + public void setInsertID(long setId) { + this.insertID = BigInteger.valueOf(setId); + } + + public void setInsertID(BigInteger setId) { + this.insertID = setId; + } + } diff --git a/src/main/java/com/jd/jdbc/sqltypes/VtRowList.java b/src/main/java/com/jd/jdbc/sqltypes/VtRowList.java index a9323a5..4ac5e67 100644 --- a/src/main/java/com/jd/jdbc/sqltypes/VtRowList.java +++ b/src/main/java/com/jd/jdbc/sqltypes/VtRowList.java @@ -19,6 +19,7 @@ package com.jd.jdbc.sqltypes; import io.vitess.proto.Query; +import java.math.BigInteger; import java.sql.SQLException; import java.util.List; @@ -28,7 +29,7 @@ public interface VtRowList extends VtResultSetMessage { long getRowsAffected(); - long getInsertID(); + BigInteger getInsertID(); boolean isQuery(); diff --git a/src/main/java/com/jd/jdbc/sqltypes/VtStreamResultSet.java b/src/main/java/com/jd/jdbc/sqltypes/VtStreamResultSet.java index 3414e25..9db87a9 100644 --- a/src/main/java/com/jd/jdbc/sqltypes/VtStreamResultSet.java +++ b/src/main/java/com/jd/jdbc/sqltypes/VtStreamResultSet.java @@ -21,6 +21,7 @@ import com.jd.jdbc.IExecute; import com.jd.jdbc.vitess.VitessStatement; import io.vitess.proto.Query; +import java.math.BigInteger; import java.sql.SQLException; import java.util.List; @@ -69,8 +70,8 @@ public long getRowsAffected() { } @Override - public long getInsertID() { - return 0; + public BigInteger getInsertID() { + return BigInteger.valueOf(0); } @Override diff --git a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java index c80bb0b..2493523 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java @@ -29,6 +29,7 @@ import com.jd.jdbc.queryservice.TabletDialer; import com.jd.jdbc.queryservice.util.RoleUtils; import com.jd.jdbc.session.SafeSession; +import com.jd.jdbc.session.VitessSession; import com.jd.jdbc.sqlparser.ast.statement.SQLCommitStatement; import com.jd.jdbc.sqlparser.ast.statement.SQLRollbackStatement; import com.jd.jdbc.sqlparser.support.logging.Log; @@ -101,6 +102,10 @@ public class VitessConnection extends AbstractVitessConnection { @Setter private Vtgate.Session session; + @Getter + @Setter + private VitessSession sessionNew; + @Getter private String defaultKeyspace; @@ -127,6 +132,7 @@ public VitessConnection(String url, Properties prop, TopoServer topoServer, Reso .setAutocommit(true) .build(); buildServerSessionPropertiesMap(); + this.sessionNew = new VitessSession(); if (log.isDebugEnabled()) { log.debug("create VitessConnection"); } diff --git a/src/main/java/com/jd/jdbc/vitess/VitessPreparedStatement.java b/src/main/java/com/jd/jdbc/vitess/VitessPreparedStatement.java index 6cc9c44..eea8556 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessPreparedStatement.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessPreparedStatement.java @@ -36,6 +36,7 @@ import java.io.InputStream; import java.io.Reader; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Blob; import java.sql.Clob; import java.sql.Date; @@ -314,6 +315,10 @@ public void setObject(int parameterIndex, Object x) throws SQLException { setInt(parameterIndex, (Integer) x); return; } + if (x instanceof BigInteger) { + setBigDecimal(parameterIndex, new BigDecimal((BigInteger) x)); + return; + } if (x instanceof Long) { setLong(parameterIndex, (Long) x); return; diff --git a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java index f5c320a..9e35705 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java @@ -122,7 +122,7 @@ public class VitessStatement extends AbstractVitessStatement { protected boolean retrieveGeneratedKeys = false; - protected long lastInsertId = -1; + protected BigInteger lastInsertId = BigInteger.valueOf(-1); protected List batchedGeneratedKeys = null; @@ -309,12 +309,12 @@ protected ResultSet executeQueryInternal(IContext ctx, String sql, Map sqls, L return new VitessResultSet(resultSets.get(0), this.connection); } catch (SQLException e) { cleanResultSets(); - this.lastInsertId = 0; + this.lastInsertId = BigInteger.ZERO; throw e; } } @@ -364,12 +364,12 @@ protected int executeMultiQueryUpdateInternal(IContext ctx, List sqls, L this.lastInsertId = resultSets.get(0).getInsertID(); return (int) resultSets.get(0).getRowsAffected(); } else { - this.lastInsertId = 0; + this.lastInsertId = BigInteger.ZERO; return 0; } } catch (SQLException e) { cleanResultSets(); - this.lastInsertId = 0; + this.lastInsertId = BigInteger.ZERO; throw e; } } @@ -390,12 +390,12 @@ protected ResultSet executeStreamQueryInternal(IContext ctx, String sql, Map 1) { throw new SQLException("does not support insert multiple rows in one sql statement"); } - long generatedKey = lastInsertId; + List> rows = new ArrayList<>(); VtResultSet vtStaticResultSet = new VtResultSet(getGeneratedKeyField(), rows); if (!this.resultSets.isEmpty()) { - if (generatedKey < 0) { - throw new SQLException("generatedKey error"); - } - if (generatedKey != 0 && (numKeys > 0)) { - List row = Collections.singletonList(VtResultValue.newVtResultValue(Query.Type.UINT64, BigInteger.valueOf(generatedKey))); + if ((lastInsertId.compareTo(BigInteger.ZERO) != 0) && (numKeys > 0)) { + List row = new ArrayList<>(); + row.add(VtResultValue.newVtResultValue(Query.Type.INT64, lastInsertId)); rows.add(row); } } @@ -1073,7 +1071,7 @@ private Boolean processByExpr(SQLSelectItem selectItem) throws SQLException { String alias = selectItem.getAlias(); String methodName = ""; - long lastInsertId = -1L; + BigInteger lastInsertId = BigInteger.valueOf(-1); if (expr instanceof SQLMethodInvokeExpr) { methodName = ((SQLMethodInvokeExpr) expr).getMethodName(); if (SQLUtils.nameEquals(methodName, FUNCATION_LAST_INSERT_ID)) { @@ -1082,17 +1080,17 @@ private Boolean processByExpr(SQLSelectItem selectItem) throws SQLException { return Boolean.FALSE; } methodName += "()"; - lastInsertId = this.connection.getSession().getLastInsertId(); + lastInsertId = this.connection.getSessionNew().getLastInsertId(); } } else if (expr instanceof SQLVariantRefExpr) { String name = ((SQLVariantRefExpr) expr).getName(); if (SQLUtils.nameEquals(name, FUNCATION_IDENTITY)) { methodName = name; - lastInsertId = this.connection.getSession().getLastInsertId(); + lastInsertId = this.connection.getSessionNew().getLastInsertId(); } } - if (!StringUtil.isNullOrEmpty(methodName) && lastInsertId != -1L) { + if (!StringUtil.isNullOrEmpty(methodName) && !BigInteger.valueOf(-1).equals(lastInsertId)) { List> rows = new ArrayList<>(); List valueList = new ArrayList<>(); valueList.add(VtResultValue.newVtResultValue(Query.Type.UINT64, lastInsertId)); diff --git a/src/test/java/com/jd/jdbc/table/LastInsertIdTest.java b/src/test/java/com/jd/jdbc/table/LastInsertIdTest.java index bff2fd3..a99a550 100644 --- a/src/test/java/com/jd/jdbc/table/LastInsertIdTest.java +++ b/src/test/java/com/jd/jdbc/table/LastInsertIdTest.java @@ -16,12 +16,14 @@ package com.jd.jdbc.table; +import java.math.BigInteger; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import testsuite.TestSuite; import testsuite.internal.TestSuiteShardSpec; @@ -65,7 +67,7 @@ public void splitTableMutiValueTest() throws SQLException { } } - private void lastInsertId(final Statement stmt, final int expectedValue, final int expectedEffectRows, final boolean isSingleValue) throws SQLException { + private void lastInsertId(final Statement stmt, final Object expectedValue, final int expectedEffectRows, final boolean isSingleValue) throws SQLException { Assert.assertEquals(expectedEffectRows, effectRows); if (isSingleValue) { // last insert id @@ -82,29 +84,39 @@ private void lastInsertId(final Statement stmt, final int expectedValue, final i } } - private void testByQuery(final Statement stmt, final String query, final String columnLabel, final int expectedValue) throws SQLException { + private void testByQuery(final Statement stmt, final String query, final String columnLabel, final Object expectedValue) throws SQLException { ResultSet rs = stmt.executeQuery(query); rs.next(); - long lastInsertId1 = rs.getLong(columnLabel); - Assert.assertEquals(expectedValue, lastInsertId1); + Assert.assertEquals(expectedValue, rs.getObject(columnLabel, expectedValue.getClass())); rs.close(); rs = stmt.executeQuery(query); rs.next(); - long lastInsertId2 = rs.getLong(columnLabel); - Assert.assertEquals(expectedValue, lastInsertId2); + Assert.assertEquals(expectedValue, rs.getObject(columnLabel, expectedValue.getClass())); rs.close(); rs = stmt.executeQuery(query); rs.next(); - long lastInsertId3 = rs.getLong(columnLabel); - Assert.assertEquals(expectedValue, lastInsertId3); + Assert.assertEquals(expectedValue, rs.getObject(columnLabel, expectedValue.getClass())); rs.close(); rs = stmt.executeQuery(query); rs.next(); - long lastInsertId4 = rs.getLong(columnLabel); - Assert.assertEquals(expectedValue, lastInsertId4); + Assert.assertEquals(expectedValue, rs.getObject(columnLabel, expectedValue.getClass())); rs.close(); } + + @Test + @Ignore + public void testResultIdBigInteger() throws SQLException { + String insertSql = "insert into `type_test` (`id`, `f_key`, `f_decimal`) values('17000098931012360000', 'x', 2)"; + String initSql = "delete from type_test"; + BigInteger expectedValue = new BigInteger("17000098931012360000"); + + try (Statement stmt = driverConnection.createStatement()) { + stmt.executeUpdate(initSql); + effectRows = stmt.executeUpdate(insertSql); + lastInsertId(stmt, expectedValue, 1, true); + } + } }