forked from UWQuickstep/quickstep
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path InsertDestination.hpp
680 lines (577 loc) · 26.5 KB
/
InsertDestination.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_
#define QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageBlockLayout.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/ValueAccessor.hpp"
#include "threading/SpinMutex.hpp"
#include "threading/ThreadIDBasedMap.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
#include "gtest/gtest_prod.h"
#include "tmb/id_typedefs.h"
#include "tmb/tagged_message.h"
namespace tmb { class MessageBus; }
namespace quickstep {
class StorageManager;
namespace merge_run_operator {
class RunCreator;
class RunMergerTest;
class RunTest;
} // namespace merge_run_operator
namespace serialization { class InsertDestination; }
/** \addtogroup Storage
* @{
*/
/**
* @brief Base class for different strategies for getting blocks to insert
* tuples into.
**/
class InsertDestination : public InsertDestinationInterface {
 public:
  enum class InsertDestinationType {
    kAlwaysCreateBlockInsertDestination = 0,
    kBlockPoolInsertDestination,
    kPartitionAwareInsertDestination,
  };

  /**
   * @brief Constructor.
   *
   * @param insert_dest_type The implementation type.
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param query_id The ID of this query.
   * @param scheduler_client_id The TMB client ID of the scheduler thread.
   * @param bus A pointer to the TMB.
   **/
  InsertDestination(const InsertDestinationType insert_dest_type,
                    const CatalogRelationSchema &relation,
                    const StorageBlockLayout *layout,
                    StorageManager *storage_manager,
                    const std::size_t relational_op_index,
                    const std::size_t query_id,
                    const tmb::client_id scheduler_client_id,
                    tmb::MessageBus *bus);

  /**
   * @brief Virtual destructor.
   **/
  virtual ~InsertDestination() {
  }

  /**
   * @brief A factory method to generate the InsertDestination from the
   *        serialized Protocol Buffer representation.
   *
   * @param query_id The ID of this query.
   * @param proto A serialized Protocol Buffer representation of an
   *        InsertDestination, originally generated by the optimizer.
   * @param relation The relation to insert tuples into.
   * @param storage_manager The StorageManager to use.
   * @param scheduler_client_id The TMB client ID of the scheduler thread.
   * @param bus A pointer to the TMB.
   *
   * @return The constructed InsertDestination.
   */
  static InsertDestination* ReconstructFromProto(
      const std::size_t query_id,
      const serialization::InsertDestination &proto,
      const CatalogRelationSchema &relation,
      StorageManager *storage_manager,
      const tmb::client_id scheduler_client_id,
      tmb::MessageBus *bus);

  /**
   * @brief Check whether a serialized InsertDestination is fully-formed and
   *        all parts are valid.
   *
   * @param proto A serialized Protocol Buffer representation of an
   *        InsertDestination, originally generated by the optimizer.
   * @param relation The relation to insert tuples into.
   *
   * @return Whether proto is fully-formed and valid.
   **/
  static bool ProtoIsValid(const serialization::InsertDestination &proto,
                           const CatalogRelationSchema &relation);

  const CatalogRelationSchema& getRelation() const override {
    return relation_;
  }

  /**
   * @brief The base implementation reports no partitioning attributes
   *        (an empty list).
   **/
  PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override {
    return {};
  }

  void insertTuple(const Tuple &tuple) override;

  void insertTupleInBatch(const Tuple &tuple) override;

  void bulkInsertTuples(ValueAccessor *accessor,
                        const bool always_mark_full = false) override;

  void bulkInsertTuplesWithRemappedAttributes(
      const std::vector<attribute_id> &attribute_map,
      ValueAccessor *accessor) override;

  void bulkInsertTuplesFromValueAccessors(
      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map) override;

  void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                              std::vector<Tuple>::const_iterator end) override;

  /**
   * @brief Get the implementation type this destination was constructed with.
   **/
  InsertDestinationType getInsertDestinationType() const {
    return insert_dest_type_;
  }

  /**
   * @brief Get the set of blocks that were partially filled by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
   *
   * @param partial_blocks A pointer to the vector of block IDs in which the
   *        partially filled block IDs will be added.
   * @param part_ids A pointer to the vector of partition_ids with which the
   *        partially filled block IDs are associated.
   **/
  virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
                                        std::vector<partition_id> *part_ids) = 0;

  /**
   * @brief Set the input partition id. Used when the partition attributes are
   *        empty. The base implementation ignores the value.
   *
   * @param input_partition_id The input partition id.
   **/
  virtual void setInputPartitionId(const partition_id input_partition_id) {}

 protected:
  /**
   * @brief Get a block to use for insertion.
   *
   * @return A block to use for inserting tuples.
   **/
  virtual MutableBlockReference getBlockForInsertion() = 0;

  /**
   * @brief Release a block after done using it for insertion.
   * @note This should ALWAYS be called when done inserting into a block.
   *
   * @param block A block, originally supplied by getBlockForInsertion(),
   *        which the client is finished using.
   * @param full If true, the client ran out of space when trying to insert
   *        into block. If false, all inserts were successful.
   **/
  virtual void returnBlock(MutableBlockReference &&block, const bool full) = 0;

  /**
   * @brief Create a brand-new block for insertion.
   **/
  // TODO(chasseur): Once StorageManager is threadsafe, it will be safe to use
  // this without holding the mutex.
  virtual MutableBlockReference createNewBlock() = 0;

  /**
   * @brief When a StorageBlock becomes full, pipeline the block id to the
   *        scheduler.
   *
   * @param id The id of the StorageBlock to be pipelined.
   * @param part_id The partition id of Block <id>, if any.
   **/
  void sendBlockFilledMessage(const block_id id, const partition_id part_id = 0) const {
    serialization::DataPipelineMessage proto;
    proto.set_operator_index(relational_op_index_);
    proto.set_block_id(id);
    proto.set_relation_id(relation_.getID());
    proto.set_query_id(query_id_);
    proto.set_partition_id(part_id);

    // NOTE(zuyu): Serialize proto into a heap buffer as a c-like string. The
    // buffer is owned by a std::vector so it is released on every exit path,
    // and allocation failure cannot silently yield a null destination pointer.
    // The buffer only needs to outlive the TaggedMessage constructor, which
    // keeps its own copy of the payload.
    const std::size_t proto_length = proto.ByteSize();
    std::vector<char> proto_bytes(proto_length);
    CHECK(proto.SerializeToArray(proto_bytes.data(), proto_length));

    tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes.data()),
                                      proto_length,
                                      kDataPipelineMessage);

    // The reason we use the ClientIDMap is as follows:
    // InsertDestination needs to send data pipeline messages to scheduler. To
    // send a TMB message, we need to know the sender and receiver's TMB client
    // ID. In this case, the sender thread is the worker thread that executes
    // this function. To figure out the TMB client ID of the executing thread,
    // there are multiple ways :
    // 1. Trickle down the worker's client ID all the way from Worker::run()
    // method until here.
    // 2. Use thread-local storage - Each worker saves its TMB client ID in the
    // local storage.
    // 3. Use a globally accessible map whose key is the caller thread's
    // process level ID and value is the TMB client ID.
    //
    // Option 1 involves modifying the signature of several functions across
    // different modules. Option 2 was difficult to implement given that Apple's
    // Clang doesn't allow C++11's thread_local keyword. Therefore we chose
    // option 3.
    DCHECK(bus_ != nullptr);

    DLOG(INFO) << "InsertDestination sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_;
    const tmb::MessageBus::SendStatus send_status =
        QueryExecutionUtil::SendTMBMessage(bus_,
                                           thread_id_map_.getValue(),
                                           scheduler_client_id_,
                                           std::move(tagged_message));
    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
  }

  /**
   * @brief Get the ID of the query this destination belongs to.
   **/
  std::size_t getQueryID() const {
    return query_id_;
  }

  const InsertDestinationType insert_dest_type_;
  // Looked up in sendBlockFilledMessage() to obtain the sender's TMB client ID.
  const ClientIDMap &thread_id_map_;

  StorageManager *storage_manager_;
  const CatalogRelationSchema &relation_;

  std::unique_ptr<const StorageBlockLayout> layout_;
  const std::size_t relational_op_index_;

  const std::size_t query_id_;

  tmb::client_id scheduler_client_id_;
  tmb::MessageBus *bus_;

  // TODO(chasseur): If contention is high, finer-grained locking of internal
  // data members in subclasses is possible.
  SpinMutex mutex_;

 private:
  /**
   * @brief Get the set of blocks that were used by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock().
   *
   * @return A vector of block_ids of blocks that were used for insertion.
   **/
  std::vector<block_id> getTouchedBlocks() {
    SpinMutexLock lock(mutex_);
    return getTouchedBlocksInternal();
  }

  // Subclass hook for getTouchedBlocks(); called with mutex_ held.
  virtual std::vector<block_id> getTouchedBlocksInternal() = 0;

  // TODO(shoban): Workaround to support sort. Sort needs finegrained control of
  // blocks being used to insert, since inserting in an arbitrary block could
  // lead to unsorted results. InsertDestination API changed while sort was
  // being implemented.
  friend class merge_run_operator::RunCreator;

  // NOTE(zuyu): Access getTouchedBlocks.
  friend class AggregationOperatorTest;
  friend class merge_run_operator::RunTest;
  friend class merge_run_operator::RunMergerTest;

  FRIEND_TEST(HashJoinOperatorTest, LongKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, CompositeKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest);
  FRIEND_TEST(HashJoinOperatorTest, SingleAttributePartitionedLongKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest, SingleAttributePartitionedCompositeKeyHashJoinTest);
  FRIEND_TEST(HashJoinOperatorTest,
              SingleAttributePartitionedCompositeKeyHashJoinWithResidualPredicateTest);

  DISALLOW_COPY_AND_ASSIGN(InsertDestination);
};
/**
* @brief Implementation of InsertDestination that always creates new blocks,
* leaving some blocks potentially very underfull.
**/
class AlwaysCreateBlockInsertDestination : public InsertDestination {
public:
AlwaysCreateBlockInsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: InsertDestination(InsertDestinationType::kAlwaysCreateBlockInsertDestination,
relation,
layout,
storage_manager,
relational_op_index,
query_id,
scheduler_client_id,
bus) {}
~AlwaysCreateBlockInsertDestination() override {
}
void bulkInsertTuplesFromValueAccessors(
const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map) override {
LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for AlwaysCreateBlockInsertDestination";
}
protected:
MutableBlockReference getBlockForInsertion() override;
void returnBlock(MutableBlockReference &&block, const bool full) override;
MutableBlockReference createNewBlock() override;
void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
std::vector<partition_id> *part_ids) override {
}
private:
std::vector<block_id> getTouchedBlocksInternal() override {
return returned_block_ids_;
}
std::vector<block_id> returned_block_ids_;
DISALLOW_COPY_AND_ASSIGN(AlwaysCreateBlockInsertDestination);
};
/**
* @brief Implementation of InsertDestination that keeps a pool of
* partially-full blocks. Creates new blocks as necessary when
* getBlockForInsertion() is called and there are no partially-full
* blocks from the pool which are not "checked out" by workers.
**/
class BlockPoolInsertDestination : public InsertDestination {
public:
/**
* @brief Constructor.
*
* @param relation The relation to insert tuples into.
* @param layout The layout to use for any newly-created blocks. If NULL,
* defaults to relation's default layout.
* @param storage_manager The StorageManager to use.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param query_id The ID of the query.
* @param bus A pointer to the TMB.
**/
BlockPoolInsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
const std::size_t relational_op_index,
const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: InsertDestination(InsertDestinationType::kBlockPoolInsertDestination,
relation,
layout,
storage_manager,
relational_op_index,
query_id,
scheduler_client_id,
bus) {}
/**
* @brief Constructor.
*
* @param relation The relation to insert tuples into.
* @param layout The layout to use for any newly-created blocks. If NULL,
* defaults to relation's default layout.
* @param storage_manager The StorageManager to use.
* @blocks The existing blocks used for insertions.
* @param relational_op_index The index of the relational operator in the
* QueryPlan DAG that has outputs.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
BlockPoolInsertDestination(const CatalogRelationSchema &relation,
const StorageBlockLayout *layout,
StorageManager *storage_manager,
std::vector<block_id> &&blocks,
const std::size_t relational_op_index,
const std::size_t query_id,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: InsertDestination(InsertDestinationType::kBlockPoolInsertDestination,
relation,
layout,
storage_manager,
relational_op_index,
query_id,
scheduler_client_id,
bus),
available_block_ids_(std::move(blocks)) {
// TODO(chasseur): Once block fill statistics are available, replace this
// with something smarter.
}
~BlockPoolInsertDestination() override {
}
protected:
MutableBlockReference getBlockForInsertion() override;
void returnBlock(MutableBlockReference &&block, const bool full) override;
void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
std::vector<partition_id> *part_ids) override;
MutableBlockReference createNewBlock() override;
private:
std::vector<block_id> getTouchedBlocksInternal() override;
FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
// A vector of references to blocks which are loaded in memory.
std::vector<MutableBlockReference> available_block_refs_;
// A vector of blocks from the relation that are not loaded in memory yet.
std::vector<block_id> available_block_ids_;
// A vector of fully filled blocks.
std::vector<block_id> done_block_ids_;
DISALLOW_COPY_AND_ASSIGN(BlockPoolInsertDestination);
};
/**
 * @brief Implementation of InsertDestination that keeps a separate pool of
 *        blocks per partition. Each tuple is routed to the partition chosen by
 *        the PartitionSchemeHeader, or to the input partition id when the
 *        scheme has no partition attributes.
 **/
class PartitionAwareInsertDestination : public InsertDestination {
 public:
  /**
   * @brief Constructor.
   *
   * @note PartitionAwareInsertDestination takes ownership of \c
   *       partition_scheme_header.
   *
   * @param partition_scheme_header The partitioned scheme header information.
   * @param relation The relation to insert tuples into.
   * @param layout The layout to use for any newly-created blocks. If NULL,
   *        defaults to relation's default layout.
   * @param storage_manager The StorageManager to use.
   * @param partitions The blocks in partitions.
   * @param relational_op_index The index of the relational operator in the
   *        QueryPlan DAG that has outputs.
   * @param query_id The ID of the query.
   * @param scheduler_client_id The TMB client ID of the scheduler thread.
   * @param bus A pointer to the TMB.
   **/
  PartitionAwareInsertDestination(
      PartitionSchemeHeader *partition_scheme_header,
      const CatalogRelationSchema &relation,
      const StorageBlockLayout *layout,
      StorageManager *storage_manager,
      std::vector<std::vector<block_id>> &&partitions,
      const std::size_t relational_op_index,
      const std::size_t query_id,
      const tmb::client_id scheduler_client_id,
      tmb::MessageBus *bus);

  /**
   * @brief Destructor. The per-partition mutex array is released by its
   *        owning std::unique_ptr.
   **/
  ~PartitionAwareInsertDestination() override {
  }

  /**
   * @brief Manually add a block to the pool.
   * @warning Call only ONCE for each block to add to the pool.
   *
   * @param bid The ID of the block to add to the pool.
   * @param part_id The partition to add the block to. Must be less than the
   *        number of partitions.
   **/
  void addBlockToPool(const block_id bid, const partition_id part_id) {
    // An out-of-range id would index past the per-partition containers.
    DCHECK_LT(part_id, partition_scheme_header_->getNumPartitions());
    SpinMutexLock lock(mutexes_for_partition_[part_id]);
    available_block_ids_[part_id].push_back(bid);
  }

  /**
   * @brief Collect the partially filled blocks of every partition, appending
   *        each block to partial_blocks and its partition id to part_ids.
   **/
  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
                                std::vector<partition_id> *part_ids) override {
    // Iterate through each partition and return the partially filled blocks
    // in each partition.
    const auto num_partitions = partition_scheme_header_->getNumPartitions();
    for (partition_id part_id = 0; part_id < num_partitions; ++part_id) {
      getPartiallyFilledBlocksInPartition(partial_blocks, part_ids, part_id);
    }
  }

  PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override;

  void insertTuple(const Tuple &tuple) override;

  void insertTupleInBatch(const Tuple &tuple) override;

  void bulkInsertTuples(ValueAccessor *accessor,
                        const bool always_mark_full = false) override;

  void bulkInsertTuplesWithRemappedAttributes(
      const std::vector<attribute_id> &attribute_map,
      ValueAccessor *accessor) override;

  void bulkInsertTuplesFromValueAccessors(
      const std::vector<std::pair<ValueAccessor *, std::vector<attribute_id>>> &accessor_attribute_map) override {
    LOG(FATAL) << "bulkInsertTuplesFromValueAccessors is not implemented for PartitionAwareInsertDestination";
  }

  void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                              std::vector<Tuple>::const_iterator end) override;

  /**
   * @brief Set the partition used when the scheme has no partition attributes.
   **/
  void setInputPartitionId(const partition_id input_partition_id) override {
    input_partition_id_ = input_partition_id;
  }

 protected:
  // Not supported: a partition id is required, so callers must use
  // getBlockForInsertionInPartition() instead.
  MutableBlockReference getBlockForInsertion() override {
    LOG(FATAL) << "PartitionAwareInsertDestination::getBlockForInsertion needs a partition id as an argument.";
  }

  /**
   * @brief Get a block to use for insertion from a partition.
   *
   * @param part_id The partition id for which the client requests a block from.
   *
   * @return A block to use for inserting tuples belonging to a particular partition.
   **/
  MutableBlockReference getBlockForInsertionInPartition(const partition_id part_id);

  void returnBlock(MutableBlockReference &&block, const bool full) override;

  /**
   * @brief Release a block after done using it for insertion.
   * @note This should ALWAYS be called when done inserting into a block.
   *
   * @param block A block, originally supplied by getBlockForInsertion(),
   *        which the client is finished using.
   * @param full If true, the client ran out of space when trying to insert
   *        into block. If false, all inserts were successful.
   * @param part_id The partition id into which we should return the block into.
   **/
  void returnBlockInPartition(MutableBlockReference &&block, const bool full, const partition_id part_id);

  MutableBlockReference createNewBlock() override;
  MutableBlockReference createNewBlockInPartition(const partition_id part_id);

 private:
  std::vector<block_id> getTouchedBlocksInternal() override;

  /**
   * @brief Get the set of blocks that were partially filled by clients of this
   *        InsertDestination for insertion.
   * @warning Should only be called AFTER this InsertDestination will no longer
   *          be used, and all blocks have been returned to it via
   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
   *
   * @param partial_blocks A pointer to the vector of block IDs in which the
   *        partially filled block IDs will be added.
   * @param part_ids A pointer to the vector to which part_id is appended once
   *        for each block added to partial_blocks.
   * @param part_id The partition id for which we want the partially filled blocks.
   **/
  void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks,
                                           std::vector<partition_id> *part_ids,
                                           const partition_id part_id) {
    DCHECK_LT(part_id, partition_scheme_header_->getNumPartitions());
    SpinMutexLock lock(mutexes_for_partition_[part_id]);
    std::vector<MutableBlockReference> &refs = available_block_refs_[part_id];
    for (MutableBlockReference &ref : refs) {
      partial_blocks->push_back(std::move(ref));
      part_ids->push_back(part_id);
    }
    // The moved-from references are dropped; the pool for this partition is
    // now empty.
    refs.clear();
  }

  // Returns input_partition_id_ when the scheme has no partition attributes;
  // otherwise asks the scheme header using the tuple's partition attributes.
  partition_id getPartitionId(const Tuple &tuple) const {
    const auto &partition_attr_ids = partition_scheme_header_->getPartitionAttributeIds();

    PartitionSchemeHeader::PartitionValues values(partition_attr_ids.size());
    for (std::size_t i = 0; i < partition_attr_ids.size(); ++i) {
      values[i] = tuple.getAttributeValue(partition_attr_ids[i]);
    }

    return values.empty() ? input_partition_id_ : partition_scheme_header_->getPartitionId(values);
  }

  // For every remaining tuple in accessor, sets its position in the
  // TupleIdSequence of the partition it belongs to.
  template<typename ValueAccessorT>
  void setPartitionMembership(std::vector<std::unique_ptr<TupleIdSequence>> *partition_membership,
                              ValueAccessorT *accessor) const {
    const auto &partition_attr_ids = partition_scheme_header_->getPartitionAttributeIds();
    if (partition_attr_ids.empty()) {
      while (accessor->next()) {
        (*partition_membership)[input_partition_id_]->set(accessor->getCurrentPosition());
      }
    } else {
      PartitionSchemeHeader::PartitionValues values(partition_attr_ids.size());
      while (accessor->next()) {
        for (std::size_t i = 0; i < partition_attr_ids.size(); ++i) {
          values[i] = accessor->getTypedValue(partition_attr_ids[i]);
        }
        (*partition_membership)[partition_scheme_header_->getPartitionId(values)]->set(
            accessor->getCurrentPosition());
      }
    }
  }

  std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;

  // A vector of available block references for each partition.
  std::vector< std::vector<MutableBlockReference> > available_block_refs_;
  // A vector of available block ids for each partition.
  std::vector< std::vector<block_id> > available_block_ids_;
  // A vector of done block ids for each partition.
  std::vector< std::vector<block_id> > done_block_ids_;
  // Mutex for locking each partition separately, indexed by partition id.
  // Owns an array allocated with new[]; released automatically.
  std::unique_ptr<SpinMutex[]> mutexes_for_partition_;

  partition_id input_partition_id_ = 0u;

  DISALLOW_COPY_AND_ASSIGN(PartitionAwareInsertDestination);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_STORAGE_INSERT_DESTINATION_HPP_