Skip to content

Commit

Permalink
Fix cross-shard Join error
Browse files Browse the repository at this point in the history
  • Loading branch information
wangweicugw committed Feb 27, 2024
1 parent 4bb12db commit 936ba1a
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 86 deletions.
11 changes: 5 additions & 6 deletions src/main/java/com/jd/jdbc/engine/JoinEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.jd.jdbc.engine;

import com.jd.jdbc.IExecute;
import com.jd.jdbc.common.util.CollectionUtils;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.sqltypes.SqlTypes;
import com.jd.jdbc.sqltypes.VtResultSet;
Expand Down Expand Up @@ -132,12 +133,8 @@ public IExecute.ExecuteMultiShardResponse execute(IContext ctx, Vcursor vcursor,
for (List<VtResultValue> rightRow : rightResult.getRows()) {
resultSet.getRows().add(joinRows(leftRow, rightRow, this.cols));
}

if (this.opcode == Engine.JoinOpcode.LeftJoin && (rightResult.getRows() == null || rightResult.getRows().isEmpty())) {
if (this.opcode == Engine.JoinOpcode.LeftJoin && CollectionUtils.isEmpty(rightResult.getRows())) {
resultSet.getRows().add(joinRows(leftRow, null, this.cols));
resultSet.setRowsAffected(resultSet.getRowsAffected() + 1);
} else {
resultSet.setRowsAffected(resultSet.getRowsAffected() + rightResult.getRows().size());
}
if (vcursor.exceedsMaxMemoryRows(resultSet.getRows().size())) {
throw new SQLException("in-memory row count exceeded allowed limit of " + vcursor.maxMemoryRows());
Expand Down Expand Up @@ -279,7 +276,7 @@ private Query.Field[] joinFields(Query.Field[] leftFields, Query.Field[] rightFi
}

private List<VtResultValue> joinRows(List<VtResultValue> leftRow, List<VtResultValue> rightRow, List<Integer> cols) {
List<VtResultValue> row = new ArrayList<>();
List<VtResultValue> row = new ArrayList<>(cols.size());
for (Integer index : cols) {
if (index < 0) {
row.add(leftRow.get(-index - 1));
Expand All @@ -288,6 +285,8 @@ private List<VtResultValue> joinRows(List<VtResultValue> leftRow, List<VtResultV
// right row can be null on left joins
if (rightRow != null) {
row.add(rightRow.get(index - 1));
} else {
row.add(VtResultValue.NULL);
}
}
return row;
Expand Down
81 changes: 3 additions & 78 deletions src/main/java/com/jd/jdbc/engine/gen4/JoinGen4Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.jd.jdbc.engine.gen4;

import com.jd.jdbc.IExecute;
import com.jd.jdbc.common.util.CollectionUtils;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.engine.Engine;
import com.jd.jdbc.engine.PrimitiveEngine;
Expand All @@ -27,7 +28,6 @@
import com.jd.jdbc.sqltypes.VtResultSet;
import com.jd.jdbc.sqltypes.VtResultValue;
import com.jd.jdbc.sqltypes.VtRowList;
import com.jd.jdbc.sqltypes.VtStreamResultSet;
import com.jd.jdbc.srvtopo.BindVariable;
import io.vitess.proto.Query;
import java.sql.SQLException;
Expand Down Expand Up @@ -135,8 +135,7 @@ public IExecute.ExecuteMultiShardResponse execute(IContext ctx, Vcursor vcursor,
for (List<VtResultValue> rightRow : rightResult.getRows()) {
resultSet.getRows().add(joinRows(leftRow, rightRow, this.cols));
}

if (this.opcode == Engine.JoinOpcode.LeftJoin && (rightResult.getRows() == null || rightResult.getRows().isEmpty())) {
if (this.opcode == Engine.JoinOpcode.LeftJoin && CollectionUtils.isEmpty(rightResult.getRows())) {
resultSet.getRows().add(joinRows(leftRow, null, this.cols));
}
if (vcursor.exceedsMaxMemoryRows(resultSet.getRows().size())) {
Expand All @@ -147,80 +146,6 @@ public IExecute.ExecuteMultiShardResponse execute(IContext ctx, Vcursor vcursor,
return new IExecute.ExecuteMultiShardResponse(resultSet);
}

@Override
public IExecute.VtStream streamExecute(IContext ctx, Vcursor vcursor, Map<String, BindVariable> bindValue, boolean wantFields) throws SQLException {
IExecute.VtStream leftStream = this.left.streamExecute(ctx, vcursor, bindValue, wantFields);
return new IExecute.VtStream() {
private IExecute.VtStream leftStreamResult = leftStream;

private IExecute.VtStream rightStreamResult;

@Override
public VtRowList fetch(boolean wantFields) throws SQLException {
return this.internalFetch(wantFields);
}

private VtRowList internalFetch(boolean wantFields) throws SQLException {
Map<String, BindVariable> joinVars = new HashMap<>();
VtResultSet resultSet = new VtResultSet();

VtStreamResultSet leftStreamResultSet = new VtStreamResultSet(this.leftStreamResult, wantFields);
while (leftStreamResultSet.hasNext()) {
List<VtResultValue> leftRow = leftStreamResultSet.next();
for (Map.Entry<String, Integer> var : vars.entrySet()) {
joinVars.put(var.getKey(), SqlTypes.valueBindVariable(leftRow.get(var.getValue())));
}
boolean rowSent = false;
this.rightStreamResult = right.streamExecute(ctx, vcursor, combineVars(bindValue, joinVars), wantFields);
VtStreamResultSet rightStreamResultSet = new VtStreamResultSet(rightStreamResult, wantFields);
if (wantFields) {
// This code is currently unreachable because the first result
// will always be just the field info, which will cause the outer
// wantfields code path to be executed. But this may change in the future.
wantFields = false;
resultSet.setFields(joinFields(leftStreamResultSet.getFields(), rightStreamResultSet.getFields(), cols));
}
while (rightStreamResultSet.hasNext()) {
rowSent = true;
List<VtResultValue> rightRow = rightStreamResultSet.next();
resultSet.getRows().add(joinRows(leftRow, rightRow, cols));
}
rightStreamResultSet.close();
if (opcode == Engine.JoinOpcode.LeftJoin && !rowSent) {
resultSet.setRows(new ArrayList<List<VtResultValue>>() {
{
add(new ArrayList<VtResultValue>() {{
addAll(joinRows(leftRow, null, cols));
}});
}
});
}
}
if (wantFields) {
wantFields = false;
for (Map.Entry<String, Integer> var : vars.entrySet()) {
joinVars.put(var.getKey(), BindVariable.NULL_BIND_VARIABLE);
}
VtResultSet rightResultSet = right.getFields(vcursor, null);
resultSet.setFields(joinFields(leftStreamResultSet.getFields(), rightResultSet.getFields(), cols));
}
return resultSet;
}

@Override
public void close() throws SQLException {
if (this.rightStreamResult != null) {
this.rightStreamResult.close();
this.rightStreamResult = null;
}
if (leftStream != null) {
this.leftStreamResult.close();
this.leftStreamResult = null;
}
}
};
}

@Override
public Boolean canResolveShardQuery() {
return Boolean.FALSE;
Expand Down Expand Up @@ -289,7 +214,7 @@ private List<VtResultValue> joinRows(List<VtResultValue> leftRow, List<VtResultV
if (rightRow != null) {
row.add(rightRow.get(index - 1));
} else {
row.add(new VtResultValue(null, Query.Type.NULL_TYPE));
row.add(VtResultValue.NULL);
}
}
return row;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/jd/jdbc/sqltypes/VtResultValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@Getter
@Setter
public class VtResultValue {
public static VtResultValue NULL = new VtResultValue();
public static final VtResultValue NULL = new VtResultValue();

private Object value;

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/jd/jdbc/engine/util/TestResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static VtResultSet makeTestResult(Query.Field[] fields, String... rows) {
for (int j = 0; j < row.length; j++) {
VtResultValue item;
if (row[j].equals("null")) {
item = new VtResultValue(null, Query.Type.NULL_TYPE);
item = VtResultValue.NULL;
} else {
Class<?> javaClass = VtType.DataTypeConverter.fromTypeString(fields[j].getType().toString()).getJavaClass();
Object o = convertValue(row[j], javaClass);
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/testsuite/TestSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,42 @@ public static void closeConnection(Connection conn) {
}
}

public static boolean compareResultSets(ResultSet resultSet1, ResultSet resultSet2) throws SQLException {
if (resultSet1 == null || resultSet2 == null) {
return false;
}
ResultSetMetaData metaData1 = resultSet1.getMetaData();
ResultSetMetaData metaData2 = resultSet2.getMetaData();
int columnCount1 = metaData1.getColumnCount();
int columnCount2 = metaData2.getColumnCount();
if (columnCount1 != columnCount2) {
return false;
}
for (int i = 1; i <= columnCount1; i++) {
String columnName1 = metaData1.getColumnName(i);
String columnName2 = metaData2.getColumnName(i);
int columnType1 = metaData1.getColumnType(i);
int columnType2 = metaData2.getColumnType(i);
if (!columnName1.equals(columnName2) || columnType1 != columnType2) {
return false;
}
}
while (resultSet1.next() && resultSet2.next()) {
for (int i = 1; i <= columnCount1; i++) {
Object value1 = resultSet1.getObject(i);
Object value2 = resultSet2.getObject(i);
if (resultSet1.wasNull() && resultSet2.wasNull()) {
// Both values are null, continue to next column
continue;
}
if (resultSet1.wasNull() || resultSet2.wasNull() || !value1.equals(value2)) {
return false;
}
}
}
return !resultSet1.next() && !resultSet2.next();
}

protected static ExecutorService getThreadPool(int num, int max) {
ExecutorService pool = new ThreadPoolExecutor(num, max,
60, TimeUnit.SECONDS,
Expand Down

0 comments on commit 936ba1a

Please sign in to comment.