Skip to content

Commit

Permalink
src: refactor builder concurrency test
Browse files Browse the repository at this point in the history
  • Loading branch information
lkingland committed Jun 21, 2023
1 parent 8dc5a76 commit cb4a6e0
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 95 deletions.
38 changes: 9 additions & 29 deletions pkg/oci/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ type Builder struct {
name string
verbose bool

tester *testHelper
onDone func() // optionally provide a function to be notified on done
buildFn languageLayerBuilder // optionally provide a custom build impl
}

// NewBuilder creates a builder instance.
func NewBuilder(name string, verbose bool) *Builder {
return &Builder{name, verbose, nil}
return &Builder{name, verbose, nil, nil}
}

func newBuildConfig(ctx context.Context, b *Builder, f fn.Function, platforms []fn.Platform) *buildConfig {
Expand All @@ -46,8 +47,9 @@ func newBuildConfig(ctx context.Context, b *Builder, f fn.Function, platforms []
time.Now(),
b.verbose,
"",
b.tester,
toPlatforms(platforms),
b.onDone,
b.buildFn,
}
// If the client did not specifically request a certain set of platforms,
// use the func core defined set of suggested defaults.
Expand Down Expand Up @@ -100,12 +102,8 @@ func (b *Builder) Build(ctx context.Context, f fn.Function, pp []fn.Platform) (e
// which includes a general struct which can be used by all builders to
// communicate to the pusher where the image can be found.
// Tests, however, can use a simple channel:
if cfg.tester != nil && cfg.tester.notifyDone {
if cfg.verbose {
fmt.Println("tester configured to notify on done. Sending to unbuffered doneCh")
}
cfg.tester.doneCh <- true
fmt.Println("send to doneCh complete")
if cfg.onDone != nil {
cfg.onDone()
}
return
}
Expand All @@ -118,8 +116,9 @@ type buildConfig struct {
t time.Time // Timestamp for this build
verbose bool // verbose logging
h string // hash cache (use .hash() accessor)
tester *testHelper
platforms []v1.Platform
onDone func() // optionally provide a function to be notified on done
buildFn languageLayerBuilder // optionally provide a custom build impl
}

func (c *buildConfig) hash() string {
Expand Down Expand Up @@ -277,25 +276,6 @@ func updateLastLink(cfg *buildConfig) error {
return os.Symlink(cfg.buildDir(), cfg.lastLink())
}

type testHelper struct {
emulateSlowBuild bool
continueCh chan any

notifyDone bool
doneCh chan any

notifyPaused bool
pausedCh chan any
}

func newTestHelper() *testHelper {
return &testHelper{
continueCh: make(chan any),
doneCh: make(chan any),
pausedCh: make(chan any),
}
}

// toPlatforms converts func's implementation-agnostic Platform struct
// into to the OCI builder's implementation-specific go-containerregistry v1
// palatform.
Expand Down
59 changes: 51 additions & 8 deletions pkg/oci/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"testing"

v1 "github.com/google/go-containerregistry/pkg/v1"
fn "knative.dev/func/pkg/functions"
. "knative.dev/func/pkg/testing"
)
Expand Down Expand Up @@ -47,33 +48,75 @@ func TestBuilder_Concurrency(t *testing.T) {

client := fn.New()

// Initialize a new Go Function
f, err := client.Init(fn.Function{Root: root, Runtime: "go"})
if err != nil {
t.Fatal(err)
}

// Start a build which pauses such that we can start a second.
// Concurrency
//
// The first builder is setup to use a mock implementation of the
// builder function which will block until released after first notifying
// that it has been paused.
//
// When the test receives the message that the builder has been paused, it
// starts a second, concurrently executing builder to ensure there is a
// typed error returned indicating a build is in progress.
//
// When the second builder completes, having confirmed the error message
// received is as expected. It signals the first (blocked) builder that it
// can now continue.

// Thet test waits until the first builder notifies that it is done, and
// has therefore ran its tests as well.

var (
pausedCh = make(chan bool)
continueCh = make(chan bool)
doneCh = make(chan bool)
)

// Build A
builder1 := NewBuilder("builder1", true)
builder1.tester = newTestHelper()
builder1.tester.emulateSlowBuild = true
builder1.tester.notifyPaused = true
builder1.tester.notifyDone = true
builder1.buildFn = func(cfg *buildConfig, p v1.Platform) (d v1.Descriptor, l v1.Layer, err error) {
if isFirstBuild(cfg, p) {
pausedCh <- true // Notify of being paused
<-continueCh // Block until released
}
return
}
builder1.onDone = func() {
doneCh <- true // Notify of being done
}
go func() {
if err := builder1.Build(context.Background(), f, TestPlatforms); err != nil {
fmt.Fprintf(os.Stderr, "test build error %v", err)
}
}()
<-builder1.tester.pausedCh // wait until it is paused

// Wait until build 1 indicates it is paused
<-pausedCh

// Build B
builder2 := NewBuilder("builder2", true)
go func() {
err = builder2.Build(context.Background(), f, TestPlatforms)
if !errors.As(err, &ErrBuildInProgress{}) {
fmt.Fprintf(os.Stderr, "test build error %v", err)
}
}()
builder1.tester.continueCh <- true // release the paused first builder
<-builder1.tester.doneCh // wait for it to be done

// Release the blocking Build A and wait until complete.
continueCh <- true
<-doneCh
}

func isFirstBuild(cfg *buildConfig, current v1.Platform) bool {
first := cfg.platforms[0]
return current.OS == first.OS &&
current.Architecture == first.Architecture &&
current.Variant == first.Variant
}

// ImageIndex represents the structure of an OCI Image Index.
Expand Down
48 changes: 26 additions & 22 deletions pkg/oci/containerize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"archive/tar"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand All @@ -19,27 +18,30 @@ import (
// languageLayerBuilder builds the layer for the given language whuch may
// be different from one platform to another. For example, this is the
// layer in the image which contains the Go cross-compiled binary.
type languageLayerBuilder interface {
Build(*buildConfig, v1.Platform) (v1.Descriptor, v1.Layer, error)
type languageLayerBuilder func(*buildConfig, v1.Platform) (v1.Descriptor, v1.Layer, error)

var languageLayerBuilders = map[string]languageLayerBuilder{
"go": buildGoLayer,
"python": layerBuilderNotImplemented,
"node": layerBuilderNotImplemented,
"rust": layerBuilderNotImplemented,
}

func layerBuilderNotImplemented(cfg *buildConfig, _ v1.Platform) (d v1.Descriptor, l v1.Layer, err error) {
err = fmt.Errorf("%v functions are not yet supported by the host builder.", cfg.f.Runtime)
return
}

func newLanguageLayerBuilder(cfg *buildConfig) (l languageLayerBuilder, err error) {
switch cfg.f.Runtime {
case "go":
l = goLayerBuilder{}
case "python":
// Likely the next to be supported after Go
err = errors.New("functions written in Python are not yet supported by the host builder")
case "node":
// Likely the next to be supported after Python
err = errors.New("functions written in Node are not yet supported by the host builder")
case "rust":
// Likely the next to be supprted after Node
err = errors.New("functions written in Rust are not yet supported by the host builder")
default:
// Others are not likely to be supported in the near future without
// increased contributions.
func getLanguageLayerBuilder(cfg *buildConfig) (l languageLayerBuilder, err error) {
// use the custom implementation, if provided
if cfg.buildFn != nil {
return cfg.buildFn, nil
}
// otherwise lookup the build function
l, ok := languageLayerBuilders[cfg.f.Runtime]
if !ok {
err = fmt.Errorf("the language runtime '%v' is not a recognized language by the host builder", cfg.f.Runtime)
return
}
return
}
Expand Down Expand Up @@ -205,14 +207,13 @@ func newDescriptor(layer v1.Layer) (desc v1.Descriptor, err error) {
// newImage creates an image for the given platform.
// The image consists of the shared data layer which is provided
func newImage(cfg *buildConfig, dataDesc v1.Descriptor, dataLayer v1.Layer, p v1.Platform, verbose bool) (imageDesc v1.Descriptor, err error) {

b, err := newLanguageLayerBuilder(cfg)
buildFn, err := getLanguageLayerBuilder(cfg)
if err != nil {
return
}

// Write Exec Layer as Blob -> Layer
execDesc, execLayer, err := b.Build(cfg, p)
execDesc, execLayer, err := buildFn(cfg, p)
if err != nil {
return
}
Expand Down Expand Up @@ -288,6 +289,9 @@ func newConfig(cfg *buildConfig, p v1.Platform, layers ...v1.Layer) (desc v1.Des
}
var diff v1.Hash
for _, v := range layers {
if v == nil {
continue
}
if diff, err = v.DiffID(); err != nil {
return
}
Expand Down
38 changes: 2 additions & 36 deletions pkg/oci/containerize_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
fn "knative.dev/func/pkg/functions"
)

type goLayerBuilder struct{}

// Build the source code as Go, cross compiled for the given platform, placing
// the statically linked binary in a tarred layer and return the Descriptor
// and Layer metadata.
func (c goLayerBuilder) Build(cfg *buildConfig, p v1.Platform) (desc v1.Descriptor, layer v1.Layer, err error) {
// Build the Executable
func buildGoLayer(cfg *buildConfig, p v1.Platform) (desc v1.Descriptor, layer v1.Layer, err error) {
// Executable
exe, err := goBuild(cfg, p) // Compile binary returning its path
if err != nil {
return
Expand Down Expand Up @@ -54,10 +52,6 @@ func (c goLayerBuilder) Build(cfg *buildConfig, p v1.Platform) (desc v1.Descript
}

func goBuild(cfg *buildConfig, p v1.Platform) (binPath string, err error) {
if cfg.tester != nil && cfg.tester.emulateSlowBuild {
pauseBuildUntilReleased(cfg, p)
}

gobin, args, outpath, err := goBuildCmd(p, cfg)
if err != nil {
return
Expand Down Expand Up @@ -89,34 +83,6 @@ func goBuild(cfg *buildConfig, p v1.Platform) (binPath string, err error) {
return outpath, cmd.Run()
}

func isFirstBuild(cfg *buildConfig, current v1.Platform) bool {
first := cfg.platforms[0]
return current.OS == first.OS &&
current.Architecture == first.Architecture &&
current.Variant == first.Variant
}

func pauseBuildUntilReleased(cfg *buildConfig, p v1.Platform) {
if cfg.verbose {
fmt.Println("test set to emulate slow build. checking if this build should be paused")
}
if !isFirstBuild(cfg, p) {
if cfg.verbose {
fmt.Println("not first build. will not pause")
}
return
}
if cfg.verbose {
fmt.Println("this is the first build: pausing awaiting release via cfg.tester.continueCh")
}
fmt.Printf("testing slow builds. %v paused\n", cfg.name)
if cfg.tester.notifyPaused {
cfg.tester.pausedCh <- true
}
<-cfg.tester.continueCh
fmt.Printf("continuing build\n")
}

func goBuildCmd(p v1.Platform, cfg *buildConfig) (gobin string, args []string, outpath string, err error) {
/* TODO: Use Build Command override from the function if provided
* A future PR will include the ability to specify a
Expand Down

0 comments on commit cb4a6e0

Please sign in to comment.