Minor zep fixes #12

Open
Wants to merge 124 commits into master.

Commits (124)

f9ffdd5
Fixed typo
May 7, 2015
b0e168e
refactoring to allow changes in ignition.mail
zago May 8, 2015
c93d702
Merge pull request #39 from zago/master
zago May 8, 2015
a8e9734
Fix ec2 request issue
filipenf May 8, 2015
4d11c7d
Merge pull request #40 from chaordic/fix-ec2-request
filipenf May 8, 2015
9ae5178
Minor improvements
May 18, 2015
53cfe88
remove unused lib
May 18, 2015
a246dbd
Merge pull request #42 from fparisotto/remove-nscala-time
douglaz May 18, 2015
d965fd6
Added utility function for better stack traces
Jun 1, 2015
c899619
Updated scalatest to fix conflicts
Jun 8, 2015
182a2d7
Merge pull request #44 from douglaz/combo_refactoring
fparisotto Jun 9, 2015
82a09c0
Improved s3 service
Jun 11, 2015
c32cce5
Add optional content type
flaviotruzzi Jun 11, 2015
8f51a86
Added content encoding
Jun 12, 2015
c752d93
Upgraded scalatest
Jun 18, 2015
59f818d
Added removeEmpty to Maps
Jun 23, 2015
842ca9d
Added removeEmpty to Maps
Jun 23, 2015
d05f836
exclude slf4j-log4j12 backend
Jun 25, 2015
b25a616
Merge pull request #45 from fparisotto/exclude-slf4j-log4j12
douglaz Jun 29, 2015
c9abcd5
added method that allows mapping a future using success/failure
Jun 30, 2015
5cc2dda
Merge remote-tracking branch 'origin/master' into refactory-re
Jul 7, 2015
48f4e2c
change catch to NonFatal
Jul 7, 2015
f75749d
Merge pull request #46 from sisso/refactory-re
Jul 7, 2015
bab487a
attempt to update spark_ec2.py
Aug 13, 2015
83fcbcb
get_spark_ami fix
Aug 13, 2015
807f0f6
remove user data, spark-ec2 takes care of formatting disks
Aug 14, 2015
637ab06
fix variable replacement
Aug 14, 2015
f6d5d0d
remove rstudio and some fixes
Aug 14, 2015
7787045
update spark-ec2 version
Aug 14, 2015
ccfed3f
pr review, fix removed feature and added noop user-data
Aug 17, 2015
4c6d4d6
Merge pull request #47 from fparisotto/spark-ec2-update
fparisotto Aug 17, 2015
9bbcd18
added heap size param for driver
Aug 18, 2015
a2d5af9
parameterized memory unit
Aug 18, 2015
f165937
fix default memory size to match default master instance type
Aug 19, 2015
8321762
Merge pull request #48 from fparisotto/heap-size-driver-param
fparisotto Aug 19, 2015
980a278
Use the driver heap size param
Aug 19, 2015
75163e8
Merge pull request #49 from chaordic/driver_size_fix
douglaz Aug 19, 2015
c78c319
Update spark_ec2.py
douglaz Aug 26, 2015
a5379a0
Update spark_ec2.py
douglaz Aug 26, 2015
206e090
Merge pull request #50 from chaordic/spark_1.4.1
douglaz Aug 26, 2015
59ba132
Use Spark 1.4.1
Aug 27, 2015
63e867a
Increase group to avoid slowdowns
Aug 28, 2015
f12dfdc
Updated core to ignore spark ec2 boto
Aug 31, 2015
cae677f
Make spark 1.4.1 the default
Aug 31, 2015
14324a2
Added IntBag
Sep 1, 2015
3cb2ef5
Adds an option to launch the cluster master as spot
mwconceicao Sep 1, 2015
38213b4
Fix serialization
Sep 2, 2015
d8eabe6
Merge branch 'master' of github.com:chaordic/ignition-core into bag_s…
Sep 2, 2015
d668f40
Date between helper
luisguilherme Sep 4, 2015
5072a9c
Merge pull request #53 from luisguilherme/master
luisguilherme Sep 4, 2015
b2a6025
Adds a TODO!
mwconceicao Sep 8, 2015
08ae1dd
some kind of hack to parallel read and list files using spark cluster…
Sep 9, 2015
056b4f8
Merge branch 'master' of github.com:chaordic/ignition-core into paral…
Sep 9, 2015
4576607
Merge pull request #52 from mwconceicao/spark-ec2-master-spot
mwconceicao Sep 11, 2015
5dfb7c8
Merge branch 'master' of github.com:chaordic/ignition-core into paral…
Sep 14, 2015
c56c027
some of the PR reviews
Sep 14, 2015
8ffee27
pr review
Sep 15, 2015
7234254
logging input stream close failure
Sep 16, 2015
af00eef
better exception report
Sep 16, 2015
57659d4
Merge pull request #54 from fparisotto/parallel-read-listfiles
fparisotto Sep 16, 2015
89630eb
setting UTF-8 codec to read file content (same behavior of hadoop cli…
Sep 17, 2015
ba0efda
Merge pull request #55 from fparisotto/utf8-readlines
fparisotto Sep 17, 2015
06ac774
will delete SGs after cluster destroy
fernandors87 Sep 21, 2015
17ca790
Merge pull request #56 from chaordic/delete_sgs
fernandors87 Sep 22, 2015
f72eaf3
Merge branch 'master' of github.com:chaordic/ignition-core into bag_s…
Sep 30, 2015
b273708
Merge pull request #51 from chaordic/bag_support
douglaz Sep 30, 2015
10b086e
spark 1.5.1 update
Oct 15, 2015
1e9f160
Merge pull request #57 from fparisotto/spark-1-5-update
douglaz Oct 15, 2015
a59f2eb
fix spark_ec2.py
Oct 15, 2015
3c6185b
Merge pull request #58 from fparisotto/spark-1-5-update
douglaz Oct 15, 2015
b176cc5
Added executor instances option
Oct 15, 2015
257ae2b
Merge pull request #59 from chaordic/executor_instances
douglaz Oct 15, 2015
437e264
Adding filterAndGetParallelTextFiles
Oct 27, 2015
637b80d
Many improvements
Oct 27, 2015
0563fab
Small improvements
Oct 28, 2015
97cb6e2
Merge pull request #60 from chaordic/new_function
douglaz Oct 28, 2015
cc4f716
Fix file system issues in corner cases
Oct 28, 2015
75ed62a
Merge pull request #61 from chaordic/fs_fix
douglaz Oct 28, 2015
5a49164
Make it faster in some situations
Nov 3, 2015
1ea78e0
Merge pull request #62 from chaordic/faster_parallel
douglaz Nov 3, 2015
506bd1c
Split gzip files and other improvements
Nov 9, 2015
607ac27
Merge pull request #63 from chaordic/splitted_files
douglaz Nov 9, 2015
dc12d2a
Use SplittableGzipCodec only for big files
Nov 10, 2015
1f18d92
Merge pull request #64 from chaordic/SplittableGzipCodec_fix
douglaz Nov 10, 2015
b52ecee
Don't use build with updated hadoop client
Nov 11, 2015
6b0d622
Merge pull request #66 from chaordic/rollback
douglaz Nov 11, 2015
f1075e8
s3 list
Nov 17, 2015
9091366
Split compressed big files
Nov 20, 2015
368a998
Removed unused dependency
Nov 20, 2015
016de5b
pr review
Nov 23, 2015
2d01898
Merge pull request #67 from fparisotto/s3-list
fparisotto Nov 23, 2015
5754857
merge
Nov 23, 2015
796b983
Merge pull request #68 from chaordic/split
douglaz Nov 23, 2015
7c23316
fix lambda ref to close resources
Nov 25, 2015
5e60752
Merge pull request #69 from fparisotto/bugfix-leak
fparisotto Nov 25, 2015
358459f
Small Xlint fixes
ljmachado Dec 4, 2015
5f54641
Make it partially compatible with scala 2.11 and Xlint free and minor…
Dec 7, 2015
dade291
merged
Dec 7, 2015
0ec3724
Make it partially compatible with scala 2.11 and Xlint free and minor…
Dec 7, 2015
b66d05d
Renaming
Dec 7, 2015
2f6741d
Use null instead of Unit because Unit isn't serializable
Dec 7, 2015
bf2d27b
Merge pull request #72 from chaordic/cleanup
douglaz Dec 7, 2015
84e98f4
new filter and get text files
Nov 27, 2015
a1d226a
merge
Dec 8, 2015
8e27081
Merge pull request #71 from fparisotto/new-filterandget
fparisotto Dec 8, 2015
f5ad7f2
fix empty file filter
Dec 8, 2015
0af60fc
Merge pull request #74 from fparisotto/filter-empty-file
fparisotto Dec 8, 2015
5587537
fix narrow paths for paths without common prefixes (like final folders)
Dec 10, 2015
a31e80c
Merge pull request #75 from fparisotto/fix-narrow-paths
fparisotto Dec 10, 2015
b253f29
Added some new utils
Dec 21, 2015
afdbe22
Merge branch 'master' of github.com:chaordic/ignition-core
Dec 21, 2015
877f13a
Merge pull request #76 from chaordic/new_utils
douglaz Dec 21, 2015
352ee0b
Minor change
Dec 22, 2015
d780ea5
Make try work even if the exception is fatal
Jan 15, 2016
400b1f0
zeppelin setup
Feb 1, 2016
3331279
pr review
Feb 2, 2016
33aa47e
rdd.filterNot
Feb 3, 2016
93964db
open a browser for Zeppelin web UI
Feb 3, 2016
5137e43
using webbrowser lib
Feb 3, 2016
e5d0fd9
Merge pull request #78 from fparisotto/filter-not
fparisotto Feb 4, 2016
3b559dd
Merge pull request #77 from fparisotto/zeppelin-setup
douglaz Feb 4, 2016
b0c323c
Do not delete the security group by default
ljmachado Feb 22, 2016
7c891d6
Merge pull request #79 from chaordic/DefaultDeleteSecGroup
ljmachado Feb 22, 2016
ce911f6
Fixing typo and adding driver heap param
ljmachado Feb 24, 2016
3 changes: 3 additions & 0 deletions .gitignore
@@ -28,3 +28,6 @@ project/plugins/project/

# Node
node_modules

# Spark-ec2 boto
tools/spark-ec2/lib
20 changes: 13 additions & 7 deletions build.sbt
@@ -4,7 +4,7 @@ version := "1.0"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings")
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings", "-Xlint", "-Ywarn-dead-code", "-Xmax-classfile-name", "130")

ideaExcludeFolders += ".idea"

@@ -13,22 +13,28 @@ ideaExcludeFolders += ".idea_modules"
// Because we can't run two spark contexts on same VM
parallelExecution in Test := false

libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.3.0" % "provided").exclude("org.apache.hadoop", "hadoop-client")
libraryDependencies += ("org.apache.spark" %% "spark-core" % "1.5.1" % "provided")
.exclude("org.apache.hadoop", "hadoop-client")
.exclude("org.slf4j", "slf4j-log4j12")

libraryDependencies += ("org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.7.1" % "provided")

libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "0.8.0"

libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0"

libraryDependencies += "org.scalaj" %% "scalaj-http" % "0.3.16"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.4"

libraryDependencies += "org.scalaz" %% "scalaz-core" % "7.0.6"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"

libraryDependencies += "net.java.dev.jets3t" % "jets3t" % "0.7.1"

libraryDependencies += "joda-time" % "joda-time" % "2.7"

libraryDependencies += "org.joda" % "joda-convert" % "1.7"

libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.9.6"

libraryDependencies += "commons-lang" % "commons-lang" % "2.6"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Sonatype OSS Releases" at "http://oss.sonatype.org/content/repositories/releases/"
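The spark-core dependency above excludes hadoop-client and slf4j-log4j12 so that the pinned CDH client and a single logging backend win on the classpath. A minimal sketch of the same pattern, applied to a hypothetical extra Spark module that is not part of this PR:

// Hypothetical: any additional Spark artifact should carry the same excludes,
// or the transitive hadoop-client/slf4j-log4j12 jars come right back.
libraryDependencies += ("org.apache.spark" %% "spark-streaming" % "1.5.1" % "provided")
  .exclude("org.apache.hadoop", "hadoop-client")
  .exclude("org.slf4j", "slf4j-log4j12")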
23 changes: 21 additions & 2 deletions remote_hook.sh
@@ -11,6 +11,7 @@ CONTROL_DIR="${5?Please give the Control Directory}"
SPARK_MEM_PARAM="${6?Please give the Job Memory Size to use}"
USE_YARN="${7?Please tell if we should use YARN (yes/no)}"
NOTIFY_ON_ERRORS="${8?Please tell if we will notify on errors (yes/no)}"
DRIVER_HEAP_SIZE="${9?Please tell driver heap size to use}"

JOB_WITH_TAG=${JOB_NAME}.${JOB_TAG}
JOB_CONTROL_DIR="${CONTROL_DIR}/${JOB_WITH_TAG}"
@@ -48,6 +49,23 @@ on_trap_exit() {
rm -f "${RUNNING_FILE}"
}

install_and_run_zeppelin() {
if [[ ! -d "zeppelin" ]]; then
wget "http://www.us.apache.org/dist/incubator/zeppelin/0.5.6-incubating/zeppelin-0.5.6-incubating-bin-all.tgz" -O zeppelin.tar.gz
mkdir zeppelin
tar xvzf zeppelin.tar.gz -C zeppelin --strip-components 1 > /tmp/zeppelin_install.log
fi
if [[ -f "zeppelin/bin/zeppelin.sh" ]]; then
export MASTER="${JOB_MASTER}"
export ZEPPELIN_PORT="8081"
export SPARK_HOME="/root/spark"
export SPARK_SUBMIT_OPTIONS="--jars ${JAR_PATH} --runner-executor-memory ${SPARK_MEM_PARAM}"
sudo -E zeppelin/bin/zeppelin.sh
else
notify_error_and_exit "Zeppelin installation not found"
fi
}


trap "on_trap_exit" EXIT

@@ -73,14 +91,15 @@ if [[ "${USE_YARN}" == "yes" ]]; then
export SPARK_WORKER_MEMORY=${SPARK_MEM_PARAM}
fi


if [[ "${JOB_NAME}" == "shell" ]]; then
export ADD_JARS=${JAR_PATH}
sudo -E ${SPARK_HOME}/bin/spark-shell || notify_error_and_exit "Execution failed for shell"
elif [[ "${JOB_NAME}" == "zeppelin" ]]; then
install_and_run_zeppelin
else
JOB_OUTPUT="${JOB_CONTROL_DIR}/output.log"
tail -F "${JOB_OUTPUT}" &
sudo -E "${SPARK_HOME}/bin/spark-submit" --master "${JOB_MASTER}" --driver-memory 25000M --driver-java-options "-Djava.io.tmpdir=/mnt -verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps" --class "${MAIN_CLASS}" ${JAR_PATH} "${JOB_NAME}" --runner-date "${JOB_DATE}" --runner-tag "${JOB_TAG}" --runner-user "${JOB_USER}" --runner-master "${JOB_MASTER}" --runner-executor-memory "${SPARK_MEM_PARAM}" >& "${JOB_OUTPUT}" || notify_error_and_exit "Execution failed for job ${JOB_WITH_TAG}"
sudo -E "${SPARK_HOME}/bin/spark-submit" --master "${JOB_MASTER}" --driver-memory "${DRIVER_HEAP_SIZE}" --driver-java-options "-Djava.io.tmpdir=/mnt -verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps" --class "${MAIN_CLASS}" ${JAR_PATH} "${JOB_NAME}" --runner-date "${JOB_DATE}" --runner-tag "${JOB_TAG}" --runner-user "${JOB_USER}" --runner-master "${JOB_MASTER}" --runner-executor-memory "${SPARK_MEM_PARAM}" >& "${JOB_OUTPUT}" || notify_error_and_exit "Execution failed for job ${JOB_WITH_TAG}"
fi

touch "${JOB_CONTROL_DIR}/SUCCESS"
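With the new ninth positional parameter, every caller of this hook must now pass the driver heap size explicitly. A hypothetical invocation, for illustration only — the first four positional arguments (job name, tag, date, user) and all values are assumptions, not taken from this PR:

# Hypothetical call site; the trailing "25000M" feeds DRIVER_HEAP_SIZE,
# which replaces the previously hard-coded --driver-memory 25000M.
./remote_hook.sh my_job v1 2016-02-24 deploy /mnt/control 4g no yes 25000M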
11 changes: 8 additions & 3 deletions src/main/scala/ignition/core/jobs/CoreJobRunner.scala
@@ -13,9 +13,14 @@ object CoreJobRunner {

// Used to provide contextual logging
def setLoggingContextValues(config: RunnerConfig): Unit = {
org.slf4j.MDC.put("setupName", config.setupName)
org.slf4j.MDC.put("tag", config.tag)
org.slf4j.MDC.put("user", config.user)
try { // yes, this may fail but we don't want everything to shut down
org.slf4j.MDC.put("setupName", config.setupName)
org.slf4j.MDC.put("tag", config.tag)
org.slf4j.MDC.put("user", config.user)
} catch {
case _: Throwable =>
// best effort: contextual logging should never take the job down
}
}

case class RunnerConfig(setupName: String = "nosetup",
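The values placed in the MDC above are meant to be picked up by the logging backend's layout. A minimal sketch of how that contextual data might be consumed — the pattern string and logger name are assumptions, not part of this PR:

// A logback/log4j pattern could surface the MDC keys on every line, e.g.:
//   %d{HH:mm:ss} %-5level [%X{setupName}/%X{tag}/%X{user}] %logger - %msg%n
val log = org.slf4j.LoggerFactory.getLogger("example")
org.slf4j.MDC.put("setupName", "my-setup") // same key the runner sets
log.info("this line now carries the runner context") // context comes from MDC, not the message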
8 changes: 7 additions & 1 deletion src/main/scala/ignition/core/jobs/utils/RDDUtils.scala
@@ -57,6 +57,8 @@ object RDDUtils {
def incrementCounterIf(cond: (V) => Boolean, acc: spark.Accumulator[Int]): RDD[V] = {
rdd.map(x => { if (cond(x)) acc += 1; x })
}

def filterNot(p: V => Boolean): RDD[V] = rdd.filter(!p(_))
}

implicit class PairRDDImprovements[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) {
@@ -80,11 +82,15 @@
}, preservesPartitioning = true)
}

def collectValues[U: ClassTag](f: PartialFunction[V, U]): RDD[(K, U)] = {
rdd.filter { case (k, v) => f.isDefinedAt(v) }.mapValues(f)
}

def groupByKeyAndTake(n: Int): RDD[(K, List[V])] =
rdd.aggregateByKey(List.empty[V])(
(lst, v) =>
if (lst.size >= n) {
logger.warn(s"Ignoring value '$v' due aggregation result of size '${lst.size}' is bigger then n = '$n'")
logger.warn(s"Ignoring value '$v' due aggregation result of size '${lst.size}' is bigger than n=$n")
lst
} else {
v :: lst
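A minimal usage sketch for the three new helpers — filterNot, collectValues and groupByKeyAndTake — assuming a live SparkContext sc and the implicit classes above in scope; the sample data is purely illustrative:

import ignition.core.jobs.utils.RDDUtils._

val words = sc.parallelize(Seq("a", "", "bb"))
val nonEmpty = words.filterNot(_.isEmpty) // drops the empty string

val pairs = sc.parallelize(Seq(1 -> "x", 2 -> "", 1 -> "y"))
// keeps only values where the partial function is defined, then maps them
val upper = pairs.collectValues { case v if v.nonEmpty => v.toUpperCase }
// bounds each key's list at n values, warning about anything dropped
val sampled = pairs.groupByKeyAndTake(1)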