觀察者模式實際應用:監聽線程,意外退出線程後自動重啟
阿新 • • 發佈:2017-06-21
lee text 實時 之間 最終 ren tap instance and
摘要: 觀察者模式,定義對象之間的一種一對多的依賴關系,當對象的狀態發生改變時,所有依賴於它的對象都得到通知並且被自動更新。觀察者模式在JDK中有現成的實現,java.util.Obserable。
首先說下需求:通過ftp上傳約定格式的文件到服務器指定目錄下,應用程序能實時監控該目錄下文件變化,如果上傳的文件格式符合要求,將將按照每一行讀取解析再寫入到數據庫,解析完之後再將文件改名。(這個是原先已經實現了的功能,請看我的一篇文章java利用WatchService實時監控某個目錄下的文件變化並按行解析(註:附源代碼))
但項目上線一段時間後,發現再利用FileZilla登陸上傳文件,文件不能被解析,而重啟tomcat之後再上傳,又能解析,於是判定是監控指定目錄的那個線程掛掉了,導致上傳後的文件不能被檢測到,故也不能被解析。之後查看日誌也最終驗證了我推斷。
所以關鍵的問題就是:如何監聽線程,當意外退出線程後進行自動重啟,這也是本文所要利用觀察者模式實現的。
下面請看實現過程(尤其見紅色註解部分):
1、web.xml監聽器配置文件監控監聽器,初始化創建一個監控指定目錄的線程
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:root-context.xml</param-value>
</context-param>
<filter>
<filter-name>CharacterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>CharacterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<filter>
<filter-name>sitemesh</filter-name>
<filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class>
</filter>
<filter-mapping>
<filter-name>sitemesh</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<servlet>
<servlet-name>appServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:servlet-context.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>appServlet</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- 配置spring監聽器 -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- 配置監控文件變化監聽器 -->
< listener>
<listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class>
</listener>
<listener>
<listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class>
</listener>
<jsp-config>
<taglib>
<taglib-uri>/tag</taglib-uri>
<taglib-location>/WEB-INF/tag/tag.tld</taglib-location>
</taglib>
</jsp-config>
<welcome-file-list>
<welcome-file>index.jsp</welcome-file>
</welcome-file-list>
<session-config>
<session-timeout>45</session-timeout>
</session-config>
</web-app>
2、編寫一個觀察者實現類,用於監聽“監控指定目錄線程”,當“監控指定目錄線程”掛掉後,自動重啟該線程
package com.zealer.ad.listener; import java.util.Observable; import java.util.Observer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.zealer.ad.task.WatchFilePathTask; public class ObserverListener implements Observer{ private Log log = LogFactory.getLog(ObserverListener.class); @Override public void update(Observable o, Object arg) { log.info("WatchFilePathTask掛掉"); WatchFilePathTask run = new WatchFilePathTask(); run.addObserver(this); new Thread(run).start(); log.info("WatchFilePathTask重啟"); } }
3、編寫一個ThreadStartUpListenser類,實現ServletContextListener,tomcat啟動時創建後臺線程
package com.zealer.ad.listener; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.stereotype.Component; import com.zealer.ad.task.WatchFilePathTask; @Component public class ThreadStartUpListenser implements ServletContextListener { private static WatchFilePathTask r = new WatchFilePathTask(); private Log log = LogFactory.getLog(ThreadStartUpListenser.class); @Override public void contextDestroyed(ServletContextEvent paramServletContextEvent) { // r.interrupt(); } @Override public void contextInitialized(ServletContextEvent paramServletContextEvent) { ObserverListener listen = new ObserverListener();
//給“監控指定目錄下的線程”(被觀察者),添加一個觀察者 r.addObserver(listen); new Thread(r).start(); // r.start(); log.info("ImportUserFromFileTask is started!"); } }
4、創建指定目錄文件變化監控類WatchFilePathTask
package com.zealer.ad.task; import java.io.File; import java.io.FileFilter; import java.nio.file.FileSystems; import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.util.Observable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import com.zealer.ad.util.ConfigUtils; import com.zealer.ad.util.SpringUtils; /** * 指定目錄文件變化監控類 * @author cancer * */ public class WatchFilePathTask extends Observable implements Runnable { private Log log = LogFactory.getLog(WatchFilePathTask.class); private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path"); private WatchService watchService; // 此方法一經調用,立馬可以通知觀察者,在本例中是監聽線程 public void doBusiness()
{ if(true)
{ super.setChanged(); }
//通知觀察者,重啟線程 notifyObservers(); } @Override public void run() { try { //獲取監控服務 watchService = FileSystems.getDefault().newWatchService(); log.debug("獲取監控服務"+watchService); Path path = FileSystems.getDefault().getPath(filePath); log.debug("@@@:Path:"+path); final String todayFormat = DateTime.now().toString("yyyyMMdd"); File existFiles = new File(filePath); //啟動時檢查是否有未解析的符合要求的文件 if(existFiles.isDirectory()) { File[] matchFile = existFiles.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { if((todayFormat+".txt").equals(pathname.getName())) { return true; } else { return false; } } }); if(null != matchFile) { for (File file : matchFile) { //找到符合要求的文件,開始解析 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask"); task.setFileName(file.getAbsolutePath()); task.start(); } } } //註冊監控服務,監控新增事件 WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); while (true) { key = watchService.take(); for (WatchEvent<?> event : key.pollEvents()) { //獲取目錄下新增的文件名 String fileName = event.context().toString(); //檢查文件名是否符合要求 if((todayFormat+".txt").equals(fileName)) { String filePath = path.toFile().getAbsolutePath()+File.separator+fileName; log.info("import filePath:"+filePath); //啟動線程導入用戶數據 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath); task.setFileName(filePath); task.start(); log.debug("啟動線程導入用戶數據"+task); } } key.reset(); } }
catch (Exception e) { e.printStackTrace(); System.out.println("已經到這裏來了"); doBusiness();//在拋出異常時調用,通知觀察者,讓其重啟線程 } } }
5、創建解析用戶文件及導入數據庫線程,由WatchFilePathTask啟動
package com.zealer.ad.task; import com.zealer.ad.entity.AutoPutUser; import com.zealer.ad.entity.Bmsuser; import com.zealer.ad.service.AutoPutUserService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Date; import javax.annotation.Resource; /** * 解析用戶文件及入庫線程,由WatchFilePathTask啟動 * @author cancer * */ public class ImportUserFromFileTask extends Thread { private Log log = LogFactory.getLog(ImportUserFromFileTask.class); private String fileName; @Resource(name = "autoPutUserService") private AutoPutUserService autoPutUserService; @Override public void run() { File file = new File(fileName); if (file.exists() && file.isFile()) { log.debug(":@@@準備開始休眠10秒鐘:" + file); //休眠十分鐘,防止文件過大還沒完全拷貝到指定目錄下,這裏的線程就開始讀取文件 try { sleep(10000); } catch (InterruptedException e1) { e1.printStackTrace(); } InputStreamReader read; try { read = new InputStreamReader(new FileInputStream(file), "UTF-8"); BufferedReader bufferedReader = new BufferedReader(read); String lineTxt = null; int count = 0; Boolean f = false; while ((lineTxt = bufferedReader.readLine()) != null) { if ((null == lineTxt) || "".equals(lineTxt)) { continue; } if (lineTxt.startsWith("‘")) { lineTxt = lineTxt.substring(1, lineTxt.length()); } //解析分隔符為‘, ‘ String[] lines = lineTxt.split("‘, ‘"); int length = lines.length; if (length < 2) { continue; } Bmsuser bmsuser = new Bmsuser(); bmsuser.setName(lines[0]);if (!"".equals(lines[1])) { bmsuser.setCity(lines[1]); } //根據唯一索引已經存在的數據則不插入 f = autoPutUserService.insertIgnore(bmsuser); if (f) { count++; } } //匯總數據 AutoPutUser autoPutUser = new AutoPutUser(); autoPutUser.setTotalCount(autoPutUserService.getUserCount()); autoPutUser.setCount(count); autoPutUser.setCountDate(new Date(System.currentTimeMillis())); String today = DateTime.now().toString("yyyy-MM-dd"); Integer oldCount = autoPutUserService.getOldCount(today); //如果今天導入過了就更新否則插入 if (!oldCount.equals(0)) { autoPutUserService.updateUserData(autoPutUser, today, oldCount); } else { autoPutUserService.gatherUserData(autoPutUser); } //註意:要關閉流 read.close(); } catch (Exception e) { log.error(e.getMessage(), e); } File newFile = new File(file.getPath() + System.currentTimeMillis() + ".complate"); file.renameTo(newFile); } else { log.error(fileName + " file is not exists"); } } public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public AutoPutUserService getAutoPutUserService() { return autoPutUserService; } public void setAutoPutUserService(AutoPutUserService autoPutUserService) { this.autoPutUserService = autoPutUserService; } }
附帶:
1、sql腳本
CREATE TABLE `bmsuser` (
`id` int(255) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(32) DEFAULT NULL ,
`city` varchar(32) DEFAULT NULL COMMENT ,
PRIMARY KEY (`bmsid`),
UNIQUE KEY `bbLoginName` (`bbLoginName`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2、文件格式,命名為yyyyMMdd.txt
‘張三‘, ‘深圳‘
觀察者模式實際應用:監聽線程,意外退出線程後自動重啟