Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pullimage sdk migration #1623

Merged
merged 2 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func DefaultConfig() Config {
TaskMetadataSteadyStateRate: DefaultTaskMetadataSteadyStateRate,
TaskMetadataBurstRate: DefaultTaskMetadataBurstRate,
SharedVolumeMatchFullConfig: false, // only requiring shared volumes to match on name, which is default docker behavior
ImagePullInactivityTimeout: defaultImagePullInactivityTimeout,
}
}

Expand Down
136 changes: 84 additions & 52 deletions agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package dockerapi

import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
Expand Down Expand Up @@ -47,7 +49,6 @@ import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/volume"
docker "github.com/fsouza/go-dockerclient"
)

const (
Expand Down Expand Up @@ -236,6 +237,20 @@ type dockerGoClient struct {
lock sync.Mutex
}

// ImagePullResponse holds a single JSON message decoded from the response
// stream returned by the Docker SDK ImagePull call. pullImage decodes one
// value per message and hands it to filterPullDebugOutput for logging.
//
// The `omitempty` option has no effect on the two struct-typed fields
// (encoding/json never treats a struct value as empty); it only matters if
// this type is ever marshalled, which the visible code does not do.
type ImagePullResponse struct {
// Id is decoded from the "id" key.
// NOTE(review): presumably the layer/image identifier the message refers to — confirm against the Docker API.
Id string `json:"id,omitempty"`
// Status is the human-readable status text. filterPullDebugOutput checks it
// for the "already being pulled by another client. Waiting." message.
Status string `json:"status,omitempty"`
// ProgressDetail is decoded from the "progressDetail" key.
// NOTE(review): Current/Total are presumably byte counts for the layer — confirm.
ProgressDetail struct {
Current int64 `json:"current,omitempty"`
Total int64 `json:"total,omitempty"`
} `json:"progressDetail,omitempty"`
// Progress is the rendered progress text. Messages whose Progress contains
// "[=" are treated as progress-bar lines and logged at a reduced rate.
Progress string `json:"progress,omitempty"`
// Error is decoded from the "errorDetail" key.
// NOTE(review): the decode loop visible in pullImage never inspects this
// field, so an error reported inside the stream may be treated as a
// successful pull — confirm and surface it if so.
Error struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
} `json:"errorDetail,omitempty"`
}

func (dg *dockerGoClient) WithVersion(version dockerclient.DockerVersion) DockerClient {
return &dockerGoClient{
clientFactory: dg.clientFactory,
Expand Down Expand Up @@ -338,7 +353,7 @@ func (dg *dockerGoClient) PullImage(image string, authData *apicontainer.Registr
maximumPullRetryDelay, pullRetryJitterMultiplier, pullRetryDelayMultiplier)
err := utils.RetryNWithBackoffCtx(ctx, imagePullBackoff, maximumPullRetries,
func() error {
err := dg.pullImage(image, authData)
err := dg.pullImage(ctx, image, authData)
if err != nil {
seelog.Warnf("DockerGoClient: failed to pull image %s: %s", image, err.Error())
}
Expand Down Expand Up @@ -367,45 +382,79 @@ func wrapPullErrorAsNamedError(err error) apierrors.NamedError {
return retErr
}

func (dg *dockerGoClient) pullImage(image string, authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError {
func (dg *dockerGoClient) pullImage(ctx context.Context, image string, authData *apicontainer.RegistryAuthenticationData) apierrors.NamedError {
seelog.Debugf("DockerGoClient: pulling image: %s", image)
client, err := dg.dockerClient()
client, err := dg.sdkDockerClient()
if err != nil {
return CannotGetDockerClientError{version: dg.version, err: err}
}

sdkAuthConfig, err := dg.getAuthdata(image, authData)
authConfig := docker.AuthConfiguration{
Username: sdkAuthConfig.Username,
Password: sdkAuthConfig.Password,
Email: sdkAuthConfig.Email,
ServerAddress: sdkAuthConfig.ServerAddress,
}
if err != nil {
return wrapPullErrorAsNamedError(err)
}
// encode auth data
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(sdkAuthConfig); err != nil {
return CannotPullECRContainerError{err}
}

pullDebugOut, pullWriter := io.Pipe()
defer pullWriter.Close()
imagePullOpts := types.ImagePullOptions{
All: false,
RegistryAuth: base64.URLEncoding.EncodeToString(buf.Bytes()),
}

repository := getRepository(image)

opts := docker.PullImageOptions{
Repository: repository,
OutputStream: pullWriter,
InactivityTimeout: dg.config.ImagePullInactivityTimeout,
}
timeout := dg.time().After(dockerPullBeginTimeout)
// pullBegan is a channel indicating that we have seen at least one line of data on the 'OutputStream' above.
// It is here to guard against a bug wherein Docker never writes anything to that channel and hangs in pulling forever.
pullBegan := make(chan bool, 1)

go dg.filterPullDebugOutput(pullDebugOut, pullBegan, image)
// pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times)
pullBeganOnce := sync.Once{}

pullFinished := make(chan error, 1)
// TODO Migrate Pull Image once Inactivity Timeout is sorted out
subCtx, cancelRequest := context.WithCancel(ctx)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this goroutine, along with the select statement, still necessary now that the SDK change implements the full pull inside the goroutine? It seems the select statement was only there to catch a premature error (if nothing was being pulled), but now we can handle the error within the goroutine without the select statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that images can be large and take a long time to download, I think the goroutine here is necessary so that the pull is handled in a separate thread. I'm sure that this can be cleaned up, but I'd like to limit changes in behavior between go-dockerclient and the SDK, and would prefer to come back to this later to make any improvements.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're still using the goroutine for the pullBegan timeout. You could rework that, but I think it's fine to focus only on translating the API for now.

pullFinished <- client.PullImage(opts, authConfig)
defer cancelRequest()
reader, err := client.ImagePull(subCtx, repository, imagePullOpts)
if err != nil {
pullFinished <- err
return
}

// handle inactivity timeout
var canceled uint32
var ch chan<- struct{}
reader, ch = handleInactivityTimeout(reader, dg.config.ImagePullInactivityTimeout, cancelRequest, &canceled)
defer reader.Close()
defer close(ch)

decoder := json.NewDecoder(reader)
data := new(ImagePullResponse)
for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
if err != nil {
seelog.Warnf("DockerGoClient: Unable to decode pull event message for image %s: %v", image, err)
pullFinished <- err
return
}
if atomic.LoadUint32(&canceled) != 0 {
seelog.Warnf("DockerGoClient: inactivity time exceeded timeout while pulling image %s", image)
pullErr := errors.New("inactivity time exceeded timeout while pulling image")
pullFinished <- pullErr
return
}

pullBeganOnce.Do(func() {
pullBegan <- true
})

dg.filterPullDebugOutput(data, image)

data = new(ImagePullResponse)
}
pullFinished <- nil
}()

select {
Expand All @@ -431,38 +480,21 @@ func (dg *dockerGoClient) pullImage(image string, authData *apicontainer.Registr
return nil
}

func (dg *dockerGoClient) filterPullDebugOutput(pullDebugOut *io.PipeReader, pullBegan chan<- bool, image string) {
// pullBeganOnce ensures we only indicate it began once (since our channel will only be read 0 or 1 times)
pullBeganOnce := sync.Once{}
func (dg *dockerGoClient) filterPullDebugOutput(data *ImagePullResponse, image string) {

reader := bufio.NewReader(pullDebugOut)
var line string
var pullErr error
var statusDisplayed time.Time
for {
line, pullErr = reader.ReadString('\n')
if pullErr != nil {
break
}
pullBeganOnce.Do(func() {
pullBegan <- true
})

now := time.Now()
if !strings.Contains(line, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) {
// skip most of the progress bar lines, but retain enough for debugging
seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, line)
statusDisplayed = now
}

if strings.Contains(line, "already being pulled by another client. Waiting.") {
// This can mean the daemon is 'hung' in pulling status for this image, but we can't be sure.
seelog.Errorf("DockerGoClient: image 'pull' status marked as already being pulled for image %s, status %s",
image, line)
}
now := time.Now()
if !strings.Contains(data.Progress, "[=") || now.After(statusDisplayed.Add(pullStatusSuppressDelay)) {
// skip most of the progress bar lines, but retain enough for debugging
seelog.Debugf("DockerGoClient: pulling image %s, status %s", image, data.Progress)
statusDisplayed = now
}
if pullErr != nil && pullErr != io.EOF {
seelog.Warnf("DockerGoClient: error reading pull image status for image %s: %v", image, pullErr)

if strings.Contains(data.Status, "already being pulled by another client. Waiting.") {
// This can mean the daemon is 'hung' in pulling status for this image, but we can't be sure.
seelog.Errorf("DockerGoClient: image 'pull' status marked as already being pulled for image %s, status %s",
image, data.Status)
}
}

Expand Down Expand Up @@ -936,8 +968,8 @@ func (dg *dockerGoClient) handleContainerEvents(ctx context.Context,
metadata := dg.containerMetadata(ctx, containerID)

changedContainers <- DockerContainerChangeEvent{
Status: status,
Type: eventType,
Status: status,
Type: eventType,
DockerContainerMetadata: metadata,
}
}
Expand Down
Loading