Skip to content

Commit

Permalink
godev/cmd/worker: add a /copy endpoint to copy prod uploads to dev
Browse files Browse the repository at this point in the history
1. /copy accepts either date or start and end query parameter same as query
   /chart.
2. /copy can only copy from prod.
3. /queue-tasks will queue copy task before calling merge or chart as
   they are depending on copy task in dev env.
   a. queue tasks will first copy the past 20 days of reports, from
   (now - 20) to now.
   b. queue tasks will then call merge to merge the past 7 days reports.
   c. queue tasks will finally call chart to generate chart for the past
   8 days.

Change-Id: Iafe7780a4228d423ecfb74b3697a8955ae4a1f15
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/599855
Reviewed-by: Robert Findley <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
  • Loading branch information
h9jiang authored and findleyr committed Jul 24, 2024
1 parent 62bbb77 commit af8248e
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 8 deletions.
14 changes: 14 additions & 0 deletions godev/cmd/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ worker will retrieve the report for the provided date from the merge bucket.
Use this endpoint to generate an aggregate chart file containing data from the
provided date range (inclusive) from the merge bucket.

### `/copy` (dev env only)

This endpoint facilitates the copying of uploaded reports from the prod
environment's "uploaded" bucket to the dev environment's "uploaded" bucket.

Since we don't have clients regularly uploading data to dev, copying seeds the
dev environment with data.

Similar to the /chart endpoint, /copy also supports the following query
parameters:

- `/?date=<YYYY-MM-DD>`: Copies reports for a specific date.
- `/?start=<YYYY-MM-DD>&end=<YYYY-MM-DD>`: Copies reports within a specified date range.

### `/queue-tasks`

The queue-tasks endpoint is responsible for task distribution. When invoked, it
Expand Down
55 changes: 54 additions & 1 deletion godev/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
mux.Handle("/merge/", handleMerge(buckets))
mux.Handle("/chart/", handleChart(ucfg, buckets))
mux.Handle("/queue-tasks/", handleTasks(cfg))
mux.Handle("/copy/", handleCopy(cfg, buckets))

mw := middleware.Chain(
middleware.Log(slog.Default()),
Expand All @@ -72,9 +73,53 @@ func main() {
log.Fatal(http.ListenAndServe(":"+cfg.WorkerPort, mw(mux)))
}

// handleCopy copies uploaded reports from the prod GCS upload bucket into
// this environment's upload bucket (dest). It is intended for the dev
// environment only and is a no-op when the configured upload bucket is
// already the prod bucket.
//
// Like /chart, it accepts either a ?date=YYYY-MM-DD query parameter or an
// inclusive ?start=YYYY-MM-DD&end=YYYY-MM-DD range.
func handleCopy(cfg *config.Config, dest *storage.API) content.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) error {
		// Source of truth for uploaded reports; never copy prod onto itself.
		const prodBucket = "prod-telemetry-uploaded"

		if cfg.UploadBucket == prodBucket {
			return content.Text(w, fmt.Sprintf("do not need to copy from %s to %s", prodBucket, cfg.UploadBucket), http.StatusOK)
		}

		ctx := r.Context()
		sourceBucket, err := storage.NewBucket(ctx, cfg, prodBucket)
		if err != nil {
			return err
		}

		destBucket := dest.Upload

		start, end, err := parseDateRange(r.URL)
		if err != nil {
			return err
		}

		// Uploaded object names are prefixed with their upload date
		// (YYYY-MM-DD), so iterate one day at a time over the inclusive
		// [start, end] range and copy every object under that prefix.
		for date := start; !date.After(end); date = date.AddDate(0, 0, 1) {
			it := sourceBucket.Objects(ctx, date.Format(time.DateOnly))
			for {
				fileName, err := it.Next()
				if errors.Is(err, storage.ErrObjectIteratorDone) {
					break
				}
				if err != nil {
					return err
				}

				if err := storage.Copy(ctx, destBucket.Object(fileName), sourceBucket.Object(fileName)); err != nil {
					return err
				}
			}
		}

		return nil
	}
}

// handleTasks will populate the task queue that processes report
// data. Cloud Scheduler will be instrumented to call this endpoint
// daily to merge reports and generate chart data.
// daily to copy uploaded reports, merge reports and generate chart data.
// The copy tasks will copy uploaded data from prod to dev.
// The merge tasks will merge the previous 7 days reports.
// The chart tasks generate daily and weekly charts for the 7 days preceding
// today.
Expand All @@ -85,6 +130,14 @@ func main() {
func handleTasks(cfg *config.Config) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
now := time.Now().UTC()

// Copy the past 20 days uploaded reports from prod to dev gcs bucket.
if cfg.Env != "prod" {
url := cfg.WorkerURL + "/copy/?start=" + now.Format(time.DateOnly) + "&end=" + now.AddDate(0, 0, -1*20).Format(time.DateOnly)
if _, err := createHTTPTask(cfg, url); err != nil {
return err
}
}
for i := 7; i > 0; i-- {
date := now.AddDate(0, 0, -1*i).Format(time.DateOnly)
url := cfg.WorkerURL + "/merge/?date=" + date
Expand Down
4 changes: 4 additions & 0 deletions godev/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {
// or storage emulator modes.
LocalStorage string

// Env represents the deployment environment this worker handles.
Env string

// MergedBucket is the storage bucket for merged reports. The worker merges the
// reports from the upload bucket and saves them here.
MergedBucket string
Expand Down Expand Up @@ -93,6 +96,7 @@ func NewConfig() *Config {
StorageEmulatorHost: env("GO_TELEMETRY_STORAGE_EMULATOR_HOST", "localhost:8081"),
LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
ChartDataBucket: environment + "-telemetry-charted",
Env: environment,
MergedBucket: environment + "-telemetry-merged",
UploadBucket: environment + "-telemetry-uploaded",
UploadConfig: env("GO_TELEMETRY_UPLOAD_CONFIG", "./config/config.json"),
Expand Down
8 changes: 4 additions & 4 deletions godev/internal/storage/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ type API struct {
}

// NewAPI opens the upload, merge, and chart-data buckets named in cfg and
// returns an API wrapping all three. It fails if any bucket cannot be opened.
func NewAPI(ctx context.Context, cfg *config.Config) (*API, error) {
	upload, err := NewBucket(ctx, cfg, cfg.UploadBucket)
	if err != nil {
		return nil, err
	}
	merge, err := NewBucket(ctx, cfg, cfg.MergedBucket)
	if err != nil {
		return nil, err
	}
	chart, err := NewBucket(ctx, cfg, cfg.ChartDataBucket)
	if err != nil {
		return nil, err
	}
	return &API{upload, merge, chart}, nil
}

func newBucket(ctx context.Context, cfg *config.Config, name string) (BucketHandle, error) {
func NewBucket(ctx context.Context, cfg *config.Config, name string) (BucketHandle, error) {
if cfg.UseGCS {
return NewGCSBucket(ctx, cfg.ProjectID, name)
}
Expand Down
22 changes: 22 additions & 0 deletions godev/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package storage
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"os"
Expand Down Expand Up @@ -49,6 +50,27 @@ type GCSBucket struct {
url string
}

// Copy reads the content of source and writes it to dest.
//
// The destination writer's Close error is checked rather than deferred:
// object-storage writers commonly commit the upload only on Close, so a
// discarded Close error could silently report a failed copy as success.
func Copy(ctx context.Context, dest, source ObjectHandle) error {
	reader, err := source.NewReader(ctx)
	if err != nil {
		return fmt.Errorf("failed to create reader for source: %w", err)
	}
	defer reader.Close()

	writer, err := dest.NewWriter(ctx)
	if err != nil {
		return fmt.Errorf("failed to create writer for destination: %w", err)
	}

	if _, err := io.Copy(writer, reader); err != nil {
		// Best-effort close; the copy error is the one worth reporting.
		writer.Close()
		return err
	}

	return writer.Close()
}

func NewGCSBucket(ctx context.Context, project, bucket string) (BucketHandle, error) {
client, err := storage.NewClient(ctx)
if err != nil {
Expand Down
34 changes: 31 additions & 3 deletions godev/internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,49 @@ func runTest(t *testing.T, ctx context.Context, s BucketHandle) {
if diff := cmp.Diff(list2, []string{"prefix/test-object"}); diff != "" {
t.Errorf("Objects() mismatch (-want +got):\n%s", diff)
}

// check that the destination file have same content as source.
copyData := jsondata{"foo", "bar", map[string]int{"copy": 1}}
if err := write(ctx, s, "prefix/source-file", copyData); err != nil {
t.Fatal(err)
}
if err := Copy(ctx, s.Object("prefix/dest-file"), s.Object("prefix/source-file")); err != nil {
t.Errorf("Copy() should not return err: %v", err)
}
got, err := read(ctx, s, "prefix/dest-file")
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(copyData, got); diff != "" {
t.Errorf("data write read mismatch (-wrote +read):\n%s", diff)
}

// check that copy twice have same result.
if err := Copy(ctx, s.Object("prefix/dest-file"), s.Object("prefix/source-file")); err != nil {
t.Errorf("Copy() should not return err: %v", err)
}
got, err = read(ctx, s, "prefix/dest-file")
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(copyData, got); diff != "" {
t.Errorf("data write read mismatch (-wrote +read):\n%s", diff)
}
}

func write(ctx context.Context, s BucketHandle, object string, data any) error {
obj, err := s.Object("prefix/test-object").NewWriter(ctx)
obj, err := s.Object(object).NewWriter(ctx)
if err != nil {
return err
}
if err := json.NewEncoder(obj).Encode(writeData); err != nil {
if err := json.NewEncoder(obj).Encode(data); err != nil {
return err
}
return obj.Close()
}

func read(ctx context.Context, s BucketHandle, object string) (any, error) {
obj, err := s.Object("prefix/test-object").NewReader(ctx)
obj, err := s.Object(object).NewReader(ctx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit af8248e

Please sign in to comment.