From 278ce6b0d8f3edea89f174bf19fbe8b3186d0f1b Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Wed, 21 Jun 2023 20:18:36 +0900 Subject: [PATCH] src: refactor builder concurrency test (#1821) --- pkg/oci/builder.go | 38 ++++++------------------ pkg/oci/builder_test.go | 59 ++++++++++++++++++++++++++++++++------ pkg/oci/containerize.go | 48 +++++++++++++++++-------------- pkg/oci/containerize_go.go | 38 ++---------------------- 4 files changed, 88 insertions(+), 95 deletions(-) diff --git a/pkg/oci/builder.go b/pkg/oci/builder.go index 6c189aa38b..019ca7dab7 100644 --- a/pkg/oci/builder.go +++ b/pkg/oci/builder.go @@ -30,12 +30,13 @@ type Builder struct { name string verbose bool - tester *testHelper + onDone func() // optionally provide a function to be notified on done + buildFn languageLayerBuilder // optionally provide a custom build impl } // NewBuilder creates a builder instance. func NewBuilder(name string, verbose bool) *Builder { - return &Builder{name, verbose, nil} + return &Builder{name, verbose, nil, nil} } func newBuildConfig(ctx context.Context, b *Builder, f fn.Function, platforms []fn.Platform) *buildConfig { @@ -46,8 +47,9 @@ func newBuildConfig(ctx context.Context, b *Builder, f fn.Function, platforms [] time.Now(), b.verbose, "", - b.tester, toPlatforms(platforms), + b.onDone, + b.buildFn, } // If the client did not specifically request a certain set of platforms, // use the func core defined set of suggested defaults. @@ -100,12 +102,8 @@ func (b *Builder) Build(ctx context.Context, f fn.Function, pp []fn.Platform) (e // which includes a general struct which can be used by all builders to // communicate to the pusher where the image can be found. // Tests, however, can use a simple channel: - if cfg.tester != nil && cfg.tester.notifyDone { - if cfg.verbose { - fmt.Println("tester configured to notify on done. Sending to unbuffered doneCh") - } - cfg.tester.doneCh <- true - fmt.Println("send to doneCh complete") + if cfg.onDone != nil { + cfg.onDone() } return } @@ -118,8 +116,9 @@ type buildConfig struct { t time.Time // Timestamp for this build verbose bool // verbose logging h string // hash cache (use .hash() accessor) - tester *testHelper platforms []v1.Platform + onDone func() // optionally provide a function to be notified on done + buildFn languageLayerBuilder // optionally provide a custom build impl } func (c *buildConfig) hash() string { @@ -277,25 +276,6 @@ func updateLastLink(cfg *buildConfig) error { return os.Symlink(cfg.buildDir(), cfg.lastLink()) } -type testHelper struct { - emulateSlowBuild bool - continueCh chan any - - notifyDone bool - doneCh chan any - - notifyPaused bool - pausedCh chan any -} - -func newTestHelper() *testHelper { - return &testHelper{ - continueCh: make(chan any), - doneCh: make(chan any), - pausedCh: make(chan any), - } -} - // toPlatforms converts func's implementation-agnostic Platform struct // into to the OCI builder's implementation-specific go-containerregistry v1 // palatform. diff --git a/pkg/oci/builder_test.go b/pkg/oci/builder_test.go index 6ab9642a5a..3ecf1bcd46 100644 --- a/pkg/oci/builder_test.go +++ b/pkg/oci/builder_test.go @@ -10,6 +10,7 @@ import ( "runtime" "testing" + v1 "github.com/google/go-containerregistry/pkg/v1" fn "knative.dev/func/pkg/functions" . "knative.dev/func/pkg/testing" ) @@ -47,24 +48,57 @@ func TestBuilder_Concurrency(t *testing.T) { client := fn.New() + // Initialize a new Go Function f, err := client.Init(fn.Function{Root: root, Runtime: "go"}) if err != nil { t.Fatal(err) } - // Start a build which pauses such that we can start a second. + // Concurrency + // + // The first builder is setup to use a mock implementation of the + // builder function which will block until released after first notifying + // that it has been paused. + // + // When the test receives the message that the builder has been paused, it + // starts a second, concurrently executing builder to ensure there is a + // typed error returned indicating a build is in progress. + // + // When the second builder completes, having confirmed the error message + // received is as expected. It signals the first (blocked) builder that it + // can now continue. + + // Thet test waits until the first builder notifies that it is done, and + // has therefore ran its tests as well. + + var ( + pausedCh = make(chan bool) + continueCh = make(chan bool) + doneCh = make(chan bool) + ) + + // Build A builder1 := NewBuilder("builder1", true) - builder1.tester = newTestHelper() - builder1.tester.emulateSlowBuild = true - builder1.tester.notifyPaused = true - builder1.tester.notifyDone = true + builder1.buildFn = func(cfg *buildConfig, p v1.Platform) (d v1.Descriptor, l v1.Layer, err error) { + if isFirstBuild(cfg, p) { + pausedCh <- true // Notify of being paused + <-continueCh // Block until released + } + return + } + builder1.onDone = func() { + doneCh <- true // Notify of being done + } go func() { if err := builder1.Build(context.Background(), f, TestPlatforms); err != nil { fmt.Fprintf(os.Stderr, "test build error %v", err) } }() - <-builder1.tester.pausedCh // wait until it is paused + // Wait until build 1 indicates it is paused + <-pausedCh + + // Build B builder2 := NewBuilder("builder2", true) go func() { err = builder2.Build(context.Background(), f, TestPlatforms) @@ -72,8 +106,17 @@ func TestBuilder_Concurrency(t *testing.T) { fmt.Fprintf(os.Stderr, "test build error %v", err) } }() - builder1.tester.continueCh <- true // release the paused first builder - <-builder1.tester.doneCh // wait for it to be done + + // Release the blocking Build A and wait until complete. + continueCh <- true + <-doneCh +} + +func isFirstBuild(cfg *buildConfig, current v1.Platform) bool { + first := cfg.platforms[0] + return current.OS == first.OS && + current.Architecture == first.Architecture && + current.Variant == first.Variant } // ImageIndex represents the structure of an OCI Image Index. diff --git a/pkg/oci/containerize.go b/pkg/oci/containerize.go index 954b997b4b..cd13820355 100644 --- a/pkg/oci/containerize.go +++ b/pkg/oci/containerize.go @@ -4,7 +4,6 @@ import ( "archive/tar" "compress/gzip" "encoding/json" - "errors" "fmt" "io" "os" @@ -19,27 +18,30 @@ import ( // languageLayerBuilder builds the layer for the given language whuch may // be different from one platform to another. For example, this is the // layer in the image which contains the Go cross-compiled binary. -type languageLayerBuilder interface { - Build(*buildConfig, v1.Platform) (v1.Descriptor, v1.Layer, error) +type languageLayerBuilder func(*buildConfig, v1.Platform) (v1.Descriptor, v1.Layer, error) + +var languageLayerBuilders = map[string]languageLayerBuilder{ + "go": buildGoLayer, + "python": layerBuilderNotImplemented, + "node": layerBuilderNotImplemented, + "rust": layerBuilderNotImplemented, +} + +func layerBuilderNotImplemented(cfg *buildConfig, _ v1.Platform) (d v1.Descriptor, l v1.Layer, err error) { + err = fmt.Errorf("%v functions are not yet supported by the host builder.", cfg.f.Runtime) + return } -func newLanguageLayerBuilder(cfg *buildConfig) (l languageLayerBuilder, err error) { - switch cfg.f.Runtime { - case "go": - l = goLayerBuilder{} - case "python": - // Likely the next to be supported after Go - err = errors.New("functions written in Python are not yet supported by the host builder") - case "node": - // Likely the next to be supported after Python - err = errors.New("functions written in Node are not yet supported by the host builder") - case "rust": - // Likely the next to be supprted after Node - err = errors.New("functions written in Rust are not yet supported by the host builder") - default: - // Others are not likely to be supported in the near future without - // increased contributions. +func getLanguageLayerBuilder(cfg *buildConfig) (l languageLayerBuilder, err error) { + // use the custom implementation, if provided + if cfg.buildFn != nil { + return cfg.buildFn, nil + } + // otherwise lookup the build function + l, ok := languageLayerBuilders[cfg.f.Runtime] + if !ok { err = fmt.Errorf("the language runtime '%v' is not a recognized language by the host builder", cfg.f.Runtime) + return } return } @@ -205,14 +207,13 @@ func newDescriptor(layer v1.Layer) (desc v1.Descriptor, err error) { // newImage creates an image for the given platform. // The image consists of the shared data layer which is provided func newImage(cfg *buildConfig, dataDesc v1.Descriptor, dataLayer v1.Layer, p v1.Platform, verbose bool) (imageDesc v1.Descriptor, err error) { - - b, err := newLanguageLayerBuilder(cfg) + buildFn, err := getLanguageLayerBuilder(cfg) if err != nil { return } // Write Exec Layer as Blob -> Layer - execDesc, execLayer, err := b.Build(cfg, p) + execDesc, execLayer, err := buildFn(cfg, p) if err != nil { return } @@ -288,6 +289,9 @@ func newConfig(cfg *buildConfig, p v1.Platform, layers ...v1.Layer) (desc v1.Des } var diff v1.Hash for _, v := range layers { + if v == nil { + continue + } if diff, err = v.DiffID(); err != nil { return } diff --git a/pkg/oci/containerize_go.go b/pkg/oci/containerize_go.go index 7521cebf46..f6fd6fd0aa 100644 --- a/pkg/oci/containerize_go.go +++ b/pkg/oci/containerize_go.go @@ -15,13 +15,11 @@ import ( fn "knative.dev/func/pkg/functions" ) -type goLayerBuilder struct{} - // Build the source code as Go, cross compiled for the given platform, placing // the statically linked binary in a tarred layer and return the Descriptor // and Layer metadata. -func (c goLayerBuilder) Build(cfg *buildConfig, p v1.Platform) (desc v1.Descriptor, layer v1.Layer, err error) { - // Build the Executable +func buildGoLayer(cfg *buildConfig, p v1.Platform) (desc v1.Descriptor, layer v1.Layer, err error) { + // Executable exe, err := goBuild(cfg, p) // Compile binary returning its path if err != nil { return @@ -54,10 +52,6 @@ func (c goLayerBuilder) Build(cfg *buildConfig, p v1.Platform) (desc v1.Descript } func goBuild(cfg *buildConfig, p v1.Platform) (binPath string, err error) { - if cfg.tester != nil && cfg.tester.emulateSlowBuild { - pauseBuildUntilReleased(cfg, p) - } - gobin, args, outpath, err := goBuildCmd(p, cfg) if err != nil { return @@ -89,34 +83,6 @@ func goBuild(cfg *buildConfig, p v1.Platform) (binPath string, err error) { return outpath, cmd.Run() } -func isFirstBuild(cfg *buildConfig, current v1.Platform) bool { - first := cfg.platforms[0] - return current.OS == first.OS && - current.Architecture == first.Architecture && - current.Variant == first.Variant -} - -func pauseBuildUntilReleased(cfg *buildConfig, p v1.Platform) { - if cfg.verbose { - fmt.Println("test set to emulate slow build. checking if this build should be paused") - } - if !isFirstBuild(cfg, p) { - if cfg.verbose { - fmt.Println("not first build. will not pause") - } - return - } - if cfg.verbose { - fmt.Println("this is the first build: pausing awaiting release via cfg.tester.continueCh") - } - fmt.Printf("testing slow builds. %v paused\n", cfg.name) - if cfg.tester.notifyPaused { - cfg.tester.pausedCh <- true - } - <-cfg.tester.continueCh - fmt.Printf("continuing build\n") -} - func goBuildCmd(p v1.Platform, cfg *buildConfig) (gobin string, args []string, outpath string, err error) { /* TODO: Use Build Command override from the function if provided * A future PR will include the ability to specify a