-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
112 lines (94 loc) · 2.41 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
"log"
"os"
"runtime"
"strings"
"text/template"
"time"
"github.com/psaia/kcomb"
)
// Excuse the lack of error handling. The purpose of this is to make a quick demo to better
// demonstrate the library's stream API and efficiency.
const missionStatement = "the {{.Fruit}} fell in love with the {{.Vegetable}}."
func main() {
col1Data, _ := ioutil.ReadFile("./col-1.txt")
col2Data, _ := ioutil.ReadFile("./col-2.txt")
col1DataSlice := strings.Split(string(col1Data), "\n")
col2DataSlice := strings.Split(string(col2Data), "\n")
// Trim the empty string on the end.
col1DataSlice = col1DataSlice[0 : len(col1DataSlice)-1]
col2DataSlice = col2DataSlice[0 : len(col2DataSlice)-1]
var col1, col2 []kcomb.Datum
for _, item := range col1DataSlice {
col1 = append(col1, kcomb.Datum{Value: item})
}
for _, item := range col2DataSlice {
col2 = append(col2, kcomb.Datum{Value: item})
}
ctx, cancel := context.WithCancel(context.Background())
go func() {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
fmt.Println("canceling stream!")
cancel()
}
}()
stream := compileTpl(ctx.Done(), kcomb.CombineGenerator(ctx.Done(), []kcomb.Set{col1, col2}))
i := 0
for v := range stream {
log.Println(v)
if i%1000 == 0 {
printMemUsage()
}
time.Sleep(time.Millisecond) // For effect.
i++
}
}
// A simple data generator for compiling a template in the stream.
func compileTpl(
done <-chan struct{},
valueStream <-chan kcomb.Set,
) <-chan string {
stream := make(chan string)
tmpl, _ := template.New("str").Parse(missionStatement)
go func() {
defer close(stream)
for v := range valueStream {
data := struct {
Fruit string
Vegetable string
}{
Fruit: v[0].Value.(string),
Vegetable: v[1].Value.(string),
}
var tpl bytes.Buffer
if err := tmpl.Execute(&tpl, data); err != nil {
panic(err)
}
select {
case <-done:
return
case stream <- tpl.String():
}
}
}()
return stream
}
func printMemUsage() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// For info on each, see: https://golang.org/pkg/runtime/#MemStats
fmt.Printf("-------> Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}
func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}