Reading Elasticsearch data with Spark
Environment: spark-2.0.2, scala-2.11.8
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.0.0</version>
</dependency>
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20090211</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.7</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.7</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.7</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.13.Final</version>
</dependency>
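If you build with sbt rather than Maven, an equivalent declaration might look like the minimal sketch below. Only the Spark and connector artifacts are shown; the remaining dependencies follow the same pattern, and the versions mirror the Maven block above.

// build.sbt -- minimal sketch mirroring the Maven dependencies above
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"             % "2.0.2",
  "org.elasticsearch" %% "elasticsearch-spark-20" % "6.0.0"  // %% resolves to elasticsearch-spark-20_2.11
)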
Read an index into an RDD, then write it back out to another index:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._            // adds sc.esRDD
import org.elasticsearch.spark.rdd.EsSpark  // saveToEs / saveToEsWithMeta

object EsReadDemo {
  def main(args: Array[String]): Unit = {
    // silence Spark/Hadoop logging
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("aaa")
    // es-hadoop only reads settings prefixed with "es." (or "spark.es.")
    conf.set("es.nodes", "ip")                       // ES node address
    conf.set("es.port", "port")                      // ES HTTP port, e.g. 9200
    conf.set("es.scroll.size", "10000")              // docs fetched per scroll request
    conf.set("spark.broadcast.compress", "true")     // enable broadcast compression
    conf.set("spark.rdd.compress", "true")           // enable RDD compression
    conf.set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
    conf.set("spark.shuffle.file.buffer", "1280k")
    conf.set("spark.reducer.maxSizeInFlight", "1024m")
    conf.set("spark.es.nodes.wan.only", "false")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)
    // read: RDD[(docId, Map[String, AnyRef])]
    val read = sc.esRDD("index/type")                // read
    // saveToEsWithMeta treats the tuple key as the document id,
    // so the original ids are preserved in the target index
    EsSpark.saveToEsWithMeta(read, "index1/type1")   // write
    sc.stop()
  }
}
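esRDD also accepts a query, so filtering is pushed down to Elasticsearch instead of pulling the whole index into Spark. A minimal sketch, reusing the SparkContext sc from above; the fields status and timestamp are hypothetical, for illustration only:

// URI-style query: only matching docs are shipped to Spark
val errors = sc.esRDD("index/type", "?q=status:error")

// the same idea with a full query DSL body
val recent = sc.esRDD(
  "index/type",
  """{"query": {"range": {"timestamp": {"gte": "now-1d"}}}}"""
)

Combined with es.scroll.size above, this keeps both the number of round trips and the volume of transferred data down when only a slice of the index is needed.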