Skip to content

Commit

Permalink
Merge pull request #17 from bgp/feature/separate-slurm-and-cache-update
Browse files Browse the repository at this point in the history
Refactor to update state when cache or SLURM changes
  • Loading branch information
randomthingsandstuff authored Aug 16, 2021
2 parents 74bd75c + d11bc15 commit 4fbaebe
Showing 1 changed file with 74 additions and 56 deletions.
130 changes: 74 additions & 56 deletions cmd/stayrtr/stayrtr.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,41 +248,12 @@ func (e IdenticalFile) Error() string {
return fmt.Sprintf("File %s is identical to the previous version", e.File)
}

func (s *state) updateFile(file string) error {
// Update the state based on the current slurm file and data.
func (s *state) updateFromNewState() error {
sessid, _ := s.server.GetSessionId(nil)

log.Debugf("Refreshing cache from %s", file)

s.lastts = time.Now().UTC()
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
}
if code != -1 {
RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
}

hsum, _ := checkFile(data)
if s.lasthash != nil {
cres := bytes.Compare(s.lasthash, hsum)
if cres == 0 {
return IdenticalFile{File: file}
}
}

s.lastchange = time.Now().UTC()
s.lastdata = data

vrplistjson, err := decodeJSON(s.lastdata)
if err != nil {
return err
}

if s.checktime {
buildtime, err := time.Parse(time.RFC3339, vrplistjson.Metadata.Buildtime)
buildtime, err := time.Parse(time.RFC3339, s.lastdata.Metadata.Buildtime)
if err != nil {
return err
}
Expand All @@ -292,7 +263,7 @@ func (s *state) updateFile(file string) error {
}
}

vrpsjson := vrplistjson.Data
vrpsjson := s.lastdata.Data
if s.slurm != nil {
kept, removed := s.slurm.FilterOnVRPs(vrpsjson)
asserted := s.slurm.AssertVRPs()
Expand All @@ -301,13 +272,9 @@ func (s *state) updateFile(file string) error {
}

vrps, count, countv4, countv6 := processData(vrpsjson)
if err != nil {
return err
}

log.Infof("New update (%v uniques, %v total prefixes). %v bytes. Updating sha256 hash %x -> %x",
len(vrps), count, len(s.lastconverted), s.lasthash, hsum)
s.lasthash = hsum
log.Infof("New update (%v uniques, %v total prefixes).",
len(vrps), count)

s.server.AddVRPs(vrps)

Expand All @@ -322,7 +289,7 @@ func (s *state) updateFile(file string) error {
s.exported = prefixfile.VRPList{
Metadata: prefixfile.MetaData{
Counts: len(vrpsjson),
Buildtime: vrplistjson.Metadata.Buildtime,
Buildtime: s.lastdata.Metadata.Buildtime,
},
Data: vrpsjson,
}
Expand All @@ -339,16 +306,54 @@ func (s *state) updateFile(file string) error {
countv6_dup++
}
}
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, file)
s.metricsEvent.UpdateMetrics(countv4, countv6, countv4_dup, countv6_dup, s.lastchange, s.lastts, *CacheBin)
}

return nil
}

func (s *state) updateSlurm(file string) error {
func (s *state) updateFile(file string) (bool, error) {
log.Debugf("Refreshing cache from %s", file)

s.lastts = time.Now().UTC()
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return false, err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
}
if code != -1 {
RefreshStatusCode.WithLabelValues(file, fmt.Sprintf("%d", code)).Inc()
}

hsum, _ := checkFile(data)
if s.lasthash != nil {
cres := bytes.Compare(s.lasthash, hsum)
if cres == 0 {
return false, IdenticalFile{File: file}
}
}

log.Infof("new cache file: Updating sha256 hash %x -> %x", s.lasthash, hsum)

vrplistjson, err := decodeJSON(data)
if err != nil {
return false, err
}

s.lasthash = hsum
s.lastchange = time.Now().UTC()
s.lastdata = vrplistjson

return true, nil
}

func (s *state) updateSlurm(file string) (bool, error) {
log.Debugf("Refreshing slurm from %v", file)
data, code, lastrefresh, err := s.fetchConfig.FetchFile(file)
if err != nil {
return err
return false, err
}
if lastrefresh {
LastRefresh.WithLabelValues(file).Set(float64(s.lastts.UnixNano() / 1e9))
Expand All @@ -361,10 +366,10 @@ func (s *state) updateSlurm(file string) error {

slurm, err := prefixfile.DecodeJSONSlurm(buf)
if err != nil {
return err
return false, err
}
s.slurm = slurm
return nil
return true, nil
}

func (s *state) routineUpdate(file string, interval int, slurmFile string) {
Expand All @@ -379,8 +384,10 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
log.Debug("Received HUP signal")
}
delay.Stop()
slurmNotPresentOrUpdated := false
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
var err error
slurmNotPresentOrUpdated, err = s.updateSlurm(slurmFile)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -392,7 +399,7 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
}
}
}
err := s.updateFile(file)
cacheUpdated, err := s.updateFile(file)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -405,6 +412,15 @@ func (s *state) routineUpdate(file string, interval int, slurmFile string) {
log.Errorf("Error updating: %v", err)
}
}

// Only process the first time after there is either a cache or SLURM
// update.
if cacheUpdated || slurmNotPresentOrUpdated {
err := s.updateFromNewState()
if err != nil {
log.Errorf("Error updating from new state: %v", err)
}
}
}
}

Expand All @@ -417,13 +433,12 @@ func (s *state) exporter(wr http.ResponseWriter, r *http.Request) {
}

type state struct {
lastdata []byte
lastconverted []byte
lasthash []byte
lastchange time.Time
lastts time.Time
sendNotifs bool
useSerial int
lastdata *prefixfile.VRPList
lasthash []byte
lastchange time.Time
lastts time.Time
sendNotifs bool
useSerial int

fetchConfig *utils.FetchConfig

Expand Down Expand Up @@ -532,7 +547,7 @@ func main() {
log.Fatalf("Specify at least a bind address")
}

err := s.updateFile(*CacheBin)
_, err := s.updateFile(*CacheBin)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -548,7 +563,7 @@ func main() {

slurmFile := *Slurm
if slurmFile != "" {
err := s.updateSlurm(slurmFile)
_, err := s.updateSlurm(slurmFile)
if err != nil {
switch err.(type) {
case utils.HttpNotModified:
Expand All @@ -564,6 +579,9 @@ func main() {
}
}

// Initial calculation of state (after fetching cache + slurm)
s.updateFromNewState()

if *Bind != "" {
go func() {
sessid, _ := server.GetSessionId(nil)
Expand Down

0 comments on commit 4fbaebe

Please sign in to comment.