後臺小程式批量推送(多執行緒)
阿新 • • 發佈:2019-01-03
一,建立需要的表
表名:t_push_task(推送任務表)
欄位 | 型別 | 註釋 | 鍵 | 能否為空 | 預設值 | 其他 |
---|---|---|---|---|---|---|
id | int(11) | PRI | NO | 無 | auto_increment | |
template_id | varchar(64) | 小程式模版ID | YES | 無 | ||
task_name | varchar(64) | 模版名稱 | YES | 無 | ||
task_topic | varchar(64) | 活動主題 | YES | 無 | ||
task_caption | varchar(200) | 活動說明 | YES | 無 | ||
task_tips | varchar(64) | 溫馨提示 | YES | 無 | ||
task_type | varchar(64) | 任務型別 | YES | 無 | ||
status | int(1) | 任務是否已經發送 1 未傳送[預設] 2 已傳送 | YES | 無 | ||
goods_id | int(11) | 商品ID | YES | 無 | ||
goods_name | varchar(64) | 商品標題 | YES | 無 | ||
goods_online_status | int(2) | 商品前臺狀態 1 上架 0 下架 | YES | 無 | ||
price | decimal(10,2) | 商品金額(需要的海貝數) | YES | 無 | ||
is_upload | int(1) | YES | 無 | |||
file_name | varchar(64) | YES | 無 | |||
create_time | datetime | 建立時間 | YES | 無 | ||
update_time | datetime | 更新時間 | MUL | NO | 無 |
表名:t_push_task_user(推送流水錶)
註釋:
返回目錄
欄位 | 型別 | 註釋 | 鍵 | 能否為空 | 預設值 | 其他 |
---|---|---|---|---|---|---|
id | int(11) | PRI | NO | 無 | auto_increment | |
task_id | int(11) | 推送ID | MUL | YES | 無 | |
user_id | int(11) | 使用者ID | YES | 無 | ||
form_id | varchar(50) | 小程式推送formId | YES | 無 | ||
status | int(2) | 推送狀態 1 已推送 2 未推送 | YES | 無 | ||
create_time | datetime | 建立時間 | YES | 無 | ||
update_time | datetime | 更新時間 | YES | 無 | ||
note | varchar(255) | 備註 | YES | 無 |
表名:t_push_user_from_id_record(收集formid表)
欄位 | 型別 | 註釋 | 鍵 | 能否為空 | 預設值 | 其他 |
---|---|---|---|---|---|---|
id | bigint(11) | PRI | NO | 無 | auto_increment | |
user_id | bigint(11) | 使用者id | MUL | YES | 無 | |
form_id | varchar(100) | 小程式推送formId | YES | 無 | ||
source | smallint(2) | 來源 source 1,步數兌換海貝按鈕;2,首頁邀請按鈕;3,步數攔截彈窗邀請按鈕;4,兌換商品按鈕;5,海貝不夠邀請按鈕;6,引導關注蒙層按鈕;7,健康體驗領取按鈕 | YES | 無 | ||
status | smallint(2) | 使用狀態 1 未使用[預設] 2 已使用 | YES | 無 | ||
create_time | datetime | 建立時間 | YES | 無 | ||
update_time | datetime | 修改時間 | YES | 無 |
二.小程式需要推送的必須欄位 為 formId和使用者的openid
小程式推送任務配置 新建任務
序號 | 任務建立時間 | 模板ID | 模板名稱 | 關聯商品編碼 | 關聯商品標題 | 關聯商品前臺狀態 | 推送使用者量 | 任務型別 | 操作 |
98 | 2018-09-20 | 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 | 活動加入成功提醒 | 1989 | 健康助利測試商品1 | 上架 | 638 | 指定使用者傳送 | 檢視 傳送完成 |
97 | 2018-09-20 | 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 | 活動加入成功提醒 | 1989 | 健康助利測試商品1 | 上架 | 638 | 指定使用者傳送 | 檢視 傳送完成 |
96 | 2018-09-20 | 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 | 活動加入成功提醒 | 1989 | 健康助利測試商品1 | 上架 | 637 | 指定使用者傳送 | 檢視 傳送完成 |
95 | 2018-09-20 | 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0 | 活動加入成功提醒 | 1989 | 健康助利測試商品1 | 上架 | 637 | 指定使用者傳送 | 檢視 傳送完成 |
新增任務(可以配置需要推送的任何商品)
模板ID: 895CZeh4-ISI3cpl1yyS-o_i6EtMfrC8bzgaipHauF0
模板名稱: 活動加入成功提醒
活動主題: 限10個字,小程式模板訊息的活動主題
活動說明: 限30個字,小程式模板訊息的活動說明
溫馨提示: 限30個字,小程式模板訊息的溫馨提示
模板關聯前臺商品編碼: 更新 限1個上架商品編碼,小程式模板商品標題及金額
商品標題: 健康助利測試商品1
商品金額: 250
商品前臺狀態: 上架
然後再列表可以點選 傳送按鈕;
傳送程式碼
/**
* 小程式任務推送
* @param id
* @return
*/
public ResultInfoObject pushMsg(Integer id,String userName) {
ResultInfoObject resultInfo = new ResultInfoObject();
//針對於售後服務重複資料進行redis加鎖,處理併發問題
String key = "health_cache_push_" + id;
if(JedisUtil.keyExists(key)) {
resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
resultInfo.setRetdesc("操作過於頻繁,請稍後再試!");
return resultInfo;
}
JedisUtil.setex(key, 300, "1");
log.info("[pushMsg] add redis lock key : {}", key);
HealthPushTask healthPushTask = healthPushTaskDao.selectByPrimaryKey(id);
if (healthPushTask.getStatus()==2){ //已傳送
log.info("此任務已傳送id="+healthPushTask.getId());
resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
resultInfo.setObject("此任務已傳送id="+healthPushTask.getId());
return resultInfo ;
}
healthPushTask.setStatus(2);
healthPushTask.setUpdateTime(new Date());
healthPushTaskDao.updateByPrimaryKeySelective(healthPushTask);
List<HealthPushTaskUser> healthPushTaskUserList = healthPushTaskUserDao.selectListByTaskId(id);
if (healthPushTaskUserList== null|| healthPushTaskUserList.size()<1){
log.info("還沒有上傳使用者ID");
resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
resultInfo.setObject("還沒有上傳使用者ID");
return resultInfo ;
}
List<HealthPushUserFromIdRecord> healthPushUserFromIdRecordList = healthPushUserFromIdRecordDao.selectAvialFormId();
if (healthPushUserFromIdRecordList== null|| healthPushUserFromIdRecordList.size()<1){
log.info("沒有formid");
resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
resultInfo.setObject("沒有formid");
return resultInfo ;
}
Map<String, HealthPushUserFromIdRecord> formIdMap = new HashMap();
AtomicInteger successCount = new AtomicInteger(0);
for (HealthPushUserFromIdRecord s : healthPushUserFromIdRecordList) {
formIdMap.put(s.getUserId() + "", s);
}
int totalCount = healthPushTaskUserList.size();
//多執行緒分發
final int size = 100;
int threadCount = (totalCount + size - 1) / size;
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for(int i=0;i<threadCount;i++) {
final int beginIndex = i * size;
final int endIndex = (threadCount - i) ==1 ? totalCount : (beginIndex + size);
ThreadPoolUtils.addProcess(new Runnable() {
@Override
public void run() {
List<HealthPushTaskUser> subList = new ArrayList<>(healthPushTaskUserList.subList(beginIndex,endIndex));
log.info("小程式推送子list範圍: {}-{}",beginIndex , endIndex);
for (HealthPushTaskUser healthPushTaskUser : subList) {
HealthUser healthUser = healthUserDao.selectByPrimaryKey(Long.valueOf(healthPushTaskUser.getUserId()));
if (healthUser != null && healthUser.getOpenid() != null) {
HealthPushUserFromIdRecord healthPushUserFromIdRecord =formIdMap.get(healthUser.getUserId() + "");
try {
if (healthPushUserFromIdRecord== null){
log.info(healthUser.getUserId()+"沒有formid");
healthPushTaskUser.setUpdateTime(new Date());
healthPushTaskUser.setStatus(3);
healthPushTaskUser.setNote("沒有formid");
healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
continue;
}
String page ="pages/goods/detail/detail";
String accessToken = wechatApiService.getAccessToken("health_applet");
String templateId = healthPushTask.getTemplateId();
Map<String, TemplateData> param = new HashMap<String, TemplateData>();
param.put("keyword1", new TemplateData(healthPushTask.getTaskTopic(), "#EE0000"));
param.put("keyword2", new TemplateData(healthPushTask.getTaskCaption(), "#EE0000"));
param.put("keyword3", new TemplateData(healthPushTask.getGoodsName(), "#EE0000"));
param.put("keyword4", new TemplateData( Validator.doubleTrans(Validator.rountTwo(healthPushTask.getPrice().doubleValue()))+"海貝", "#EE0000"));
param.put("keyword5", new TemplateData( healthPushTask.getTaskTips(), "#EE0000"));
com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSONObject.parseObject(JSON.toJSONString(param));
// //呼叫傳送微信訊息給使用者的介面 ********這裡寫自己在微信公眾平臺拿到的模板ID
JSONObject resultJson = WX_TemplateMsgUtil.sendWechatMsgToUser(healthUser.getOpenid(), templateId, page + "?backToHome=1&&goodsId=" + healthPushTask.getGoodsId(),
healthPushUserFromIdRecord.getFormId(), jsonObject, accessToken);
String errmsg = "error";
if (resultJson!=null){
errmsg = (String) resultJson.get("errmsg");
}
if ("ok".equals(errmsg)){
healthPushTaskUser.setStatus(1);
successCount.incrementAndGet();
}else {
healthPushTaskUser.setStatus(3);
// errorCount++;
}
healthPushUserFromIdRecord.setStatus(2);
healthPushUserFromIdRecord.setUpdateTime(new Date());
healthPushUserFromIdRecordDao.updateByPrimaryKeySelective(healthPushUserFromIdRecord);
healthPushTaskUser.setUpdateTime(new Date());
healthPushTaskUser.setNote(errmsg);
healthPushTaskUser.setFormId(healthPushUserFromIdRecord.getFormId());
healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
} catch (Exception e) {
resultInfo.setRetdesc("傳送失敗");
resultInfo.setRetcode(CommonConstant.PARAM_ERROR_CODE);
log.error("APPPUSH:"+e.getMessage());
}
} else {
healthPushTaskUser.setUpdateTime(new Date());
healthPushTaskUser.setStatus(3);
healthPushTaskUser.setNote("使用者不存在或者openid不存在");
healthPushTaskUserDao.updateByPrimaryKeySelective(healthPushTaskUser);
log.info("APPPUSH:使用者不存在或者openid不存在=" + healthPushTaskUser.getUserId());
}
}
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("小程式推送執行緒中斷,錯誤:{}", e.getMessage());
}
String obj = "總髮送數量="+totalCount+",傳送成功數量="+successCount.get()+",傳送失敗數量="+(totalCount-successCount.get());
log.info(obj);
resultInfo.setObject(obj);
resultInfo.setRetcode(CommonConstant.RET_SUCCESS);
return resultInfo;
}
首先通過redis 鎖 鎖住任務,防止任務重複傳送,
傳送速度通過多執行緒的方式,每個執行緒傳送100個推送,通過擷取list的方式,選擇推送範圍
qq 群 621692258