ubuntu伺服器下使用 supervisor 和 celery

supervisor 的解除安裝過程:

sudo apt purge supervisor

whereis supervisord

如果有用 pip 安裝的,用pip uninstall supervisor 再解除安裝一遍

查詢 supervisord 在哪,然後刪除所有資訊

root@jdu4e00u53f7:~# whereis supervisord
supervisord: /usr/local/bin/supervisord
rm -rf /usr/local/bin/supervisord

重新安裝過程:

安裝:  pip3 install supervisor  或者 apt-get install supervisor

新建檔案: mkdir /etc/supervisor

執行程式碼: echo_supervisord_conf > /etc/supervisor/supervisord.conf

目錄 etc/supervisor 下有主配置檔案  supervisord.conf  和   conf.d  資料夾。可以直接使用 vim 編輯,在 supervisord.conf 檔案底部插入執行的內容;

也可以在  conf.d  資料夾裡新建  filename.conf  的檔案,把執行的內容寫在該檔案下同樣能被載入到,不過需要在  supervisord.conf  的配置底部,新增 files = /etc/supervisor/conf.d/*.conf ,也就是包含載入該路徑下的配置檔案。

 1 [program:workername]  # 程序的名字,隨意起
2 command=celery -A celery_name worker -l info # celery執行的命令
3 directory=/mnt/your_project_dir # celery任務檔案所在的目錄;
4 user=root
5 numprocs=1
6 # 設定log的路徑
7 stdout_logfile=/var/log/supervisor/mailboxworker.log
8 stderr_logfile=/var/log/supervisor/mailboxworker.log
9 autostart=true  
10 autorestart=true
11 startsecs=10
12 stopwaitsecs = 600
13 priority=15

然後執行命令 supervisord 啟動。

檢視狀態: supervisorctl tail celery_name

停止: supervisorctl stop celery_name

啟動: supervisorctl start celery_name

重啟: supervisorctl restart celery_name

1 supervisorctl stop all 停止全部程序
2 supervisorctl reload 載入最新的配置檔案,停止原有程序並按新的配置啟動、管理所有程序
3 supervisorctl update 根據最新的配置檔案,啟動新配置或有改動的程序,配置沒有改動的程序不會受影響而重啟

也可到 /var/log/supervisor/ 目錄下檢視日誌檔案。可根據內容檢視 celery 是否正常執行。

遇到的報錯問題:

1、執行 supervisord 執行時報錯:

-bash: /usr/bin/supervisord: No such file or directory supervisord

把 superviosr 解除安裝重灌就好了。

2、執行  supervisorctl status  報錯:

一開始用的 pip 安裝,執行報以下錯誤,然後解除安裝後改用 apt-get 安裝解決;但報了第二個錯誤,然後在不解除安裝的情況下再次用 pip 安裝了一遍就好了。。。

unix:///tmp/supervisor.sock no such file

-bash: /usr/local/bin/supervisorctl: No such file or directory

3、執行狀態發現報錯,提示推出太多次失敗等其它問題  BACKOFF 或者 FATAL 狀態,  Exited too quickly (process log may have details)

改 celery 的配置 startsecs=0 ,然後重新執行  supervisorctl reload  載入配置解決。參考路徑

4、如果有其它錯誤,根據配置中寫的日誌報錯位置,檢視報錯日誌,一般報錯位置: /var/log/supervisor

網上其他的一些 celery 配置參考:

 1 [program:update_ip] ;專案名稱
2 directory = /home/xxxx/works/ip_update/ip_update_on_server_no_1/ ; 程式的啟動目錄
3 command = python /home/xxxx/works/ip_update/ip_update_on_server_no_1/update_ip_internal.py ; 啟動命令,可以看出與手動在命令列啟動的命令是一樣
4 autostart = true ; 在 supervisord 啟動的時候也自動啟動
5 startsecs = 5 ; 啟動 5 秒後沒有異常退出,就當作已經正常啟動了
6 autorestart = true ; 程式異常退出後自動重啟
7 startretries = 3 ; 啟動失敗自動重試次數,預設是 3
8 user = shimeng ; 用哪個使用者啟動
9 redirect_stderr = true ; 把 stderr 重定向到 stdout,預設 false
10 stdout_logfile_maxbytes = 50MB ; stdout 日誌檔案大小,預設 50MB
11 stdout_logfile_backups = 20 ; stdout 日誌檔案備份數
12 ; stdout 日誌檔案,需要注意當指定目錄不存在時無法正常啟動,所以需要手動建立目錄(supervisord 會自動建立日誌檔案)
13 stdout_logfile = /home/xxxx/works/ip_update/ip_update_on_server_no_1/supervisor.log
14 loglevel=info
15
16 [supervisorctl]
17 serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
18
19 [unix_http_server]
20 file=/tmp/supervisor.sock ; (the path to the socket file)
21 chmod=0777 ; socket file mode (default 0700)
22 ;chown=nobody:nogroup ; socket file uid:gid owner
23 ;username=shimeng ; (default is no username (open server))
24 ;password=123 ; (default is no password (open server))
25
26 [inet_http_server] ; inet (TCP) server disabled by default
27 port=127.0.0.1:9001 ; (ip_address:port specifier, *:port for all iface)
28 username=shimeng ; (default is no username (open server))
29 password=123
 1 配置詳解:
2 a) 在supervisord.conf檔案中,分號“;”後面的內容表示註釋
3 b) [group:組名],設定一個服務分組,programs後面跟組內所有服務的名字,以分號分格。
4 c) [program:服務名],下面是這個服務的具體設定:
5 Command:啟用Tornado服務檔案的命令,也就是我們手動啟動的命令。
6 Directory:服務檔案所在的目錄
7 User:啟用服務的使用者
8 Autorestart:是否自動重啟服務
9 stdout_logfile:服務的產生的日起檔案
10 loglevel:日誌級別

以下內容無關

個人專案內容,resource/send_email.py

 1 from error_handler import error_handler
2 from util import id_generator
3 from wrapper import universal_resource_wrapper
4 from model import EmailSmtpModel
5 from celery_tasks import send_task
6
7
8 class SendEmail(Resource):
9 @universal_resource_wrapper(required=['title', 'content', 'receiverEmails', 'sendEmail'],
10 optional=['annexPathList', 'draftId'])
11 @cross_origin(allow_headers=['Content-Type'])
12 def post(self):
13 data = request.get_json()
14 smtp_data = EmailSmtpModel.find_by_email(data['sendEmail'])
15 if len(data['receiverEmails']) > 100:
16 return error_handler('單次群發郵箱數不要超過100個')
17 data['annexList'] = []
18 if 'annexPathList' in data and data['annexPathList'] != '':
19 for k in data['annexPathList']:
20 data['annexList'].append({'annexName': k['fileName'], 'annexPath': k['filePath'],
21 'annexUrl': k['fileUrl']})
22 del data['annexPathList']
23 if len(data['title']) <= 300:
24 title_template = data['title']
25 else:
26 raise ValueError('郵件標題太長,應小於等於300字元!')
27 if len(data['content']) <= 100000:
28 body_template = data['content']
29 else:
30 raise ValueError('郵件內容太長,應小於等於100000字!')
31 # 開啟非同步任務
32 send_task.apply_async(
33 args=[smtp_data['clientId'], smtp_data.sendEmail, smtp_data.smtpServer, smtp_data.smtpPort,
34 smtp_data.authorizationCode, smtp_data.sendUsername, data, title_template, body_template])
35 # task.wait()
36 resp = jsonify({
37 'msg': '郵件預設後臺傳送',
38 'status': True
39 })
40 # return jsonify({}), 202, {'Location': url_for('gettask', task_id=task.id)}
41 return resp

celery_tasks/celery.py 的內容:

  1 import time
2 from celery import Celery
3 from flask import Flask
4 from config import MONGO_DB, MONGO_URI, REDIS_SERVER_IP, REDIS_SERVER_PORT
5 from model import UserModel, send_email_with_custom_smtp, DraftBoxMailModel, EmailContentModel, db
6 from util import id_generator
7
8
9 def make_celery(app):
10 celery = Celery(
11 app.import_name,
12 backend=app.config['CELERY_RESULT_BACKEND'],
13 broker=app.config['CELERY_BROKER_URL']
14 )
15 # celery.conf.update(app.config)
16
17 class ContextTask(celery.Task):
18 def __call__(self, *args, **kwargs):
19 with app.app_context():
20 return self.run(*args, **kwargs)
21
22 celery.Task = ContextTask
23 return celery
24
25
26 current_app = Flask(__name__)
27
28 current_app.config.update(
29 CELERY_BROKER_URL='redis://{}:{}'.format(REDIS_SERVER_IP, REDIS_SERVER_PORT),
30 CELERY_RESULT_BACKEND='redis://{}:{}'.format(REDIS_SERVER_IP, REDIS_SERVER_PORT)
31 )
32 celery = make_celery(current_app)
33 current_app.config['MONGODB_SETTINGS'] = [
34 {
35 'alias': 'MAILDB',
36 'db': MONGO_DB,
37 'host': MONGO_URI,
38 "connect": False
39 },
40 {
41 'alias': 'TESTDB',
42 'db': MONGO_DB,
43 'host': MONGO_URI,
44 "connect": False
45 }
46 ]
47 db.init_app(current_app)
48 # # 非同步任務
49 # @celery.task(bind=True)
50 # def long_task(self):
51 # total = 50
52 # for i in range(total):
53 # # 自定義狀態 state
54 # self.update_state(state=u'處理中', meta={'current': i, 'total': total})
55 # time.sleep(3)
56 # return {'current': 100, 'total': 100, 'result': u'完成'}
57
58
59 @celery.task
60 def send_task(clientId, sendEmail, smtpServer, smtpPort, authorizationCode, sendUsername, data, title_template,
61 body_template):
62 usr = UserModel.find_by_client_id(clientId)
63 from datetime import datetime
64 print('開始', datetime.now())
65 for email_addr in data['receiverEmails']:
66 status, msg = send_email_with_custom_smtp(
67 sender_email=sendEmail,
68 receiver_email=email_addr,
69 server=smtpServer,
70 port=smtpPort,
71 usr=sendEmail,
72 password=authorizationCode,
73 subject=title_template,
74 text=body_template,
75 html=body_template,
76 annex_path=data['annexList']
77 )
78 if status:
79 if 'draftId' in data:
80 draft_mail = DraftBoxMailModel.find_by_draft_id(data['draftId'])
81 draft_mail.sentStatus = True
82 draft_mail.save()
83 else:
84 raise ValueError(msg)
85 time.sleep(60)
86 print('結束:',datetime.now())
87 # 儲存傳送的記錄
88 usr.send_records(fromEmail=sendEmail, toEmailList=data['receiverEmails'], title=title_template,
89 content=body_template, created_at=time.time(),
90 created_by=usr['uid'], status=True, annex_list=data['annexList'])
91 mail_data = DraftBoxMailModel.find_email(title_template, body_template, sendEmail,
92 data['receiverEmails'], data['annexList'])
93 if 'Reply:' in title_template:
94 email_data = EmailContentModel(
95 eid=id_generator(template='uuid'),
96 clientId=usr['clientId'],
97 subject=title_template,
98 content=body_template,
99 belongEmail=sendEmail,
100 fromEmail=sendEmail,
101 fromName=sendUsername,
102 emailTime=time.time(),
103 toEmail=data['receiverEmails'][0],
104 annexList=data['annexList'],
105 createdAt=time.time(),
106 createdBy=usr['uid']
107 )
108 email_data.save()
109 if mail_data is None:
110 draft_mail_data = DraftBoxMailModel(
111 clientId=usr['clientId'],
112 draftId=id_generator('uuid'),
113 subject=title_template,
114 content=body_template,
115 belongEmail=sendEmail,
116 fromEmail=sendEmail,
117 fromName=sendUsername,
118 toEmailList=data['receiverEmails'],
119 createdAt=time.time(),
120 createdBy=usr['uid'],
121 sentStatus=True,
122 annexList=data['annexList']
123 )
124 draft_mail_data.save()