Skip to content

Commit

Permalink
Merge pull request #1132 from quassy:main
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 577890111
  • Loading branch information
cloud-teleport committed Oct 30, 2023
2 parents 4524b0a + a5dfa65 commit f21d03d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
8 changes: 6 additions & 2 deletions v1/README_Bulk_Compress_GCS_Files.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Optional Parameters

* **outputFilenameSuffix** (Output filename suffix): Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.



Expand Down Expand Up @@ -125,6 +126,7 @@ export OUTPUT_FAILURE_FILE=<outputFailureFile>
export COMPRESSION=<compression>

### Optional
export OUTPUT_FILENAME_SUFFIX=<outputFilenameSuffix>

gcloud dataflow jobs run "bulk-compress-gcs-files-job" \
--project "$PROJECT" \
Expand All @@ -133,7 +135,8 @@ gcloud dataflow jobs run "bulk-compress-gcs-files-job" \
--parameters "inputFilePattern=$INPUT_FILE_PATTERN" \
--parameters "outputDirectory=$OUTPUT_DIRECTORY" \
--parameters "outputFailureFile=$OUTPUT_FAILURE_FILE" \
--parameters "compression=$COMPRESSION"
--parameters "compression=$COMPRESSION" \
--parameters "outputFilenameSuffix=$OUTPUT_FILENAME_SUFFIX"
```

For more information about the command, please check:
Expand All @@ -158,6 +161,7 @@ export OUTPUT_FAILURE_FILE=<outputFailureFile>
export COMPRESSION=<compression>

### Optional
export OUTPUT_FILENAME_SUFFIX=<outputFilenameSuffix>

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -166,7 +170,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="bulk-compress-gcs-files-job" \
-DtemplateName="Bulk_Compress_GCS_Files" \
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,outputDirectory=$OUTPUT_DIRECTORY,outputFailureFile=$OUTPUT_FAILURE_FILE,compression=$COMPRESSION" \
-Dparameters="inputFilePattern=$INPUT_FILE_PATTERN,outputDirectory=$OUTPUT_DIRECTORY,outputFailureFile=$OUTPUT_FAILURE_FILE,compression=$COMPRESSION,outputFilenameSuffix=$OUTPUT_FILENAME_SUFFIX" \
-pl v1 \
-am
```
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
/**
* The {@link BulkCompressor} is a batch pipeline that compresses files matched by an input file
* pattern and outputs them to a specified file location. This pipeline can be useful when you need
* to compress large batches of files as part of a perodic archival process. The supported
* to compress large batches of files as part of a periodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>. Files output
* to the destination location will follow a naming schema of original filename appended with the
* compression mode extension. The extensions appended will be one of: <code>.bzip2</code>, <code>
Expand Down Expand Up @@ -159,6 +159,18 @@ public interface Options extends PipelineOptions {
ValueProvider<Compression> getCompression();

void setCompression(ValueProvider<Compression> value);

@TemplateParameter.Text(
order = 5,
optional = true,
regexes = {"^[A-Za-z_0-9.]*"},
description = "Output filename suffix",
helpText =
"Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.")
@Required
ValueProvider<String> getOutputFilenameSuffix();

void setOutputFilenameSuffix(ValueProvider<String> value);
}

/**
Expand Down Expand Up @@ -239,9 +251,19 @@ public static class Compressor extends DoFn<MatchResult.Metadata, String> {
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
Options options = context.getPipelineOptions().as(Options.class);
String outputFilename;

// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Add the extension to the output filename.
if (options.getOutputFilenameSuffix() != null
&& options.getOutputFilenameSuffix().isAccessible()
&& options.getOutputFilenameSuffix().get() != null) {
// Use suffix parameter. Example: demo.txt -> demo.txt.foo
outputFilename = inputFile.getFilename() + options.getOutputFilenameSuffix().get();
} else {
// Use compression extension. Example: demo.txt -> demo.txt.gz
outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
}

// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
Expand Down

0 comments on commit f21d03d

Please sign in to comment.