-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathSuggest.cpp
205 lines (179 loc) · 8.42 KB
/
Suggest.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include "Suggest.h"
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Core/Settings.h>
#include <Columns/ColumnString.h>
#include <Common/typeid_cast.h>
#include <Common/Macros.h>
#include "Core/Protocol.h"
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/Operators.h>
#include <Functions/FunctionFactory.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/StorageFactory.h>
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Client/LocalConnection.h>
namespace DB
{
/// Error codes referenced in this file. They are only declared here (extern);
/// their definitions are expected to live elsewhere in the project.
namespace ErrorCodes
{
/// Thrown by Suggest::fillWordsFromBlock when the received block does not have exactly one column.
extern const int LOGICAL_ERROR;
/// Thrown by Suggest::fetch when the server sends a packet type it does not handle.
extern const int UNKNOWN_PACKET_FROM_SERVER;
/// Checked in Suggest::load: an exception with this code causes another loading attempt.
extern const int DEADLOCK_AVOIDED;
}
/// Registers a fixed list of SQL keywords as suggestion words.
/// Nothing in this constructor talks to a server: the list is hard-coded below.
Suggest::Suggest()
{
    /// Keywords may be not up to date with ClickHouse parser.
    Words keywords = {
        "CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT",
        "MATERIALIZED", "ALIAS", "ENGINE", "AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP",
        "RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER", "COPY", "PROJECT",
        "PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO",
        "OUTFILE", "FORMAT", "TABLES", "DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE",
        "END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE", "INSERT", "VALUES",
        "SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER",
        "LEFT", "RIGHT", "FULL", "OUTER", "CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY",
        "WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC",
        "IN", "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE", "USER", "ROLE",
        "PROFILE", "QUOTA", "POLICY", "ROW", "GRANT", "REVOKE", "OPTION", "ADMIN", "EXCEPT", "REPLACE",
        "IDENTIFIED", "HOST", "NAME", "READONLY", "WRITABLE", "PERMISSIVE", "FOR", "RESTRICTIVE", "RANDOMIZED",
        "INTERVAL", "LIMITS", "ONLY", "TRACKING", "IP", "REGEXP", "ILIKE",
    };

    addWords(std::move(keywords));
}
/// Builds the SQL text that collects completion words from system tables.
///
/// The result is a single SELECT over a UNION ALL of per-table selects; the outer
/// query splits every name with extract_all/array_join and keeps non-empty pieces.
///
/// @param suggestion_limit  When greater than zero, selects over system.databases,
///                          system.tables, system.columns (and system.dictionaries
///                          unless basic_suggestion is set) are appended, each with
///                          `LIMIT suggestion_limit`. Otherwise they are left out.
/// @param basic_suggestion  When true, the selects over system.macros,
///                          system.storage_policies and system.dictionaries are omitted.
/// @return The complete query text.
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)
{
    /// NOTE: Once you will update the completion list,
    /// do not forget to update 01676_clickhouse_client_autocomplete.sh
    WriteBufferFromOwnString sql;

    /// Writes one inner SELECT followed by the separator that chains it to the next one.
    const auto append_source = [&sql](const char * select) { sql << select << " UNION ALL "; };

    const bool with_extended_sources = !basic_suggestion;

    sql << "SELECT DISTINCT array_join(extract_all(name, '[\\\\w_]{2,}')) AS res FROM (";

    append_source("SELECT name FROM system.functions");
    append_source("SELECT name FROM system.table_engines");
    append_source("SELECT name FROM system.formats");
    append_source("SELECT name FROM system.table_functions");
    append_source("SELECT name FROM system.data_type_families");
    append_source("SELECT name FROM system.stream_settings");
    append_source("SELECT name FROM system.settings");

    if (with_extended_sources)
    {
        append_source("SELECT macro FROM system.macros");
        append_source("SELECT policy_name FROM system.storage_policies");
    }

    /// Last unconditional source: no trailing separator, the optional part below adds its own.
    sql << "SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";

    /// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
    if (suggestion_limit > 0)
    {
        const String row_cap = toString(suggestion_limit);

        /// Same as append_source, but the SELECT text ends in "LIMIT " and gets the cap appended.
        const auto append_limited_source = [&sql, &row_cap](const char * select_with_limit)
        {
            sql << select_with_limit << row_cap << " UNION ALL ";
        };

        sql << " UNION ALL ";
        append_limited_source("SELECT name FROM system.databases LIMIT ");
        append_limited_source("SELECT DISTINCT name FROM system.tables LIMIT ");

        if (with_extended_sources)
            append_limited_source("SELECT DISTINCT name FROM system.dictionaries LIMIT ");

        sql << "SELECT DISTINCT name FROM system.columns LIMIT " << row_cap
            << " SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";
    }

    sql << ") WHERE not_empty(res) SETTINGS _tp_internal_system_open_sesame=true,is_internal=true";

    return sql.str();
}
/// Starts `loading_thread`, which fetches suggestion words through a connection of
/// type ConnectionType.
///
/// The thread works on a copy of `context` and on copies of `connection_parameters`
/// and `suggestion_limit`. It makes at most 10 attempts; a new attempt is made only
/// when the previous one failed with an Exception whose code is DEADLOCK_AVOIDED.
/// Any other failure is written to stderr and ends the loading. Exceptions do not
/// leave the thread body.
///
/// @param context                Context to copy for the loading thread.
/// @param connection_parameters  Passed to ConnectionType::createConnection; its
///                               `timeouts` member is passed to fetch().
/// @param suggestion_limit       Forwarded to getLoadSuggestionQuery().
template <typename ConnectionType>
void Suggest::load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit)
{
    loading_thread = std::thread([my_context = Context::createCopy(context), connection_parameters, suggestion_limit, this]
    {
        constexpr size_t max_attempts = 10;
        constexpr bool basic_suggestion = std::is_same_v<ConnectionType, LocalConnection>;

        /// Must be called from inside a catch handler: it formats the exception being handled.
        const auto report_failure = []
        {
            /// We should not use std::cerr here, because this method works concurrently with the main thread.
            /// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
            WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
            out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
            out.next();
        };

        for (size_t attempt = 0; attempt != max_attempts; ++attempt)
        {
            ThreadStatus thread_status;
            bool deadlock_avoided = false;

            try
            {
                auto connection = ConnectionType::createConnection(connection_parameters, my_context);
                const String query = getLoadSuggestionQuery(suggestion_limit, basic_suggestion);
                fetch(*connection, connection_parameters.timeouts, query);
            }
            catch (const Exception & e)
            {
                deadlock_avoided = (e.code() == ErrorCodes::DEADLOCK_AVOIDED);
                if (!deadlock_avoided)
                    report_failure();
            }
            catch (...)
            {
                report_failure();
            }

            /// Success and reported failures both end the loop; only DEADLOCK_AVOIDED retries.
            if (!deadlock_avoided)
                break;
        }

        /// Note that keyword suggestions are available even if we cannot load data from server.
    });
}
/// Sends `query` over `connection` and consumes the response packet by packet.
///
/// - Data packets: the block is handed to fillWordsFromBlock().
/// - Progress, ProfileInfo, Totals, Extremes, Log, ProfileEvents packets: skipped.
/// - Exception packet: the carried exception is rethrown.
/// - EndOfStream packet: the function returns.
/// - Any other packet type: throws Exception with code UNKNOWN_PACKET_FROM_SERVER.
///
/// @param connection  Connection used to send the query and receive packets.
/// @param timeouts    Passed through to connection.sendQuery().
/// @param query       Query text to send.
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
    connection.sendQuery(
        timeouts,
        query,
        {} /* query_parameters */,
        "" /* query_id */,
        QueryProcessingStage::Complete,
        nullptr,
        nullptr,
        false,
        {});

    for (;;)
    {
        Packet packet = connection.receivePacket();

        switch (packet.type)
        {
            case Protocol::Server::Data:
                fillWordsFromBlock(packet.block);
                break;

            /// Packets that carry nothing useful for suggestions: just read the next one.
            case Protocol::Server::Progress:
            case Protocol::Server::ProfileInfo:
            case Protocol::Server::Totals:
            case Protocol::Server::Extremes:
            case Protocol::Server::Log:
            case Protocol::Server::ProfileEvents:
                break;

            case Protocol::Server::Exception:
                packet.exception->rethrow();
                return;

            case Protocol::Server::EndOfStream:
                return;

            default:
                throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from server {}",
                    packet.type, connection.getDescription());
        }
    }
}
/// Adds every string of a received block to the suggestion words.
///
/// A block that evaluates to false is ignored. Otherwise the block must contain
/// exactly one column, which is accessed as a ColumnString via typeid_cast; each of
/// its rows becomes one word passed to addWords().
///
/// @param block  Block received from the server for the suggestion query.
/// @throws Exception with code LOGICAL_ERROR if the block does not have exactly one column.
void Suggest::fillWordsFromBlock(const Block & block)
{
    if (!block)
        return;

    /// Uses the Exception(code, message) form, matching the throw in Suggest::fetch.
    if (block.columns() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong number of columns received for query to read words for suggestion");

    const ColumnString & column = typeid_cast<const ColumnString &>(*block.getByPosition(0).column);
    const size_t rows = block.rows();

    Words new_words;
    new_words.reserve(rows);

    /// Read the string bytes directly instead of going through column[i], which
    /// materializes a temporary Field per row only to copy the string out of it again.
    for (size_t i = 0; i < rows; ++i)
        new_words.emplace_back(column.getDataAt(i).toString());

    addWords(std::move(new_words));
}
/// Explicit instantiations of Suggest::load for Connection and LocalConnection.
/// The template is defined in this source file, so these are the instantiations
/// this file provides for use from other translation units.
template
void Suggest::load<Connection>(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
template
void Suggest::load<LocalConnection>(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
}