原 薦 Ignite整合Spark之IgniteRDD
本系列共兩篇文章,會探討如何將Ignite和Spark進行整合。
Ignite是一個分散式的記憶體資料庫、快取和處理平臺,為事務型、分析型和流式負載而設計,在保證擴充套件性的前提下提供了記憶體級的效能。 Spark是一個流式資料和計算引擎,通常從HDFS或者其他儲存中獲取資料,一直以來,他都傾向於OLAP型業務,並且聚焦於MapReduce型別負載。
因此,這兩種技術是可以互補的。
將Ignite與Spark整合
整合這兩種技術會為Spark使用者帶來若干明顯的好處:
- 通過避免大量的資料移動,獲得真正可擴充套件的記憶體級效能;
- 提高RDD、DataFrame和SQL的效能;
- 在Spark作業之間更方便地共享狀態和資料。
下圖中顯示瞭如何整合這兩種技術,並且標註了顯著的優勢: 在本系列的第一篇文章中會聚焦於Ignite RDD,在第二篇文章中會聚焦於Ignite DataFrame。
Ignite RDD
Ignite提供了一個SparkRDD的實現,叫做IgniteRDD,這個實現可以在記憶體中跨Spark作業共享任何資料和狀態,IgniteRDD為Ignite中相同的記憶體資料提供了一個共享的、可變的檢視,它可以跨多個不同的Spark作業、工作節點或者應用,相反,原生的SparkRDD無法在Spark作業或者應用之間進行共享。
IgniteRDD作為Ignite分散式快取的檢視,既可以在Spark作業執行程序中部署,也可以在Spark工作節點中部署,也可以在它自己的叢集中部署。因此,根據預配置的部署模型,狀態共享既可以只存在於一個Spark應用的生命週期的內部(嵌入式模式),或者也可以存在於Spark應用的外部(獨立模式)。
Ignite還可以幫助Spark使用者提高SQL的效能,雖然SparkSQL支援豐富的SQL語法,但是它沒有實現索引。從結果上來說,即使在普通的較小的資料集上,Spark查詢也可能花費幾分鐘的時間,因為需要進行全表掃描。如果使用Ignite,Spark使用者可以配置主索引和二級索引,這樣可以帶來上千倍的效能提升。
IgniteRDD示例
下面通過一些程式碼以及建立若干應用的方式,演示如何使用IgniteRDD以及看到它的好處,程式碼的完整版本,可以從 ofollow,noindex" target="_blank">GitHub 上進行下載。
程式碼共包括兩個簡單的Scala應用和兩個Java應用。這是為了說明可以使用多種語言來訪問Ignite RDD,這在使用不同程式語言和框架的組織中可能存在這樣的場景。此外,會從兩個不同的環境執行應用:從終端執行Scala應用以及通過IDE執行Java應用。作為一個花絮,還會在Java應用程式中執行一些SQL程式碼。
對於Scala應用,一個應用會用於往IgniteRDD中寫入部分資料,而另一個應用會執行部分過濾然後結果集。使用Maven將程式碼構建為一個jar檔案後在終端視窗中執行這個程式,下面是詳細的程式碼:
object RDDWriter extends App { val conf = new SparkConf().setAppName("RDDWriter") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") sharedRDD.savePairs(sc.parallelize(1 to 1000, 10).map(i => (i, i))) ic.close(true) sc.stop() } object RDDReader extends App { val conf = new SparkConf().setAppName("RDDReader") val sc = new SparkContext(conf) val ic = new IgniteContext(sc, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml") val sharedRDD: IgniteRDD[Int, Int] = ic.fromCache("sharedRDD") val greaterThanFiveHundred = sharedRDD.filter(_._2 > 500) println("The count is " + greaterThanFiveHundred.count()) ic.close(true) sc.stop() }
在這個Scala的RDDWriter中,首先建立了包含應用名的 SparkConf
,之後基於這個配置建立了 SparkContext
,最後,根據這個 SparkContext
建立一個 IgniteContext
。建立 IgniteContext
有很多種方法,本例中會使用一個叫做 example-shared-rdd.xml
的XML檔案,該檔案會結合Ignite發行版然後根據需求進行了預配置。顯然,需要根據自己的環境修改路徑(Ignite主目錄),之後指定IgniteRDD持有的整數值元組,最後,將從1到1000的整數值存入IgniteRDD,數值的儲存使用了10個parallel操作。
在這個Scala的RDDReader中,初始化和配置與Scala RDDWriter相同,也會使用同一個xml配置檔案,應用會執行部分過濾,然後關注儲存了多少大於500的值,答案最後會輸出出來。
關於 IgniteContext
和 IgniteRDD
的更多資訊,可以看Ignite的 文件 。
要構建jar檔案,可以使用下面的maven命令:
mvn clean install
接下來,看下Java程式碼,先寫一個Java應用往IgniteRDD中寫入多個元組,然後另一個應用會執行部分過濾然後返回結果集,下面是RDDWriter的程式碼細節:
public class RDDWriter { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDWriter") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); List<Integer> data = new ArrayList<>(20); for (int i = 1001; i <= 1020; i++) { data.add(i); } JavaRDD<Integer> javaRDD = sparkContext.<Integer>parallelize(data); sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() { public Tuple2<Integer, Integer> call(Integer val) throws Exception { return new Tuple2<Integer, Integer>(val, val); } })); igniteContext.close(true); sparkContext.close(); } }
在這個Java的RDDWriter中,首先建立了包含應用名和執行器數量的 SparkConf
,之後基於這個配置建立了 SparkContext
,最後,根據這個 SparkContext
建立一個 IgniteContext
。建立 IgniteContext
有很多種方法,本例中會使用一個叫做 example-shared-rdd.xml
的XML檔案,該檔案會結合Ignite發行版然後根據需求進行了預配置。顯然,需要根據自己的環境修改路徑(Ignite主目錄),最後,往IgniteRDD中添加了額外的20個值。
在這個Java的RDDReader中,初始化和配置與Java RDDWriter相同,也會使用同一個xml配置檔案,應用會執行部分過濾,然後關注儲存了多少大於500的值,答案最後會輸出出來,下面是Java RDDReader的程式碼:
public class RDDReader { public static void main(String args[]) { SparkConf sparkConf = new SparkConf() .setAppName("RDDReader") .setMaster("local") .set("spark.executor.instances", "2"); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); Logger.getRootLogger().setLevel(Level.OFF); Logger.getLogger("org.apache.ignite").setLevel(Level.OFF); JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<Integer, Integer>( sparkContext, "/path_to_ignite_home/examples/config/spark/example-shared-rdd.xml", true); JavaIgniteRDD<Integer, Integer> sharedRDD = igniteContext.<Integer, Integer>fromCache("sharedRDD"); JavaPairRDD<Integer, Integer> greaterThanFiveHundred = sharedRDD.filter(new Function<Tuple2<Integer, Integer>, Boolean>() { public Boolean call(Tuple2<Integer, Integer> tuple) throws Exception { return tuple._2() > 500; } }); System.out.println("The count is " + greaterThanFiveHundred.count()); System.out.println(">>> Executing SQL query over Ignite Shared RDD..."); Dataset df = sharedRDD.sql("select _val from Integer where _val > 10 and _val < 100 limit 10"); df.show(); igniteContext.close(true); sparkContext.close(); } }
最後,馬上就可以對程式碼進行測試了。
執行這個應用
在第一個終端視窗中,啟動Spark的主節點,如下:
$SPARK_HOME/sbin/start-master.sh
在第二個終端視窗中,啟動Spark工作節點,如下:
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker spark://ip:port
根據自己的環境,修改IP地址和埠號(ip:port)。
在第三個終端視窗中,啟動一個Ignite節點,如下:
$IGNITE_HOME/bin/ignite.sh examples/config/spark/example-shared-rdd.xml
這裡使用了之前討論過的 example-shared-rdd.xml
檔案。
在第四個終端視窗中,可以執行Scala版的RDDWriter應用,如下:
$SPARK_HOME/bin/spark-submit --class "com.gridgain.RDDWriter" --master spark://ip:port "/path_to_jar_file/ignite-spark-scala-1.0.jar"
根據自己的環境修改IP地址和埠(ip:port),以及jar檔案的路徑(/path_to_jar_file)。
會產生如下的輸出:
The count is 500
這是我們期望的值。
接下來,殺掉Spark的主節點和工作節點,而Ignite節點仍然在執行中並且IgniteRDD對於其他應用仍然可用,下面會使用IDE通過Java應用接入IgniteRDD。
執行Java版RDDWriter會擴充套件之前儲存於IgniteRDD中的元組列表,通過執行Java版RDDReader可以進行測試,它會產生如下的輸出:
The count is 520
這也是我們期望的。
最後,SQL查詢會在IgniteRDD中執行一個SELECT語句,返回範圍在10到100之間的最初10個值,輸出如下:
+----+ |_VAL| +----+ |11| |12| |13| |14| |15| |16| |17| |18| |19| |20| +----+
結果正確。
總結
本文中,看到了如何從多個環境中使用多個程式語言輕鬆地訪問IgniteRDD。可以對IgniteRDD進行資料的讀寫,並且即使Spark已經關閉狀態也通過Ignite得以保持,因此可以看到,這為Spark使用者帶來了很大的靈活性和好處。
在本系列的下一篇文章中,會看到Ignite和Spark整合之後的Ignite DataFrames及其優勢。