1. 程式人生 > >非同步redis佇列實現 資料入庫

非同步redis佇列實現 資料入庫

業務需求

app客戶端向服務端介面傳送來json 資料  每天 發一次   清空快取後會再次傳送

出問題之前業務邏輯:

php 介面 首先將 json 轉為陣列  去重  在一張大表中插入不存在的資料

該使用者已經存在 和新增的id

入另一種詳情表

問題所在:

當用戶因特殊情況清除快取  導致app 傳送json串  入庫併發高 導致CPU 暴增到88% 並且居高不下


優化思路:

1、非同步佇列處理

2、redis 過濾(就是隻處理當天第一次請求)

3、redis 輔助儲存app名稱(驗證過後批量插入資料app名稱表中)

4、拼接插入的以及新增的如詳細表中

解決辦法:

1、介面修改  redis 過濾 + 如list佇列   並將結果存入redis中

  首先 redis將之前的歷史資料放在redis 雜湊裡面  中文為鍵名  id 為鍵值

<?php
/**
 * Created by haiyong.
 * User: jia
 * Date: 2017/9/18
 * Time: 20:06
 */
namespace App\Http\Controllers\App;


use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;

class OtherAppController extends Controller{

    /**
     * app應用統計介面
     * @param Request $request
     * @return string
     */
    public function appTotal(Request $request)
    {
        //  //歷史資料入庫
        //$redis = Redis::connection('web_active');
        // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');
        // $str  = '';
        // foreach ($app_name as $key => $val) {
        //     $str.= "{$val} {$key} ";
        // }
        // $redis->hmset('app_name', $app_name);
        // echo $str;exit;
        $result = $request->input('res');
        $list = json_decode($result, true);
        if (empty ($list) || !is_array($list)) {
            return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']);
        }
        $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
        $data['time'] = date('Y-m-d');
        $redis_key = 'log_app:'.$data['time'];
        //redis 過濾
        $redis = Redis::connection('web_active');
        //redis 鍵值過期設定
        if (empty($redis->exists($redis_key))) {
            $redis->hset($redis_key, 1, 'start');
            $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day'));
        }
        //值確定
        if ($redis->hexists($redis_key, $data['uid'])) {
            return json_encode(['result' => 'SUCCESS']);
        } else {
            //推入佇列
            $redis->hset($redis_key, $data['uid'], $result);
            $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']);
            return json_encode(['result' => 'SUCCESS']);
        }
    }
}




2、php 指令碼迴圈 監控redis 佇列 執行邏輯    防止記憶體溢位

mget 獲取該使用者的app id  不存在就會返回null

通過判斷null  運用redis 新值作為自增id指標  將null 補齊  之後批量入mysql   並跟新redis 雜湊 和指標值  併入庫 詳情表

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;

class AppTotal extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'AppTotal:run';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
    	    //歷史資料入庫  
        // $redis = Redis::connection('web_active');  
        // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName');  
        // $redis->hmset('app_name', $app_name);  
        // exit;  
          while(1) {
            $redis = Redis::connection('web_active');
            //佇列名稱
            $res = $redis->lpop('log_app_list');   
            //開關按鈕
            $lock = $redis->get('log_app_lock');
            if (!empty($res)) {
                list($date,$uid) = explode(':',$res);
                $result = $redis->hget('log_app:'.$date, $uid);
                if (!empty($result)) {
                        $table_name = 'app_total'.date('Ym');
                        $list = json_decode($result, true);
                        $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
                        $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
                        $data['device'] = isset($list['device']) ? $list['device'] : '' ;
                        $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
                        //資料去重    flip比unique更節約效能
                        $data['appList'] = array_flip($data['appList']);
                        $data['appList'] = array_flip($data['appList']);
                        $data['time'] = date('Y-m-d');
                        //app應用過濾
                        $app_res = $redis->hmget('app_name', $data['appList']);
                        //新增加app陣列
                        $new_app = [];
                        //mysql 入庫陣列
                        $mysql_new_app = [];
				        //獲取當前redis 自增指標
				        $total = $redis->get('app_name_total');
				        foreach ($app_res as $key =>& $val) {
				            if (is_null($val)) {
				                $total += 1;
				                $new_app[$data['appList'][$key]] = $total; 
				                $val = $total;
				                array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]);
				            }
				        }
				        if (count($new_app)){
				        	$str = "INSERT IGNORE INTO app_set_name (id,appName) values";
				        	foreach ($new_app as $key => $val) {
				        		$str.= "(".$val.",'".$key."'),";
				        	}
				        	$str = trim($str, ',');
				        	//$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
				        	$mysql_res = DB::connection('phpLog')->statement($str);
						        if ($mysql_res) {
						            // 設定redis 指標
						             $redis->set('app_name_total', $total);
						            //  redis 資料入庫
						             $redis->hmset('app_name', $new_app);
						        }
						 }
			            //  詳情資料入庫
			             $data['appList'] = implode(',', $app_res);
                        //app統計入庫
                        DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) 
        values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
                        //log 記錄   當檔案達到123MB的時候產生記憶體保錯  所有這個地方可是利用日誌切割 或者 不寫入 日誌
                        Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').'  success '.$result."\n"); 
				    }  else {
				       Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').'  error '.$result."\n");
				    }
            }
            //執行間隔
            sleep(1);
            //結束按鈕
            if ($lock == 2) {
                exit;
            }
            //記憶體檢測
            if(memory_get_usage()>1000*1024*1024){
                exit('記憶體溢位');//大於100M記憶體退出程式,防止記憶體洩漏被系統殺死導致任務終端
            }
        }
    }
}



3、執定 定時任務監控指令碼執行情況

crontab -e

/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1

test.sh 內容  (檢視執行命令返回的程序id  如果沒有就執行命令開啟)

#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`
if [ ! $alive ]
then
  /usr/local/php/bin/php   /var/ms/artisan AppTotal:run > /dev/null &
fi

記得授權哦   chmod +x test.sh

筆者用的laravel 框架   將命令啟用丟入後臺

執行命令

  /usr/local/php/bin/php   /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 結束就行 命令以在後臺執行  可以用shell 中的命令檢視程序id

這樣就實現佇列非同步入庫

還有很多問題需要優化!!大致功能已經實現!!!!!!

優化完成後cpu