Skip to content

Commit

Permalink
Merge pull request apache#3734 from sramazzina/AZURE-FIXES.1
Browse files Browse the repository at this point in the history
  • Loading branch information
hansva authored Mar 21, 2024
2 parents 068c24b + d889d34 commit 8bf9746
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import java.io.IOException;
import java.io.InputStream;

public class PageBlobInputStream extends InputStream {
public class AppendBlobInputStream extends InputStream {

private BlobInputStream inputStream;
private long fileSize;
private long totalRead = 0;

public PageBlobInputStream(BlobInputStream inputStream, long fileSize) {
public AppendBlobInputStream(BlobInputStream inputStream, long fileSize) {
this.inputStream = inputStream;
this.fileSize = fileSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,57 @@ public AzureFileNameParser() {
}

@Override
public FileName parseUri(final VfsComponentContext context, FileName base, String filename)
public FileName parseUri(final VfsComponentContext context, FileName base, String uri)
throws FileSystemException {
final StringBuilder name = new StringBuilder();
Authority auth = null;
String path = null;
FileType fileType;

int eidx = filename.indexOf("@/");
if (eidx != -1)
filename =
filename.substring(0, eidx + 1) + "windowsazure.com" + filename.substring(eidx + 1);

String scheme;
try {
auth = extractToPath(filename, name);
if (auth.getUserName() == null) {
scheme = UriParser.extractScheme(filename, name);
UriParser.canonicalizePath(name, 0, name.length(), this);
UriParser.fixSeparators(name);
} else {
scheme = auth.getScheme();
}
fileType = UriParser.normalisePath(name);
path = name.toString();
if (path.equals("")) {
path = "/";

StringBuilder sb = new StringBuilder(uri);

UriParser.normalisePath(sb);

String normalizedUri = sb.toString();
String scheme = normalizedUri.substring(0, normalizedUri.indexOf(':'));
String absPath = "/";
FileType fileType = FileType.IMAGINARY;
String[] s = normalizedUri.split("/");

if (s.length > 1) {
if (scheme.equals("azure")) {

String container = s[1];
for (int i = 1; i < s.length; i++) {
absPath += s[i];

if (s.length > 1 && i != s.length - 1) {
absPath += "/";
}
}

if (uri.endsWith("/")) {
fileType = FileType.FOLDER;
} else if (!absPath.endsWith("/")) {
fileType = FileType.FILE;
}

} else if (scheme.equals("azfs")) {

String account = s[1];
String container = s[2];

for (int i = 2; i < s.length; i++) {
absPath += s[i];

if (s.length > 1 && i != s.length - 1) {
absPath += "/";
}
}

if (uri.endsWith("/")) {
fileType = FileType.FOLDER;
} else if (!absPath.endsWith("/")) {
fileType = FileType.FILE;
}
}
} catch (FileSystemException fse) {
scheme = UriParser.extractScheme(filename, name);
UriParser.canonicalizePath(name, 0, name.length(), this);
UriParser.fixSeparators(name);
fileType = UriParser.normalisePath(name);
path = name.toString();
}
return new AzureFileName(scheme, path, fileType);
return new AzureFileName(scheme, absPath, fileType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,15 @@

package org.apache.hop.vfs.azure;

import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.microsoft.azure.storage.blob.*;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileType;
import org.apache.commons.vfs2.provider.AbstractFileName;
import org.apache.commons.vfs2.provider.AbstractFileObject;
import org.apache.commons.vfs2.provider.UriParser;
import org.apache.hop.core.Const;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.variables.Variables;
import org.apache.hop.vfs.azure.config.AzureConfigSingleton;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -52,30 +40,20 @@
import java.util.Map;

public class AzureFileObject extends AbstractFileObject<AzureFileSystem> {
public static final int DEFAULT_BLOB_SIZE = 1024;

private class PageBlobOutputStream extends OutputStream {
public class AppendBlobOutputStream extends OutputStream {

private CloudAppendBlob ab;
private final OutputStream outputStream;
long written = 0;
private CloudPageBlob pb;
private int blockIncrement;
private long blobSize;

private PageBlobOutputStream(
CloudPageBlob pb, OutputStream outputStream, int blockIncrement, long blobSize) {
if (blockIncrement % Constants.PAGE_SIZE != 0)
throw new IllegalArgumentException("Block increment must be in multiple of 512.");
if (blobSize % Constants.PAGE_SIZE != 0)
throw new IllegalArgumentException("Block increment must be in multiple of 512.");
this.blockIncrement = blockIncrement;
this.outputStream = new BufferedOutputStream(outputStream, blockIncrement);
this.pb = pb;
this.blobSize = blobSize;

public AppendBlobOutputStream(CloudAppendBlob ab, OutputStream outputStream) {
this.ab = ab;
this.outputStream = outputStream;
}

@Override
public void write(int b) throws IOException {
checkBlobSize(1);
outputStream.write(b);
written(1);
}
Expand All @@ -85,67 +63,39 @@ public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
checkBlobSize(len);
outputStream.write(b, off, len);
written(len);
}

private void checkBlobSize(int length) throws IOException {
if (written + length > blobSize) {
while (written + length > blobSize) {
blobSize += blockIncrement;
}
try {
pb.resize(blobSize);
} catch (StorageException e) {
throw new IOException("Failed to resize blob.", e);
}
}
}

protected void written(int len) {
written += len;
lastModified = System.currentTimeMillis();
size = written;
}

@Override
public void flush() throws IOException {}
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
written(len);
}

@Override
public void flush() throws IOException {
super.flush();
}

@Override
public void close() throws IOException {
HashMap<String, String> md = new HashMap<>(pb.getMetadata());

HashMap<String, String> md = new HashMap<>(ab.getMetadata());
md.put("ActualLength", String.valueOf(written));
pb.getProperties().setContentDisposition("vfs ; length=\"" + written + "\"");
pb.setMetadata(md);
ab.getProperties().setContentDisposition("vfs ; length=\"" + written + "\"");
ab.setMetadata(md);
try {
pb.uploadProperties();
ab.uploadProperties();
} catch (StorageException e) {
throw new IOException("Failed to update meta-data.", e);
}
checkPageBoundary();

outputStream.close();
URI uri = pb.getUri();
try {
uri = pb.getServiceClient().getCredentials().transformUri(uri);
} catch (URISyntaxException e) {
throw new IOException(e);
} catch (StorageException e) {
throw new IOException(e);
}
closeBlob();
}

private void checkPageBoundary() throws IOException {
long spare = written % Constants.PAGE_SIZE;
if (spare != 0) {
byte[] b = new byte[Constants.PAGE_SIZE - (int) spare];
write(b);
outputStream.flush();
}
}
}

private final CloudBlobClient service;
Expand Down Expand Up @@ -436,21 +386,15 @@ protected long doGetLastModifiedTime() throws Exception {
protected OutputStream doGetOutputStream(boolean bAppend) throws Exception {
if (container != null && !containerPath.equals("")) {
if (bAppend) throw new UnsupportedOperationException();
final CloudPageBlob pb = container.getPageBlobReference(removeLeadingSlash(containerPath));

// Get the block increment...
//
IVariables variables = Variables.getADefaultVariableSpace();
String configBlockIncrement = AzureConfigSingleton.getConfig().getBlockIncrement();
int blockIncrement = Const.toInt(variables.resolve(configBlockIncrement), 4096);
final CloudAppendBlob cab = container.getAppendBlobReference(removeLeadingSlash(containerPath));

if (type == FileType.IMAGINARY) {
type = FileType.FILE;
return new PageBlobOutputStream(
pb, pb.openWriteNew(DEFAULT_BLOB_SIZE), blockIncrement, DEFAULT_BLOB_SIZE);
return new AppendBlobOutputStream(
cab, cab.openWriteNew());
} else {
return new PageBlobOutputStream(
pb, pb.openWriteExisting(), blockIncrement, pb.getProperties().getLength());
return new AppendBlobOutputStream(
cab, cab.openWriteExisting());
}
} else {
throw new UnsupportedOperationException();
Expand All @@ -460,7 +404,7 @@ protected OutputStream doGetOutputStream(boolean bAppend) throws Exception {
@Override
protected InputStream doGetInputStream() throws Exception {
if (container != null && !containerPath.equals("") && type == FileType.FILE) {
return new PageBlobInputStream(cloudBlob.openInputStream(), size);
return new AppendBlobInputStream(cloudBlob.openInputStream(), size);
} else {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class AzureVfsPlugin implements IVfs {
@Override
public String[] getUrlSchemes() {
return new String[] {"azure"};
return new String[] {"azure", "azfs"};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,7 @@ public class AzureConfigPlugin implements IConfigOptions, IGuiPluginCompositeWid
description = "The key to use for the Azure VFS")
private String key;

@GuiWidgetElement(
id = WIDGET_ID_AZURE_BLOCK_INCREMENT,
parentId = ConfigPluginOptionsTab.GUI_WIDGETS_PARENT_ID,
type = GuiElementType.TEXT,
variables = true,
label = "i18n::AzureVFS.FileBlockSize.Label",
toolTip = "i18n::AzureVFS.FileBlockSize.Description")
@CommandLine.Option(
names = {"-azi", "--azure-block-increment"},
description = "The block increment size for new files on Azure, multiples of 512 only.")
private String blockIncrement;

/**
* Gets instance
*
Expand All @@ -96,7 +86,6 @@ public static AzureConfigPlugin getInstance() {
AzureConfig config = AzureConfigSingleton.getConfig();
instance.account = config.getAccount();
instance.key = config.getKey();
instance.blockIncrement = config.getBlockIncrement();

return instance;
}
Expand All @@ -121,12 +110,6 @@ public boolean handleOption(
changed = true;
}

if (blockIncrement != null) {
config.setBlockIncrement(blockIncrement);
log.logBasic("The Azure file block increment is set to '" + blockIncrement + "'");
changed = true;
}

// Save to file if anything changed
//
if (changed) {
Expand Down Expand Up @@ -163,10 +146,6 @@ public void persistContents(GuiCompositeWidgets compositeWidgets) {
key = ((TextVar) control).getText();
AzureConfigSingleton.getConfig().setKey(key);
break;
case WIDGET_ID_AZURE_BLOCK_INCREMENT:
blockIncrement = ((TextVar) control).getText();
AzureConfigSingleton.getConfig().setKey(blockIncrement);
break;
}
}
// Save the project...
Expand Down Expand Up @@ -210,19 +189,4 @@ public void setKey(String key) {
this.key = key;
}

/**
* Gets blockIncrement
*
* @return value of blockIncrement
*/
public String getBlockIncrement() {
return blockIncrement;
}

/**
* @param blockIncrement The blockIncrement to set
*/
public void setBlockIncrement(String blockIncrement) {
this.blockIncrement = blockIncrement;
}
}
Loading

0 comments on commit 8bf9746

Please sign in to comment.