
Storm + Kafka Integration: Saving Data to a MySQL Database


In a Kafka + Storm integration, messages reach the Kafka message broker through various channels. For example, log data collected by Flume is first routed and buffered by Kafka, and then analyzed in real time by a Storm program. In this setup, a Storm Spout reads the messages from Kafka and hands them to specific Bolt components for processing. As of apache-storm-0.9.3, Storm ships with an external plugin, storm-kafka, that integrates Kafka out of the box; the Maven dependency configuration I use is shown below.

1. Configure the Maven dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Kafka/Storm integration -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.3</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.3</version>
</dependency>
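
With the dependencies in place, the topology needs messages to consume. The original post does not show a producer, so here is a hypothetical test producer using the classic Kafka 0.8 producer API from the kafka_2.10 dependency above; the broker address 192.168.2.20:9092, the class name, and the sample log line are my assumptions, not part of the original post:

package com.yun.storm;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Hypothetical helper (not in the original post): pushes one test message
// into the "mytopic" topic so the topology below has data to consume.
// Assumes a Kafka 0.8.x broker listening on 192.168.2.20:9092.
public class TestLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.2.20:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // Sample line in the format the bolt's regex expects (fabricated)
        producer.send(new KeyedMessage<String, String>("mytopic",
                "省公司鑒權接口url[http://auth.example.com/api],響應時間[120],當前時間[1444218216106]"));
        producer.close();
    }
}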
Note: the Storm program will receive and process the data, but you may see the same tuples processed repeatedly. That happens when the bolt never acknowledges its input; the bolt must call ack on success or fail on error, as the LogFilterBolt below does. With that in place, the duplicate processing stops.

2. Write the Storm program
package com.yun.storm;

import java.util.UUID;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * Reads data from the Kafka message broker with Storm.
 *
 * @author shenfl
 */
public class KafkaLogProcess {

    private static final String BOLT_ID = LogFilterBolt.class.getName();
    private static final String SPOUT_ID = KafkaSpout.class.getName();

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Address of the ZooKeeper ensemble used by Kafka
        String brokerZkStr = "192.168.2.20:2181";
        ZkHosts zkHosts = new ZkHosts(brokerZkStr);
        // Name of the Kafka topic that holds the data
        String topic = "mytopic";
        // Root path in ZooKeeper where the KafkaSpout stores its read offsets
        String zkRoot = "/kafkaspout";
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutconf = new SpoutConfig(zkHosts, topic, zkRoot, id);
        builder.setSpout(SPOUT_ID, new KafkaSpout(spoutconf));
        builder.setBolt(BOLT_ID, new LogFilterBolt()).shuffleGrouping(SPOUT_ID);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(KafkaLogProcess.class.getSimpleName(),
                new Config(), builder.createTopology());
    }
}
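
One optional tweak: by default the KafkaSpout emits each message as raw bytes under the tuple field "bytes", which is why the bolt below calls getBinaryByField("bytes"). storm-kafka also ships a StringScheme that decodes messages to UTF-8 strings at the spout. A sketch of the extra lines you would add in main:

import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.StringScheme;

// Optional: make the KafkaSpout emit UTF-8 strings under the field "str"
// instead of raw bytes under "bytes". The bolt would then read
// input.getStringByField("str") rather than input.getBinaryByField("bytes").
spoutconf.scheme = new SchemeAsMultiScheme(new StringScheme());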
package com.yun.storm;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/**
 * Processes tuples from the KafkaSpout and saves them to the database.
 *
 * @author shenfl
 */
public class LogFilterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    Pattern p = Pattern.compile("省公司鑒權接口url\\[(.*)]\\,響應時間\\[([0-9]+)\\],當前時間\\[([0-9]+)\\]");

    /**
     * Called only once per LogFilterBolt instance, at initialization.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Receive the raw bytes emitted by the KafkaSpout
            byte[] bytes = input.getBinaryByField("bytes");
            String value = new String(bytes).replaceAll("[\n\r]", "");
            // Parse the line and extract the fields to persist
            Matcher m = p.matcher(value);
            if (m.find()) {
                String url = m.group(1);
                String usetime = m.group(2);
                String currentTime = m.group(3);
                System.out.println(url + "->" + usetime + "->" + currentTime);
            }
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
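
To make the regular expression concrete, here is a quick standalone check. The sample line is fabricated for illustration; real lines come from the authentication log feed:

package com.yun.storm;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone check of the log pattern used by LogFilterBolt. group(1) is
// the URL, group(2) the response time, group(3) the epoch-millisecond
// timestamp. Prints: http://auth.example.com/api->120->1444218216106
public class LogPatternTest {
    public static void main(String[] args) {
        Pattern p = Pattern.compile("省公司鑒權接口url\\[(.*)]\\,響應時間\\[([0-9]+)\\],當前時間\\[([0-9]+)\\]");
        Matcher m = p.matcher("省公司鑒權接口url[http://auth.example.com/api],響應時間[120],當前時間[1444218216106]");
        if (m.find()) {
            System.out.println(m.group(1) + "->" + m.group(2) + "->" + m.group(3));
        }
    }
}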

3. Parse the logs and persist them to the database

3.1 Add the Maven dependencies
<!-- MySQL-related dependencies -->
<dependency>
    <groupId>commons-dbutils</groupId>
    <artifactId>commons-dbutils</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.29</version>
</dependency>
3.2 Write the MyDbUtils utility class

(1) Create the data table
create database jfyun;

CREATE TABLE `log_info` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `topdomain` varchar(100) COLLATE latin1_german1_ci DEFAULT NULL,
  `usetime` varchar(10) COLLATE latin1_german1_ci DEFAULT NULL,
  `time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1803 DEFAULT CHARSET=latin1 COLLATE=latin1_german1_ci;

Note that the table uses the latin1 character set, while the JDBC URL below requests characterEncoding=utf-8; if topdomain may ever hold non-ASCII values, create the table with DEFAULT CHARSET=utf8 instead to avoid garbled data.
(2) The MyDbUtils program
package com.yun.storm.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ArrayListHandler;

public class MyDbUtils {

    private static String className = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.2.20:3306/jfyun?useUnicode=true&characterEncoding=utf-8";
    private static String user = "root";
    private static String password = "123";
    private static QueryRunner queryRunner = new QueryRunner();

    public static final String INSERT_LOG = "insert into log_info(topdomain,usetime,time) values(?,?,?)";

    static {
        try {
            Class.forName(className);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        String topdomain = "taobao.com";
        String usetime = "100";
        String currentTime = "1444218216106";
        // The epoch-millisecond timestamp must be formatted as a DATETIME
        // string before insertion into the `time` column
        update(INSERT_LOG, topdomain, usetime,
                MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
    }

    /**
     * Executes an INSERT/UPDATE statement, closing the connection even if
     * the statement fails.
     */
    public static void update(String sql, Object... params) throws SQLException {
        Connection connection = getConnection();
        try {
            queryRunner.update(connection, sql, params);
        } finally {
            connection.close();
        }
    }

    /**
     * Runs a SELECT statement and returns the first column of each row.
     */
    public static List<String> executeQuerySql(String sql) {
        List<String> result = new ArrayList<String>();
        try {
            List<Object[]> rows = queryRunner.query(getConnection(), sql,
                    new ArrayListHandler());
            for (Object[] row : rows) {
                result.add(row[0].toString());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Opens a new MySQL connection.
     */
    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(url, user, password);
    }
}
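
Both MyDbUtils.main and the bolt call MyDateUtils.formatDate2, but the original post never shows that class. A minimal sketch, assuming formatDate2 simply renders a Date as a MySQL DATETIME-compatible string (the format pattern is my assumption):

package com.yun.storm.util;

import java.text.SimpleDateFormat;
import java.util.Date;

// Minimal sketch of the MyDateUtils helper referenced above; its source is
// not included in the original post, so the pattern below is an assumption
// chosen to match MySQL's DATETIME column type.
public class MyDateUtils {

    public static String formatDate2(Date date) {
        // SimpleDateFormat is not thread-safe, so create one per call;
        // multiple bolt executors may format dates concurrently.
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
    }
}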
(3) Update the Storm program
if (m.find()) {
    String url = m.group(1);
    String usetime = m.group(2);
    String currentTime = m.group(3);
    System.out.println(url + "->" + usetime + "->" + currentTime);
    // Persist the parsed record; the epoch-millisecond timestamp is
    // formatted as a DATETIME string before insertion
    MyDbUtils.update(MyDbUtils.INSERT_LOG, url, usetime,
            MyDateUtils.formatDate2(new Date(Long.parseLong(currentTime))));
}

(Remember to import java.util.Date plus the two utility classes in LogFilterBolt.) Because MyDbUtils.update throws SQLException, a database failure propagates to the surrounding try/catch in execute, which calls fail(input) so the tuple is replayed rather than lost.
(4) Query the metrics
-- Average response time per URL, in seconds
-- (assumes usetime is recorded in milliseconds, hence the division by 1000)
SELECT
    topdomain,
    ROUND(AVG(usetime) / 1000, 2) avg_use_time
FROM
    log_info
GROUP BY topdomain;
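
If you want this report from Java rather than the mysql client, the executeQuerySql helper above can run it. Note that the helper returns only the first column of each row, so this sketch prints just the topdomain values the report covers:

package com.yun.storm.util;

import java.util.List;

// Sketch: run the metric query through MyDbUtils.executeQuerySql. Since
// that helper keeps only column 1 of each row, this prints the distinct
// topdomain values; extend the helper if you need avg_use_time as well.
public class MetricReport {
    public static void main(String[] args) {
        List<String> domains = MyDbUtils.executeQuerySql(
                "SELECT topdomain, ROUND(AVG(usetime) / 1000, 2) avg_use_time"
                + " FROM log_info GROUP BY topdomain");
        for (String d : domains) {
            System.out.println(d);
        }
    }
}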
Original post: http://blog.csdn.net/shenfuli/article/details/48982687
