-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHashJoin.h
69 lines (55 loc) · 2.41 KB
/
HashJoin.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#pragma once
#include "../Definitions.h"
#include "../OperatorPipe.h"
#include "../Pipeline.h"
#include "../Schema.h"
#include "Operator.h"
namespace Operator
{
// Transforming operator that _HashJoin pipes onto each of its two input
// pipelines. It keeps the semaphore datatype passed in by the join and exposes,
// as its output schema, a schema built from a clone of the input schema's only
// entry. getGlobalDeclarations() and getCode() are defined out of line.
class _HashJoinInputAdapter : public _TransformingOperator
{
  public:
    using BaseType = _TransformingOperator;

    // semaphore: datatype stored in _semaphore. None of the inline members
    // below read it; presumably the out-of-line getCode() /
    // getGlobalDeclarations() do -- TODO confirm in the implementation file.
    _HashJoinInputAdapter(std::shared_ptr<_Datatype> semaphore)
        : _semaphore(semaphore)
    {
    }

    std::string getGlobalDeclarations() override;
    std::string getCode() override;

    // Records the incoming schema and derives the output schema from it.
    void setInputSchema(std::shared_ptr<_Schema> _schema) override
    {
        _inputSchema = _schema;
        // The output schema wraps a clone of the input's single entry
        // (a copied map with the same signature).
        // NOTE(review): neither _schema nor getOnly() is checked for null here.
        _outputSchema = Schema(_inputSchema->getOnly()->clone());
    }

    // Returns the schema built by setInputSchema(); it is default-initialized
    // (empty) until setInputSchema() has been called.
    std::shared_ptr<_Schema> getOutputSchema() override
    {
        return _outputSchema;
    }

  private:
    std::shared_ptr<_Schema> _outputSchema;
    std::shared_ptr<_Datatype> _semaphore;
};
DECLARE_SHARED_PTR(HashJoinInputAdapter);
class _HashJoin : public _ProducingOperator
{
public:
_HashJoin(std::shared_ptr<_Pipeline> leftPipeline, std::shared_ptr<_Pipeline> rightPipeline)
{
_leftSemaphore = POD("Semaphore", "leftWindowClosedSemaphore");
_rightSemaphore = POD("Semaphore", "rightWindowClosedSemaphore");
_leftPipeline = leftPipeline | HashJoinInputAdapter(_leftSemaphore);
_rightPipeline = rightPipeline | HashJoinInputAdapter(_rightSemaphore);
_keyType = std::static_pointer_cast<_MapType>(_leftPipeline->getPipelineOutputSchema()->getOnly())->getKeyType()->clone();
_leftElementType =
std::static_pointer_cast<_MapType>(_leftPipeline->getPipelineOutputSchema()->getOnly())->getValueType()->clone();
_rightElementType =
std::static_pointer_cast<_MapType>(_rightPipeline->getPipelineOutputSchema()->getOnly())->getValueType()->clone();
};
std::set<std::string> getHeaders() override;
std::string getCode() override;
std::string getGlobalDeclarations() override;
std::shared_ptr<_Schema> getOutputSchema() override;
using BaseType = _ProducingOperator;
private:
std::shared_ptr<_Pipeline> _leftPipeline, _rightPipeline;
std::shared_ptr<_Datatype> _keyType, _leftElementType, _rightElementType, _leftSemaphore, _rightSemaphore;
};
DECLARE_SHARED_PTR(HashJoin);
}