universe: use proof log db table in federation envoy
This commit adds a flag to the federation universe push request to
indicate that the proof leaf sync attempt should be logged and actively
managed to ensure that the federation push procedure is repeated in the
event of a failure.
ffranr committed Nov 27, 2023
1 parent 04e4ee7 commit 86a75d3
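The retry scheme described in the commit message follows a simple lifecycle: a push attempt is first recorded as pending, flipped to complete only on success, and anything still pending is re-attempted on the next sync tick. The following is a minimal illustrative sketch of that lifecycle, not code from the commit: SyncLogStore, PendingEntry, retryPendingPushes, and pushOnce are hypothetical stand-ins, while SyncDirectionPush, ProofSyncStatusPending, and ProofSyncStatusComplete mirror identifiers that appear in the diff below.

// Hypothetical sketch of the proof sync log lifecycle introduced by this
// commit. SyncLogStore, PendingEntry, retryPendingPushes, and pushOnce are
// illustrative stand-ins, not APIs from the repository.
package sketch

import "context"

type SyncDirection int

const SyncDirectionPush SyncDirection = iota

type ProofSyncStatus int

const (
    ProofSyncStatusPending ProofSyncStatus = iota
    ProofSyncStatusComplete
)

// PendingEntry identifies one (leaf, server) pair awaiting a successful push.
type PendingEntry struct {
    LeafID string
    Server string
}

// SyncLogStore is a minimal stand-in for the proof sync log db table.
type SyncLogStore interface {
    // UpsertLog inserts or updates the sync status for a (leaf, server)
    // pair in the given direction.
    UpsertLog(ctx context.Context, e PendingEntry, d SyncDirection,
        s ProofSyncStatus) error

    // PendingPushes returns all push entries still marked pending.
    PendingPushes(ctx context.Context) ([]PendingEntry, error)
}

// retryPendingPushes re-attempts every logged-but-incomplete push. A failed
// attempt stays pending and is picked up again on the next sync tick.
func retryPendingPushes(ctx context.Context, store SyncLogStore,
    pushOnce func(context.Context, PendingEntry) error) error {

    pending, err := store.PendingPushes(ctx)
    if err != nil {
        return err
    }

    for _, entry := range pending {
        if err := pushOnce(ctx, entry); err != nil {
            // Leave the entry pending; it will be retried on a
            // later tick.
            continue
        }

        // Only a successful push flips the entry to complete.
        err := store.UpsertLog(
            ctx, entry, SyncDirectionPush,
            ProofSyncStatusComplete,
        )
        if err != nil {
            return err
        }
    }

    return nil
}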
universe/auto_syncer.go (283 additions, 33 deletions)
@@ -76,6 +76,12 @@ type FederationPushReq struct {
     // federation proof push was successful.
     resp chan *Proof
 
+    // proofSyncLogged is a boolean that indicates, if true, that the
+    // proof leaf sync attempt should be logged and actively managed to
+    // ensure that the federation push procedure is repeated in the event
+    // of a failure.
+    proofSyncLogged bool
 
     err chan error
 }

@@ -227,55 +233,178 @@ func (f *FederationEnvoy) syncServerState(ctx context.Context,
     return nil
 }
 
-// pushProofToFederation attempts to push out a new proof to the current
-// federation in parallel.
-func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey,
-    leaf *Leaf) {
+func (f *FederationEnvoy) pushProofToServer(ctx context.Context,
+    uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error {
 
-    // Fetch all universe servers in our federation.
-    fedServers, err := f.tryFetchServers()
-    if err != nil || len(fedServers) == 0 {
-        return
+    remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
+    if err != nil {
+        return fmt.Errorf("cannot push proof unable to connect "+
+            "to remote server(%v): %w", addr.HostStr(), err)
     }
 
+    _, err = remoteUniverseServer.UpsertProofLeaf(
+        ctx, uniID, key, leaf,
+    )
+    if err != nil {
+        return fmt.Errorf("cannot push proof to remote "+
+            "server(%v): %w", addr.HostStr(), err)
+    }
+
+    return nil
+}
+
+func (f *FederationEnvoy) pushProofToServerLogged(ctx context.Context,
+    uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error {
+
+    // Ensure that we have a pending sync log entry for this leaf and
+    // server pair. This will allow us to handle all pending syncs in the
+    // event of a restart or at a different point in the envoy.
+    _, err := f.cfg.FederationDB.UpsertFederationProofSyncLog(
+        ctx, uniID, key, addr, SyncDirectionPush,
+        ProofSyncStatusPending, true,
+    )
+    if err != nil {
+        log.Warnf("unable to log proof sync as pending: %v", err)
+        return nil
+    }
+
+    remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
+    if err != nil {
+        log.Warnf("cannot push proof unable to connect "+
+            "to remote server(%v): %v", addr.HostStr(), err)
+        return nil
+    }
+
+    _, err = remoteUniverseServer.UpsertProofLeaf(
+        ctx, uniID, key, leaf,
+    )
+    if err != nil {
+        log.Warnf("cannot push proof to remote "+
+            "server(%v): %v", addr.HostStr(), err)
+        return nil
+    }
+
+    // We did not encounter an error in our proof push attempt. Log the
+    // proof sync attempt as complete.
+    _, err = f.cfg.FederationDB.UpsertFederationProofSyncLog(
+        ctx, uniID, key, addr, SyncDirectionPush,
+        ProofSyncStatusComplete, false,
+    )
+    if err != nil {
+        log.Warnf("unable to log proof sync attempt: %v", err)
+        return nil
+    }
+
+    return nil
+}
+
+// pushProofToFederation attempts to push out a new proof to the current
+// federation in parallel.
+func (f *FederationEnvoy) pushProofToFederation(ctx context.Context,
+    uniID Identifier, key LeafKey, leaf *Leaf, fedServers []ServerAddr,
+    proofSyncLogged bool) {
+
     log.Infof("Pushing new proof to %v federation members, proof_key=%v",
         len(fedServers), spew.Sdump(key))
 
-    ctx, cancel := f.WithCtxQuitNoTimeout()
-    defer cancel()
-
     // To push a new proof out, we'll attempt to dial to the remote
     // registrar, then will attempt to push the new proof directly to the
     // register.
     pushNewProof := func(ctx context.Context, addr ServerAddr) error {
-        remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr)
-        if err != nil {
-            log.Warnf("cannot push proof unable to connect "+
-                "to remote server(%v): %v", addr.HostStr(),
-                err)
+        // If we are logging proof sync attempts, we will use the
+        // logged version of the push function.
+        if proofSyncLogged {
+            err := f.pushProofToServerLogged(
+                ctx, uniID, key, leaf, addr,
+            )
+            if err != nil {
+                log.Warnf("cannot push proof via logged "+
+                    "server push: %v", err)
+                return nil
+            }
+
             return nil
         }
 
-        _, err = remoteUniverseServer.UpsertProofLeaf(
-            ctx, uniID, key, leaf,
-        )
+        // If we are not logging proof sync attempts, we will use the
+        // non-logged version of the push function.
+        err := f.pushProofToServer(ctx, uniID, key, leaf, addr)
         if err != nil {
-            log.Warnf("cannot push proof to remote "+
-                "server(%v): %v", addr.HostStr(), err)
+            log.Warnf("cannot push proof: %v", err)
             return nil
         }
 
         return nil
     }
 
     // To conclude, we'll attempt to push the new proof to all the universe
     // servers in parallel.
-    err = fn.ParSlice(ctx, fedServers, pushNewProof)
+    err := fn.ParSlice(ctx, fedServers, pushNewProof)
     if err != nil {
         // TODO(roasbeef): retry in the background until successful?
         log.Errorf("unable to push proof to federation: %v", err)
         return
     }
 }
 
+// filterProofSyncPending filters out servers that have already been synced
+// with for the given leaf.
+func (f *FederationEnvoy) filterProofSyncPending(fedServers []ServerAddr,
+    uniID Identifier, key LeafKey) ([]ServerAddr, error) {
+
+    // If there are no servers to filter, then we'll return early. This
+    // saves from querying the database unnecessarily.
+    if len(fedServers) == 0 {
+        return nil, nil
+    }
+
+    ctx, cancel := f.WithCtxQuit()
+    defer cancel()
+
+    // Select all sync push complete log entries for the given universe
+    // leaf. If there are any servers which are sync complete within this
+    // log set, we will filter them out of our target server set.
+    logs, err := f.cfg.FederationDB.QueryFederationProofSyncLog(
+        ctx, uniID, key, SyncDirectionPush,
+        ProofSyncStatusComplete,
+    )
+    if err != nil {
+        return nil, fmt.Errorf("unable to query federation sync "+
+            "log: %w", err)
+    }
+
+    // Filter out servers that we've already pushed to.
+    filteredFedServers := fn.Filter(fedServers, func(a ServerAddr) bool {
+        for idx := range logs {
+            log := logs[idx]
+
+            // Filter out servers that have a log entry with sync
+            // status complete. We've already specified that we
+            // want sync complete logs only by this point, but
+            // we'll explicitly check here to make our logic clear.
+            if log.ServerAddr.HostStr() == a.HostStr() &&
+                log.SyncStatus == ProofSyncStatusComplete {
+
+                return false
+            }
+
+            // TODO(ffranr): Add timestamp check to filter out
+            // servers that we've pushed to recently.
+        }
+
+        // By this point we haven't found any logs corresponding to
+        // the given server. We will therefore return true and
+        // include the server as a sync target for the given leaf.
+        return true
+    })
+
+    return filteredFedServers, nil
+}
+
 // syncer is the main goroutine that's responsible for interacting with the
 // federation envoy. It also accepts incoming requests to push out new updates
 // to the federation.
@@ -289,6 +418,16 @@ func (f *FederationEnvoy) syncer() {
     syncTicker := time.NewTicker(f.cfg.SyncInterval)
     defer syncTicker.Stop()
 
+    // We'll use a timeout that's slightly less than the sync interval to
+    // help avoid ticking into a new sync event before the previous event
+    // has finished.
+    syncContextTimeout := f.cfg.SyncInterval - 1*time.Second
+    if syncContextTimeout < 0 {
+        // If the sync interval is less than a second, then we'll use
+        // the sync interval as the timeout.
+        syncContextTimeout = f.cfg.SyncInterval
+    }
+
     for {
         select {
         // A new sync event has just been triggered, so we'll attempt
@@ -313,6 +452,51 @@ func (f *FederationEnvoy) syncer() {
                 continue
             }
 
+            // After we've synced with the federation, we'll attempt
+            // to push out any pending proofs that we haven't yet
+            // completed.
+            ctxFetchLog, cancelFetchLog := f.WithCtxQuitNoTimeout()
+            syncDirection := SyncDirectionPush
+            logEntries, err :=
+                f.cfg.FederationDB.FetchPendingProofsSyncLog(
+                    ctxFetchLog, &syncDirection,
+                )
+            if err != nil {
+                log.Warnf("unable to query pending push "+
+                    "sync log: %v", err)
+                continue
+            }
+            cancelFetchLog()
+
+            // TODO(ffranr): Take account of any new servers that
+            // have been added since the last time we populated the
+            // log for a given proof leaf. Pending proof sync log
+            // entries are only relevant for the set of servers
+            // that existed at the time the log entry was created.
+            // If a new server is added, then we should create a
+            // new log entry for the new server.
+
+            for idx := range logEntries {
+                entry := logEntries[idx]
+
+                go func() {
+                    ctx, cancel := f.CtxBlockingCustomTimeout(
+                        syncContextTimeout,
+                    )
+                    defer cancel()
+
+                    servers := []ServerAddr{
+                        entry.ServerAddr,
+                    }
+
+                    f.pushProofToFederation(
+                        ctx, entry.UniID, entry.LeafKey,
+                        &entry.Leaf, servers,
+                        true,
+                    )
+                }()
+            }
+
         // A new push request has just arrived. We'll perform a
         // asynchronous registration with the local Universe registrar,
         // then push it out in an async manner to the federation
@@ -341,11 +525,50 @@ func (f *FederationEnvoy) syncer() {
             // proof out to the federation in the background.
             pushReq.resp <- newProof
 
-            // With the response sent above, we'll push this out to
-            // all the Universe servers in the background.
-            go f.pushProofToFederation(
-                pushReq.ID, pushReq.Key, pushReq.Leaf,
-            )
+            // Fetch all universe servers in our federation.
+            fedServers, err := f.tryFetchServers()
+            if err != nil {
+                err := fmt.Errorf("unable to fetch "+
+                    "federation servers: %w", err)
+                log.Warnf(err.Error())
+                pushReq.err <- err
+                continue
+            }
+
+            if len(fedServers) == 0 {
+                log.Warnf("could not find any federation " +
+                    "servers")
+                continue
+            }
+
+            if pushReq.proofSyncLogged {
+                // We are attempting to sync using the
+                // logged proof sync procedure. We will
+                // therefore narrow down the set of target
+                // servers based on the sync log. Only servers
+                // that are not yet push sync complete will be
+                // targeted.
+                fedServers, err = f.filterProofSyncPending(
+                    fedServers, pushReq.ID, pushReq.Key,
+                )
+                if err != nil {
+                    log.Warnf("failed to filter federation "+
+                        "servers: %v", err)
+                    continue
+                }
+            }
+
+            // With the response sent above, we'll push this
+            // out to all the Universe servers in the
+            // background.
+            go func() {
+                ctx, cancel := f.WithCtxQuitNoTimeout()
+                defer cancel()
+
+                f.pushProofToFederation(
+                    ctx, pushReq.ID, pushReq.Key,
+                    pushReq.Leaf, fedServers,
+                    pushReq.proofSyncLogged,
+                )
+            }()
 
         case pushReq := <-f.batchPushRequests:
             ctx, cancel := f.WithCtxQuitNoTimeout()
@@ -370,13 +593,34 @@ func (f *FederationEnvoy) syncer() {
             // we'll return back to the caller.
             pushReq.resp <- struct{}{}
 
+            // Fetch all universe servers in our federation.
+            fedServers, err := f.tryFetchServers()
+            if err != nil {
+                err := fmt.Errorf("unable to fetch "+
+                    "federation servers: %w", err)
+                log.Warnf(err.Error())
+                pushReq.err <- err
+                continue
+            }
+
+            if len(fedServers) == 0 {
+                log.Warnf("could not find any federation " +
+                    "servers")
+                continue
+            }
+
             // With the response sent above, we'll push this out to
             // all the Universe servers in the background.
             go func() {
+                ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
+                defer cancelPush()
+
                 for idx := range pushReq.Batch {
                     item := pushReq.Batch[idx]
 
                     f.pushProofToFederation(
-                        item.ID, item.Key, item.Leaf,
+                        ctxPush, item.ID, item.Key,
+                        item.Leaf, fedServers, false,
                     )
                 }
             }()
@@ -395,12 +639,18 @@
 func (f *FederationEnvoy) UpsertProofLeaf(_ context.Context, id Identifier,
     key LeafKey, leaf *Leaf) (*Proof, error) {
 
+    // If we're attempting to push an issuance proof, then we'll track the
+    // sync attempt to ensure that we retry in the event of a failure.
+    logProofSync := id.ProofType == ProofTypeIssuance
+
     pushReq := &FederationPushReq{
-        ID:   id,
-        Key:  key,
-        Leaf: leaf,
-        resp: make(chan *Proof, 1),
-        err:  make(chan error, 1),
+        ID:              id,
+        Key:             key,
+        Leaf:            leaf,
+        proofSyncLogged: logProofSync,
+        resp:            make(chan *Proof, 1),
+        err:             make(chan error, 1),
     }
 
     if !fn.SendOrQuit(f.pushRequests, pushReq, f.Quit) {
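One way to read the final hunk: only issuance proofs opt into the logged, retried push path, while transfer proofs keep the existing fire-and-forget behavior. Below is a short, hypothetical table-driven check of that gating (Identifier and the ProofType constants mirror names used in the diff; the test itself is illustrative and not part of the commit):

package sketch

import "testing"

type ProofType int

const (
    ProofTypeIssuance ProofType = iota
    ProofTypeTransfer
)

type Identifier struct {
    ProofType ProofType
}

// shouldLogProofSync mirrors the gating introduced in UpsertProofLeaf:
// only issuance proofs use the logged (retried) federation push path.
func shouldLogProofSync(id Identifier) bool {
    return id.ProofType == ProofTypeIssuance
}

func TestShouldLogProofSync(t *testing.T) {
    cases := []struct {
        name string
        id   Identifier
        want bool
    }{
        {"issuance proofs are logged", Identifier{ProofTypeIssuance}, true},
        {"transfer proofs are not", Identifier{ProofTypeTransfer}, false},
    }

    for _, tc := range cases {
        if got := shouldLogProofSync(tc.id); got != tc.want {
            t.Errorf("%s: got %v, want %v", tc.name, got, tc.want)
        }
    }
}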
