1. 程式人生 > >用 Flask 來寫個輕部落格 (26) — 使用 Flask-Celery-Helper 實現非同步任務

用 Flask 來寫個輕部落格 (26) — 使用 Flask-Celery-Helper 實現非同步任務

目錄

前文列表

擴充套件閱讀

Celery

Celery 是使用 Python 多工庫來編寫的任務佇列工具, 可以 並行 的執行任務. 我們會將執行時間較長但又不那麼追求實時的功能以非同步任務的形式完成, EG. 上傳檔案, 傳送郵件…, Python 和 Celery 之間需要一箇中間人(訊息佇列)來進行任務佇列的管理, Celery 官方推薦使用 RabbirMQ 或 Redis 來充當這個角色. 當然也可以同時使用兩者, 其中 MQ 作為中間人, Redis 傳遞 Celery 執行的結果給應用端. 這樣做的優勢在於, 返回給應用的結果是持久化儲存在資料庫中的.

訊息佇列: 是一種專門設計的系統, 用於在生產者(往佇列傳送訊息的程式)和消費者(從佇列中取出訊息的佇列)之間傳遞訊息.

這裡寫圖片描述

  • 安裝 Celery
pip install Celery
  • 安裝 Flask-Celery-Helper
    Flask-Celery-Helper 是一個 Flask 擴充套件, 用於輔助使用 app 來初始化 Celery 物件, 使其得以註冊到 app 物件中.
pip install Flask-Celery-Helper
pip freeze > requirments.txt
/etc/init.d/rabbitmq-server start

將 Celery 加入到應用中

  • 配置 Celery 連線 RabbitMQ 的 URL
    vim jmilkfansblog/config.py
class DevConfig(Config):
    """Development config class."""
...
    # Celery <--> RabbitMQ connection
    CELERY_RESULT_BACKEND = "amqp://guest:[email protected]:5672//"
    CELERY_BROKER_URL = "amqp://guest:
[email protected]
:5672//"

NOTE: RabbitMQ 使用預設的 guest 使用者, 埠為 5672

  • 建立 celery 物件
    vim jmilkfansblog/extensions.py
from flask.ext.celery import Celery

...

# Create the Flask-Celery-Helper's instance
flask_celery = Celery()
  • 將 celery 物件註冊到 app 物件
    vim jmilkfansblog/__init__.py
from jmilkfansblog.extensions import flask_celery

def create_app(object_name):
    """Create the app instance via `Factory Method`"""
...
    # Init the Flask-Celery-Helper via app object
    # Register the celery object into app object
    flask_celery.init_app(app)

NOTE 1: Celery 的程序必須在 Flask app 的上下文中執行, 這樣 Celery 才能夠跟其他的 Flask 擴充套件協同工作。所以必須將 flask_celery 物件註冊到 app 物件中, 並且每建立一個 Celery 程序都需要建立一個新的 Flask app 物件, 這裡我們使用工廠模式來建立 celery 物件。這一點是非常重要的,實際上 Flask application 和 Celery application 是兩個不同的程序,在 Celery 沒有加入 Flask 上下文的情況下,Celery 的程式邏輯就不能輕易的訪問 Flask 相關資源,比如不能載入 Flask 的環境配置資訊,無法通過 Flask 來訪問資料庫,不能使用 Flask 的擴充套件功能等。如果想做到這些,Celery 都需要自己再實現一套相同的邏輯,這樣做顯然是沒有必要的。所以 Flask application 原生支援將自己的 Context 嵌入到別的 application 中,當然有些情況也需要相應擴充套件的輔助,例如 Flask-Celery-Helper 在這裡就充當著這個角色。

NOTE 2: flask_celery 物件是 Flask-Celery-Helper 擴充套件的物件, 用於輔助處理 Celery 的初始化, 所以實際上我們是可以不使用這個擴充套件, 而直接使用 Celery 的. celery 物件才是真正的 Celery 的物件.

  • 使用工廠模式來建立 celery 物件
    ./celery_runner.py
mport os

from celery import Celery

from jmilkfansblog import create_app


def make_celery(app):
    """Create the celery process."""

    # Init the celery object via app's configuration.
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'])

    # Flask-Celery-Helpwe to auto-setup the config.
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):

        abstract = True

        def __call__(self, *args, **kwargs):
            """Will be execute when create the instance object of ContextTesk."""

            # Will context(Flask's Extends) of app object(Producer Sit) 
            # be included in celery object(Consumer Site).
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    # Include the app_context into celery.Task.
    # Let other Flask extensions can be normal calls.
    celery.Task = ContextTask
    return celery

env = os.environ.get('BLOG_ENV', 'dev')
flask_app = create_app('jmilkfansblog.config.%sConfig' % env.capitalize())
# 1. Each celery process needs to create an instance of the Flask application.
# 2. Register the celery object into the app object.
celery = make_celery(flask_app)

NOTE 1: 我們以後會以 CLI 的形式來管理和控制 Celery 的 worker, 所以我們將 celery 物件的實現模組放置在 ./celery_runner.py 中, 而不是 jmilkfansblog/celery_runner.py. Flask app 內部的 tasks 任何的定義和實現就交由 Flask-Celery-Helper 來支援就好了, 這也是 Flask-Celery-Helper 存在的意義.

NOTE 2: make_celery()最重要的作用就是讓每個 Celery 的程序中(celery物件)都包含有 app 物件的上下文, 至於為什麼這麼做呢? 上述已經給出了答案.

NOTE 3: 這裡通過 create_app() 建立的物件不能夠命名為 app, 而是命名為 flask_app, 這是因為 Celery 會預設將命名為 app 或 celery 的物件都作為一個 Celery 物件來處理.

NOTE 4: celery.conf.update(app.config) 會將 app 物件的 config 更新到 celery 物件中, 當然也包括了剛剛定義的 RabbitMQ 連線 URL 配置.

  • 啟動 Celery 服務
(env) [email protected]:/opt/JmilkFan-s-Blog$ celery worker -A celery_runner --loglevel=info
...

 -------------- celery@JmilkFan-Devstack v4.0.1 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-53-generic-x86_64-with-Ubuntu-16.04-xenial 2016-12-15 19:12:33
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         jmilkfansblog:0x7f5b24345990
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . jmilkfansblog.tasks.digest
  . jmilkfansblog.tasks.log
  . jmilkfansblog.tasks.multiply
  . jmilkfansblog.tasks.remind

NOTE: 在啟動 Celery 服務的時候, 能夠用 Output 看見其自身的配置資訊和現在所處理的 tasks.

實現向新使用者傳送歡迎郵件

下面使用 Celery 實現在使用者建立賬戶之後, 在指定的時間內非同步的向新使用者傳送歡迎郵件.

  • 新增 Reminder Model 使用者存放使用者的 email 地址和歡迎內容.
    vim jmilkfansblog/models.py
class Reminder(db.Model):
    """Represents Proected reminders."""

    __tablename__ = 'reminders'
    id = db.Column(db.String(45), primary_key=True)
    date = db.Column(db.DateTime())
    email = db.Column(db.String(255))
    text = db.Column(db.Text())

    def __init__(self, id, text):
        self.id = id
        self.email = text

    def __repr__(self):
        return '<Model Reminder `{}`>'.format(self.text[:20])
  • 建立一個 task
    vim jmilkfansblog/tasks.py
import smtplib
import datetime
from email.mime.text import MIMEText

from flask_mail import Message

from jmilkfansblog.extensions import flask_celery, mail

@flask_celery.task(
    bind=True,
    igonre_result=True,
    default_retry_delay=300,
    max_retries=5)
def remind(self, primary_key):
    """Send the remind email to user when registered.
       Using Flask-Mail.
    """

    reminder = Reminder.query.get(primary_key)

    msg = MIMEText(reminder.text)
    msg['Subject'] = 'Welcome!'
    msg['FROM'] = <your_email>
    msg['To'] = reminder.email

    try:
        smtp_server = smtplib.SMTP('localhost')
        smtp_server.starttls()
        smtp_server.login(<user>, <password>)
        smtp_server.sendmail(<your_email>,
                             [reminder.email],
                             msg.as_string())
        smtp_server.close()
        return

    except Exception as err:
        self.retry(exc=err)

def on_reminder_save(mapper, connect, self):
    """Callbask for task remind."""
    remind.apply_async(args=(self.id), eta=self.date)

NOTE 1: Celery Task 本質上就是一個被 celery.task()裝飾過的函式,

NOTE 2: 使用主鍵 primary_key 來獲取 reminder 物件是為了避免資料競態的發生, 因為從生成 reminder 物件到 task 被執行的過程並不能保證資料是最新的, 這也是處理非同步呼叫時, 需要時刻注意的地方.

NOTE 3: on_reminder_save() 是一個回撥函式, 當我們在一個特定的情景下呼叫這個函式的時候就觸發了一個 Celery task.

  • 應用 SQLAlchemy 的 event 特性來觸發 Celery task
    vim jmilkfansblog/__init__.py
from sqlalchemy import event


def create_app(object_name):
    """Create the app instance via `Factory Method`"""
...
    # Will be callback on_reminder_save when insert recond into table `reminder`.
    event.listen(Reminder, 'after_insert', on_reminder_save)
  • NOTE 1: SQLAlchemy 允許在 Model 上註冊回撥函式, 當 Model 物件發生特定的情景時, 就會執行這個回撥函式, 這就是所謂的 event, 這裡我們使用 after_insert 來指定當建立一個新的 Reminder 物件(插入一條記錄)時就觸發這個回撥函式. 而是回撥函式中的形參, 會由 event 來負責傳入.

相關推薦

Flask 部落 (26) — 使用 Flask-Celery-Helper 實現非同步任務

目錄 前文列表 擴充套件閱讀 Celery Celery 是使用 Python 多工庫來編寫的任務佇列工具, 可以 並行 的執行任務. 我們會將執行時間較長但又不那

Flask 部落 (14) — M(V)C_實現專案首頁的模板

目錄 前文列表 實現所需要的檢視函式 在開始實現首頁模板之前, 我們為了除錯和顯示的方便, 首先偽造一些假資料: fake_data.py import rand

Flask 部落 (16) — MV(C)_Flask Blueprint 藍圖

目錄 前文列表 擴充套件閱讀 Blueprint 藍圖 Blueprint 藍圖是一種用來擴充套件已有 Flask 應用結構的方法,通過藍圖的思想我們能夠把自己的 Application 拆分成為不同的元件。通常,一個元件代

Flask 部落 (23) — 應用 OAuth 實現 Facebook 第三方登入

目錄 前文列表 擴充套件閱讀 第三方登入流程 Resource Owner:資源所有者,本文中又稱”使用者”(user)。 Authorization server:認證伺服器,即服務提供商專門用來處理認證的伺服器。

Flask 部落 (6) — (M)VC_models 的關係(one to many)

目錄 前文列表 擴充套件閱讀 前言 models 中的關係能夠對映成為關係型資料庫表中的關係,models 中可以相互建立引用,使得相關聯的資料能夠很容易的一次性的從資料庫中取出。 一對多 首先繼續在 models

Flask 部落 (17) — MV(C)_應用藍圖重構專案

目錄 前文列表 重構目錄結構 現在專案的目錄結構: (env) [[email protected] opt]# tree JmilkFan-s-Blog/

Flask 部落 (24) — 使用 Flask-Login 保護應用安全

目錄 前文列表 擴充套件閱讀 使用者登入帳號 Web上的使用者登入功能應該是最基本的功能了,但這個功能可能並沒有你所想像的那麼簡單,這是一個關係到使用者安全的功能. 在現代這樣速度的計算速度下,用窮舉法破解賬戶的密碼會是一件

Flask 部落 (13) — M(V)C_WTForms 服務端表單檢驗

目錄 前文列表 用 Flask 來寫個輕部落格 (1) — 建立專案 用 Flask 來寫個輕部落格 (2) — Hello World! 用 Flask 來寫個輕部落格 (3) — (M)VC_連線 MySQL 和 SQLAlchemy

Flask 部落 (2) — Hello World!

目錄 前文列表 擴充套件閱讀 實現最簡單的 Flask 應用 建立 config.py 檔案 該檔案是整個 Flask 應用程式的配置檔案,定義我們常用的配置類 Config/ProdConfig/DevConfig

Flask 部落 (3) — (M)VC_連線 MySQL 和 SQLAlchemy

目錄 前文列表 擴充套件閱讀 前言 大多數的應用程式在開發之前都需要先進行資料庫設計這一環節,所以本篇就先來記錄在 Flask 中如何使用 Models,也就是 MVC 模式中的 M 。 Models 模型 模型 就

Flask 部落 (31) — 使用 Flask-Admin 實現 FileSystem 管理

目錄 前文列表 擴充套件閱讀 編寫 FileSystem Admin 頁面 所謂的 FileSystem Admin 功能, 就是哪呢鋼構通過後臺管理頁面檢視並修改 blog 專案中, 或自定義的檔案目錄內容. 使用 F

Flask 部落 (7) — (M)VC_models 的關係(many to many)

目錄 目錄 前文列表 擴充套件閱讀 前期準備 多對多 使用樣例 前文列表 擴充套件閱讀 前期準備 在實現多對多之前,我們還需要先增加一個評論(Comment) models class,而且 Comment 是 P

Flask 部落 (37) — 在 Github 上為第一階段的版本打 Tag

目錄 目錄 前文列表 打 Tag 前文列表 第一階段結語 從 2016/11/13 至今 2017/01/02 剛好 50 天, <<用 Flask 來寫個輕部落格>> 系列博文的第一階段也就算告一段

Flask 部落 (32) — 使用 Flask-RESTful 構建 RESTful API 之一

目錄 前文列表 擴充套件閱讀 RESTful API REST(Representational State Transfer):是一種軟體架構的設計風格,而不是一種標準。主要用於 C/S 架構的軟體設計,也能很好的支援 B

Flask 部落 (5) — (M)VC_SQLAlchemy 的 CRUD 詳解

目錄 前文列表 擴充套件閱讀 SQLAlchemy 的 CRUD CRUD 提供了在 Web 應用程式中所需要的所有操作和檢視資料的基礎功能, 尤其在 REST 風格的應用中, CRUD 就能實現一切所需功能. 本篇博文主要記

本想python 好友發送給我指定的號,然後截屏發給好友,但是消息分割處理小毛病,還在測試

nbsp for wechat 全屏 名片 .text pict 參數 end from wxpy import * from PIL import ImageGrab import os img_path_name=‘1.jpg‘ def jiepin(img

段子了 誰說部落不能段子

1. 你就是我的第一志願 因為我想上你 2.我願有一天能牽著你的手 ,去敬各位來賓的酒 3.我給你變個魔術,我變完了 我變得更喜歡你了,但你卻看不到。 4.我喜歡一個人,  那人一定很漂亮吧 !你太自戀了。 5.在遇見你之前我有兩顆心,,一顆善心 一顆噁心,,現在我只有一顆

markdown的新部落覆蓋了原有的部落,導致原有的部落消失了

查了一下,確實是找不到了,瀏覽量還是存在的。只有重新寫了,那是在markdown上快取的部落格,感覺是個小bug。 剛開始用csdn,不會用markdown真是一臉黑。 查了下,點(也不知道圖片上傳成功了沒)新建就可以了。也確實可以了!

Flask開發部落(二):Flask-模板

作者:chen_h 微訊號 & QQ:862251340 微信公眾號:coderpai 目錄 上節回顧 如果你依照上一章的話,你應當有一個完全工作的簡單的 web 應用程式,我們專案的檔案結構如下(前