
Kafka + Spark Streaming: Parsing JSON Data and Inserting into HBase, with Some Business Logic

The following code is an example I wrote while learning Spark. It is still rough, and is kept here mainly as a record and for reference.

The code is below; I have added fairly detailed comments at the places I thought were worth explaining.
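Judging from the APIs used (KafkaUtils.createStream against ZooKeeper, FlatMapFunction returning an Iterable, and the kafka.javaapi.producer classes), the example appears to target the Spark 1.x streaming API with the old Kafka 0.8 receiver, fastjson for JSON parsing, and the HBase 1.x client (Connection/Table); newer versions of these libraries expose different APIs.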

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Put;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.alibaba.fastjson.JSONObject;

public class KafkaStream_Json {

	static final String ZK_QUORUM = "devhadoop3:2181,devhadoop2:2181,devhadoop1:2181";
	static final String GROUP = "spark_json_test_group";
	static final String TOPICSS = "spark_json_test2";
	static final String NUM_THREAD = "5";

	@SuppressWarnings({ "serial" })
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("json_test").setMaster("local[2]");
		conf.set("spark.testing.memory", "2147480000");// 後面的值大於512m即可
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(20));

		int numThreads = Integer.parseInt(NUM_THREAD);
		Map<String, Integer> topicMap = new HashMap<String, Integer>();
		String[] topics = TOPICSS.split(",");
		for (String topic : topics) {
			topicMap.put(topic, numThreads);
		}
		JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, ZK_QUORUM, GROUP, topicMap);// the raw (key, message) stream from Kafka
		JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {// the value returned here is the JSON string
			public String call(Tuple2<String, String> tuple2) {
				return tuple2._2();
			}
		});
		JavaDStream<JSONObject> words_2 = lines.flatMap(new FlatMapFunction<String, JSONObject>() {// parse each record into a JSONObject
			@Override
			public Iterable<JSONObject> call(String jsonStr) throws Exception {
				List<JSONObject> arr = new ArrayList<JSONObject>();
				JSONObject obj = JSONObject.parseObject(jsonStr);
				System.out.println("received data: " + jsonStr);
				arr.add(obj);
				return arr;
			}
		});
		JavaDStream<JSONObject> words = words_2.persist();// cache the parsed stream; choose a storage level to fit your workload. cache() only supports the MEMORY_ONLY level
		// Without this caching, outputting type1 and type2 would each re-run the lines and words_2 transformations, which would be very inefficient
		// Split the stream by business type: first, the messages for business type 1
		JavaDStream<JSONObject> type1 = words.filter(new Function<JSONObject, Boolean>() {
			@Override
			public Boolean call(JSONObject v1) throws Exception {
				return "1".equals(v1.getString("type"));
			}
		});
		// messages for business type 2
		JavaDStream<JSONObject> type2 = words.filter(new Function<JSONObject, Boolean>() {
			@Override
			public Boolean call(JSONObject v1) throws Exception {
				return "2".equals(v1.getString("type"));
			}
		});

		JavaDStream<JSONObject> type1_2 = type1.map(new Function<JSONObject, JSONObject>() {

			@Override
			public JSONObject call(JSONObject v1) throws Exception {
				/*
				 * Apply the business logic to v1; the final result lives in type1_2, much like String's substring method:
				 * 
				 * you must catch the result in a new DStream rather than change the value of v1 inside type1.
				 * 
				 * Even though what we appear to modify here is v1, the v1 inside type1 does not actually change.
				 */
				v1.put("context", "test data");
				return v1;
			}
		});
		type1.print();
		type1_2.print();
		type2.print();

		/*
		 * The loop below is one way to get at the actual data; foreachRDD is also effectively an output operation
		 */
		type1_2.foreachRDD(new VoidFunction<JavaRDD<JSONObject>>() {
			@Override
			public void call(JavaRDD<JSONObject> rdd) throws Exception {
				System.out.println("123333333333333333333333333333");
				List<Put> puts = new ArrayList<Put>();
				System.out.println("外部" + puts.hashCode());
				List<JSONObject> dataList = rdd.collect();
				for (JSONObject t : dataList) {
					System.out.println(t.getString("name"));
					Put put = new Put(t.getString("name").getBytes());
					put.addColumn("data".getBytes(), "name".getBytes(), t.getString("name").getBytes());
					put.addColumn("data".getBytes(), "age".getBytes(), t.getString("age").getBytes());
					put.addColumn("data".getBytes(), "type".getBytes(), t.getString("type").getBytes());
					put.addColumn("data".getBytes(), "context".getBytes(), t.getString("context").getBytes());
					puts.add(put);
//					System.out.println("inner " + puts.hashCode());// the hashCode of puts is different on every iteration here, but all the Puts really do end up in the same List
				}
				if (puts.size() > 0) {
					System.out.println("陣列大小"+puts.size());
					HbaseInsert.getInstance().insertHbase("lwb_test", puts);
				}
			}
		});
		jssc.start();
		jssc.awaitTermination();
	}
}
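A few notes on the driver code above. Each Kafka message is expected to be a flat JSON object such as {"name":"name0","age":0,"type":"1"} (see the test producer at the end of this post), and the name field becomes the HBase row key. rdd.collect() pulls every record of the batch back to the driver before the Puts are built, which is fine for a small test but does not scale; iterating with foreachPartition on the executors is the usual alternative. The job also assumes that the target table lwb_test with a data column family already exists; a sketch of creating it follows the HbaseInsert class below.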

This is a quick helper class for batch inserts into HBase.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseInsert {
	private static HbaseInsert hbaseInsert;
	private static Configuration configuration;
	private static String zkHost = "devhadoop3,devhadoop2,devhadoop1";
	private static String zkPort = "2181";
	private static String zkParent = "/hbase-unsecure";
	private static Connection connection;

	// private constructor: build the HBase configuration and open a single shared connection
	private HbaseInsert() {
		configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", zkHost);
		configuration.set("hbase.zookeeper.property.clientPort", zkPort);
		configuration.set("zookeeper.znode.parent", zkParent);
		try {
			connection = ConnectionFactory.createConnection(configuration);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	// lazily create the singleton so that only one HBase connection is opened per JVM
	public static synchronized HbaseInsert getInstance() {
		if (hbaseInsert == null) {
			hbaseInsert = new HbaseInsert();
		}
		return hbaseInsert;
	}

	// write a batch of Puts to the given table, closing the Table handle afterwards
	public void insertHbase(String tablename, List<Put> puts) {
		Table table = null;
		try {
			table = connection.getTable(TableName.valueOf(tablename));
			table.put(puts);
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (table != null) {
				try {
					table.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}
}
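
The streaming job writes into a table named lwb_test with a single data column family, but nothing in the code creates it. Below is a minimal sketch of creating that table with the HBase 1.x Admin API; the class name CreateTestTable is my own, and the connection settings are simply copied from HbaseInsert above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTestTable {

	public static void main(String[] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "devhadoop3,devhadoop2,devhadoop1");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		conf.set("zookeeper.znode.parent", "/hbase-unsecure");

		try (Connection connection = ConnectionFactory.createConnection(conf);
				Admin admin = connection.getAdmin()) {
			TableName tableName = TableName.valueOf("lwb_test");
			if (!admin.tableExists(tableName)) {
				// one column family named "data", matching the Put calls in the streaming job
				HTableDescriptor desc = new HTableDescriptor(tableName);
				desc.addFamily(new HColumnDescriptor("data"));
				admin.createTable(desc);
			}
		}
	}
}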


Below is the code I used to test sending data into Kafka.

import java.util.Properties;

import com.alibaba.fastjson.JSONObject;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

	public static void main(String[] aaa) {
		Properties props = new Properties();
		// metadata.broker.list must point at the Kafka brokers themselves (default port 9092), not at ZooKeeper
		props.put("metadata.broker.list", "192.168.1.100:9092,192.168.1.101:9092,192.168.1.102:9092");// in my environment the brokers had to be addressed by their domain names
		props.put("request.required.acks", "-1");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
		for (int i = 0; i < 10; i++) {
			JSONObject obj = new JSONObject();
			obj.put("name", "name"+i);
			obj.put("age", i);
			obj.put("type", String.valueOf(i%4));
			producer.send(new KeyedMessage<String, String>("spark_json_test2", obj.toJSONString()));//
		}
		producer.close();
	}
}
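
Note that the test producer sets type to i % 4, so of the ten messages only the ones whose type is "1" pass the first filter in the streaming job and get written to HBase; the type "2" messages are only printed by type2.print(), and the rest are dropped.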