-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathBigQ.h
96 lines (66 loc) · 2.39 KB
/
BigQ.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#ifndef BIGQ_H
#define BIGQ_H
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Pipe.h"
#include "File.h"
#include "Record.h"
using namespace std;
struct WorkerThreadData {
Pipe *inputPipe;
Pipe *outputPipe;
OrderMaker *sortOrder;
int runLength;
File bigQFile;
char bigQFileName[100];
int numberOfRuns;
Page *currentRunPages;
int currentRunPageNumber;
bool overflowIsThere;
};
struct PriorityQueueItem {
Page *page;
Record *head;
int currentPageNumber;
int maxPageNumberOfCurrentRun;
};
struct RecordComparator {
OrderMaker *sortOrder;
RecordComparator(OrderMaker *sortorder) {
this->sortOrder = sortorder;
}
bool operator()(Record *lhs, Record *rhs) {
ComparisonEngine cmp;
return cmp.Compare(lhs, rhs, this->sortOrder) > 0;
}
bool operator()(const PriorityQueueItem &lhs, const PriorityQueueItem &rhs) {
ComparisonEngine cmp;
return cmp.Compare(lhs.head, rhs.head, this->sortOrder) > 0;
}
};
void *TPMMS(void *threadData);
void InitializeWorkerThreadData(WorkerThreadData *threadData);
void RunGeneration(WorkerThreadData *threadData);
int AddRecordToCurrentRun(WorkerThreadData *threadData, Record *nextRecord);
void CreateRun(WorkerThreadData *workerThreadData);
void SortAndStoreCurrentRun(WorkerThreadData *workerThreadData);
void LoadCurrentRunPriorityQueue(WorkerThreadData *workerThreadData,
priority_queue<Record *, vector<Record *>, RecordComparator> &pq);
void WritePriorityQueueContentToBigQFile(WorkerThreadData *workerThreadData,
priority_queue<Record *, vector<Record *>, RecordComparator> &pq);
void ClearCurrentRun(WorkerThreadData *workerThreadData);
void MergeRuns(WorkerThreadData *workerThreadData);
void LoadMergeRunPriorityQueue(WorkerThreadData *workerThreadData,
priority_queue<PriorityQueueItem, vector<PriorityQueueItem>, RecordComparator> &pq);
void LoadOutputPipeWithPriorityQueueData(WorkerThreadData *workerThreadData,
priority_queue<PriorityQueueItem, vector<PriorityQueueItem>, RecordComparator> &pq);
void CleanUp(WorkerThreadData *workerThreadData);
class BigQ {
private:
pthread_t workerThread;
public:
BigQ(Pipe &in, Pipe &out, OrderMaker &sortorder, int runlen);
~BigQ();
};
#endif