1. 程式人生 > >利用Redis keyspace notification 實現定時執行

利用Redis keyspace notification 實現定時執行

EventConstants類, 主要是用來定義事件相關常量

/**
 * 事件相關常量
 * @author victor
 *
 */
public class EventConstants {
	
	public static final String KEY_EVENT_AUTO_ID = "KEY_EVENT_AUTO_ID"; // event 計數器, 用於生成EVENT_ID

	public static final String KEY_EVENT_DATA_MAP = "KEY_EVENT_DATA_MAP"; //用來儲存使用者自定義data的 Hash
	
	public static final String EVENT_KEY = "KEY_EVENT"; // 用來做event redis唯一標識字首
	
	public static final String EVENT_DATA_KEY = "DATA";// hash  中儲存data字首
	
	public static final String DEFAULT_EMPTY_DATA = "KEY_EVENT_DATA_EMPTY"; // 空data時常量替換串
	
	public static final String EVENT_META_KEY = "META";//用來儲存 事件型別字首
}
Event 列舉類, 用來區分事件型別:
/**
 * 事件型別列舉
 * @author victor
 *
 */
public enum EventEnum implements IndexedEnum<EventEnum>{
	
	EXAMPLE(1,"example","示例事件");
	
	EventEnum(int index,String code,String name){
		this.index = index;
		this.name = name;
		this.code = code;
	}

	private int index;
	
	private String name;
	
	private String code;
	
	
	private static final ImmutableMap<Integer, EventEnum> INDEXS = IndexedEnumUtil.toIndexes(values());
	
	public static EventEnum indexOf(int index){
		return INDEXS.get(index);
	}


	public int getIndex() {
		return index;
	}

	public String getName() {
		return name;
	}

	public String getCode() {
		return code;
	}
	
}



訊息註冊Service以及介面:負責註冊事件,移出事件等實現:

/**
 * 事件服務介面
 * @author victor
 *
 */
public interface IEventService {

	/**
	 * 註冊事件,相同事件不會覆蓋,註冊成功將返回全域性唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @return 事件ID,全域性唯一
	 */
	public String register(EventEnum event,String data);
	
	/**
	 * 註冊事件,相同事件不會覆蓋,註冊成功將返回全域性唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @param seconds 事件執行時間, 單位秒
	 * @return 事件ID,全域性唯一
	 */
	public String register(EventEnum event,String data,int seconds);
	
	/**
	 * 註冊事件,相同事件不會覆蓋,註冊成功將返回全域性唯一事件ID
	 * @param event  事件
	 * @param data	業務資料
	 * @param seconds 事件執行時間
	 * @return 事件ID,全域性唯一
	 */
	public String register(EventEnum event,String data,Date date);
	
	
	/**
	 * 移除事件,如果事件已經被執行,移除無意義,如果事件未執行,則返回移除成功或者失敗
	 * @param eventId  事件ID
	 * @return 是否移除成功,true 表示成功,false表示失敗(如事件正在被執行)
	 */
	public boolean remove(String eventId);
	
}

實現類:

@Service
public class EventServiceImpl implements IEventService{
	
	private static Logger logger = Logger.getLogger(EventServiceImpl.class);
	
	@Autowired
	private IRedisService redisService;
	
	@Autowired
	private IDistributedLock distributedLock;

	@Override
	public String register(EventEnum event, String data) {
		return register(event, data,1);
	}

	@Override
	public String register(EventEnum event, String data, int seconds) {
		seconds = seconds <= 0 ? 1 : seconds;
		data = StringUtils.isNullOrEmpty(data) ? EventConstants.DEFAULT_EMPTY_DATA : data;
		String eventId = String.valueOf(redisService.incyby(EventConstants.KEY_EVENT_AUTO_ID));
		Map<String,String> hash = new HashMap<>();
		hash.put(EventUtils.getDataKey(eventId), data);
		hash.put(EventUtils.getMetaKey(eventId), event.getIndex() + "");
		String res = redisService.hmset(EventConstants.KEY_EVENT_DATA_MAP,hash);
		if(StringUtils.isNullOrEmpty(data) || StringUtils.isNullOrEmpty(hash) || StringUtils.isNullOrEmpty(res)){
			logger.error("Set " + EventConstants.KEY_EVENT_DATA_MAP + " error . eventId:" + eventId + ",data: " + data + ",meta:" + event.getIndex());
		}
		redisService.setex(EventUtils.getEventKey(eventId), event.getIndex() + "", seconds);
		logger.info("register event :" + eventId);
		/*ThreadPoolUtil.executor(ThreadPoolEnum.KEY_EVENT_REGISTER).execute(new KeyEventRegisterRunable(redisService,eventId,event,data,seconds));*/
		return eventId;
	}

	@Override
	public String register(EventEnum event, String data, Date date) {
		int seconds = DateUtils.calculateDifferenceSeconds(date, new Date());
		return register(event, data,seconds);
	}

	@Override
	public boolean remove(String eventId) {
		if(distributedLock.lock(eventId, 5)){
			redisService.del(EventUtils.getEventKey(eventId));
			redisService.hdel(EventConstants.KEY_EVENT_DATA_MAP, EventUtils.getDataKey(eventId),EventUtils.getMetaKey(eventId));
			distributedLock.unlock(eventId);
		}
		logger.info("Remove key event error : event is executed!");
		return false;
	}

}
實現比較簡單:
註冊事件: 根據事件型別,向redis中新增key, 並設定相關過期時間,然向hash中插入使用者自定義data.

移出事件: 直接刪除事件, 防止刪除事件同時事件正在執行,加鎖

訂閱監聽類:

/**
 * Key event 頻道訂閱<br/>
 * 需要redis版本2.8以上,並且修改conf配置檔案引數: <cite>notify-keyspace-events Ex</cite><br/>
 * 開啟配置之後redis將會在key失效時,將訊息傳送到<cite>[email protected]<db>__:expired</cite>頻道,其中db代表key所在資料庫的index
 * @author victor
 *
 */
@Component
@Configuration
public class KeyEventMessageListener implements MessageListener{
	
	private static Logger logger = Logger.getLogger(KeyEventMessageListener.class);
	
	private final RedisSerializer<String> serializer = new StringRedisSerializer();
	
	@Autowired
	private EventHandlerServiceFacotry eventHandlerServiceFacotry;
	
	@Autowired
	private IRedisService redisService;
	
	@Autowired
	private IRedisLock redisLock;
	
	@Override
	public void onMessage(Message message, byte[] pattern) {
		String key = serializer.deserialize(message.getBody());
		String eventId = EventUtils.getEventId(key);
		if(!StringUtils.isNullOrEmpty(eventId)){
			if(redisLock.lock(eventId, 300)){//must executed in 300S
				logger.info("Handle event :" + eventId + " start !");
				try{
					handler(eventId);
				}catch (Exception ex) {
					logger.error("Key event hadppend bug : eventId:" + eventId + ", message :" + ex.getMessage() , ex);
					ex.printStackTrace();
				}finally{
					//delete data and meta
					redisService.del(EventUtils.getEventKey(eventId));
					redisService.hdel(EventConstants.KEY_EVENT_DATA_MAP, EventUtils.getDataKey(eventId),EventUtils.getMetaKey(eventId));
					redisLock.unlock(eventId);
				}
				logger.info("Handle event :" + eventId + " end!");
			}else{// event was handled or is handling
				logger.debug("Event is handled");
			}
		} 
	}
	
	private void handler(String eventId){
		String data = redisService.hget(EventConstants.KEY_EVENT_DATA_MAP,EventUtils.getDataKey(eventId));
		String meta = redisService.hget(EventConstants.KEY_EVENT_DATA_MAP,EventUtils.getMetaKey(eventId));
		if(StringUtils.isNullOrEmpty(data) || StringUtils.isNullOrEmpty(meta)){// event was handled
			logger.info("Event was handled, eventId:" + eventId + ".");
			return;
		}
		int event = Integer.parseInt(meta);
		data = data.equals(EventConstants.DEFAULT_EMPTY_DATA) ? null : data;
		IEventHandlerService service = eventHandlerServiceFacotry.instance(event);
		if(service == null){
			logger.error("Event handler not found, eventId:" + eventId + ",data:" + data + ",meta:" + meta + "!");
			throw new EventNotMappingHandlerException("Event handler service not mapping!",eventId,event,data);
		}
		try{
			service.handle(EventEnum.indexOf(event), eventId, data);
		}catch (Exception e) {
			logger.error("Handler event error, eventId : " + eventId, e);
			throw new EventHandleException("Handler event error.",eventId,event,data);
		}
	}
}


此類主要完成功能, 接受redis推送的訂閱訊息, 並且根據key來找到對應的 事件型別, 事件data,  加鎖防止重複消費, 通過事件執行工廠根據事件型別獲取不同的事件最終業務service.

正常情況, 不同的事件可能各自都有自己不同的事件, 比如訂單取消事件 和 商品自動上架兩種事件執行的業務邏輯就不同, 所以此處使用多型動態獲取不同的事件業務實現例項。

事件業務工廠 EventHandlerServiceFactory類:
/**
 * 事件處理工廠
 * @author victor
 *
 */
@Service
public class EventHandlerServiceFacotry {
	
	private List<IEventHandlerService> services;
	
	@Autowired
	public EventHandlerServiceFacotry(List<IEventHandlerService> services){
		this.services = services;
	}
	
	public IEventHandlerService instance(int event){
		for(IEventHandlerService service : services){
			if(service.getEvent().getIndex() == event){
				return service;
			}
		}
		return null;
	}
	
	public IEventHandlerService instance(EventEnum event){
		return instance(event.getIndex());
	}
}

IEventHandlerService介面, 定義事件執行介面方法以及獲取時間型別介面:

/**
 * 事件觸發服務 自定義
 * @author victor
 *
 */
public interface IEventHandlerService {

	/**
	 * 執行
	 * @param eventId
	 * @param event
	 * @param data
	 */
	public void handle(EventEnum event,String eventId,String data);
	
	/**
	 * 事件型別
	 * @return
	 */
	public EventEnum getEvent();
}

EXAMPLE_EVENT 示例事件實現示例:
@Service
public class ExampleEventHandlerServiceImpl implements IEventHandlerService{
	
	@Override
	public void handle(EventEnum event, String eventId, String data) {
		System.out.println("執行事件:" + event.getName() + ",使用者自定義data:" + data);
	}

	@Override
	public EventEnum getEvent() {
		return EventEnum.EXAMPLE;
	}

}


輔助類EventUtils:

public final class EventUtils {

	private EventUtils(){
		
	}
	
	public static String getEventKey(String eventId){
		return EventConstants.EVENT_KEY + "_" + eventId;
	}
	
	public static String getDataKey(String eventId){
		return eventId + "_" + EventConstants.EVENT_DATA_KEY;
	}
	
	public static String getMetaKey(String eventId){
		return eventId + "_" + EventConstants.EVENT_META_KEY;
	}
	
	public static boolean isEvent(String key){
		if(!StringUtils.isNullOrEmpty(key) && key.toUpperCase().indexOf(EventConstants.EVENT_KEY + "_") == 0){
			return true;
		}
		return false;
	}
	
	
	public static String getEventId(String key){
		if(isEvent(key)){
			String eventId = key.replace(EventConstants.EVENT_KEY + "_", "");
			return eventId;
		}
		return null;
	}
	
}


上述程式碼中, 部分類可能沒有, 比如redisService : 為redis操作命令封裝   distributionLock :分散式鎖,筆者部落格前面有自己實現的redis鎖。

呼叫註冊事件:註冊一個型別為EXAMPLE型別事件,自定義data為 hello,example字串, 並且在60秒之後執行:

@Autowired
private  IEventService eventService;
eventService.register(EventEnum.EXAMPLE,"hello , example !" ,60);



相關推薦

利用Redis keyspace notification 實現定時執行

EventConstants類, 主要是用來定義事件相關常量/** * 事件相關常量 * @author victor * */ public class EventConstants { public static final String KEY_EVENT_AUTO_ID = "KEY_E

Spring+Redis(keyspace notification)實現定時任務(訂單過期自動關閉)

1.起因 最近公司專案要做訂單超期未支付需自動關閉,首先想到的是用spring的定時器(@Schedule),結果領導舉各種例子說會影響效能,只能作罷。後來想能不能基於redis實現,學習(baidu)之後,大概就是使用redis的Keyspace Notificat

Java定時任務:利用java Timer類實現定時執行任務的功能

lpad 虛擬 觀察 exce 就是 set ring 構造 trac 一、概述 在java中實現定時執行任務的功能,主要用到兩個類,Timer和TimerTask類。其中Timer是用來在一個後臺線程按指定的計劃來執行指定的任務。 TimerTask一個抽象類,它的子類代

利用redis主從+keepalived實現高可用

linux 運維 redisRedis簡介: Redis是一個開源的使用ANSI C語言編寫、支持網絡、可基於內存亦可持久化的日誌型、Key-Value數據庫,並提供多種語言的API。從2010年3月15日起,Redis的開發工作由VMware主持。 redis是一個key-value存儲系統。和Memc

看代碼網備份|利用WebClient|eKing.CmdDownLoadDbBakOper|實現定時拷貝數據庫備份文件到文件服務器

emd 返回 隱藏 res -i ogr try 計劃任務 -a 摘要: 1、有兩臺服務器 (1)看代碼網(記為A):內網IP:10.186.73.30 (2)文件服務器(記為B):內網IP:10.135.87.157 2、在A架設一個網站,端口8088(防火強設置B才能

android studio 定時器操作 實現定時執行相關任務

package ipget.wenzheng.studio.ipget; import android.os.Bundle; import android.os.Handler; import android.os.Message; import android.support.v7.app.AppCo

Netty遊戲伺服器實戰開發(8):利用redis或者zookeeper實現3pc分散式事務鎖(二)。支撐騰訊系列某手遊百萬級流量公測

導讀:在上篇文章中介紹了分散式事務專案的基本原理和工程元件,我們瞭解到了分散式事務的理論知識。處於實戰的經驗,我們將理論知識使用到實際專案中。所以我們將藉助idea中maven工程 來實戰我們的專案。 回到正文: 在上篇文章中我們已經把需要的準備工作做好了。現在

AlarmManager+Notification實現定時通知提醒功能

AlarmManager簡介   AlarmManager實質是一個全域性的定時器,是Android中常用的一種系統級別的提示服務,在指定時間或週期性啟動其它元件(包括Activity,Service,BroadcastReceiver)。本文將講解一下如何使

利用pendingintent 和AlarmManager實現定時任務的一些分析

PendingIntent 獲取 PendingIntent物件的方法: 可以通過getActivity(Context context, int requestCode, Intent intent, int flags)系列方法從系統取得一個用於啟動一個

怎麼實現定時執行一個儲存過程

需求:客戶要求把A系統出來的一些資料每月底同步到指定表(在A系統資料庫)中。 實現方案:往指定表插資料採用儲存過程(procA)。每月底執行採用windows任務定時或“SQL Server代理——作業”實現。 怎麼使用windows自帶任務定時執行bat實現上述需求

javaWeb監聽器結合定時實現定時執行任務

java監聽器 通過監聽器可以自動的激發一些操作,比如監聽線上人數; 監聽器介面有: ServletContextListener——監聽ServletContext物件; HttpSessionL

利用redis和springsession實現線上人數統計

1.springSession相比以前的session機制,好處在於可以實現session共享。 2.通過攔截器(filter要寫在其他攔截器的前面)和裝飾者模式替換原有session機制,不需要改程式碼 3.配置: web.xml applicationcontext.

ThinkPHP實現定時執行任務的兩種方法

    $this->show('<style type="text/css">*{ padding: 0; margin: 0; } div{ padding: 4px 48px;} body{ background: #fff; font-family: "微軟雅黑"; color: #

使用tornado實現定時執行任務

使用tornado實現輪詢:from tornado import web, ioloop import datetime class MainHandler(web.RequestHandler): def get(self): self.wri

利用redis-sentinel+keepalived實現redis高可用

目標、需求: 為上層應用提供高可靠、低延遲、低(無限接近0)資料損失的Redis快取服務 方案概述: 採用同一網路內的三臺主機(可以是物理主機、虛擬機器或docker容器),要求三臺主機之間都能相互訪問,每一臺主機上都安裝redis-server、redis-sen

WEB struts2 實現定時執行

Spring中Quartz的配置各種企業應用幾乎都會碰到任務排程的需求,就拿論壇來說:每隔半個小時生成精華文章的RSS檔案,每天凌晨統計論壇使用者的積分排名,每隔30分鐘執行鎖定使用者解鎖任務。對於一個典型的MIS系統來說,在每月1號凌晨統計上個月各部門的業務資料生成月報表,

Redis--keyspace notification(鍵空間)

重要: Keyspace notifications 從2.8.0版本開始啟用 功能概覽 鍵空間通知使得客戶端可以通過釋出訂閱(Redis自帶)通道,來接收那些以某種方式改動了Redis 資料集的事件。 可以接受到通知的例子: 影響到鍵的命令,對鍵進行操作的命令。 所有接

SQL實現定時執行計劃(---SQL Server2012)

 一、開啟資料庫SQL企業管理器SQL Server Management Studio,連線資料庫例項 三、設定SQL Server代理為開機自動啟動,否則下次重啟主機後,SQL Server代理還需要手動啟動。         開始——>程式——>Mi

【轉載】Win7利用任務計劃程序實現定時關機

系統 zha ref 定時 設置 執行 執行時間 img win10系統 在Win7、Win8或者Win10系統中,如果要實現電腦的自動定時關機,不需要借用任何的外部程序,直接系統自帶的任務計劃程序即可實現電腦的定時自動關機,支持設定電腦關機時間以及執行頻率次數,如固定每天

redis】spring boot利用redisKeyspace Notifications實現消息通知

客戶 無效 handler mage extend width psu 消息通知 queue 前言 需求:當redis中的某個key失效的時候,把失效時的value寫入數據庫。 github: https://github.com/vergilyn/RedisSampl