1. 程式人生 > >【ODPS】TableTunnel多執行緒下載事例

【ODPS】TableTunnel多執行緒下載事例

本篇介紹如何使用 ODPS TableTunnel 以多執行緒方式下載表資料，並將結果寫入本地檔案。

1.多執行緒下載類:

package bysql;

import java.io.BufferedWriter;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.Callable;

import com.aliyun.odps.Column;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;

/**
 * Task that drains one {@link RecordReader} and writes every record it yields,
 * rendered as a single text line, to a {@link BufferedWriter}.
 *
 * <p>The task returns the number of records it read. The reader handed to the
 * constructor is owned by the task and is closed when {@link #call()} finishes,
 * whether reading succeeded or not. The writer is NOT closed here; the caller
 * that created it remains responsible for closing it.
 */
public class DownloadThread implements Callable<Long> {

	/** Identifier printed in front of every line written by this task. */
	private final long id;
	private final RecordReader recordReader;
	private final TableSchema tableSchema;
	private final BufferedWriter out;

	/**
	 * @param id           identifier used to tag the lines written by this task
	 * @param recordReader source of records; closed by {@link #call()}
	 * @param tableSchema  schema used to decode the columns of each record
	 * @param out          destination for the rendered lines; not closed by this task
	 */
	public DownloadThread(int id, RecordReader recordReader,
			TableSchema tableSchema, BufferedWriter out) {
		this.id = id;
		this.recordReader = recordReader;
		this.tableSchema = tableSchema;
		this.out = out;
	}

	/**
	 * Reads records until the reader returns {@code null} and writes each one
	 * to the writer.
	 *
	 * <p>An {@link IOException} raised while reading or writing is reported on
	 * standard error and ends the loop early; the count accumulated so far is
	 * still returned.
	 *
	 * @return the number of records read from the reader
	 * @throws RuntimeException if a column has a type this task cannot render
	 */
	@Override
	public Long call() throws Exception {
		// Primitive counter: avoids re-boxing a Long on every record.
		long recordNum = 0L;
		try {
			Record record;
			while ((record = recordReader.read()) != null) {
				recordNum++;
				consumeRecord(record, tableSchema, out, id);
			}
		} catch (IOException e) {
			// Best effort: report the failure and return the partial count.
			e.printStackTrace();
		} finally {
			// Always release the reader, including when read()/write() failed
			// or an unsupported column type aborted the loop.
			try {
				recordReader.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return recordNum;
	}

	/**
	 * Renders one record as {@code 【Thread id】name:value,        name:value...}
	 * followed by the platform line separator and writes it with a single
	 * {@code write} call.
	 *
	 * @param record the record to render
	 * @param schema schema describing the record's columns
	 * @param out    writer receiving the rendered line
	 * @param id     identifier placed in the line prefix
	 * @throws IOException      if writing to {@code out} fails
	 * @throws RuntimeException if a column type is not one of BIGINT, BOOLEAN,
	 *                          DATETIME, DOUBLE or STRING
	 */
	private static void consumeRecord(Record record, TableSchema schema,
			BufferedWriter out, long id) throws IOException {
		// Hoisted: the column count does not change while rendering a record.
		int columnCount = schema.getColumns().size();
		StringBuilder line = new StringBuilder();
		line.append("【Thread ").append(id).append("】");
		for (int i = 0; i < columnCount; i++) {
			Column column = schema.getColumn(i);
			String colValue = null;
			switch (column.getType()) {
			case BIGINT: {
				Long v = record.getBigint(i);
				colValue = v == null ? null : v.toString();
				break;
			}
			case BOOLEAN: {
				Boolean v = record.getBoolean(i);
				colValue = v == null ? null : v.toString();
				break;
			}
			case DATETIME: {
				Date v = record.getDatetime(i);
				colValue = v == null ? null : v.toString();
				break;
			}
			case DOUBLE: {
				Double v = record.getDouble(i);
				colValue = v == null ? null : v.toString();
				break;
			}
			case STRING: {
				colValue = record.getString(i);
				break;
			}
			default:
				throw new RuntimeException("Unknown column type: "
						+ column.getType());
			}
			// Missing values are written as the literal text "null".
			line.append(column.getName()).append(":")
					.append(colValue == null ? "null" : colValue);
			if (i != columnCount - 1) {
				line.append(",        ");
			}
		}
		line.append(System.getProperty("line.separator"));
		// One write call per record so a line is emitted as a whole.
		out.write(line.toString());
	}

}

2.多執行緒下載事例:

package bysql;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;

/**
 * Sample program that downloads one partition of an ODPS table through
 * {@link TableTunnel} using several {@link DownloadThread} tasks in parallel
 * and writes the rows to a local text file.
 *
 * <p>The record range reported by the download session is split into
 * contiguous slices, one per task; the last slice also takes the remainder of
 * the integer division.
 */
public class DownloadThreadSample {

	private static final String ACCESS_ID = "<your access id>";
	private static final String ACCESS_KEY =  "<your access Key>";
	private static final String PROJECT_NAME = "<your project>";
	private static final String TUNNEL_URL = "<your tunnel endpoint>";
	private static final String ODPS_URL = "<your odps endpoint>";

	/**
	 * Runs the download. Failures are reported on standard error; the output
	 * writer and the thread pool are released on every exit path.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {

		String tableName = "point_z";// table name

		/* Build the Aliyun account first */
		Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);

		/* Odps is the entry point of the ODPS SDK */
		Odps odps = new Odps(account);
		odps.setDefaultProject(PROJECT_NAME);// project used by default
		odps.setEndpoint(ODPS_URL);// address of the ODPS service

		/* Entry class for the ODPS Tunnel service */
		TableTunnel tunnel = new TableTunnel(odps);
		tunnel.setEndpoint(TUNNEL_URL);// address of the tunnel server

		// Declared outside the try block so the finally block can release them.
		BufferedWriter out = null;
		ExecutorService pool = null;
		try {
			/*
			 * point_z is a partitioned table, so the partition to download
			 * must be specified.
			 */
			PartitionSpec partitionSpec = new PartitionSpec();
			partitionSpec.set("z", "1");

			System.out.println("開始下載資料.........");
			File file = new File("G:\\"+tableName+"(多執行緒).txt");
			// Opening without append creates the file or truncates an existing
			// one, so a leftover file from a previous run can never be
			// appended to.
			out = new BufferedWriter(new OutputStreamWriter(
					new FileOutputStream(file), "utf-8"));

			long startTime = System.currentTimeMillis();

			TableTunnel.DownloadSession downloadSession = tunnel
					.createDownloadSession(PROJECT_NAME, tableName,partitionSpec);

			long count = downloadSession.getRecordCount();
			System.out.println("RecordCount is: " + count);

			int threadNum=6;
			pool = Executors.newFixedThreadPool(threadNum);
			List<Callable<Long>> callers = new ArrayList<Callable<Long>>();

			long step = count / threadNum;
			for (int i = 0; i < threadNum; i++) {
				long offset = step * i;
				// The last slice also covers the division remainder.
				long length = (i == threadNum - 1) ? count - offset : step;
				if (length <= 0) {
					// Fewer records than tasks (or an empty partition):
					// do not open a reader for an empty range.
					continue;
				}
				RecordReader recordReader = downloadSession.openRecordReader(
						offset, length);
				callers.add(new DownloadThread(i, recordReader, downloadSession
						.getSchema(),out));
			}

			long downloadNum = 0L;
			// invokeAll returns only after every task has completed.
			List<Future<Long>> recordNum = pool.invokeAll(callers);
			for (Future<Long> num : recordNum) {
				downloadNum += num.get();
			}
			System.out.println("DownLoad Count is: " + downloadNum);
			// Close here so the final flush is included in the elapsed time
			// and a flush failure is reported by the catch below; the second
			// close in the finally block is then a no-op.
			out.close();
			long endTime = System.currentTimeMillis();
			System.out.println("總共耗時:" + (endTime - startTime) + " ms");
			System.out.println("-------------------------------------------------");

		} catch (TunnelException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			// Restore the interrupt flag for whoever runs this thread.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		} finally {
			// The pool's worker threads are non-daemon; without shutdown the
			// JVM would not exit after a failure.
			if (pool != null) {
				pool.shutdown();
			}
			if (out != null) {
				try {
					out.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

	}

}