1. 程式人生 > >部署Azkaban多節點分散式模式

部署Azkaban多節點分散式模式

簡單介紹:

Azkaban是由Linkedin公司推出的一個批量工作流任務排程器,用於在一個工作流內以一個特定的順序執行一組工作和流程。Azkaban使用job配置檔案建立任務之間的依賴關係,並提供一個易於使用的web使用者介面維護和跟蹤你的工作流。 它有三個重要元件:

  • 關係資料庫(目前僅支援mysql)
  • web管理伺服器-AzkabanWebServer
  • 執行伺服器-AzkabanExecutorServer

Azkaban使用MySQL來儲存它的狀態資訊,Azkaban Executor Server和Azkaban Web Server均使用到了MySQL資料庫。

AzkabanExecutorServer在如下幾個方面使用到了資料庫:

  • 獲取project的資訊
  • 執行工作流
  • 儲存工作流執行日誌
  • 如果一個工作流在不同的執行器上執行,它將從DB中獲取狀態。

AzkabanWebServer在如下幾個方面使用到了資料庫:

  • Project管理
  • 跟蹤工作流執行進度
  • 訪問歷史工作流的執行資訊
  • 定時執行工作流任務
  • 記錄所有sla規則

 

AzkabanWebServer

AzkabanWebserver是整個Azkaban工作流系統的主要管理者,它負責project管理、使用者登入認證、定時執行工作流、跟蹤工作流執 行進度等一系列任務。同時,它還提供Web服務操作的介面,利用該介面,使用者可以使用curl或其他ajax的方式,來執行azkaban的相關操作。操作包括:使用者登入、建立project、上傳workflow、執行workflow、查詢workflow的執行進度、殺掉workflow等一系列操作,且這些操作的返回結果均是json的格式。

AzkabanExecutorServer

之所以將AzkabanWebServer和AzkabanExecutorServer分開,主要是因為在某個任務流失敗後,可以更方便的將重新執行。而且也更有利於Azkaban系統的升級。

 

注意:安裝sqoop的節點都要安裝azkaban

環境配置:由於azkaban3.0以上沒有相應的安裝包,需要從原始碼進行編譯。編譯的環境需要安裝jdk8。

分散式模式:叢集內應當安裝三個exec-server和一個web-server,相關元件分配如下:

bigdata243      azkaban-exec

bigdata244      azkaban-exec

bigdata245      azkaban-web-server azkaban-exec-server mysql

 

azkaban-web目錄

bin 啟動指令碼存放目錄

conf 配置檔案存放目錄(沒有的話從solo-server的目錄中拷貝過來)

lib 依賴jar包存放目錄

extlib 附加jar包存放目錄(沒有的話手動建立)

plugins 外掛安裝目錄

web web資原始檔

logs 日誌儲存目錄

sql sql資源

 

azkaban-exec目錄

bin 啟動指令碼存放目錄

conf 配置檔案存放目錄(沒有的話從solo-server的目錄中拷貝過來)

lib 依賴jar包存放目錄

extlib 附加jar包存放目錄(沒有的話手動建立)

plugins 外掛安裝目錄

 

編譯,安裝過程

官網下載:3.47版本

進入到azkaban下面編譯:[[email protected] azkaban-3.47.0]$ ./gradlew distTar

編譯結果為:

azkaban-common : 常用工具類。

azkaban-db : 對應的sql指令碼

azkaban-hadoop-secutity-plugin : hadoop 有關kerberos外掛

azkaban-solo-server: web和executor 一起的專案。

azkaban-web/executor-server:azkaban的 web和executor的server資訊

azkaban-spi: azkaban儲存介面以及exception類

編譯完成後:db、web、exec、solo四個目錄的build/distributions/下生成其壓縮包

將壓縮包拷貝到:新建資料夾:mkdir azkaban

cp azkaban-db-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/

cp azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/

cp azkaban-web-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/

cp azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz /home/hadoop/app/azkaban/

 

解壓重新命名

tar -zxvf azkaban-web-server-0.1.0-SNAPSHOT.tar.gz

tar -zxvf azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz

tar -zxvf azkaban-db-0.1.0-SNAPSHOT.tar.gz

tar -zxvf azkaban-solo-server-0.1.0-SNAPSHOT.tar.gz

mv azkaban-db-0.1.0-SNAPSHOT azkaban-db

mv azkaban-web-server-0.1.0-SNAPSHOT azkaban-web

mv azkaban-solo-server-0.1.0-SNAPSHOT azkaban-solo

mv azkaban-exec-server-0.1.0-SNAPSHOT azkaban-exec

建立Azkaban元資料庫:登入mysql,執行如下語句

mysql> create database azkaban_matadata;

Query OK, 1 row affected (0.00 sec)

mysql> use azkaban_matadata;

Database changed

mysql> source /home/hadoop/app/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql (會建立所有表)

配置keystore

在azkaban-web/bin目錄下執行這條命令,在執行完這條命令之後,會生成一個檔案:keystore.使用keytool建立SSL配置,keytool是JDK提供的一個工具,輸入如下命令,可以檢視

[[email protected] ~]# find / -name keytool

/home/hadoop/app/jdk1.8/bin/keytool

/home/hadoop/app/jdk1.8/jre/bin/keytool

 

執行命令建立SSL配置

[[email protected] bin]$ keytool -keystore keystore -alias jetty -genkey -keyalg RSA

輸入金鑰庫口令: azkaban

再次輸入新口令: azkaban

您的名字與姓氏是什麼? [Unknown]: 略過

您的組織單位名稱是什麼? [Unknown]: 略過

您的組織名稱是什麼? [Unknown]: 略過

您所在的城市或區域名稱是什麼? [Unknown]: 略過

您所在的省/市/自治區名稱是什麼? [Unknown]: 略過

該單位的雙字母國家/地區程式碼是什麼? [Unknown]: CN

CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN是否正確?

[否]: Y

輸入 <jetty> 的金鑰口令 (如果和金鑰庫口令相同, 按回車):

 

將azkaban-solo下的conf plugins 和sql資料夾拷貝到azkaban-web目錄下

[[email protected] azkaban-solo]$ cp -a conf/ plugins/ sql/ /home/hadoop/app/azkaban/azkaban-web/

 

配置web-server

配置azkaban-web/conf/azkaban.properties

# Azkaban Personalization Settings
azkaban.name=bigdata245             # 伺服器UI名稱,用於伺服器上方顯示的名字
azkaban.label=Aliyun bigdata245 Azkaban      # 描述資訊
azkaban.color=#FF3601				# 顏色
azkaban.default.servlet.path=/index
web.resource.dir=/home/hadoop/app/azkaban/azkaban-web/web/ #預設跟web目錄,設定為絕對路徑
default.timezone.id=Asia/Shanghai       # 時區,預設為美國America/Los_Angeles
# Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager		#使用者許可權管理預設類
user.manager.xml.file=/home/hadoop/app/azkaban/azkaban-web/conf/azkaban-users.xml				#使用者配置,具體配置參見下文
# Loader for projects
executor.global.properties=/home/hadoop/app/azkaban/azkaban-web/conf/global.properties	#globa配置檔案所在位置
azkaban.project.dir=projects

database.type=mysql             # 資料庫型別
mysql.port=3306                 # 埠
mysql.host=245          # 資料庫連線IP
mysql.database=azkaban_matadata  # 資料庫例項名
mysql.user=root                 # 資料庫使用者名稱
[email protected]         # 資料庫密碼
mysql.numconnections=100            # 最大連線數
h2.path=./h2
h2.create.tables=true
# Velocity dev mode
velocity.dev.mode=false
# Azkaban Jetty server properties.
jetty.use.ssl=false
jetty.maxThreads=25  #最大執行緒數
jetty.port=8081	#jetty埠
jetty.ssl.port=8443	#jetty ssl埠號
jetty.keystore=/home/hadoop/app/azkaban/azkaban-web/bin/keystore #ssl的檔名,絕對路徑
jetty.password=azkaban		#ssl檔案密碼
jetty.keypassword=azkaban		#jetty主密碼與keystore檔案相同
jetty.truststore=keystore			#SSL檔名
jetty.trustpassword=azkaban		#SSL檔案密碼
# Azkaban Executor settings
executor.port=12321			#執行伺服器埠
# mail settings
mail.sender=				#傳送郵箱
mail.host=				#傳送郵箱smtp地址
# User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
# enduser -> myazkabanhost:443 -> proxy -> localhost:8081
# when this parameters set then these parameters are used to generate email links. 
# if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
# azkaban.webserver.external_hostname=myazkabanhost.com
# azkaban.webserver.external_ssl_port=443
# azkaban.webserver.external_port=8081
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache		#快取目錄
# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
# Azkaban plugin settings
azkaban.jobtype.plugin.dir=/home/hadoop/app/azkaban/azkaban-web/plugins/jobtypes
埠號使用規則:jetty.ssl.port > jetty.port。但是使用jetty.ssl.port的前提是jetty.use.ssl=true。這個配置表示開啟ssl【Secure Sockets Layer】安全套接層,否則使用jetty.port埠。

 

在azkaban-web/conf目錄下新增log4j.properties

[[email protected] conf]$ touch log4j.properties

log4j.rootLogger=INFO,C
log4j.appender.C=org.apache.log4j.ConsoleAppender
log4j.appender.C.Target=System.err
log4j.appender.C.layout=org.apache.log4j.PatternLayout
log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

 

新增MySQL驅動在azkaban-web目錄下建立資料夾:mkdir extlib 

將lib目錄下的mysql驅動複製到extlib目錄下

[[email protected] azkaban-web]$ cp lib/mysql-connector-java-5.1.28.jar extlib/

 

新增管理員使用者以及密碼

進入azkaban-web/conf目錄,修改azkaban-users.xml,這個檔案存放使用者登入資訊以及許可權資訊。同時增加管理員使用者admin

<user username="admin" password="admin" roles="admin"/>

azkaban-web目錄下建立logs檔案用於存放日誌檔案 # mkdir logs

注意:多個執行器模式也就是分散式執行模式下執行,需要在webserver配置中啟用多個執行器模式。確認在azkaban.properties中具有以下屬性。azkaban.use.multiple.executors和azkaban.executorselector.comparator。*是必需的屬性。

注意:azkaban.use.multiple.executors 多重執行模式不予以尊重

配置多節點執行伺服器在azkaban-web/conf/azkaban.properties裡新增

azkaban.use.multiple.executors =true
azkaban.executorselector.filters = StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator = 1
azkaban.executorselector.comparator.Memory = 1
azkaban.executorselector.comparator.LastDispatched = 1
azkaban.executorselector.comparator.CpuUsage = 1

以確認使用的是分散式方式,隨後提交的job會根據情況自行選擇執行伺服器,否則預設只使用本地執行伺服器。

 

 

 

 

 

 

 

 

 

配置exec-server

拷貝azkaban-web目錄下的conf和extlib到azkaban-web目錄下

cp -a conf/ extlib/ /home/hadoop/app/azkaban/azkaban-exec/

配置azkaban-web/conf/azkaban.properties

default.timezone.id=Asia/Shanghai
# Loader for projects
executor.global.properties=/home/hadoop/app/azkaban/azkaban-exec/conf/global.properties
azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects
# Azkaban plugin settings
azkaban.jobtype.plugin.dir=plugins/jobtypes
database.type=mysql
mysql.port=3306
mysql.host=245 
mysql.database=azkaban_matadata
mysql.user=root
[email protected]
mysql.numconnections=100 
# Azkaban Executor settings
executor.maxThreads=50
executor.port=12321
executor.flow.threads=25
#分散式節點必配
azkaban.use.multiple.executors=true
azkaban.executorselector.filters=StaticRemainingFlowSize,MinimumFreeMemory,CpuStatus
azkaban.executorselector.comparator.NumberOfAssignedFlowComparator=1
azkaban.executorselector.comparator.Memory=1
azkaban.executorselector.comparator.LastDispatched=1
azkaban.executorselector.comparator.CpuUsage=1

在azkaban-exec/conf目錄下新增log4j.properties

[[email protected] conf]$ touch log4j.properties

log4j.rootLogger=INFO,C

log4j.appender.C=org.apache.log4j.ConsoleAppender

log4j.appender.C.Target=System.err

log4j.appender.C.layout=org.apache.log4j.PatternLayout

log4j.appender.C.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

 

在mysql的azkaban庫中新增各個執行伺服器的ip/域名和埠:

配置多執行器模式的執行程式,目前沒有執行程式管理UI。需要在資料庫中配置執行程式。需要將所有執行程式插入mysql DB以進行執行程式設定。驗證執行程式表中的正確執行程式是否處於活動狀態。

>insert into executors(host,port) values("bigdata245",3306);

>insert into executors(host,port) values("bigdata244",3306);

>insert into executors(host,port) values("bigdata243",3306);

 

啟動,先啟動exec-server(執行器),然後啟動web-server(web服務)

cd azkaban-exec/bin:./start-exec.sh

cd azkaban-web/bin:./start-web.sh

注意:在bin目錄下啟動會生成一堆檔案,如果用指令碼啟動注意修改配置路勁

啟動完成後,三臺節點下可以檢視到對應的程序

AzkabanExecutorServer 3

AzkabanWebServer 1

問題1;

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)

Caused by: java.net.ConnectException: Connection refused (Connection refused)

如果出現這兩個問題,去配置檔案檢視mysql連結是否出錯,還有mysql配置執行伺服器的語句是否有問題

 

訪問Azkaban UI介面

http://bigdata245:8081/index

輸入使用者名稱密碼azkaban/azkaban登入 

 

修改如下配置(azkaban預設啟動規則是在哪裡啟動在哪裡生成一堆檔案)

exec/bin

[[email protected] bin]$ cat start-exec.sh

#!/bin/bash

script_dir=$(dirname $0)

# pass along command line arguments to the internal launch script.

${script_dir}/internal/internal-start-executor.sh "[email protected]" >/home/hadoop/app/azkaban/azkaban-exec/bin/executorServerLog__`date +%F+%T`.out 2>&1 &

[[email protected] bin]$ pwd

/home/hadoop/app/azkaban/azkaban-exec/bin

web/bin

[[email protected] bin]$ pwd

/home/hadoop/app/azkaban/azkaban-web/bin

[[email protected] bin]$ cat start-web.sh

#!/bin/bash

script_dir=$(dirname $0)

${script_dir}/internal/internal-start-web.sh >/home/hadoop/app/azkaban/azkaban-web/bin/webServerLog_`date +%F+%T`.out 2>&1 &

 

配置azkaban-exec/conf/azkaban.properties

azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-exec/bin/projects

 

配置azkaban-web/conf/azkaban.properties

azkaban.project.dir=/home/hadoop/app/azkaban/azkaban-web/bin/projects

 

Azkaban測試及使用

projects:最重要的部分,建立一個工程,所有flows將在工程中執行。 

Scheduling:顯示定時任務 

Executing:顯示當前執行的任務 

History:顯示歷史執行任務

主要介紹Projects部分,在建立工程前,我們先了解下之間的關係,一個工程包含一個或多個flows,一個flow包含多個job。job是你想在azkaban中執行的一個程序,可以是簡單的linux命令,可是java程式,也可以是複雜的shell指令碼、或者python指令碼,當然,如果你安裝相關外掛,也可以執行外掛。一個job可以依賴於另一個job,這種多個job和它們的依賴組成的圖表叫做flow。

web-server節點:負責專案作業管理(上傳和分發) 

exec-server節點:負責具體執行的executor會解析job檔案

一、commond 型別單一Job

1.建立工程

Flows:工作流程,有多個job組成 

Permissions:許可權管理 

Project Logs:工程日誌

2.建立Job

job就是一個以.job結尾的文字檔案,例如建立一個job,名為hello.job,用於列印hello azkaban

3.打包

將建立的job打包成.zip壓縮檔案,注意只能是.zip格式 

4.使用Azkaban UI 介面建立project並上傳壓縮包

點選Execute執行 

執行後,點選Detail,檢視日誌

 

azkaban-exec/plugins/jobtypes/commonprivate.properties配置檔案,內容中新增:azkaban.native.lib=false

關閉重啟服務

如果還不行,編譯原始碼

原始碼路徑:/home/hadoop/app/compile_azkaban3.47/azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java

修改如下:final boolean isExecuteAsUser = this.sysProps.getBoolean(EXECUTE_AS_USER, false);

 

重新編譯之後將azkaban/azkaban-exec-server/build/distributions目錄下的azkaban-exec-server-3.48.0-8-gdc851ec.tar.gz 解壓重新命名,然後再修改配置替換舊的azkaban-exec-server,最後重啟exec和web服務即可

再次執行就好了

二、commond 型別多JOb 工作流 flow

1.建立專案

首先,建立一個專案,名為 Com_Job

2.job 建立

 

假設有這麼一種場景:

(1).task1 依賴 task2

(2).task2 依賴 task3

(3).task3 依賴 task4

說明:假設task1是一個計算指標任務,task2 給 task1 提供執行需要的基礎資料

task3 給 task2 提供資料,以此類推。

 

3.flow 建立

 

多個jobs和它們的依賴組成flow。怎麼建立依賴,只要指定dependencies引數就行了

定義4個job:

(1).run_task1.job:計算業務指標資料

(2).run_task2.job:計算task1所需要的資料

(3).run_task3.job:計算task2所需要的資料

(4).run_task4.job:從 slaves 中抽取源資料

 

依賴關係:

task1 依賴 task2,task2 依賴 task3,task3 依賴 task4

 

4個job檔案內容如下(這裡以執行python為例)

# run_task1.job

type = command

command = python /home/hadoop/pyshell/run_task1.py

dependencies = run_task2

 

# run_task2.job

type = command

command = python /home/hadoop/pyshell/run_task2.py

dependencies = run_task3

 

# run_task3.job

type = command

command = python /home/hadoop/pyshell/run_task3.py

dependencies = run_task4

 

# run_task4.job

type = command

command = python /home/hadoop/pyshell/run_task4.py

 

建立python指令碼

[[email protected] pyshell]$ touch run_task1.py

[[email protected] pyshell]$ touch run_task2.py

[[email protected] pyshell]$ touch run_task3.py

[[email protected] pyshell]$ touch run_task4.py

 

4個檔案內容如下

run_task1.py

 

#!/usr/bin/python3

# -*- coding: utf-8 -*-

print("task1:計算業務指標資料...")

 

run_task2.py

 

#!/usr/bin/python3

# -*- coding: utf-8 -*-

print("task2:計算基礎資料,為task1提供資料")

 

run_task3.py

 

#!/usr/bin/python3

# -*- coding: utf-8 -*-

print("task3:資料清洗,為task2提供資料")

 

run_task4.py

 

#!/usr/bin/python3

# -*- coding: utf-8 -*-

print("task4:從Slaves中抽取源資料")

3.將上述 job 打成zip包,上傳至 azkaban

上傳完成後,點選右側Execute Flow按鈕,檢視流程檢視 

Flow view:流程檢視。可以禁用,啟用某些job

Notification:定義任務成功或者失敗是否傳送郵件

Failure Options:定義一個job失敗,剩下的job怎麼執行

Concurrent:並行任務執行設定

Flow Parametters:引數設定。

4.執行

(1).執行一次,點選右下角Execute 

(2).定時執行,點選左下角Schedule 

設定完成後,執行右下角schedule,即完成排程配置,azkaban這裡的配置與linux下的crontab類似 

想要檢視job的排程列表,切換到Schedule選單即可

5.檢視專案flow中各個Job的執行情況

 

綠色代表成功,藍色是執行,紅色是失敗。可以檢視job執行時間,依賴和日誌,點選details可以檢視各個job執行情況

三、MapReduce 任務

Azkaban 執行 MapReduce 任務,我們以 WordCount 為例

1.準備資料

[[email protected] ~]$ hadoop fs -mkdir -p /azkaban/input

[[email protected] data]$ hadoop fs -put words.txt /azkaban/input

使用hadoop提供的jar統計單詞數量

[[email protected] mapreduce]$ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar wordcount /azkaban/input/* /azkaban/outputs/

執行結果

 

2.建立專案

3.job建立

job

# mapreduce_wordcount.job

type = command

command=sh /home/hadoop/pyshell/wordcount.sh

 

4.打包上傳,執行

5.檢視執行結果

azkaban上列印的日誌顯示已經成功 

四、Hive 指令碼任務

1.建立專案

hive_export_to_mysql

2.job建立

我們要完成,hive中建立表,載入資料,然後匯出資料到mysql,分為兩個job 

hive_task1:將hive中的資料匯出到mysql中 

hive_task2:hive中建立表,載入資料 

依賴關係:hive_task1 依賴 hive_task2

3.flow建立

job 檔案內容如下

# hive_task1.job

type = command

command = sh /home/hadoop/pyshell/hive_task1.sh

dependencies = hive_task2

 

# hive_task2.job

type = command

command = sh /home/hadoop/pyshell/hive_task2.sh

 

指令碼內容如下

 

[[email protected] pyshell]$ cat hive_task1.sh

#!/bin/bash

/home/hadoop/app/sqoop1/bin/sqoop export \

--connect jdbc:mysql://bigdata245:3306/sqoop \

--username root --password [email protected] \

--table EMP \

--export-dir /user/hive/warehouse/test.db/emp \

--input-fields-terminated-by ',' \

--input-null-string 'null' --input-null-non-string 'null' \

-m 1

 

[[email protected] pyshell]$ cat hive_task2

#!/bin/bash

hive -f /home/hadoop/pyshell/test.sql

 

sql檔案 test.sql內容如下

[hadoo[email protected] pyshell]$ cat test.sql

create database if not exists test;

use test;

drop table if exists emp;

create table emp(

empno int,

ename string,

job string

)

row format delimited fields terminated by ',';

load data local inpath '/home/hadoop/pyshell/emp.txt' overwrite into table emp;

 

emp.txt檔案內容如下

[[email protected] pyshell]$ cat emp.txt

1001,Tom,Java

1002,Jack,PHP

1003,Harvey,BigData

1004,David,IOS

1005,Kett,DBA

4.打包上傳

5.執行,檢視執行結果

執行前記得先在mysql中建立表emp,sql語句如下

DROP TABLE IF EXISTS `EMP`;

CREATE TABLE `EMP` (

`empno` int(11) DEFAULT NULL,

`ename` varchar(255) DEFAULT NULL,

`job` varchar(255) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS=1;