1. 程式人生 > >Super CSV 線程池高並發處理大批量數據

Super CSV 線程池高並發處理大批量數據

CSV super-csv 批量處理 並發處理 csv大數據

Super CSV是一個用於處理CSV文件的Java開源項目。它完全圍繞面向對象的思想進行設計,因此可以利用你的面向對象代碼來使得處理CSV文件變得更加簡易。它支持輸入/輸出類型轉換、數據完整性校驗,支持從任何地方以任何編碼讀寫數據,只要提供相應的Reader與Writer對象。可配置分割符,空格符號和行結束符等。

一、下面先來看簡單數據處理
引入依賴包:

<dependency>
    <groupId>net.sf.supercsv</groupId>
    <artifactId>super-csv</artifactId>
    <version>2.4.0</version>
</dependency>

下面來看一下官方文檔中的代碼示例。

  1. 根據頭來讀取CSV文件
    把文件中的每行記錄讀取出來轉化為java對象,假設你有一個UserBean類,代碼如下:

    public class UserBean {  
            int id;
            String username, password, street, town;  
            int zip;  
    
            public int getId() { return id;}
            public String getPassword() { return password; }  
            public String getStreet() { return street; }  
            public String getTown() { return town; }  
            public String getUsername() { return username; }  
            public int getZip() { return zip; }  
            public void setId(int id) { this.id = id; }  
            public void setPassword(String password) { this.password = password; }  
            public void setStreet(String street) { this.street = street; }  
            public void setTown(String town) { this.town = town; }  
            public void setUsername(String username) { this.username = username; }  
            public void setZip(int zip) { this.zip = zip; }  
        }  

    並且有一個CSV文件,包含一個文件頭,假設文件內容如下:
    id,username,password,date,zip,town
    1,Klaus,qwexyKiks,17/1/2007,1111,New York
    2,Oufud,bobilop213,10/10/2007,4555,New York
    3,Oufud1,bobilop213,10/10/2007,4555,New York
    4,Oufud2,bobilop213,10/10/2007,4555,New York
    5,Oufud3,bobilop213,10/10/2007,4555,New York
    6,Oufud4,bobilop213,10/10/2007,4555,New York
    7,Oufud5,bobilop213,10/10/2007,4555,New York

    8,Oufud6,bobilop213,10/10/2007,4555,New York
    9,Oufud7,bobilop213,10/10/2007,4555,New York
    10,Oufud8,bobilop213,10/10/2007,4555,New York
    11,Oufud9,bobilop213,10/10/2007,4555,New York
    12,Oufud10,bobilop213,10/10/2007,4555,New York
    13,Oufud11,bobilop213,10/10/2007,4555,New York
    14,Oufud12,bobilop213,10/10/2007,4555,New York
    15,Oufud13,bobilop213,10/10/2007,4555,New York

    然後你可以使用一下代碼來創建UserBean的實例對象,並打印出對象的屬性值:

    class ReadingObjects {  
        public static void main(String[] args) throws Exception{  
            ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);  
            try {  
                final String[] header = inFile.getCSVHeader(true);  
                UserBean user;  
                while( (user = inFile.read(UserBean.class, header, processors)) != null) {  
                    System.out.println(user.getZip());  
                }  
            } finally {  
                inFile.close();  
            }  
        }  
    }  

    我們還剩下processors沒有定義,通過名字我們可以看出是解析器,用來處理每列的數據,當然你也可以傳入null,表示該列不做特殊處理,每個解析器可以被另外一個包含在內部,new Unique(new StrMinMax(5,20)),這個代碼該列的值為唯一的,並且長度為8到20,具體處理細節我們先不講,來看一下我們所需要的processors是如何定義的:

    final CellProcessor[] processors = new CellProcessor[] {  
        new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };  

    上面的代碼的具體意思為:
    第一列是一個字符串,並且值是唯一的,長度為5到20
    第二列是一個字符串,長度是8到35
    第三列為一個日期類型,格式為天/月/年(day/month/year)
    第四列是一個整型數字,但只有這列有值的時候ParseInt處理器才會去處理這個值(其實就是該列可以為空)
    第五列為一個字符串(默認),不使用處理器

如果你的CSV文件沒有頭,你也可以定義個數組來替代:

final String[] header = new String[] { "username", "password", "date", "zip", "town"}; 

如果你想忽略某一列,和定義處理器類似,直接在頭數組中使用null。

全部代碼如下:

import Java.io.FileReader;  
import Java.io.IOException;  
import org.supercsv.cellprocessor.Optional;  
import org.supercsv.cellprocessor.ParseDate;  
import org.supercsv.cellprocessor.ParseInt;  
import org.supercsv.cellprocessor.constraint.StrMinMax;  
import org.supercsv.cellprocessor.constraint.Unique;  
import org.supercsv.cellprocessor.ift.CellProcessor;  
import org.supercsv.io.CsvBeanReader;  
import org.supercsv.io.ICsvBeanReader;  
import org.supercsv.prefs.CsvPreference;  

class ReadingObjects {  

    static final CellProcessor[] userProcessors = new CellProcessor[] {  
            new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),  
        new StrMinMax(8, 35),  
        new ParseDate("dd/MM/yyyy"),  
        new Optional(new ParseInt()),  
        null  
    };  

    public static void main(String[] args) throws Exception {  
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);  
        try {  
          final String[] header = inFile.getCSVHeader(true);  
          UserBean user;  
          while( (user = inFile.read(UserBean.class, header, userProcessors)) != null) {  
            System.out.println(user.getZip());  
          }  
        } finally {  
          inFile.close();  
        }  
   }  
}  

public class UserBean {  
    String username, password, town;  
    Date date;  
    int zip;  

    public Date getDate() {  
        return date;  
    }  

    public String getPassword() {  
        return password;  
    }  

    public String getTown() {  
        return town;  
    }  

    public String getUsername() {  
        return username;  
    }  

    public int getZip() {  
        return zip;  
    }  

    public void setDate(final Date date) {  
        this.date = date;  
    }  

    public void setPassword(final String password) {  
        this.password = password;  
    }  

    public void setTown(final String town) {  
        this.town = town;  
    }  

    public void setUsername(final String username) {  
        this.username = username;  
    }  

    public void setZip(final int zip) {  
        this.zip = zip;  
    }  

}  

如果你在讀取文件之前根本不知道文件的具體格式,你可以選擇CsvListReader.read()方法,把每行讀出出來的數據放在一個List裏面。

讀取文件的代碼我們看到了,下面來看一下寫的操作,也很簡單。

import Java.util.HashMap;  
import org.supercsv.io.*;  
import org.supercsv.prefs.CsvPreference;  

class WritingMaps {  
  main(String[] args) throws Exception {  
    ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE);  
    try {  
      final String[] header = new String[] { "name", "city", "zip" };  
      // set up some data to write  
      final HashMap<String, ? super Object> data1 = new HashMap<String, Object>();  
      data1.put(header[0], "Karl");  
      data1.put(header[1], "Tent city");  
      data1.put(header[2], 5565);  
      final HashMap<String, ? super Object> data2 = new HashMap<String, Object>();  
      data2.put(header[0], "Banjo");  
      data2.put(header[1], "River side");  
      data2.put(header[2], 5551);  
      // the actual writing  
      writer.writeHeader(header);  
      writer.write(data1, header);  
      writer.write(data2, header);  
    } finally {  
      writer.close();  
    }  
  }  
}  

二、並發分批處理大數據量的數據更新
代碼如下

import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadReadingObjects {

    static final CellProcessor[] userProcessors = new CellProcessor[] {
            new Unique(new ParseInt()),//唯一的,int id
            new Unique(new StrMinMax(5, 20)),//唯一的,長度為5到20
            new StrMinMax(8, 35), //長度是8到35
            new ParseDate("dd/MM/yyyy"), //格式為天/月/年(day/month/year)
            new Optional(new ParseInt()), //整型數字,但只有這列有值的時候ParseInt處理器才會去處理這個值(其實就是該列可以為空)
            null //不使用處理器
    };

    public static void main(String[] args) throws Exception {
        // InputStreamReader freader = new InputStreamReader(inputStream,"UTF-8");
        // ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);

        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);

        ExecutorService executorService = null;
        try {
            //如果你的CSV文件沒有頭,你也可以定義個數組來替代:
            // final String[] header = new String[] { "username", "password", "date", "zip", "town"};
            final String[] header = inFile.getHeader(true);

            //創建緩存線程池
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            executorService = Executors.newCachedThreadPool();

            //分頁讀取數據後,加入線程池處理
            while (getPageUserList(executorService,futureList,inFile, header)) {}

                //獲取線程處理結果
                for (Future<String> future : futureList) {
                    while (true) {
                        if (future.isDone() && !future.isCancelled()) {
                            System.out.println("future result: "+future.get());
                            break;
                        } else {
                            Thread.sleep(1000);
                        }
                    }
                }

        } finally {
            inFile.close();
            executorService.shutdown();
        }
    }

    private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList, ICsvBeanReader inFile, String[] header) throws IOException {
        int index = 0;
        boolean status = false;
        List<UserBean> userBeans = new ArrayList<UserBean>();
        UserBean user;
        while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {// 這裏從第一行開始取數據
            userBeans.add(user);
            index++;
            //讀取的行數,每個線程處理的記錄數,根據實際情況修改
            if (index == 10) {
                status = true;
                break;
            }
        }
        //添加到線程集合
        if(!userBeans.isEmpty()){
            Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(),userBeans));
            futureList.add(future);
        }

        return status;
    }

    private static Callable<String> getUpdateDbJob(int threadNo,List<UserBean> userBeans) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                //分批量寫入數據庫
                List<UserBean> userList = new ArrayList<UserBean>();
                for(int i=0;i<userBeans.size();i++){
                    userList.add(userBeans.get(i));
                    //如果數據量比較大再次分批commit,第一次提交3條,後面每次提交2條
                    //取 % 條數根據實際情況修改
                    if (i > 0 && i % 3 == 0) {
                        System.out.println("線程"+threadNo+"更新用戶:"+userList.size()+" 個成功");
                        //采用jdbcTemplate 批量寫入數據庫
                        // TODO 寫入數據中
                        userList = new ArrayList<UserBean>();
                    } else if (i == userBeans.size() - 1) {
                        //處理最後一批數據提交
                        System.out.println("線程"+threadNo+"更新用戶:"+userList.size()+" 個成功");
                        // TODO 寫入數據中
                        userList = new ArrayList<UserBean>();
                    }
                }
                return String.valueOf(userBeans.size());
            }
        };
    }
}

運行後返回結果:

線程0更新用戶:4 個成功
線程0更新用戶:3 個成功
線程0更新用戶:3 個成功
線程1更新用戶:4 個成功
線程1更新用戶:1 個成功
future result: 10
future result: 5

Super CSV 線程池高並發處理大批量數據