Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for layer extraction from OCI artifacts with ImageIndex #1369

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion api/v1beta2/ocirepository_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ type OCIRepositoryRef struct {
type OCILayerSelector struct {
// MediaType specifies the OCI media type of the layer
// which should be extracted from the OCI Artifact. The
// first layer matching this type is selected.
// first layer matching this type is selected by default,
// otherwise 'offest' needs to be specified.
// +optional
MediaType string `json:"mediaType,omitempty"`

Expand All @@ -177,6 +178,15 @@ type OCILayerSelector struct {
// +kubebuilder:validation:Enum=extract;copy
// +optional
Operation string `json:"operation,omitempty"`

// Offset specifies the index of the layer which should be extracted.
// +kubebuilder:validation:Minimum=0
// +optional
Offset *int `json:"offset,omitempty"`

// TODO: next API version should probably use artifact media types
// at the top level and make layer selector optional
ArtifactMediaType string `json:"artifactMediaType,omitempty"`
Comment on lines +187 to +189
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this used anywhere in the controller logic, can you explain what's the role of this field? Is this something that's in the latest OCI spec but not implemented by any registry yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a quick rebase of a WIP branch that I had from around the time of #1244. I thought I should open a draft PR before it's forgotten. It still needs some tidying for sure. This field looks like an idea I had at the time, I'll refresh my memory and see if this needs implementing or deleting.

Copy link
Contributor Author

@errordeveloper errordeveloper Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have recapped this. The idea is actaully that media type of artifact index and the layers that it points at are two distinct notions. I should probably try implementing the field in this PR, without flipping API semanitcs until there is more to make a case for API version bump.
The point is that what you want to select primarily is the media type of the OCI index, the type of the blob is a secondary concern, it can be defaulted to a tarball.

}

// OCIRepositoryVerification verifies the authenticity of an OCI Artifact
Expand Down Expand Up @@ -307,6 +317,14 @@ func (in *OCIRepository) GetLayerOperation() string {
return in.Spec.LayerSelector.Operation
}

func (in *OCIRepository) GetLayerOffset() int {
if in.Spec.LayerSelector == nil || in.Spec.LayerSelector.Offset == nil {
return 0
}

return *in.Spec.LayerSelector.Offset
}

// +genclient
// +genclient:Namespaced
// +kubebuilder:storageversion
Expand Down
196 changes: 138 additions & 58 deletions internal/controller/ocirepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -392,7 +393,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch

// Get the upstream revision from the artifact digest
// TODO: getRevision resolves the digest, which may change before image is fetched, so it should probaly update ref
revision, err := r.getRevision(ref, opts)
revision, ref, desc, err := r.getRevision(ref, opts)
if err != nil {
e := serror.NewGeneric(
fmt.Errorf("failed to determine artifact digest: %w", err),
Expand All @@ -407,7 +408,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch
// Mark observations about the revision on the object
defer func() {
if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new revision '%s' for '%s'", revision, ref)
message := fmt.Sprintf("new revision '%s' for '%s'", revision, obj.Spec.URL)
if obj.GetArtifact() != nil {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
}
Expand Down Expand Up @@ -454,35 +455,10 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch
return sreconcile.ResultSuccess, nil
}

// Pull artifact from the remote container registry
img, err := remote.Image(ref, opts...)
if err != nil {
e := serror.NewGeneric(
fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
ociv1.OCIPullFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}

// Copy the OCI annotations to the internal artifact metadata
manifest, err := img.Manifest()
if err != nil {
e := serror.NewGeneric(
fmt.Errorf("failed to parse artifact manifest: %w", err),
ociv1.OCILayerOperationFailedReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
metadata.Metadata = manifest.Annotations

// Extract the compressed content from the selected layer
blob, err := r.selectLayer(obj, img)
if err != nil {
e := serror.NewGeneric(err, ociv1.OCILayerOperationFailedReason)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
blob, serr := r.fetchArtifact(obj, metadata, ref, desc, opts)
if serr != nil {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, serr.Reason, serr.Err.Error())
return sreconcile.ResultEmpty, serr
}

// Persist layer content to storage using the specified operation
Expand Down Expand Up @@ -539,30 +515,40 @@ func (r *OCIRepositoryReconciler) selectLayer(obj *ociv1.OCIRepository, image gc
return nil, fmt.Errorf("failed to parse artifact layers: %w", err)
}

if len(layers) < 1 {
offset := obj.GetLayerOffset()
mediaType := obj.GetLayerMediaType()

if len(layers) == 0 {
return nil, fmt.Errorf("no layers found in artifact")
}

if len(layers) <= offset {
return nil, fmt.Errorf("layer offset %d is out of bounds", offset)
}

var layer gcrv1.Layer
switch {
case obj.GetLayerMediaType() != "":
case mediaType != "":
var found bool
for i, l := range layers {
if i < offset {
continue
}
md, err := l.MediaType()
if err != nil {
return nil, fmt.Errorf("failed to determine the media type of layer[%v] from artifact: %w", i, err)
}
if string(md) == obj.GetLayerMediaType() {
if string(md) == mediaType {
layer = layers[i]
found = true
break
}
}
if !found {
return nil, fmt.Errorf("failed to find layer with media type '%s' in artifact", obj.GetLayerMediaType())
return nil, fmt.Errorf("failed to find layer with media type '%s' in artifact at offset %d", obj.GetLayerMediaType(), offset)
}
default:
layer = layers[0]
layer = layers[offset]
}

blob, err := layer.Compressed()
Expand All @@ -573,32 +559,130 @@ func (r *OCIRepositoryReconciler) selectLayer(obj *ociv1.OCIRepository, image gc
return blob, nil
}

func newPullErr(format string, a ...any) *serror.Generic {
return serror.NewGeneric(fmt.Errorf(format, a...), ociv1.OCIPullFailedReason)
}

func newLayerOperationErr(format string, a ...any) *serror.Generic {
return serror.NewGeneric(fmt.Errorf(format, a...), ociv1.OCILayerOperationFailedReason)
}

func (r *OCIRepositoryReconciler) fetchArtifact(obj *ociv1.OCIRepository, metadata *sourcev1.Artifact, ref name.Reference, desc *remote.Descriptor, options []remote.Option) (io.ReadCloser, *serror.Generic) {
switch mt := desc.MediaType; {
case mt.IsImage():
// Pull artifact from the remote container registry
img, err := desc.Image()
if err != nil {
return nil, newPullErr("failed to parse artifact image from '%s': %w", obj.Spec.URL, err)
}

// Copy the OCI annotations to the internal artifact metadata
manifest, err := img.Manifest()
if err != nil {
return nil, newLayerOperationErr("failed to parse artifact image manifest: %w", err)
}
metadata.Metadata = manifest.Annotations

// Extract the compressed content from the selected layer
blob, err := r.selectLayer(obj, img)
if err != nil {
e := serror.NewGeneric(err, ociv1.OCILayerOperationFailedReason)
return nil, e
}
return blob, nil
case mt.IsIndex():
idx, err := desc.ImageIndex()
if err != nil {
return nil, newPullErr("failed to parse artifact index from '%s': %w", obj.Spec.URL, err)
}

manifest, err := idx.IndexManifest()
if err != nil {
return nil, newPullErr("failed to parse artifact index manifest: %w", err)
}

if len(manifest.Manifests) == 0 {
return nil, newLayerOperationErr("empty index")
}

images := make([]gcrv1.Image, 0, len(manifest.Manifests))

for i := range manifest.Manifests {
manifest := manifest.Manifests[i]
if manifest.MediaType.IsIndex() {
r.Eventf(obj, corev1.EventTypeWarning, "OCINestedIndexUnsupported", "skipping nested index manifest '%s' in '%s'", manifest.Digest.String(), desc.Digest.String())
continue
}
if !manifest.MediaType.IsImage() {
r.Eventf(obj, corev1.EventTypeNormal, "OCIImageUnsupported", "skipping runnable image '%s' in '%s'", manifest.Digest.String(), ref)
continue
}
if manifest.ArtifactType != "" {
img, err := idx.Image(manifest.Digest)
if err != nil {
return nil, newPullErr("failed to pull artifact image '%s' from '%s': %w", manifest.Digest.String(), ref, err)
}
images = append(images, img)
}
}

if len(images) == 0 {
return nil, newPullErr("no suitable artifacts found in index '%s': %w", desc.Digest.String(), err)
}

var errs []error
for i := range images {
blob, err := r.selectLayer(obj, images[i])
if err != nil {
errs = append(errs, err)
continue
}
return blob, nil
}
if len(errs) > 0 {
return nil, newLayerOperationErr("%w", kerrors.NewAggregate(errs))
}
return nil, newPullErr("no suitable layers found in index '%s': %w", desc.Digest.String(), err)
default:
return nil, newLayerOperationErr("media type '%s' of '%s' is not index or image", mt, ref)
}
}

func (r *OCIRepositoryReconciler) getDescriptor(ref name.Reference, options []remote.Option) (*remote.Descriptor, error) {
// NB: there is no good enought reason to use remote.Head first,
// as it's only in error case that remote.Get won't have to be
// done afterwards anyway
desc, err := remote.Get(ref, options...)
if err != nil {
return nil, fmt.Errorf("failed to fetch %w", err)
}
return desc, nil

}

// getRevision fetches the upstream digest, returning the revision in the
// format '<tag>@<digest>'.
func (r *OCIRepositoryReconciler) getRevision(ref name.Reference, options []remote.Option) (string, error) {
func (r *OCIRepositoryReconciler) getRevision(ref name.Reference, options []remote.Option) (string, name.Reference, *remote.Descriptor, error) {
switch ref := ref.(type) {
case name.Digest:
digest, err := gcrv1.NewHash(ref.DigestStr())
if err != nil {
return "", err
return "", nil, nil, err
}
desc, err := r.getDescriptor(ref, options)
if err != nil {
return "", nil, nil, fmt.Errorf("unable to check digest in registry: %w", err)
}
return digest.String(), nil
return digest.String(), ref, desc, nil
case name.Tag:
var digest gcrv1.Hash

desc, err := remote.Head(ref, options...)
if err == nil {
digest = desc.Digest
} else {
rdesc, err := remote.Get(ref, options...)
if err != nil {
return "", err
}
digest = rdesc.Descriptor.Digest
desc, err := r.getDescriptor(ref, options)
if err != nil {
return "", nil, nil, err
}
return fmt.Sprintf("%s@%s", ref.TagStr(), digest.String()), nil
digest := desc.Digest.String()
return fmt.Sprintf("%s@%s", ref.TagStr(), digest), ref.Digest(digest), desc, nil
default:
return "", fmt.Errorf("unsupported reference type: %T", ref)
return "", nil, nil, fmt.Errorf("unsupported reference type: %T", ref)
}
}

Expand Down Expand Up @@ -1162,26 +1246,22 @@ func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *oc
// makeRemoteOptions returns a remoteOptions struct with the authentication and transport options set.
// The returned struct can be used to interact with a remote registry using go-containerregistry based libraries.
func makeRemoteOptions(ctxTimeout context.Context, transport http.RoundTripper,
keychain authn.Keychain, auth authn.Authenticator) remoteOptions {
keychain authn.Keychain, auth authn.Authenticator) []remote.Option {

authOption := remote.WithAuthFromKeychain(keychain)
if auth != nil {
// auth take precedence over keychain here as we expect the caller to set
// the auth only if it is required.
authOption = remote.WithAuth(auth)
}
return remoteOptions{
return []remote.Option{
remote.WithContext(ctxTimeout),
remote.WithUserAgent(oci.UserAgent),
remote.WithTransport(transport),
authOption,
}
}

// remoteOptions contains the options to interact with a remote registry.
// It can be used to pass options to go-containerregistry based libraries.
type remoteOptions []remote.Option

// ociContentConfigChanged evaluates the current spec with the observations
// of the artifact in the status to determine if artifact content configuration
// has changed and requires rebuilding the artifact.
Expand Down
5 changes: 3 additions & 2 deletions internal/controller/ocirepository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,12 +1575,13 @@ func TestOCIRepository_reconcileSource_verifyOCISourceSignature_keyless(t *testi
Reference: tt.reference,
},
}
url := strings.TrimPrefix(obj.Spec.URL, "oci://") + ":" + tt.reference.Tag

url := strings.TrimPrefix(obj.Spec.URL, "oci://")

assertConditions := tt.assertConditions
for k := range assertConditions {
assertConditions[k].Message = strings.ReplaceAll(assertConditions[k].Message, "<revision>", tt.revision)
assertConditions[k].Message = strings.ReplaceAll(assertConditions[k].Message, "<url>", url)
assertConditions[k].Message = strings.ReplaceAll(assertConditions[k].Message, "<url>", obj.Spec.URL)
assertConditions[k].Message = strings.ReplaceAll(assertConditions[k].Message, "<provider>", "cosign")
}

Expand Down
Loading