Skip to content
This repository has been archived by the owner on Mar 27, 2022. It is now read-only.

Support for Apache Flink #6

Merged
merged 5 commits into from
Jan 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions extensions/flink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Deploying Flink on Google Compute Engine
========================================

Set up a bucket
----------------

If you have not done so, create a bucket for the bdutil config and
staging files. A new bucket can be created with the gsutil tool:

gsutil mb gs://<bucket_name>


Adapt the bdutil config
-----------------------

To deploy Flink with bdutil, adapt at least the following variables in
bdutil_env.sh.

CONFIGBUCKET="<bucket_name>"
PROJECT="<compute_engine_project_name>"
NUM_WORKERS=<number_of_workers>


Bring up a cluster with Flink
-----------------------------

To bring up the Flink cluster on Google Compute Engine, execute:

./bdutil -e extensions/flink/flink_env.sh deploy

To run a Flink example job:

./bdutil shell
curl http://www.gutenberg.org/cache/epub/2265/pg2265.txt > text
gsutil cp text gs://<bucket_name>/text
cd /home/hadoop/flink-install/bin
./flink run ../examples/flink-java-examples-*-WordCount.jar gs://<bucket_name>/text gs://<bucket_name>/output
72 changes: 72 additions & 0 deletions extensions/flink/flink_env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file contains environment-variable overrides to be used in conjunction
# with bdutil_env.sh in order to deploy a Hadoop + Flink cluster.
# Usage: ./bdutil deploy -e extensions/flink/flink_env.sh


# In standalone mode, Flink runs the job manager and the task managers (workers)
# on the cluster without using YARN containers. Flink also supports YARN
# deployment, which will be implemented in a future version of the Flink bdutil
# plugin.
FLINK_MODE="standalone"

# URIs of the Flink binary tarballs used for installation. install_flink.sh
# selects the hadoop1 or hadoop2 tarball based on the detected Hadoop version.
FLINK_HADOOP1_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop1.tgz'
FLINK_HADOOP2_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop2.tgz'
FLINK_HADOOP2_YARN_TARBALL_URI='gs://flink-dist/flink-0.8.0-bin-hadoop2-yarn.tgz'

# Directory on each VM in which to install each package.
FLINK_INSTALL_DIR='/home/hadoop/flink-install'

# Optional JVM arguments to pass to the Flink JVMs.
# Flink config entry: env.java.opts:
# NOTE(review): "-DsomeOption=value" is a placeholder and is written verbatim
# into flink-conf.yaml by install_flink.sh — replace or clear it for real use.
FLINK_JAVA_OPTS="-DsomeOption=value"

# Fraction of the master's physical memory given to the job manager heap
# (applied to the total reported by 'free -m' in install_flink.sh).
# Flink config entry: jobmanager.heap.mb: {{jobmanager_heap}}
FLINK_JOBMANAGER_MEMORY_FRACTION='0.8'

# Fraction of each worker's physical memory given to the task manager heap.
# Flink config entry: taskmanager.heap.mb: {{taskmanager_heap}}
FLINK_TASKMANAGER_MEMORY_FRACTION='0.8'

# Number of task slots per task manager (worker);
# ideally set to the number of physical cpus.
# If set to 'auto', the number of slots is derived from /proc/cpuinfo.
# Flink config entry: taskmanager.numberOfTaskSlots: {{num_task_slots}}
FLINK_TASKMANAGER_SLOTS='auto'

# Default parallelization degree (number of concurrent actions per task).
# If set to 'auto', this is computed as NUM_WORKERS * FLINK_TASKMANAGER_SLOTS.
# Flink config entry: parallelization.degree.default: {{parallelization}}
FLINK_PARALLELIZATION_DEGREE='auto'

# The number of buffers for the network stack.
# Flink config entry: taskmanager.network.numberOfBuffers: {{taskmanager_num_buffers}}
FLINK_NETWORK_NUM_BUFFERS=2048


# Register the Flink command groups with bdutil; each entry maps a group name
# to the script that implements it.
COMMAND_GROUPS+=(
"install_flink:
extensions/flink/install_flink.sh
"
"start_flink:
extensions/flink/start_flink.sh
"
)

# Installation of flink on master and workers; then start_flink only on master.
# NOTE(review): entries appear to be 'master_group,worker_group' with '*' as a
# worker no-op — confirm against bdutil_env.sh's COMMAND_STEPS documentation.
COMMAND_STEPS+=(
'install_flink,install_flink'
'start_flink,*'
)
98 changes: 98 additions & 0 deletions extensions/flink/install_flink.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# fail if undeclared variables are used
set -o nounset
# exit on error
set -o errexit


# Figure out which tarball to use based on which Hadoop version is being used.
# nounset is relaxed temporarily because the HADOOP_* variables are supplied
# by the bdutil environment and may be undeclared.
set +o nounset
HADOOP_BIN="sudo -u hadoop ${HADOOP_INSTALL_DIR}/bin/hadoop"
# 'hadoop version' prints e.g. "Hadoop 2.4.1"; strip everything but digits and
# keep the first one to get the major version. The character class must be
# quoted: an unquoted [:digit:] is a glob pattern and would be expanded by the
# shell if a matching single-character file exists in the working directory.
HADOOP_VERSION=$(${HADOOP_BIN} version | tr -cd '[:digit:]' | head -c1)
set -o nounset
if [[ "${HADOOP_VERSION}" == '2' ]]; then
  FLINK_TARBALL_URI="${FLINK_HADOOP2_TARBALL_URI}"
else
  FLINK_TARBALL_URI="${FLINK_HADOOP1_TARBALL_URI}"
fi

# Install Flink by streaming the tarball from GCS straight into tar.
gsutil cat "${FLINK_TARBALL_URI}" | tar -C /home/hadoop/ -xzv
# The tarball extracts to a versioned directory (flink-<version>); move it to
# the fixed install location the rest of the scripts expect.
mv /home/hadoop/flink* "${FLINK_INSTALL_DIR}"

# List all task managers (workers) in the slaves file, one hostname per line.
# The task managers will be brought up by the job manager (master).
# printf over a quoted array expansion replaces the unquoted
# 'echo ${WORKERS[@]} | tr' pipeline, which relied on word-splitting.
printf '%s\n' "${WORKERS[@]}" > "${FLINK_INSTALL_DIR}/conf/slaves"

# Create temp dir under /hadoop, which might be mounted on other storage than
# the OS disk, and make it writable by the hadoop group.
FLINK_TASKMANAGER_TEMP_DIR='/hadoop/flink/tmp'
mkdir -p "${FLINK_TASKMANAGER_TEMP_DIR}"
chgrp -R hadoop /hadoop/flink
chmod -R 777 /hadoop/flink

# Calculate the memory allocations, MB, using 'free -m'. Floor to nearest MB.
# awk replaces the original python2-only 'print int(...)' one-liners so the
# script does not break on hosts where 'python' is python3 or absent;
# printf "%d" truncates toward zero, matching int() for these non-negative
# products.
TOTAL_MEM=$(free -m | awk '/^Mem:/{print $2}')
FLINK_JOBMANAGER_MEMORY=$(awk -v mem="${TOTAL_MEM}" \
    -v frac="${FLINK_JOBMANAGER_MEMORY_FRACTION}" \
    'BEGIN { printf "%d", mem * frac }')
FLINK_TASKMANAGER_MEMORY=$(awk -v mem="${TOTAL_MEM}" \
    -v frac="${FLINK_TASKMANAGER_MEMORY_FRACTION}" \
    'BEGIN { printf "%d", mem * frac }')

# Determine the number of task slots: one per CPU when set to 'auto'.
if [[ "${FLINK_TASKMANAGER_SLOTS}" == 'auto' ]]; then
  FLINK_TASKMANAGER_SLOTS=$(grep -c processor /proc/cpuinfo)
fi

# Determine the default degree of parallelization: total slots in the cluster.
# Both operands are integers, so shell arithmetic suffices (no python needed).
if [[ "${FLINK_PARALLELIZATION_DEGREE}" == 'auto' ]]; then
  FLINK_PARALLELIZATION_DEGREE=$(( NUM_WORKERS * FLINK_TASKMANAGER_SLOTS ))
fi

# Apply Flink settings by appending them to the default config shipped in the
# tarball. The heredoc expands the values computed above into
# conf/flink-conf.yaml.
# NOTE(review): this assumes later duplicate keys override the defaults
# already present in flink-conf.yaml — verify against the Flink 0.8 config
# loading behavior.
cat << EOF >> ${FLINK_INSTALL_DIR}/conf/flink-conf.yaml
jobmanager.rpc.address: ${MASTER_HOSTNAME}
jobmanager.heap.mb: ${FLINK_JOBMANAGER_MEMORY}
taskmanager.heap.mb: ${FLINK_TASKMANAGER_MEMORY}
taskmanager.numberOfTaskSlots: ${FLINK_TASKMANAGER_SLOTS}
parallelization.degree.default: ${FLINK_PARALLELIZATION_DEGREE}
taskmanager.network.numberOfBuffers: ${FLINK_NETWORK_NUM_BUFFERS}
env.java.opts: ${FLINK_JAVA_OPTS}
taskmanager.tmp.dirs: ${FLINK_TASKMANAGER_TEMP_DIR}
fs.hdfs.hadoopconf: ${HADOOP_CONF_DIR}
EOF

# Find the Hadoop lib dir and link its gcs-connector jar into the Flink lib
# dir. nounset is relaxed because hadoop-config.sh and its variables may be
# absent on Hadoop 1 layouts.
set +o nounset
if [[ -r "${HADOOP_INSTALL_DIR}/libexec/hadoop-config.sh" ]]; then
  . "${HADOOP_INSTALL_DIR}/libexec/hadoop-config.sh"
fi
if [[ -n "${HADOOP_COMMON_LIB_JARS_DIR}" && -n "${HADOOP_PREFIX}" ]]; then
  LIB_JARS_DIR="${HADOOP_PREFIX}/${HADOOP_COMMON_LIB_JARS_DIR}"
else
  LIB_JARS_DIR="${HADOOP_INSTALL_DIR}/lib"
fi
set -o nounset
# Extract the jar's bare filename from its URI. The herestring is quoted so
# the URI is not word-split or glob-expanded before reaching grep.
GCS_JARNAME=$(grep -o '[^/]*\.jar' <<< "${GCS_CONNECTOR_JAR}")
LOCAL_GCS_JAR="${LIB_JARS_DIR}/${GCS_JARNAME}"
# Create (or refresh) the link in the Flink lib dir; -f keeps a re-run of this
# install step from aborting under errexit when the link already exists.
ln -sf "${LOCAL_GCS_JAR}" "${FLINK_INSTALL_DIR}/lib/"


# Assign ownership of everything to the 'hadoop' user.
chown -R hadoop:hadoop /home/hadoop/
# Make the Flink log directory writable
chmod 777 "${FLINK_INSTALL_DIR}/log"
18 changes: 18 additions & 0 deletions extensions/flink/start_flink.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fail on unset variables and on any command failure.
set -o nounset
set -o errexit

# In standalone mode the master starts the whole cluster: start-cluster.sh is
# run as the 'hadoop' user and brings up the job manager plus the task
# managers listed in conf/slaves (written by install_flink.sh).
# Quoting the install-dir path guards against word-splitting.
if [[ "${FLINK_MODE}" == 'standalone' ]]; then
  sudo -u hadoop "${FLINK_INSTALL_DIR}/bin/start-cluster.sh"
fi