1. 程式人生 > >k8s叢集中 spark訪問hbase中資料

k8s叢集中 spark訪問hbase中資料

首先我們需要對hbase的訪問原理非常清除.可以參考:https://blog.csdn.net/luanpeng825485697/article/details/80319552

我們這裡已經在k8s中部署了hdfs和zookeeper以及hbase.部署可以參考: https://blog.csdn.net/luanpeng825485697/article/details/81985602

hbase資料分割槽是按照region進行的,分割槽的location就是各個region的location。那麼後續分配executor時可以按照region所在機器分配對應executor,直接在本機讀取資料計算。

我們先來往hbase裡面寫兩個資料

hbase shell
whoami
list      # 如果list出錯,說正在初始化中,要等待,可以在dashboard中看
exists 't1'
create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
put 't1','rowkey001','f1:col1','value01'
put 't1','rowkey002','f1:col1','value01'
put 't1','rowkey003','f1:col1','value01'
put 't1','rowkey004','f1:col1','value01'

我們使用python來實現連線

python程式碼如下

# 這些函式只在driver驅動中執行.  只有DRR的mapreduce才會在exec程式中執行
# 需要設定hbase的hostname和subdomain   並在dns增加一條重定向

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


spark = SparkSession.builder.appName("hbase_test").getOrCreate()
sc = spark.sparkContext


zookeeper = 'zookeeper-1.cloudai-2.svc.cluster.local,zookeeper-2.cloudai-2.svc.cluster.local,zookeeper-3.cloudai-2.svc.cluster.local'
table = 't1'


# # 讀取
conf = {
    "hbase.zookeeper.quorum": zookeeper,
    "hbase.zookeeper.property.clientPort":"2181",
    "hbase.regionserver.port":"60010",
    # "hbase.master":"10.233.9.21:60000",
    # "hbase.master.port":"60000",
    "zookeeper.znode.parent":"/hbase",
    "hbase.defaults.for.version.skip":"true",
    "hbase.mapreduce.inputtable": table
}

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print((k, v))

在程式碼中我們配置了zookeeper的地址, spark會先訪問spark來獲取hbase的所有資訊,包含master,regionserver的資訊.

為了保證正確,我們可以先自己查詢一遍看看是否正確.

zookeeper的儲存原理可以參考:http://www.cnblogs.com/leesf456/p/6179118.html

進入zookeeper的pod,

./bin/zkCli.sh          # 計入zookeeper命令列

ls /                   # 檢視zookeeper根目錄儲存:
包含zookeeper   hbase兩個資料夾(資料節點)

ls2 /zookeeper   檢視zookeeper節點目錄

ls2 /hbase   檢視hbase節點資訊:
[meta-region-server, backup-masters, table, draining, region-in-transition, table-lock, running, master, namespace, hbaseid, online-snapshot, replication, splitWAL, recovering-regions, rs, flush-table-proc]

檢視hbase叢集在zookeeper記錄的資訊,比如:regionserver1-slave-1,regionserver2-slave-2
ls2 /hbase/rs
[hbase-master-deployment-1-ddb859944-ctbrm,16201,1541399059310]
這裡可以看出, 記錄在zookeeper中的資料是pod的主機名hostname  而不是ip

表鎖節點會有所有表。
[zk: localhost:2181(CONNECTED) 10] ls2 /hbase/table-lock
[TestTable, product, device, t1, rec_history, hbase:namespace, face_detect]
cZxid = 0x100000009
ctime = Wed Sep 19 07:25:49 UTC 2018
mZxid = 0x100000009
mtime = Wed Sep 19 07:25:49 UTC 2018
pZxid = 0x300000749
cversion = 21
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 7


檢視所有表
[zk: localhost:2181(CONNECTED) 11] ls2 /hbase/table
[TestTable, product, t1, rec_history, device, hbase:meta, hbase:namespace, face_detect]
cZxid = 0x100000006
ctime = Wed Sep 19 07:25:49 UTC 2018
mZxid = 0x100000006
mtime = Wed Sep 19 07:25:49 UTC 2018
pZxid = 0x30000074b
cversion = 22
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 8


檢視hbase的meta表資訊,包含server資訊。
[zk: localhost:2181(CONNECTED) 14] get /hbase/table/hbase:meta

?master:60000o,?OG?e?PBUF
cZxid = 0x100000029
ctime = Wed Sep 19 07:25:57 UTC 2018
mZxid = 0x10000002a
mtime = Wed Sep 19 07:25:57 UTC 2018
pZxid = 0x100000029
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 31
numChildren = 0

由於hbase的通訊是通過hostname連線的. 所以我們在python程式碼中設定了將hostname寫入到hosts檔案,但是這個程式碼只在driver驅動中執行,只有DRR資料才會分解到exec驅動中執行. 所以只有driver的pod中成功修改了hosts 而exec仍然無法解析hostname

有幾種方案:

1 將hbase的hostname和pod的ip想辦法能配置到映象中 因為pod的ip可能是變化的,所以必須以配置的方式新增進去.

2 新增代理,或者自定義DNS 將hbase的hostname代理到pod的ip

3 重定向 將hostname 重定向到pod的訪問地址

{pod-name}.{subdomain}.{namespace}.svc.cluster.local

4 建立service-headless服務的名稱和hbase的hostname設定成一樣,並且讓spark和hbase在同一個名稱空間下 ( 這種最簡單 )

我們先按照第4種方案在hbase所在的名稱空間建立和hbase的pod的hostname相同的service-headless,並且讓spark也在這個名稱空間下執行

apiVersion: v1
kind: Service
metadata:
  name: hbase-master-1
  namespace: cloudai-2
spec:
  selector:
    app: hbase-master-1
  clusterIP: None
  ports:
    - name: rpc
      port: 60000
      targetPort: 60000        # 配置的hbase的連結埠   
    - name: info
      port: 60001            # hbase執行狀態的網頁查詢埠
      targetPort: 60001
    - name: region
      port: 60010            
      targetPort: 60010
    - name: regioninfo
      port: 60011           
      targetPort: 60011
    - name: thrift
      port: 9090           # thrift伺服器ip,讓外界通過thrift來訪問hbase
      targetPort: 9090

如果我們想讓spark在一個獨立的名稱空間執行,就要另尋它法了.

建立hbase的deployment時指定pod的hostname和subdomain

hostname: hbase-master-1  # 設定主機名
subdomain: hbase-headless    # 設定子域名

為subdomain建一個service-headless

apiVersion: v1
kind: Service
metadata:
  name: hbase-headless
  namespace: cloudai-2
spec:
  selector:
    app: hbase-master-1
  clusterIP: None
  ports:
    - name: rpc
      port: 60000
      targetPort: 60000        # 配置的hbase的連結埠   
    - name: info
      port: 60001            # hbase執行狀態的網頁查詢埠
      targetPort: 60001
    - name: region
      port: 60010            
      targetPort: 60010
    - name: regioninfo
      port: 60011           
      targetPort: 60011
    - name: thrift
      port: 9090           # thrift伺服器ip,讓外界通過thrift來訪問hbase
      targetPort: 9090

這樣就能訪問hbase的pod了,只不過域名必須是

hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local

下面我們來設定代理, 在使用hbase的hostname訪問pod時也能正常訪問到hbase的pod.

我這裡在dns中增加了一個重定向解析記錄

我這裡的dns使用的coredns,在建立Corefile的configmap時使用下面的方法

apiVersion: v1
data:
  Corefile: |
    .:53 {
        errors
        health
        rewrite name hbase-master-1 hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local
        kubernetes cluster.local in-addr.arpa ip6.arpa {
           pods insecure
           upstream
           fallthrough in-addr.arpa ip6.arpa
        }
        prometheus :9153
        proxy . /etc/resolv.conf
        cache 30
        reload
    }
kind: ConfigMap
metadata:
  name: coredns
  namespace: kube-system

這樣dns在解析hbase-master-1時返回的是hbase-master-1.hbase-headless.cloudai-2.svc.cluster.local的ip

如果你使用的是kube-dns,可以參考https://blog.csdn.net/luanpeng825485697/article/details/84108166