-
Notifications
You must be signed in to change notification settings - Fork 3
/
fast.cu
332 lines (280 loc) · 11 KB
/
fast.cu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <set>
#include <sstream>
#include <string>
#include <vector>
// Size in bytes of every fixed-width city-name slot: Stat::city, the per-thread
// scratch buffer in the kernel, and the stride of the flat lookup table.
// It is also the size of the look-back window used when splitting the file.
// NOTE(review): the slots also hold a terminating '\0', so a name of exactly
// 100 bytes does not fit -- confirm the longest name that can actually occur.
#define MAX_CITY_BYTE 100 // City names are at most 100 bytes.
// Number of threads per block used for every kernel launch in this file.
#define MAX_THREADS_PER_BLOCK 1024
// One contiguous slice of the input file. Both fields are measured in bytes.
struct Part {
    long long offset;  // Absolute position in the file where the slice starts.
    long long length;  // Number of bytes that belong to the slice.
};
// Array entry of a given city, keeping track of the temperature statistics.
struct Stat {
    // NUL-terminated city name. Zero-filled so that a default-constructed
    // entry holds an empty string instead of indeterminate bytes.
    char city[MAX_CITY_BYTE] = {};
    float min = INFINITY;   // Smallest temperature folded in so far.
    float max = -INFINITY;  // Largest temperature folded in so far.
    float sum = 0;          // Running total; divided by count to get the mean.
    int count = 0;          // Number of measurements folded in.

    Stat() {}

    // Copies init_city into the fixed-size buffer. A name that does not fit is
    // truncated to MAX_CITY_BYTE - 1 bytes so the terminator stays in bounds
    // (the previous code wrote city[size()] unconditionally).
    Stat(const std::string& init_city) {
        size_t n = init_city.size();
        if (n > MAX_CITY_BYTE - 1)
            n = MAX_CITY_BYTE - 1;
        memcpy(city, init_city.data(), n);
        city[n] = '\0';
    }
};
// CUDA's built-in atomicMin only works with ints, so this overload provides a
// float version: it repeatedly tries to compare-and-swap the value's bit
// pattern until no other thread has modified the slot in between.
// Returns the value that was stored at `address` before the update.
__device__ static float atomicMin(float* address, float val) {
    int* const slot = (int*) address;
    int observed = *slot;
    for (;;) {
        const int expected = observed;
        const float candidate = ::fminf(val, __int_as_float(expected));
        observed = ::atomicCAS(slot, expected, __float_as_int(candidate));
        if (observed == expected)
            break;  // The slot still held what we read, so the swap took effect.
    }
    return __int_as_float(observed);
}
// Float overload of atomicMax, built from a compare-and-swap loop on the
// value's bit pattern. Returns the value that was stored at `address` before
// the update.
__device__ static float atomicMax(float* address, float val) {
    int* const slot = (int*) address;
    int observed = *slot;
    for (;;) {
        const int expected = observed;
        const float candidate = ::fmaxf(val, __int_as_float(expected));
        observed = ::atomicCAS(slot, expected, __float_as_int(candidate));
        if (observed == expected)
            break;  // The slot still held what we read, so the swap took effect.
    }
    return __int_as_float(observed);
}
// Minimal decimal-string to float parser that can be called from device code.
//
// Accepts an optional leading '-', a run of digits, and an optional '.'
// followed by more digits. Parsing stops at the first character that does not
// fit that shape (including the terminating '\0'). There is no support for
// '+', whitespace or exponents. Returns 0 when no digits are present.
//
// All digits are accumulated into one mantissa and divided once by the matching
// power of ten, instead of dividing by 10 repeatedly, so at most one rounding
// step comes from the scaling.
// Probably could be made more accurate using doubles like the actual strtod.c
__host__ __device__ float cuda_atof(const char* str) {
    float sign = 1.0f;
    if (*str == '-') {
        sign = -1.0f;
        ++str;
    }

    // Integer part.
    float mantissa = 0.0f;
    while (*str >= '0' && *str <= '9') {
        mantissa = mantissa * 10.0f + (*str - '0');
        ++str;
    }

    // Fractional part: keep extending the mantissa and remember the scale.
    float scale = 1.0f;
    if (*str == '.') {
        ++str;
        while (*str >= '0' && *str <= '9') {
            mantissa = mantissa * 10.0f + (*str - '0');
            scale *= 10.0f;
            ++str;
        }
    }

    return (sign * mantissa) / scale;
}
// Device-side strcmp with the same contract as the C library version:
// bytes are compared as unsigned chars, and the result is the difference of
// the first pair of bytes that differ (0 when both strings are equal).
__device__ int cuda_strcmp(const char* p1, const char* p2) {
    const unsigned char* lhs = (const unsigned char*) p1;
    const unsigned char* rhs = (const unsigned char*) p2;
    for (;; ++lhs, ++rhs) {
        const unsigned char a = *lhs;
        const unsigned char b = *rhs;
        // Stop at the first mismatch, or when both strings end together.
        if (a != b || a == '\0')
            return a - b;
    }
}
// Looks up a city in the flat name table with a binary search.
//   cities      - n_city NUL-terminated names, one per MAX_CITY_BYTE-byte slot;
//                 the search only works if the slots are sorted in
//                 cuda_strcmp order.
//   city_target - NUL-terminated name to look for.
// Returns the slot number of the matching name, or -1 if there is none.
__device__ int get_index(char* cities, char* city_target, int n_city) {
    int lo = 0;
    int hi = n_city - 1;
    while (lo <= hi) {
        const int probe = lo + (hi - lo) / 2;
        const int order = cuda_strcmp(cities + probe * MAX_CITY_BYTE, city_target);
        if (order < 0) {
            lo = probe + 1;   // Probed name sorts before the target.
        } else if (order > 0) {
            hi = probe - 1;   // Probed name sorts after the target.
        } else {
            return probe;
        }
    }
    return -1;
}
// Folds one parsed "city;temperature" record into the shared statistics table.
// Records whose city is not present in the lookup table are ignored, so a
// failed lookup (-1) is never used as an array index.
__device__ static void record_measurement(Stat* stats, char* cities, int n_city,
                                          char* city, char* floatstr) {
    const int stat_index = get_index(cities, city, n_city);
    if (stat_index < 0)
        return;
    const float temp = cuda_atof(floatstr);
    // Update (atomically) the temperature statistics; many threads may hit
    // the same city at once.
    atomicMin(&stats[stat_index].min, temp);
    atomicMax(&stats[stat_index].max, temp);
    atomicAdd(&stats[stat_index].sum, temp);
    atomicAdd(&stats[stat_index].count, 1);
}

// The CUDA kernel. Each thread operates on a different section of the buffer, and updates the statistics.
//
// Launch layout: 1-D grid of 1-D blocks with at least part_size threads in
// total; surplus threads return immediately. No shared memory is used.
//   buffer        - raw text of the current batch.
//   parts         - part_size entries; parts[t] is the slice handled by thread t.
//   stats         - one accumulator per city, indexed like `cities`.
//   cities        - flat name table searched by get_index.
//   n_city        - number of entries in `stats` / `cities`.
//   buffer_offset - file offset of buffer[0]; Part offsets are absolute file
//                   offsets, so this is subtracted before indexing.
//   part_size     - number of valid entries in `parts`.
__global__ void process_buffer(char* buffer, Part* parts, Stat* stats, char* cities, int n_city, long long buffer_offset, int part_size) {
    const int bx = blockIdx.x * blockDim.x + threadIdx.x;
    if (bx >= part_size) // For threads that are not assigned any work, return
        return;

    // Longest temperature string is "-99.9", i.e. 5 characters plus the NUL.
    constexpr int kTempChars = 5;

    const Part part = parts[bx];
    const char* text = buffer + (part.offset - buffer_offset);

    char city[MAX_CITY_BYTE];
    char floatstr[kTempChars + 1];
    int index = 0;
    bool parsing_city = true;

    // 64-bit counter: a slice length is a long long and may exceed INT_MAX.
    for (long long i = 0; i < part.length; i++) {
        const char c = text[i];
        if (parsing_city) { // City characters
            if (c == ';') {
                city[index] = '\0';
                index = 0;
                parsing_city = false;
            } else if (index < MAX_CITY_BYTE - 1) {
                // Excess characters are dropped rather than written past the
                // buffer; such a name then simply fails the lookup.
                city[index++] = c;
            }
        } else { // Float characters
            if (c == '\n') {
                floatstr[index] = '\0';
                record_measurement(stats, cities, n_city, city, floatstr);
                // reset for next line read
                parsing_city = true;
                index = 0;
            } else if (index < kTempChars) {
                floatstr[index++] = c;
            }
        }
    }

    // A slice that ends without a trailing newline (the end of a file with no
    // final '\n') still carries one complete record; do not drop it.
    if (!parsing_city && index > 0) {
        floatstr[index] = '\0';
        record_measurement(stats, cities, n_city, city, floatstr);
    }
}
// Splits the file into roughly num_parts contiguous slices that each end right
// after a '\n' (the last slice ends at end-of-file).
// Adapted from https://github.com/benhoyt/go-1brc/blob/master/r8.go#L124
//
// For every slice the last MAX_CITY_BYTE bytes before the ideal cut point are
// searched backwards for a newline. If that window holds none (a very long
// line, or a split size smaller than a line), the search continues forwards
// from the cut point, so a slice never ends mid-line and every slice is
// non-empty.
//
// Returns an empty vector when the file cannot be opened or is empty.
// num_parts values below 1 are treated as 1.
std::vector<Part> split_file(std::string input_path, int num_parts) {
    std::vector<Part> parts;

    std::ifstream file(input_path, std::ios::binary | std::ios::ate);
    if (!file) {
        std::cerr << "Error: cannot open " << input_path << std::endl;
        return parts;
    }
    // Using long long is necessary to avoid overflow of file size.
    // e.g. 15B (bytes) for a 1B-row file
    const long long size = file.tellg();
    if (num_parts < 1)
        num_parts = 1;
    const long long split_size = size / num_parts;
    std::cout << "Total file size: " << size << ", split size: " << split_size << std::endl;

    char buf[MAX_CITY_BYTE];
    long long offset = 0;
    while (offset < size) {
        const long long target_end = offset + split_size;
        if (target_end >= size) {
            // Whatever is left forms the final slice.
            parts.push_back({offset, size - offset});
            break;
        }

        // Look back: last newline inside [seek_offset, target_end).
        // Clamping to `offset` keeps the window inside the current slice.
        const long long seek_offset = std::max(target_end - MAX_CITY_BYTE, offset);
        file.clear();
        file.seekg(seek_offset, std::ios::beg);
        file.read(buf, target_end - seek_offset);
        long long next_offset = -1;
        for (long long i = file.gcount() - 1; i >= 0; --i) {
            if (buf[i] == '\n') {
                next_offset = seek_offset + i + 1;
                break;
            }
        }

        // Look ahead: first newline at or after target_end, else end-of-file.
        if (next_offset < 0) {
            next_offset = size;
            file.clear();
            file.seekg(target_end, std::ios::beg);
            long long pos = target_end;
            bool found = false;
            while (!found && pos < size) {
                file.read(buf, MAX_CITY_BYTE);
                const long long n = file.gcount();
                if (n <= 0)
                    break;
                for (long long i = 0; i < n; ++i) {
                    if (buf[i] == '\n') {
                        next_offset = pos + i + 1;
                        found = true;
                        break;
                    }
                }
                pos += n;
            }
        }

        parts.push_back({offset, next_offset - offset});
        offset = next_offset;
    }
    return parts;
}
// Reads the list of station names from a "name;value" text file.
//
// Lines starting with '#' and blank lines are skipped. The name is everything
// before the first ';' (or the whole line when there is no ';'); empty names
// are ignored. Returns the names as a sorted, duplicate-free set, or an empty
// set (after printing an error) when the file cannot be opened.
//   stations_path - file to read; defaults to the path the program has always used.
std::set<std::string> get_cities(const std::string& stations_path = "data/weather_stations.csv") {
    std::set<std::string> all_cities;

    std::ifstream weather_file(stations_path);
    if (!weather_file) {
        std::cerr << "Error: cannot open " << stations_path << std::endl;
        return all_cities;
    }

    std::string line;
    while (std::getline(weather_file, line)) {
        if (line.empty() || line[0] == '#')
            continue;
        // find() returns npos when there is no ';', and substr(0, npos)
        // then yields the whole line.
        const std::string station = line.substr(0, line.find(';'));
        if (!station.empty())
            all_cities.insert(station);
    }
    return all_cities;
}
// Stops the program with a readable message when a CUDA runtime call fails.
// `what` names the call so the message points at the failing step.
static void check_cuda(cudaError_t status, const char* what) {
    if (status != cudaSuccess) {
        std::cerr << "Error: " << what << ": " << cudaGetErrorString(status) << std::endl;
        std::exit(EXIT_FAILURE);
    }
}

// Entry point: <file path> <num parts> <batch size>.
//   1. Builds the sorted city table from the station list.
//   2. Splits the input file into line-aligned parts.
//   3. Streams the file to the GPU in batches of `batch size` parts, one
//      thread per part, accumulating per-city statistics on the device.
//   4. Writes "city=min/mean/max" lines to cuda_measurements.out.
// Returns 0 on success, 1 on bad arguments or unusable input; CUDA failures
// terminate the program through check_cuda.
int main(int argc, char* argv[]) {
    if (argc < 4) {
        std::cerr << "Usage: " << argv[0] << " <file path> <num parts> <batch size>" << std::endl;
        return 1;
    }
    std::string input_path = argv[1];
    int num_parts = atoi(argv[2]);
    int batch_size = atoi(argv[3]);
    if (num_parts <= 0 || batch_size <= 0) {
        // A non-positive batch size would make the batch loop never advance.
        std::cerr << "Error: <num parts> and <batch size> must be positive integers" << std::endl;
        return 1;
    }

    // Bending the rules of the challenge here.
    // I'm assuming a file like data/weather_stations.csv is given.
    // This file lists all possible cities that could appear in the input file.
    std::set<std::string> all_cities = get_cities();
    std::vector<Stat> stats;
    std::vector<char> cities;  // Flat table: one MAX_CITY_BYTE slot per city.
    stats.reserve(all_cities.size());
    cities.reserve(all_cities.size() * MAX_CITY_BYTE);
    for (const auto& city : all_cities) {
        // The slot must also hold the terminating NUL.
        if (city.size() >= MAX_CITY_BYTE) {
            std::cerr << "Warning: skipping over-long city name: " << city << std::endl;
            continue;
        }
        stats.push_back(Stat(city));
        const size_t slot = cities.size();
        cities.resize(slot + MAX_CITY_BYTE, '\0');
        memcpy(cities.data() + slot, city.c_str(), city.size() + 1);
    }
    const int n_city = static_cast<int>(stats.size());
    if (n_city == 0) {
        std::cerr << "Error: no cities loaded" << std::endl;
        return 1;
    }

    auto start = std::chrono::high_resolution_clock::now();
    std::vector<Part> parts = split_file(input_path, num_parts);
    if (parts.empty()) {
        std::cerr << "Error: nothing to process in " << input_path << std::endl;
        return 1;
    }
    num_parts = static_cast<int>(parts.size());
    batch_size = std::min(batch_size, num_parts);

    // Bytes covered by the batch that starts at part index `first`.
    auto batch_bytes = [&](int first) {
        long long total = 0;
        const int stop = std::min(first + batch_size, num_parts);
        for (int i = first; i < stop; i++)
            total += parts[i].length;
        return total;
    };
    // Size the buffers for the largest batch instead of a fixed 10 GB guess.
    long long max_batch_bytes = 0;
    for (int b = 0; b < num_parts; b += batch_size)
        max_batch_bytes = std::max(max_batch_bytes, batch_bytes(b));

    std::cout << "Required GPU RAM Size (GB): " << max_batch_bytes / 1e9 << std::endl;
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "Time taken finding parts: " << elapsed.count() << " seconds" << std::endl;

    start = std::chrono::high_resolution_clock::now();
    Stat* d_stats = nullptr;   // Array of temperature statistics. Each entry corresponds to a different city.
    char* d_buffer = nullptr;  // Holds a subset of the raw text char buffer.
    Part* d_parts = nullptr;   // File splits s.t. each thread can work on a different split.
    char* d_cities = nullptr;  // List of all cities for city -> index lookup.
    check_cuda(cudaMalloc(&d_stats, n_city * sizeof(Stat)), "cudaMalloc(stats)");
    check_cuda(cudaMemcpy(d_stats, stats.data(), n_city * sizeof(Stat), cudaMemcpyHostToDevice), "cudaMemcpy(stats)");
    check_cuda(cudaMalloc(&d_buffer, static_cast<size_t>(max_batch_bytes)), "cudaMalloc(buffer)");
    check_cuda(cudaMalloc(&d_parts, batch_size * sizeof(Part)), "cudaMalloc(parts)");
    check_cuda(cudaMalloc(&d_cities, cities.size()), "cudaMalloc(cities)");
    check_cuda(cudaMemcpy(d_cities, cities.data(), cities.size(), cudaMemcpyHostToDevice), "cudaMemcpy(cities)");

    // Launch CUDA kernels that processes different splits of the file.
    // Does it in sequential batches, if GPU RAM is limited.
    std::ifstream file(input_path, std::ios::binary);
    if (!file) {
        std::cerr << "Error: cannot open " << input_path << std::endl;
        return 1;
    }
    // One host staging buffer reused by every batch.
    char* buffer = new char[max_batch_bytes];
    for (int b = 0; b < num_parts; b += batch_size) {
        const int part_size = std::min(batch_size, num_parts - b);
        const long long batch_file_size = batch_bytes(b);

        file.seekg(parts[b].offset, std::ios::beg);
        file.read(buffer, batch_file_size);
        if (file.gcount() != batch_file_size) {
            std::cerr << "Error: short read from " << input_path << std::endl;
            return 1;
        }
        check_cuda(cudaMemcpy(d_buffer, buffer, batch_file_size, cudaMemcpyHostToDevice), "cudaMemcpy(buffer)");
        check_cuda(cudaMemcpy(d_parts, parts.data() + b, part_size * sizeof(Part), cudaMemcpyHostToDevice), "cudaMemcpy(parts)");

        // Integer ceil-division: one thread per part.
        const int grid_blocks = (part_size + MAX_THREADS_PER_BLOCK - 1) / MAX_THREADS_PER_BLOCK;
        process_buffer<<<grid_blocks, MAX_THREADS_PER_BLOCK>>>(d_buffer, d_parts, d_stats, d_cities, n_city, parts[b].offset, part_size);
        check_cuda(cudaGetLastError(), "process_buffer launch");
    }
    // for accurate profiling (cuda calls are async); also surfaces faults
    // that happened while the kernels were running.
    check_cuda(cudaDeviceSynchronize(), "cudaDeviceSynchronize");
    delete[] buffer;
    end = std::chrono::high_resolution_clock::now();
    elapsed = end - start;
    std::cout << "Time taken in cuda kernel: " << elapsed.count() << " seconds" << std::endl;

    // Write out the results, and complete the challenge.
    check_cuda(cudaMemcpy(stats.data(), d_stats, n_city * sizeof(Stat), cudaMemcpyDeviceToHost), "cudaMemcpy(results)");
    std::ofstream measurements("cuda_measurements.out");
    // Set the number format once, up front, so the very first value is
    // printed with one decimal like all the others.
    measurements << std::fixed << std::setprecision(1);
    for (int i = 0; i < n_city; i++) {
        if (stats[i].count != 0) {
            const float mean = stats[i].sum / stats[i].count;
            measurements << stats[i].city << "=" << stats[i].min << "/"
                         << mean << "/" << stats[i].max << std::endl;
        }
    }

    cudaFree(d_stats);
    cudaFree(d_buffer);
    cudaFree(d_parts);
    cudaFree(d_cities);
    return 0;
}