1. 程式人生 > >計算某個人在某個區域停留時長

計算某個人在某個區域停留時長

在工作中遇到了這樣一個場景：統計一群人在某個區域、指定時間段內的停留時長。本來很想寫個 SQL 來處理，但仔細想想，這段邏輯用 SQL 並不好寫——每筆停留記錄都要先依照它與查詢時間窗的重疊情況進行裁切，再分組加總。於是參考了別人的方案，改用 MapReduce（MR）來解決。

說明：這個 MR 作業是在阿里雲 ODPS（MaxCompute）環境下執行的。

資料準備

這裡模擬了一些資料（start_time/end_time 為查詢時間窗，first_time/last_time 為該筆記錄的停留區間，dual_time 為該筆記錄的停留時長）：

# 資料結構
project=example_project
table=wc_in
columns=msisdn:string,geohash:string,start_time:bigint,end_time:bigint,first_time:bigint,last_time:bigint,dual_time:bigint
19999999999,zzaq45ww,1539300000,1539323011,1539320733,1539320833,100
19999999999,zzaq45ww,1539300000,1539323011,1539311706,1539311806,100
19999999999,zzaq45ww,1539300000,1539323011,1539309422,1539309522,100
19999999999,zzaq45ww,1539300000,1539323011,1539317713,1539317813,100
19999999999,zzaq45ww,1539300000,1539323011,1539311974,1539312074,100
19999999999,zzaq45ww,1539300000,1539323011,1539318027,1539318127,100
19999999999,zzaq45ww,1539300000,1539323011,1539320733,1539328517,896
19999999999,zzaq45ww,1539300000,1539323011,1539311706,1539312311,247
19999999999,zzaq45ww,1539300000,1539323011,1539309422,1539312981,721
19999999999,zzaq45ww,1539300000,1539323011,1539317713,1539324718,228
19999999999,zzaq45ww,1539300000,1539323011,1539311974,1539318866,677
19999999999,zzaq44ww,1539300000,1539323011,1539318027,1539326938,732
19999999999,zzaq44ww,1539300000,1539323011,1539314486,1539317942,291
19999999999,zzaq44ww,1539300000,1539323011,1539314359,1539320011,409
19999999999,zzaq44ww,1539300000,1539323011,1539314232,1539316337,707
19999999999,zzaq44ww,1539300000,1539323011,1539314106,1539319163,853
19999999999,zzaq44ww,1539300000,1539323011,1539313979,1539320948,267
19999999999,zzaq44ww,1539300000,1539323011,1539313852,1539316317,542
19999999999,zzaq44ww,1539300000,1539323011,1539313725,1539314073,983
19999999999,zzaq44ww,1539300000,1539323011,1539313599,1539318407,886

Map階段

package myaliyun.mr;

import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;

/**
 * Map phase of the stay-time job.
 *
 * <p>Copies the visit columns of every input row into the map output value and keys the
 * row by (msisdn, geohash), so that all rows of one number in one area arrive in the same
 * reduce call.
 */
public class CalStayTimeMapper extends MapperBase {
	/** Columns copied verbatim from the input row into the map output value, in order. */
	private static final String[] VALUE_COLUMNS = {
			"start_time", "end_time", "first_time", "last_time", "dual_time"};

	/** Reused map output key record (msisdn, geohash). */
	private Record outKey;
	/** Reused map output value record (the VALUE_COLUMNS). */
	private Record outValue;

	@Override
	public void setup(TaskContext context) throws IOException {
		outKey = context.createMapOutputKeyRecord();
		outValue = context.createMapOutputValueRecord();
		// Debug aid: print the id of the task instance that is running.
		System.out.println("TaskID:" + context.getTaskID().toString());
	}

	@Override
	public void map(long recordNum, Record record, TaskContext context) throws IOException {
		outKey.set("msisdn", record.getString("msisdn"));
		outKey.set("geohash", record.getString("geohash"));
		for (String column : VALUE_COLUMNS) {
			outValue.set(column, record.getBigint(column));
		}
		context.write(outKey, outValue);
	}

	@Override
	public void cleanup(TaskContext context) throws IOException {
		// Nothing to release.
	}
}

Reducer階段

package myaliyun.mr;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;

/**
 * Reduce phase of the stay-time job.
 *
 * <p>For one (msisdn, geohash) group, adds up how long the number stayed in the area
 * inside the query window [start_time, end_time] carried on each record. Each record
 * describes a visit [first_time, last_time] and contributes:
 * <ul>
 *   <li>visit entirely inside the window: dual_time;</li>
 *   <li>visit starts inside the window and ends after it: end_time - first_time;</li>
 *   <li>visit starts before the window and ends inside it: last_time - start_time;</li>
 *   <li>visit starts before and ends after the window: end_time - start_time;</li>
 *   <li>no overlap with the window: nothing.</li>
 * </ul>
 * Records with a null time column are skipped. An output row (msisdn, geohash, total)
 * is written only when the total is positive.
 *
 * <p>NOTE(review): contributions are summed record by record, so visits in the same
 * group that overlap each other in time are counted more than once. Merge the
 * intervals first if the union of the stay time is what is wanted.
 */
public class CalStayTimeReducer extends ReducerBase {
	/** Reused output record: index 0 = msisdn, 1 = geohash, 2 = total stay time. */
	private Record result;

	@Override
	public void setup(TaskContext context) throws IOException {
		result = context.createOutputRecord();
	}

	@Override
	public void reduce(Record key, Iterator<Record> values, TaskContext context)
			throws IOException {
		long stayTime = 0L;
		while (values.hasNext()) {
			stayTime += contribution(values.next());
		}
		if (stayTime > 0) {
			result.set(0, key.getString("msisdn"));
			result.set(1, key.getString("geohash"));
			result.set(2, stayTime);
			context.write(result);
		}
	}

	/** Returns the stay time one map output value contributes, or 0 if it is incomplete. */
	private static long contribution(Record val) {
		Long startTime = val.getBigint("start_time");
		Long endTime = val.getBigint("end_time");
		Long firstTime = val.getBigint("first_time");
		Long lastTime = val.getBigint("last_time");
		Long dualTime = val.getBigint("dual_time");
		// Unboxing a null would throw NullPointerException and fail the whole task;
		// a record without complete time information cannot be placed in the window.
		if (startTime == null || endTime == null || firstTime == null
				|| lastTime == null || dualTime == null) {
			return 0L;
		}
		return clippedStay(startTime, endTime, firstTime, lastTime, dualTime);
	}

	/** Returns the part of visit [first, last] that lies in window [windowStart, windowEnd]. */
	private static long clippedStay(long windowStart, long windowEnd,
			long first, long last, long dual) {
		if (first >= windowStart && last <= windowEnd) {
			// Fully inside: trust the precomputed duration.
			return dual;
		}
		if (first >= windowStart && first < windowEnd && last > windowEnd) {
			// Enters inside the window and leaves after it: count up to the window end.
			return windowEnd - first;
		}
		if (first < windowStart && last > windowStart && last <= windowEnd) {
			// Enters before the window and leaves inside it: count from the window start.
			return last - windowStart;
		}
		if (first < windowStart && last > windowEnd) {
			// Covers the whole window.
			return windowEnd - windowStart;
		}
		return 0L;
	}
}

任務配置

package myaliyun.mr;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

/**
 * Driver that configures and runs the stay-time MapReduce job.
 *
 * <p>Usage: {@code CalStayTimRun <table_in> <table_out>}. Rows of {@code table_in} are
 * processed by CalStayTimeMapper and CalStayTimeReducer; the reducer writes three
 * columns (msisdn, geohash, total stay time) to {@code table_out}.
 */
public class CalStayTimRun {

	public static void main(String[] args) throws OdpsException {
		if (args.length < 2) {
			System.err.println("Parameter [table_in table_out]");
			System.exit(2);
		}
		// Use the tables named on the command line instead of hard-coded names.
		String inputTable = args[0];
		String outputTable = args[1];

		JobConf job = new JobConf();

		// Map output: grouped by (msisdn, geohash), carrying the visit time columns.
		job.setMapOutputKeySchema(SchemaUtils.fromString("msisdn:string,geohash:string"));
		job.setMapOutputValueSchema(SchemaUtils.fromString(
				"start_time:bigint,end_time:bigint,first_time:bigint,last_time:bigint,dual_time:bigint"));

		InputUtils.addTable(TableInfo.builder().tableName(inputTable).build(), job);
		OutputUtils.addTable(TableInfo.builder().tableName(outputTable).build(), job);

		job.setMapperClass(CalStayTimeMapper.class);
		job.setReducerClass(CalStayTimeReducer.class);

		RunningJob rj = JobClient.runJob(job);
		rj.waitForCompletion();
	}

}