From 1d22e1f5c1fa92bae649d64bcb859e54ed7cc053 Mon Sep 17 00:00:00 2001 From: Yunhee Date: Fri, 12 Oct 2018 15:38:07 -0700 Subject: [PATCH 1/2] Migrating pull image to docker SDK with inactivity timeout handling --- agent/config/config_unix.go | 1 + agent/dockerclient/dockerapi/docker_client.go | 124 +++++---- .../dockerapi/docker_client_test.go | 244 +++++++++--------- 3 files changed, 189 insertions(+), 180 deletions(-) diff --git a/agent/config/config_unix.go b/agent/config/config_unix.go index 453a2bbe565..3cf5afc2941 100644 --- a/agent/config/config_unix.go +++ b/agent/config/config_unix.go @@ -71,6 +71,7 @@ func DefaultConfig() Config { TaskMetadataSteadyStateRate: DefaultTaskMetadataSteadyStateRate, TaskMetadataBurstRate: DefaultTaskMetadataBurstRate, SharedVolumeMatchFullConfig: false, // only requiring shared volumes to match on name, which is default docker behavior + ImagePullInactivityTimeout: defaultImagePullInactivityTimeout, } } diff --git a/agent/dockerclient/dockerapi/docker_client.go b/agent/dockerclient/dockerapi/docker_client.go index dba3faf4d32..b50b10aef2e 100644 --- a/agent/dockerclient/dockerapi/docker_client.go +++ b/agent/dockerclient/dockerapi/docker_client.go @@ -14,9 +14,11 @@ package dockerapi import ( - "bufio" + "bytes" "context" + "encoding/base64" "encoding/json" + "errors" "fmt" "io" "strconv" @@ -47,7 +49,6 @@ import ( "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/volume" - docker "github.com/fsouza/go-dockerclient" ) const ( @@ -338,7 +339,7 @@ func (dg *dockerGoClient) PullImage(image string, authData *apicontainer.Registr maximumPullRetryDelay, pullRetryJitterMultiplier, pullRetryDelayMultiplier) err := utils.RetryNWithBackoffCtx(ctx, imagePullBackoff, maximumPullRetries, func() error { - err := dg.pullImage(image, authData) + err := dg.pullImage(ctx, image, authData) if err != nil { seelog.Warnf("DockerGoClient: failed to pull image %s: %s", image, err.Error()) } @@ -367,45 +368,81 @@ func wrapPullErrorAsNamedError(err error) apierrors.NamedError { return retErr } -func (dg *dockerGoClient) pullImage(image string, authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError { +func (dg *dockerGoClient) pullImage(ctx context.Context, image string, authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError { seelog.Debugf("DockerGoClient: pulling image: %s", image) - client, err := dg.dockerClient() + client, err := dg.sdkDockerClient() if err != nil { return CannotGetDockerClientError{version: dg.version, err: err} } sdkAuthConfig, err := dg.getAuthdata(image, authData) - authConfig := docker.AuthConfiguration{ - Username: sdkAuthConfig.Username, - Password: sdkAuthConfig.Password, - Email: sdkAuthConfig.Email, - ServerAddress: sdkAuthConfig.ServerAddress, - } if err != nil { return wrapPullErrorAsNamedError(err) } + // encode auth data + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(sdkAuthConfig); err != nil { + return CannotPullECRContainerError{err} + } - pullDebugOut, pullWriter := io.Pipe() - defer pullWriter.Close() + imagePullOpts := types.ImagePullOptions{ + All: false, + RegistryAuth: base64.URLEncoding.EncodeToString(buf.Bytes()), + } repository := getRepository(image) - opts := docker.PullImageOptions{ - Repository: repository, - OutputStream: pullWriter, - InactivityTimeout: dg.config.ImagePullInactivityTimeout, - } timeout := dg.time().After(dockerPullBeginTimeout) // pullBegan is a channel indicating that we have seen at least one line of data on the 'OutputStream' above. // It is here to guard against a bug wherein Docker never writes anything to that channel and hangs in pulling forever. pullBegan := make(chan bool, 1) - - go dg.filterPullDebugOutput(pullDebugOut, pullBegan, image) + // pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times) + pullBeganOnce := sync.Once{} pullFinished := make(chan error, 1) - // TODO Migrate Pull Image once Inactivity Timeout is sorted out + subCtx, cancelRequest := context.WithCancel(ctx) + go func() { - pullFinished <- client.PullImage(opts, authConfig) + defer cancelRequest() + reader, err := client.ImagePull(subCtx, repository, imagePullOpts) + if err != nil { + pullFinished <- err + return + } + + // handle inactivity timeout + var canceled uint32 + var ch chan<- struct{} + reader, ch = handleInactivityTimeout(reader, dg.config.ImagePullInactivityTimeout, cancelRequest, &canceled) + defer reader.Close() + defer close(ch) + + decoder := json.NewDecoder(reader) + data := new(events.Message) + for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) { + if err != nil { + seelog.Warnf("DockerGoClient: Unable to decode pull event message for image %s: %v", image, err) + pullFinished <- err + return + } + if atomic.LoadUint32(&canceled) != 0 { + seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while pulling image %s", image) + pullErr := errors.New("inactivity time exceeded timeout while pulling image") + pullFinished <- pullErr + return + } + + pullBeganOnce.Do(func() { + pullBegan <- true + }) + + dataBytes, _ := json.Marshal(data) + line := string(dataBytes[:]) + dg.filterPullDebugOutput(line, image) + + data = new(events.Message) + } + pullFinished <- nil }() select { @@ -431,38 +468,21 @@ func (dg *dockerGoClient) pullImage(image string, authData *apicontainer.Registr return nil } -func (dg *dockerGoClient) filterPullDebugOutput(pullDebugOut *io.PipeReader, pullBegan chan<- bool, image string) { - // pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times) - pullBeganOnce := sync.Once{} +func (dg *dockerGoClient) filterPullDebugOutput(line string, image string) { - reader := bufio.NewReader(pullDebugOut) - var line string - var pullErr error var statusDisplayed time.Time - for { - line, pullErr = reader.ReadString('\n') - if pullErr != nil { - break - } - pullBeganOnce.Do(func() { - pullBegan <- true - }) - - now := time.Now() - if !strings.Contains(line, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) { - // skip most of the progress bar lines, but retain enough for debugging - seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, line) - statusDisplayed = now - } - if strings.Contains(line, "already being pulled by another client. Waiting.") { - // This can mean the daemon is 'hung' in pulling status for this image, but we can't be sure. - seelog.Errorf("DockerGoClient: image 'pull' status marked as already being pulled for image %s, status %s", - image, line) - } + now := time.Now() + if !strings.Contains(line, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) { + // skip most of the progress bar lines, but retain enough for debugging + seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, line) + statusDisplayed = now } - if pullErr != nil && pullErr != io.EOF { - seelog.Warnf("DockerGoClient: error reading pull image status for image %s: %v", image, pullErr) + + if strings.Contains(line, "already being pulled by another client. Waiting.") { + // This can mean the daemon is 'hung' in pulling status for this image, but we can't be sure. + seelog.Errorf("DockerGoClient: image 'pull' status marked as already being pulled for image %s, status %s", + image, line) } } @@ -936,8 +956,8 @@ func (dg *dockerGoClient) handleContainerEvents(ctx context.Context, metadata := dg.containerMetadata(ctx, containerID) changedContainers <- DockerContainerChangeEvent{ - Status: status, - Type: eventType, + Status: status, + Type: eventType, DockerContainerMetadata: metadata, } } diff --git a/agent/dockerclient/dockerapi/docker_client_test.go b/agent/dockerclient/dockerapi/docker_client_test.go index 0f044b16891..aaa55ab80e5 100644 --- a/agent/dockerclient/dockerapi/docker_client_test.go +++ b/agent/dockerclient/dockerapi/docker_client_test.go @@ -22,6 +22,7 @@ import ( "io" "reflect" "strconv" + "strings" "sync" "testing" "time" @@ -49,7 +50,6 @@ import ( "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/volume" "github.com/docker/go-connections/nat" - docker "github.com/fsouza/go-dockerclient" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -110,152 +110,128 @@ func dockerClientSetupWithConfig(t *testing.T, conf config.Config) ( return mockDocker, mockDockerSDK, goClient, mockTime, ctrl, ecrClientFactory, ctrl.Finish } -type pullImageOptsMatcher struct { - image string -} - -func (matcher *pullImageOptsMatcher) String() string { - return "matches " + matcher.image -} - -func (matcher *pullImageOptsMatcher) Matches(x interface{}) bool { - return matcher.image == x.(docker.PullImageOptions).Repository -} - func TestPullImageOutputTimeout(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() pullBeginTimeout := make(chan time.Time) testTime.EXPECT().After(dockerPullBeginTimeout).Return(pullBeginTimeout).MinTimes(1) testTime.EXPECT().After(pullImageTimeout).MinTimes(1) - wait := sync.WaitGroup{} - wait.Add(1) + // multiple invocations will happen due to retries, but all should timeout - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image:latest"}, gomock.Any()).Do( - func(x, y interface{}) { + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image:latest", gomock.Any()).DoAndReturn( + func(x, y, z interface{}) (io.ReadCloser, error) { pullBeginTimeout <- time.Now() - wait.Wait() - // Don't return, verify timeout happens + + reader := &mockReadCloser{ + reader: strings.NewReader(`{"status":"pull in progress"}`), + } + return reader, nil }).Times(maximumPullRetries) // expected number of retries metadata := client.PullImage("image", nil) assert.Error(t, metadata.Error, "Expected error for pull timeout") assert.Equal(t, "DockerTimeoutError", metadata.Error.(apierrors.NamedError).ErrorName()) - - // cleanup - wait.Done() } -func TestPullImageGlobalTimeout(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) +func TestImagePullGlobalTimeout(t *testing.T) { + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() pullBeginTimeout := make(chan time.Time, 1) testTime.EXPECT().After(dockerPullBeginTimeout).Return(pullBeginTimeout) pullTimeout := make(chan time.Time, 1) testTime.EXPECT().After(pullImageTimeout).Return(pullTimeout) - wait := sync.WaitGroup{} - wait.Add(1) - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image:latest"}, gomock.Any()).Do(func(x, y interface{}) { - opts, ok := x.(docker.PullImageOptions) - assert.True(t, ok, "Cannot cast argument to PullImageOptions") - io.WriteString(opts.OutputStream, "string\n") - pullBeginTimeout <- time.Now() - pullTimeout <- time.Now() - wait.Wait() - // Don't return, verify timeout happens - }) + + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image:latest", gomock.Any()).DoAndReturn( + func(x, y, z interface{}) (io.ReadCloser, error) { + pullBeginTimeout <- time.Now() + pullTimeout <- time.Now() + + reader := mockReadCloser{ + reader: strings.NewReader(`{"status":"pull in progress"}`), + } + return reader, nil + }) metadata := client.PullImage("image", nil) assert.Error(t, metadata.Error, "Expected error for pull timeout") assert.Equal(t, "DockerTimeoutError", metadata.Error.(apierrors.NamedError).ErrorName()) - - testTime.EXPECT().After(dockerPullBeginTimeout) - testTime.EXPECT().After(pullImageTimeout) - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image2:latest"}, gomock.Any()) - _ = client.PullImage("image2", nil) - - // cleanup - wait.Done() } func TestPullImageInactivityTimeout(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() + client.config.ImagePullInactivityTimeout = 100 * time.Millisecond + testTime.EXPECT().After(gomock.Any()).AnyTimes() - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image:latest"}, gomock.Any()).Return( - docker.ErrInactivityTimeout).Times(maximumPullRetries) // expected number of retries + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image:latest", gomock.Any()).DoAndReturn( + func(x, y, z interface{}) (io.ReadCloser, error) { + + reader := mockReadCloser{ + reader: strings.NewReader(`{"status":"pull in progress"}`), + delay: 300 * time.Millisecond, + } + return reader, nil + }).Times(maximumPullRetries) // expected number of retries metadata := client.PullImage("image", nil) assert.Error(t, metadata.Error, "Expected error for pull inactivity timeout") assert.Equal(t, "CannotPullContainerError", metadata.Error.(apierrors.NamedError).ErrorName(), "Wrong error type") } -func TestPullImage(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) +func TestImagePull(t *testing.T) { + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() testTime.EXPECT().After(gomock.Any()).AnyTimes() - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image:latest"}, gomock.Any()).Return(nil) + + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image:latest", gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil) metadata := client.PullImage("image", nil) assert.NoError(t, metadata.Error, "Expected pull to succeed") } -func TestPullImageTag(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) +func TestImagePullTag(t *testing.T) { + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() + client.config.ImagePullInactivityTimeout = 10 * time.Second testTime.EXPECT().After(gomock.Any()).AnyTimes() - mockDocker.EXPECT().PullImage(&pullImageOptsMatcher{"image:mytag"}, gomock.Any()).Return(nil) + + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image:mytag", gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil) metadata := client.PullImage("image:mytag", nil) assert.NoError(t, metadata.Error, "Expected pull to succeed") } -func TestPullImageDigest(t *testing.T) { - mockDocker, _, client, testTime, _, _, done := dockerClientSetup(t) +func TestImagePullDigest(t *testing.T) { + _, mockDockerSDK, client, testTime, _, _, done := dockerClientSetup(t) defer done() testTime.EXPECT().After(gomock.Any()).AnyTimes() - mockDocker.EXPECT().PullImage( - &pullImageOptsMatcher{"image@sha256:bc8813ea7b3603864987522f02a76101c17ad122e1c46d790efc0fca78ca7bfb"}, - gomock.Any(), - ).Return(nil) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), "image@sha256:bc8813ea7b3603864987522f02a76101c17ad122e1c46d790efc0fca78ca7bfb", gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil) metadata := client.PullImage("image@sha256:bc8813ea7b3603864987522f02a76101c17ad122e1c46d790efc0fca78ca7bfb", nil) assert.NoError(t, metadata.Error, "Expected pull to succeed") } func TestPullImageECRSuccess(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - // go-dockerclient tests - mockDocker := mock_dockeriface.NewMockClient(ctrl) - mockDocker.EXPECT().Ping().AnyTimes().Return(nil) - factory := mock_clientfactory.NewMockFactory(ctrl) - factory.EXPECT().GetDefaultClient().AnyTimes().Return(mockDocker, nil) - // Docker SDK tests - mockDockerSDK := mock_sdkclient.NewMockClient(ctrl) - mockDockerSDK.EXPECT().Ping(gomock.Any()).Return(types.Ping{}, nil) - sdkFactory := mock_sdkclientfactory.NewMockFactory(ctrl) - sdkFactory.EXPECT().GetDefaultClient().AnyTimes().Return(mockDockerSDK, nil) - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - client, _ := NewDockerGoClient(factory, sdkFactory, defaultTestConfig(), ctx) - goClient, _ := client.(*dockerGoClient) - ecrClientFactory := mock_ecr.NewMockECRFactory(ctrl) - ecrClient := mock_ecr.NewMockECRClient(ctrl) - mockTime := mock_ttime.NewMockTime(ctrl) - goClient.ecrClientFactory = ecrClientFactory - goClient._time = mockTime + _, mockDockerSDK, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + defer done() mockTime.EXPECT().After(gomock.Any()).AnyTimes() + ecrClient := mock_ecr.NewMockECRClient(ctrl) registryID := "123456789012" region := "eu-west-1" @@ -272,10 +248,10 @@ func TestPullImageECRSuccess(t *testing.T) { image := imageEndpoint + "/myimage:tag" username := "username" password := "password" - dockerAuthConfiguration := docker.AuthConfiguration{ - Username: username, - Password: password, - ServerAddress: "https://" + imageEndpoint, + + imagePullOpts := types.ImagePullOptions{ + All: false, + RegistryAuth: "eyJ1c2VybmFtZSI6InVzZXJuYW1lIiwicGFzc3dvcmQiOiJwYXNzd29yZCIsInNlcnZlcmFkZHJlc3MiOiJodHRwczovL3JlZ2lzdHJ5LmVuZHBvaW50In0K", } ecrClientFactory.EXPECT().GetClient(authData.ECRAuthData).Return(ecrClient, nil) @@ -285,10 +261,10 @@ func TestPullImageECRSuccess(t *testing.T) { AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString([]byte(username + ":" + password))), }, nil) - mockDocker.EXPECT().PullImage( - &pullImageOptsMatcher{image}, - dockerAuthConfiguration, - ).Return(nil) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), image, imagePullOpts).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil) metadata := client.PullImage(image, authData) assert.NoError(t, metadata.Error, "Expected pull to succeed") @@ -344,6 +320,18 @@ func TestPullImageECRAuthFail(t *testing.T) { assert.Error(t, metadata.Error, "expected pull to fail") } +type mockReadCloser struct { + reader io.Reader + delay time.Duration +} + +func (mr mockReadCloser) Read(data []byte) (n int, err error) { + time.Sleep(mr.delay) + return mr.reader.Read(data) +} +func (mr mockReadCloser) Close() error { + return nil +} func TestGetRepositoryWithTaggedImage(t *testing.T) { image := "registry.endpoint/myimage:tag" repository := getRepository(image) @@ -1014,9 +1002,11 @@ func TestStatsClientError(t *testing.T) { type mockStream struct { data []byte index int64 + delay time.Duration } func (ms mockStream) Read(data []byte) (n int, err error) { + time.Sleep(ms.delay) if ms.index >= int64(len(ms.data)) { err = io.EOF return @@ -1050,11 +1040,10 @@ func TestStatsInactivityTimeout(t *testing.T) { _, mockDockerSDK, client, _, _, _, done := dockerClientSetup(t) defer done() mockDockerSDK.EXPECT().ContainerStats(gomock.Any(), gomock.Any(), true).Return(types.ContainerStats{ - Body: mockStreamWithDelay{ - mockStream{ - data: []byte(`{"memory_stats":{"Usage":50},"cpu_stats":{"system_cpu_usage":100}}`), - index: 0, - }, + Body: mockStream{ + data: []byte(`{"memory_stats":{"Usage":50},"cpu_stats":{"system_cpu_usage":100}}`), + index: 0, + delay: 300 * time.Millisecond, }, }, nil) ctx, cancel := context.WithCancel(context.TODO()) @@ -1071,11 +1060,10 @@ func TestStatsInactivityTimeoutNoHit(t *testing.T) { _, mockDockerSDK, client, _, _, _, done := dockerClientSetup(t) defer done() mockDockerSDK.EXPECT().ContainerStats(gomock.Any(), gomock.Any(), true).Return(types.ContainerStats{ - Body: mockStreamWithDelay{ - mockStream{ - data: []byte(`{"memory_stats":{"Usage":50},"cpu_stats":{"system_cpu_usage":100}}`), - index: 0, - }, + Body: mockStream{ + data: []byte(`{"memory_stats":{"Usage":50},"cpu_stats":{"system_cpu_usage":100}}`), + index: 0, + delay: 300 * time.Millisecond, }, }, nil) ctx, cancel := context.WithCancel(context.TODO()) @@ -1089,19 +1077,6 @@ func TestStatsInactivityTimeoutNoHit(t *testing.T) { assert.Equal(t, uint64(100), newStat.CPUStats.SystemUsage) } -type mockStreamWithDelay struct { - mockStream -} - -func (msd mockStreamWithDelay) Read(data []byte) (n int, err error) { - time.Sleep(300 * time.Millisecond) - return msd.mockStream.Read(data) -} - -func (msd mockStreamWithDelay) Close() error { - return msd.mockStream.Close() -} - func TestRemoveImageTimeout(t *testing.T) { _, mockDockerSDK, client, _, _, _, done := dockerClientSetup(t) defer done() @@ -1164,7 +1139,8 @@ func TestLoadImageTimeout(t *testing.T) { // TestECRAuthCache tests the client will use cached docker auth if pulling // from same registry on ecr with default instance profile func TestECRAuthCacheWithoutExecutionRole(t *testing.T) { - mockDocker, _, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + _, mockDockerSDK, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + defer done() mockTime.EXPECT().After(gomock.Any()).AnyTimes() @@ -1173,8 +1149,6 @@ func TestECRAuthCacheWithoutExecutionRole(t *testing.T) { region := "eu-west-1" registryID := "1234567890" endpointOverride := "my.endpoint" - imageEndpoint := "registry.endpoint" - image := imageEndpoint + "myimage:tag" authData := &apicontainer.RegistryAuthenticationData{ Type: "ecr", ECRAuthData: &apicontainer.ECRAuthData{ @@ -1184,6 +1158,8 @@ func TestECRAuthCacheWithoutExecutionRole(t *testing.T) { }, } + imageEndpoint := "registry.endpoint" + image := imageEndpoint + "myimage:tag" username := "username" password := "password" @@ -1194,7 +1170,10 @@ func TestECRAuthCacheWithoutExecutionRole(t *testing.T) { AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString([]byte(username + ":" + password))), ExpiresAt: aws.Time(time.Now().Add(10 * time.Hour)), }, nil).Times(1) - mockDocker.EXPECT().PullImage(gomock.Any(), gomock.Any()).Return(nil).Times(4) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), gomock.Any(), gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil).Times(4) metadata := client.PullImage(image, authData) assert.NoError(t, metadata.Error, "Expected pull to succeed") @@ -1215,7 +1194,7 @@ func TestECRAuthCacheWithoutExecutionRole(t *testing.T) { // TestECRAuthCacheForDifferentRegistry tests the client will call ecr client to get docker // auth for different registry func TestECRAuthCacheForDifferentRegistry(t *testing.T) { - mockDocker, _, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + _, mockDockerSDK, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) defer done() mockTime.EXPECT().After(gomock.Any()).AnyTimes() @@ -1224,8 +1203,6 @@ func TestECRAuthCacheForDifferentRegistry(t *testing.T) { region := "eu-west-1" registryID := "1234567890" endpointOverride := "my.endpoint" - imageEndpoint := "registry.endpoint" - image := imageEndpoint + "/myimage:tag" authData := &apicontainer.RegistryAuthenticationData{ Type: "ecr", ECRAuthData: &apicontainer.ECRAuthData{ @@ -1235,6 +1212,8 @@ func TestECRAuthCacheForDifferentRegistry(t *testing.T) { }, } + imageEndpoint := "registry.endpoint" + image := imageEndpoint + "/myimage:tag" username := "username" password := "password" @@ -1245,7 +1224,10 @@ func TestECRAuthCacheForDifferentRegistry(t *testing.T) { AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString([]byte(username + ":" + password))), ExpiresAt: aws.Time(time.Now().Add(10 * time.Hour)), }, nil).Times(1) - mockDocker.EXPECT().PullImage(gomock.Any(), gomock.Any()).Return(nil).Times(2) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), gomock.Any(), gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil).Times(2) metadata := client.PullImage(image, authData) assert.NoError(t, metadata.Error, "Expected pull to succeed") @@ -1266,7 +1248,7 @@ func TestECRAuthCacheForDifferentRegistry(t *testing.T) { // TestECRAuthCacheWithExecutionRole tests the client will use the cached docker auth // for ecr when pull from the same registry with same execution role func TestECRAuthCacheWithSameExecutionRole(t *testing.T) { - mockDocker, _, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + _, mockDockerSDK, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) defer done() mockTime.EXPECT().After(gomock.Any()).AnyTimes() @@ -1299,7 +1281,10 @@ func TestECRAuthCacheWithSameExecutionRole(t *testing.T) { AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString([]byte(username + ":" + password))), ExpiresAt: aws.Time(time.Now().Add(10 * time.Hour)), }, nil).Times(1) - mockDocker.EXPECT().PullImage(gomock.Any(), gomock.Any()).Return(nil).Times(3) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), gomock.Any(), gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil).Times(3) metadata := client.PullImage(image, authData) assert.NoError(t, metadata.Error, "Expected pull to succeed") @@ -1316,7 +1301,7 @@ func TestECRAuthCacheWithSameExecutionRole(t *testing.T) { // TestECRAuthCacheWithDifferentExecutionRole tests client will call ecr client to get // docker auth credentials for different execution role func TestECRAuthCacheWithDifferentExecutionRole(t *testing.T) { - mockDocker, _, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) + _, mockDockerSDK, client, mockTime, ctrl, ecrClientFactory, done := dockerClientSetup(t) defer done() mockTime.EXPECT().After(gomock.Any()).AnyTimes() @@ -1324,8 +1309,6 @@ func TestECRAuthCacheWithDifferentExecutionRole(t *testing.T) { region := "eu-west-1" registryID := "1234567890" - imageEndpoint := "registry.endpoint" - image := imageEndpoint + "/myimage:tag" endpointOverride := "my.endpoint" authData := &apicontainer.RegistryAuthenticationData{ Type: "ecr", @@ -1339,6 +1322,8 @@ func TestECRAuthCacheWithDifferentExecutionRole(t *testing.T) { RoleArn: "executionRole", }) + imageEndpoint := "registry.endpoint" + image := imageEndpoint + "/myimage:tag" username := "username" password := "password" @@ -1349,7 +1334,10 @@ func TestECRAuthCacheWithDifferentExecutionRole(t *testing.T) { AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString([]byte(username + ":" + password))), ExpiresAt: aws.Time(time.Now().Add(10 * time.Hour)), }, nil).Times(1) - mockDocker.EXPECT().PullImage(gomock.Any(), gomock.Any()).Return(nil).Times(2) + mockDockerSDK.EXPECT().ImagePull(gomock.Any(), gomock.Any(), gomock.Any()).Return( + mockReadCloser{ + reader: strings.NewReader(`{"status":"pull complete"}`), + }, nil).Times(2) metadata := client.PullImage(image, authData) assert.NoError(t, metadata.Error, "Expected pull to succeed") From 1c29f4ba513de3edd127768a68ab5eead1d81e4e Mon Sep 17 00:00:00 2001 From: Yunhee Date: Mon, 15 Oct 2018 11:16:47 -0700 Subject: [PATCH 2/2] custom struct to handle image pull response --- agent/dockerclient/dockerapi/docker_client.go | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/agent/dockerclient/dockerapi/docker_client.go b/agent/dockerclient/dockerapi/docker_client.go index b50b10aef2e..014ada316a0 100644 --- a/agent/dockerclient/dockerapi/docker_client.go +++ b/agent/dockerclient/dockerapi/docker_client.go @@ -237,6 +237,20 @@ type dockerGoClient struct { lock sync.Mutex } +type ImagePullResponse struct { + Id string `json:"id,omitempty"` + Status string `json:"status,omitempty"` + ProgressDetail struct { + Current int64 `json:"current,omitempty"` + Total int64 `json:"total,omitempty"` + } `json:"progressDetail,omitempty"` + Progress string `json:"progress,omitempty"` + Error struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` + } `json:"errorDetail,omitempty"` +} + func (dg *dockerGoClient) WithVersion(version dockerclient.DockerVersion) DockerClient { return &dockerGoClient{ clientFactory: dg.clientFactory, @@ -418,7 +432,7 @@ func (dg *dockerGoClient) pullImage(ctx context.Context, image string, authData defer close(ch) decoder := json.NewDecoder(reader) - data := new(events.Message) + data := new(ImagePullResponse) for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) { if err != nil { seelog.Warnf("DockerGoClient: Unable to decode pull event message for image %s: %v", image, err) @@ -436,11 +450,9 @@ func (dg *dockerGoClient) pullImage(ctx context.Context, image string, authData pullBegan <- true }) - dataBytes, _ := json.Marshal(data) - line := string(dataBytes[:]) - dg.filterPullDebugOutput(line, image) + dg.filterPullDebugOutput(data, image) - data = new(events.Message) + data = new(ImagePullResponse) } pullFinished <- nil }() @@ -468,21 +480,21 @@ func (dg *dockerGoClient) pullImage(ctx context.Context, image string, authData return nil } -func (dg *dockerGoClient) filterPullDebugOutput(line string, image string) { +func (dg *dockerGoClient) filterPullDebugOutput(data *ImagePullResponse, image string) { var statusDisplayed time.Time now := time.Now() - if !strings.Contains(line, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) { + if !strings.Contains(data.Progress, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) { // skip most of the progress bar lines, but retain enough for debugging - seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, line) + seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, data.Progress) statusDisplayed = now } - if strings.Contains(line, "already being pulled by another client. Waiting.") { + if strings.Contains(data.Status, "already being pulled by another client. Waiting.") { // This can mean the daemon is 'hung' in pulling status for this image, but we can't be sure. seelog.Errorf("DockerGoClient: image 'pull' status marked as already being pulled for image %s, status %s", - image, line) + image, data.Status) } }