Download python files from remote location (#181)
mskim-raf authored Jan 13, 2022
1 parent e5a0cd7 commit f4024dc
Showing 8 changed files with 122 additions and 32 deletions.
4 changes: 2 additions & 2 deletions api/v1beta1/flinkcluster_types.go
@@ -401,10 +401,10 @@ type JobSpec struct {
// _(Optional)_ Fully qualified Java class name of the job.
ClassName *string `json:"className,omitempty"`

-// _(Optional)_ Python file of the job. It should be a local file.
+// _(Optional)_ Python file of the job. It could be a local file or a remote URI (e.g., `https://`, `gs://`).
PyFile *string `json:"pyFile,omitempty"`

-// _(Optional)_ Python files of the job. It should be a local file (with .py/.egg/.zip/.whl) or directory.
+// _(Optional)_ Python files of the job. It could be a local file (with .py/.egg/.zip/.whl), a directory, or a remote URI (e.g., `https://`, `gs://`).
// See the Flink argument `--pyFiles` for details.
PyFiles *string `json:"pyFiles,omitempty"`

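The new fields are used like the existing `jarFile` field. As an abridged, hypothetical sketch (the API group and `image`/`job` layout are assumed from this operator's conventions; bucket paths and names are illustrative, and a real manifest also needs the usual `jobManager`/`taskManager` sections):

```sh
# Hypothetical FlinkCluster manifest fragment using a remote Python job file.
kubectl apply -f - <<EOF
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: wordcount-python
spec:
  image:
    name: gcr.io/my-project/flink:1.14.2-scala_2.11-gcs
  job:
    pyFile: gs://my-bucket/jobs/wordcount.py
    pyFiles: gs://my-bucket/jobs/deps.zip
EOF
```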
36 changes: 21 additions & 15 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ const (
submitJobScriptPath = "/opt/flink-operator/submit-job.sh"
gcpServiceAccountVolume = "gcp-service-account-volume"
hadoopConfigVolume = "hadoop-config-volume"
+flinkJobPath = "/opt/flink/job"
)

var (
@@ -658,28 +659,16 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {

var envVars []corev1.EnvVar

-// If the JAR file is remote, put the URI in the env variable
-// FLINK_JOB_JAR_URI and rewrite the JAR path to a local path. The entrypoint
-// script of the container will download it before submitting it to Flink.
if jobSpec.JarFile != nil {
-var jarPath = *jobSpec.JarFile
-if strings.Contains(*jobSpec.JarFile, "://") {
-var parts = strings.Split(*jobSpec.JarFile, "/")
-jarPath = path.Join("/opt/flink/job", parts[len(parts)-1])
-envVars = append(envVars, corev1.EnvVar{
-Name: "FLINK_JOB_JAR_URI",
-Value: *jobSpec.JarFile,
-})
-}
-jobArgs = append(jobArgs, jarPath)
+jobArgs = append(jobArgs, getLocalPath(&envVars, "FLINK_JOB_JAR_URI", *jobSpec.JarFile))
}

if jobSpec.PyFile != nil {
jobArgs = append(jobArgs, "--python", *jobSpec.PyFile)
jobArgs = append(jobArgs, "--python", getLocalPath(&envVars, "FLINK_JOB_JAR_URI", *jobSpec.PyFile))
}

if jobSpec.PyFiles != nil {
jobArgs = append(jobArgs, "--pyFiles", *jobSpec.PyFiles)
jobArgs = append(jobArgs, "--pyFiles", getLocalPath(&envVars, "FLINK_JOB_PYTHON_FILES_URI", *jobSpec.PyFiles))
}

if jobSpec.PyModule != nil {
@@ -1185,6 +1174,23 @@ func mergeLabels(labels1 map[string]string, labels2 map[string]string) map[string]string {
return mergedLabels
}

+// getLocalPath returns the local path for the given job file. If the file is
+// remote, it records the URI in the named env variable and rewrites the path
+// to a local one; the container's entrypoint script downloads the file before
+// submitting it to Flink.
+func getLocalPath(envVars *[]corev1.EnvVar, envName string, filePath string) string {
+var localPath = filePath
+if strings.Contains(filePath, "://") {
+var parts = strings.Split(filePath, "/")
+localPath = path.Join(flinkJobPath, parts[len(parts)-1])
+*envVars = append(*envVars, corev1.EnvVar{
+Name: envName,
+Value: filePath,
+})
+}
+
+return localPath
+}

const (
DefaultLog4jConfig = `log4j.rootLogger=INFO, console
log4j.logger.akka=INFO
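In shell terms, the rewrite `getLocalPath` performs looks like this (a minimal sketch; the bucket and file name are illustrative):

```sh
# Sketch of getLocalPath's rewrite rule: remote URIs map under /opt/flink/job,
# plain local paths pass through unchanged.
uri="gs://my-bucket/jobs/wordcount.py"
case "${uri}" in
  *://*) local_path="/opt/flink/job/$(basename "${uri}")" ;;  # remote: download target
  *)     local_path="${uri}" ;;                                # local: used as-is
esac
echo "${local_path}"  # prints /opt/flink/job/wordcount.py
```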
4 changes: 2 additions & 2 deletions docs/crd.md
@@ -319,8 +319,8 @@
| --- | --- |
| `jarFile` _string_ | _(Optional)_ JAR file of the job. It could be a local file or remote URI, depending on which protocols (e.g., `https://, gs://`) are supported by the Flink image. |
| `className` _string_ | _(Optional)_ Fully qualified Java class name of the job. |
-| `pyFile` _string_ | _(Optional)_ Python file of the job. It should be a local file. |
-| `pyFiles` _string_ | _(Optional)_ Python files of the job. It should be a local file (with .py/.egg/.zip/.whl) or directory. See the Flink argument `--pyFiles` for the detail. |
+| `pyFile` _string_ | _(Optional)_ Python file of the job. It could be a local file or a remote URI (e.g., `https://`, `gs://`). |
+| `pyFiles` _string_ | _(Optional)_ Python files of the job. It could be a local file (with .py/.egg/.zip/.whl), a directory, or a remote URI (e.g., `https://`, `gs://`). See the Flink argument `--pyFiles` for details. |
| `pyModule` _string_ | _(Optional)_ Python module path of the job entry point. Must use with pythonFiles. |
| `args` _string array_ | _(Optional)_ Command-line args of the job. |
| `fromSavepoint` _string_ | _(Optional)_ Savepoint to restore the job from (e.g., gs://my-savepoint/1234). If the Flink job must be restored from the latest available savepoint when the job is updated, this field must be unspecified. |
14 changes: 13 additions & 1 deletion images/flink/Makefile
@@ -4,12 +4,24 @@ all: build

# Build the docker image
build:
cd docker && docker build . \
cd docker
ifeq ($(PYTHON_VERSION),)
docker build . \
-t ${IMAGE_TAG} \
--build-arg FLINK_VERSION=${FLINK_VERSION} \
--build-arg SCALA_VERSION=${SCALA_VERSION} \
--build-arg FLINK_HADOOP_VERSION=${FLINK_HADOOP_VERSION} \
--build-arg GCS_CONNECTOR_VERSION=${GCS_CONNECTOR_VERSION}
else
docker build . \
-f Dockerfile.python \
-t ${IMAGE_TAG} \
--build-arg FLINK_VERSION=${FLINK_VERSION} \
--build-arg SCALA_VERSION=${SCALA_VERSION} \
--build-arg FLINK_HADOOP_VERSION=${FLINK_HADOOP_VERSION} \
--build-arg GCS_CONNECTOR_VERSION=${GCS_CONNECTOR_VERSION} \
--build-arg PYTHON_VERSION=${PYTHON_VERSION}
endif

# Push the docker image
push:
4 changes: 4 additions & 0 deletions images/flink/README.md
@@ -13,6 +13,10 @@ command to build and push the image:
make build push IMAGE_TAG=gcr.io/[MY_PROJECT]/flink:[FLINK_VERSION]-scala_[SCALA_VERSION]-gcs
```

+### Create custom Docker image with GCS connector for Python
+
+To run a Flink application written in Python, specify `PYTHON_VERSION` in the [properties](./properties) file before running the above command.

## Before creating a Flink cluster

Before you create a Flink cluster with the custom image, you need to prepare several things:
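For example, with a hypothetical Python version (since the Makefile uses `?=` defaults, the variable can equally be passed on the command line instead of the properties file):

```sh
make build push \
  IMAGE_TAG=gcr.io/[MY_PROJECT]/flink:[FLINK_VERSION]-scala_[SCALA_VERSION]-gcs \
  PYTHON_VERSION=3.8.12
```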
52 changes: 52 additions & 0 deletions images/flink/docker/Dockerfile.python
@@ -0,0 +1,52 @@
ARG FLINK_VERSION
ARG SCALA_VERSION
FROM flink:${FLINK_VERSION}-scala_${SCALA_VERSION}
ARG FLINK_HADOOP_VERSION
ARG GCS_CONNECTOR_VERSION
ARG FLINK_VERSION
ARG PYTHON_VERSION

RUN test -n "$FLINK_VERSION"
RUN test -n "$FLINK_HADOOP_VERSION"
RUN test -n "$GCS_CONNECTOR_VERSION"

ARG GCS_CONNECTOR_NAME=gcs-connector-${GCS_CONNECTOR_VERSION}.jar
ARG GCS_CONNECTOR_URI=https://storage.googleapis.com/hadoop-lib/gcs/${GCS_CONNECTOR_NAME}
ARG FLINK_HADOOP_JAR_NAME=flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar
ARG FLINK_HADOOP_JAR_URI=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}

# Install Google Cloud SDK.
RUN apt-get -qq update && \
apt-get -qqy install apt-transport-https wget && \
echo "deb https://packages.cloud.google.com/apt cloud-sdk-stretch main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
wget -nv https://packages.cloud.google.com/apt/doc/apt-key.gpg -O /etc/apt/trusted.gpg.d/google-cloud-key.gpg && \
apt-get -qq update && \
apt-get -qqy install google-cloud-sdk

# Download and configure GCS connector.
# When running on GKE, there is no need to enable and include a service account
# key file; the GCS connector can get credentials from the VM metadata server.
RUN echo "Downloading ${GCS_CONNECTOR_URI}" && \
wget -q -O /opt/flink/lib/${GCS_CONNECTOR_NAME} ${GCS_CONNECTOR_URI}
RUN echo "Downloading ${FLINK_HADOOP_JAR_URI}" && \
wget -q -O /opt/flink/lib/${FLINK_HADOOP_JAR_NAME} ${FLINK_HADOOP_JAR_URI}

# Install Python and PyFlink.
RUN apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
tar -xvf Python-${PYTHON_VERSION}.tgz && \
cd Python-${PYTHON_VERSION} && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-${PYTHON_VERSION}.tgz && rm -rf Python-${PYTHON_VERSION} && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
pip3 install apache-flink==${FLINK_VERSION}

# Entry point.
COPY entrypoint.sh /
RUN chmod 775 /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
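To build this image directly rather than through the Makefile, the invocation would look roughly like this (versions taken from the properties file below; the tag name and `PYTHON_VERSION` are illustrative):

```sh
cd images/flink/docker
docker build . -f Dockerfile.python \
  -t flink:1.14.2-scala_2.11-gcs-python \
  --build-arg FLINK_VERSION=1.14.2 \
  --build-arg SCALA_VERSION=2.11 \
  --build-arg FLINK_HADOOP_VERSION=2.8.3-7.0 \
  --build-arg GCS_CONNECTOR_VERSION=hadoop2-2.0.0 \
  --build-arg PYTHON_VERSION=3.8.12
```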
37 changes: 26 additions & 11 deletions images/flink/docker/entrypoint.sh
@@ -22,6 +22,22 @@ echo "Flink entrypoint..."

FLINK_CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

+download_job_file()
+{
+URI=$1
+JOB_PATH=$2
+mkdir -p "${JOB_PATH}"
+echo "Downloading job file ${URI} to ${JOB_PATH}"
+if [[ "${URI}" == gs://* ]]; then
+gsutil cp "${URI}" "${JOB_PATH}"
+elif [[ "${URI}" == http://* || "${URI}" == https://* ]]; then
+wget -nv -P "${JOB_PATH}" "${URI}"
+else
+echo "Unsupported protocol for ${URI}"
+exit 1
+fi
+}

# Add user-provided properties to Flink config.
# FLINK_PROPERTIES is a multi-line string of "<key>: <value>".
if [[ -n "${FLINK_PROPERTIES}" ]]; then
@@ -31,18 +47,17 @@
echo "${FLINK_PROPERTIES}" >>${FLINK_CONF_FILE}
fi

-# Download remote job JAR file.
+# Download remote job file.
if [[ -n "${FLINK_JOB_JAR_URI}" ]]; then
-mkdir -p ${FLINK_HOME}/job
-echo "Downloading job JAR ${FLINK_JOB_JAR_URI} to ${FLINK_HOME}/job/"
-if [[ "${FLINK_JOB_JAR_URI}" == gs://* ]]; then
-gsutil cp "${FLINK_JOB_JAR_URI}" "${FLINK_HOME}/job/"
-elif [[ "${FLINK_JOB_JAR_URI}" == http://* || "${FLINK_JOB_JAR_URI}" == https://* ]]; then
-wget -nv -P "${FLINK_HOME}/job/" "${FLINK_JOB_JAR_URI}"
-else
-echo "Unsupported protocol for ${FLINK_JOB_JAR_URI}"
-exit 1
-fi
+download_job_file "${FLINK_JOB_JAR_URI}" "${FLINK_HOME}/job/"
fi

if [[ -n "${FLINK_JOB_PYTHON_URI}" ]]; then
download_job_file ${FLINK_JOB_PYTHON_URI} "${FLINK_HOME}/job/"
fi

if [[ -n "${FLINK_JOB_PYTHON_FILES_URI}" ]]; then
download_job_file ${FLINK_JOB_PYTHON_FILES_URI} "${FLINK_HOME}/job/"
fi

# Handover to Flink base image's entrypoint.
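Taken together, a remote Python job leaves the submitter container with env vars like the following (values illustrative), each of which the entrypoint downloads into `${FLINK_HOME}/job/` before handing over to Flink:

```sh
# Sketch of the operator-provided environment for a remote Python job:
export FLINK_JOB_PYTHON_URI="gs://my-bucket/jobs/wordcount.py"     # --python
export FLINK_JOB_PYTHON_FILES_URI="gs://my-bucket/jobs/deps.zip"   # --pyFiles
# entrypoint.sh then runs, in effect:
#   download_job_file "${FLINK_JOB_PYTHON_URI}" "${FLINK_HOME}/job/"
#   download_job_file "${FLINK_JOB_PYTHON_FILES_URI}" "${FLINK_HOME}/job/"
```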
3 changes: 2 additions & 1 deletion images/flink/properties
@@ -1,6 +1,7 @@
# Properties included by Makefile and then passed to Dockerfile.

-FLINK_VERSION ?= 1.8.2
+FLINK_VERSION ?= 1.14.2
SCALA_VERSION ?= 2.11
FLINK_HADOOP_VERSION ?= 2.8.3-7.0
GCS_CONNECTOR_VERSION ?= hadoop2-2.0.0
+PYTHON_VERSION ?=
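For instance, to make Python-enabled builds the default, a version can be pinned here (3.8.12 is only an example):

```
PYTHON_VERSION ?= 3.8.12
```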
