java編寫spark程式
阿新 • • 發佈:2019-01-07
importjava.net.URI; import java.util.Arrays; import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; public class mysparktest { public static void main(String[] args) throws IOException{ String uri = "hdfs://192.168.217.132:9000/unit/xferlog"; String uro = "hdfs://192.168.217.132:9000/unit/xferlogoutput" ; Configuration conf = new Configuration(); try { //開啟檔案系統 FileSystem fs = FileSystem. get(URI.create (uri), conf); //開啟檔案輸入流 FSDataInputStream in = fs.open( new Path(uri)); //檔案讀取 byte[] ioBuffer = new byte[1024]; int readLen = in.read(ioBuffer); while(readLen!=-1) { readLen = in.read(ioBuffer); } String str = new String(ioBuffer); int cnt=0; while(( int)(str.charAt(cnt)) != 0)cnt++; str=str.substring(0, cnt); System. out.println(str); in.close(); //檔案的刪除 fs.delete( new Path(uri), true); //寫入到新檔案中 FSDataOutputStream out = fs.create( new Path(uro)); out.write(( "new1"+str).getBytes( "UTF-8")); out.flush(); out. sync(); out.close(); fs.close(); } catch (Exception e) { e.printStackTrace(); } } }import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; public final class mysparktest { private static final Pattern SPACE = Pattern. compile(" "); public static void main(String[] args) throws Exception { String uri = "hdfs://Master:9000/unit/xferlog"; String uro = "hdfs://Master:9000/unit/xferlog1"; // if (args.length < 1) { // System.err.println("Usage: JavaWordCount "); // System.exit(1); // } //建立SparkConf,包含application的相關資訊 SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); //建立一個JavaSparkContext物件 JavaSparkContext sc = new JavaSparkContext(sparkConf); //textFile()方法可將本地檔案或HDFS檔案轉換成RDD,讀取本地檔案需要各節點上都存在,或者通過網路共享該檔案 //讀取一行 JavaRDD lines = sc.textFile(uri, 1); //flatMap與map的區別是,對每個輸入,flatMap會生成一個或多個的輸出,而map只是生成單一的輸出 //用空格分割各個單詞,輸入一行,輸出多個物件,所以用flatMap JavaRDD words = lines.flatMap( new FlatMapFunction() { @Override public Iterable call(String s) { return Arrays.asList(SPACE.split(s)); } }); //對每個單詞生成key-value對,PairFunction //表示輸入型別為T,生成的key-value對中的key型別為k,value型別為v,對本例,T=String, K=String, V=Integer(計數) //重寫scala的Tupple2方法 JavaPairRDD ones = words.mapToPair( new PairFunction() { @Override //scala.Tuple2 call(T t) //Tuple2為scala中的一個物件,call方法的輸入引數為T,即輸入一個單詞s,新的Tuple2物件的key為這個單詞,計數為1 public Tuple2 call(String s) { return new Tuple2(s, 1); } }); //呼叫reduceByKey方法,按key值進行reduce //呼叫Function2物件,Function2 //輸入兩個引數,T1,T2,返回R //若ones有<"one", 1>, <"one", 1>,會根據"one"將相同的pair單詞個數進行統計,輸入為Integer,輸出也為Integer //輸出<"one", 2> JavaPairRDD counts = ones. reduceByKey(new Function2() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); //將結果儲存到HDFS中 counts.saveAsTextFile(uro); //collect返回一個包含RDD內所有元素的Array List> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } sc.stop(); } }importorg.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; public final class mysparktest { private static final Pattern SPACE = Pattern. compile(" "); public static void main(String[] args) throws Exception { String uri = "hdfs://Master:9000/unit/xferlog"; String uro = "hdfs://Master:9000/unit/xferlog1" ; SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); //也可以使用ctx獲取環境變數,例如下面的語句 System.out.println("spark home:"+ctx.getSparkHome()); //一次一行,String型別 ,還有 hadoopfile,sequenceFile什麼的 ,可以直接用sc.textFile("path") JavaRDD lines = ctx.textFile(uri, 1); //java.lang.String path, int minSplits lines.cache(); //cache,暫時放在快取中,一般用於哪些可能需要多次使用的RDD,據說這樣會減少執行時間 //collect方法,用於將RDD型別轉化為java基本型別,如下 List line = lines.collect(); for(String val:line) System. out.println(val); //下面這些也是RDD的常用函式 // lines.collect(); List // lines.union(); javaRDD // lines.top(1); List // lines.count(); long // lines.countByValue(); /** * filter test * 定義一個返回 bool型別的函式,spark執行filter的時候會過濾掉那些返回只為false的資料 * String s,中的變數s可以認為就是變數lines(lines可以理解為一系列的String型別資料)的每一條資料 */ JavaRDD contaninsE = lines.filter( new Function() { public Boolean call(String s) throws Exception { return (s.contains( "passwd")); } }); System.out.println("--------------next filter's result------------------"); line = contaninsE.collect(); for(String val:line) System. out.println(val); /** * sample test * sample函式使用很簡單,用於對資料進行抽樣 * 引數為:withReplacement: Boolean, fraction: Double, seed: Int * */ JavaRDD sampletest = lines.sample( false,0.1,5); System.out.println("-------------next sample-------------------"); line = sampletest.collect(); for(String val:line) System. out.println(val); /** * * new FlatMapFunction兩個string分別代表輸入和輸出型別 * Override的call方法需要自己實現一個轉換的方法,並返回一個 Iterable的結構 * * flatmap屬於一類非常常用的spark函式,簡單的說作用就是將一條 rdd資料使用你定義的函式給分解成多條 rdd資料 * 例如,當前狀態下,lines這個 rdd型別的變數中,每一條資料都是一行String,我們現在想把他拆分成1個個的詞的話, * 可以這樣寫 : */ JavaRDD words = lines.flatMap( new FlatMapFunction() { @Override public Iterable call(String s) { String[] words=s.split( " "); return Arrays. asList(words); } }); /** * map 鍵值對 ,類似於MR的map方法 * pairFunction: T:輸入型別;K,V:輸出鍵值對 * 需要重寫call方法實現轉換 */ JavaPairRDD ones = words.mapToPair( new PairFunction() { @Override public Tuple2 call(String s) { return new Tuple2(s, 1); } }); //A two-argument function that takes arguments // of type T1 and T2 and returns an R. /** * reduceByKey方法,類似於MR的reduce * 要求被操作的資料(即下面例項中的ones)是KV鍵值對形式,該方法會按照key相同的進行聚合,在兩兩運算 */ JavaPairRDD counts = ones.reduceByKey( new Function2() { @Override public Integer call(Integer i1, Integer i2) { //reduce階段,key相同的value怎麼處理的問題 return i1 + i2; } }); //備註:spark也有reduce方法,輸入資料是RDD型別就可以,不需要鍵值對, // reduce方法會對輸入進來的所有資料進行兩兩運算 /** * sort,顧名思義,排序 */ JavaPairRDD sort = counts.sortByKey(); System.out.println("----------next sort----------------------"); /** * collect方法其實之前已經出現了多次,該方法用於將spark的RDD型別轉化為我們熟知的java常見型別 */ List> output = sort.collect(); for (Tuple2<?,?> tuple : output) { System. out.println(tuple. _1 + ": " + tuple._2()); } /** * 儲存函式,資料輸出,spark為結果輸出提供了很多介面 */ // sort.saveAsTextFile( uro); // sort.saveAsNewAPIHadoopFile(); // sort.saveAsHadoopFile(); System.exit(0); } }