1. 程式人生 > >java dbcp連接池,大數據處理循環多表操作插入事例

java dbcp連接池,大數據處理循環多表操作插入事例

als postgresq postgres map() err manage fas space false

基礎連接池類:

package com.yl.sys.dao;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Vector;

public class ConnectionPoolTool {
private Vector<Connection> pool;
private String url;
private String user;
private String password;

/**
* 連接池的大小,也就是連接池中有多少個數據庫連接。
*/
private int poolSize = 1;

private static ConnectionPoolTool instance = null;

/**
* 私有的構造方法,禁止外部創建本類的對象,要想獲得本類的對象,通過<code>getIstance</code>方法。 使用了設計模式中的單子模式。
*/
private ConnectionPoolTool() {
init();
}

/**
* 連接池初始化方法,讀取屬性文件的內容 建立連接池中的初始連接
* @date 上午10:40:20
* @author DuChaoWei
* @descripte
*/
private void init() {
pool = new Vector<Connection>(poolSize);
readConfig();
addConnection();
}

/**
* 返回連接到連接池中
* @date 上午10:40:29
* @author DuChaoWei
* @descripte
* @param conn
*/
public synchronized void release(Connection conn) {
pool.add(conn);

}

/**
* 關閉連接池中的所有數據庫連接
*/
public synchronized void closePool() {
for (int i = 0; i < pool.size(); i++) {
try {
((Connection) pool.get(i)).close();
} catch (SQLException e) {
e.printStackTrace();
}
pool.remove(i);
}
}

/**
* 返回當前連接池的一個對象
*/
public static ConnectionPoolTool getInstance() {
if (instance == null) {
instance = new ConnectionPoolTool();
}
return instance;
}

/**
* 返回連接池中的一個數據庫連接
*/
public synchronized Connection getConnection() {
if (pool.size() > 0) {
Connection conn = pool.get(0);
pool.remove(conn);
return conn;
} else {
return null;
}
}

/**
* 在連接池中創建初始設置的的數據庫連接
* @date 上午9:10:14
* @author DuChaoWei
* @descripte
*/
private void addConnection() {
Connection conn = null;
for (int i = 0; i < poolSize; i++) {

try {
Class.forName("org.postgresql.Driver");
conn = java.sql.DriverManager.getConnection(url, user, password);
pool.add(conn);

} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}

}
}

/**
* 讀取設置連接池的屬性文件
* @date 上午9:10:02
* @author DuChaoWei
* @descripte
*/
private void readConfig() {
Properties prop = new Properties();
InputStream in = LocalPostgisDAO.class.getResourceAsStream("localpost.properties");
try {
prop.load(in);
url = prop.getProperty("url");
user = prop.getProperty("user");
password = prop.getProperty("password");
} catch (Exception e) {
e.printStackTrace();
}
}
}

調用類

package com.yl.zhi.stand.root;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.json.JSONArray;
import org.json.JSONObject;

import com.alibaba.fastjson.JSON;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.yl.sys.dao.ConnectionPoolTool;
import com.yl.sys.dao.LocalMongoDAO;
import com.yl.zhi.stand.utils.CompressUtils;

/**
* 創建引文關系類
*
* @date 下午3:25:21
* @author DuChaoWei
* @descripte
*/
public class CreateQuoteRelation {

// 文獻解析配置文件
private static final String name = "createQuoteRelation_config.js";
// 查詢數據量
private static final int searchSize = 20000;
// 存放所有引文 文獻的hashCode,去重
private Map<Integer, Integer> quoteRepeatMap = new HashMap<>();
// 統計錯誤引用文獻數
private int errorCount = 0;
// 存放錯誤引文
private List<String> errorList = new ArrayList<>();
// 統計不能插入文獻數據
private int errorWx = 0;
// 未插入文獻 文件id
private List<String> errorWxList = new ArrayList<>();

private Connection con = null;

public void Start() {
// 連接mongo
LocalMongoDAO.initMongoDB();

try {
execute();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 執行錄入
*
* @date 下午3:35:35
* @author DuChaoWei
* @descripte
* @throws Exception
*/
public void execute() throws Exception {
// 讀取配置文件
JSONArray main = new JSONArray(getJsonConfig());
for (int i = 0; i < main.length(); i++) {
reader(main.getJSONObject(i));
}
}

private void reader(JSONObject object) throws Exception {
// mongo源數據表名
String mongoTable = object.getString("mongoTable");
// 存儲 主表
String mainTable = object.getString("mainTable");
// 關系表
String relationTable = object.getString("relationTable");
// 獲取mongodb源數據總數
int totalCount = getMongoDbCount(mongoTable);
int searchCount = (totalCount + searchSize - 1) / searchSize;

//初始化數據庫連接池
ConnectionPoolTool pool = ConnectionPoolTool.getInstance();
for (int i = 0; i < searchCount; i++) {
int skipNum = searchSize * i;
List<Map<String, Object>> list = getMongoData(mongoTable, skipNum);
System.out.println("處理20000條文獻開始時間:" + System.currentTimeMillis());

//開啟事務
for (Map<String, Object> map : list) {
//long start = System.currentTimeMillis();
con = pool.getConnection();
con.setAutoCommit(false);
try {
// 插入主表(文獻表),返回為數據id
Integer mainId = insertMain(map, mainTable);
// 處理引文,返回引用文獻的id
List<Integer> quoteList = dealQuote(map, object);

if (quoteList != null && quoteList.size() > 0) {
// 插入關系表
insertRelation(mainId, quoteList, relationTable);
}
// 提交
con.commit();
} catch (Exception e) {
e.printStackTrace();
errorWx++;
errorWxList.add(map.get("_id").toString());
con.rollback();
con.setAutoCommit(true);
}finally{
//釋放鏈接
pool.release(con);
}
}
System.out.println("處理20000條文獻結束時間:" + System.currentTimeMillis());
}
//關閉連接池
pool.closePool();
System.out.println("未插入文獻數量為:" + errorWx);
System.out.println("未插入文獻篇名為:" + JSON.toJSONString(errorWxList));
System.out.println("錯誤格式引文數量為:" + errorCount);
}

/**
* 獲取該文檔的引文
*
* @date 上午9:19:41
* @author DuChaoWei
* @descripte
* @param map
* @return
* @throws SQLException
*/
private List<Integer> dealQuote(Map<String, Object> map, JSONObject object) throws SQLException {
// 返回所有引用文獻id
List<Integer> list = new ArrayList<>();
// mongo源數據表名
String mongoTable = object.getString("mongoTable");
// 引文表
String quoteTable = object.getString("quoteTable");
// 獲取data
byte[] str = CompressUtils.unGZip((byte[]) map.get("data"));
String obj = new String(str);
JSONObject data = new JSONObject(obj);
if (data != null) {
// 獲取網址 用於 區別新舊數據
String webSite = data.getString("detail_URI");
// 文獻類型
String type = map.get("source_type").toString();
// 引文
String reference = data.getString("reference");
String[] quotes = getSplitQuote(reference);
// 存放所有處理後的引文
List<Map<String, Object>> quoteList = new ArrayList<>();
for (String quote : quotes) {
if (quote != null && !"".equals(quote)) {
int quoteHashCode = quote.hashCode();
if (quoteRepeatMap.containsKey(quoteHashCode)) {
// 引文id
Integer quoteId = quoteRepeatMap.get(quoteHashCode);
list.add(quoteId);
} else {
Map<String, Object> quoteMap = null;
quote = quote.replaceAll("‘", "‘‘");
if (webSite.equals("")) {
// 老數據處理引文
quoteMap = oldFormatQuote(quote, mongoTable, type);
} else {
// 處理引文
quoteMap = newFormatQuote(quote, mongoTable);
}
if (quoteMap != null) {
quoteList.add(quoteMap);
}
}
}
}
// 插入引文到postgress 返回插入對象
list= insertQuote(quoteList, quoteTable);
}
return list;
}

/**
* 拆分引文
*
* @date 下午3:19:21
* @author DuChaoWei
* @descripte
* @param str
* @return
*/
private String[] getSplitQuote(String str) {
if (str != null) {
String reg = "[\\[]{1}[0-9][\\]]{1}";
return str.split(reg);
}
return null;
}

/**
* 插入關系表
*
* @date 下午2:46:38
* @author DuChaoWei
* @descripte
* @param list
* @param relationTable
* @throws SQLException
*/
private void insertRelation(Integer mainId, List<Integer> quoteList, String relationTable) throws SQLException {
StringBuffer sql = new StringBuffer();
sql.append(" insert into " + relationTable);
sql.append(" (wx_id,yw_id) ");
sql.append(" values ");
StringBuffer valueSql = new StringBuffer();
for (Integer quoteId : quoteList) {
valueSql.append("(" + mainId + "," + quoteId + "),");
}
String valueStr = valueSql.toString();
valueStr = valueStr.substring(0, valueStr.lastIndexOf(",")) + ";";
sql.append(valueStr);
PreparedStatement ps1 = con.prepareStatement(sql.toString());
ps1.executeUpdate();
}

/**
* 插入引文到引文表
*
* @date 上午11:03:07
* @author DuChaoWei
* @descripte
* @param map
* @return
* @throws SQLException
*/
private List<Integer> insertQuote(List<Map<String, Object>> list, String quoteTable)
throws SQLException {
List<Integer> resultList = new ArrayList<>();
if (list != null && list.size() > 0) {
StringBuffer sb = new StringBuffer();
sb.append(" insert into " + quoteTable);
sb.append(" (mongo_id,title,type,author,company,dates,flag) ");
sb.append(" values ");
StringBuffer valueSb = new StringBuffer();
for (Map<String, Object> map : list) {
valueSb.append("(" + map.get("mongo_id") + ",");
valueSb.append("‘" + map.get("title").toString() + "‘,");
valueSb.append("‘" + map.get("type").toString() + "‘,");
valueSb.append("‘" + map.get("author").toString() + "‘,");
valueSb.append("‘" + map.get("company").toString() + "‘,");
valueSb.append("‘" + map.get("dates").toString() + "‘,");
valueSb.append(" 0), ");
}
String valueStr = valueSb.toString();
sb.append(valueStr.substring(0, valueStr.lastIndexOf(",")));
PreparedStatement ps1 = con.prepareStatement(sb.toString(),Statement.RETURN_GENERATED_KEYS);
ps1.executeUpdate();
ResultSet rs = ps1.getGeneratedKeys();
while(rs.next()){
resultList.add(rs.getInt(1));
}
rs.close();
}
return resultList;
}

/**
* 插入主表
*
* @date 上午9:07:26
* @author DuChaoWei
* @descripte
* @param map
* @param tableName
* @return
* @throws SQLException
*/
private Integer insertMain(Map<String, Object> map, String tableName) throws SQLException {
Integer result = null;
// 數據
byte[] str = CompressUtils.unGZip((byte[]) map.get("data"));
String obj = new String(str);
JSONObject data = new JSONObject(obj);
StringBuffer sb = new StringBuffer();
sb.append(" insert into " + tableName);
sb.append(" (mongo_id,title,type,author,company,dates,flag) ");
sb.append(" values ( ");
sb.append("‘" + map.get("_id").toString() + "‘,");
sb.append("‘" + data.getString("title").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("type").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("author").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("company").replaceAll("‘", "‘‘") + "‘,");
sb.append("‘" + data.getString("dates").replaceAll("‘", "‘‘") + "‘,");
sb.append(" 0) ");

PreparedStatement ps1 = con.prepareStatement(sb.toString(),Statement.RETURN_GENERATED_KEYS);
ps1.executeUpdate();
ResultSet rs = ps1.getGeneratedKeys();
while(rs.next()){
result = rs.getInt(1);
}
rs.close();
// QueryRunner runner = new QueryRunner(dataSource);
// result = runner.insert(sb.toString(), new ScalarHandler<Integer>("id"));
// ArrayHandler:把結果集中的第一行數據轉換成對象數組。
// ArrayListHandler:把結果集中的每一行數據都轉換成一個對象數組,再存放到List中。
// BeanHandler:將結果集中的第一行數據封裝到一個對應的JavaBean實例中。
// BeanListHandler:將結果集中的每一行數據都封裝到一個對應的JavaBean實例中,存放到List裏。
// MapHandler:將結果集中的第一行數據封裝到一個Map裏,key是列名,value就是對應的值。
// MapListHandler:將結果集中的每一行數據都封裝到一個Map裏,然後再存放到List。
// ColumnListHandler:將結果集中某一列的數據存放到List中。
// KeyedHandler(name):將結果集中的每一行數據都封裝到一個Map裏(List),再把這些map再存到一個map裏,其key為指定的列。
// ScalarHandler:獲取結果集中第一行數據指定列的值,常用來進行單值查詢
// result =
// LocalPostgisDAO.getInstance().getQuery().insert(sb.toString(),
// new ResultSetHandler<Integer>() {
// @Override
// public Integer handle(ResultSet arg0) throws SQLException {
// return arg0.getInt(0);
// }});
return result;
}

/**
* 老數據處理引文
*
* @date 下午4:37:24
* @author DuChaoWei
* @descripte
* @param quote
* @param mongoTable
* @return
*/
private Map<String, Object> oldFormatQuote(String quote, String mongoTable, String type) {

Map<String, Object> rsMap = new HashMap<>();
String[] mdArray = quote.split(",");
try {
// mongoid
rsMap.put("mongo_id", 0);
// 篇名
rsMap.put("title", mdArray[1].trim());
// 類型
rsMap.put("type", type);
// 作者
rsMap.put("author", mdArray[0].trim());
// 機構
rsMap.put("company", mdArray[2].trim().replace("‘", "‘‘"));
// 時間
rsMap.put("dates", mdArray[3].trim());
} catch (Exception e) {
errorCount++;
errorList.add(quote);
return null;
}
// 獲取引文 的mongoid
// Map<String, Object> map = getMongoData(mongoTable, mdArray[1]);
// if (map != null) {
// rsMap.put("mongo_id", map.get("_id"));
// } else {
// rsMap.put("mongo_id", 0);
// }
return rsMap;

}

/**
* 處理單個引用文獻
*
* @date 上午10:24:37
* @author DuChaoWei
* @descripte
* @param quote
* @return
*/
private Map<String, Object> newFormatQuote(String quote, String mongoTable) {
Map<String, Object> rsMap = new HashMap<>();
String[] mdArray = quote.replaceAll(";", "").split("\\.");
try {
// mongoid
rsMap.put("mongo_id", 0);
// 篇名
rsMap.put("title", mdArray[0].trim().substring(0, mdArray[0].indexOf("[")));
// 類型
rsMap.put("type", mdArray[0].trim().substring(mdArray[0].indexOf("["), mdArray[0].length()));
// 作者
rsMap.put("author", mdArray[1].trim());
// 機構
rsMap.put("company", mdArray[2].trim());
// 時間
rsMap.put("dates", mdArray[3].trim());
} catch (Exception e) {
errorCount++;
errorList.add(quote);
return null;
}
// 獲取引文 的mongoid
// Map<String, Object> map = getMongoData(mongoTable, mdArray[0].substring(0, mdArray[0].indexOf("[")));
// if (map != null) {
// rsMap.put("mongo_id", map.get("_id"));
// } else {
// rsMap.put("mongo_id", 0);
// }
rsMap.put("mongo_id", 0);
return rsMap;
}

/**
* 獲取mongo文檔總數
*
* @date 下午4:01:33
* @author DuChaoWei
* @descripte
* @param collectionName
* @return
*/
private int getMongoDbCount(String collectionName) {
BasicDBObject sort = new BasicDBObject();
sort.put("_id", 1);
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBCursor cursor = collection.find().sort(sort);
return cursor.count();
}

/**
* 分頁查詢
*
* @date 下午5:44:57
* @author DuChaoWei
* @descripte
* @param collectionName
* @param skipNum
* @param totalCount
* @return
*/
@SuppressWarnings("unchecked")
private List<Map<String, Object>> getMongoData(String collectionName, int skipNum) {
List<Map<String, Object>> list = new ArrayList<>();
BasicDBObject sort = new BasicDBObject();
sort.put("_id", 1);// 1標識 順序 排序
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBCursor cursor = collection.find().sort(sort).skip(skipNum).limit(searchSize);
while (cursor.hasNext()) {
DBObject obj = cursor.next();
if (obj != null) {
list.add(obj.toMap());
}
}
return list;
}

/**
* 獲取mongo數據
*
* @date 上午11:08:51
* @author DuChaoWei
* @descripte
* @param collectionName
* @param tital
* @return
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getMongoData(String collectionName, String title) {
BasicDBObject queryObj = new BasicDBObject();
BasicDBObject queryObj_1 = new BasicDBObject();
queryObj.put("data", queryObj_1);
queryObj_1.put("title", title);
DBCollection collection = LocalMongoDAO.getCollection(collectionName);
DBObject obj = collection.findOne(queryObj);
if (obj != null) {
return obj.toMap();
}
return null;
}

/**
* 獲取配置文件
*
* @date 下午3:37:57
* @author DuChaoWei
* @descripte
* @return
* @throws IOException
*/
private String getJsonConfig() throws IOException {
StringBuffer buffer = new StringBuffer();
// String realPath = RootStart.getRootPath();
// String subPath = "/WEB-INF/res/config/" + name;
// String filePath = realPath + subPath;
// System.out.println("科技文獻標準化配置文件:" + filePath);
String filePath = "D:/ylkfSoft/workspace/IndexWeb/zhishi_config/" + name;
File file = new File(filePath);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
line = line.trim();
buffer.append(line);
}
bufferedReader.close();

return removeNotes(buffer.toString());
}

/**
* 去掉註釋
*
* @date 下午3:38:10
* @author DuChaoWei
* @descripte
* @param str
* @return
*/
private String removeNotes(String str) {
int start = str.indexOf("//#");
int end = str.indexOf("#//");
if (start > -1 && end > -1) {
String oldStr = str.substring(start, end + 3);
String removeStr = str.replace(oldStr, "");
return removeNotes(removeStr);
} else {
return str;
}
}

public static void main(String[] args) {
CreateQuoteRelation cqr = new CreateQuoteRelation();
cqr.Start();
}
}

java dbcp連接池,大數據處理循環多表操作插入事例