1. 程式人生 > >後臺小程式批量推送(多執行緒)

後臺小程式批量推送(多執行緒)

一,建立需要的表

表名:t_push_task(推送任務表)

表格資訊
欄位 型別 註釋 能否為空 預設值 其他
id int(11) PRI NO auto_increment
template_id varchar(64) 小程式模版ID YES
task_name varchar(64) 模版名稱 YES
task_topic varchar(64) 活動主題 YES
task_caption varchar(200) 活動說明 YES
task_tips varchar(64) 溫馨提示 YES
task_type varchar(64) 任務型別 YES
status int(1) 任務是否已經發送 1 未傳送[預設] 2 已傳送 YES
goods_id int(11) 商品ID YES
goods_name varchar(64) 商品標題 YES
goods_online_status int(2) 商品前臺狀態 1 上架 0 下架 YES
price decimal(10,2) 商品金額(需要的海貝數) YES
is_upload int(1) YES
file_name varchar(64) YES
create_time datetime 建立時間 YES
update_time datetime 更新時間 MUL NO

表名:t_push_task_user(推送流水錶)

註釋:

返回目錄

表格資訊
欄位 型別 註釋 能否為空 預設值 其他
id int(11) PRI NO auto_increment
task_id int(11) 推送ID MUL YES
user_id int(11) 使用者ID YES
form_id varchar(50) 小程式推送formId YES
status int(2) 推送狀態 1 已推送 2 未推送 YES
create_time datetime 建立時間 YES
update_time datetime 更新時間 YES
note varchar(255) 備註 YES

表名:t_push_user_from_id_record(收集formid表)

表格資訊
欄位 型別 註釋 能否為空 預設值 其他
id bigint(11) PRI NO auto_increment
user_id bigint(11) 使用者id MUL YES
form_id varchar(100) 小程式推送formId YES
source smallint(2) 來源 source 1,步數兌換海貝按鈕;2,首頁邀請按鈕;3,步數攔截彈窗邀請按鈕;4,兌換商品按鈕;5,海貝不夠邀請按鈕;6,引導關注蒙層按鈕;7,健康體驗領取按鈕 YES
status smallint(2) 使用狀態 1 未使用[預設] 2 已使用 YES
create_time datetime 建立時間 YES
update_time datetime 修改時間 YES

二.小程式需要推送的必須欄位 為 formId和使用者的openid

小程式推送任務配置 新建任務

序號 任務建立時間 模板ID 模板名稱 關聯商品編碼 關聯商品標題 關聯商品前臺狀態 推送使用者量 任務型別 操作
98 2018-09-20 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 活動加入成功提醒 1989 健康助利測試商品1 上架 638 指定使用者傳送 檢視 傳送完成
97 2018-09-20 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 活動加入成功提醒 1989 健康助利測試商品1 上架 638 指定使用者傳送 檢視 傳送完成
96 2018-09-20 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 活動加入成功提醒 1989 健康助利測試商品1 上架 637 指定使用者傳送 檢視 傳送完成
95 2018-09-20 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 活動加入成功提醒 1989 健康助利測試商品1 上架 637 指定使用者傳送 檢視 傳送完成

新增任務(可以配置需要推送的任何商品)

模板ID: 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0

模板名稱: 活動加入成功提醒

活動主題:  限10個字,小程式模板訊息的活動主題

活動說明:  限30個字,小程式模板訊息的活動說明

溫馨提示:  限30個字,小程式模板訊息的溫馨提示

模板關聯前臺商品編碼:  更新 限1個上架商品編碼,小程式模板商品標題及金額

商品標題: 健康助利測試商品1

商品金額: 250

商品前臺狀態: 上架

然後再列表可以點選 傳送按鈕;

傳送程式碼

 /**
     * 小程式任務推送
     * @param id
     * @return
     */
    public ResultInfoObject pushMsg(Integer id,String userName) {
        ResultInfoObject resultInfo = new ResultInfoObject();
        //針對於售後服務重複資料進行redis加鎖,處理併發問題
        String key = "health_cache_push_" + id;
        if(JedisUtil.keyExists(key)) {
            resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
            resultInfo.setRetdesc("操作過於頻繁,請稍後再試!");
            return resultInfo;
        }

        JedisUtil.setex(key, 300, "1");

        log.info("[pushMsg] add redis lock key : {}", key);

        HealthPushTask healthPushTask = healthPushTaskDao.selectByPrimaryKey(id);

        if (healthPushTask.getStatus()==2){ //已傳送
            log.info("此任務已傳送id="+healthPushTask.getId());
            resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
            resultInfo.setObject("此任務已傳送id="+healthPushTask.getId());
            return resultInfo ;
        }
        healthPushTask.setStatus(2);
        healthPushTask.setUpdateTime(new Date());
        healthPushTaskDao.updateByPrimaryKeySelective(healthPushTask);
        List<HealthPushTaskUser> healthPushTaskUserList = healthPushTaskUserDao.selectListByTaskId(id);
        if (healthPushTaskUserList== null|| healthPushTaskUserList.size()<1){
            log.info("還沒有上傳使用者ID");
            resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
            resultInfo.setObject("還沒有上傳使用者ID");
            return resultInfo ;
        }
        List<HealthPushUserFromIdRecord> healthPushUserFromIdRecordList = healthPushUserFromIdRecordDao.selectAvialFormId();
        if (healthPushUserFromIdRecordList== null|| healthPushUserFromIdRecordList.size()<1){
            log.info("沒有formid");
            resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
            resultInfo.setObject("沒有formid");
            return resultInfo ;
        }

        Map<String, HealthPushUserFromIdRecord> formIdMap = new HashMap();
        AtomicInteger successCount = new AtomicInteger(0);
        for (HealthPushUserFromIdRecord s : healthPushUserFromIdRecordList) {
            formIdMap.put(s.getUserId() + "", s);
        }
        int totalCount = healthPushTaskUserList.size();
        
        //多執行緒分發
        final int size = 100;
        int threadCount = (totalCount + size - 1) / size;
		final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
		for(int i=0;i<threadCount;i++) {
			final int beginIndex = i * size;
			final int endIndex = (threadCount - i) ==1 ? totalCount : (beginIndex + size);
			ThreadPoolUtils.addProcess(new Runnable() {
				@Override
				public void run() {
					List<HealthPushTaskUser> subList = new ArrayList<>(healthPushTaskUserList.subList(beginIndex,endIndex));
					log.info("小程式推送子list範圍: {}-{}",beginIndex , endIndex);
					
					for (HealthPushTaskUser healthPushTaskUser : subList) {
			            HealthUser healthUser = healthUserDao.selectByPrimaryKey(Long.valueOf(healthPushTaskUser.getUserId()));
			            if (healthUser != null && healthUser.getOpenid() != null) {
			                HealthPushUserFromIdRecord healthPushUserFromIdRecord =formIdMap.get(healthUser.getUserId() + "");

			                try {
			                    if (healthPushUserFromIdRecord== null){
			                        log.info(healthUser.getUserId()+"沒有formid");
			                        healthPushTaskUser.setUpdateTime(new Date());
			                        healthPushTaskUser.setStatus(3);
			                        healthPushTaskUser.setNote("沒有formid");
			                        healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
			                        continue;
			                    }
			                    String page ="pages/goods/detail/detail";
			                    String  accessToken = wechatApiService.getAccessToken("health_applet");
			                    String templateId = healthPushTask.getTemplateId();
			                    Map<String, TemplateData> param = new HashMap<String, TemplateData>();
			                    param.put("keyword1", new TemplateData(healthPushTask.getTaskTopic(), "#EE0000"));
			                    param.put("keyword2", new TemplateData(healthPushTask.getTaskCaption(), "#EE0000"));
			                    param.put("keyword3", new TemplateData(healthPushTask.getGoodsName(), "#EE0000"));
			                    param.put("keyword4", new TemplateData( Validator.doubleTrans(Validator.rountTwo(healthPushTask.getPrice().doubleValue()))+"海貝", "#EE0000"));
			                    param.put("keyword5", new TemplateData( healthPushTask.getTaskTips(), "#EE0000"));

			                    com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(param));
//			        //呼叫傳送微信訊息給使用者的介面    ********這裡寫自己在微信公眾平臺拿到的模板ID
			                    JSONObject resultJson = WX_TemplateMsgUtil.sendWechatMsgToUser(healthUser.getOpenid(), templateId, page + "?backToHome=1&&goodsId=" + healthPushTask.getGoodsId(),
			                            healthPushUserFromIdRecord.getFormId(), jsonObject, accessToken);
			                    String errmsg = "error";
			                    if (resultJson!=null){
			                        errmsg = (String) resultJson.get("errmsg");
			                    }
			                    if ("ok".equals(errmsg)){
			                        healthPushTaskUser.setStatus(1);
			                        successCount.incrementAndGet();
			                    }else {
			                        healthPushTaskUser.setStatus(3);
			                        // errorCount++;
			                    }
			                    healthPushUserFromIdRecord.setStatus(2);
			                    healthPushUserFromIdRecord.setUpdateTime(new Date());
			                    healthPushUserFromIdRecordDao.updateByPrimaryKeySelective(healthPushUserFromIdRecord);
			                    healthPushTaskUser.setUpdateTime(new Date());
			                    healthPushTaskUser.setNote(errmsg);
			                    healthPushTaskUser.setFormId(healthPushUserFromIdRecord.getFormId());
			                    healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
			                } catch (Exception e) {
			                    resultInfo.setRetdesc("傳送失敗");
			                    resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
			                    log.error("APPPUSH:"+e.getMessage());
			                }
			            } else {
			                healthPushTaskUser.setUpdateTime(new Date());
			                healthPushTaskUser.setStatus(3);
			                healthPushTaskUser.setNote("使用者不存在或者openid不存在");
			                healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
			                log.info("APPPUSH:使用者不存在或者openid不存在=" + healthPushTaskUser.getUserId());
			            }
			        }
					
					countDownLatch.countDown();
				}
			});
		}
		try {
			countDownLatch.await();
		} catch (InterruptedException e) {
			log.error("小程式推送執行緒中斷,錯誤:{}", e.getMessage());
		}
		
		String obj = "總髮送數量="+totalCount+",傳送成功數量="+successCount.get()+",傳送失敗數量="+(totalCount-successCount.get());
        log.info(obj);
        resultInfo.setObject(obj);
        resultInfo.setRetcode(CommonConstant.RET_SUCCESS);
        return resultInfo;
    }

首先通過redis 鎖 鎖住任務,防止任務重複傳送,

傳送速度通過多執行緒的方式,每個執行緒傳送100個推送,通過擷取list的方式,選擇推送範圍

qq 群 621692258