1. 程式人生 > >java編寫spark程式

java編寫spark程式

importjava.net.URI; 
import java.util.Arrays; 
import java.io.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.fs.FileSystem; 
 
public class mysparktest { 
    public static void main(String[] args) throws IOException{ 
        String uri = "hdfs://192.168.217.132:9000/unit/xferlog"; 
        String uro = "hdfs://192.168.217.132:9000/unit/xferlogoutput" ; 
        Configuration conf = new Configuration(); 
        try { 
             //開啟檔案系統
            FileSystem fs = FileSystem. get(URI.create (uri), conf); 
            //開啟檔案輸入流
            FSDataInputStream in = fs.open( new Path(uri)); 
            //檔案讀取 
            byte[] ioBuffer = new byte[1024];   
            int readLen = in.read(ioBuffer);       
            while(readLen!=-1)   
            {   
                readLen = in.read(ioBuffer);   
            }   
            String str = new String(ioBuffer); 
            int cnt=0; 
            while(( int)(str.charAt(cnt)) != 0)cnt++; 
            str=str.substring(0, cnt); 
            System. out.println(str); 
            in.close();   
            //檔案的刪除 
            fs.delete( new Path(uri), true); 
            //寫入到新檔案中
            FSDataOutputStream out = fs.create( new Path(uro));           
            out.write(( "new1"+str).getBytes( "UTF-8")); 
            out.flush(); 
            out. sync(); 
            out.close(); 
            fs.close(); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
         
    } 
}import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import scala.Tuple2; 
 
 
import java.util.Arrays; 
import java.util.List; 
import java.util.regex.Pattern; 
 
 
public final class mysparktest { 
  private static final Pattern SPACE = Pattern. compile(" "); 
 
 
  public static void main(String[] args) throws Exception { 
    String uri = "hdfs://Master:9000/unit/xferlog";
    String uro = "hdfs://Master:9000/unit/xferlog1";
      
//    if (args.length < 1) { 
//      System.err.println("Usage: JavaWordCount "); 
//      System.exit(1); 
//    } 
 
 
    //建立SparkConf,包含application的相關資訊 
    SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount"); 
    //建立一個JavaSparkContext物件 
    JavaSparkContext sc = new JavaSparkContext(sparkConf); 
    //textFile()方法可將本地檔案或HDFS檔案轉換成RDD,讀取本地檔案需要各節點上都存在,或者通過網路共享該檔案 
    //讀取一行 
    JavaRDD lines = sc.textFile(uri, 1); 


    //flatMap與map的區別是,對每個輸入,flatMap會生成一個或多個的輸出,而map只是生成單一的輸出 
    //用空格分割各個單詞,輸入一行,輸出多個物件,所以用flatMap 
    JavaRDD words = lines.flatMap( new FlatMapFunction() { 
      @Override 
      public Iterable call(String s) { 
        return Arrays.asList(SPACE.split(s)); 
      } 
    }); 
    //對每個單詞生成key-value對,PairFunction 
    //表示輸入型別為T,生成的key-value對中的key型別為k,value型別為v,對本例,T=String, K=String, V=Integer(計數) 
    //重寫scala的Tupple2方法 
    JavaPairRDD ones = words.mapToPair( new PairFunction() { 
      @Override 
      //scala.Tuple2 call(T t) 
      //Tuple2為scala中的一個物件,call方法的輸入引數為T,即輸入一個單詞s,新的Tuple2物件的key為這個單詞,計數為1 
      public Tuple2 call(String s) { 
        return new Tuple2(s, 1); 
      } 
    }); 
    //呼叫reduceByKey方法,按key值進行reduce 
    //呼叫Function2物件,Function2 
    //輸入兩個引數,T1,T2,返回R 
    //若ones有<"one", 1>, <"one", 1>,會根據"one"將相同的pair單詞個數進行統計,輸入為Integer,輸出也為Integer 
    //輸出<"one", 2> 
    JavaPairRDD counts = ones. reduceByKey(new Function2() { 
      @Override 
      public Integer call(Integer i1, Integer i2) { 
        return i1 + i2; 
      } 
    }); 
    //將結果儲存到HDFS中 
    counts.saveAsTextFile(uro); 
    //collect返回一個包含RDD內所有元素的Array 
    List> output = counts.collect(); 
    for (Tuple2<?, ?> tuple : output) { 
      System.out.println(tuple._1() + ": " + tuple._2()); 
    } 
    sc.stop(); 
  } 
}importorg.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
 
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
 
 
public final class mysparktest {
  private static final Pattern SPACE = Pattern. compile(" ");
 
 
  public static void main(String[] args) throws Exception {
        String uri = "hdfs://Master:9000/unit/xferlog";
      String uro = "hdfs://Master:9000/unit/xferlog1" ;

      SparkConf sparkConf = new SparkConf().setAppName( "JavaWordCount");
      JavaSparkContext ctx = new JavaSparkContext(sparkConf);


      //也可以使用ctx獲取環境變數,例如下面的語句 
      System.out.println("spark home:"+ctx.getSparkHome()); 

       //一次一行,String型別    ,還有 hadoopfile,sequenceFile什麼的  ,可以直接用sc.textFile("path") 
      JavaRDD lines = ctx.textFile(uri, 1);  //java.lang.String path, int minSplits 
      lines.cache();   //cache,暫時放在快取中,一般用於哪些可能需要多次使用的RDD,據說這樣會減少執行時間 


      //collect方法,用於將RDD型別轉化為java基本型別,如下 
      List line = lines.collect(); 
      for(String val:line) 
              System. out.println(val); 


     //下面這些也是RDD的常用函式 
     // lines.collect();  List 
     // lines.union();     javaRDD 
     // lines.top(1);     List 
     // lines.count();      long 
     // lines.countByValue(); 


      /**
       *   filter test
       *   定義一個返回 bool型別的函式,spark執行filter的時候會過濾掉那些返回只為false的資料
       *   String s,中的變數s可以認為就是變數lines(lines可以理解為一系列的String型別資料)的每一條資料
       */  
      JavaRDD contaninsE = lines.filter( new Function() { 
      
          public Boolean call(String s) throws Exception { 
             return (s.contains( "passwd")); 
          } 
      }); 
     
      System.out.println("--------------next filter's  result------------------"); 
      line = contaninsE.collect(); 
      for(String val:line) 
          System. out.println(val); 


      /**
       * sample test
       * sample函式使用很簡單,用於對資料進行抽樣
       * 引數為:withReplacement: Boolean, fraction: Double, seed: Int
       *
       */  


      JavaRDD sampletest = lines.sample( false,0.1,5); 
      System.out.println("-------------next sample-------------------"); 
      line = sampletest.collect(); 
      for(String val:line) 
          System. out.println(val); 

      /**
       *
       * new FlatMapFunction兩個string分別代表輸入和輸出型別
       * Override的call方法需要自己實現一個轉換的方法,並返回一個 Iterable的結構
       *
       * flatmap屬於一類非常常用的spark函式,簡單的說作用就是將一條 rdd資料使用你定義的函式給分解成多條 rdd資料
       * 例如,當前狀態下,lines這個 rdd型別的變數中,每一條資料都是一行String,我們現在想把他拆分成1個個的詞的話,
       * 可以這樣寫 :
       */  


      JavaRDD words = lines.flatMap( new FlatMapFunction() { 
          @Override 
          public Iterable call(String s) { 
               String[] words=s.split( " "); 
                return Arrays. asList(words); 
          } 
      }); 

      /**
       * map 鍵值對 ,類似於MR的map方法
       * pairFunction: T:輸入型別;K,V:輸出鍵值對
       * 需要重寫call方法實現轉換
       */  
      JavaPairRDD ones = words.mapToPair( new PairFunction() { 
          @Override 
          public Tuple2 call(String s) { 
              return new Tuple2(s, 1); 
          } 
      }); 


      //A two-argument function that takes arguments 
      // of type T1 and T2 and returns an R. 
      /**
       *  reduceByKey方法,類似於MR的reduce
       *  要求被操作的資料(即下面例項中的ones)是KV鍵值對形式,該方法會按照key相同的進行聚合,在兩兩運算
       */  
      JavaPairRDD counts = ones.reduceByKey( new Function2() { 
          @Override 
          public Integer call(Integer i1, Integer i2) {  //reduce階段,key相同的value怎麼處理的問題 
              return i1 + i2; 
          } 
      }); 


      //備註:spark也有reduce方法,輸入資料是RDD型別就可以,不需要鍵值對, 
      // reduce方法會對輸入進來的所有資料進行兩兩運算 

      /**
       * sort,顧名思義,排序
       */  
      JavaPairRDD sort = counts.sortByKey(); 
      System.out.println("----------next sort----------------------"); 



      /**
       * collect方法其實之前已經出現了多次,該方法用於將spark的RDD型別轉化為我們熟知的java常見型別
       */  
      List> output = sort.collect(); 
      for (Tuple2<?,?> tuple : output) { 
          System. out.println(tuple. _1 + ": " + tuple._2()); 
       
      } 


      /**
       * 儲存函式,資料輸出,spark為結果輸出提供了很多介面
       */  
//      sort.saveAsTextFile( uro); 

     // sort.saveAsNewAPIHadoopFile(); 
    //  sort.saveAsHadoopFile();
     
      System.exit(0); 
  }
}