
How to Add Third-Party Services to CDH

  1. Overview

CDH makes it easy to add big-data services, but only those packaged by Cloudera itself. To add a third-party service (for example, a component your company developed) to a CDH cluster and have CDH host it, you must build the packages according to a fixed set of rules and then publish them to CDH.

This article shows how to package your own service, publish it to CDH, and let CDH control its execution and monitor its basic runtime state.

  2. Packaging Basics
    2.1 Terminology

parcel: a gzip-compressed archive whose file name ends in ".parcel". The file name must follow a fixed convention with three segments: the package name, the version, and the target platform.

Example: FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel

Package name: FLINK

Version: 1.6.0-hadoop_2.6-scala_2.11

Platform: el7

el6 stands for CentOS 6, el7 for CentOS 7.
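The naming rule can be sketched as a small shell helper (illustrative only; `parse_parcel` is not part of any CDH tooling):

```shell
# Split a parcel filename into its three segments:
# everything before the first dash is the name, everything after the
# last dash is the platform, and the middle is the version.
parse_parcel() {
  local f="${1%.parcel}"          # strip the .parcel suffix
  local name="${f%%-*}"           # first segment: package name
  local distro="${f##*-}"         # last segment: platform
  local version="${f#"$name"-}"   # drop "NAME-" from the front
  version="${version%-"$distro"}" # drop "-platform" from the back
  echo "$name $version $distro"
}

parse_parcel "FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel"
# -> FLINK 1.6.0-hadoop_2.6-scala_2.11 el7
```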

The parcel contains your service's files plus an important descriptor, meta/parcel.json, which records information about the service: its version, its run-as user, the CDH platform versions it supports, and so on.

The parcel must be placed in /opt/cloudera/parcel-repo/ for CDH's distribution mechanism to recognize it.

csd: a jar file that records how CDH should manage your service — the icon shown in the CDH UI, the services it depends on, the ports it exposes, how it is started, and so on.

The csd jar must be placed in /opt/cloudera/csd/ before your service can be recognized when adding a cluster service.

    2.2 Downloads

https://github.com/cloudera/cm_csds

https://github.com/cloudera/cm_ext

  3. Building the CDH Component
    3.1 Preparing the Component for Release

Collect the tested service into a single directory; the subdirectory layout inside it is simply your project's own structure and needs no changes. Library dependencies can either be provided by the system environment or shipped directly inside this directory.

A service written in any language can be hosted on CDH.

    3.2 Building the Flink Package
      3.2.1 Download Flink

https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop26-scala_2.11.tgz

      3.2.2 Build the parcel

The parcel root consists of your service directory (lib/flink) and a meta directory:

FLINK-1.6.0-hadoop_2.6-scala_2.11/
├── lib/flink/
└── meta/
    ├── flink_env.sh
    ├── parcel.json
    └── permissions.json

flink_env.sh declares variables for the bash environment your service runs in; add whatever settings your service needs.

Create the flink_env.sh file:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh:

#!/bin/bash

FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"}

export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink
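To see what flink_env.sh accomplishes, you can simulate the activation step by hand. A sketch: /tmp/parcel-demo is a throwaway path used only for illustration; on a real cluster the CM agent supplies PARCELS_ROOT=/opt/cloudera/parcels and PARCEL_DIRNAME itself.

```shell
# Recreate flink_env.sh in a scratch directory and source it the way the
# agent would: with PARCELS_ROOT exported (PARCEL_DIRNAME falls back to
# the default baked into the script).
mkdir -p /tmp/parcel-demo && cd /tmp/parcel-demo
cat > flink_env.sh <<'EOF'
#!/bin/bash
FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"}
export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink
EOF
export PARCELS_ROOT=/opt/cloudera/parcels
. ./flink_env.sh
echo "$FLINK_HOME"
# -> /opt/cloudera/parcels/FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink
```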

 

parcel.json must be filled in with the parcel's name and the CDH platform versions it is compatible with.

Create the parcel.json file (the parcel descriptor):

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json:

{
    "schema_version": 1,
    "name": "FLINK",
    "version": "1.6.0-hadoop_2.6-scala_2.11",
    "depends": "CDH (>= 5.2), CDH (<< 6.0)",
    "setActiveSymlink": true,
    "replaces": "FLINK",
    "scripts": {
        "defines": "flink_env.sh"
    },
    "packages": [
        {
            "name": "flink-master",
            "version": "1.6.0+flink1.6.0"
        },
        {
            "name": "flink-worker",
            "version": "1.6.0+flink1.6.0"
        }
    ],
    "components": [
        {
            "name": "flink",
            "version": "1.6.0-flink1.6.0",
            "pkg_version": "1.6.0+flink1.6.0",
            "pkg_release": "hadoop_2.6-scala_2.11"
        }
    ],
    "provides": ["flink"],
    "users": {
        "flink": {
            "longname": "Flink",
            "home": "/var/lib/flink",
            "shell": "/bin/bash",
            "extra_groups": []
        }
    },
    "groups": ["flink"]
}

Note: pay close attention to letter case in this file; a mistake can prevent the parcel from being published.

Create the flink-master.sh file:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh:

#!/bin/bash

# Flink Master.
USAGE="Usage: flink-master.sh (start|stop)"
DAEMON=flink-master

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
else
    flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
    FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
fi

if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
fi

if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
    export JVM_ARGS="$JVM_ARGS -Xms${FLINK_JM_HEAP_MB}m -Xmx${FLINK_JM_HEAP_MB}m"
fi

# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "--host" "${FLINK_MASTER_HOST}" "--webui-port" "${FLINK_WEB_UI_PORT}")
echo "FLINK_MASTER_HOST: $FLINK_MASTER_HOST"
echo "FLINK_WEB_UI_PORT: $FLINK_WEB_UI_PORT"
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}"

CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
FLINK_TM_CLASSPATH=$(constructFlinkClassPath)
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-master"

log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ]; then
    # Rotate log files
    rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
    # Evaluate user options for local variable expansion
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
    CLASS_PATH=$(manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)")
    # Drop the conflicting slf4j binding shipped with Flink
    CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
    echo "Starting $DAEMON daemon on host $HOSTNAME."
    exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
    echo "$DAEMON daemon (pid: $MY_PID) is already running on host $HOSTNAME."
fi

flink-master.sh starts Flink's master (JobManager) node.

Note: the exec command in flink-master.sh is mandatory.
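Why exec is mandatory: Cloudera Manager's supervisor watches the PID it spawned, and exec replaces the wrapper shell with the JVM so that this watched PID is the real service process, not a parent of it. A minimal demonstration of the effect:

```shell
# The outer bash prints its own PID, then execs a second bash that prints
# its PID. Because of exec, both lines show the same PID.
out=$(bash -c 'echo $$; exec bash -c "echo \$\$"')
first=$(echo "$out" | head -1)
second=$(echo "$out" | tail -1)
[ "$first" = "$second" ] && echo "exec kept the same PID"
# -> exec kept the same PID
```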

 

Create the flink-worker.sh file:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh:

#!/bin/bash

# Flink Worker.
USAGE="Usage: flink-worker.sh (start|stop)"
DAEMON=flink-worker

OPERATION=$1

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

# if memory allocation mode is lazy and no other JVM options are set,
# use the G1 garbage collector
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
    export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi

if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
else
    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
fi

if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
    echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
    exit 1
fi

if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
    TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
    # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
    TM_MAX_OFFHEAP_SIZE="8388607T"
    export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
fi

# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"

# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}")
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "WORKER_ARGS: ${ARGS[@]}"

CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
FLINK_TM_CLASSPATH=$(constructFlinkClassPath)
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-worker"

log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"

log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ]; then
    # Rotate log files
    rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
    # Evaluate user options for local variable expansion
    FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
    CLASS_PATH=$(manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)")
    # Drop the conflicting slf4j binding shipped with Flink
    CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
    echo "Starting $DAEMON daemon on host $HOSTNAME."
    exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
    echo "$DAEMON daemon (pid: $MY_PID) is already running on host $HOSTNAME."
fi

flink-worker.sh starts Flink's worker (TaskManager) node.

Note: the exec command in flink-worker.sh is mandatory.

 

Create the flink-yarn.sh file:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh:

#!/bin/bash

bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

# get Flink config
. "$bin"/config.sh

JVM_ARGS="$JVM_ARGS -Xmx512m"
CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli

FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-yarn"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"

CLASS_PATH=$(manglePathList $(constructFlinkClassPath):$(hadoop classpath))
# Drop the conflicting slf4j binding shipped with Flink
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
exec $JAVA_RUN $JVM_ARGS -classpath "$CLASS_PATH" $log_setting ${CLASS_TO_RUN} -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" > "$out" 2>&1

flink-yarn.sh launches Flink on YARN.

Note: the exec command in flink-yarn.sh is mandatory.

 

Create the permissions.json file:

vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json

Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json:

{
    "lib/flink/bin/config.sh": {
        "user": "flink",
        "group": "flink",
        "permissions": "0755"
    },
    "lib/flink/bin/flink-master.sh": {
        "user": "flink",
        "group": "flink",
        "permissions": "0755"
    },
    "lib/flink/bin/flink-worker.sh": {
        "user": "flink",
        "group": "flink",
        "permissions": "0755"
    },
    "lib/flink/bin/flink-yarn.sh": {
        "user": "flink",
        "group": "flink",
        "permissions": "0755"
    }
}

permissions.json assigns ownership and permissions to files and directories inside the parcel.

 

Go to the directory that contains FLINK-1.6.0-hadoop_2.6-scala_2.11 and create a parcel-el6 folder:

mkdir parcel-el6

 

Package the FLINK-1.6.0-hadoop_2.6-scala_2.11 directory into a parcel file with the standard name by running:

tar -czvf parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel FLINK-1.6.0-hadoop_2.6-scala_2.11
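A parcel is just a gzipped tar, so tar itself can verify the packaging. The top-level directory inside must be exactly the parcel's name-version prefix, or Cloudera Manager will not activate it correctly. A sketch on a throwaway parcel under /tmp (FOO-1.0 is a made-up name for illustration):

```shell
# Build a dummy parcel and confirm its first entry is the NAME-VERSION
# directory, mirroring the FLINK packaging above.
mkdir -p /tmp/parcel-check/FOO-1.0 && cd /tmp/parcel-check
tar -czf FOO-1.0-el7.parcel FOO-1.0
tar -tzf FOO-1.0-el7.parcel | head -1
# -> FOO-1.0/
```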

After packaging, you must also produce FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha, which holds the hash used to verify FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.

The hash comes from make_manifest.py, a Python script shipped in the cm_ext repository from the Downloads section. Running it generates a manifest.json file that contains the hash. Run:

python cm_ext-master/make_manifest/make_manifest.py parcel-el6

parcel-el6 is the parent directory of FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel; the command writes a manifest.json file into it.

Contents of parcel-el6/manifest.json:

{
    "parcels": [
        {
            "hash": "b548e8b4be3db290933222e4bd517c903d36d453",
            "depends": "CDH (>= 5.2), CDH (<< 6.0)",
            "replaces": "FLINK",
            "parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel",
            "components": [
                {
                    "pkg_version": "1.6.0+flink1.6.0",
                    "version": "1.6.0-flink1.6.0",
                    "name": "flink",
                    "pkg_release": "hadoop_2.6-scala_2.11"
                }
            ]
        }
    ],
    "lastUpdated": 1538048224076
}

Create the FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha file from the hash recorded in manifest.json:

echo $(cat parcel-el6/manifest.json | grep hash | awk -F"\"" '{print $4}') > parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha

Contents of parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha:

b548e8b4be3db290933222e4bd517c903d36d453
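Before copying anything into /opt/cloudera/parcel-repo/, it is worth confirming that the .sha file really matches the parcel, since the hash in manifest.json is the parcel's SHA-1. A small helper (a sketch; check_parcel_sha is not a CDH tool):

```shell
# Compare the hash stored in <parcel>.sha against the actual SHA-1
# of the parcel file.
check_parcel_sha() {
  local parcel="$1"
  local expected actual
  expected=$(cat "$parcel.sha")
  actual=$(sha1sum "$parcel" | awk '{print $1}')
  [ "$expected" = "$actual" ] && echo "sha OK" || echo "sha MISMATCH"
}
```

Usage: `check_parcel_sha parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel`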

You end up with three files:

FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel

FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha

manifest.json

Copy these three files to /opt/cloudera/parcel-repo/ and you are done.

If /opt/cloudera/parcel-repo/ already contains a manifest.json, instead merge the following entry:

{
    "parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel",
    "components": [
        {
            "pkg_version": "1.6.0+flink1.6.0",
            "version": "1.6.0-flink1.6.0",
            "name": "flink",
            "pkg_release": "hadoop_2.6-scala_2.11"
        }
    ],
    "depends": "CDH (>= 5.2), CDH (<< 6.0)",
    "hash": "ce75a90cd57aecd7e31bef15dd1221c6182e38c6"
}

Append this entry to the parcels array of the existing manifest.json file.
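The merge can also be scripted rather than edited by hand. A sketch assuming the jq tool is installed (merge_manifests is a hypothetical helper, not part of cm_ext); it keeps the first manifest's other fields and only concatenates the parcels arrays:

```shell
# merge_manifests REPO_MANIFEST NEW_MANIFEST
# Prints the repo manifest with the new manifest's parcels appended.
merge_manifests() {
  jq -s '.[0].parcels += .[1].parcels | .[0]' "$1" "$2"
}
```

Example: `merge_manifests /opt/cloudera/parcel-repo/manifest.json parcel-el6/manifest.json > manifest.merged.json`, then review the result before moving it over the repo copy.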

      3.2.3 Build the standalone CSD jar

The csd file's directory layout is as follows:

FLINK-1.6.0/
├── descriptor/   holds service.sdl, the service's rule descriptor
├── images/       holds the service icon as a PNG; without one, the CDH UI shows no icon
└── scripts/      holds the startup script; define how your service starts in control.sh

Create the folders:

mkdir -p FLINK-1.6.0/descriptor

mkdir -p FLINK-1.6.0/images

mkdir -p FLINK-1.6.0/scripts

Create the service.sdl file:

vi FLINK-1.6.0/descriptor/service.sdl

Contents of FLINK-1.6.0/descriptor/service.sdl:

{
    "name": "FLINK",
    "label": "Flink(Standalone)",
    "description": "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.",
    "version": "1.6.0",
    "compatibility": {
        "generation": 1,
        "cdhVersion": {
            "min": "5",
            "max": "5"
        }
    },
    "runAs": {
        "user": "flink",
        "group": "flink"
    },
    "icon": "images/flink.png",
    "serviceDependencies": [
        {
            "name": "ZOOKEEPER",
            "required": "true"
        },
        {
            "name": "HDFS",
            "required": "true"
        }
    ],
    "parameters": [
        {
            "name": "high-availability.storageDir",
            "label": "high-availability storageDir",
            "description": "HDFS path (URI) where Flink persists metadata in high-availability setups.",
            "type": "string",
            "default": "hdfs:///user/flink/ha",
            "configurableInWizard": true
        },
        {
            "name": "high-availability.zookeeper.path.root",
            "label": "high-availability zookeeper path root",
            "description": "The root path under which Flink stores its entries in ZooKeeper.",
            "type": "string",
            "default": "/flink",
            "configurableInWizard": true
        },
        {
            "name": "high-availability.cluster-id",
            "label": "high-availability cluster-id",
            "description": "The ID of the Flink cluster, used to separate multiple Flink clusters from each other.",
            "type": "string",
            "default": "cluster_standalone",
            "configurableInWizard": true
        },
        {
            "name": "state.checkpoints.dir",
            "label": "state checkpoints dir",
            "description": "HDFS path (URI) for checkpoints.",
            "type": "string",
            "default": "hdfs:///user/flink/cluster_standalone/checkpoints",
            "configurableInWizard": true
        },
        {
            "name": "state.savepoints.dir",
            "label": "state savepoints dir",
            "description": "HDFS path (URI) for savepoints.",
            "type": "string",
            "default": "hdfs:///user/flink/cluster_standalone/savepoints",
            "configurableInWizard": true
        },
        {
            "name": "parallelism.default",
            "label": "parallelism default",
            "description": "The parallelism used for programs that did not specify any other parallelism.",
            "type": "long",
            "default": 1,
            "configurableInWizard": true
        }
    ],
    "hdfsDirs": [
        {
            "name": "CreateFlinkUserDirCommand",
            "label": "Create Flink User Dir",
            "description": "Creates the Flink user directory in HDFS.",
            "directoryDescription": "Flink HDFS user directory",
            "path": "/user/${user}",
            "permissions": "0751"
        }
    ],
    "serviceInit": {
        "preStartSteps": [
            {
                "commandName": "CreateFlinkUserDirCommand"
            }
        ]
    },
    "roles": [
        {
            "name": "FLINK_MASTER",
            "label": "Flink Master",
            "pluralLabel": "Flink Masters",
            "startRunner": {
                "program": "scripts/control.sh",
                "args": ["master"],
                "environmentVariables": {
                    "FLINK_MASTER_HOST": "${host}",
                    "FLINK_WEB_UI_PORT": "${rest.port}",
                    "FLINK_RUN_AS_USER": "${user}"
                }
            },
            "externalLink": {
                "name": "web_dashboard",
                "label": "Web Dashboard",
                "url": "http://${host}:${rest.port}"
            },
            "parameters": [
                {
                    "name": "jobmanager.heap.size",
                    "label": "jobmanager heap size",
                    "description": "The heap size for the JobManager JVM.",
                    "type": "string",
                    "default": "1024m",
                    "configurableInWizard": true
                },
                {
                    "name": "rest.port",
                    "label": "rest port",
                    "description": "The port under which the web-based runtime monitor listens.",
                    "type": "long",
                    "default": 8081,
                    "configurableInWizard": true
                }
            ],
            "topology": {
                "minInstances": 1
            },
            "logging": {
                "filename": "flink-master.log",
                "isModifiable": true,
                "configName": "env.log.dir",
                "loggingType": "log4j",
                "dir": "/var/log/flink"
            },
            "configWriter": {
                "generators": [
                    {
                        "filename": "flink-conf.properties",
                        "configFormat": "properties",
                        "includeParams": [
                            "high-availability.storageDir",
                            "high-availability.zookeeper.path.root",
                            "high-availability.cluster-id",
                            "state.savepoints.dir",
                            "state.checkpoints.dir",
                            "jobmanager.heap.size",
                            "parallelism.default",
                            "rest.port"
                        ]
                    }
                ]
            }
        },
        {
            "name": "FLINK_WORKER",
            "label": "Flink Worker",
            "pluralLabel": "Flink Workers",
            "startRunner": {
                "program": "scripts/control.sh",
                "args": ["worker"],
                "environmentVariables": {
                    "FLINK_RUN_AS_USER": "${user}"
                }
            },
            "parameters": [
                {
                    "name": "taskmanager.heap.size",
                    "label": "taskmanager heap size",
                    "description": "The heap size for the TaskManager JVM.",
                    "type": "string",
                    "default": "1024m",
                    "configurableInWizard": true
                },
                {
                    "name": "taskmanager.numberOfTaskSlots",
                    "label": "taskmanager numberOfTaskSlots",
                    "description": "The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.",
                    "type": "long",
                    "default": 1,
                    "configurableInWizard": true
                }
            ],
            "topology": {
                "minInstances": 1
            },
            "logging": {
                "filename": "flink-worker.log",
                "isModifiable": true,
                "configName": "env.log.dir",
                "loggingType": "log4j",
                "dir": "/var/log/flink"
            },
            "configWriter": {
                "generators": [
                    {
                        "filename": "flink-conf.properties",
                        "configFormat": "properties",
                        "includeParams": [
                            "high-availability.storageDir",
                            "high-availability.zookeeper.path.root",
                            "high-availability.cluster-id",
                            "state.savepoints.dir",
                            "state.checkpoints.dir",
                            "taskmanager.heap.size",
                            "taskmanager.numberOfTaskSlots",
                            "parallelism.default"
                        ]
                    }
                ]
            }
        }
    ]
}

The service.sdl file above configures, in order: the run-as user, the icon, the dependent services, the shared service parameters, the HDFS directory-creation command, the Flink Master start runner, the Web UI external link, the Master's role parameters, its topology, its logging, and its generated configuration file; the Flink Worker role mirrors the Master.
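For reference, with the defaults above the master's configWriter emits a flink-conf.properties along these lines (values illustrative, taken from the sdl defaults; control.sh later rewrites "=" into ": " for Flink):

```properties
high-availability.storageDir=hdfs:///user/flink/ha
high-availability.zookeeper.path.root=/flink
high-availability.cluster-id=cluster_standalone
state.savepoints.dir=hdfs:///user/flink/cluster_standalone/savepoints
state.checkpoints.dir=hdfs:///user/flink/cluster_standalone/checkpoints
jobmanager.heap.size=1024m
parallelism.default=1
rest.port=8081
```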

 

Create the control.sh file:

vi FLINK-1.6.0/scripts/control.sh

Contents of FLINK-1.6.0/scripts/control.sh:

#!/bin/bash

USAGE="Usage: control.sh ((master|worker) (start|stop))"

NODE_TYPE=$1
NODE_HOST=$(hostname -f)

# Locate the parcels directory (its depth relative to CMF_VAR varies)
TEMP_PATH=$CMF_VAR/../cloudera/parcels
if [ ! -d "$TEMP_PATH" ]; then
    TEMP_PATH=$CMF_VAR/../../cloudera/parcels
fi
PARCELS_DIR=$(cd $TEMP_PATH; pwd)
FLINK_HOME=$PARCELS_DIR/FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink

# Recreate the configuration directory from the parcel's defaults
FLINK_CONF_DIR=$CONF_DIR/flink-conf
if [ ! -d "$FLINK_CONF_DIR" ]; then
    mkdir $FLINK_CONF_DIR
else
    rm -rf $FLINK_CONF_DIR/*
fi
cp $FLINK_HOME/conf/* $FLINK_CONF_DIR/

# CM emits key=value pairs; Flink expects "key: value"
sed -i 's#=#: #g' $CONF_DIR/flink-conf.properties

if [ "$NODE_TYPE" = "master" ]; then
    RPC_ADDRESS=$(grep "jobmanager.rpc.address:" $CONF_DIR/flink-conf.properties)
    # If jobmanager.rpc.address is not set, default it to this master's host
    if [ "$RPC_ADDRESS" = "" ]; then
        echo "jobmanager.rpc.address: $FLINK_MASTER_HOST" >> $CONF_DIR/flink-conf.properties