【ODPS】TableTunnel多執行緒下載示例
阿新 • 發佈:2019-01-05
本篇介紹如何使用 TableTunnel 進行多執行緒下載。
1.多執行緒下載類:
package bysql; import java.io.BufferedWriter; import java.io.IOException; import java.util.Date; import java.util.concurrent.Callable; import com.aliyun.odps.Column; import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.Record; import com.aliyun.odps.data.RecordReader; public class DownloadThread implements Callable<Long> { private long id; private RecordReader recordReader; private TableSchema tableSchema; private BufferedWriter out; public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema, BufferedWriter out) { this.id = id; this.recordReader = recordReader; this.tableSchema = tableSchema; this.out = out; } @Override public Long call() throws Exception { Long recordNum = 0L; try { Record record; while ((record = recordReader.read()) != null) { recordNum++; consumeRecord(record, tableSchema, out, id); } recordReader.close(); } catch (IOException e) { e.printStackTrace(); } return recordNum; } private static void consumeRecord(Record record, TableSchema schema, BufferedWriter out, long id) throws IOException { String writeStr = ""; String str; for (int i = 0; i < schema.getColumns().size(); i++) { Column column = schema.getColumn(i); String colValue = null; switch (column.getType()) { case BIGINT: { Long v = record.getBigint(i); colValue = v == null ? null : v.toString(); break; } case BOOLEAN: { Boolean v = record.getBoolean(i); colValue = v == null ? null : v.toString(); break; } case DATETIME: { Date v = record.getDatetime(i); colValue = v == null ? null : v.toString(); break; } case DOUBLE: { Double v = record.getDouble(i); colValue = v == null ? null : v.toString(); break; } case STRING: { String v = record.getString(i); colValue = v == null ? null : v.toString(); break; } default: throw new RuntimeException("Unknown column type: " + column.getType()); } str = colValue == null ? 
"null" : colValue; if (i != schema.getColumns().size() - 1) { str = schema.getColumn(i).getName() + ":" + str + ", "; } else { str = schema.getColumn(i).getName() + ":" + str; } writeStr = writeStr + str; } writeStr = "【Thread " + id + "】" + writeStr + System.getProperty("line.separator"); out.write(writeStr); } }
2.多執行緒下載示例:
package bysql; import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.aliyun.odps.Odps; import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.data.RecordReader; import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TunnelException; public class DownloadThreadSample { private static final String ACCESS_ID = "<your access id>"; private static final String ACCESS_KEY = "<your access Key>"; private static final String PROJECT_NAME = "<your project>"; private static final String TUNNEL_URL = "<your tunnel endpoint>"; private static final String ODPS_URL = "<your odps endpoint>"; public static void main(String[] args) { String tableName = "point_z";//表名 /* 先構建阿里雲帳號 */ Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY); /* Odps類是ODPS SDK的入口 */ Odps odps = new Odps(account); odps.setDefaultProject(PROJECT_NAME);// 指定預設使用的Project名稱 odps.setEndpoint(ODPS_URL);// 設定ODPS服務的地址 /*訪問ODPS Tunnel服務的入口類*/ TableTunnel tunnel = new TableTunnel(odps); tunnel.setEndpoint(TUNNEL_URL);//設定TunnelServer地址 try { /*此處表point_z為分割槽表,下載時必須指定分割槽 * 指定下載分割槽 * */ PartitionSpec partitionSpec = new PartitionSpec(); partitionSpec.set("z", "1"); System.out.println("開始下載資料........."); File file = new File("G:\\"+tableName+"(多執行緒).txt"); if (file.exists()){ file.delete(); } file.createNewFile(); BufferedWriter out = new BufferedWriter(new OutputStreamWriter( new FileOutputStream(file, true), "utf-8")); long startTime = System.currentTimeMillis(); TableTunnel.DownloadSession downloadSession = tunnel 
.createDownloadSession(PROJECT_NAME, tableName,partitionSpec); long count = downloadSession.getRecordCount(); System.out.println("RecordCount is: " + count); int threadNum=6; ExecutorService pool = Executors.newFixedThreadPool(threadNum); ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>(); long start = 0; long step = count / threadNum; for (int i = 0; i < threadNum - 1; i++) { RecordReader recordReader = downloadSession.openRecordReader( step * i, step); callers.add(new DownloadThread(i, recordReader, downloadSession .getSchema(),out)); } RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count - ((threadNum - 1) * step)); callers.add(new DownloadThread(threadNum - 1, recordReader, downloadSession.getSchema(),out)); Long downloadNum = 0L; List<Future<Long>> recordNum = pool.invokeAll(callers); for (Future<Long> num : recordNum) downloadNum += num.get(); System.out.println("DownLoad Count is: " + downloadNum); pool.shutdown(); out.close(); long endTime = System.currentTimeMillis(); System.out.println("總共耗時:" + (endTime - startTime) + " ms"); System.out.println("-------------------------------------------------"); } catch (TunnelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }