
Spark Streaming + Redis: Real-Time Daily Registration Statistics

The workflow for counting newly registered users in real time with Spark Streaming is as follows (workflow diagram omitted). The code is shown below.

1. Add the Maven dependencies

<!-- Hive dependencies -->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.17</version>
</dependency>
<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-exec</artifactId>
	<version>2.1.0</version>
</dependency>

<!-- Spark SQL dependencies -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-hive_2.11</artifactId>
	<version>2.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.11</artifactId>
	<version>2.1.0</version>
</dependency>

<!-- Spark Streaming dependencies -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.11</artifactId>
	<version>2.1.0</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>2.1.0</version>
</dependency>

<!-- Redis client dependency -->
<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
	<version>2.9.0</version>
</dependency>

2. Start the Spark Streaming computation

SparkConf conf = new SparkConf() ;
conf.setAppName("kafka") ;
conf.setMaster("local[3]") ;

// Create the SparkSession first (with Hive support)
final SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() ;

// Create the Java streaming context with a 2-second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(
new JavaSparkContext(spark.sparkContext()) , Durations.seconds(2)) ;

// Kafka consumer parameters
Map<String,Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers" , "localhost:9092") ;
kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
kafkaParams.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
kafkaParams.put("auto.offset.reset" , "latest") ;
kafkaParams.put("group.id" , "raw_logs") ;
kafkaParams.put("enable.auto.commit" ,"true") ;


// Location strategy: controls on which hosts the consumers run
// Consumer strategy: controls which topic, partition, and offset are consumed
LocationStrategy ls = LocationStrategies.PreferConsistent() ;
List<TopicPartition> tps = new ArrayList<TopicPartition>( ) ;
tps.add(new TopicPartition("raw_log_handleTopic" , 0)) ;
ConsumerStrategy cs = ConsumerStrategies.Assign(tps , kafkaParams) ;

// The Kafka message stream
JavaDStream<ConsumerRecord<String,String>> ds1 = KafkaUtils.createDirectStream(ssc , ls ,cs) ;

// 3. Filter the raw logs and extract startup records (step 3 below)
// 4. Query the aggregated data and store it in Redis (step 4 below)

ssc.start();
ssc.awaitTermination();
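Note: the Spark SQL query in step 3 calls a UDF named forkstartuplogs, whose implementation and registration are not shown in this post. Assuming it is packaged as a Hive UDF/UDTF available on the driver classpath, a minimal registration sketch (the class name below is purely hypothetical) would run on the SparkSession before the streaming job starts:

// Hypothetical registration of the forkstartuplogs UDF used in step 3;
// replace 'com.example.udf.ForkStartupLogs' with the real implementation class.
spark.sql("CREATE TEMPORARY FUNCTION forkstartuplogs AS 'com.example.udf.ForkStartupLogs'");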

3. Filter the raw logs and extract startup records

// Parse each '#'-delimited log string into a Row
JavaDStream<Row> ds2 = ds1.map(new Function<ConsumerRecord<String,String>, Row>() {
	public Row call(ConsumerRecord<String, String> v1) throws Exception {
		String topic = v1.topic() ;
		int par = v1.partition() ;
		long offset = v1.offset() ;
		String value = v1.value();

              String mesg="topic= "+topic + ", partition= "+par + ", offset= "+offset + ", value= "+value;
              System.out.println("mesg===> " +mesg);

              String[] arr = value.split("#");
              return RowFactory.create(
				Float.parseFloat(arr[0]),
				arr[1],
				arr[2],
				Long.parseLong(arr[3]),
				Integer.parseInt(arr[4]),
				arr[5]) ;
	}
}) ;
ds2.print();
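// Illustration only (hypothetical sample, not taken from the post): each Kafka value is assumed
// to carry six '#'-delimited fields matching the schema built below, in the order
//   servertimems # servertimestr # clientip # clienttimems # status # log
// e.g. "1538700000000.0#2018-10-05 08:00:00#192.168.1.10#1538699999000#200#<raw log body>"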

ds2.foreachRDD(new VoidFunction<JavaRDD<Row>>() {
	public void call(JavaRDD<Row> rdd) throws Exception {
		SparkSession spark = SparkSession.builder()
				.config(rdd.context().getConf())
				.enableHiveSupport()
				.getOrCreate();

		StructField[] fields = new StructField[6];
		fields[0] = new StructField("servertimems", DataTypes.FloatType, false, Metadata.empty());
		fields[1] = new StructField("servertimestr", DataTypes.StringType, false, Metadata.empty());
		fields[2] = new StructField("clientip", DataTypes.StringType, false, Metadata.empty());
		fields[3] = new StructField("clienttimems", DataTypes.LongType, false, Metadata.empty());
		fields[4] = new StructField("status", DataTypes.IntegerType, false, Metadata.empty());
		fields[5] = new StructField("log", DataTypes.StringType, false, Metadata.empty());
		StructType type = new StructType(fields);

		// Filter out invalid records and extract the startup data with the forkstartuplogs UDF
		Dataset<Row> df1 = spark.createDataFrame(rdd, type);
		df1.createOrReplaceTempView("_temp");
		Dataset<Row> df2 = spark.sql("select forkstartuplogs(servertimestr, clienttimems, clientip, log) from _temp");
		df2.createOrReplaceTempView("_temp2");

4. Query the aggregated data and store it in Redis

		// Aggregate the min/max startup timestamps per device key within this batch via SQL
		String aggSql = "select concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid) key," +
				" min(createdatms) mn," +
				" max(createdatms) mx from _temp2 group by " +
				"concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid)";

		spark.sql(aggSql).foreachPartition(new ForeachPartitionFunction<Row>() {
			public void call(Iterator<Row> t) throws Exception {
				// Create a Redis client per partition
				Jedis redis = new Jedis("s101", 6379);
				redis.select(1);

				while (t.hasNext()) {
					Row row = t.next();
					String key = row.getAs("key");
					long mn = row.getAs("mn");
					long mx = row.getAs("mx");

					// Merge this batch's min/max with the values already stored in Redis
					String oldvalue = redis.get(key);
					if (oldvalue == null) {
						redis.set(key, mn + "," + mx);
					} else {
						String[] arr = oldvalue.split(",");
						long oldMin = Long.parseLong(arr[0]);
						long oldMax = Long.parseLong(arr[1]);
						redis.set(key, Math.min(mn, oldMin) + "," + Math.max(mx, oldMax));
					}
				}
				redis.close();
			}
		});
	}
});
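With the per-device "min,max" timestamps in Redis, the daily registration count can be derived by counting the devices whose earliest createdatms falls on the target day. The post does not include this step, so the following is only a sketch that reuses the key/value layout and Redis location from the code above; the class and method names are illustrative:

// Sketch only: count devices first seen on a given day from the "min,max" values written above.
// Uses KEYS for simplicity; for a large database, SCAN would be preferable.
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Set;
import redis.clients.jedis.Jedis;

public class DailyNewDeviceCount {
	public static long countNewDevices(LocalDate day) {
		long start = day.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
		long end = day.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();

		Jedis redis = new Jedis("s101", 6379);
		redis.select(1);
		long count = 0;
		Set<String> keys = redis.keys("*");
		for (String key : keys) {
			String value = redis.get(key);
			if (value == null) {
				continue;
			}
			// The first field is the earliest startup timestamp recorded for this device
			long firstSeen = Long.parseLong(value.split(",")[0]);
			if (firstSeen >= start && firstSeen < end) {
				count++; // earliest startup falls on the target day => counted as a new registration
			}
		}
		redis.close();
		return count;
	}
}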