Skip to content

Commit bb9bd13

Browse files
committed
修复limit cancel
1 parent 17ba2cc commit bb9bd13

File tree

31 files changed

+183
-307
lines changed

31 files changed

+183
-307
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@
159159
<dependency>
160160
<groupId>mysql</groupId>
161161
<artifactId>mysql-connector-java</artifactId>
162-
<version>5.1.18</version>
162+
<version>5.1.27</version>
163163
</dependency>
164164
<dependency>
165165
<groupId>com.sleepycat</groupId>

tddl-atom/src/main/java/com/taobao/tddl/atom/config/TAtomConfParser.java

+12
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.util.Map;
99
import java.util.Properties;
1010

11+
import org.apache.commons.lang.BooleanUtils;
12+
1113
import com.taobao.tddl.atom.securety.impl.PasswordCoder;
1214
import com.taobao.tddl.atom.utils.ConnRestrictEntry;
1315
import com.taobao.tddl.common.utils.TStringUtil;
@@ -31,6 +33,7 @@ public class TAtomConfParser {
3133
public static final String GLOBA_DB_STATUS_KEY = "dbStatus";
3234
public static final String APP_USER_NAME_KEY = "userName";
3335
public static final String APP_INIT_POOL_SIZE_KEY = "initPoolSize";
36+
public static final String APP_PREFILL = "prefill";
3437
public static final String APP_MIN_POOL_SIZE_KEY = "minPoolSize";
3538
public static final String APP_MAX_POOL_SIZE_KEY = "maxPoolSize";
3639
public static final String APP_IDLE_TIMEOUT_KEY = "idleTimeout";
@@ -152,6 +155,15 @@ public static TAtomDsConfDO parserTAtomDsConfDO(String globaConfStr, String appC
152155
if (!TStringUtil.isBlank(driverClass)) {
153156
pasObj.setDriverClass(driverClass);
154157
}
158+
159+
if (connectionProperties.containsKey(APP_PREFILL)) {
160+
// add by agapple, 简单处理支持下初始化链接
161+
String prefill = connectionProperties.get(APP_PREFILL);
162+
if (BooleanUtils.toBoolean(prefill)
163+
&& pasObj.getInitPoolSize() == TAtomDsConfDO.defaultInitPoolSize) {
164+
pasObj.setInitPoolSize(pasObj.getMinPoolSize());
165+
}
166+
}
155167
}
156168

157169
// 解析应用连接限制, 参看下面的文档

tddl-atom/src/main/java/com/taobao/tddl/atom/config/TAtomDsConfHandle.java

+1
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ public void onDataRecieved(String dataId, String data) {
395395
newConf.setUserName(tmpConf.getUserName());
396396
newConf.setMinPoolSize(tmpConf.getMinPoolSize());
397397
newConf.setMaxPoolSize(tmpConf.getMaxPoolSize());
398+
newConf.setInitPoolSize(tmpConf.getInitPoolSize());
398399
newConf.setIdleTimeout(tmpConf.getIdleTimeout());
399400
newConf.setBlockingTimeout(tmpConf.getBlockingTimeout());
400401
newConf.setPreparedStatementCacheSize(tmpConf.getPreparedStatementCacheSize());

tddl-common/src/main/java/com/taobao/tddl/common/model/ExtraCmd.java

+89-115
Original file line numberDiff line numberDiff line change
@@ -7,120 +7,94 @@
77
*/
88
public class ExtraCmd {
99

10-
public static class OptimizerExtraCmd {
11-
12-
/**
13-
* 为true时,选取索引,
14-
*/
15-
public final static String ChooseIndex = "CHOOSE_INDEX";
16-
17-
/**
18-
* 为true时,最优join策略选择
19-
*/
20-
public final static String ChooseJoin = "CHOOSE_JOIN";
21-
22-
/**
23-
* 为true时,会将or条件转化为index merge
24-
*/
25-
public final static String ChooseIndexMerge = "CHOOSE_INDEX_MERGE";
26-
27-
/**
28-
* 智能优化join merge join
29-
*/
30-
public final static String JoinMergeJoinJudgeByRule = "JOIN_MERGE_JOIN_JUDGE_BY_RULE";
31-
32-
/**
33-
* 强制优化成join merge join
34-
*/
35-
public final static String JoinMergeJoin = "JOIN_MERGE_JOIN";
36-
37-
/**
38-
* 为true时,Merge Join Merge将会展开
39-
*/
40-
public final static String MergeExpand = "MERGE_EXPAND";
41-
42-
/**
43-
* 为true时,Merge将并行执行
44-
*/
45-
public final static String MergeConcurrent = "MERGE_CONCURRENT";
46-
}
47-
48-
public static class ExecutionExtraCmd {
49-
50-
/**
51-
* 似乎是没用了
52-
*/
53-
public static final String INDEX_NAME = "INDEX_NAME";
54-
/**
55-
* 用于记录这次请求从一个dbd 的 dbGroup里面选择了哪一个bdb进行查询。
56-
*/
57-
public static final String GROUP_INDEX = "GROUP_INDEX";
58-
/**
59-
* 用于决定当前bdb 数据查询是否是一个强一致查询
60-
*/
61-
public static final String GROUP_CONSISTENT = "GROUP_CONSISTENT";
62-
/**
63-
* 用于记录这次请求所走的具体是哪一个dbGroup.
64-
*/
65-
public static final String MATRIX_KEY = "MATRIX_KEY";
66-
67-
/**
68-
* 如果这个值为true,则允许使用临时表。 而如果为空。或者为false,则不允许使用临时表。
69-
* 从性能和实际需求来说,默认值应该为false.也就是不允许使用临时表。
70-
*/
71-
public static final String ALLOW_TEMPORARY_TABLE = "ALLOW_TEMPORARY_TABLE";
72-
73-
/**
74-
* 查询是否是流模式
75-
*/
76-
public static final String STREAM_MODE = "STREAM_MODE";
77-
public static final String EXECUTE_QUERY_WHEN_CREATED = "EXECUTE_QUERY_WHEN_CREATED";
78-
79-
/**
80-
* 是否允许某些查询使用BIO,默认为true
81-
*/
82-
public static final String ALLOW_BIO = "ALLOW_BIO";
83-
84-
public static final String HBASE_MAPPING_FILE = "HBASE_MAPPING_FILE";
85-
public static final String FETCH_SIZE = "FETCH_SIZE";
86-
}
87-
88-
public static class ConnectionExtraCmd {
89-
90-
/**
91-
* 标记是否关闭Join Order优化
92-
*/
93-
public static final String OPTIMIZE_JOIN_ORDER = "OPTIMIZE_JOIN_ORDER";
94-
public static final String INIT_TEMP_TABLE = "USE_TEMP_TABLE";
95-
public static final String INIT_TDDL_DATASOURCE = "INIT_TDDL_DATASOURCE";
96-
97-
/**
98-
* 表的meta超时时间,单位毫秒
99-
*/
100-
public static final String TABLE_META_CACHE_EXPIRE_TIME = "TABLE_META_CACHE_EXPIRE_TIME";
101-
102-
/**
103-
* 优化器和parser结果超时时间,单位毫秒
104-
*/
105-
public static final String OPTIMIZER_CACHE_EXPIRE_TIME = "OPTIMIZER_CACHE_EXPIRE_TIME";
106-
107-
public static final String CONFIG_UPDATE_INVALID_MINUTE = "CONFIG_UPDATE_INVALID_MINUTE";
108-
public static final String USE_TDHS_FOR_DEFAULT = "USE_TDHS_FOR_DEFAULT";
109-
public static final String USE_BOTH_LOCALSCHEMA_AND_DYNAMICSCHEMA = "USE_BOTH_LOCALSCHEMA_AND_DYNAMICSCHEMA";
110-
public static final String RULE = "RULE";
111-
public static final String CONFIG_DATA_HANDLER_FACTORY = "CONFIG_DATA_HANDLER_FACTORY";
112-
public static final String PARSER_CACHE = "PARSER_CACHE";
113-
public static final String OPTIMIZER_CACHE = "OPTIMIZER_CACHE";
114-
115-
/**
116-
* 为每个连接都初始化一个线程池,用来做并行查询,默认为true
117-
*/
118-
public static final String INIT_CONCURRENT_POOL_EVERY_CONNECTION = "INIT_CONCURRENT_POOL_EVERY_CONNECTION";
119-
120-
/**
121-
* 并行查询线程池大小
122-
*/
123-
public static final String CONCURRENT_THREAD_SIZE = "CONCURRENT_THREAD_SIZE";
124-
}
10+
public static final String OPTIMIZER_CACHE = "OPTIMIZER_CACHE";
11+
12+
/**
13+
* 是否选取索引,默认为true
14+
*/
15+
public final static String CHOOSE_INDEX = "CHOOSE_INDEX";
16+
17+
/**
18+
* 是否选择最优join策略, 默认为false
19+
*/
20+
public final static String CHOOSE_JOIN = "CHOOSE_JOIN";
21+
22+
/**
23+
* 是否将or条件转化为index merge,默认为false
24+
*/
25+
public final static String CHOOSE_INDEX_MERGE = "CHOOSE_INDEX_MERGE";
26+
27+
/**
28+
* 智能优化join merge join,默认为true
29+
*/
30+
public final static String JOIN_MERGE_JOIN_JUDGE_BY_RULE = "JOIN_MERGE_JOIN_JUDGE_BY_RULE";
31+
32+
/**
33+
* 是否强制优化成join merge join,默认为false
34+
*/
35+
public final static String JOIN_MERGE_JOIN = "JOIN_MERGE_JOIN";
36+
37+
/**
38+
* 是否展开Merge Join Merge,默认为false
39+
*/
40+
public final static String MERGE_EXPAND = "MERGE_EXPAND";
41+
42+
/**
43+
* 是否设置Merge并行执行,默认为true
44+
*/
45+
public final static String MERGE_CONCURRENT = "MERGE_CONCURRENT";
46+
47+
/**
48+
* 表的meta超时时间,单位毫秒,默认5分钟
49+
*/
50+
public static final String TABLE_META_CACHE_EXPIRE_TIME = "TABLE_META_CACHE_EXPIRE_TIME";
51+
52+
/**
53+
* 优化器和parser结果超时时间,单位毫秒,默认5分钟
54+
*/
55+
public static final String OPTIMIZER_CACHE_EXPIRE_TIME = "OPTIMIZER_CACHE_EXPIRE_TIME";
56+
57+
/**
58+
* 如果这个值为true,则允许使用临时表。 而如果为空。或者为false,则不允许使用临时表。
59+
* 从性能和实际需求来说,默认值应该为false.也就是不允许使用临时表。
60+
*/
61+
public static final String ALLOW_TEMPORARY_TABLE = "ALLOW_TEMPORARY_TABLE";
62+
63+
/**
64+
* limit数量超过该阀值,启用streaming模式,默认为100
65+
*/
66+
public static final String STREAMI_THRESHOLD = "STREAMI_THRESHOLD";
67+
68+
/**
69+
* 创建cursor后是否立马执行
70+
*/
71+
public static final String EXECUTE_QUERY_WHEN_CREATED = "EXECUTE_QUERY_WHEN_CREATED";
72+
73+
public static final String HBASE_MAPPING_FILE = "HBASE_MAPPING_FILE";
74+
75+
/**
76+
* 执行jdbc fetch size
77+
*/
78+
public static final String FETCH_SIZE = "FETCH_SIZE";
79+
80+
/**
81+
* 标记是否关闭Join Order优化
82+
*/
83+
public static final String INIT_TDDL_DATASOURCE = "INIT_TDDL_DATASOURCE";
84+
85+
/**
86+
* 是否使用tdhs替换jdbc调用
87+
*/
88+
public static final String USE_TDHS_FOR_DEFAULT = "USE_TDHS_FOR_DEFAULT";
89+
90+
/**
91+
* 为每个连接都初始化一个线程池,用来做并行查询,默认为true
92+
*/
93+
public static final String INIT_CONCURRENT_POOL_EVERY_CONNECTION = "INIT_CONCURRENT_POOL_EVERY_CONNECTION";
94+
95+
/**
96+
* 并行查询线程池大小
97+
*/
98+
public static final String CONCURRENT_THREAD_SIZE = "CONCURRENT_THREAD_SIZE";
12599

126100
}

tddl-executor/src/main/java/com/taobao/tddl/executor/MatrixExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private int explainIndex(String sql) {
8989

9090
public IDataNodeExecutor parseAndOptimize(String sql, ExecutionContext executionContext) throws TddlException {
9191
boolean cache = GeneralUtil.getExtraCmdBoolean(executionContext.getExtraCmds(),
92-
ExtraCmd.ConnectionExtraCmd.OPTIMIZER_CACHE,
92+
ExtraCmd.OPTIMIZER_CACHE,
9393
true);
9494

9595
return OptimizerContext.getContext()

tddl-executor/src/main/java/com/taobao/tddl/executor/handler/MergeHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private void executeSubNodesFuture(ISchematicCursor cursor, ExecutionContext exe
131131
List<IDataNodeExecutor> subNodes, List<ISchematicCursor> subCursors)
132132
throws TddlException {
133133

134-
executionContext.getExtraCmds().put(ExtraCmd.ExecutionExtraCmd.EXECUTE_QUERY_WHEN_CREATED, "True");
134+
executionContext.getExtraCmds().put(ExtraCmd.EXECUTE_QUERY_WHEN_CREATED, "True");
135135
List<Future<ISchematicCursor>> futureCursors = new LinkedList<Future<ISchematicCursor>>();
136136
for (IDataNodeExecutor q : subNodes) {
137137
Future<ISchematicCursor> rcfuture = executeFuture(executionContext, q);

tddl-executor/src/main/java/com/taobao/tddl/executor/spi/CursorFactoryDefaultImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public ITempTableSortCursor tempTableSortCursor(ExecutionContext executionContex
128128
throws TddlException {
129129
try {
130130
if ("True".equalsIgnoreCase(GeneralUtil.getExtraCmdString(executionContext.getExtraCmds(),
131-
ExtraCmd.ExecutionExtraCmd.ALLOW_TEMPORARY_TABLE))) {
131+
ExtraCmd.ALLOW_TEMPORARY_TABLE))) {
132132

133133
IRepository bdbRepo = ExecutorContext.getContext()
134134
.getRepositoryHolder()

tddl-group/src/main/java/com/taobao/tddl/group/jdbc/TGroupConnection.java

+3-53
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.taobao.tddl.group.jdbc;
22

3-
import java.lang.reflect.Method;
43
import java.sql.Array;
54
import java.sql.Blob;
65
import java.sql.CallableStatement;
@@ -16,7 +15,6 @@
1615
import java.sql.Savepoint;
1716
import java.sql.Statement;
1817
import java.sql.Struct;
19-
import java.sql.Wrapper;
2018
import java.util.Collections;
2119
import java.util.HashSet;
2220
import java.util.LinkedList;
@@ -25,8 +23,6 @@
2523
import java.util.Properties;
2624
import java.util.Set;
2725

28-
import com.taobao.tddl.atom.jdbc.TConnectionWrapper;
29-
import com.taobao.tddl.common.exception.TddlRuntimeException;
3026
import com.taobao.tddl.common.jdbc.TExceptionUtils;
3127
import com.taobao.tddl.group.config.GroupIndex;
3228
import com.taobao.tddl.group.dbselector.DBSelector;
@@ -578,56 +574,10 @@ public DatabaseMetaData getMetaData() throws SQLException {
578574
}
579575
}
580576

581-
/**
582-
* @return thread id
583-
*/
584-
public long getId() {
585-
try {
586-
// TODO 判断是否为mysql
587-
Connection atomConnection = this.rBaseConnection;
588-
if (atomConnection == null) {
589-
atomConnection = this.wBaseConnection;
590-
}
591-
592-
/**
593-
* 这个连接上没做过查询,不会创建真正连接的
594-
*/
595-
if (atomConnection == null) {
596-
return -1;
597-
}
598-
599-
TConnectionWrapper conn = (TConnectionWrapper) atomConnection;
600-
Connection delegate = conn.getTargetConnection();
601-
if (delegate instanceof Wrapper) {
602-
delegate = delegate.unwrap(Connection.class);
603-
}
604-
// DruidPooledConnection druidConnection = (DruidPooledConnection)
605-
// conn.getTargetConnection();
606-
// Connection delegateConn = druidConnection.getConnection();
607-
Method method = null;
608-
Class clazz = delegate.getClass();
609-
do {
610-
try {
611-
method = clazz.getDeclaredMethod("getId", new Class[] {});
612-
} catch (NoSuchMethodException e) {
613-
clazz = clazz.getSuperclass();
614-
}
615-
} while (method == null);
616-
if (!method.isAccessible()) {
617-
method.setAccessible(true);
618-
}
619-
return (Long) method.invoke(delegate, new Object[] {});
620-
} catch (Exception ex) {
621-
throw new TddlRuntimeException("connection get thread id fail !", ex);
622-
}
623-
624-
}
625-
626-
public Connection duplicate() throws SQLException {
627-
if (this.tGroupDataSource == null) {
628-
return null;
577+
public void cancel() throws SQLException {
578+
for (TGroupStatement stat : openedStatements) {
579+
stat.cancel();
629580
}
630-
return this.tGroupDataSource.getConnection();
631581
}
632582

633583
/*

0 commit comments

Comments
 (0)