-
Notifications
You must be signed in to change notification settings - Fork 0
/
outputstream.go
128 lines (101 loc) · 2.35 KB
/
outputstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package fwk
import (
"reflect"
)
// OutputStream implements a task writing data to an OutputStreamer.
//
// OutputStream is concurrent-safe.
//
// OutputStream declares a property 'Ports', a []fwk.Port, which will
// be used to declare the input ports the task will access to,
// writing out data via the underlying OutputStreamer.
//
// OutputStream declares a property 'Streamer', a fwk.OutputStreamer,
// which will be used to actually write data to.
type OutputStream struct {
TaskBase
streamer OutputStreamer
ctrl StreamControl
}
// Configure declares the input ports defined by the 'Ports' property.
func (tsk *OutputStream) Configure(ctx Context) error {
var err error
for _, port := range tsk.ctrl.Ports {
err = tsk.DeclInPort(port.Name, port.Type)
if err != nil {
return err
}
}
return err
}
// StartTask starts the OutputStreamer task
func (tsk *OutputStream) StartTask(ctx Context) error {
var err error
return err
}
// StopTask stops the OutputStreamer task
func (tsk *OutputStream) StopTask(ctx Context) error {
var err error
return err
}
func (tsk *OutputStream) connect(ctrl StreamControl) error {
ctrl.Ports = make([]Port, len(tsk.ctrl.Ports))
copy(ctrl.Ports, tsk.ctrl.Ports)
tsk.ctrl = ctrl
err := tsk.streamer.Connect(ctrl.Ports)
if err != nil {
return err
}
go tsk.write()
return err
}
func (tsk *OutputStream) disconnect() error {
select {
case tsk.ctrl.Quit <- struct{}{}:
default:
}
return tsk.streamer.Disconnect()
}
func (tsk *OutputStream) write() {
for {
select {
case ctx := <-tsk.ctrl.Ctx:
tsk.ctrl.Err <- tsk.streamer.Write(ctx)
case <-tsk.ctrl.Quit:
return
}
}
}
// Process gets data from the store and
// writes it out via the underlying OutputStreamer
func (tsk *OutputStream) Process(ctx Context) error {
var err error
tsk.ctrl.Ctx <- ctx
err = <-tsk.ctrl.Err
if err != nil {
return err
}
return err
}
func newOutputStream(typ, name string, mgr App) (Component, error) {
var err error
tsk := &OutputStream{
TaskBase: NewTask(typ, name, mgr),
streamer: nil,
ctrl: StreamControl{
Ports: make([]Port, 0),
},
}
err = tsk.DeclProp("Ports", &tsk.ctrl.Ports)
if err != nil {
return nil, err
}
err = tsk.DeclProp("Streamer", &tsk.streamer)
if err != nil {
return nil, err
}
return tsk, err
}
func init() {
Register(reflect.TypeOf(OutputStream{}), newOutputStream)
}