diff --git a/config/config.go b/config/config.go index e46086f4..afccb8f3 100644 --- a/config/config.go +++ b/config/config.go @@ -25,12 +25,13 @@ type MonitorConfig struct { // SourceConfig holds the config that defines all data sources to be processed. type SourceConfig struct { - Bucket string `yaml:"bucket"` - Experiment string `yaml:"experiment"` - Datatype string `yaml:"datatype"` - Filter string `yaml:"filter"` - Datasets Datasets `yaml:"target_datasets"` - DailyOnly bool `yaml:"daily_only"` + Bucket string `yaml:"bucket"` + Experiment string `yaml:"experiment"` + Datatype string `yaml:"datatype"` + Filter string `yaml:"filter"` + Datasets Datasets `yaml:"target_datasets"` + DailyOnly bool `yaml:"daily_only"` + FullHistory bool `yaml:"full_history"` // FullHistory = true implies DailyOnly = false. } // Datasets contains the name of BigQuery datasets used for temporary, raw, or diff --git a/job-service/job-service.go b/job-service/job-service.go index b5af6bf4..a83cf51e 100644 --- a/job-service/job-service.go +++ b/job-service/job-service.go @@ -47,6 +47,9 @@ func (svc *Service) NextJob(ctx context.Context) *tracker.JobWithTarget { return svc.ifHasFiles(ctx, jt) } + // Calculate date from 1 year ago. + lastYear := time.Now().UTC().AddDate(-1, 0, 0) + // Since some jobs may be configured as dailyOnly, or have no files for a // given date, we check for these conditions, and skip the job if // appropriate. We try up to histJobs.Len() times to find the next job. @@ -56,6 +59,12 @@ func (svc *Service) NextJob(ctx context.Context) *tracker.JobWithTarget { log.Println(err) continue } + + // Only reprocess data from the last year. + if jt != nil && !jt.FullHistory && jt.Job.Date.Before(lastYear) { + continue + } + return svc.ifHasFiles(ctx, jt) } return nil @@ -78,6 +87,10 @@ func (svc *Service) ifHasFiles(ctx context.Context, jt *tracker.JobWithTarget) * // ErrInvalidStartDate is returned if startDate is time.Time{} var ErrInvalidStartDate = errors.New("invalid start date") +// ErrInvalidDateConfig is returned if both DailyOnly and FullHistory are true for +// a source. Only one of these options can be true. +var ErrInvalidDateConfig = errors.New("invalid source date configuration") + // NewJobService creates the default job service. func NewJobService(startDate time.Time, sources []config.SourceConfig, @@ -94,6 +107,11 @@ func NewJobService(startDate time.Time, histSpecs := make([]tracker.JobWithTarget, 0) for _, s := range sources { log.Println(s) + + if s.DailyOnly && s.FullHistory { + return nil, ErrInvalidDateConfig + } + job := tracker.Job{ Bucket: s.Bucket, Experiment: s.Experiment, @@ -108,8 +126,9 @@ func NewJobService(startDate time.Time, // TODO - handle gs:// targets jt := tracker.JobWithTarget{ // NOTE: JobWithTarget.ID is assigned after Job.Date is set. - Job: job, - DailyOnly: s.DailyOnly, + Job: job, + DailyOnly: s.DailyOnly, + FullHistory: s.FullHistory, } dailySpecs = append(dailySpecs, jt) if !s.DailyOnly { diff --git a/job-service/job-service_test.go b/job-service/job-service_test.go index 2d5cd535..ed49a5fa 100644 --- a/job-service/job-service_test.go +++ b/job-service/job-service_test.go @@ -15,6 +15,7 @@ import ( "github.com/m-lab/etl-gardener/persistence" "github.com/m-lab/etl-gardener/tracker" "github.com/m-lab/go/cloudtest/gcsfake" + "github.com/m-lab/go/timex" ) type namedSaver interface { @@ -37,8 +38,8 @@ func TestNewJobService(t *testing.T) { name: "successful-init", startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC), sources: []config.SourceConfig{ - {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"}, - {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo"}, + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", FullHistory: true}, + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "tcpinfo", FullHistory: true}, {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "pcap", DailyOnly: true}, }, dailySaver: &failSaver{err: errors.New("any error")}, @@ -56,6 +57,14 @@ func TestNewJobService(t *testing.T) { sources: []config.SourceConfig{}, wantErr: job.ErrNoConfiguredJobs, }, + { + name: "error-invalid-date-config", + startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC), + sources: []config.SourceConfig{ + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", DailyOnly: true, FullHistory: true}, + }, + wantErr: job.ErrInvalidDateConfig, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -84,6 +93,8 @@ func TestService_NextJob(t *testing.T) { // Setup fake gcs bucket access. fakeb := gcsfake.NewBucketHandle() fakeb.ObjAttrs = append(fakeb.ObjAttrs, &storage.ObjectAttrs{Name: "ndt/ndt5/2022/07/01/foo.tgz", Size: 1, Updated: time.Now()}) + lastYear := time.Now().UTC().AddDate(-1, 0, 1) + fakeb.ObjAttrs = append(fakeb.ObjAttrs, &storage.ObjectAttrs{Name: "ndt/ndt5/" + lastYear.Format(timex.YYYYMMDDWithSlash) + "/bar.tgz", Size: 1, Updated: time.Now()}) fakec := &gcsfake.GCSClient{} fakec.AddTestBucket("fake-bucket", fakeb) @@ -107,7 +118,8 @@ func TestService_NextJob(t *testing.T) { name: "success-historical", startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC), sources: []config.SourceConfig{ - {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"}, + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", + FullHistory: true}, }, statsClient: fakec, dailySaver: &failSaver{err: errors.New("any error")}, @@ -124,11 +136,24 @@ func TestService_NextJob(t *testing.T) { histSaver: &noopSaver{}, want: "fake-bucket/ndt/pcap/20220701", }, + { + name: "success-historical-full-history-false", + startDate: lastYear, + sources: []config.SourceConfig{ + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", + FullHistory: false}, + }, + statsClient: fakec, + dailySaver: &failSaver{err: errors.New("any error")}, + histSaver: &noopSaver{}, + want: tracker.Key("fake-bucket/ndt/ndt5/" + lastYear.Format(timex.YYYYMMDD)), + }, { name: "error-fail-savers", startDate: time.Date(2022, time.July, 1, 0, 0, 0, 0, time.UTC), sources: []config.SourceConfig{ - {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5"}, + {Bucket: "fake-bucket", Experiment: "ndt", Datatype: "ndt5", + FullHistory: true}, }, dailySaver: &failSaver{err: errors.New("fail")}, histSaver: &failSaver{err: errors.New("fail")}, diff --git a/tracker/job.go b/tracker/job.go index 7e1d79ae..de3573a3 100644 --- a/tracker/job.go +++ b/tracker/job.go @@ -52,9 +52,10 @@ func (j *Job) TablePartition() string { // JobWithTarget specifies a type/date job, and a destination // table or GCS prefix type JobWithTarget struct { - ID Key // ID used by gardener & parsers to identify a Job's status and configuration. - Job Job - DailyOnly bool `json:"-"` + ID Key // ID used by gardener & parsers to identify a Job's status and configuration. + Job Job + DailyOnly bool `json:"-"` + FullHistory bool `json:"-"` // TODO: enable configuration for parser to target alterate buckets. }