From 947fa76c90a2461da006ae8fcc2c2b1e4b54faa1 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 21 Jan 2015 18:31:44 +0100 Subject: [PATCH 1/5] implement flink on bdutil in standalone mode ./bdutil -e extensions/flink_env.sh deploy --- extensions/flink/flink_env.sh | 62 +++++++++++++++++++++++++++++ extensions/flink/install_flink.sh | 66 +++++++++++++++++++++++++++++++ extensions/flink/start_flink.sh | 8 ++++ 3 files changed, 136 insertions(+) create mode 100644 extensions/flink/flink_env.sh create mode 100644 extensions/flink/install_flink.sh create mode 100644 extensions/flink/start_flink.sh diff --git a/extensions/flink/flink_env.sh b/extensions/flink/flink_env.sh new file mode 100644 index 0000000..f265061 --- /dev/null +++ b/extensions/flink/flink_env.sh @@ -0,0 +1,62 @@ +# TODO add licence + + +# This file contains environment-variable overrides to be used in conjunction +# with bdutil_env.sh in order to deploy a Hadoop + Flink cluster. +# Usage: ./bdutil deploy -e extensions/flink/flink_env.sh + +# In standalone mode, Flink runs the job manager and the task managers (workers) +# on the cluster without using YARN containers. Flink also supports YARN +# deployment which will be implemented in future version of the Flink bdutil plugin. +FLINK_MODE="standalone" + +# URIs of tarballs for installation. +FLINK_HADOOP1_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop1.tgz' +FLINK_HADOOP2_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop2.tgz' +FLINK_HADOOP2_YARN_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop2-yarn.tgz' + +# Directory on each VM in which to install each package. 
+FLINK_INSTALL_DIR='/home/hadoop/flink-install' + +# Optional JVM arguments to pass +# Flink config entry: env.java.opts: +FLINK_JAVA_OPTS="-DsomeOption=value" + +# Heap memory used by the job manager (master) determined by the physical (free) memory of the server +# Flink config entry: jobmanager.heap.mb: {{jobmanager_heap}} +FLINK_JOBMANAGER_MEMORY_FRACTION='0.8' + +# Heap memory used by the task managers (slaves) determined by the physical (free) memory of the servers +# Flink config entry: taskmanager.heap.mb: {{taskmanager_heap}} +FLINK_TASKMANAGER_MEMORY_FRACTION='0.8' + +# Number of task slots per task manager (worker) +# ideally set to the number of physical cpus +# if set to 'auto', the number of slots will be determined automatically +# Flink config entry: taskmanager.numberOfTaskSlots: {{num_task_slots}} +FLINK_TASKMANAGER_SLOTS='auto' + +# Default parallelization degree (number of concurrent actions per task) +# If set to 'auto', this will be determined automatically +# Flink config entry: parallelization.degree.default: {{parallelization}} +FLINK_PARALLELIZATION_DEGREE='auto' + +# The number of buffers for the network stack. +# Flink config entry: taskmanager.network.numberOfBuffers: {{taskmanager_num_buffers}} +FLINK_NETWORK_NUM_BUFFERS=2048 + + +COMMAND_GROUPS+=( + "install_flink: + extensions/flink/install_flink.sh + " + "start_flink: + extensions/flink/start_flink.sh + " +) + +# Installation of flink on master and workers; then start_flink only on master. +COMMAND_STEPS+=( + 'install_flink,install_flink' + 'start_flink,*' +) diff --git a/extensions/flink/install_flink.sh b/extensions/flink/install_flink.sh new file mode 100644 index 0000000..5b1fc7f --- /dev/null +++ b/extensions/flink/install_flink.sh @@ -0,0 +1,66 @@ +# TODO add licence + +# fail if undeclared variables are used +set -o nounset +# exit on error +set -o errexit + + +# Figure out which tarball to use based on which Hadoop version is being used. 
+set +o nounset +HADOOP_BIN="sudo -u hadoop ${HADOOP_INSTALL_DIR}/bin/hadoop" +HADOOP_VERSION=$(${HADOOP_BIN} version | tr -cd [:digit:] | head -c1) +set -o nounset +if [[ "${HADOOP_VERSION}" == '2' ]]; then + FLINK_TARBALL_URI=${FLINK_HADOOP2_TARBALL_URI} +else + FLINK_TARBALL_URI=${FLINK_HADOOP1_TARBALL_URI} +fi + +# Install Flink via this fancy pipe +gsutil cat "${FLINK_TARBALL_URI}" | tar -C /home/hadoop/ -xzv +mv /home/hadoop/flink* "${FLINK_INSTALL_DIR}" + +# List all task managers (workers) in the slaves file +# The task managers will be brought up by the job manager (master) +echo ${WORKERS[@]} | tr ' ' '\n' > ${FLINK_INSTALL_DIR}/conf/slaves + +# Create temp file in hadoop directory which might be mounted to other storage than os +FLINK_TASKMANAGER_TEMP_DIR="/hadoop/flink/tmp" +mkdir -p ${FLINK_TASKMANAGER_TEMP_DIR} +chgrp hadoop -R /hadoop/flink +chmod 777 -R /hadoop/flink + +# Calculate the memory allocations, MB, using 'free -m'. Floor to nearest MB. +TOTAL_MEM=$(free -m | awk '/^Mem:/{print $2}') +FLINK_JOBMANAGER_MEMORY=$(python -c \ + "print int(${TOTAL_MEM} * ${FLINK_JOBMANAGER_MEMORY_FRACTION})") +FLINK_TASKMANAGER_MEMORY=$(python -c \ + "print int(${TOTAL_MEM} * ${FLINK_TASKMANAGER_MEMORY_FRACTION})") + +# Determine the number of task slots +if [[ "${FLINK_TASKMANAGER_SLOTS}" == "auto" ]] ; then + FLINK_TASKMANAGER_SLOTS=`grep -c processor /proc/cpuinfo` +fi + +# Determine the default degree of parallelization +if [[ "${FLINK_PARALLELIZATION_DEGREE}" == "auto" ]] ; then + FLINK_PARALLELIZATION_DEGREE=$(python -c \ + "print ${NUM_WORKERS} * ${FLINK_TASKMANAGER_SLOTS}") +fi + +# Apply Flink settings by appending them to the default config +cat << EOF >> ${FLINK_INSTALL_DIR}/conf/flink-conf.yaml +jobmanager.rpc.address: ${MASTER_HOSTNAME} +jobmanager.heap.mb: ${FLINK_JOBMANAGER_MEMORY} +taskmanager.heap.mb: ${FLINK_TASKMANAGER_MEMORY} +taskmanager.numberOfTaskSlots: ${FLINK_TASKMANAGER_SLOTS} +parallelization.degree.default: 
${FLINK_PARALLELIZATION_DEGREE} +taskmanager.network.numberOfBuffers: ${FLINK_NETWORK_NUM_BUFFERS} +env.java.opts: ${FLINK_JAVA_OPTS} +taskmanager.tmp.dirs: ${FLINK_TASKMANAGER_TEMP_DIR} +fs.hdfs.hadoopconf: ${HADOOP_CONF_DIR} +EOF + +# Assign ownership of everything to the 'hadoop' user. +chown -R hadoop:hadoop /home/hadoop/ \ No newline at end of file diff --git a/extensions/flink/start_flink.sh b/extensions/flink/start_flink.sh new file mode 100644 index 0000000..2150af4 --- /dev/null +++ b/extensions/flink/start_flink.sh @@ -0,0 +1,8 @@ +# TODO add licence + +set -o nounset +set -o errexit + +if [[ ${FLINK_MODE} == 'standalone' ]]; then + sudo -u hadoop ${FLINK_INSTALL_DIR}/bin/start-cluster.sh +fi \ No newline at end of file From 1e819e04066bee3f0726d4531aaaeb1605ee0329 Mon Sep 17 00:00:00 2001 From: Max Date: Wed, 21 Jan 2015 18:44:14 +0100 Subject: [PATCH 2/5] add Apache licence --- extensions/flink/flink_env.sh | 14 ++++++++++++-- extensions/flink/install_flink.sh | 13 ++++++++++++- extensions/flink/start_flink.sh | 12 +++++++++++- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/extensions/flink/flink_env.sh b/extensions/flink/flink_env.sh index f265061..71d9068 100644 --- a/extensions/flink/flink_env.sh +++ b/extensions/flink/flink_env.sh @@ -1,10 +1,20 @@ -# TODO add licence - +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS-IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# This file contains environment-variable overrides to be used in conjunction # with bdutil_env.sh in order to deploy a Hadoop + Flink cluster. # Usage: ./bdutil deploy -e extensions/flink/flink_env.sh + # In standalone mode, Flink runs the job manager and the task managers (workers) # on the cluster without using YARN containers. Flink also supports YARN # deployment which will be implemented in future version of the Flink bdutil plugin. diff --git a/extensions/flink/install_flink.sh b/extensions/flink/install_flink.sh index 5b1fc7f..78e48d6 100644 --- a/extensions/flink/install_flink.sh +++ b/extensions/flink/install_flink.sh @@ -1,4 +1,15 @@ -# TODO add licence +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS-IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # fail if undeclared variables are used set -o nounset diff --git a/extensions/flink/start_flink.sh b/extensions/flink/start_flink.sh index 2150af4..5076d73 100644 --- a/extensions/flink/start_flink.sh +++ b/extensions/flink/start_flink.sh @@ -1,4 +1,14 @@ -# TODO add licence +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS-IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. set -o nounset set -o errexit From a4bd4a8f4ac9eb4e6d14a5f6cffbdcfcba36122c Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 22 Jan 2015 11:36:18 +0100 Subject: [PATCH 3/5] make the flink log dir writable for all users --- extensions/flink/install_flink.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions/flink/install_flink.sh b/extensions/flink/install_flink.sh index 78e48d6..46c4767 100644 --- a/extensions/flink/install_flink.sh +++ b/extensions/flink/install_flink.sh @@ -74,4 +74,6 @@ fs.hdfs.hadoopconf: ${HADOOP_CONF_DIR} EOF # Assign ownership of everything to the 'hadoop' user. -chown -R hadoop:hadoop /home/hadoop/ \ No newline at end of file +chown -R hadoop:hadoop /home/hadoop/ +# Make the Flink log directory writable +chmod 777 ${FLINK_INSTALL_DIR}/log \ No newline at end of file From 890bf5a5e12450d0604f636b9d53b859d6da7038 Mon Sep 17 00:00:00 2001 From: Max Date: Thu, 22 Jan 2015 12:32:26 +0100 Subject: [PATCH 4/5] add the gcs-connector to the Flink class path --- extensions/flink/install_flink.sh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/extensions/flink/install_flink.sh b/extensions/flink/install_flink.sh index 46c4767..0002498 100644 --- a/extensions/flink/install_flink.sh +++ b/extensions/flink/install_flink.sh @@ -73,6 +73,25 @@ taskmanager.tmp.dirs: ${FLINK_TASKMANAGER_TEMP_DIR} fs.hdfs.hadoopconf: ${HADOOP_CONF_DIR} EOF +# Find the Hadoop lib dir so and add its gcs-connector to the Flink lib dir +set +o nounset +if [[ -r "${HADOOP_INSTALL_DIR}/libexec/hadoop-config.sh" ]]; then + . 
"${HADOOP_INSTALL_DIR}/libexec/hadoop-config.sh" +fi +if [[ -n "${HADOOP_COMMON_LIB_JARS_DIR}" ]] && \ + [[ -n "${HADOOP_PREFIX}" ]]; then + LIB_JARS_DIR="${HADOOP_PREFIX}/${HADOOP_COMMON_LIB_JARS_DIR}" +else + LIB_JARS_DIR="${HADOOP_INSTALL_DIR}/lib" +fi +set -o nounset +# Get jar name and path +GCS_JARNAME=$(grep -o '[^/]*\.jar' <<< ${GCS_CONNECTOR_JAR}) +LOCAL_GCS_JAR="${LIB_JARS_DIR}/${GCS_JARNAME}" +# create link in Flink lib dir +ln -s "${LOCAL_GCS_JAR}" "${FLINK_INSTALL_DIR}/lib/" + + # Assign ownership of everything to the 'hadoop' user. chown -R hadoop:hadoop /home/hadoop/ # Make the Flink log directory writable From dcf157f437fb6105672aef68658224394096826d Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 23 Jan 2015 11:18:38 +0100 Subject: [PATCH 5/5] add README to explain how to deploy Flink with bdutil --- extensions/flink/README.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 extensions/flink/README.md diff --git a/extensions/flink/README.md b/extensions/flink/README.md new file mode 100644 index 0000000..73c792d --- /dev/null +++ b/extensions/flink/README.md @@ -0,0 +1,37 @@ +Deploying Flink on Google Compute Engine +======================================== + +Set up a bucket +---------------- + +If you have not done so, create a bucket for the bdutil config and +staging files. A new bucket can be created with the gsutil: + + gsutil mb gs:// + + +Adapt the bdutil config +----------------------- + +To deploy Flink with bdutil, adapt at least the following variables in +bdutil_env.sh. 
+ + CONFIGBUCKET="<bucket-name>" + PROJECT="<google-cloud-project-id>" + NUM_WORKERS=<number-of-workers> + + +Bring up a cluster with Flink +----------------------------- + +To bring up the Flink cluster on Google Compute Engine, execute: + + ./bdutil -e extensions/flink/flink_env.sh deploy + +To run a Flink example job: + + ./bdutil shell + curl http://www.gutenberg.org/cache/epub/2265/pg2265.txt > text + gsutil cp text gs://<bucket-name>/text + cd /home/hadoop/flink-install/bin + ./flink run ../examples/flink-java-examples-*-WordCount.jar gs://<bucket-name>/text gs://<bucket-name>/output \ No newline at end of file