1. 程式人生 > >記spark過程中Scala多執行緒小問題

記spark過程中Scala多執行緒小問題

這次更改ThriftServer原始碼,加了些業務,中間遇到這樣一個問題,非同步提交任務的時候想做成多執行緒,剛開始是使用的scala的Actor,傳遞了SQLContext和sql,發現每次sparkSessionId在一直變化,每次提交和觸發Action之後產生的sessionId都不一致,這是怎麼回事,後來才發現是多執行緒非同步的問題,傳遞sqlContext線上程那邊執行任務的時候會重新觸發一個會話,那可怎麼辦呢,只能用以下方式實現了

java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
        return null;
}
});
executorService.submit(new Callable<Void>() {
@Override
public Void call(){
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
        return null;
}
});

executorService.shutdown();

在當前方法內部使用context變數就可以了

當然為了方便大家學習,另外常見的寫法如下:

import java.util.concurrent.{Executors, ExecutorService}

 object Test {
     def main(args: Array[String]) {
//建立執行緒池
val threadPool:ExecutorService=Executors.newFixedThreadPool(5)
try {
//提交5個執行緒
for(i <- 1 to 5){
//threadPool.submit(new ThreadDemo("thread"+i))
threadPool.execute(new ThreadDemo("thread"+i))
             }
         }finally {
             threadPool.shutdown()
         }
     }

//定義執行緒類,每列印一次睡眠100毫秒
class ThreadDemo(threadName:String) extends Runnable{
         override def run(){
for(i <- 1 to 10){
                 println(threadName+"|"+i)
                 Thread.sleep(100)
             }
         }
     }
 }

Callable示例

import java.util.concurrent.{Callable, FutureTask, Executors, ExecutorService}

object Test {
  def main(args: Array[String]) {
    val threadPool:ExecutorService=Executors.newFixedThreadPool(3)
    try {
      val future=new FutureTask[String](new Callable[String] {
        override def call(): String = {
          Thread.sleep(100)
          return "im result"
        }
      })
      threadPool.execute(future)
      println(future.get())
    }finally {
      threadPool.shutdown()
    }
  }
}