
Integrating Kubernetes-native Spark with other applications (MySQL, PostgreSQL, Oracle, HDFS, HBase)

For how to install and run Spark with native Kubernetes scheduling support, see: https://blog.csdn.net/luanpeng825485697/article/details/83651742

Directory layout of the Dockerfiles:

.
├── driver
│   └── Dockerfile
├── driver-py
│   └── Dockerfile
├── executor
│   └── Dockerfile
├── executor-py
│   └── Dockerfile
├── init-container
│   └── Dockerfile
├── resource-staging-server
│   └── Dockerfile
├── shuffle-service
│   └── Dockerfile
└── spark-base
    ├── Dockerfile
    └── entrypoint.sh

Interactive Python Shell

In the Spark directory, run:

./bin/pyspark

Then run the following command, which should return 1000:

sc.parallelize(range(1000)).count()

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Rebuilding the images

Download the spark-2.2.0-k8s-0.5.0-bin-2.7.3 client; all of the image builds below are done inside this directory.

Understanding the structure: first you build the spark-base image, which packages the jars, libraries and executables that Spark needs. For example, connecting Spark to MySQL or HBase requires extra jars, which means rebuilding spark-base. The driver-py and executor-py images (the Python versions of the driver and executor) are built on top of spark-base and add whatever is needed to run Python files, such as pip, numpy and so on. So whenever you modify spark-base, you must also rebuild the driver-py and executor-py images.

Building the spark-base image

Build the spark-base image first. In the spark-2.2.0-k8s-0.5.0-bin-2.7.3 directory, run:

docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .

The spark-base image uses JDK 1.8 on an Alpine Linux base (hence the apk commands in the Dockerfiles below).

Packaging Python 3.6 and the required pip packages

Both the driver and the executor images need Python 3.6 and the related Python packages installed on top of spark-base.

Modify the spark-2.2.0-k8s-0.5.0-bin-2.7.3/dockerfiles/driver-py/Dockerfile file:

# Build command: docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile .
FROM spark-base

ADD examples /opt/spark/examples
ADD python /opt/spark/python

RUN apk add make automake gcc g++ subversion python3 python3-dev
RUN  ln -s /usr/bin/python3 /usr/bin/python
RUN  ln -s /usr/bin/pip3 /usr/bin/pip
RUN  pip install --upgrade pip
RUN  pip install --upgrade setuptools numpy pandas matplotlib scikit-learn opencv-python
RUN  rm -r /root/.cache


# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES
# RUN apk add --update alpine-sdk python-dev


ENV PYTHON_VERSION 3.6.6
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

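# The CMD below assembles the driver classpath and JVM options from the SPARK_* environment variables injected by spark-submit, copies any mounted files into the working directory, and then starts the JVM driver class with the primary Python file and its dependencies.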
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS

Modify the spark-2.2.0-k8s-0.5.0-bin-2.7.3/dockerfiles/executor-py/Dockerfile file:

# docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile .

FROM spark-base

ADD examples /opt/spark/examples
ADD python /opt/spark/python

RUN apk add make automake gcc g++ subversion python3 python3-dev
RUN  ln -s /usr/bin/python3 /usr/bin/python
RUN  ln -s /usr/bin/pip3 /usr/bin/pip
RUN  pip install --upgrade pip
RUN  pip install --upgrade setuptools numpy pandas matplotlib scikit-learn opencv-python
RUN  rm -r /root/.cache


ENV PYTHON_VERSION 3.6.6
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

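# The CMD below assembles the executor classpath and JVM options the same way and then starts CoarseGrainedExecutorBackend, which registers with the driver at SPARK_DRIVER_URL.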
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

Build the images and push them to your registry.
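After modifying the Dockerfiles, rebuild the two Python images and push them to a registry that your cluster can pull from. A minimal sketch; the registry name <your-registry> is a placeholder, substitute your own:

docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile .
docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile .
# tag and push to your own registry; <your-registry> is a placeholder
docker tag spark-driver-py:latest <your-registry>/spark-driver-py:latest
docker tag spark-executor-py:latest <your-registry>/spark-executor-py:latest
docker push <your-registry>/spark-driver-py:latest
docker push <your-registry>/spark-executor-py:latest

Whatever names you push are the ones that spark.kubernetes.driver.docker.image and spark.kubernetes.executor.docker.image should reference in the spark-submit command below.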

Submitting Python code

In the Spark client directory, run:

bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://https://192.168.1.111:6443 \
  --kubernetes-namespace spark-cluster \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.executor.instances=5 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver-py:v2.2.0-kubernetes-0.5.0 \
  --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor-py:v2.2.0-kubernetes-0.5.0 \
  --conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \
  --conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \
  --jars local:///opt/spark/jars/RoaringBitmap-0.5.11.jar \
./demo_xxx.py

Reading MySQL data

Place the mysql-connector-java-5.1.47-bin.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder.

Download: https://dev.mysql.com/downloads/connector/j/5.1.html

Rebuild the spark-base image, and then rebuild all the derived images:

docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .

A Python demo that reads MySQL data:

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName("mysqltest")\
    .config('spark.some.config.option0','some-value')\
    .getOrCreate()
sqlContext = SQLContext(spark.sparkContext)
jdbcDf=sqlContext.read.format("jdbc").options(url="jdbc:mysql://139.9.0.111:3306/note",
                                       driver="com.mysql.jdbc.Driver",
                                       dbtable="article",user="root",
                                       password="xxxxx").load()

jdbcDf.select('label').show()   # read the label column; show() displays only 20 rows by default

Reading and writing PostgreSQL

Place the postgresql-42.2.5.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder.

Download: https://jdbc.postgresql.org/download.html

Rebuild the spark-base image, and then rebuild all the derived images:

docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .

A Python demo that reads PostgreSQL data:



from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName("postgre_test")\
    .config('spark.some.config.option0','some-value')\
    .getOrCreate()
sqlContext = SQLContext(spark.sparkContext)

jdbcDf=sqlContext.read.format("jdbc").options(url="jdbc:postgresql://192.168.1.111:31234/postgres",
                                              driver="org.postgresql.Driver",
                                              dbtable="account",user="postgres",
                                              password="xxxx").load()

jdbcDf.select('name').show()   # read the name column; show() displays only 20 rows by default
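The heading above also mentions writing. A minimal write-back sketch, assuming a hypothetical target table account_copy that the postgres user is allowed to create, uses the same JDBC options:

# write the DataFrame back to PostgreSQL over JDBC; "account_copy" is a hypothetical target table
# mode("append") adds rows, mode("overwrite") drops and recreates the table
jdbcDf.write.format("jdbc").options(url="jdbc:postgresql://192.168.1.111:31234/postgres",
                                    driver="org.postgresql.Driver",
                                    dbtable="account_copy",user="postgres",
                                    password="xxxx").mode("append").save()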

Reading and writing Oracle

Place the ojdbc8.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder.

Download: https://www.oracle.com/technetwork/database/application-development/jdbc/downloads/index.html

Rebuild the spark-base image, and then rebuild all the derived images:

docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
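No Oracle demo is given here, so the following is only a minimal read sketch: the host, port, SID, table name and credentials are placeholders, and ojdbc8.jar must already be in the jars/ folder as described above. It follows the same JDBC pattern as the MySQL and PostgreSQL demos:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("oracle_test").getOrCreate()

# read an Oracle table over JDBC; host, SID, table and credentials below are placeholders
jdbcDf = spark.read.format("jdbc").options(url="jdbc:oracle:thin:@192.168.1.112:1521:orcl",
                                           driver="oracle.jdbc.driver.OracleDriver",
                                           dbtable="employee",user="system",
                                           password="xxxx").load()
jdbcDf.show()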

Reading HDFS data

First make sure the client can reach both the HDFS namenode and the datanodes: the client asks the namenode where the data lives and then connects to the datanodes to read it.

Exec into an HDFS datanode pod to prepare some test data. For basic Hadoop commands, see: https://blog.csdn.net/luanpeng825485697/article/details/83830569

# create a file containing some random characters
echo "a e d s w q s d c x a w s z x d ew d" > aa.txt
# put the file into the HDFS file system
hadoop fs -mkdir /aa
hadoop fs -put aa.txt /aa
# check whether the file exists in HDFS
hadoop fs -ls /aa
# view the file content
hadoop fs -cat /aa/aa.txt
# remove the directory
hadoop fs -rm -r /aa

A demo that reads HDFS data from Python:

from pyspark import SparkConf,SparkContext
from operator import add

conf = SparkConf().setAppName("hdfs_test")
sc = SparkContext(conf=conf)

file = sc.textFile("hdfs://192.168.11.127:32072/aa/aa.txt")
rdd = file.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
count=rdd.reduceByKey(add)
result = count.collect()
print(result)
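To keep the result in HDFS instead of collecting it to the driver, the word-count RDD can be written back with saveAsTextFile; the output path below is only an example and must not already exist:

# save the (word, count) pairs back to HDFS as text files; the output directory must not exist yet
count.saveAsTextFile("hdfs://192.168.11.127:32072/aa/wordcount_result")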

Reading HBase data

Copy the jars starting with hadoop and hbase from the hbase/lib directory into the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder.

In addition, copy these jars from hbase/lib into the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder as well: zookeeper-3.4.6.jar, metrics-core-2.2.0.jar (without it, hbase RpcRetryingCaller: Call exception keeps retrying the HBase connection without ever raising an error), htrace-core-3.1.0-incubating.jar, protobuf-java-2.5.0.jar and guava-12.0.1.jar.

Note that Spark 2.0 and later no longer ship the jar that converts HBase data into a form Python can read, so it has to be downloaded separately from
https://mvnrepository.com/artifact/org.apache.spark/spark-examples?repo=typesafe-maven-releases
and likewise placed in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ folder.

The latest version at the time of writing is spark-examples_2.11-1.6.0-typesafe-001.jar.

Rebuild spark-base, and then rebuild all the derived images:

docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .

A Python demo that reads HBase data:

print('=====================================')
import os
status = os.popen('echo "10.233.64.13   hbase-master-deployment-1-ddb859944-ctbrm">> /etc/hosts')    # HBase nodes resolve each other by hostname, so patch /etc/hosts first
print(status.read())
print('=====================================')


from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


spark = SparkSession.builder.appName("hbase_test").getOrCreate()
sc = spark.sparkContext

zookeeper = '10.233.9.11,10.233.9.12,10.233.9.13'
table = 'product'

# read the table through the HBase TableInputFormat
conf = {
    "hbase.zookeeper.quorum": zookeeper,
    "hbase.zookeeper.property.clientPort":"2181",
    "hbase.regionserver.port":"60010",
    "hbase.master":"10.233.9.21:60000",
    "zookeeper.znode.parent":"/hbase",
    "hbase.mapreduce.inputtable": table
}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print((k, v))
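The same spark-examples jar also provides converters for the opposite direction, so data can be written back to HBase from Python. A minimal sketch, assuming the product table has a column family named cf (hypothetical):

# write (rowkey, [rowkey, column_family, qualifier, value]) records back into HBase
write_conf = {
    "hbase.zookeeper.quorum": zookeeper,
    "hbase.zookeeper.property.clientPort": "2181",
    "hbase.mapred.outputtable": table,
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"
}
keyConvW = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConvW = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
# "cf" is a hypothetical column family of the product table
sc.parallelize([('row1', ['row1', 'cf', 'col1', 'value1'])])\
  .saveAsNewAPIHadoopDataset(conf=write_conf, keyConverter=keyConvW, valueConverter=valueConvW)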