forked from UWQuickstep/quickstep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAggregationOperationState.hpp
327 lines (275 loc) · 13.5 KB
/
AggregationOperationState.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_
#include <cstddef>
#include <memory>
#include <vector>
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "storage/HashTableBase.hpp"
#include "storage/HashTablePool.hpp"
#include "storage/PartitionedHashTablePool.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/ValueAccessorMultiplexer.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
// Forward declarations. Every type below is used in this header only through
// a pointer or a reference (parameters, return types, or vector<const T*>
// elements), so the full definitions are not needed here and the includes
// can live in the .cpp instead.
namespace serialization { class AggregationOperationState; }
class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class CollisionFreeVectorTable;
class InsertDestination;
class LIPFilterAdaptiveProber;
class StorageManager;
class Type;
/** \addtogroup Storage
* @{
*/
/**
* @brief Helper class for maintaining state during aggregation operation.
*        If a GROUP BY list was provided, this class maintains a hash table
*        for each aggregate computed where the key is the GROUP BY expression
*        values and payload is each group's corresponding running aggregation
*        state. Without GROUP BY, this class maintains a single aggregation
*        state for each aggregate computed.
*
* @note See also AggregationHandle, which encapsulates logic for actually
*       computing aggregates, and AggregateFunction, which represents a
*       particular SQL aggregate in the abstract sense.
*
* This class represents the common state for an instance of
* AggregationOperator, and also encapsulates the high-level logic for
* aggregating over blocks and generating final results.
* AggregationWorkOrder::execute() is mainly just a call to aggregateBlock(),
* while FinalizeAggregationWorkOrder::execute() is mainly just a call to
* finalizeAggregate().
*
* Judging by the private helpers declared below, the GROUP BY path is
* implemented in three variants: a collision-free vector table, partitioned
* hash tables, and thread-private hash tables.
* NOTE(review): the selection logic between them lives in the .cpp — confirm
* there before relying on this summary.
**/
class AggregationOperationState {
public:
/**
* @brief Constructor for aggregation operation state.
* @note The order of some of the parameters to this constructor (or the
*       corresponding fields when reconstructing from a protobuf) determines
*       the schema of tuples written out by finalizeAggregate(). If group_by
*       is nonempty, the first attribute(s) will be the group-by values, in
*       order. Following that will be the values for each aggregate
*       specified by aggregate_functions (with arguments specified by
*       attributes), in order.
*
* @param input_relation Input relation on which the aggregates are computed.
* @param aggregate_functions A list of the aggregate functions to be
*        computed.
* @param arguments For each entry in aggregate_functions, a corresponding
*        list of argument expressions to that aggregate. This is moved-from,
*        with AggregationOperationState taking ownership.
* @param is_distinct For each entry in aggregate_functions, whether DISTINCT
*        should be applied to the entry's arguments. This is moved-from.
* @param group_by A list of expressions to compute the GROUP BY values. If
*        empty, no grouping is used. This is moved-from, with
*        AggregationOperationState taking ownership.
* @param predicate The predicate to be applied prior to aggregation. nullptr
*        indicates no predicate to be applied. This object takes ownership
*        of predicate.
* @param estimated_num_entries Estimated number of entries in the hash
*        table. A good estimate would be a fraction of total number of tuples
*        in the input relation.
* @param is_partitioned Whether this aggregation state is partitioned.
* @param num_partitions The number of partitions of the aggregation state
*        hash table.
* @param hash_table_impl_type The HashTable implementation to use for
*        GROUP BY. Ignored if group_by is empty.
* @param distinctify_hash_table_impl_types The HashTable implementation
*        types to use for the distinctify phase of each DISTINCT aggregation.
*        NOTE(review): presumably one entry per DISTINCT aggregate — confirm
*        the expected length against the .cpp.
* @param storage_manager The StorageManager to use for allocating hash
*        tables. Single aggregation state (when GROUP BY list is not
*        specified) is not allocated using memory from storage manager.
* @param collision_free_vector_memory_size For CollisionFreeVectorTable,
*        the memory size. Defaults to 0.
* @param collision_free_vector_num_init_partitions For
*        CollisionFreeVectorTable, the number of partitions to initialize.
*        Defaults to 0.
* @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
*        the offsets for each state. Defaults to an empty vector.
**/
AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction *> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
const bool is_partitioned,
const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager,
const std::size_t collision_free_vector_memory_size = 0,
const std::size_t collision_free_vector_num_init_partitions = 0,
const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>());
// All owned resources are held in std::unique_ptr members and are released
// by their own destructors.
~AggregationOperationState() {}
/**
* @brief Generate the aggregation operation state from the serialized
*        Protocol Buffer representation.
* @note The order of some repeated fields in the proto representation
*       determines the schema of tuples written out by finalizeAggregate().
*       See the note for the constructor for details.
*
* @param proto A serialized Protocol Buffer representation of an
*        AggregationOperationState, originally generated by the optimizer.
* @param database The Database to resolve relation and attribute references
*        in.
* @param storage_manager The StorageManager to use.
* @return A newly reconstructed AggregationOperationState.
*         NOTE(review): returned as a raw pointer; presumably the caller takes
*         ownership — confirm against the .cpp and call sites.
**/
static AggregationOperationState* ReconstructFromProto(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager);
/**
* @brief Check whether a serialization::AggregationOperationState is
*        fully-formed and all parts are valid.
*
* @param proto A serialized Protocol Buffer representation of an
*        AggregationOperationState, originally generated by the optimizer.
* @param database The Database to resolve relation and attribute references
*        in.
* @return Whether proto is fully-formed and valid.
**/
static bool ProtoIsValid(
const serialization::AggregationOperationState &proto,
const CatalogDatabaseLite &database);
/**
* @brief Get the number of partitions to be used for initializing the
*        aggregation.
*
* @return The number of partitions to be used for initializing the
*         aggregation.
**/
std::size_t getNumInitializationPartitions() const;
/**
* @brief Initialize the specified partition of this aggregation.
*
* @param partition_id ID of the partition to be initialized. Presumably in
*        [0, getNumInitializationPartitions()) — TODO confirm.
**/
void initialize(const std::size_t partition_id);
/**
* @brief Compute aggregates on the tuples of the given storage block,
*        updating the running state maintained by this
*        AggregationOperationState.
*
* @param input_block The block ID of the storage block where the aggregates
*        are going to be computed.
* @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering
*        the block. Defaults to nullptr (presumably meaning no LIP
*        pre-filtering — confirm in the .cpp).
**/
void aggregateBlock(const block_id input_block,
LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr);
/**
* @brief Generate the final results for the aggregates managed by this
*        AggregationOperationState and write them out to StorageBlock(s).
*
* @param partition_id The partition id of this finalize operation.
* @param output_destination An InsertDestination where the finalized output
*        tuple(s) from this aggregate are to be written.
**/
void finalizeAggregate(const std::size_t partition_id,
InsertDestination *output_destination);
/**
* @brief Get the collision-free vector table used by this aggregation.
*
* @return The collision-free vector table used by this aggregation, or
*         nullptr if a collision-free vector table is not used.
**/
CollisionFreeVectorTable* getCollisionFreeVectorTable() const;
/**
* @brief Get the memory footprint of the AggregationOperationState.
*
* @return The memory consumption, in bytes.
**/
std::size_t getMemoryConsumptionBytes() const;
private:
// Aggregate on input block: one path for aggregation without GROUP BY
// (single states) and one for aggregation with GROUP BY (hash tables).
void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
void aggregateBlockHashTable(const ValueAccessorMultiplexer &accessor_mux);
// Merge locally (per storage block) aggregated states with global aggregation
// states.
void mergeSingleState(
const std::vector<std::unique_ptr<AggregationState>> &local_state);
// Merge the GROUP BY entries of src into dst.
void mergeGroupByHashTables(AggregationStateHashTableBase *src,
AggregationStateHashTableBase *dst) const;
// Finalize the aggregation results into output_destination.
void finalizeSingleState(InsertDestination *output_destination);
void finalizeHashTable(const std::size_t partition_id,
InsertDestination *output_destination);
// Specialized implementations for aggregateBlockHashTable.
void aggregateBlockHashTableImplCollisionFree(
const ValueAccessorMultiplexer &accessor_mux);
void aggregateBlockHashTableImplPartitioned(
const ValueAccessorMultiplexer &accessor_mux);
void aggregateBlockHashTableImplThreadPrivate(
const ValueAccessorMultiplexer &accessor_mux);
// Specialized implementations for finalizeHashTable. Note that only the
// collision-free and partitioned variants take a partition_id.
void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
InsertDestination *output_destination);
void finalizeHashTableImplPartitioned(const std::size_t partition_id,
InsertDestination *output_destination);
void finalizeHashTableImplThreadPrivatePackedPayload(
InsertDestination *output_destination);
void finalizeHashTableImplThreadPrivateCompactKey(
InsertDestination *output_destination);
// Helper for getMemoryConsumptionBytes(): memory used by the given list of
// hash tables.
std::size_t getMemoryConsumptionBytesHelper(
const std::vector<std::unique_ptr<AggregationStateHashTableBase>>
&hashtables) const;
// NOTE: C++ initializes non-static members in declaration order, regardless
// of the order in the constructor's initializer list. Do not reorder the
// members below without checking the constructor in the .cpp.
// The relation on which the aggregates are computed (not owned).
const CatalogRelationSchema &input_relation_;
// Whether the aggregation is collision free or not.
const bool is_aggregate_collision_free_;
// Whether the aggregation is partitioned or not.
const bool is_aggregate_partitioned_;
// Filter predicate applied prior to aggregation (owned); nullptr if there is
// no predicate.
std::unique_ptr<const Predicate> predicate_;
// Each individual aggregate in this operation has an AggregationHandle. The
// arguments of each aggregate are tracked separately in argument_ids_.
std::vector<std::unique_ptr<AggregationHandle>> handles_;
// For each aggregate, whether DISTINCT should be applied to the aggregate's
// arguments.
std::vector<bool> is_distinct_;
// A flag indicating whether all aggregate functions are DISTINCT aggregations.
const bool all_distinct_;
// Non-trivial group-by/argument expressions that need to be evaluated (owned).
std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
// Attribute ids of the GROUP BY keys and, per aggregate, of its arguments.
// NOTE(review): presumably these refer to sources within the
// ValueAccessorMultiplexer (base relation vs. evaluated non-trivial
// expressions) — confirm in the .cpp.
std::vector<MultiSourceAttributeId> group_by_key_ids_;
std::vector<std::vector<MultiSourceAttributeId>> argument_ids_;
// Types of the GROUP BY keys.
std::vector<const Type *> group_by_types_;
// Hash tables for obtaining distinct (i.e. unique) arguments.
// NOTE(review): presumably one slot per aggregate, used only for DISTINCT
// aggregates — confirm in the .cpp.
std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
// Per-aggregate global states for aggregation without GROUP BY.
std::vector<std::unique_ptr<AggregationState>> single_states_;
// Per-aggregate HashTables for aggregation with GROUP BY.
//
// TODO(shoban): We should ideally store the aggregation state together in one
// hash table to prevent multiple lookups.
std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
// Pool of GROUP BY hash tables (a single pool, not a vector of pools).
// NOTE(review): presumably used by the thread-private GROUP BY variant.
std::unique_ptr<HashTablePool> group_by_hashtable_pool_;
// Partitioned pool of GROUP BY hash tables.
// NOTE(review): presumably used when is_aggregate_partitioned_ is true.
std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_;
// Hash table for the collision-free GROUP BY variant.
// NOTE(review): presumably the table exposed by getCollisionFreeVectorTable()
// when is_aggregate_collision_free_ is true.
std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_;
// StorageManager used for allocating hash tables.
// NOTE(review): raw pointer, presumably not owned — confirm.
StorageManager *storage_manager_;
// This class is not copyable (see utility/Macros.hpp).
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_