Merge pull request #1 from xavierguihot/dev/2.0.0
Dev/2.0.0
xavierguihot authored Jun 17, 2018
2 parents 0f3a5ff + b88c19d commit 405bf5c
Showing 49 changed files with 8,841 additions and 1,823 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -5,3 +5,5 @@ project/target
target

*.crc

.idea
174 changes: 111 additions & 63 deletions README.md
@@ -5,8 +5,6 @@
## Overview


Version: 1.1.1

API Scaladoc: [SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$)

This library contains a bunch of low-level basic methods for data processing
@@ -18,14 +16,14 @@ names are self-explanatory and readable.

This also provides a monitoring/logger tool.

This is a bunch of 4 modules:
This is a set of 4 modules:

* [HdfsHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.HdfsHelper$): Wrapper around [apache Hadoop FileSystem API](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html) for file manipulations on hdfs.
* [SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$): Hdfs file manipulations through the Spark API.
* [HdfsHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.HdfsHelper$): Wrapper around the [apache Hadoop FileSystem API](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html) for file manipulations on hdfs.
* [SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$): Hdfs file manipulations through the Spark API (pimped RDDs and SparkContext).
* [DateHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.DateHelper$): Wrapper around [joda-time](http://www.joda.org/joda-time/apidocs/) for usual data-mining date manipulations.
* [Monitor](http://xavierguihot.com/spark_helper/#com.spark_helper.Monitor$): Spark custom monitoring/logger and kpi validator.

Compatible with Spark 2.
Compatible with Spark 2.x


### HdfsHelper:
@@ -36,68 +34,93 @@ The full list of methods is available at
Contains basic file-related methods mostly based on the apache Hadoop
FileSystem API [org.apache.hadoop.fs.FileSystem](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html).

For instance, one don't want to remove a file from hdfs using 3 lines of code
and thus could instead just use `HdfsHelper.deleteFile("my/hdfs/file/path.csv")`.

A non-exhaustive list of exemples:
A non-exhaustive list of examples:

```scala
import com.spark_helper.HdfsHelper

// A bunch of methods wrapping the FileSystem API, such as:
HdfsHelper.fileExists("my/hdfs/file/path.txt")
assert(HdfsHelper.listFileNamesInFolder("my/folder/path") == List("file_name_1.txt", "file_name_2.csv"))
assert(HdfsHelper.fileModificationDate("my/hdfs/file/path.txt") == "20170306")
assert(HdfsHelper.nbrOfDaysSinceFileWasLastModified("my/hdfs/file/path.txt") == 3)
HdfsHelper.deleteFile("my/hdfs/file/path.csv")
HdfsHelper.moveFolder("my/hdfs/folder")
HdfsHelper.fileExists("my/hdfs/file/path.txt") // HdfsHelper.folderExists("my/hdfs/folder")
HdfsHelper.listFileNamesInFolder("my/folder/path") // List("file_name_1.txt", "file_name_2.csv")
HdfsHelper.fileModificationDate("my/hdfs/file/path.txt") // "20170306"
HdfsHelper.nbrOfDaysSinceFileWasLastModified("my/hdfs/file/path.txt") // 3
HdfsHelper.deleteFile("my/hdfs/file/path.csv") // HdfsHelper.deleteFolder("my/hdfs/folder")
HdfsHelper.moveFolder("old/path", "new/path") // HdfsHelper.moveFile("old/path.txt", "new/path.txt")
HdfsHelper.createEmptyHdfsFile("/some/hdfs/file/path.token") // HdfsHelper.createFolder("my/hdfs/folder")

// File content helpers:
HdfsHelper.compressFile("hdfs/path/to/uncompressed_file.txt", classOf[GzipCodec])
HdfsHelper.appendHeader("my/hdfs/file/path.csv", "column0,column1")

// Some Xml/Typesafe helpers for hadoop as well:
HdfsHelper.isHdfsXmlCompliantWithXsd("my/hdfs/file/path.xml", getClass.getResource("/some_xml.xsd"))
HdfsHelper.loadXmlFileFromHdfs("my/hdfs/file/path.xml")

// Very handy to load a config (typesafe format) stored on hdfs at the begining of a spark job:
// Very handy to load a config (typesafe format) stored on hdfs at the beginning of a spark job:
HdfsHelper.loadTypesafeConfigFromHdfs("my/hdfs/file/path.conf"): Config

// In order to write a small amount of data to a file on hdfs without the whole spark stack:
HdfsHelper.writeToHdfsFile(Array("some", "relatively small", "text"), "/some/hdfs/file/path.txt")
// or:
import com.spark_helper.HdfsHelper._
Array("some", "relatively small", "text").writeToHdfs("/some/hdfs/file/path.txt")
"hello world".writeToHdfs("/some/hdfs/file/path.txt")

// Deletes all files/folders in "hdfs/path/to/folder" for which the timestamp is older than 10 days:
HdfsHelper.purgeFolder("hdfs/path/to/folder", 10)
```
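
To give an idea of the boilerplate these one-liners replace, here is a sketch of
what deleting a file looks like with the raw FileSystem API compared to the
helper (paths are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import com.spark_helper.HdfsHelper

// With the raw FileSystem API:
FileSystem.get(new Configuration()).delete(new Path("my/hdfs/file/path.csv"), false)

// Same thing with the helper:
HdfsHelper.deleteFile("my/hdfs/file/path.csv")
```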

In case a specific configuration is needed to access the file system, these
setters are available:

```scala
// To use a specific conf FileSystem.get(whateverConf) instead of FileSystem.get(new Configuration()):
HdfsHelper.setConf(whateverConf)
// Or directly the FileSystem:
HdfsHelper.setFileSystem(whateverFileSystem)
```

### SparkHelper:

The full list of methods is available at
[SparkHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.SparkHelper$).

Contains basic file/RRD-related methods based on the Spark APIs.
Contains basic RDD-related methods.

A non-exhaustive list of exemples:
A non-exhaustive list of examples:

```scala
import com.spark_helper.SparkHelper
import com.spark_helper.SparkHelper._

// Same as SparkContext.saveAsTextFile, but the result is a single file:
SparkHelper.saveAsSingleTextFile(myOutputRDD, "/my/output/file/path.txt")
// Same as rdd.saveAsTextFile("path"), but the result is a single file (while
// keeping the processing distributed):
rdd.saveAsSingleTextFile("/my/output/file/path.txt")
rdd.saveAsSingleTextFile("/my/output/file/path.txt", classOf[BZip2Codec])

// Same as SparkContext.textFile, but instead of reading one record per line,
// it reads records spread over several lines. This way, xml, json, yml or
// any multi-line record file format can be used with Spark:
SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
// Same as sc.textFile("path"), but instead of reading one record per line (by
// splitting the input with \n), it splits the file into records based on a custom
// delimiter. This way, xml, json, yml or any multi-line record file format can
// be used with Spark:
sc.textFile("/my/input/folder/path", "---\n") // for a yml file for instance

// Equivalent to sparkContext.textFile(), but for each line is tupled with its
// file path:
SparkHelper.textFileWithFileName("folder", sparkContext)
// Equivalent to rdd.flatMap(identity) for RDDs of Seqs or Options:
rdd.flatten

// Equivalent to sc.textFile(), but each line is tupled with its file path:
sc.textFileWithFileName("/my/input/folder/path")
// which produces:
RDD(
("file:/path/on/machine/folder/file_1.txt", "record1fromfile1"),
("file:/path/on/machine/folder/file_1.txt", "record2fromfile1"),
("file:/path/on/machine/folder/file_2.txt", "record1fromfile2"),
...
)
// RDD(("folder/file_1.txt", "record1fromfile1"), ("folder/file_1.txt", "record2fromfile1"),
// ("folder/file_2.txt", "record1fromfile2"), ...)

// In the given folder, this generates one file per key in the given key/value
// RDD. Within each file (named from the key) are all values for this key:
rdd.saveAsTextFileByKey("/my/output/folder/path")
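// For instance (illustrative data),
// sc.parallelize(Seq(("a", "v1"), ("b", "v2"), ("a", "v3"))).saveAsTextFileByKey("/my/output/folder/path")
// would produce a file "a" (containing v1 and v3) and a file "b" (containing v2).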

// Concept mapper (the following example transforms RDD(1, 3, 2, 7, 8) into RDD(1, 3, 4, 7, 16)):
rdd.partialMap { case a if a % 2 == 0 => 2 * a }

// For when input files contain commas and textFile can't handle it:
sc.textFile(Seq("path/hello,world.txt", "path/hello_world.txt"))
```

### DateHelper:
@@ -106,37 +129,59 @@ The full list of methods is available at
[DateHelper](http://xavierguihot.com/spark_helper/#com.spark_helper.DateHelper$).

Wrapper around [joda-time](http://www.joda.org/joda-time/apidocs/) for
data-mining classic dates manipulations.
classic data-mining date manipulations and job scheduling.

A non-exhaustive list of exemples:
A non-exhaustive list of examples:

```scala
import com.spark_helper.DateHelper

assert(DateHelper.daysBetween("20161230", "20170101") == List("20161230", "20161231", "20170101"))
assert(DateHelper.today() == "20170310") // If today's "20170310"
assert(DateHelper.yesterday() == "20170309") // If today's "20170310"
assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") == "170327")
assert(DateHelper.now("HH:mm") == "10:24")
assert(DateHelper.currentTimestamp() == "1493105229736")
assert(DateHelper.nDaysBefore(3) == "20170307") // If today's "20170310"
assert(DateHelper.nDaysAfterDate(3, "20170307") == "20170310")
DateHelper.daysBetween("20161230", "20170101") // List("20161230", "20161231", "20170101")
DateHelper.today // "20170310"
DateHelper.yesterday // "20170309"
DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") // "170327"
DateHelper.now("HH:mm") // "10:24"
DateHelper.currentTimestamp // "1493105229736"
DateHelper.nDaysBefore(3) // "20170307"
DateHelper.nDaysAfterDate(3, "20170307") // "20170310"
DateHelper.nextDay("20170310") // "20170311"
DateHelper.nbrOfDaysSince("20170302") // 8
DateHelper.nbrOfDaysBetween("20170327", "20170401") // 5
DateHelper.dayOfWeek("20160614") // 2

import com.spark_helper.DateHelper._

2.daysAgo // "20170308"
"20161230" to "20170101" // List("20161230", "20161231", "20170101")
3.daysBefore("20170310") // "20170307"
5.daysAfter // "20170315"
4.daysAfter("20170310") // "20170314"
"20170302".isCompliantWith("yyyyMMdd")
"20170310".nextDay // "20170311"
"20170310".previousDay // "20170309"
```

The default format (when no format is specified) is "yyyyMMdd" (20170327). It
can be modified globally with:

```scala
DateHelper.setFormat("ddMMMyy")
```
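
Once the format has been set this way, the helpers shown above would then produce
dates in that format; for instance (illustrative values, still assuming today is
March 10th, 2017):

```scala
DateHelper.setFormat("ddMMMyy")

DateHelper.today          // "10Mar17"
DateHelper.nDaysBefore(3) // "07Mar17"
```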

### Monitor:

The full list of methods is available at
[Monitor](http://xavierguihot.com/spark_helper/#com.spark_helper.Monitor$)

It's a simple logger/report which contains a report that one can update from
the driver and a success state. The idea is to persist job executions logs and
errors (and forget about grepping unreadable yarn logs).
It's a simple logger/reporter which holds a report and a success state that one
can update from the driver. The idea is to persist job execution logs and errors
(and forget about grepping unreadable yarn logs).

It's designed for perdiodic spark jobs (handles storage and purge of logs) and
It's designed for periodic spark jobs (handles storage and purge of logs) and
provides a way to handle kpi validation.

Logs are stored on the go, which means one can have direct real-time access to
the job logs/status and it's current state (which can overwise be a pain if it
the job logs/status and its current state (which can otherwise be a pain if it
means going through yarn logs, or even for certain production environments going
through additional layers of software logs to get to yarn logs).

@@ -150,9 +195,9 @@ the logger for a clean logging.
This is a "driver-only" logger and is not intended at logging concurrent actions
from executors.

Produced reports can easily be inserted in a notification email whenerver the
Produced reports can easily be inserted in a notification email whenever the
job fails, which saves a lot of time for maintainers operating on heavy
production environements.
production environments.

The produced persisted report is also a way for downstream jobs to know the
status of their input data.
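
As a rough sketch of how these pieces fit together in a driver (using only the
Monitor calls shown in this README; paths, messages and the overall structure
are illustrative):

```scala
import com.spark_helper.Monitor

Monitor.setLogFolder("my/hdfs/log/folder") // where Monitor.store() persists reports

try {
  // ... the job's spark pipeline ...
  Monitor.log("My pipeline description: success")
} catch {
  case e: Throwable => Monitor.error(e, "My pipeline description")
}

// Persist the report (logs are also written on the go during the job):
Monitor.store()

// Crash the job if something went wrong, so that the scheduler notices:
if (!Monitor.isSuccess) throw new Exception()
```
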
@@ -190,7 +235,7 @@ try {
Monitor.error(e, "My pipeline description") // whatever unexpected error
}

if (Monitor.isSuccess()) {
if (Monitor.isSuccess) {
val doMore = "Let's do some more stuff!"
Monitor.log("My second pipeline description: success")
}
@@ -199,9 +244,9 @@ if (Monitor.isSuccess()) {
// HDFS (this saves the logs in the folder set with Monitor.setLogFolder):
Monitor.store()

// At the end of the job, if the job isn't successfull, you might want to
// At the end of the job, if the job isn't successful, you might want to
// crash it (for instance to get a notification from your scheduler):
if (!Monitor.isSuccess()) throw new Exception() // or send an email, or ...
if (!Monitor.isSuccess) throw new Exception() // or send an email, or ...
```

At any time during the job, logs can be accessed from file
@@ -214,7 +259,7 @@ Here are some possible reports generated by the previous pipeline:
My job description (whatever you want); for instance:
Documentation: https://github.com/xavierguihot/spark_helper
[10:23] Begining
[10:23] Beginning
[10:23-10:23] My pipeline description: failed
Diagnostic: No input data!
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://my/hdfs/input/path
@@ -231,7 +276,7 @@ or
My job description (whatever you want); for instance:
Documentation: https://github.com/xavierguihot/spark_helper
[10:23] Begining
[10:23] Beginning
[10:23-10:41] My pipeline description: success
KPI: Nbr of output records
Value: 14669071.0
@@ -248,15 +293,15 @@ Documentation: https://github.com/xavierguihot/spark_helper
## Including spark_helper in your dependencies:


With sbt, add these lines to your build.sbt:
With sbt:

```scala
resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.1.1"
libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "2.0.0"
```

With maven, add these lines to your pom.xml:
With maven:

```xml
<repositories>
@@ -269,11 +314,11 @@ With maven, add these lines to your pom.xml:
<dependency>
<groupId>com.github.xavierguihot</groupId>
<artifactId>spark_helper</artifactId>
<version>v1.1.1</version>
<version>2.0.0</version>
</dependency>
```

With gradle, add these lines to your build.gradle:
With gradle:

```groovy
allprojects {
@@ -283,6 +328,9 @@ allprojects {
}
dependencies {
compile 'com.github.xavierguihot:spark_helper:v1.1.1'
compile 'com.github.xavierguihot:spark_helper:2.0.0'
}
```

For versions prior to `2.0.0`, prefix the version tag with `v`; for instance
`v1.0.0`.
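
For example, with sbt, a pre-2.0.0 release would be pulled with the `v`-prefixed
tag:

```scala
libraryDependencies += "com.github.xavierguihot" % "spark_helper" % "v1.1.1"
```
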
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "spark_helper"

version := "1.1.1"
version := "2.0.0"

scalaVersion := "2.11.12"

