diff --git a/client/virtual_channel.go b/client/virtual_channel.go index e2ae9f88a..6b16e2e4b 100644 --- a/client/virtual_channel.go +++ b/client/virtual_channel.go @@ -100,6 +100,52 @@ func (c *Client) handleVirtualChannelFundingProposal( c.acceptProposal(responder) } +func (c *Channel) watchVirtual() error { + log := c.Log().WithField("proc", fmt.Sprintf("virtual channel watcher %v", c.ID())) + defer log.Info("Watcher returned.") + + // Subscribe to state changes + ctx := c.Ctx() + sub, err := c.adjudicator.Subscribe(ctx, c.Params()) + if err != nil { + return errors.WithMessage(err, "subscribing to adjudicator state changes") + } + defer func() { + err := sub.Close() + if err != nil { + log.Warn(err) + } + }() + + // Wait for state changed event + for e := sub.Next(); e != nil; e = sub.Next() { + log.Debugf("event %v", e) + + // Update channel + switch e := e.(type) { + case *channel.RegisteredEvent: + if e.Version() > c.State().Version { + err := c.client.updateVirtualChannelState(ctx, e.State, e.Sigs) + if err != nil { + log.Warnf("error updating virtual channel: %v", err) + } + } + + case *channel.ProgressedEvent: + log.Infof("Virtual channel progressed: %v", e.ID()) + + case *channel.ConcludedEvent: + log.Infof("Virtual channel concluded: %v", e.ID()) + + default: + log.Errorf("unsupported type: %T", e) + } + } + + err = sub.Err() + log.Debugf("Subscription closed: %v", err) + return errors.WithMessage(err, "subscription closed") +} type dummyAccount struct { address wallet.Address @@ -306,6 +352,12 @@ func (c *Client) matchFundingProposal(a, b interface{}) (ok bool) { return false } c.channels.Put(virtual.ID(), virtual) + + // Watch. + go func() { + err := virtual.watchVirtual() + c.log.Debugf("channel %v: watcher stopped: %v", virtual.ID(), err) + }() return }