-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDataFlow.cs
73 lines (60 loc) · 3.15 KB
/
DataFlow.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
using System.Threading.Tasks.Dataflow;
namespace DataFlowVsTasks;
public static class DataFlow {
public static void PostSync(int parallelDegree, bool writeToDisk) {
// Options
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
var execOptions = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = parallelDegree};
// Blocks
var readFiles = new TransformBlock<string, (string, string[])>(Actions.ReadFile, execOptions);
var searchFiles = new TransformBlock<(string, string[]), (string, int)>(Actions.SearchFile, execOptions);
var enrich = new TransformBlock<(string, int), (string, string)>(Actions.EnrichFileCount, execOptions);
var writeResults = new TransformBlock<(string, string), string>(writeToDisk ? Actions.WriteFile : Actions.DontWriteToDisk, execOptions);
// Pipeline
readFiles.LinkTo(searchFiles, linkOptions);
searchFiles.LinkTo(enrich, linkOptions);
enrich.LinkTo(writeResults, linkOptions);
writeResults.LinkTo(DataflowBlock.NullTarget<string>(), linkOptions);
var files = Directory.EnumerateFiles(Data.FilesDir, "*").ToArray();
foreach (var file in files) {
readFiles.Post(file);
}
readFiles.Complete();
writeResults.Completion.Wait();
var results = new List<string>();
while (writeResults.TryReceive(out var result)) {
results.Add(result);
}
// Do something with the results ...
}
public static void PostAsync(int parallelDegree, bool writeToDisk) {
// Options
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
var execOptions = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = parallelDegree};
// Blocks
var readFiles = new TransformBlock<string, (string, string[])>(Actions.ReadFile, execOptions);
var searchFiles = new TransformBlock<(string, string[]), (string, int)>(Actions.SearchFile, execOptions);
var enrich = new TransformBlock<(string, int), (string, string)>(Actions.EnrichFileCount, execOptions);
var writeResults = new TransformBlock<(string, string), string>(writeToDisk ? Actions.WriteFile : Actions.DontWriteToDisk, execOptions);
// Pipeline
readFiles.LinkTo(searchFiles, linkOptions);
searchFiles.LinkTo(enrich, linkOptions);
enrich.LinkTo(writeResults, linkOptions);
writeResults.LinkTo(DataflowBlock.NullTarget<string>(), linkOptions);
// Input
var files = Directory.EnumerateFiles(Data.FilesDir, "*").ToArray();
var tasks = new Task[files.Length + 1];
for (var i = 0; i < files.Length; i++) {
var file = files[i];
tasks[i] = readFiles.SendAsync(file);
}
readFiles.Complete();
tasks[files.Length] = writeResults.Completion;
Task.WaitAll(tasks);
var results = new List<string>();
while (writeResults.TryReceive(out var result)) {
results.Add(result);
}
// Do something with the results ...
}
}