【ODPS】利用阿里雲ODPS作業進行圓周率Pi的計算
阿新 • • 發佈:2019-01-05
原理:
1.畫一個正方形,邊長為1CM,在此正方形內繪製一個內接圓。
2.假如我們在此正方形內隨機點一個點,這個點落在圓內的概率是P
3.假如我們隨機生成足夠多的點,那麼落在圓內的點數佔總點數的比例P就會無限接近於Pi/4(=圓的面積/正方形的面積=Pi×0.5×0.5/(1×1))
表設計:
1.隨機落點表(記錄隨機點的位置)
2.我們以正方形左下角為原點,座標為(0,0),因此內接圓的圓心為(0.5,0.5),半徑為0.5
3.生成隨機數如下:
java原始碼:
1.建立point表
package bysql; import com.aliyun.odps.Column; import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsType; import com.aliyun.odps.Table; import com.aliyun.odps.TableSchema; import com.aliyun.odps.Tables; public class CreateTable { public static void createTable(Odps odps,String createTableName) throws Exception { Tables tables = odps.tables();// /獲取表示ODPS所有Table的集合物件 boolean a = tables.exists(createTableName);// 判斷指定表test_table_jyl是否存在 if (a) { System.out.println("指定表存在"); Table table = tables.get(createTableName); System.out.println("指定表資訊為:【name:】" + table.getName() + "【Owner:】" + table.getOwner()); tables.delete(createTableName); } else { System.out.println("指定表不存在"); } System.out.println("-------------------------------------------------"); /* 建立表 */ if (tables.exists(createTableName)) { System.out.println("指定表存在,無法建立"); } else { System.out.println("指定表不存在,可以建立"); /* TableSchema表示ODPS中表的定義 */ TableSchema tableSchema = new TableSchema(); /* 新增列 */ Column col; // Column表示ODPS中表的列定義 col = new Column("x", OdpsType.DOUBLE, "X"); tableSchema.addColumn(col); col = new Column("y", OdpsType.DOUBLE, "Y"); tableSchema.addColumn(col); tables.create(createTableName, tableSchema); System.out.println("表【" + createTableName + "】建立成功"); } System.out.println("-------------------------------------------------"); } }
2.隨機模擬資料(利用多執行緒)
package bysql;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;

/**
 * Upload task that writes a fixed number of random (x, y) points through
 * one {@link RecordWriter}.
 */
class UploadThread implements Callable<Boolean> {

    /** Number of records each task writes before closing its writer. */
    private static final int RECORDS_PER_TASK = 10000;

    /** Random source shared by all tasks, seeded with the class-load time. */
    static Random random = new Random(System.currentTimeMillis());

    /** Identifier handed in by the creator; stored but not read in this class. */
    private final long id;
    private final RecordWriter recordWriter;
    /** Record instance that is refilled and written on every iteration. */
    private final Record record;

    /**
     * @param id           identifier of this task
     * @param recordWriter writer the generated records are sent to; closed by {@link #call()}
     * @param record       reusable record exposing the columns {@code x} and {@code y}
     */
    public UploadThread(long id, RecordWriter recordWriter, Record record) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
    }

    /**
     * Writes {@value #RECORDS_PER_TASK} records whose {@code x} and {@code y}
     * values come from {@link Random#nextDouble()}, then closes the writer.
     *
     * @return always {@code true} when every record was written
     * @throws IOException if writing a record or closing the writer fails
     */
    @Override
    public Boolean call() throws IOException {
        try {
            for (int m = 0; m < RECORDS_PER_TASK; m++) {
                record.setDouble("x", getRandomDouble());
                record.setDouble("y", getRandomDouble());
                recordWriter.write(record);
            }
        } finally {
            // Close even when a write fails so the writer is never leaked.
            recordWriter.close();
        }
        return true;
    }

    /** Returns the next value of the shared random source. */
    private double getRandomDouble() {
        return random.nextDouble();
    }
}
package bysql;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;

/**
 * Fills the point table with random data by running many upload tasks in
 * parallel against a single tunnel upload session.
 */
public class InitData {

    /** Number of upload tasks, which is also the size of the thread pool. */
    private static final int THREAD_NUM = 100;

    /**
     * Opens an upload session for {@code project.table}, runs
     * {@value #THREAD_NUM} {@code UploadThread} tasks (one record writer
     * each) and commits the session once every task has succeeded.
     *
     * @param odps    ODPS client the tunnel is built on
     * @param project name of the project that owns the table
     * @param table   name of the table receiving the data
     * @throws Exception if the session cannot be created or committed, if the
     *                   calling thread is interrupted, or if any upload task
     *                   failed (reported as an {@code ExecutionException})
     */
    public static void uploadDataToYun(Odps odps, String project, String table) throws Exception {
        TableTunnel tunnel = new TableTunnel(odps);
        // Explicit tunnel server address; per the original author's note one is
        // chosen automatically when none is set.
        tunnel.setEndpoint("http://dt.odps.aliyun.com");
        TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(project, table);

        long startTime = System.currentTimeMillis();
        System.out.println("正在上傳資料.............");

        // Build the tasks before the pool exists so that a failure while
        // opening a writer cannot leave idle pool threads behind.
        List<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(THREAD_NUM);
        for (int i = 0; i < THREAD_NUM; i++) {
            RecordWriter rw = uploadSession.openRecordWriter(i);
            Column[] columns = new Column[2];
            columns[0] = new Column("x", OdpsType.DOUBLE);
            columns[1] = new Column("y", OdpsType.DOUBLE);
            // Every task gets its own record because each task mutates it.
            Record r = new ArrayRecord(columns);
            callers.add(new UploadThread(i, rw, r));
        }

        ExecutorService pool = Executors.newFixedThreadPool(THREAD_NUM);
        try {
            // invokeAll waits for all tasks; get() rethrows the failure of any
            // task so that an incomplete upload is never committed.
            for (Future<Boolean> outcome : pool.invokeAll(callers)) {
                outcome.get();
            }
        } finally {
            pool.shutdown();
        }

        Long[] blocks = uploadSession.getBlockList();
        uploadSession.commit(blocks);
        System.out.println("資料上傳完畢!");
        long endTime = System.currentTimeMillis();
        System.out.println("總共耗時:" + (endTime - startTime) + " ms");
        System.out.println("-------------------------------------------------");
    }
}
3.求圓周率pi
1)執行SQL作業查詢在圓內的點的個數A1
select count(*) from point where ((x-0.5)*((x-0.5))+(y-0.5)*(y-0.5) <=0.5*0.5);
2)執行SQL作業查詢所有的點的個數A2
select count(*) from point ;
3)計算點出現在圓內的概率
P=A1/A2
4)計算圓周率Pi
Pi=4P
package bysql;
import java.util.Map;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;
/**
 * Entry point of the Pi estimation: recreates the point table, uploads random
 * points, counts all points and the points inside the inscribed circle with
 * two SQL jobs, and prints 4 * hits / total.
 */
public class GetPI {
    private static final String ACCESS_ID = "*****************";
    private static final String ACCESS_KEY = "**********************";
    private static final String PROJECT_NAME = "****************";
    private static final String ODPS_URL = "http://service.odps.aliyun.com/api";

    /** Visual separator printed between the phases of the console output. */
    private static final String SEPARATOR = "-------------------------------------------------";

    /**
     * Runs the whole pipeline and prints progress, timings and the estimate.
     *
     * @param args unused
     * @throws Exception if table creation, the upload or a SQL job fails
     */
    public static void main(String[] args) throws Exception {
        // Build the Aliyun account first.
        Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
        System.out.println("Account Type:" + account.getType());
        System.out.println(SEPARATOR);

        // The Odps class is the entry point of the ODPS SDK.
        Odps odps = new Odps(account);
        odps.setDefaultProject(PROJECT_NAME); // project used by default
        odps.setEndpoint(ODPS_URL);           // address of the ODPS service

        String tableName = "point";
        // Create the table.
        CreateTable.createTable(odps, tableName);
        // Fill it with random points.
        InitData.uploadDataToYun(odps, PROJECT_NAME, tableName);

        // Count all points. The SQL is built from tableName so that it always
        // targets the table that was just created and filled.
        long startTime = System.currentTimeMillis();
        String sql = "select count(*) from " + tableName + " ;";
        System.out.println("正在查詢當前point總數...");
        String ret = excuteSql(odps, sql);
        System.out.println("總數:" + ret);
        long endTime = System.currentTimeMillis();
        System.out.println("總共耗時:" + (endTime - startTime) + " ms");
        System.out.println(SEPARATOR);

        // Count the points inside the circle centred at (0.5, 0.5) with radius 0.5.
        startTime = System.currentTimeMillis();
        System.out.println("正在查詢命中的point總數...");
        sql = "select count(*) from " + tableName
                + " where ((x-0.5)*((x-0.5))+(y-0.5)*(y-0.5) <=0.5*0.5);";
        String ret2 = excuteSql(odps, sql);
        System.out.println("命中數:" + ret2);
        endTime = System.currentTimeMillis();
        System.out.println("總共耗時:" + (endTime - startTime) + " ms");
        System.out.println(SEPARATOR);

        String pi = calculate(ret, ret2);
        System.out.println("Pi=" + pi);
    }

    /**
     * Runs one SQL statement as an ODPS job and returns its textual result.
     *
     * @param odps ODPS client the job is submitted with
     * @param sql  statement to execute
     * @return the concatenated result strings of the job's tasks; tasks that
     *         have no result are skipped, so the value may be empty but is
     *         never {@code null}
     * @throws OdpsException if submitting or waiting for the job fails
     */
    public static String excuteSql(Odps odps, String sql) throws OdpsException {
        // An Instance represents one running computation job in ODPS.
        Instance instance = SQLTask.run(odps, sql);
        instance.waitForSuccess(); // blocks the current thread until the instance ends

        // Task results: key is the task name, value is the result string.
        Map<String, String> results = instance.getTaskResults();
        Map<String, Instance.TaskStatus> taskStatus = instance.getTaskStatus();

        StringBuilder ret = new StringBuilder();
        for (String taskName : taskStatus.keySet()) {
            String result = results.get(taskName);
            if (result != null) {
                // Skip absent results rather than appending the text "null".
                ret.append(result);
            }
        }
        return ret.toString();
    }

    /**
     * Computes Pi as {@code 4 * hits / total}.
     *
     * <p>Both arguments are raw query results; the count is read from the
     * second line of each (presumably the first line is a column header;
     * verify against the actual SQLTask output).
     *
     * @param ret  raw result of the "count all points" query
     * @param ret2 raw result of the "count points inside the circle" query
     * @return the estimate formatted with {@code %f}, or {@code null} when a
     *         result has no second line or the total is zero
     */
    private static String calculate(String ret, String ret2) {
        if (ret.indexOf("\n") > 0 && ret2.indexOf("\n") > 0) {
            String[] totalLines = ret.split("\n");
            String[] hitLines = ret2.split("\n");
            // split() drops trailing empty strings, so "header\n" has no index 1.
            if (totalLines.length < 2 || hitLines.length < 2) {
                return null;
            }
            // double keeps far more significant digits than float.
            double total = Double.parseDouble(totalLines[1]);
            double hits = Double.parseDouble(hitLines[1]);
            if (total == 0) {
                // No points at all: the ratio is undefined.
                return null;
            }
            double pi = hits / total * 4;
            return String.format("%f", pi);
        }
        return null;
    }
}
結果: