1. 程式人生 > >基於Redis實現分散式訊息佇列(4)

基於Redis實現分散式訊息佇列(4)

1、訪問Redis的工具類

public class RedisManager {

    private static Pool<Jedis> pool;

    protected final static Logger logger = Logger.getLogger(RedisManager.class);

    static{
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public
static void init() throws Exception { Properties props = ConfigManager.getProperties("redis"); logger.debug("初始化Redis連線池。"); if(props==null){ throw new RuntimeException("沒有找到redis配置檔案"); } // 建立jedis池配置例項 JedisPoolConfig jedisPoolConfig = new
JedisPoolConfig(); // 設定池配置項值 int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim()); jedisPoolConfig.setMaxTotal(poolMaxTotal); int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim()); jedisPoolConfig.setMaxIdle(poolMaxIdle); long
poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim()); jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis); logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ", poolMaxTotal,poolMaxIdle,poolMaxWaitMillis)); // 根據配置例項化jedis池 String connectMode = props.getProperty("redis.connectMode"); String hostPortStr = props.getProperty("redis.hostPort"); logger.debug(String.format("host : %s ",hostPortStr)); logger.debug(String.format("mode : %s ",connectMode)); if(StringUtils.isEmpty(hostPortStr)){ throw new OptimusException("redis配置檔案未配置主機-埠集"); } String[] hostPortSet = hostPortStr.split(","); if("single".equals(connectMode)){ String[] hostPort = hostPortSet[0].split(":"); pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim())); }else if("sentinel".equals(connectMode)){ Set<String> sentinels = new HashSet<String>(); for(String hostPort : hostPortSet){ sentinels.add(hostPort); } pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig); } } /** * 使用完成後,必須呼叫 returnResource 還回。 * @return 獲取Jedis物件 */ public static Jedis getResource(){ Jedis jedis = pool.getResource(); if(logger.isDebugEnabled()){ logger.debug("獲得連結:" + jedis); } return jedis; } /** * 獲取Jedis物件。 * * 用完後,需要呼叫returnResource放回連線池。 * * @param db 資料庫序號 * @return */ public static Jedis getResource(int db){ Jedis jedis = pool.getResource(); jedis.select(db); if(logger.isDebugEnabled()){ logger.debug("獲得連結:" + jedis); } return jedis; } /** * @param jedis */ public static void returnResource(Jedis jedis){ if(jedis!=null){ pool.returnResource(jedis); if(logger.isDebugEnabled()){ logger.debug("放回連結:" + jedis); } } } /** * 需要通過Spring確認這個方法被呼叫。 * @throws Exception */ public static void destroy() throws Exception { pool.destroy(); } }

這個類沒有通過技術手段強制呼叫returnResource和destroy,需要想想辦法。

2、佇列介面

public interface TaskQueue {

    /**
     * 獲取佇列名
     * @return
     */
    String getName();

    /**
     * 往佇列中新增任務
     * @param task
     */
    void pushTask(String task);

    /**
     * 從佇列中取出一個任務
     * @return
     */
    String popTask();

}

用String型別描述任務,也可以考慮byte[],要求對每個任務描述的資料儘可能短。

3、佇列的Redis實現類

/**
 * 任務佇列Redis實現。
 * 
 * 採用每次獲取Jedis並放回pool的方式。
 * 如果獲得Jedis後一直不放手,反覆重用,兩個操作耗時可以降低1/3。
 * 暫時先忍受這種低效能,不明確Jedis是否執行緒安全。
 *
 */
public class TaskQueueRedisImpl implements TaskQueue {

    private final static int REDIS_DB_IDX = 9;

    private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

    private final String name; 

    /**
     * 建構函式。
     * 
     * @param name
     */
    public TaskQueueRedisImpl(String name) {
        this.name = name;
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#getName()
     */
    public String getName() {
        return this.name;
    }
    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#pushTask(String)
     */
    public void pushTask(String task) {
        Jedis jedis = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            jedis.lpush(this.name, task);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#popTask()
     */
    public String popTask() {
        Jedis jedis = null;
        String task = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            task = jedis.rpop(this.name);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
        return task;
    }

}

4、獲取佇列例項的工具類

/**
 * <pre>
 *  // 獲得佇列
 *  TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
 *  
 *  // 新增任務到佇列
 *  String task = "task id";
 *  tq.pushTask(task);
 * 
 *  // 從佇列中取出任務執行
 *  String taskToDo = tq.popTask();
 * </pre>
 * @author liuhailong
 */
public class TaskQueueManager {

    protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

    private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

    /**
     * 簡訊佇列名。
     */
    public static final String SMS_QUEUE = "SMS_QUEUE";

    /**
     * 規則佇列名。
     */
    public static final String RULE_QUEUE = "RULE_QUEUE";

    private static void initQueneMap() {
        logger.debug("初始化任務佇列...");
        queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
        logger.debug("建立佇列:"+RULE_QUEUE);
        queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
        logger.debug("建立佇列:"+SMS_QUEUE);
    }

    static {
        initQueneMap();
    }

    public static TaskQueue get(String name){
        return getRedisTaskQueue(name);
    }

    public static TaskQueue getRedisTaskQueue(String name){
        return queneMap.get(name);
    }

}

和具體的佇列過於緊耦合,但簡單好用。
先跑起來再說。

5、向佇列中新增任務的程式碼

TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);

6、從佇列中取出任務執行的程式碼

public class SmsSendTask{

    protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

    protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
    /**
     * 入口方法。
     */
    public void execute()  {
        TaskQueue taskQueue = null;
        String task = null;
        try {
            taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

            // 非執行緒安全
            Set<Serializable> executedTaskSet = new HashSet<Serializable>();

            task = taskQueue.popTask();
            while(task!=null){
                // 判斷是否把所有任務都執行一遍了,避免死迴圈
                if(executedTaskSet.contains(task)){
                    taskQueue.pushTask(task);
                    break;
                }

                executeSingleTask(taskQueue,task);

                task = taskQueue.popTask();
            }
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
            e.printStackTrace();
        }
    }

    /**
     * 傳送單條簡訊。
     * 
     * 取出任務並執行,如果失敗,放回任務列表。
     * 
     * @param taskQueue
     * @param task
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void executeSingleTask(TaskQueue taskQueue, String task) {
        try {
            // do the job
            String smsId = task;
            Map<String,String> sms = smsSendService.getSmsList(smsId);

            smsSendService.send(sms);

            smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

            String opType = "2";
            TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
        } catch (Throwable e) {
            if(task!=null){
                taskQueue.pushTask(task);
                smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
                if(logger.isDebugEnabled()){
                    logger.error(String.format("任務%s執行失敗:%s,重新放回佇列", task, e.getMessage()));
                }
            }else {
                e.printStackTrace();
            }
        }
    }

}

這部分程式碼是固定模式,而且不這樣做存在重大缺陷,會有任務執行失敗,被丟棄,這部分程式碼應該寫到佇列實現中。
有空再改。