
One Ring Dist is a utility dedicated to transferring data to and from the cluster, and is a much faster, specialized replacement for s3-dist-cp and dist-cp.

Premise to Dist

The S3 object storage is not well suited for Spark due to data non-locality and its architectural peculiarities (it is an object store, not a true FileSystem). It is generally recommended to copy the source data from S3 to HDFS on the cluster before invoking Spark, and, vice versa, to copy the result back from HDFS to S3 after it has been computed.

EMR provides a utility named s3-dist-cp, and there is the plain vanilla dist-cp, but their usage is cumbersome: the user must always know the exact paths beforehand, and the invocation syntax is complex. Another drawback is that these utilities are implemented with the dated MapReduce approach, have low parallelism, and utilize cluster resources inefficiently. Also, there is no facility for handling Parquet files, which may result in excess data transfer.

One Ring provides an alternative implementation named One Ring Dist that is built around your Task config, so you can still use Variables for source paths and generate result paths dynamically without bothering yourself with the dist-cp command line. It is written on top of Spark, has much better parallelism, and can convert Parquet files to CSV as required by One Ring CLI.

Calling One Ring Dist

The syntax is similar to the CLI's:

java -jar ./Dist/target/one-ring-dist.jar -c /path/to/tasks.ini -S /path/to/dist_interface.file -d DIRECTION -x spark.meta

The -h, -c, -x, -v/-V, -l, -L, -m, -u, -i and -o switches have the same meaning for Dist as for the CLI.

-d specifies the direction of the copying process:

  • to/source to copy the source data from the external world to the input location,
  • from/result to copy the result from the output location to the external storage.

-S specifies the path to the interface file with a list of HDFS paths of Task outputs generated by the CLI.

Lastly, -t specifies a temporary location for file transfers. By default, in Local mode it points to the system temp dir, and on the cluster to hdfs:///tmp.
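
For example, a hypothetical invocation that copies computed results back to external storage (the paths here are purely illustrative) could look like:

java -jar ./Dist/target/one-ring-dist.jar -c /path/to/tasks.ini -S /path/to/dist_interface.file -d from -t hdfs:///tmp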

Configuration and Usage

Dist has its own layer in tasks.ini, prefixed with dist., with a small set of keys.

dist.store provides another way to set the -S value (but the command line switch always has higher priority if both are set).

dist.tmp is an alias for the -t switch.

For the data itself, Dist uses the same task.input, task.output, ds.input. and ds.output. layers as the CLI, and therefore honors all path-, partitioning-, and column-related parameters. For instance, input.path and output.path specify the CLI input and output locations for a Task.
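
A minimal sketch of these keys in tasks.ini (the spark.task1. prefix follows the dist.wrap examples below; all paths are hypothetical):

spark.task1.dist.store=/path/to/dist_interface.file
spark.task1.dist.tmp=hdfs:///tmp
spark.task1.input.path=s3://source-bucket/input
spark.task1.output.path=s3://result-bucket/output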

When the CLI executes on a cluster, it transparently replaces all its absolute input and output paths with relative paths according to the provided input and output locations. In Local mode, it treats all paths as Hadoop FileSystem ones and uses them as is.

dist.wrap sets which copy operations are to be performed. In addition to the -d switch values from and to, there are:

  • both/true to indicate that copying in both directions is required,
  • nop (default) to suppress copying.

This is useful for a multi-Process tasks.ini. If the outputs of the first Task are stored in HDFS, this allows subsequent Tasks to consume them without a round-trip to S3 while still providing paths pointing to S3:

spark.task1.dist.wrap=to
spark.task2.dist.wrap=nop
spark.task3.dist.wrap=from

...and if the same Task is executed solo, it should just use bi-directional copy:

spark.task1.dist.wrap=both

...without further changes to path Variables and other configuration parameters.

For any output DataStream the CLI adds a line with the resulting local path to the dist.store / -S file. That allows Dist to gather them for the from direction.
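
A purely hypothetical sketch of such an interface file, assuming one path per line and Task outputs named signals and joined:

hdfs:///tmp/task1/signals
hdfs:///tmp/task1/joined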

In on-cluster execution mode you should never invoke Dist manually; it is the job of the automation scripts to call it before and after the execution of the CLI.

Storage Adapters

In addition to the Hadoop FileSystem, Dist provides a lightweight infrastructure of Adapters for other storages, whether file-based or otherwise.

Storage Adapters share the two namesake layers input. and output., and all their Parameters are per-DataStream.

For inputs coming from Adapters, if a source does not contain a built-in schema, a loose schema may by convention be set via the input.schema. layer, analogous to input columns. The data types of the source fields are not of interest; the only things that matter are field names and order.

The following Storage Adapters are currently implemented:

  • Hadoop FileSystem (fallback; uses all protocols available in your Spark environment, such as file:, hdfs:, s3: and so on),
  • S3 Direct (any S3-compatible storage with a protocol of s3d:),
  • JDBC (jdbc: protocol).

The fallback Hadoop Adapters are called if and only if no other Adapter recognizes the protocol of the path. They handle separated text (CSV and the like) and Parquet files, either compressed or not. Known compression algorithms are gz/gzip, bz2, snappy, lz4, and zstd.

Hadoop Input honors input.schema. for CSV-like files, copying only the subset of columns mentioned in ds.input.columns. for said input, with its delimiter. It also recognizes input.max.record.size (in bytes, with a default of 1 MB) for sizing the record buffer. You may freely mix CSV-like and Parquet files as long as they have the same schema.

input.schema.signals={SCHEMA_SIGNALS:userid,_,timestamp,lat,lon,_,accuracy,_,_,_,_,_,final_country,_,_,_,_,_,_,_,_,_,_,_}
input.max.record.size.signals=360000

ds.input.columns=userid,timestamp,lat,lon,accuracy,final_country

Hadoop Output uses output.codec to set the resulting file compression (by default, no compression is used). If the output path ends with .parquet, the resulting files will be stored in Parquet format.
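
A minimal sketch, assuming a DataStream named signals (following the per-DataStream suffix convention from the input example above) and a hypothetical gzip value for the codec:

output.codec.signals=gzip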

S3 Direct Adapters are the same as the Hadoop ones, but use an S3 client provider that can point to any S3-compatible storage, and have additional Parameters (sketched after this list) for:

  • [input|output].s3d.access.key. and [input|output].s3d.secret.key. for your target S3 Access and Secret Keys respectively, with no defaults (so it will try to take them from your environment),
  • [input|output].s3d.endpoint. and [input|output].s3d.region. for S3 API endpoint and region (with no defaults as well),
  • and, only for output, output.s3d.content.type. with a default of text/csv.
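
A minimal sketch for an input DataStream named signals coming from a hypothetical S3-compatible endpoint (all values are illustrative):

input.s3d.endpoint.signals=https://storage.example.com
input.s3d.region.signals=us-east-1
input.s3d.access.key.signals=EXAMPLEACCESSKEY
input.s3d.secret.key.signals=EXAMPLESECRETKEY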

JDBC Adapter Parameters (sketched after this list) are:

  • [input|output].jdbc.driver. for the fully qualified class name of the database driver, which must be available on the classpath. No default.
  • [input|output].jdbc.url. for database connection URLs. No default.
  • [input|output].jdbc.user. for database user name with no default.
  • [input|output].jdbc.password. for database connection password with no default.
  • output.jdbc.batch.size. for output batch size, default is 500 records.
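
A minimal sketch for an output DataStream named signals written to a hypothetical PostgreSQL database (driver class, URL, and credentials are illustrative):

output.jdbc.driver.signals=org.postgresql.Driver
output.jdbc.url.signals=jdbc:postgresql://db.example.com:5432/analytics
output.jdbc.user.signals=one_ring
output.jdbc.password.signals=change_me
output.jdbc.batch.size.signals=500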

All default Adapters repartition and rearrange the columns of source files on the fly while copying to and from the input/output locations.