-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathwatcher.go
239 lines (195 loc) · 6.7 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package microstellar
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/stellar/go/clients/horizon"
)
// Watcher is an abstract watcher struct.
type Watcher struct {
// Call Done to stop watching the ledger. This closes Ch.
Done func()
// This is set if the stream terminates unexpectedly. Safe to check
// after Ch is closed.
Err *error
}
// Ledger represents an entry in the ledger. You can subscribe to a continuous stream of ledger
// updates on the Stellar network via the WatchLedgers call.
//
// It is a defined type whose underlying type is horizon.Ledger, so values convert
// directly with Ledger(l).
type Ledger horizon.Ledger
// LedgerWatcher is the handle returned by WatchLedgers. It embeds Watcher for
// the Done/Err controls and exposes the channel on which ledger updates arrive.
type LedgerWatcher struct {
	Watcher

	// Ch receives a *Ledger every time the stream reports a new ledger entry.
	Ch chan *Ledger
}
// WatchLedgers watches the stellar network for ledger entries and streams them to LedgerWatcher.Ch. Use
// Options.WithContext to set a context.Context, and Options.WithCursor to set a cursor.
//
// Call Done on the returned watcher to stop the stream. When the horizon stream
// returns, Ch is closed; if it returned an error, that error is stored in *Err
// before Ch is closed. Entries that arrive after the watch has been cancelled
// may be dropped instead of being delivered. In fake (test) mode a synthetic
// ledger is emitted on every call and Ch is not closed by this function.
//
// If the watch cannot be started, the returned error is non-nil and the
// watcher's Done remains a no-op.
func (ms *MicroStellar) WatchLedgers(options ...*Options) (*LedgerWatcher, error) {
	var streamError error

	w := &LedgerWatcher{
		Ch:      make(chan *Ledger),
		Watcher: Watcher{Err: &streamError, Done: func() {}},
	}

	watcherFunc := func(params streamParams) {
		if params.tx.fake {
			// Give up on the send once the watch is cancelled so the goroutine
			// cannot block forever on a consumer that has stopped reading.
			select {
			case w.Ch <- &Ledger{ID: "fake", TotalCoins: "0"}:
			case <-params.ctx.Done():
			}
			return
		}

		err := params.tx.GetClient().StreamLedgers(params.ctx, params.cursor, func(ledger horizon.Ledger) {
			debugf("WatchLedger", "entry (%s) total_coins: %s, tx_count: %v, op_count: %v", ledger.ID, ledger.TotalCoins, ledger.TransactionCount, ledger.OperationCount)

			l := Ledger(ledger)
			select {
			case w.Ch <- &l:
			case <-params.ctx.Done():
			}
		})

		if err != nil {
			debugf("WatchLedger", "stream unexpectedly disconnected: %v", err)
			*w.Err = errors.Wrapf(err, "stream disconnected")
			// Cancel through params rather than w.Done: w.Done is assigned by the
			// caller's goroutine after this one has started, so reading it here races.
			params.cancelFunc()
		}

		close(w.Ch)
	}

	cancelFunc, err := ms.watch("ledger", "", watcherFunc, options...)
	if cancelFunc != nil {
		// watch returns a nil cancel function on failure; keep the no-op Done
		// in that case so callers can still invoke it safely.
		w.Done = cancelFunc
	}

	return w, err
}
// Transaction represents a finalized transaction in the ledger. You can subscribe to transactions
// on the stellar network via the WatchTransactions call.
//
// It is a defined type whose underlying type is horizon.Transaction, so values convert
// directly with Transaction(t).
type Transaction horizon.Transaction
// TransactionWatcher is the handle returned by WatchTransactions, which watches
// the ledger for transactions to and from an address. It embeds Watcher for the
// Done/Err controls.
type TransactionWatcher struct {
	Watcher

	// Ch receives a *Transaction every time the stream reports a new transaction.
	Ch chan *Transaction
}
// WatchTransactions watches the ledger for transactions to and from address and streams them to
// TransactionWatcher.Ch. Use Options.WithContext to set a context.Context, and Options.WithCursor
// to set a cursor.
//
// Call Done on the returned watcher to stop the stream. When the horizon stream
// returns, Ch is closed; if it returned an error, that error is stored in *Err
// before Ch is closed. Transactions that arrive after the watch has been
// cancelled may be dropped instead of being delivered. In fake (test) mode a
// synthetic transaction is emitted on every call and Ch is not closed by this
// function.
//
// If the watch cannot be started (for example, address is invalid), the
// returned error is non-nil and the watcher's Done remains a no-op.
func (ms *MicroStellar) WatchTransactions(address string, options ...*Options) (*TransactionWatcher, error) {
	var streamError error

	w := &TransactionWatcher{
		Ch:      make(chan *Transaction),
		Watcher: Watcher{Err: &streamError, Done: func() {}},
	}

	watcherFunc := func(params streamParams) {
		if params.tx.fake {
			// Give up on the send once the watch is cancelled so the goroutine
			// cannot block forever on a consumer that has stopped reading.
			select {
			case w.Ch <- &Transaction{Account: "FAKE"}:
			case <-params.ctx.Done():
			}
			return
		}

		err := params.tx.GetClient().StreamTransactions(params.ctx, params.address, params.cursor, func(transaction horizon.Transaction) {
			debugf("WatchTransaction", "found transaction (%s) on %s", transaction.ID, transaction.Account)

			t := Transaction(transaction)
			select {
			case w.Ch <- &t:
			case <-params.ctx.Done():
			}
		})

		if err != nil {
			debugf("WatchTransaction", "stream unexpectedly disconnected: %v", err)
			*w.Err = errors.Wrapf(err, "stream disconnected")
			// Cancel through params rather than w.Done: w.Done is assigned by the
			// caller's goroutine after this one has started, so reading it here races.
			params.cancelFunc()
		}

		close(w.Ch)
	}

	cancelFunc, err := ms.watch("transaction", address, watcherFunc, options...)
	if cancelFunc != nil {
		// watch returns a nil cancel function on failure; keep the no-op Done
		// in that case so callers can still invoke it safely.
		w.Done = cancelFunc
	}

	return w, err
}
// Payment represents a finalized payment in the ledger. You can subscribe to payments
// on the stellar network via the WatchPayments call.
//
// It is a defined type whose underlying type is horizon.Payment, so values convert
// directly with Payment(p).
type Payment horizon.Payment
// PaymentWatcher is the handle returned by WatchPayments, which watches the
// ledger for payments to and from an address. It embeds Watcher for the
// Done/Err controls.
type PaymentWatcher struct {
	Watcher

	// Ch receives a *Payment every time the stream reports a new payment.
	Ch chan *Payment
}
// WatchPayments watches the ledger for payments to and from address and streams them to
// PaymentWatcher.Ch. Use Options.WithContext to set a context.Context, and Options.WithCursor
// to set a cursor.
//
// For every payment the memo is loaded before delivery; a memo lookup failure
// is logged and the payment is still delivered.
//
// Call Done on the returned watcher to stop the stream. When the horizon stream
// returns, Ch is closed; if it returned an error, that error is stored in *Err
// before Ch is closed. Payments that arrive after the watch has been cancelled
// may be dropped instead of being delivered. In fake (test) mode a synthetic
// payment is emitted on every call and Ch is not closed by this function.
//
// If the watch cannot be started (for example, address is invalid), the
// returned error is non-nil and the watcher's Done remains a no-op.
func (ms *MicroStellar) WatchPayments(address string, options ...*Options) (*PaymentWatcher, error) {
	var streamError error

	w := &PaymentWatcher{
		Ch:      make(chan *Payment),
		Watcher: Watcher{Err: &streamError, Done: func() {}},
	}

	watcherFunc := func(params streamParams) {
		if params.tx.fake {
			// Give up on the send once the watch is cancelled so the goroutine
			// cannot block forever on a consumer that has stopped reading.
			select {
			case w.Ch <- &Payment{Type: "fake"}:
			case <-params.ctx.Done():
			}
			return
		}

		err := params.tx.GetClient().StreamPayments(params.ctx, params.address, params.cursor, func(payment horizon.Payment) {
			debugf("WatchPayments", "found payment (%s) at %s, loading memo", payment.Type, address)

			// Memo loading is best-effort: report the failure but still hand
			// the payment to the subscriber.
			if memoErr := params.tx.GetClient().LoadMemo(&payment); memoErr != nil {
				debugf("WatchPayments", "could not load memo: %v", memoErr)
			}

			p := Payment(payment)
			select {
			case w.Ch <- &p:
			case <-params.ctx.Done():
			}
		})

		if err != nil {
			debugf("WatchPayments", "stream unexpectedly disconnected: %v", err)
			*w.Err = errors.Wrapf(err, "stream disconnected")
			// Cancel through params rather than w.Done: w.Done is assigned by the
			// caller's goroutine after this one has started, so reading it here races.
			params.cancelFunc()
		}

		close(w.Ch)
	}

	cancelFunc, err := ms.watch("payment", address, watcherFunc, options...)
	if cancelFunc != nil {
		// watch returns a nil cancel function on failure; keep the no-op Done
		// in that case so callers can still invoke it safely.
		w.Done = cancelFunc
	}

	return w, err
}
// streamParams bundles everything a streamFunc needs to run one horizon stream.
// watch builds it and hands it to the streamer.
type streamParams struct {
	// ctx governs the stream's lifetime; watch derives it with context.WithCancel.
	ctx context.Context

	// tx is the transaction helper used to reach the horizon client.
	tx *Tx

	// cursor is the starting position of the stream, or nil when no cursor was requested.
	cursor *horizon.Cursor

	// address is the account being watched; empty for ledger streams.
	address string

	// cancelFunc cancels ctx.
	cancelFunc func()

	// err is not populated by watch in this file.
	err *error
}
// streamFunc starts a horizon stream with the specified parameters. watch invokes it
// on its own goroutine: once for a real stream, repeatedly while in fake mode.
type streamFunc func(streamParams)
// watch is a helper method to work with the Horizon Stream* methods. It validates
// address (when non-empty), applies the first entry of options (context and
// cursor), and runs streamer on a new goroutine.
//
// entity is only used for log and error messages. In fake mode streamer is
// invoked repeatedly, roughly every 200ms, until the context is cancelled;
// otherwise it is invoked exactly once.
//
// Returns a cancelFunc that stops the stream, and an error. When address is
// invalid the returned cancelFunc is nil and no goroutine is started.
func (ms *MicroStellar) watch(entity string, address string, streamer streamFunc, options ...*Options) (func(), error) {
	logField := fmt.Sprintf("watch:%s", entity)
	debugf(logField, "watching address: %s", address)

	// An empty address means there is nothing to validate (WatchLedgers passes "").
	if address != "" {
		if err := ValidAddress(address); err != nil {
			return nil, ms.errorf("can't watch %s, invalid address: %s", entity, address)
		}
	}

	tx := ms.getTx()

	var cursor *horizon.Cursor
	var parent context.Context

	// Only the first Options value is honoured; a nil entry is ignored rather
	// than dereferenced.
	if len(options) > 0 && options[0] != nil {
		opts := options[0]
		tx.SetOptions(opts)

		if opts.hasCursor {
			// horizon expects a *Cursor, so convert and take the address of a local copy.
			c := horizon.Cursor(opts.cursor)
			cursor = &c
			debugf(logField, "starting stream at cursor: %s", string(*cursor))
		}

		parent = opts.ctx
	}

	if parent == nil {
		parent = context.Background()
	}
	ctx, cancelFunc := context.WithCancel(parent)

	params := streamParams{
		ctx:        ctx,
		tx:         tx,
		cursor:     cursor,
		address:    address,
		cancelFunc: cancelFunc,
	}

	go func() {
		// Release the derived context's resources once streaming is over, even
		// if the caller never invokes the returned cancelFunc. Cancelling more
		// than once is harmless.
		defer cancelFunc()

		if !tx.fake {
			streamer(params)
			return
		}

		// Fake mode: emit synthetic entries until cancelled.
		for ctx.Err() == nil {
			streamer(params)

			// Pause between entries, but wake up immediately on cancellation.
			select {
			case <-ctx.Done():
			case <-time.After(200 * time.Millisecond):
			}
		}
	}()

	return cancelFunc, ms.success()
}