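Super CSV: Processing Large Batches of Data Concurrently with a Thread Pool
1. Simple data processing first
Add the dependency:
<dependency>
    <groupId>net.sf.supercsv</groupId>
    <artifactId>super-csv</artifactId>
    <version>2.4.0</version>
</dependency>
Let's start with the code example from the official documentation.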
Reading a CSV file using its header
Each record in the file is read and converted into a Java object. Suppose you have a UserBean class like this:
public class UserBean {
    int id;
    String username, password, street, town;
    int zip;

    public int getId() { return id; }
    public String getPassword() { return password; }
    public String getStreet() { return street; }
    public String getTown() { return town; }
    public String getUsername() { return username; }
    public int getZip() { return zip; }

    public void setId(int id) { this.id = id; }
    public void setPassword(String password) { this.password = password; }
    public void setStreet(String street) { this.street = street; }
    public void setTown(String town) { this.town = town; }
    public void setUsername(String username) { this.username = username; }
    public void setZip(int zip) { this.zip = zip; }
}
Note that this first version has a street field and no date field; the complete listing further down replaces street with a date field, as the CSV file requires.
You also have a CSV file that includes a header row. Suppose its contents are:
id,username,password,date,zip,town
1,Klaus,qwexyKiks,17/1/2007,1111,New York
2,Oufud,bobilop213,10/10/2007,4555,New York
3,Oufud1,bobilop213,10/10/2007,4555,New York
4,Oufud2,bobilop213,10/10/2007,4555,New York
5,Oufud3,bobilop213,10/10/2007,4555,New York
6,Oufud4,bobilop213,10/10/2007,4555,New York
7,Oufud5,bobilop213,10/10/2007,4555,New York
9,Oufud7,bobilop213,10/10/2007,4555,New York
10,Oufud8,bobilop213,10/10/2007,4555,New York
11,Oufud9,bobilop213,10/10/2007,4555,New York
12,Oufud10,bobilop213,10/10/2007,4555,New York
13,Oufud11,bobilop213,10/10/2007,4555,New York
14,Oufud12,bobilop213,10/10/2007,4555,New York
15,Oufud13,bobilop213,10/10/2007,4555,New York
You can then use the following code to create UserBean instances and print one of their properties:
class ReadingObjects {
    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            // Super CSV 2.x names this method getHeader; getCSVHeader is the 1.x name
            final String[] header = inFile.getHeader(true);
            UserBean user;
            while ((user = inFile.read(UserBean.class, header, processors)) != null) {
                System.out.println(user.getZip());
            }
        } finally {
            inFile.close();
        }
    }
}
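That leaves processors undefined. As the name suggests, these are cell processors: each one handles the data in one column. You can also pass null for a column, which means that column gets no special processing. A processor can wrap another one. For example, new Unique(new StrMinMax(5,20)) means the column's values must be unique and between 5 and 20 characters long. We will skip the processing details for now and look at how the processors we need are defined: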
final CellProcessor[] processors = new CellProcessor[] {
    new Unique(new ParseInt()),
    new Unique(new StrMinMax(5, 20)),
    new StrMinMax(8, 35),
    new ParseDate("dd/MM/yyyy"),
    new Optional(new ParseInt()),
    null
};
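In detail, the code above means:
Column 1 (id) is parsed as an integer, and its value must be unique
Column 2 (username) is a string whose value must be unique, with a length of 5 to 20
Column 3 (password) is a string with a length of 8 to 35
Column 4 (date) is a date in day/month/year format
Column 5 (zip) is an integer, but the ParseInt processor only runs when the column has a value (in other words, the column may be empty)
Column 6 (town) is a string (the default) and uses no processor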
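To make the nesting idea concrete, here is a minimal sketch of how a wrapped processor chain can behave. It uses only the JDK and is a simplified model, not Super CSV's actual classes: the CellStep interface and the ProcessorChainSketch helper are hypothetical names invented for this illustration. The outer step runs first and then hands the value to the step it wraps.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a cell processor; NOT Super CSV's real interface.
interface CellStep {
    Object apply(Object value);
}

class ProcessorChainSketch {

    // Rejects text whose length is outside [min, max], then passes it to the next step (if any).
    static CellStep lengthBetween(int min, int max, CellStep next) {
        return value -> {
            String text = String.valueOf(value);
            if (text.length() < min || text.length() > max) {
                throw new IllegalArgumentException(
                        "length of '" + text + "' is outside " + min + ".." + max);
            }
            return next == null ? text : next.apply(text);
        };
    }

    // Rejects a value that this step has already seen, then passes it to the next step (if any).
    static CellStep unique(CellStep next) {
        Set<Object> seen = new HashSet<>();
        return value -> {
            if (!seen.add(value)) {
                throw new IllegalArgumentException("duplicate value: " + value);
            }
            return next == null ? value : next.apply(value);
        };
    }

    public static void main(String[] args) {
        // Models the shape of: new Unique(new StrMinMax(5, 20))
        CellStep username = unique(lengthBetween(5, 20, null));
        System.out.println(username.apply("Klaus"));
        try {
            username.apply("Klaus"); // same value a second time
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Each step only knows about the one it wraps, which is why checks can be combined freely per column.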
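If your CSV file has no header row, you can define an array to use instead, with one entry per column:
final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
To ignore a column, put null at its position in the header array, much as you pass null in the processor array.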
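The effect of a null entry in the header array can be pictured with a small JDK-only sketch. This is an illustration of the idea only, not how Super CSV is implemented; NameMappingSketch and mapRow are hypothetical names, and the row is assumed to be already split into cells.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NameMappingSketch {

    // Pairs each cell with its header name; a null header entry means "skip this column".
    static Map<String, String> mapRow(String[] header, String[] row) {
        if (header.length != row.length) {
            throw new IllegalArgumentException("header and row must have the same number of columns");
        }
        Map<String, String> mapped = new LinkedHashMap<String, String>();
        for (int i = 0; i < header.length; i++) {
            if (header[i] != null) {
                mapped.put(header[i], row[i]);
            }
        }
        return mapped;
    }

    public static void main(String[] args) {
        // The password column (third position) is ignored by using null.
        String[] header = { "id", "username", null, "date", "zip", "town" };
        String[] row = { "1", "Klaus", "qwexyKiks", "17/1/2007", "1111", "New York" };
        System.out.println(mapRow(header, row));
        // prints {id=1, username=Klaus, date=17/1/2007, zip=1111, town=New York}
    }
}
```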
The complete code:
import java.io.FileReader;
import java.io.IOException;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;

class ReadingObjects {
    // One processor per column: id, username, password, date, zip, town
    static final CellProcessor[] userProcessors = new CellProcessor[] {
        new Unique(new ParseInt()),
        new Unique(new StrMinMax(5, 20)),
        new StrMinMax(8, 35),
        new ParseDate("dd/MM/yyyy"),
        new Optional(new ParseInt()),
        null
    };

    public static void main(String[] args) throws Exception {
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        try {
            // Super CSV 2.x names this method getHeader; getCSVHeader is the 1.x name
            final String[] header = inFile.getHeader(true);
            UserBean user;
            while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {
                System.out.println(user.getZip());
            }
        } finally {
            inFile.close();
        }
    }
}
// UserBean.java (a public class needs its own file)
import java.util.Date;

public class UserBean {
    // id is included because the header row names an id column,
    // and CsvBeanReader needs a matching setter for every mapped column
    int id;
    String username, password, town;
    Date date;
    int zip;

    public int getId() {
        return id;
    }
    public Date getDate() {
        return date;
    }
    public String getPassword() {
        return password;
    }
    public String getTown() {
        return town;
    }
    public String getUsername() {
        return username;
    }
    public int getZip() {
        return zip;
    }
    public void setId(final int id) {
        this.id = id;
    }
    public void setDate(final Date date) {
        this.date = date;
    }
    public void setPassword(final String password) {
        this.password = password;
    }
    public void setTown(final String town) {
        this.town = town;
    }
    public void setUsername(final String username) {
        this.username = username;
    }
    public void setZip(final int zip) {
        this.zip = zip;
    }
}
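If you do not know the file's format at all before reading it, you can use the CsvListReader.read() method instead, which puts the data from each row into a List.
That covers reading a file. Writing is just as simple: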
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Map;
import org.supercsv.io.*;
import org.supercsv.prefs.CsvPreference;

class WritingMaps {
    public static void main(String[] args) throws Exception {
        ICsvMapWriter writer = new CsvMapWriter(new FileWriter(...), CsvPreference.STANDARD_PREFERENCE);
        try {
            final String[] header = new String[] { "name", "city", "zip" };
            // set up some data to write
            final Map<String, Object> data1 = new HashMap<String, Object>();
            data1.put(header[0], "Karl");
            data1.put(header[1], "Tent city");
            data1.put(header[2], 5565);
            final Map<String, Object> data2 = new HashMap<String, Object>();
            data2.put(header[0], "Banjo");
            data2.put(header[1], "River side");
            data2.put(header[2], 5551);
            // the actual writing
            writer.writeHeader(header);
            writer.write(data1, header);
            writer.write(data2, header);
        } finally {
            writer.close();
        }
    }
}
2. Concurrent, batched updates for large volumes of data
The code is as follows:
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvBeanReader;
import org.supercsv.io.ICsvBeanReader;
import org.supercsv.prefs.CsvPreference;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class ThreadReadingObjects {
    static final CellProcessor[] userProcessors = new CellProcessor[] {
        new Unique(new ParseInt()),       // id: unique integer
        new Unique(new StrMinMax(5, 20)), // unique, length 5 to 20
        new StrMinMax(8, 35),             // length 8 to 35
        new ParseDate("dd/MM/yyyy"),      // date in day/month/year format
        new Optional(new ParseInt()),     // integer; ParseInt only runs when the column has a value (the column may be empty)
        null                              // no processor
    };
    public static void main(String[] args) throws Exception {
        // InputStreamReader freader = new InputStreamReader(inputStream,"UTF-8");
        // ICsvBeanReader inFile = new CsvBeanReader(freader, CsvPreference.STANDARD_PREFERENCE);
        ICsvBeanReader inFile = new CsvBeanReader(new FileReader("D:\\foo.csv"), CsvPreference.STANDARD_PREFERENCE);
        ExecutorService executorService = null;
        try {
            // If your CSV file has no header row, you can define an array instead:
            // final String[] header = new String[] { "id", "username", "password", "date", "zip", "town" };
            final String[] header = inFile.getHeader(true);
            List<Future<String>> futureList = new ArrayList<Future<String>>();
            // Create a cached thread pool
            executorService = Executors.newCachedThreadPool();
            // Read the data page by page and hand each page to the thread pool
            while (getPageUserList(executorService, futureList, inFile, header)) {}
            // Collect the results; get() blocks until the task has finished,
            // so no polling with isDone() and Thread.sleep() is needed
            for (Future<String> future : futureList) {
                System.out.println("future result: " + future.get());
            }
        } finally {
            // The pool is still null if reading the header failed
            if (executorService != null) {
                executorService.shutdown();
            }
            inFile.close();
        }
    }
    private static boolean getPageUserList(ExecutorService executorService, List<Future<String>> futureList, ICsvBeanReader inFile, String[] header) throws IOException {
        int index = 0;
        boolean status = false; // true when a full page was read, so more rows may follow
        List<UserBean> userBeans = new ArrayList<UserBean>();
        UserBean user;
        // Reads rows one at a time, continuing from where the previous page stopped
        while ((user = inFile.read(UserBean.class, header, userProcessors)) != null) {
            userBeans.add(user);
            index++;
            // Rows per page, i.e. records handled by each task; adjust to your situation
            if (index == 10) {
                status = true;
                break;
            }
        }
        // Submit the page to the thread pool
        if (!userBeans.isEmpty()) {
            Future<String> future = executorService.submit(getUpdateDbJob(futureList.size(), userBeans));
            futureList.add(future);
        }
        return status;
    }
    private static Callable<String> getUpdateDbJob(final int threadNo, final List<UserBean> userBeans) {
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                // Write to the database in batches
                List<UserBean> userList = new ArrayList<UserBean>();
                for (int i = 0; i < userBeans.size(); i++) {
                    userList.add(userBeans.get(i));
                    // For larger pages, commit in sub-batches. With this condition the first
                    // commit holds 4 records (indices 0 to 3) and each later one holds 3.
                    // Adjust the modulus to your situation.
                    if (i > 0 && i % 3 == 0) {
                        // Prints "thread <threadNo> updated <n> users successfully"
                        System.out.println("線程"+threadNo+"更新用戶:"+userList.size()+" 個成功");
                        // Batch-write to the database using jdbcTemplate
                        // TODO write to the database
                        userList = new ArrayList<UserBean>();
                    } else if (i == userBeans.size() - 1) {
                        // Commit the final, possibly smaller, batch
                        System.out.println("線程"+threadNo+"更新用戶:"+userList.size()+" 個成功");
                        // TODO write to the database
                        userList = new ArrayList<UserBean>();
                    }
                }
                return String.valueOf(userBeans.size());
            }
        };
    }
}
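Running it produces the following output (線程 means "thread"; each of the first five lines reads "thread N updated M users successfully"):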
線程0更新用戶:4 個成功
線程0更新用戶:3 個成功
線程0更新用戶:3 個成功
線程1更新用戶:4 個成功
線程1更新用戶:1 個成功
future result: 10
future result: 5
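The same page-and-submit pattern can be sketched with the JDK alone, which makes it easy to try without a CSV file or database. The sketch below is an illustration under stated assumptions, not the code above: BatchPoolSketch, partition and processAll are hypothetical names, rows are plain strings, and the database write is replaced by a placeholder that just reports the batch size. It differs in two ways: a fixed-size pool caps the number of threads (a cached pool can grow without bound when many pages are submitted), and partition produces evenly sized batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchPoolSketch {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<List<T>>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<T>(items.subList(start, end)));
        }
        return batches;
    }

    // Submits one task per batch to a bounded pool and returns what each task reports,
    // in submission order.
    static List<Integer> processAll(List<String> rows, int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (final List<String> batch : partition(rows, batchSize)) {
                futures.add(pool.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        // Placeholder for the real database write
                        return batch.size();
                    }
                }));
            }
            List<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until this batch has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<String>();
        for (int i = 1; i <= 15; i++) {
            rows.add("row" + i);
        }
        System.out.println(processAll(rows, 10, 2)); // prints [10, 5]
    }
}
```

Calling partition(batch, 3) inside a task would give sub-batches of 3, 3, 3 and 1 for a page of 10, instead of the 4, 3, 3 split that the modulus check produces.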