1. 程式人生 > >【ODPS】利用阿里雲ODPS作業進行圓周率Pi的計算

【ODPS】利用阿里雲ODPS作業進行圓周率Pi的計算

原理:

1.畫一個邊長為1的正方形,並在此正方形內繪製它的內切圓(半徑為0.5)。
2.在此正方形內均勻隨機地取一個點,這個點落在圓內的概率為 P = 圓的面積/正方形的面積 = Pi×0.5²/1² = Pi/4。
3.當隨機取的點足夠多時,落在圓內的點數佔總點數的比例就會無限接近於 P,因此 Pi ≈ 4×(圓內點數/總點數)。

表設計:

1.隨機落點表 point(記錄每個隨機點的座標 x、y)
2.以正方形左下角為原點(0,0),則圓心為(0.5,0.5),半徑為0.5
3.x、y 均為 [0,1) 區間內的均勻隨機數,生成方式如下:

java原始碼:

1.建立point表

package bysql;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.Tables;

/**
 * Helper that (re)creates the table holding the random sample points.
 *
 * <p>The table has two DOUBLE columns, {@code x} and {@code y}. Any existing table with the
 * requested name is dropped first, so every run starts from an empty table.
 */
public class CreateTable {

	/** Visual separator printed between the drop phase and the create phase. */
	private static final String SEPARATOR = "-------------------------------------------------";

	/**
	 * Drops {@code createTableName} if present, then creates it with columns
	 * {@code x DOUBLE} and {@code y DOUBLE}.
	 *
	 * @param odps            SDK entry point whose default project contains the table
	 * @param createTableName name of the table to drop and re-create
	 * @throws Exception propagated from the ODPS SDK calls
	 */
	public static void createTable(Odps odps, String createTableName) throws Exception {
		final Tables allTables = odps.tables();

		dropIfPresent(allTables, createTableName);
		System.out.println(SEPARATOR);

		// Re-check existence: creation only happens if the drop above took effect.
		if (!allTables.exists(createTableName)) {
			System.out.println("指定表不存在,可以建立");
			allTables.create(createTableName, pointSchema());
			System.out.println("表【" + createTableName + "】建立成功");
		} else {
			System.out.println("指定表存在,無法建立");
		}
		System.out.println(SEPARATOR);
	}

	/** Prints information about {@code name} and deletes it when it exists. */
	private static void dropIfPresent(Tables allTables, String name) throws Exception {
		if (!allTables.exists(name)) {
			System.out.println("指定表不存在");
			return;
		}
		System.out.println("指定表存在");
		Table existing = allTables.get(name);
		System.out.println("指定表資訊為:【name:】" + existing.getName() + "【Owner:】"
				+ existing.getOwner());
		allTables.delete(name);
	}

	/** Builds the two-column (x, y) DOUBLE schema used for the point table. */
	private static TableSchema pointSchema() {
		TableSchema schema = new TableSchema();
		schema.addColumn(new Column("x", OdpsType.DOUBLE, "X"));
		schema.addColumn(new Column("y", OdpsType.DOUBLE, "Y"));
		return schema;
	}

}

2.隨機模擬資料(利用多執行緒)

package bysql;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;


/**
 * Upload task that writes random points to one tunnel {@link RecordWriter} and then closes it.
 *
 * <p>Each point is written as a record whose {@code x} and {@code y} fields are both drawn
 * from {@link Random#nextDouble()}, i.e. uniformly from {@code [0, 1)}. The given
 * {@code record} is mutated in place before every write. The writer and record must
 * therefore not be shared with other tasks.
 */
class UploadThread implements Callable<Boolean> {
    /** Number of points a task writes when no explicit count is given. */
    static final int DEFAULT_RECORDS_PER_THREAD = 10000;

    private final long id; // task / block index, kept for identification
    private final RecordWriter recordWriter;
    private final Record record;
    private final int recordCount;
    // java.util.Random is thread-safe, so one instance may be shared by all tasks.
    static Random random = new Random(System.currentTimeMillis());

    /**
     * Creates a task that writes {@link #DEFAULT_RECORDS_PER_THREAD} points.
     */
    public UploadThread(long id, RecordWriter recordWriter, Record record) {
        this(id, recordWriter, record, DEFAULT_RECORDS_PER_THREAD);
    }

    /**
     * Creates a task that writes {@code recordCount} points.
     *
     * @param id           task index (informational)
     * @param recordWriter writer owned by this task; closed when {@link #call()} finishes
     * @param record       reusable record with DOUBLE columns {@code x} and {@code y}
     * @param recordCount  number of points to write; must be non-negative
     * @throws IllegalArgumentException if {@code recordCount} is negative
     */
    public UploadThread(long id, RecordWriter recordWriter, Record record, int recordCount) {
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must be >= 0: " + recordCount);
        }
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
        this.recordCount = recordCount;
    }

    /**
     * Writes {@code recordCount} random points, then closes the writer.
     *
     * @return {@code true} once all points have been written
     * @throws IOException if writing or closing the writer fails
     */
    @Override
    public Boolean call() throws IOException {
        try {
            for (int m = 0; m < recordCount; m++) {
                record.setDouble("x", getRandomDouble());
                record.setDouble("y", getRandomDouble());
                recordWriter.write(record);
            }
        } finally {
            // Release the writer (and its tunnel connection) even when a write fails.
            recordWriter.close();
        }
        return true;
    }

    /** Returns a uniformly distributed value in [0, 1). */
    private double getRandomDouble() {
        return random.nextDouble();
    }
}
package bysql;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;

/**
 * Uploads randomly generated sample points into an ODPS table through the Tunnel service.
 *
 * <p>The upload runs as a pool of concurrent {@link UploadThread} tasks. Each task writes its
 * own block of a single upload session, and the session is committed only after every task
 * has finished successfully.
 */
public class InitData {

	/** Default number of concurrent upload tasks (one tunnel block per task). */
	private static final int DEFAULT_THREAD_NUM = 100;

	/**
	 * Uploads random points using {@link #DEFAULT_THREAD_NUM} concurrent tasks.
	 *
	 * @see #uploadDataToYun(Odps, String, String, int)
	 */
	public static void uploadDataToYun(Odps odps, String project, String table)
			throws Exception {
		uploadDataToYun(odps, project, table, DEFAULT_THREAD_NUM);
	}

	/**
	 * Uploads random points into {@code project.table} using {@code threadNum} concurrent tasks.
	 *
	 * @param odps      SDK entry point used to create the tunnel
	 * @param project   ODPS project containing the table
	 * @param table     target table with DOUBLE columns {@code x} and {@code y}
	 * @param threadNum number of concurrent upload tasks / tunnel blocks; must be positive
	 * @throws IllegalArgumentException if {@code threadNum} is not positive
	 * @throws java.util.concurrent.ExecutionException if any upload task fails; nothing is
	 *         committed in that case
	 * @throws Exception other failures propagated from the ODPS SDK or the executor
	 */
	public static void uploadDataToYun(Odps odps, String project, String table, int threadNum)
			throws Exception {
		if (threadNum <= 0) {
			throw new IllegalArgumentException("threadNum must be positive: " + threadNum);
		}
		TableTunnel tunnel = new TableTunnel(odps);
		// Pin the Tunnel server address; without this the SDK chooses one automatically.
		tunnel.setEndpoint("http://dt.odps.aliyun.com");
		TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
				project, table);
		long startTime = System.currentTimeMillis();
		System.out.println("正在上傳資料.............");

		// Column layout of the point table; identical for every task, so build it once.
		Column[] columns = new Column[] {
				new Column("x", OdpsType.DOUBLE),
				new Column("y", OdpsType.DOUBLE)
		};

		ExecutorService pool = Executors.newFixedThreadPool(threadNum);
		try {
			ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>(threadNum);
			for (int i = 0; i < threadNum; i++) {
				// Block id i: every task writes a separate block of the same session.
				RecordWriter rw = uploadSession.openRecordWriter(i);
				Record r = new ArrayRecord(columns);
				callers.add(new UploadThread(i, rw, r));
			}
			// invokeAll waits for all tasks; get() rethrows any task failure, so a partial
			// upload is never committed.
			for (java.util.concurrent.Future<Boolean> result : pool.invokeAll(callers)) {
				if (!Boolean.TRUE.equals(result.get())) {
					throw new IllegalStateException("upload task reported failure");
				}
			}
		} finally {
			pool.shutdown();
		}

		Long[] blocks = uploadSession.getBlockList();
		uploadSession.commit(blocks);

		System.out.println("資料上傳完畢!");
		long endTime = System.currentTimeMillis();
		System.out.println("總共耗時:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");
	}
}

3.求圓周率pi

1)執行SQL作業查詢在圓內的點的個數A1
 select count(*) from point where (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5) <= 0.5*0.5;
2)執行SQL作業查詢所有的點的個數A2
 select count(*) from point ;
3)計算點出現在圓內的概率
 P=A1/A2
4)計算圓周率Pi
 Pi=4P

package bysql;

import java.util.Map;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;

/**
 * Estimates Pi by Monte Carlo sampling, with the sample points stored in an ODPS table.
 *
 * <p>Steps:
 * <ol>
 *   <li>(re)create the point table;</li>
 *   <li>upload random points in the unit square;</li>
 *   <li>count all points, then the points inside the circle of radius 0.5 centred at
 *       (0.5, 0.5);</li>
 *   <li>print {@code Pi = 4 * hits / total}.</li>
 * </ol>
 */
public class GetPI {

	// Placeholder credentials. Do not commit real keys here: the environment variables
	// ODPS_ACCESS_ID, ODPS_ACCESS_KEY and ODPS_PROJECT take precedence when set.
	private static final String ACCESS_ID = "*****************";
	private static final String ACCESS_KEY = "**********************";
	private static final String PROJECT_NAME = "****************";
	private static final String ODPS_URL = "http://service.odps.aliyun.com/api";

	public static void main(String args[]) throws Exception {
		String accessId = envOrDefault("ODPS_ACCESS_ID", ACCESS_ID);
		String accessKey = envOrDefault("ODPS_ACCESS_KEY", ACCESS_KEY);
		String project = envOrDefault("ODPS_PROJECT", PROJECT_NAME);

		/* Build the Aliyun account used to authenticate. */
		Account account = new AliyunAccount(accessId, accessKey);
		System.out.println("Account Type:" + account.getType());
		System.out.println("-------------------------------------------------");

		/* Odps is the entry point of the ODPS SDK. */
		Odps odps = new Odps(account);
		odps.setDefaultProject(project); // default project for unqualified table names
		odps.setEndpoint(ODPS_URL);      // ODPS service address

		String tableName = "point";

		/* Create the table. */
		CreateTable.createTable(odps, tableName);

		/* Upload the random points. */
		InitData.uploadDataToYun(odps, project, tableName);

		/* Count all points. */
		long startTime = System.currentTimeMillis();
		String sql = "select count(*) from " + tableName + " ;";
		System.out.println("正在查詢當前point總數...");
		String ret = excuteSql(odps, sql);
		System.out.println("總數:" + ret);
		long endTime = System.currentTimeMillis();
		System.out.println("總共耗時:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");

		/* Count the points inside the circle: (x-0.5)^2 + (y-0.5)^2 <= 0.5^2. */
		startTime = System.currentTimeMillis();
		System.out.println("正在查詢命中的point總數...");
		sql = "select count(*) from " + tableName
				+ " where (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5) <= 0.5*0.5;";
		String ret2 = excuteSql(odps, sql);
		System.out.println("命中數:" + ret2);
		endTime = System.currentTimeMillis();
		System.out.println("總共耗時:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");

		/* Pi = 4 * hits / total. */
		String pi = calculate(ret, ret2);
		System.out.println("Pi=" + pi);
	}

	/** Returns the environment variable {@code name}, or {@code fallback} if it is unset or empty. */
	private static String envOrDefault(String name, String fallback) {
		String value = System.getenv(name);
		return (value == null || value.isEmpty()) ? fallback : value;
	}

	/**
	 * Runs {@code sql} as an ODPS SQL task and blocks until the instance finishes.
	 *
	 * @param odps SDK entry point
	 * @param sql  SQL statement (terminated with {@code ;})
	 * @return the concatenated text results of all tasks in the instance; tasks without a
	 *         result contribute nothing
	 * @throws OdpsException if submitting or waiting for the instance fails
	 */
	public static String excuteSql(Odps odps, String sql) throws OdpsException {
		/* Instance is one execution of a computation task in ODPS. */
		Instance instance = SQLTask.run(odps, sql);
		instance.waitForSuccess(); // block the current thread until the instance ends

		// Map of task name -> that task's textual result.
		Map<String, String> results = instance.getTaskResults();
		StringBuilder ret = new StringBuilder();
		for (String taskName : instance.getTaskStatus().keySet()) {
			String result = results.get(taskName);
			if (result != null) {
				ret.append(result);
			}
		}
		return ret.toString();
	}

	/**
	 * Computes Pi from the two count-query results.
	 *
	 * @param ret  result text of the total-count query
	 * @param ret2 result text of the hit-count query
	 * @return Pi formatted with {@code %f}, or {@code null} if a result cannot be parsed or
	 *         the total is zero
	 */
	private static String calculate(String ret, String ret2) {
		Long total = parseCount(ret);
		Long hits = parseCount(ret2);
		if (total == null || hits == null || total == 0L) {
			return null;
		}
		// double keeps full precision for large counts; float would not.
		double pi = 4.0 * hits / total;
		return String.format("%f", pi);
	}

	/**
	 * Extracts the value row from a single-column result. The result is expected to be a header
	 * line followed by the value line (the layout the original index-[1] parsing relied on).
	 *
	 * @return the parsed count, or {@code null} if absent or not an integer
	 */
	private static Long parseCount(String result) {
		if (result == null) {
			return null;
		}
		String[] lines = result.split("\r?\n");
		if (lines.length < 2) {
			return null;
		}
		String value = lines[1].trim();
		// Tolerate a CSV-quoted value such as "123".
		if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
			value = value.substring(1, value.length() - 1);
		}
		try {
			return Long.parseLong(value);
		} catch (NumberFormatException e) {
			return null;
		}
	}

}


結果: