1. 程式人生 > >使用java語言,利用多執行緒呼叫WebService進行資料處理

使用java語言,利用多執行緒呼叫WebService進行資料處理

因工作原因,需要將一個表(tbA)中的所有資料,根據user_id,去請求webserive獲取相關的資料,然後插入到另外的一張表(tbB)中,供他人使用。不過這個表中的資料不少有78萬條左右,而這樣的大批量資料操作,還不能白天執行。只能在夜裡,等伺服器負荷低的時候進行執行。考慮如果webservice的效率不高的時候,需要對資料進行分批執行操作。經過綜合考慮,最後採用多執行緒技術(不過最後經過測試,效率還是不錯78萬條資料,使用10個執行緒操作,8個多小時就可以了)。     首先在tbA表中,追加一個欄位,deal_tag,這個欄位為處理標誌欄位。該欄位的資料預設為'0',如果需要分批處理,則可以設計部分資料中的該欄位值為'1'。這個時候就可以讀取了。考慮到伺服器的負荷問題,採用的方案是讀取一部分資料到緩衝區中,同時更改已經進入緩衝區中待處理的資料的deal_tag欄位值為‘B’,標識該資料已經進入緩衝區中。對資料的處理,需要請求webservice。也就有可能是因為因為種種原因,webservice沒有正常的返回資料,或者出現異常,而為了不讓異常影響資料的處理,則每次處理完一條資料,將該資料放入另外的緩衝區中,如果請求WebService成功,標識資料的欄位為'F',請求成功。如果因為WebService或者其他的原因導致失敗,則標誌該資料欄位的deal_tag欄位值為'E',等待下一次的呼叫呼叫處理。另外為了不增加資料庫的負荷,等到緩衝區中的資料達到一定的資料,一次提交。而不採用處理一條資料,提交一條資料,這樣效率太慢。核心程式碼如下(因不想透漏具體的表名,採用tbA,tbB,tbC代替):

點選(此處)摺疊或開啟

  1. import java.sql.ResultSet;
  2. import java.sql.SQLException;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import com.epro.DBUtility.CommonHelper;
  7. import com.epro.DBUtility.DBHelper;
  8. import com.epro.DBUtility.BatchData;

  9. public class FetchData implements Runnable {
  10.     private int READ_COUNT = 500; //一次讀取資料庫多少條記錄到緩衝區中
  11.     private int FLASH_BUFFER_COUNT = 2000; //處理後的資料,多少條進行提交到資料庫
  12.     private List<UserInfo> userList = null;
  13.     private List<String> userStateSqlList = null; 
  14.     private List<
    BatchData> commitDataList = null;
  15.     public FetchData(){
  16.         userList = Collections.synchronizedList(new ArrayList<UserInfo>());
  17.         userStateSqlList = new ArrayList<String>(); 
  18.         commitDataList = new ArrayList<BatchData>();
  19.     }
  20.     //讀取資料
  21.     private
     synchronized int readData(){
  22.         int result = 0;
  23.         String strSql = "SELECT user_id, serial_number, cust_name, eparchy_code, " + 
  24.             "detail_install_address, link_phone, service_code, cust_type, rate, " +
  25.             "in_date, deal_tag " +
  26.             "FROM tbA " +
  27.             "WHERE deal_tag in ('1','E') AND rownum <= " + READ_COUNT;
  28.         ResultSet rs = null;
  29.         DBHelper db = new DBHelper();        
  30.         try {
  31.             rs = db.executeQuery(strSql);
  32.             while(rs.next()){
  33.                 UserInfo user = new UserInfo();
  34.                 user.setUserId(rs.getString("user_id"));
  35.                 user.setSerialNumber(rs.getString("serial_number"));
  36.                 user.setCustName(rs.getString("cust_name"));
  37.                 user.setEparchyCode(rs.getString("eparchy_code"));
  38.                 user.setDetailInstallAddress(rs.getString("detail_install_address"));
  39.                 user.setLinkPhone(rs.getString("link_phone"));
  40.                 user.setServiceCode(rs.getString("service_code"));
  41.                 user.setCustType(rs.getString("cust_type"));
  42.                 user.setRate(rs.getString("rate"));
  43.                 user.setInDate(rs.getString("in_date"));
  44.                 user.setDealTag(rs.getString("deal_tag"));
  45.                 userList.add(user); //放入佇列中                
  46.                 ++result;
  47.             }
  48.         } catch (SQLException e) {
  49.             e.printStackTrace();
  50.         }finally{
  51.             try {
  52.                 rs.close();
  53.             } catch (SQLException e) {
  54.                 // TODO Auto-generated catch block
  55.                 e.printStackTrace();
  56.             }
  57.             finally{
  58.                 rs = null;
  59.             }
  60.             db.Close();
  61.         }
  62.         userIntoBuffer(); //將查詢出來的資料,更改標誌位為'B'
  63.         return result;
  64.     }
  65.     private void userIntoBuffer(){
  66.         int buffersize = this.userList.size();
  67.         String sql = "";
  68.         String userID = "";
  69.         List<String> sqlList = new ArrayList<String>();
  70.         if(buffersize > 0){
  71.             DBHelper db = new DBHelper();    
  72.             for(int i=0; i<buffersize; i++){
  73.                 userID = userList.get(i).getUserId();
  74.                 sql = this.getUserStateSql(userID, "B");
  75.                 sqlList.add(sql);                
  76.             }
  77.             db.doBatch(sqlList); //批量進行資料處理
  78.             db.Close();
  79.         }
  80.     }
  81.     private String getUserStateSql(String userID, String dealTag){
  82.         String result = "";
  83.         result = "UPDATE tbA SET deal_tag = '" + dealTag + 
  84.             "' WHERE user_id = " + userID;
  85.         return result;
  86.     }
  87.     public synchronized UserInfo getUserInfo(){
  88.         UserInfo result = null;
  89.         if(userList.size() > 0){
  90.             result = userList.remove(0);
  91.         }
  92.         return result;
  93.     }
  94.     //開始進行資料的處理
  95.     public void beginDealData(){
  96.         ResInfo resInfo = null;
  97.         while(true){
  98.             UserInfo user = this.getUserInfo();
  99.             if(user != null && !"".equals(user.getUserId().trim())){ //當沒有資料時,跳出迴圈
  100.                 //進行WebService請求,獲取生產環境的資源資料,對資料進行解析,返回資源物件
  101.                 resInfo = getResInfo(user.getUserId());
  102.             }
  103.             else if(user == null){
  104.                 break;
  105.             }
  106.             else if(user != null && "".equals(user.getUserId().trim())){
  107.                 resInfo = new ResInfo();
  108.                 resInfo.out_err_id = "-1";
  109.                 resInfo.ln_line_flag = "0";
  110.                 resInfo.out_err_msg = "beginDealData is null.";
  111.                 System.out.println("user id is null.");
  112.             }
  113.             else{
  114.                 ;
  115.             }
  116.             userFinish(user, resInfo); //將使用者的資料和資源的資料進行合併操作
  117.         }
  118.         int datacount = readData(); //讀取表,看是否還有要處理的資料
  119.         if(datacount > 0){
  120.             beginDealData(); //遞迴呼叫,對需要處理的資料,進行繼續讀取
  121. 相關推薦

    使用java語言利用執行呼叫WebService進行資料處理

    因工作原因,需要將一個表(tbA)中的所有資料,根據user_id,去請求webserive獲取相關的資料,然後插入到另外的一張表(tbB)中,供他人使用。不過這個表中的資料不少有78萬條左右,而這樣的大批量資料操作,還不能白天執行。只能在夜裡,等伺服器負荷低的時候進

    爬取不得姐網站利用執行來爬取

    利用到的庫 time, requests, lxml, queue, threading 功能 爬取不得姐網站中前二十頁的段子資料 import time import requests from lxml import etree from queue

    java 如何使用執行呼叫類的靜態方法?

      1.情景展示   靜態方法內部實現:將指定內容生成圖片格式的二維碼;   如何通過多執行緒實現? 2.分析   之所以採用多執行緒,是為了節省時間  3.解決方案   準備工作   logo檔案     將生成的檔案儲存在F

    Java併發(十八):阻塞佇列BlockingQueue BlockingQueue(阻塞佇列)詳解 二叉堆(一)之 圖文解析 和 C語言的實現 多執行緒程式設計:阻塞、併發佇列的使用總結 Java併發程式設計:阻塞佇列 java阻塞佇列 BlockingQueue(阻塞佇列)詳解

    阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。 這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。 阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者

    Java執行執行安全開啟執行及每執行迴圈10次對類進行輸出測試

    最近看到執行緒問題,emmm~腦闊回想到計算機作業系統貌似又講,不過上課睡覺覺去啦哈哈哈,java課老師莫得講~ 然歸正傳,今對執行緒進行查閱及測試,做一下筆記,有錯之處還請指出,謝謝~上程式碼之前呢先說一哈前傳 執行緒是程序中的最小執行單位:    手機呢會有很多單獨

    Java裸寫爬蟲技術運用執行技術高效爬取某個醫療機構網站資料

    最近喜歡上了資料的龐大的感覺,就爬取了一下某個醫療機構網站醫療資料,由於資料量龐大,只爬取了江西省的各個市的各個醫院的各個科室的各個科室。中各種資訊。其中用的持久層技術是hibernate框架,和用到一

    我用java寫的搶紅包用的紅包類支援執行

    import java.util.concurrent.atomic.AtomicInteger; public class CashGift { public static class OverException extends Exception { }

    java基礎】執行匿名內部類和lambda建立方式執行中的兩個面試題

    一、可以用匿名類和lambda兩個種方式建立多執行緒。 1.利用匿名內部類建立多執行緒並開啟。 new Thread() {//建立方式1 public void run() { for(int x=0; x<50; x++) { System.out

    Java程式設計師金三銀四求職季這些執行面試題你會嗎?

      多執行緒是Java技術面試中面試官比較喜歡問的問題之一。在這裡,從面試的角度列出了大部分重要的問題,但是作為一個程式設計師仍然應該牢固的掌握Java多執行緒基礎知識來對應日後碰到的問題。 1. 程序和執行緒之間有什麼不同? 一個程序是一個獨立(self contain

    ArrayList在執行呼叫Add()新增元素時的下標越界問題(java.lang.ArrayIndexOutOfBoundsException)

    最近在看《實戰Java虛擬機器》一書,看到有關鎖與併發章節時,看到如下一個多執行緒使用ArrayList的例子:        兩個執行緒t1和t2同時向numberList中新增資料,由於Arr

    利用執行和TCP技術實現客戶端與服務端之間的通訊

    server.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h>

    利用執行解決Tkinter在button事件中執行高io高耗時操作視窗無響應問題

        昨天利用python自己寫了一個微型伺服器,突發奇想用Tkinter寫一個GUI介面,並將監聽開始的函式繫結到其中的一個button上,但是當我點選開始服務button時,視窗立馬陷入無響應狀態。搜尋一番發現,是伺服器函式中的while迴圈阻塞了GUI的響應,同理而言

    Java高併發(三)——執行協作同步控制

          繼上一篇:Java高併發——多執行緒基礎 中講到,共享資源的合理使用,才能夠使多執行緒程式有條不紊的執行。其中我們通過synchronized來實現臨界區資源的是否可以訪問。而,這篇我們來重點總結synchronized的增強替代版鎖,以及其它JDK併發包提供的一

    Java 知識 - 集合、執行、IO、JVM

    GitHub 專案地址 Collection Java Collection 新增、刪除等操作時可選操作,如 Arrays.asList,會產生固定大小的集合,會丟擲 UnsupportedOperationException Set HashSet、TreeSet、LinkedH

    java併發程式設計一一執行之間通訊(一)

    1.多執行緒之間如何實現通訊 多執行緒之間通訊,其實就是多個執行緒在操作同一個資源,但是操作的動作不同。 1.1什麼是多執行緒之間通訊? 需求:第一個執行緒寫入(input)使用者,另一個執行緒讀取(out)使用者。實現讀一個,寫一個操作。 1.2多執行緒之間通訊需求?

    java併發程式設計一一執行執行安全(四)

    ##1.java重排序 ###1.1資料依賴性 如果兩個操作訪問同一個變數時,且這兩個操作匯中有一個為寫操作,此時這兩個操作之間就 存在資料依賴性。資料依賴分下列三種類型。 名稱 程式碼示例 說明

    java併發程式設計一一執行執行安全(三)

    1.多執行緒的三大特性 1.1什麼是原子性 即一個操作或多個操作要麼全部執行並且執行的過程不會被任何因素打斷,要麼就都不執行。 一個很經典的例子就是銀行賬戶轉賬問題: 比如從賬戶A向賬戶B轉1000元,那麼必然包括2個操作:從賬戶A減去1000元,往賬戶B加上1000元。這2

    java併發程式設計一一執行執行安全(二)

    1.多執行緒死鎖 1.1什麼是多執行緒死鎖? 同步中巢狀同步,導致鎖無法釋放 程式碼示例: class Thread009 implements Runnable { private int trainCount = 100; private Object

    java併發程式設計一一執行執行安全(一)

    1.什麼是執行緒安全? 1.1為什麼有執行緒安全問題? 當多個執行緒同時共享同一個全域性變臉或靜態變數,做寫的操作時,可能會發生資料衝突的問題, 也就是執行緒安全的問題。但是做讀操作是不會發生資料衝突問題。 舉例:現在有100張火車票,有兩個視窗同時搶火車票,用多執行緒模擬搶

    java併發程式設計一一執行基礎快速入門

    1.執行緒與程序的區別 每個正在系統上執行的程式都是一個程序。每個程序包含一到多個執行緒。執行緒是一組指令的集合,或者是程式的特殊段,他可以在程式裡獨立執行。也可以把它理解為程式碼執行的上下文。 所以執行緒基本是輕量級的程序,它負責在單個程式裡執行任務。通常有作業系統負責多個執行緒