Integrating Spark with Spring to Build a Web Interface

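Goal:

Expose Spark through a web interface: entering a URL in the browser submits a Spark job and returns the information we need as JSON.

Result:

A URL request returns the wordcount result as JSON (shown here with the Postman extension for Chrome; entering the URL directly in the browser gives the same result).
[Figure: wordcount result]

Key technologies:

Java, the Spring MVC framework, the Tomcat container, the Spark framework, and the related Scala dependencies

Overall architecture:

A web project built with Maven

I use a Maven web project; the pom file is as follows: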

  <dependencies>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>1.6.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>1.6.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-reflect -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.11.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-compiler -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.11.11</version>
    </dependency>
    <!-- Spring framework jars -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>4.3.4.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-jdbc -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>4.3.4.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>4.3.4.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework/spring-webmvc -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.3.4.RELEASE</version>
    </dependency>
    <!-- Persistence framework (MyBatis) -->
    <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.4.1</version>
    </dependency>
    <!-- MyBatis-Spring integration -->
    <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis-spring -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>commons-dbcp</groupId>
      <artifactId>commons-dbcp</artifactId>
      <version>1.4</version>
    </dependency>
    <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjweaver</artifactId>
      <version>1.8.9</version>
    </dependency>
    <!-- Connection pool -->
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.18</version>
    </dependency>
    <!-- Database driver -->
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.39</version>
    </dependency>
    <!-- Logging -->
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.21</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <!-- JSON dependency; do not use the Jackson dependencies -->
    <dependency>
      <groupId>net.sf.json-lib</groupId>
      <artifactId>json-lib</artifactId>
      <version>2.4</version>
      <classifier>jdk15</classifier>
    </dependency>
  </dependencies>

web.xml configuration (only the Spring MVC container is configured here)

<?xml version="1.0" encoding="UTF-8"?>

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" id="WebApp_ID" version="3.0">

  <display-name>Archetype Created Web Application</display-name>
  <!-- Spring MVC front controller -->
  <servlet>
    <servlet-name>manager</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <!-- contextConfigLocation is optional; if it is not set, Spring MVC looks for its configuration file at WEB-INF/[servlet-name]-servlet.xml by default -->
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:springmvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>manager</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>

  <!-- Fix garbled characters in POST requests -->
  <filter>
    <filter-name>CharacterEncodingFilter</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>utf-8</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>CharacterEncodingFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

  <!-- Logging configuration -->
  <context-param>
    <param-name>log4jConfigLocation</param-name>
    <param-value>classpath:log4j.properties</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
  </listener>
</web-app>

Next is the Spring MVC configuration file (springmvc.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- Component scanning -->
    <context:component-scan base-package="com.zzrenfeng.zhsx.controller" />
    <!-- Enable annotation-driven MVC -->
    <mvc:annotation-driven />


    <context:component-scan base-package="com.zzrenfeng.zhsx.service"></context:component-scan>
    <context:component-scan base-package="com.zzrenfeng.zhsx.spark.service"></context:component-scan>
    <context:component-scan base-package="com.zzrenfeng.zhsx.spark.conf"></context:component-scan>
</beans>

That is all the configuration; other frameworks can be integrated later if needed. Now on to the code.

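Utility class for converting between objects and JSON:

(Why convert by hand instead of letting the Jackson dependencies do it automatically? When I used Jackson, I found that it interfered with the Spark job, which then terminated abnormally.)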

package com.zzrenfeng.zhsx.util;
import java.lang.reflect.Field;  
import java.util.ArrayList;  
import java.util.HashMap;  
import java.util.Iterator;  
import java.util.List;  
import java.util.Map;  
import java.util.Set;  

import net.sf.json.JSONArray;  
import net.sf.json.JSONObject;  
import net.sf.json.JsonConfig;  

/**
 * Utility class for converting between JSON and JavaBeans
 *
 * @author
 * @version
 *
 * {@code   Currently implemented with the json-lib library,
 *          which requires
 *              json-lib-2.4-jdk15.jar
 *              ezmorph-1.0.6.jar
 *              commons-collections-3.1.jar
 *              commons-lang-2.0.jar
 * }
 */
public class JsonUtil {  


    /**
     * Builds a Java object from a JSON object string
     *
     * @param jsonString the JSON object string
     * @param beanClass  the target bean class
     * @return the populated bean
     */
    @SuppressWarnings("unchecked")
    public static <T> T jsonToBean(String jsonString, Class<T> beanClass) {

        JSONObject jsonObject = JSONObject.fromObject(jsonString);
        T bean = (T) JSONObject.toBean(jsonObject, beanClass);

        return bean;

    }

    /**   
     * Converts a Java object to a JSON string
     * 
     * @param bean   
     * @return   
     */  
    public static String beanToJson(Object bean) {  

        JSONObject json = JSONObject.fromObject(bean);  

        return json.toString();  

    }  

    /**   
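     * Converts a Java object to a JSON string, including or excluding selected properties
     *
     * @param bean          the object to serialize
     * @param _nory_changes the property names to include or exclude
     * @param nory          true to serialize only the listed properties, false to serialize everything except them
     * @return the JSON string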
     */  
    public static String beanToJson(Object bean, String[] _nory_changes, boolean nory) {  

        JSONObject json = null;  

        if(nory){// serialize only the properties listed in _nory_changes

            Field[] fields = bean.getClass().getDeclaredFields();  
            String str = "";  
            for(Field field : fields){  
//              System.out.println(field.getName());  
                str+=(":"+field.getName());  
            }  
            fields = bean.getClass().getSuperclass().getDeclaredFields();  
            for(Field field : fields){  
//              System.out.println(field.getName());  
                str+=(":"+field.getName());  
            }  
            str+=":";  
            for(String s : _nory_changes){  
                str = str.replace(":"+s+":", ":");  
            }  
            json = JSONObject.fromObject(bean,configJson(str.split(":")));  

        }else{// serialize everything except the properties listed in _nory_changes



            json = JSONObject.fromObject(bean,configJson(_nory_changes));  
        }  



        return json.toString();  

    }  
     private static JsonConfig configJson(String[] excludes) {     

                JsonConfig jsonConfig = new JsonConfig();     

                jsonConfig.setExcludes(excludes);     
//  
                jsonConfig.setIgnoreDefaultExcludes(false);     
//  
//              jsonConfig.setCycleDetectionStrategy(CycleDetectionStrategy.LENIENT);     

//              jsonConfig.registerJsonValueProcessor(Date.class,     
//  
//                  new DateJsonValueProcessor(datePattern));     



                return jsonConfig;     

            }    





    /** 
     * Converts a List of Java objects to a JSON string
     * @param beans 
     * @return 
     */  
    @SuppressWarnings("unchecked")  
    public static String beanListToJson(List beans) {  

        StringBuffer rest = new StringBuffer();  

        rest.append("[");  

        int size = beans.size();  

        for (int i = 0; i < size; i++) {  

            rest.append(beanToJson(beans.get(i))+((i<size-1)?",":""));  

        }  

        rest.append("]");  

        return rest.toString();  

    }  

    /** 
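     * Converts a List of Java objects to a JSON string, including or excluding selected properties
     *
     * @param beans         the objects to serialize
     * @param _nory_changes the property names to include or exclude
     * @param nory          true to serialize only the listed properties, false to serialize everything except them
     * @return the JSON array string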
     */  
    @SuppressWarnings("unchecked")  
    public static String beanListToJson(List beans, String[] _nory_changes, boolean nory) {  

        StringBuffer rest = new StringBuffer();  

        rest.append("[");  

        int size = beans.size();  

        for (int i = 0; i < size; i++) {  
            try{  
                rest.append(beanToJson(beans.get(i),_nory_changes,nory));  
                if(i<size-1){  
                    rest.append(",");  
                }  
            }catch(Exception e){  
                e.printStackTrace();  
            }  
        }  

        rest.append("]");  

        return rest.toString();  

    }  

    /**   
     * Builds a Map from a JSON object (hash) expression; each value, including any nested structure, is stored in its string form
     * 
     * @param jsonString   
     * @return   
     */  
    @SuppressWarnings({ "unchecked" })  
    public static Map jsonToMap(String jsonString) {  

        JSONObject jsonObject = JSONObject.fromObject(jsonString);  
        Iterator keyIter = jsonObject.keys();  
        String key;  
        Object value;  
        Map valueMap = new HashMap();  

        while (keyIter.hasNext()) {  

            key = (String) keyIter.next();  
            value = jsonObject.get(key).toString();  
            valueMap.put(key, value);  

        }  

        return valueMap;  
    }  

    /** 
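     * Converts a Map to JSON-formatted data.
     * Note: keys are written without quotes and the non-list branch serializes the whole map for each key, so the output is not strict JSON.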
     * @param map 
     * @return 
     */  
    public static String mapToJson(Map<String, ?> map, String[] _nory_changes, boolean nory){  

        String s_json = "{";  

         Set<String> key = map.keySet();  
         for (Iterator<?> it = key.iterator(); it.hasNext();) {  
             String s = (String) it.next();  
             if(map.get(s) == null){  

             }else if(map.get(s) instanceof List<?>){  
                 s_json+=(s+":"+JsonUtil.beanListToJson((List<?>)map.get(s), _nory_changes, nory));  

             }else{  
                 JSONObject json = JSONObject.fromObject(map);  
                 s_json += (s+":"+json.toString());
             }  

             if(it.hasNext()){  
                 s_json+=",";  
             }  
        }  

         s_json+="}";  
        return s_json;   
    }  

    /**   
     * Converts a JSON array to the corresponding Java array
     * 
     * @param jsonString   
     * @return   
     */  
    public static Object[] jsonToObjectArray(String jsonString) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  

        return jsonArray.toArray();  

    }  

    public static String listToJson(List<?> list) {  

        JSONArray jsonArray = JSONArray.fromObject(list);  

        return jsonArray.toString();  

    }  

    /**   
     * Builds a list of Java objects from a JSON array of objects
     * 
     * @param jsonString   
     * @param beanClass   
     * @return   
     */  
    @SuppressWarnings("unchecked")  
    public static <T> List<T> jsonToBeanList(String jsonString, Class<T> beanClass) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  
        JSONObject jsonObject;  
        T bean;  
        int size = jsonArray.size();  
        List<T> list = new ArrayList<T>(size);  

        for (int i = 0; i < size; i++) {  

            jsonObject = jsonArray.getJSONObject(i);  
            bean = (T) JSONObject.toBean(jsonObject, beanClass);  
            list.add(bean);  

        }  

        return list;  

    }  

    /**   
     * Parses a JSON array into a Java String array
     * 
     * @param jsonString   
     * @return   
     */  
    public static String[] jsonToStringArray(String jsonString) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  
        String[] stringArray = new String[jsonArray.size()];  
        int size = jsonArray.size();  

        for (int i = 0; i < size; i++) {  

            stringArray[i] = jsonArray.getString(i);  

        }  

        return stringArray;  
    }  

    /**   
     * Parses a JSON array into a Java Long array
     * 
     * @param jsonString   
     * @return   
     */  
    public static Long[] jsonToLongArray(String jsonString) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  
        int size = jsonArray.size();  
        Long[] longArray = new Long[size];  

        for (int i = 0; i < size; i++) {  

            longArray[i] = jsonArray.getLong(i);  

        }  

        return longArray;  

    }  

    /**   
     * Parses a JSON array into a Java Integer array
     * 
     * @param jsonString   
     * @return   
     */  
    public static Integer[] jsonToIntegerArray(String jsonString) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  
        int size = jsonArray.size();  
        Integer[] integerArray = new Integer[size];  

        for (int i = 0; i < size; i++) {  

            integerArray[i] = jsonArray.getInt(i);  

        }  

        return integerArray;  

    }  

    /**   
     * Parses a JSON array into a Java Double array
     * 
     * @param jsonString   
     * @return   
     */  
    public static Double[] jsonToDoubleArray(String jsonString) {  

        JSONArray jsonArray = JSONArray.fromObject(jsonString);  
        int size = jsonArray.size();  
        Double[] doubleArray = new Double[size];  

        for (int i = 0; i < size; i++) {  

            doubleArray[i] = jsonArray.getDouble(i);  

        }  

        return doubleArray;  

    }  

} 
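
The `nory` flag in `beanToJson` is easy to misread, so here is a minimal sketch of the idea behind its `nory == true` branch, using only the JDK. It collects every declared field name of the bean and its direct superclass, drops the names that should be kept, and what remains is the exclusion list that would be passed to `JsonConfig.setExcludes`. `ExcludeListSketch`, the `Person` bean, and `excludesFor` are hypothetical names made up for this illustration; the original builds the same list with string concatenation rather than a set.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class ExcludeListSketch {

    // Hypothetical bean, used only for this illustration.
    public static class Person {
        String name;
        int age;
        String password;
    }

    // Returns the declared field names (class plus direct superclass)
    // that are NOT listed in keep, i.e. the properties to exclude.
    public static Set<String> excludesFor(Class<?> type, String... keep) {
        Set<String> names = new TreeSet<>();
        for (Field field : type.getDeclaredFields()) {
            names.add(field.getName());
        }
        Class<?> parent = type.getSuperclass();
        if (parent != null) {
            for (Field field : parent.getDeclaredFields()) {
                names.add(field.getName());
            }
        }
        names.removeAll(Arrays.asList(keep));
        return names;
    }

    public static void main(String[] args) {
        // Keep only "name"; every other field ends up in the exclusion list.
        System.out.println(excludesFor(Person.class, "name"));
    }
}
```

With `nory == false` no such inversion is needed: the names in `_nory_changes` are used as the exclusion list directly.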

Spark utility class (mainly responsible for initializing the SparkContext):

package com.zzrenfeng.zhsx.spark.conf;

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.stereotype.Component;

@Component
public class ApplicationConfiguration implements Serializable{

    private static final long serialVersionUID = 1L;

    public SparkConf sparkconf(){
        SparkConf conf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("wc");
        return conf;
    }
    public JavaSparkContext javaSparkContext(){

        return new JavaSparkContext(sparkconf());
    }

    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    public String filePath(){
        return "E:\\測試檔案\\nlog.txt";
    }   
}
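
Note that `javaSparkContext()` above creates a new context on every call, and Spark by default allows only one active SparkContext per JVM, so overlapping requests could collide. One possible alternative, sketched here with plain JDK types, is to create the context lazily once and reuse it. `LazyHolder` is a hypothetical helper, not part of Spring or Spark, and the sketch assumes the shared context is no longer closed after each request.

```java
import java.util.function.Supplier;

// Hypothetical helper: creates a value on first use and returns the same instance afterwards.
public final class LazyHolder<T> {

    private final Supplier<T> factory;
    private T instance;

    public LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized so that concurrent requests cannot trigger two creations
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        // With Spark on the classpath, the factory could be
        // () -> new JavaSparkContext(sparkconf()); a String stands in here.
        LazyHolder<String> holder = new LazyHolder<>(() -> "shared-context");
        System.out.println(holder.get() == holder.get());
    }
}
```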

WordCount model class (wraps a word and its count)

package com.zzrenfeng.zhsx.spark.domain;

import scala.Serializable;

public class WordCount implements Serializable{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private String word;
    private Integer count;
    public WordCount(){}
    public WordCount(String v1, int l) {
        word = v1;
        count = l;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return "WordCount [word=" + word + ", count=" + count + "]";
    }

}

Spark service class, which contains the logic of the Spark word count job

package com.zzrenfeng.zhsx.spark.service;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import scala.Tuple2;

import com.zzrenfeng.zhsx.spark.conf.ApplicationConfiguration;
import com.zzrenfeng.zhsx.spark.domain.WordCount;
@Component
public class SparkServiceTest implements java.io.Serializable{
    @Autowired
    ApplicationConfiguration applicationConfiguration;

    public List<WordCount> doWordCount(){
        JavaSparkContext javaSparkContext = applicationConfiguration.javaSparkContext();
        System.out.println(javaSparkContext);
        JavaRDD<String> file = javaSparkContext.textFile(applicationConfiguration.filePath());
        JavaRDD<String> worlds = file.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(String t) throws Exception {
                // TODO Auto-generated method stub
                List<String> list = Arrays.asList(t.split(" "));
                return list;
            }
        });
        JavaRDD<WordCount> wordcount = worlds.map(new Function<String, WordCount>() {

            @Override
            public WordCount call(String v1) throws Exception {

                return new WordCount(v1,1);
            }
        });
        JavaPairRDD<String, Integer> pairwordCount = wordcount.mapToPair(new PairFunction<WordCount, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(WordCount t) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<>(t.getWord() , new Integer(t.getCount()));

            }
        });

        JavaPairRDD<String, Integer> worldCounts = pairwordCount.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1+v2;
            }
        });
        JavaRDD<WordCount> result = worldCounts.map(new Function<Tuple2<String,Integer>, WordCount>() {

            @Override
            public WordCount call(Tuple2<String, Integer> v1) throws Exception {
                // TODO Auto-generated method stub
                return new WordCount(v1._1,v1._2);
            }
        });
        List<WordCount> list = result.collect();
        javaSparkContext.close();
        System.out.println(list.toString());
        return list;
    }

}
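
To check the job logic without starting Spark, the same steps can be mirrored with Java 8 streams: split each line on single spaces (the `flatMap` step), pair every word with 1 (`mapToPair`), and sum the values per word (`reduceByKey`). This is only a local stand-in for reasoning about the result, not the Spark code itself; `LocalWordCount` and its `count` method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {

    // Splits every line on single spaces, then sums a 1 for each occurrence of a word.
    // A TreeMap keeps the words sorted so the output is deterministic.
    public static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        // prints {a=2, b=2, c=1}
        System.out.println(count(Arrays.asList("a b a", "b c")));
    }
}
```

As in the Spark version, splitting on a single space means consecutive spaces produce empty strings that are counted as words.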

Controller layer, which handles the incoming requests

package com.zzrenfeng.zhsx.controller;

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.zzrenfeng.zhsx.spark.domain.WordCount;
import com.zzrenfeng.zhsx.spark.service.SparkServiceTest;
import com.zzrenfeng.zhsx.util.JsonUtil;

@Controller
@RequestMapping("hello")
public class ControllerTest {
    @Autowired
    private SparkServiceTest sparkServiceTest;

    @RequestMapping("wc")
    @ResponseBody
    public String wordCount(){
        List<WordCount> list = sparkServiceTest.doWordCount();
        return JsonUtil.listToJson(list);
    }
}

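Start the application, then enter the URL mapped above in the browser to see the result shown at the beginning.
Because this is a web interface, it can be called from any client, even from other languages.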
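
As an illustration of calling the interface from another program, here is a minimal Java client that uses only the JDK. The `/hello/wc` path comes from the controller mappings above; the host, port, and context path (`http://localhost:8080/myapp`) are assumptions that depend on how the application is deployed, and `WordCountClient` is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class WordCountClient {

    // Appends the controller path to the deployment base URL.
    public static String endpoint(String baseUrl) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return base + "/hello/wc";
    }

    // Sends a GET request and returns the response body as a string.
    public static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(60000); // the Spark job may take a while to finish
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed base URL; pass the real one as the first argument.
        String base = args.length > 0 ? args[0] : "http://localhost:8080/myapp";
        System.out.println(fetch(endpoint(base)));
    }
}
```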
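Now you can happily write Spark code. Some may ask: isn't Spark better developed in Scala?
In my opinion, Scala is a good fit for pure data processing and a pleasure to write, but when integrating with other systems Java is the better choice; after all, when something goes wrong you can still discuss it with Java experts.
Anyone interested is welcome to join the discussion.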