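Project Exercise (2): Structuring Weibo Data
1. ETL concept
ETL is short for Extract-Transform-Load. It describes the process of extracting data from a source, transforming it, and loading it into a destination.
2. Project goals:
This project focuses on data integration: cleaning the data in the raw files into clean, structured records and storing them in Hive.
2.1 Crawl the Weibo data and save it in data files:
user --> user information
comment --> the Weibo posts of each user
2.2 Format the data so that it can be loaded into Hive tables.
2.3 Raw data layout:
(1) weibodata (folder) --- 房地產 (folder, "real estate") --- content (folder) --- txt files named by uid
                                                       --- user (folder) --- txt files named by uid
The other category folders have the same layout as 房地產.
(2) Format of a uid-named txt file in the user folder: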
User [id=1107717945, screenName=null, name=null, province=0, city=0, location=null, description=null, url=null, profileImageUrl=null, userDomain=null, gender=null, followersCount=0, friendsCount=0, statusesCount=0, favouritesCount=0, createdAt=null, following=false, verified=false, verifiedType=0, allowAllActMsg=false, allowAllComment=false, followMe=false, avatarLarge=null, onlineStatus=0, status=null, biFollowersCount=0, remark=房地產, lang=null, verifiedReason=null]
(3) Format of a uid-named txt file in the content folder:
<comment>
<content>2014年可以使用,中國中心在64層到69層共2萬平米 //@禾臣薛朝陽:#中國中心#也蓋出來了?@許立-VANTONE</content>
<time>2012-4-3 11:30:08</time>
<repostsCount>8</repostsCount>
<commentsCount>4</commentsCount>
</comment>
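The record format above can be picked apart with a regular expression instead of an XML parser. The following is a minimal sketch rather than the project's own code: the class name CommentFieldSketch and its parse method are hypothetical, and the sketch assumes the four tags always appear in the order shown above. One side benefit of the regex route is that raw & characters in the post text do not break parsing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CommentFieldSketch {
    // One capture group per tag; DOTALL lets the post text span line breaks.
    private static final Pattern COMMENT = Pattern.compile(
            "<comment>\\s*<content>(.*?)</content>\\s*<time>(.*?)</time>\\s*"
            + "<repostsCount>(\\d+)</repostsCount>\\s*"
            + "<commentsCount>(\\d+)</commentsCount>\\s*</comment>",
            Pattern.DOTALL);

    /** Returns the four fields of one record, or null if the text does not match. */
    public static Map<String, String> parse(String record) {
        Matcher m = COMMENT.matcher(record.trim());
        if (!m.matches()) {
            return null;
        }
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("content", m.group(1));
        fields.put("time", m.group(2));
        fields.put("repostsCount", m.group(3));
        fields.put("commentsCount", m.group(4));
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical record; note the raw '&' that a strict XML parser would reject.
        String record = "<comment><content>A & B</content><time>2012-4-3 11:30:08</time>"
                + "<repostsCount>8</repostsCount><commentsCount>4</commentsCount></comment>";
        System.out.println(parse(record));
    }
}
```

Records that do not match come back as null, so the caller can count and skip them in the same way the manager class counts parse errors.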
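3. Project approach
3.1 Parse the raw data files into data objects.
3.2 Serialize the data objects to the output data files.
data file --> User {}    --> UserPojo object    --> (collect) List<UserPojo>    --> FileUtil / IOUtil
          --> Content {} --> ContentPojo object --> (collect) List<ContentPojo> --> FileUtil / IOUtil
3.3 Hive stage (four standard steps: config, create, udf, deal)
3.3.1 Create the tables: User and Content.
3.3.2 Load the data into User and Content.
3.4 Data structuring is complete.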
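4. Project analysis
4.1 Source data files: data format
Inspect the format of the crawled Weibo data: User and Content.
4.2 Write a Java program that processes the source data files, builds structured objects, and writes them to files in a structured format.
The Java code needs the following pieces:
4.2.1 It works with files, reads and writes through IO streams, may need to handle dates, and has to parse XML-formatted content.
Write a utility class for each of these tasks.
4.2.2 The data files are turned into POJO objects:
User files -- create the UserInfo class (WbUserInfoPojo in the code)
Content files -- create the ContentInfo class (WbContentInfoPojo in the code)
4.2.3 The structuring process itself needs to be managed:
DataLoadManager class: provides several methods that perform the conversion step by step.
read files -> parse into User or Content objects -> write the structured output files
4.2.4 Provide a system control class so that the job can be started from a single place.
SystemController class (SystemControler in the code) -- the entry-point switch
4.3 Load the generated output files into Hive tables.
4.3.1 Follow the four standard steps: config, create, udf, deal.
4.3.2 Write the scripts.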
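5. Project development
Start from the project conventions:
5.1 config: scripts that set environment variables
5.2 create: table-creation scripts
5.3 udf: holds the jar built from our Java code that generates the structured data
// Create a Maven project - develop the data-structuring program - package it as a jar
// Run it with: java -jar <jar file> inputDir
// Result: two data files appear in the directory where the command was run, user_pojo_list.txt and content_pojo_list.txt (the names hard-coded in SystemControler)
5.4 deal: scripts that process the data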
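6. Jar development
Project architecture:
(1) controler --> SystemControler: system control (the single entry point for the user)
(2) manager --> DataLoadManager: manages the data-structuring flow (chains the data stream together)
(read source files -> objects -> write to files)
(3) pojos -- data model: the User and Content files
(4) utils -- utility classes: FileOperatorUtil / IOUtil / DateUtil / XmlParserUtil (or regular expressions)
7. Project code:
(1) utils package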
package com.tl.job002.utils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class IOUtil {
    /**
     * Reads a text file line by line into a list.
     *
     * @param txtFilePath path of the file to read
     * @param charset     charset of the file, e.g. "gbk"
     * @return one list element per line
     * @throws Exception if the file cannot be read
     */
    public static List<String> getTxtContent(String txtFilePath, String charset) throws Exception {
        File txtFile = new File(txtFilePath);
        List<String> lineList = new ArrayList<String>();
        // try-with-resources closes the reader even if reading fails
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream(txtFile), charset))) {
            String tempLine = null;
            // Each line of the file becomes one element of the list.
            while ((tempLine = br.readLine()) != null) {
                lineList.add(tempLine);
            }
        }
        return lineList;
    }

    /**
     * Writes a list of lines to a file, separated by '\n'.
     *
     * @param lineList       lines to write
     * @param outputFilePath path of the output file
     * @param charset        charset of the output file
     * @return true on success
     * @throws Exception if the file cannot be written
     */
    public static boolean writeListToFile(List<String> lineList, String outputFilePath, String charset)
            throws Exception {
        File outputFile = new File(outputFilePath);
        try (FileOutputStream fos = new FileOutputStream(outputFile)) {
            int lineCounter = 0;
            for (String line : lineList) {
                if (lineCounter > 0) {
                    // Write the separator before every line except the first,
                    // so the file does not end with a trailing newline.
                    fos.write('\n');
                }
                fos.write(line.getBytes(charset));
                lineCounter++;
            }
        }
        return true;
    }

    /** Writes an already assembled string to a file. */
    public static boolean writeListToFile(String txtContent, String outputFilePath, String charset)
            throws Exception {
        File outputFile = new File(outputFilePath);
        try (FileOutputStream fos = new FileOutputStream(outputFile)) {
            fos.write(txtContent.getBytes(charset));
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // String txtFilePath = "房地產\\user\\2297199692.txt";
        String txtFilePath = "房地產\\content\\1484018951.txt";
        String inputCharset = "gbk";
        String outputCharset = "utf-8";
        String outputFilePath = "newFile.txt";
        List<String> lineList = getTxtContent(txtFilePath, inputCharset);
        for (String tempLine : lineList) {
            System.out.println(tempLine);
        }
        writeListToFile(lineList, outputFilePath, outputCharset);
    }
}
package com.tl.job002.utils;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class FileOperatorUtil {
    /**
     * Recursively collects the paths of all regular files under filePath.
     */
    public static List<String> getAllSubNormalFilePath(String filePath) {
        File file = new File(filePath);
        List<String> resultList = new ArrayList<String>();
        if (file.isDirectory()) {
            // Directory: descend one level. listFiles() returns null if the
            // directory cannot be read, so guard against that.
            File[] children = file.listFiles();
            if (children != null) {
                for (File tempFile : children) {
                    // Recurse into each child; regular files are collected with their full path.
                    resultList.addAll(getAllSubNormalFilePath(tempFile.toString()));
                }
            }
        } else {
            resultList.add(file.toString());
        }
        return resultList;
    }

    /**
     * Returns the file name without its suffix, e.g. "1855569733" for "1855569733.txt".
     *
     * @param inputPath path of the file
     * @return the part of the file name before the first dot
     */
    public static String getFileNameWithoutSuffix(String inputPath) {
        return new File(inputPath).getName().split("\\.")[0];
    }

    public static void main(String[] args) {
        String inputPath = "房地產\\user\\1855569733.txt";
        System.out.println(getFileNameWithoutSuffix(inputPath));
    }
}
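On Java 8 or later, the same directory walk can be written with java.nio. This is only a sketch under that assumption; FileWalkSketch, listRegularFiles and baseName are hypothetical names, not part of the project. Unlike split("\\.")[0], baseName cuts at the last dot, which makes no difference for uid-named files such as 1855569733.txt.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileWalkSketch {
    /** Returns every regular file below root, in sorted order. */
    public static List<Path> listRegularFiles(Path root) throws IOException {
        // Files.walk holds directory handles open, so close the stream when done.
        try (Stream<Path> stream = Files.walk(root)) {
            return stream.filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /** Returns the file name without its last suffix. */
    public static String baseName(Path file) {
        String name = file.getFileName().toString();
        int dot = name.lastIndexOf('.');
        return dot > 0 ? name.substring(0, dot) : name;
    }

    public static void main(String[] args) throws IOException {
        // Uses the first argument as the root directory, or the current directory.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        for (Path p : listRegularFiles(root)) {
            System.out.println(baseName(p) + " <- " + p);
        }
    }
}
```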
package com.tl.job002.utils;

import java.util.List;

public class StringUtil {
    /**
     * Joins the string form of each object, separated by deli.
     */
    public static String join(List<Object> objList, String deli) {
        // Number of elements appended so far
        int lineCounter = 0;
        StringBuilder stringBuilder = new StringBuilder();
        for (Object obj : objList) {
            if (lineCounter > 0) {
                // Not the first element: add the delimiter first
                stringBuilder.append(deli);
            }
            // String.valueOf avoids a NullPointerException for fields that were never set
            stringBuilder.append(String.valueOf(obj));
            lineCounter++;
        }
        return stringBuilder.toString();
    }
}
package com.tl.job002.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtil {
    // HH is the 24-hour clock. The lowercase hh used originally is the 12-hour
    // clock, which formats 15:45 as 03:45 and loses the AM/PM information.
    // Note: SimpleDateFormat is not thread-safe; this program is single-threaded.
    static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static Date getDate(String dateString) throws ParseException {
        return dateFormat.parse(dateString);
    }

    public static String formatDate(Date date) {
        return dateFormat.format(date);
    }

    public static void main(String[] args) {
        Date date = new Date();
        System.out.println(formatDate(date));
    }
}
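The time values in the source files are not zero-padded (for example 2012-4-3 11:30:08), while the output is meant to use a fixed-width, 24-hour format. As an alternative to SimpleDateFormat, the conversion can be sketched with java.time, assuming Java 8 or later. DateNormalizeSketch and normalize are hypothetical names used only for this illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateNormalizeSketch {
    // Input as it appears in the <time> tag: month, day and hour are not zero-padded.
    private static final DateTimeFormatter INPUT =
            DateTimeFormatter.ofPattern("uuuu-M-d H:mm:ss");
    // Output: zero-padded fields and a 24-hour clock (HH).
    private static final DateTimeFormatter OUTPUT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    /** Converts e.g. "2012-4-6 5:45:52" to "2012-04-06 05:45:52". */
    public static String normalize(String raw) {
        return LocalDateTime.parse(raw.trim(), INPUT).format(OUTPUT);
    }

    public static void main(String[] args) {
        System.out.println(normalize("2012-4-3 11:30:08"));
        System.out.println(normalize("2011-6-11 0:15:28"));
    }
}
```

DateTimeFormatter instances are immutable and thread-safe, so sharing them as static fields is safe even if the program later becomes multi-threaded.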
package com.tl.job002.utils;

import java.io.File;
import java.io.StringReader;
import java.util.Iterator;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

public class XmlParserUtil {
    /** Prints the name and text of every child of the root element. */
    public static void printXML(String xmlPath) {
        SAXReader reader = new SAXReader();
        try {
            // Load the XML file and get the Document object
            Document document = reader.read(new File(xmlPath));
            // Get the root element
            Element root = document.getRootElement();
            // Iterate over the direct children of the root element
            Iterator it = root.elementIterator();
            while (it.hasNext()) {
                Element child = (Element) it.next();
                System.out.println("node name: " + child.getName()
                        + " -- node value: " + child.getStringValue());
            }
        } catch (DocumentException e) {
            e.printStackTrace();
        }
    }

    /** Parses an XML file and returns its root element, or null on failure. */
    public static Element getXmlRootElement(File xmlFile) {
        SAXReader reader = new SAXReader();
        try {
            Document document = reader.read(xmlFile);
            return document.getRootElement();
        } catch (DocumentException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Parses an XML string and returns its root element, or null on failure.
     *
     * @param isFilter if true, escape characters in the raw text that would
     *                 otherwise make the XML ill-formed
     */
    public static Element getXmlRootElement(String xmlContent, boolean isFilter) {
        SAXReader reader = new SAXReader();
        try {
            if (isFilter) {
                // Escape & first, so the entities added afterwards are not escaped again.
                xmlContent = xmlContent.replace("&", "&amp;")
                        .replace("'", "&apos;")
                        .replace("\"", "&quot;");
            }
            StringReader stringReader = new StringReader(xmlContent);
            Document document = reader.read(stringReader);
            return document.getRootElement();
        } catch (DocumentException e) {
            // Ill-formed records are reported to the caller as null.
            // e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        // String xmlContent =
        // "<comment><content>回覆@雷亞雷:在哈佛或MIT,每天都有各種議題的小型研討會,凡在中午或晚餐時間舉行的,一般都會準備便餐。即使如此,有時也會忽略。 //@雷亞雷:王老師,不吃午飯嗎?</content><time>2012-4-6 5:45:52</time><repostsCount>574</repostsCount><commentsCount>290</commentsCount></comment>";
        String xmlContent = "<comment><content>誠意推介加拿大殿堂級音樂大師David Foster 的演唱會DVD《Hit Man Returns: David Foster & Friends》!演唱者包括Earth, Wind & Fire、Michael Bolton及Donna Summer等等,全都是星光熠熠的唱家班!就連只得11歲的America's Got Talent參加者Jackie Evancho的女高音亦非常震撼人心,金曲聽出耳油!</content><time>2011-6-11 0:15:28</time><repostsCount>1</repostsCount><commentsCount>7</commentsCount></comment>";
        // Do not escape here as well: getXmlRootElement(..., true) already does it,
        // and escaping twice would turn & into the literal text &amp;.
        Element rootElement = getXmlRootElement(xmlContent, true);
        System.out.println(rootElement.elementText("content"));
    }
}
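A plain replace of & is not idempotent: text that already contains an entity is escaped a second time. The sketch below shows one way around that, escaping only ampersands that do not already start an entity. AmpersandEscapeSketch and escapeBareAmpersands are hypothetical names, and the list of recognised entities is limited to the five predefined XML entities plus numeric character references.

```java
public class AmpersandEscapeSketch {
    /**
     * Replaces every '&' that does not begin a predefined XML entity or a
     * numeric character reference with "&amp;".
     */
    public static String escapeBareAmpersands(String xml) {
        // Negative lookahead: skip '&' followed by amp; lt; gt; quot; apos; #123; or #x1F;
        return xml.replaceAll(
                "&(?!(?:amp|lt|gt|quot|apos|#\\d+|#x[0-9a-fA-F]+);)", "&amp;");
    }

    public static void main(String[] args) {
        String once = escapeBareAmpersands("Earth, Wind & Fire");
        // Applying the method again leaves the text unchanged.
        String twice = escapeBareAmpersands(once);
        System.out.println(once);
        System.out.println(once.equals(twice));
    }
}
```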
(2) pojos package
package com.tl.job002.pojos;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.tl.job002.utils.StringUtil;

public class WbUserInfoPojo {
    private long uid;
    private String screenName;
    private String name;
    private int province;
    private int city;
    private String location;
    private String description;
    private String userDomain;
    private String gender;
    private int followersCount;
    private int friendsCount;
    private int statusesCount;
    private int favouritesCount;
    private Date createdAt;
    private boolean verified;
    private String remark;
    private String verifiedReason;

    @Override
    public String toString() {
        return "WbUserInfoPojo [uid=" + uid + ", screenName=" + screenName
                + ", name=" + name + ", province=" + province + ", city=" + city
                + ", location=" + location + ", description=" + description
                + ", userDomain=" + userDomain + ", gender=" + gender
                + ", followersCount=" + followersCount + ", friendsCount=" + friendsCount
                + ", statusesCount=" + statusesCount + ", favouritesCount=" + favouritesCount
                + ", createdAt=" + createdAt + ", verified=" + verified
                + ", remark=" + remark + ", verifiedReason=" + verifiedReason + "]";
    }

    /**
     * Output format for the data file: uid, screenName, province and remark,
     * separated by \001.
     */
    public String toString4FileOutput() {
        List<Object> fieldList = new ArrayList<Object>();
        fieldList.add(uid);
        fieldList.add(getScreenName());
        fieldList.add(province);
        fieldList.add(remark);
        // System.out.println("screenName=" + (screenName == null ? "null value" : "\"null\" string"));
        return StringUtil.join(fieldList, "\001");
    }

    public long getUid() { return uid; }
    public void setUid(long uid) { this.uid = uid; }

    public String getScreenName() {
        // if (screenName == null || screenName.equals("null")) {
        //     screenName = "";
        // }
        return screenName;
    }
    public void setScreenName(String screenName) { this.screenName = screenName; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public int getProvince() { return province; }
    public void setProvince(int province) { this.province = province; }

    public int getCity() { return city; }
    public void setCity(int city) { this.city = city; }

    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }

    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }

    public String getUserDomain() { return userDomain; }
    public void setUserDomain(String userDomain) { this.userDomain = userDomain; }

    public String getGender() { return gender; }
    public void setGender(String gender) { this.gender = gender; }

    public int getFollowersCount() { return followersCount; }
    public void setFollowersCount(int followersCount) { this.followersCount = followersCount; }

    public int getFriendsCount() { return friendsCount; }
    public void setFriendsCount(int friendsCount) { this.friendsCount = friendsCount; }

    public int getStatusesCount() { return statusesCount; }
    public void setStatusesCount(int statusesCount) { this.statusesCount = statusesCount; }

    public int getFavouritesCount() { return favouritesCount; }
    public void setFavouritesCount(int favouritesCount) { this.favouritesCount = favouritesCount; }

    public Date getCreatedAt() { return createdAt; }
    public void setCreatedAt(Date createdAt) { this.createdAt = createdAt; }

    public boolean isVerified() { return verified; }
    public void setVerified(boolean verified) { this.verified = verified; }

    public String getRemark() { return remark; }
    public void setRemark(String remark) { this.remark = remark; }

    public String getVerifiedReason() { return verifiedReason; }
    public void setVerifiedReason(String verifiedReason) { this.verifiedReason = verifiedReason; }
}
package com.tl.job002.pojos;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.tl.job002.utils.DateUtil;
import com.tl.job002.utils.StringUtil;

public class WbContentInfoPojo {
    private long uid;
    private String content;
    private Date time;
    private int repostsCount;
    private int commentsCount;

    /**
     * Output format for the data file: turns the object into one line of text
     * with the fields in a fixed order.
     */
    public String toString4FileOutput() {
        List<Object> fieldList = new ArrayList<Object>();
        fieldList.add(uid);
        fieldList.add(content);
        fieldList.add(DateUtil.formatDate(time));
        fieldList.add(repostsCount);
        fieldList.add(commentsCount);
        // Join the fields with the chosen delimiter
        return StringUtil.join(fieldList, "\001");
    }

    public long getUid() { return uid; }
    public void setUid(long uid) { this.uid = uid; }

    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }

    public Date getTime() { return time; }
    public void setTime(Date time) { this.time = time; }

    public int getRepostsCount() { return repostsCount; }
    public void setRepostsCount(int repostsCount) { this.repostsCount = repostsCount; }

    public int getCommentsCount() { return commentsCount; }
    public void setCommentsCount(int commentsCount) { this.commentsCount = commentsCount; }
}
package com.tl.job002.pojos;

import java.util.List;

/**
 * Bundles the user list and the content list into one object.
 *
 * @author dell
 */
public class UserAndContentInfoPojo {
    private List<WbUserInfoPojo> userPojoList;
    private List<WbContentInfoPojo> contentPojoList;

    public UserAndContentInfoPojo(List<WbUserInfoPojo> userPojoList,
            List<WbContentInfoPojo> contentPojoList) {
        super();
        this.userPojoList = userPojoList;
        this.contentPojoList = contentPojoList;
    }

    public List<WbUserInfoPojo> getUserPojoList() { return userPojoList; }
    public void setUserPojoList(List<WbUserInfoPojo> userPojoList) { this.userPojoList = userPojoList; }

    public List<WbContentInfoPojo> getContentPojoList() { return contentPojoList; }
    public void setContentPojoList(List<WbContentInfoPojo> contentPojoList) { this.contentPojoList = contentPojoList; }
}
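Both POJOs join their fields with "\001" (Ctrl-A), which is the default field delimiter of Hive text tables, and every record has to stay on one line. The following sketch shows the round trip and one possible safeguard for field values that contain the delimiter or a line break. CtrlADelimiterSketch and its methods are hypothetical names, and replacing the offending characters with a space is just one policy among several.

```java
import java.util.Arrays;
import java.util.List;

public class CtrlADelimiterSketch {
    public static final String FIELD_DELIMITER = "\001";

    /** Replaces characters that would break a one-record-per-line, \001-delimited file. */
    static String sanitize(Object value) {
        return String.valueOf(value)
                .replace(FIELD_DELIMITER, " ")
                .replace('\n', ' ')
                .replace('\r', ' ');
    }

    /** Joins the fields of one record into a single output line. */
    public static String toLine(List<?> fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(FIELD_DELIMITER);
            }
            sb.append(sanitize(fields.get(i)));
        }
        return sb.toString();
    }

    /** Splits a line back into fields; the -1 limit keeps trailing empty fields. */
    public static String[] fromLine(String line) {
        return line.split(FIELD_DELIMITER, -1);
    }

    public static void main(String[] args) {
        // Hypothetical user record: uid, screenName, province, remark
        String line = toLine(Arrays.asList(1107717945L, "null", 0, "real estate"));
        System.out.println(Arrays.toString(fromLine(line)));
    }
}
```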
(3) manager package
package com.tl.job002.manager;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

import org.dom4j.Element;

import com.tl.job002.pojos.UserAndContentInfoPojo;
import com.tl.job002.pojos.WbContentInfoPojo;
import com.tl.job002.pojos.WbUserInfoPojo;
import com.tl.job002.utils.DateUtil;
import com.tl.job002.utils.FileOperatorUtil;
import com.tl.job002.utils.IOUtil;
import com.tl.job002.utils.XmlParserUtil;

/**
 * Data-structuring class.
 *
 * Approach:
 * 1. source data files --> UserInfo objects    --> list
 *                      --> ContentInfo objects --> list --> UserAndContentInfoPojo object
 * 2. UserAndContentInfoPojo object --> user_pojo_list.txt
 *                                  --> content_pojo_list.txt
 *
 * In short: data file -> {uid, List<String>} -> {userInfoPojo}
 *
 * @author dell
 */
public class DataLoadManager {
    /** Static nested class: the uid of a file together with the lines of that file. */
    public static class UidAndListPojo {
        // uid taken from the file name
        private String uid;
        // every line that belongs to this uid
        private List<String> lineList;

        public String getUid() { return uid; }
        public void setUid(String uid) { this.uid = uid; }

        public List<String> getLineList() { return lineList; }
        public void setLineList(List<String> lineList) { this.lineList = lineList; }
    }

    /**
     * Reads all source files under inputDir into (uid, lines) pairs.
     *
     * @param inputDir root directory of the source data
     * @param charset  charset of the source files
     * @return one UidAndListPojo per file
     * @throws Exception if a file cannot be read
     */
    public static List<UidAndListPojo> getAllFileMapResult(String inputDir, String charset)
            throws Exception {
        // Each entry holds a uid and the lines of the corresponding file
        List<UidAndListPojo> uidAndListPojoList = new ArrayList<UidAndListPojo>();
        List<String> txtFilePathList = FileOperatorUtil.getAllSubNormalFilePath(inputDir);
        for (String txtFilePath : txtFilePathList) {
            ArrayList<String> txtLineList = new ArrayList<String>();
            List<String> singleTxtLineList = IOUtil.getTxtContent(txtFilePath, charset);
            txtLineList.addAll(singleTxtLineList);
            // The uid is the file name without its suffix
            String uidValue = FileOperatorUtil.getFileNameWithoutSuffix(txtFilePath);
            UidAndListPojo uidAndListPojo = new UidAndListPojo();
            uidAndListPojo.setLineList(txtLineList);
            uidAndListPojo.setUid(uidValue);
            uidAndListPojoList.add(uidAndListPojo);
        }
        return uidAndListPojoList;
    }

    /**
     * Turns the raw lines into user and content POJOs.
     */
    public static UserAndContentInfoPojo getConstructInfoPojo(List<UidAndListPojo> uidAndListPojoList)
            throws ParseException {
        List<WbUserInfoPojo> userPojoList = new ArrayList<WbUserInfoPojo>();
        List<WbContentInfoPojo> contentPojoList = new ArrayList<WbContentInfoPojo>();
        int errorLineCounter4Content = 0;
        int errorLineCounter4User = 0;
        for (UidAndListPojo uidAndListPojo : uidAndListPojoList) {
            String uidValue = uidAndListPojo.getUid();
            for (String line : uidAndListPojo.getLineList()) {
                line = line.trim();
                if (line.length() == 0) {
                    continue;
                }
                if (line.startsWith("<")) {
                    // A line that starts with '<' is a content record
                    Element rootElement = XmlParserUtil.getXmlRootElement(line, true);
                    // Records that cannot be parsed are counted and skipped
                    if (rootElement == null) {
                        // System.out.println("parse error!");
                        // System.out.println(line);
                        errorLineCounter4Content++;
                        continue;
                    }
                    WbContentInfoPojo contentInfoPojo = new WbContentInfoPojo();
                    contentInfoPojo.setUid(Long.parseLong(uidValue));
                    contentInfoPojo.setContent(rootElement.elementText("content"));
                    contentInfoPojo.setTime(DateUtil.getDate(rootElement.elementText("time")));
                    contentInfoPojo.setRepostsCount(
                            Integer.parseInt(rootElement.elementText("repostsCount")));
                    contentInfoPojo.setCommentsCount(
                            Integer.parseInt(rootElement.elementText("commentsCount")));
                    // Add the finished object to the content list
                    contentPojoList.add(contentInfoPojo);
                } else {
                    // Everything else is a user record
                    try {
                        line = line.subSequence(line.indexOf('[') + 1, line.lastIndexOf(']'))
                                .toString();
                        String[] kvArray = line.split(",");
                        WbUserInfoPojo userInfoPojo = new WbUserInfoPojo();
                        for (String kv : kvArray) {
                            kv = kv.trim();
                            String[] kvPair = kv.split("=");
                            if (kvPair[0].equals("id")) {
                                userInfoPojo.setUid(Long.parseLong(kvPair[1]));
                            } else if (kvPair[0].equals("screenName")) {
                                userInfoPojo.setScreenName(kvPair[1]);
                            } else if (kvPair[0].equals("province")) {
                                userInfoPojo.setProvince(Integer.parseInt(kvPair[1]));
                            } else if (kvPair[0].equals("remark")) {
                                userInfoPojo.setRemark(kvPair[1]);
                            }
                        }
                        userPojoList.add(userInfoPojo);
                    } catch (Exception e) {
                        // System.out.println(line);
                        errorLineCounter4User++;
                    }
                }
            }
        }
        System.out.println("errorLineCounter4Content=" + errorLineCounter4Content);
        System.out.println("errorLineCounter4User=" + errorLineCounter4User);
        return new UserAndContentInfoPojo(userPojoList, contentPojoList);
    }

    /**
     * Writes the user and content POJO lists to their output files.
     *
     * @param userAndContentInfoPojo the two POJO lists
     * @param userOutputFilePath     output file for user records
     * @param contentOutputFilePath  output file for content records
     * @param outputCharset          charset of the output files
     * @return true on success
     * @throws Exception if a file cannot be written
     */
    public static boolean writePojoToFile(UserAndContentInfoPojo userAndContentInfoPojo,
            String userOutputFilePath, String contentOutputFilePath, String outputCharset)
            throws Exception {
        // 1. Write the user POJO list
        List<WbUserInfoPojo> userInfoPojoList = userAndContentInfoPojo.getUserPojoList();
        StringBuilder stringBuilder = new StringBuilder();
        int lineCounter = 0;
        for (WbUserInfoPojo tempPojo : userInfoPojoList) {
            if (lineCounter > 0) {
                stringBuilder.append("\n");
            }
            stringBuilder.append(tempPojo.toString4FileOutput());
            lineCounter++;
        }
        IOUtil.writeListToFile(stringBuilder.toString(), userOutputFilePath, outputCharset);

        // 2. Write the content POJO list
        List<WbContentInfoPojo> contentInfoPojoList = userAndContentInfoPojo.getContentPojoList();
        stringBuilder = new StringBuilder();
        lineCounter = 0;
        for (WbContentInfoPojo tempPojo : contentInfoPojoList) {
            if (lineCounter > 0) {
                stringBuilder.append("\n");
            }
            stringBuilder.append(tempPojo.toString4FileOutput());
            lineCounter++;
        }
        IOUtil.writeListToFile(stringBuilder.toString(), contentOutputFilePath, outputCharset);
        return true;
    }

    /**
     * Runs the whole flow from input directory to output files.
     *
     * @param inputDir       root directory of the source data
     * @param inputCharset   charset of the source files
     * @param output4User    output file for user records
     * @param output4Content output file for content records
     * @param outputCharset  charset of the output files
     * @return true on success, false if any step threw an exception
     */
    public static boolean startProcess(String inputDir, String inputCharset, String output4User,
            String output4Content, String outputCharset) {
        try {
            // Read the text files under the given directory into lists of lines
            List<UidAndListPojo> uidAndLiPojoList = getAllFileMapResult(inputDir, inputCharset);
            // Convert the lists of strings into lists of structured POJOs
            UserAndContentInfoPojo userAndContentInfoPojo = getConstructInfoPojo(uidAndLiPojoList);
            // Persist each POJO list to its own text file in the output charset
            writePojoToFile(userAndContentInfoPojo, output4User, output4Content, outputCharset);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // String inputDir = "房地產";
        String inputDir = "weibodata";
        String inputCharset = "gbk";
        String output4User = "user_pojo_list.txt";
        String output4Content = "content_pojo_list.txt";
        String outputCharset = "utf-8";
        startProcess(inputDir, inputCharset, output4User, output4Content, outputCharset);
        System.out.println("done!");
    }
}
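The user branch of getConstructInfoPojo splits the line on "," and then on "=". That works for the sample line, but it truncates values that contain a comma, and an empty value makes kvPair[1] throw, so the whole line is counted as an error. A more tolerant parser can be sketched with a regular expression. UserLineSketch and parse are hypothetical names, and the sketch assumes that fields are separated by a comma and a space, as in the sample; a value that itself contains text of the form ", word=" would still be split incorrectly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UserLineSketch {
    // key=value, where the value runs until the next ", key=" or the end of the text
    private static final Pattern PAIR = Pattern.compile("(\\w+)=(.*?)(?=, \\w+=|$)");

    /** Parses a "User [k1=v1, k2=v2, ...]" line into an ordered map. */
    public static Map<String, String> parse(String line) {
        int start = line.indexOf('[');
        int end = line.lastIndexOf(']');
        if (start < 0 || end <= start) {
            throw new IllegalArgumentException("not a User [...] line: " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = PAIR.matcher(line.substring(start + 1, end));
        while (m.find()) {
            fields.put(m.group(1), m.group(2));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Hypothetical, shortened user line with a comma inside a value and an empty value
        String line = "User [id=1107717945, screenName=null, description=hello, world, url=, province=0]";
        System.out.println(parse(line));
    }
}
```

The map can then be copied into a WbUserInfoPojo field by field, with Long.parseLong and Integer.parseInt applied only to the numeric keys.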
(4) controler package
package com.tl.job002.controler;

import com.tl.job002.manager.DataLoadManager;

public class SystemControler {
    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 1) {
            System.out.println("usage: exactly one argument is required: the source data directory");
            // Stop here; without this return, args[0] below would throw
            // ArrayIndexOutOfBoundsException when no argument is given.
            return;
        }
        String inputDir = args[0];
        String inputCharset = "gbk";
        String output4User = "user_pojo_list.txt";
        String output4Content = "content_pojo_list.txt";
        String outputCharset = "utf-8";
        DataLoadManager.startProcess(inputDir, inputCharset, output4User, output4Content,
                outputCharset);
        System.out.println("done!");
    }
}