diff --git a/src/main/java/com/jd/jdbc/engine/JoinEngine.java b/src/main/java/com/jd/jdbc/engine/JoinEngine.java index f65a7fc..42c8e87 100644 --- a/src/main/java/com/jd/jdbc/engine/JoinEngine.java +++ b/src/main/java/com/jd/jdbc/engine/JoinEngine.java @@ -19,6 +19,7 @@ package com.jd.jdbc.engine; import com.jd.jdbc.IExecute; +import com.jd.jdbc.common.util.CollectionUtils; import com.jd.jdbc.context.IContext; import com.jd.jdbc.sqltypes.SqlTypes; import com.jd.jdbc.sqltypes.VtResultSet; @@ -132,12 +133,8 @@ public IExecute.ExecuteMultiShardResponse execute(IContext ctx, Vcursor vcursor, for (List rightRow : rightResult.getRows()) { resultSet.getRows().add(joinRows(leftRow, rightRow, this.cols)); } - - if (this.opcode == Engine.JoinOpcode.LeftJoin && (rightResult.getRows() == null || rightResult.getRows().isEmpty())) { + if (this.opcode == Engine.JoinOpcode.LeftJoin && CollectionUtils.isEmpty(rightResult.getRows())) { resultSet.getRows().add(joinRows(leftRow, null, this.cols)); - resultSet.setRowsAffected(resultSet.getRowsAffected() + 1); - } else { - resultSet.setRowsAffected(resultSet.getRowsAffected() + rightResult.getRows().size()); } if (vcursor.exceedsMaxMemoryRows(resultSet.getRows().size())) { throw new SQLException("in-memory row count exceeded allowed limit of " + vcursor.maxMemoryRows()); @@ -279,7 +276,7 @@ private Query.Field[] joinFields(Query.Field[] leftFields, Query.Field[] rightFi } private List joinRows(List leftRow, List rightRow, List cols) { - List row = new ArrayList<>(); + List row = new ArrayList<>(cols.size()); for (Integer index : cols) { if (index < 0) { row.add(leftRow.get(-index - 1)); @@ -288,6 +285,8 @@ private List joinRows(List leftRow, List rightRow : rightResult.getRows()) { resultSet.getRows().add(joinRows(leftRow, rightRow, this.cols)); } - - if (this.opcode == Engine.JoinOpcode.LeftJoin && (rightResult.getRows() == null || rightResult.getRows().isEmpty())) { + if (this.opcode == Engine.JoinOpcode.LeftJoin && CollectionUtils.isEmpty(rightResult.getRows())) { resultSet.getRows().add(joinRows(leftRow, null, this.cols)); } if (vcursor.exceedsMaxMemoryRows(resultSet.getRows().size())) { @@ -147,80 +146,6 @@ public IExecute.ExecuteMultiShardResponse execute(IContext ctx, Vcursor vcursor, return new IExecute.ExecuteMultiShardResponse(resultSet); } - @Override - public IExecute.VtStream streamExecute(IContext ctx, Vcursor vcursor, Map bindValue, boolean wantFields) throws SQLException { - IExecute.VtStream leftStream = this.left.streamExecute(ctx, vcursor, bindValue, wantFields); - return new IExecute.VtStream() { - private IExecute.VtStream leftStreamResult = leftStream; - - private IExecute.VtStream rightStreamResult; - - @Override - public VtRowList fetch(boolean wantFields) throws SQLException { - return this.internalFetch(wantFields); - } - - private VtRowList internalFetch(boolean wantFields) throws SQLException { - Map joinVars = new HashMap<>(); - VtResultSet resultSet = new VtResultSet(); - - VtStreamResultSet leftStreamResultSet = new VtStreamResultSet(this.leftStreamResult, wantFields); - while (leftStreamResultSet.hasNext()) { - List leftRow = leftStreamResultSet.next(); - for (Map.Entry var : vars.entrySet()) { - joinVars.put(var.getKey(), SqlTypes.valueBindVariable(leftRow.get(var.getValue()))); - } - boolean rowSent = false; - this.rightStreamResult = right.streamExecute(ctx, vcursor, combineVars(bindValue, joinVars), wantFields); - VtStreamResultSet rightStreamResultSet = new VtStreamResultSet(rightStreamResult, wantFields); - if (wantFields) { - // This code is currently unreachable because the first result - // will always be just the field info, which will cause the outer - // wantfields code path to be executed. But this may change in the future. - wantFields = false; - resultSet.setFields(joinFields(leftStreamResultSet.getFields(), rightStreamResultSet.getFields(), cols)); - } - while (rightStreamResultSet.hasNext()) { - rowSent = true; - List rightRow = rightStreamResultSet.next(); - resultSet.getRows().add(joinRows(leftRow, rightRow, cols)); - } - rightStreamResultSet.close(); - if (opcode == Engine.JoinOpcode.LeftJoin && !rowSent) { - resultSet.setRows(new ArrayList>() { - { - add(new ArrayList() {{ - addAll(joinRows(leftRow, null, cols)); - }}); - } - }); - } - } - if (wantFields) { - wantFields = false; - for (Map.Entry var : vars.entrySet()) { - joinVars.put(var.getKey(), BindVariable.NULL_BIND_VARIABLE); - } - VtResultSet rightResultSet = right.getFields(vcursor, null); - resultSet.setFields(joinFields(leftStreamResultSet.getFields(), rightResultSet.getFields(), cols)); - } - return resultSet; - } - - @Override - public void close() throws SQLException { - if (this.rightStreamResult != null) { - this.rightStreamResult.close(); - this.rightStreamResult = null; - } - if (leftStream != null) { - this.leftStreamResult.close(); - this.leftStreamResult = null; - } - } - }; - } - @Override public Boolean canResolveShardQuery() { return Boolean.FALSE; @@ -289,7 +214,7 @@ private List joinRows(List leftRow, List javaClass = VtType.DataTypeConverter.fromTypeString(fields[j].getType().toString()).getJavaClass(); Object o = convertValue(row[j], javaClass); diff --git a/src/test/java/testsuite/TestSuite.java b/src/test/java/testsuite/TestSuite.java index f4e2d29..be19f75 100644 --- a/src/test/java/testsuite/TestSuite.java +++ b/src/test/java/testsuite/TestSuite.java @@ -80,6 +80,42 @@ public static void closeConnection(Connection conn) { } } + public static boolean compareResultSets(ResultSet resultSet1, ResultSet resultSet2) throws SQLException { + if (resultSet1 == null || resultSet2 == null) { + return false; + } + ResultSetMetaData metaData1 = resultSet1.getMetaData(); + ResultSetMetaData metaData2 = resultSet2.getMetaData(); + int columnCount1 = metaData1.getColumnCount(); + int columnCount2 = metaData2.getColumnCount(); + if (columnCount1 != columnCount2) { + return false; + } + for (int i = 1; i <= columnCount1; i++) { + String columnName1 = metaData1.getColumnName(i); + String columnName2 = metaData2.getColumnName(i); + int columnType1 = metaData1.getColumnType(i); + int columnType2 = metaData2.getColumnType(i); + if (!columnName1.equals(columnName2) || columnType1 != columnType2) { + return false; + } + } + while (resultSet1.next() && resultSet2.next()) { + for (int i = 1; i <= columnCount1; i++) { + Object value1 = resultSet1.getObject(i); + Object value2 = resultSet2.getObject(i); + if (resultSet1.wasNull() && resultSet2.wasNull()) { + // Both values are null, continue to next column + continue; + } + if (resultSet1.wasNull() || resultSet2.wasNull() || !value1.equals(value2)) { + return false; + } + } + } + return !resultSet1.next() && !resultSet2.next(); + } + protected static ExecutorService getThreadPool(int num, int max) { ExecutorService pool = new ThreadPoolExecutor(num, max, 60, TimeUnit.SECONDS,