Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ADLS hint from import #7581

Merged
merged 5 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/esti.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,6 @@ jobs:
LAKEFS_BLOCKSTORE_TYPE: azure
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

Expand Down
5 changes: 5 additions & 0 deletions cmd/lakectl/cmd/common_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
LakectlInteractive = "LAKECTL_INTERACTIVE"
DeathMessage = "{{.Error|red}}\nError executing command.\n"
DeathMessageWithFields = "{{.Message|red}}\n{{.Status}}\n"
WarnMessage = "{{.Warning|yellow}}\n\n"
)

const (
Expand Down Expand Up @@ -178,6 +179,10 @@ func WriteIfVerbose(tpl string, data interface{}) {
}
}

// Warning renders a highlighted warning message to stderr using the
// WarnMessage template, prefixing the given text with "Warning: ".
func Warning(message string) {
	payload := struct{ Warning string }{Warning: "Warning: " + message}
	WriteTo(WarnMessage, payload, os.Stderr)
}

func Die(errMsg string, code int) {
WriteTo(DeathMessage, struct{ Error string }{Error: errMsg}, os.Stderr)
os.Exit(code)
Expand Down
7 changes: 7 additions & 0 deletions cmd/lakectl/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"regexp"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -160,6 +161,12 @@ func newImportProgressBar(visible bool) *progressbar.ProgressBar {
}

func verifySourceMatchConfiguredStorage(ctx context.Context, client *apigen.ClientWithResponses, source string) {
// Adds backwards compatibility for ADLS Gen2 storage import `hint`
if strings.Contains(source, "adls.core.windows.net") {
Warning("'adls' hint will be deprecated soon, please use the original source url for import")
source = strings.Replace(source, "adls.core.windows.net", "blob.core.windows.net", 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to say what is happening, not only what isn't:

Suggested change
Warning("'adls' hint will be deprecated soon, please use the original source url for import")
source = strings.Replace(source, "adls.core.windows.net", "blob.core.windows.net", 1)
Warning("'adls' hint will be deprecated soon, please use the original source url for import")
translatedSource = strings.Replace(source, "adls.core.windows.net", "blob.core.windows.net", 1)
if source != translatedSource {
Fmt(" Using %s\n", translatedSource)
}

}

confResp, err := client.GetConfigWithResponse(ctx)
DieOnErrorOrUnexpectedStatusCode(confResp, err, http.StatusOK)
storageConfig := confResp.JSON200.StorageConfig
Expand Down
16 changes: 3 additions & 13 deletions docs/howto/import.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ In addition, the following for provider-specific permissions may be required:
</ul>
<div markdown="1" id="aws-s3">


## AWS S3: Importing from public buckets
{:.no_toc}

Expand Down Expand Up @@ -149,20 +148,11 @@ the following policy needs to be attached to the lakeFS S3 service-account to al

</div>
<div markdown="1" id="azure-storage">
See [Azure deployment][deploy-azure-storage-account-creds] on limitations when using account credentials.

### Azure Data Lake Gen2
{:.no_toc}

lakeFS requires a hint in the import source URL to understand that the provided storage account is ADLS Gen2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should document the change. Otherwise someone who's already imported has no way of understanding what went wrong!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added deprecation message

**Note:** The use of the `adls` hint for ADLS Gen2 storage accounts is deprecated; please use the original source URL for import.
{: .note}

```
For source account URL:
https://<my-account>.core.windows.net/path/to/import/

Please add the *adls* subdomain to the URL as follows:
https://<my-account>.adls.core.windows.net/path/to/import/
```
See [Azure deployment][deploy-azure-storage-account-creds] on limitations when using account credentials.

</div>
<div markdown="1" id="gcs">
Expand Down
1 change: 0 additions & 1 deletion esti/ops/docker-compose-external-db.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
Expand Down
1 change: 0 additions & 1 deletion esti/ops/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ services:
- ESTI_GOTEST_FLAGS
- ESTI_FLAGS
- ESTI_FORCE_PATH_STYLE
- ESTI_ADLS_IMPORT_BASE_URL
- ESTI_AZURE_STORAGE_ACCOUNT
- ESTI_AZURE_STORAGE_ACCESS_KEY
working_dir: /lakefs
Expand Down
3 changes: 2 additions & 1 deletion pkg/actions/lua/storage/azure/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func parsePath(l *lua.State, path string) (string, string) {
func transformPathToAbfss(l *lua.State) int {
path := lua.CheckString(l, 1)
const numOfParts = 3
r := regexp.MustCompile(`^https://(\w+)\.blob\.core\.windows\.net/([^/]*)/(.+)$`)
// Added adls for backwards compatibility in imports created pre fix of bug: https://github.com/treeverse/lakeFS/issues/7580
r := regexp.MustCompile(`^https://(\w+)\.(?:blob|adls)\.core\.windows\.net/([^/]*)/(.+)$`)
parts := r.FindStringSubmatch(path)
if len(parts) != numOfParts+1 {
lua.Errorf(l, "expected valid Azure https URL: %s", path)
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) {
return nil, err
}

return NewAzureBlobWalker(client)
return NewAzureDataLakeWalker(client, false)
}

func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, mode block.PreSignMode) (string, time.Time, error) {
Expand Down Expand Up @@ -568,11 +568,11 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP

func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure)
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.windows\.net` // added adls for import hint validation in UI
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks any existing import scripts. Does this mean we need a major release 🤢 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As stated in the PR description - this is a breaking change. But I believe we can avoid a major release since this affects specific users we can identify

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a WA for lakectl so it doesn't break. Effectively this is only going to impact WebUI users (I'm fine with that)

info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.windows\.net`

if a.chinaCloud {
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.(blob|adls)\.core\.chinacloudapi\.cn`
info.ImportValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
info.ValidityRegex = `^https?://[a-z0-9_-]+\.blob\.core\.chinacloudapi\.cn`
}

Expand All @@ -598,6 +598,6 @@ func (a *Adapter) newPreSignedTime() time.Time {
return time.Now().UTC().Add(a.preSignedExpiry)
}

func (a *Adapter) GetPresignUploadPartURL(ctx context.Context, obj block.ObjectPointer, uploadID string, partNumber int) (string, error) {
func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPointer, _ string, _ int) (string, error) {
return "", block.ErrOperationNotSupported
}
85 changes: 0 additions & 85 deletions pkg/block/azure/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@ const DirectoryBlobMetadataKey = "hdi_isfolder"

var ErrAzureInvalidURL = errors.New("invalid Azure storage URL")

// NewAzureBlobWalker builds a BlobWalker over the given Azure blob service
// client, with its progress mark initialized to report that more results exist.
func NewAzureBlobWalker(svc *service.Client) (*BlobWalker, error) {
	walker := &BlobWalker{
		client: svc,
		mark:   block.Mark{HasMore: true},
	}
	return walker, nil
}

// BlobWalker walks objects in an Azure Blob Storage container using the
// flat blob listing API.
type BlobWalker struct {
	client *service.Client // Azure blob service client used for listing
	mark   block.Mark      // walk progress; starts with HasMore=true, updated by Walk
}

// extractAzurePrefix takes a URL that looks like this: https://storageaccount.blob.core.windows.net/container/prefix
// and return the URL for the container and a prefix, if one exists
func extractAzurePrefix(storageURI *url.URL) (*url.URL, string, error) {
Expand All @@ -52,71 +40,6 @@ func getAzureBlobURL(containerURL *url.URL, blobName string) *url.URL {
return containerURL.ResolveReference(&relativePath)
}

// Walk lists all blobs under storageURI (container plus optional prefix)
// using the flat blob listing API and invokes walkFn for every non-folder
// blob. Listing resumes from op.ContinuationToken; entries up to and
// including op.After are skipped. The walker's mark records the continuation
// token and last key seen so progress can be reported mid-walk, and is reset
// to HasMore=false once the walk completes.
func (a *BlobWalker) Walk(ctx context.Context, storageURI *url.URL, op block.WalkOptions, walkFn func(e block.ObjectStoreEntry) error) error {
	// we use bucket as container and prefix as path
	containerURL, prefix, err := extractAzurePrefix(storageURI)
	if err != nil {
		return err
	}
	// basePath is the prefix up to (and including) the last "/"; relative
	// keys are reported relative to it.
	var basePath string
	if idx := strings.LastIndex(prefix, "/"); idx != -1 {
		basePath = prefix[:idx+1]
	}

	qk, err := ResolveBlobURLInfoFromURL(containerURL)
	if err != nil {
		return err
	}

	containerClient := a.client.NewContainerClient(qk.ContainerName)
	// Metadata is requested so that folder marker blobs can be identified
	// and skipped (see isBlobItemFolder).
	listBlob := containerClient.NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
		Prefix: &prefix,
		Marker: &op.ContinuationToken,
		Include: container.ListBlobsInclude{
			Metadata: true,
		},
	})

	for listBlob.More() {
		resp, err := listBlob.NextPage(ctx)
		if err != nil {
			return err
		}
		// Record the page's continuation token so an interrupted walk can
		// resume from here.
		if resp.Marker != nil {
			a.mark.ContinuationToken = *resp.Marker
		}
		for _, blobInfo := range resp.Segment.BlobItems {
			// skipping everything in the page which is before 'After' (without forgetting the possible empty string key!)
			if op.After != "" && *blobInfo.Name <= op.After {
				continue
			}

			// Skip folders
			if isBlobItemFolder(blobInfo) {
				continue
			}

			a.mark.LastKey = *blobInfo.Name
			if err := walkFn(block.ObjectStoreEntry{
				FullKey:     *blobInfo.Name,
				RelativeKey: strings.TrimPrefix(*blobInfo.Name, basePath),
				Address:     getAzureBlobURL(containerURL, *blobInfo.Name).String(),
				ETag:        extractBlobItemEtag(blobInfo),
				Mtime:       *blobInfo.Properties.LastModified,
				Size:        *blobInfo.Properties.ContentLength,
			}); err != nil {
				return err
			}
		}
	}

	// Walk completed: reset the mark so callers see there is nothing more.
	a.mark = block.Mark{
		HasMore: false,
	}

	return nil
}

// isBlobItemFolder returns true if the blob item is a folder.
// Make sure that metadata is populated before calling this function.
// Example: for listing using blob API passing options with `Include: container.ListBlobsInclude{ Metadata: true }`
Expand Down Expand Up @@ -147,14 +70,6 @@ func extractBlobItemEtag(blobItem *container.BlobItem) string {
return ""
}

// Marker returns the walker's current progress mark (continuation token and
// last key recorded by Walk).
func (a *BlobWalker) Marker() block.Mark {
	return a.mark
}

// GetSkippedEntries always returns nil: the flat blob walker never skips
// out-of-order entries.
func (a *BlobWalker) GetSkippedEntries() []block.ObjectStoreEntry {
	return nil
}

//
// DataLakeWalker
//
Expand Down
6 changes: 3 additions & 3 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,19 +805,19 @@ func (c *Catalog) GetBranchReference(ctx context.Context, repositoryID string, b

// HardResetBranch validates its arguments, resolves the repository, and
// performs a hard reset of the given branch to the reference expression.
// Returns a validation error on malformed identifiers, or any error from
// repository lookup or the underlying store's ResetHard.
//
// NOTE(review): the scraped diff contained both the pre-change (`ref`) and
// post-change (`reference`) lines merged together, which would not compile;
// this is the post-change version with the single `reference` variable.
func (c *Catalog) HardResetBranch(ctx context.Context, repositoryID, branch, refExpr string, opts ...graveler.SetOptionsFunc) error {
	branchID := graveler.BranchID(branch)
	reference := graveler.Ref(refExpr)
	if err := validator.Validate([]validator.ValidateArg{
		{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
		{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
		{Name: "ref", Value: reference, Fn: graveler.ValidateRef},
	}); err != nil {
		return err
	}
	repository, err := c.getRepository(ctx, repositoryID)
	if err != nil {
		return err
	}
	return c.Store.ResetHard(ctx, repository, branchID, reference, opts...)
}

func (c *Catalog) ResetBranch(ctx context.Context, repositoryID string, branch string, opts ...graveler.SetOptionsFunc) error {
Expand Down
18 changes: 1 addition & 17 deletions pkg/ingest/store/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -128,22 +127,7 @@ func (f *WalkerFactory) buildAzureWalker(importURL *url.URL, skipOutOfOrder bool
return nil, err
}

isHNS := isHierarchicalNamespaceEnabled(importURL)
if isHNS {
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}
return azure.NewAzureBlobWalker(client)
}

// isHierarchicalNamespaceEnabled - identify if hns enabled on the account,
// based on the import URL.
// Until we enable a way to extract the account information, we assume it based on the domain used in import:
// https://<account>.<blob|adls>.core.windows.net/
// adls - azure data lake storage
func isHierarchicalNamespaceEnabled(u *url.URL) bool {
const importURLParts = 3
n := strings.SplitN(u.Host, ".", importURLParts)
return len(n) == importURLParts && n[1] == "adls"
return azure.NewAzureDataLakeWalker(client, skipOutOfOrder)
}

func (f *WalkerFactory) GetWalker(ctx context.Context, opts WalkerOptions) (*WalkerWrapper, error) {
Expand Down
Loading