Spark Stream整合flum和kafka,資料儲存在HBASE上,分析後存入資料庫
開發環境:Hadoop+HBASE+Phoenix+flum+kafka+spark+MySQL
預設配置好了Hadoop的開發環境,並且已經安裝好HBASE等元件。
下面通過一個簡單的案例進行整合:
這是整個工作的流程圖:
第一步:獲取資料來源
由於外部埋點獲取資源較為繁瑣,因此,自己寫了個自動生成類似資料程式碼:
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class Genlog { static String[] srcurls={"http://www.baidu.com","http://www.sougou.com","http://www.360.com","http://www.taobao.com"}; static String[] oss={"android","ios","mac","win","linux"}; static String[] sexs={"f","m"}; public static void main(String[] args) throws InterruptedException { //http://xxxxx?refurl=http://www.baidu.com&pid=xx&os=andriod&sex=f/m&wx=abcLogger logger=LogManager.getLogger(Genlog.class); while(true){ String srcurl=srcurls[(int) (Math.random()*srcurls.length)]; String os=oss[(int) (Math.random()*oss.length)]; String sex=sexs[(int) (Math.random()*sexs.length)]; String url=String.format("http://xxxxx?refurl=%s&pid=xx&os=%s&wx=abc&sex=%s/m",srcurl,os,sex); logger.info(url); Thread.sleep(300); } } }
這部分程式碼表示,在啟動程式後,將會不斷生成類似文中註釋型別的資料,這樣flume的source端就可以源源不斷的獲取到資料。
pom.xml檔案就是關於log4j的依賴api core 和flum-ng即可,不再贅述。
同時,在專案中,要編寫連線虛擬機器的配置檔案,放在resource下,配置檔案如下:
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="WARN"> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> <Flume name ="hi" compress="false" type="avro"> <agent host ="192.168.110.101" port="44444"></agent> </Flume> </Appenders> <Loggers> <Root level="INFO"> <AppenderRef ref="Console"/> <AppenderRef ref="hi"></AppenderRef> </Root> </Loggers> </Configuration>
這樣,我們的配置資料來源的專案就已經完成了,當然,在實際生產中,肯定要比這複雜的多。
第二步:配置flume
配置flume/config/a1.conf,檔案可以直接touch建立,配置如下:
# 定義資源 管道 目的地 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 設定源的屬性 a1.sources.r1.type =avro a1.sources.r1.bind=192.168.110.101 a1.sources.r1.port=44444 # 設定目的地屬性 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.producer.acks = 0 a1.sinks.k1.kafka.topic = mylog a1.sinks.k1.kafka.bootstrap.servers = 192.168.110.101:9092 # 管道屬性 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把源通過管道連線到目的地 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
注意更換自己的IP地址,同時,根據需求更改acks的結果,如1、-1、0,具體介紹看官網即可。此時flume是依賴kafka的。所以啟動順序請先啟動kafka,否則會報錯。
第三步:編寫spark stream專案
專案目標主要是將kafka中的資料拉取下來消費,通過內部邏輯,將資料轉變為DataFrame格式,通過Phoenix儲存在HBASE上,以方便對資料進行分析。
專案配置檔案pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.yzhl</groupId> <artifactId>spark-streaming-phoneix-kafkademo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.1</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> </plugin> </plugins> </build> </project>
邏輯程式碼如下:
import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} object LogSave extends App { //定義brokers, groupId, topics /** * 關於driver和worker的執行位置的程式碼 */ val Array(brokers, groupId, topics) = Array("192.168.86.128:9092","mylog","mylog")//driver //spark上下文物件相當於connection val spark = SparkSession.builder().appName("mylog").getOrCreate()//driver //建立spark streaming 上下文 val ssc = new StreamingContext(spark.sparkContext, Seconds(5))//driver val topicsSet = topics.split(",").toSet//driver //定義kafka配置屬性 val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, ConsumerConfig.GROUP_ID_CONFIG -> groupId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])//driver //使用KafkaUtils工具來的createDirectStream靜態方法建立DStream物件 val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))//driver //messages中的每一條資料都是一個(key,value) 其中value指的是log中的一行資料 val lines = messages.map(_.value)//worker import spark.implicits._//driver worker //在driver端編譯成了class,之後上傳到worker中 case class MyRecord(id:String,time:String,srcUrl:String,os:String,sex:String) //為記錄產生ID lines.print(5)//driver //foreachRDD在driver上執行, lines.foreachRDD((rdd,t) =>{ val props = scala.collection.mutable.Map[String,String]()//driver props += "table" -> "tb_mylog" props += "zkUrl" -> "jdbc:phoenix:hadoop" //從下面到toDF.都會放在worker上執行 rdd.zipWithUniqueId().map( x =>{ val p =""".+(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}).+refurl=(.*)&.+&os=(.+)&.+&sex=(.+)""".r x._1 match { case p(time,srcUrl,os,sex) => MyRecord(t.toString()+x._2,time,srcUrl,os,sex) case _ => MyRecord(null,null,null,null,null) } }).filter(_.id !=null).toDF().write.format("org.apache.phoenix.spark") .mode("overwrite") .options(props).save();//todf--save之間都是在worker上執行,save()是在driver上 }) ssc.start()//driver ssc.awaitTermination()//driver /** * spark的所有上下文的建立都在driver上執行 * spark的所有action都在driver上執行 * spark的所有transformation都在worker上執行 * */ }
這部分程式碼可以將拉取的資料進行格式化 的儲存。其中正則表示式是對資料行的拆分,並通過Phoenix儲存到HBASE上。
第四步:專案打包
我用的idea,打包很簡單,maven-->plugins-->scala:compile(編譯)-->Lifecycle的package 即可打包完成,可在target目錄下檢視。
eclipse的打包也很簡單,網上一大堆。
到此,在程式碼階段的操作基本完成,接下來就是在叢集上的執行過程。
第五步:啟動各個程序
本次的部署是在yarn上的,所以肯定有yarn的啟動。我們按照順序啟動。
1,啟動HDFS:start-dfs.sh
2.啟動yarn:start-yarn.sh
3.啟動zookeeper:如果是自己安裝的zookeeper,可以直接用./zkServer.sh start
如果是用kafka自帶的zookeeper,啟動命令:bin/zookeeper-server-start.sh config/zookeeper.properties
4.啟動kafka:bin/kafka-server-start.sh config/server.properties
5.啟動flume:bin/flume-ng agent -n a1 -c conf -f conf/a1.conf 此時可以啟動資料來源的生成專案執行
6.啟動kafka的消費者consumer:bin/kafka-console-consumer.sh --bootstrap-server 192.168.110.101:9092 --topic mylog
7.啟動HBASE:start-hbase.sh
8.啟動Phoenix: ./sqlline.py localhost
第六步:以上程序都啟動成功後,可以將打包好的jar包上傳到系統路徑
此時有一個問題一定要注意,不然肯定會報錯,列如空指標的異常,但無法查詢錯誤具體資訊,根本原因是缺少對於的依賴包。
在下載依賴包的時候,我們還需要將兩個必須的依賴包匯入到spark的jars檔案中,因為我們打包的瘦包,無法包含所有的依賴包。
這兩包是:spark-streaming-kafka-0-10_2.11和他的依賴包kafka_2.11。根據你自己的版本不同,找到對應的版本依賴包,否則會報出版本依賴的異常資訊。
新增方法:cd到spark的jars目錄先,在maven官網,右鍵點選相應的依賴包的jar,複製路徑,運用命令 ”wget 複製的路徑”,也可以自己下載到本地後上傳。
接著,在啟動的Phoenix中,建立我們自己的表,在編碼中的表名為tb_mylog,所以建立表:
!create table tb_mylog(id varchar(255) primary key,time varchar(255),srcUrl varchar(255),os varchar(255),sex varchar(20));
此時!tables裡面就會存在了tb_mylog個表。
第七步:執行上傳的jar包,處理資料
執行命令:spark-submit --master yarn --deploy-mode client --class 包名 jar包
執行後,可以看到資料在不斷的寫入,spark Stream在不斷的獲取,此時,進入Phoenix中,
select * from tb_mylog,可以看到資料在表中存在,並不斷的增長,如果機器效能不是很好,建議執行一段時間後,可以停掉源資料的生成。
對於關閉HBASE,需要注意,不可直接stop掉HBASE,這樣資料就會丟失或者出發預寫機制,無法將資料完全的儲存到HDFS上,所以停掉HBASE的最好方式是:先執行hbase-daemon.sh stop master,然後在執行stop-hbase.sh. 這樣既可。
由於是基於yarn模式,所以要讀取到yarn-site.xml檔案,所以在spark-env.sh中配置HADOOP_CONF-DIR=Hadoop路徑,或者YARN_CONF_DIR=yarn路徑。
注意:
如果用Phoenix連線spark,那麼需要Phoenix裡的Phoenix-spark-hbase.jar和Phoenix-HBASE-client.jar。
且,worker節點通過Phoenix連線HBASE時,自己有了客戶端,那麼HBASE的regionserver端需要Phoenix-HBASE-server.jar和Phoenix-spark-hbase.jar兩個包。
flume通訊資料來源:通過通訊協議avro. 給到flume的source處,通過配置channel後,得到下沉的位置,即得到kafka的producer,然後通過worker節點進行消費,消費形式是kafkaDStream。
接下來是資料的分析,然後儲存到MySQL中。
第八步:儲存到資料庫中的編碼
新建專案:
import org.apache.spark.sql.{SaveMode, SparkSession} object ETLSparkSql extends App { val spark = SparkSession.builder().appName("from-hbase-etl-to-mysql using spark+phoenix").getOrCreate()//driver val props = scala.collection.mutable.Map[String,String]() //driver props += "table" -> "tb_mylog" props += "zkUrl" -> "hadoop:2181" val df = spark.read.format("org.apache.phoenix.spark").options(props).load(); df.createOrReplaceTempView("tb_mylog") val df2 = spark.sql("select srcUrl,count(1) as count_nums from tb_mylog group by srcUrl"); df2.createOrReplaceTempView("tb_url_count") val sql = """ |select | case when srcUrl = 'http://www.baidu.com' then count_nums | else 0 end as baidu, | case when srcUrl = 'http://www.souguo.com' then count_nums | else 0 end as souguo, | case when srcUrl = 'http://www.360.com' then count_nums | else 0 end as `360`, | case when srcUrl = 'http://www.taobao.com' then count_nums | else 0 end as `taobao`, | case when srcUrl not in ('http://www.baidu.com','http://www.souguo.com','http://www.taobao.com','http://www.360.com') then count_nums | else 0 end as `qita` | from tb_url_count """.stripMargin val df3 = spark.sql(sql) df3.createOrReplaceTempView("tb_case") val jdbcops = scala.collection.mutable.Map[String,String]() //driver props += "table" -> "tb_log_count" props += "url" -> "jdbc:mysql://192.168.86.1:3306/logdb" props += "user" -> "root" props += "password" -> "root" props += "driver" -> "com.mysql.jdbc.Driver" spark.sql("select sum(baidu),sum(souguo),sum(`360`),sum(taobao),sum(qita) from tb_case").write.format("jdbc").mode(SaveMode.Append).options(jdbcops).save() println("任務提交,等待結果") }
第九步:建立資料庫和表
建立logdb的資料庫,建立表tb_log_count,列名分別為id,baidu,souguo,360,taobao,qita。
然後對專案進行編譯和打包,上傳到客戶端driver上,
啟動HDFS,啟動yarn,啟動HBASE,同時可以執行編譯執行語句:
spark-submit --master yarn --deploy-mode client ETLSparkSql 包名
到此為止,我們的資料的獲取,資料的處理,資料的儲存,資料的存庫都已經完成,可以在MySQL資料庫中檢視結果了。
新手上路,有不對的地方還請指正。