1. 程式人生 > >Nsq訊息佇列

Nsq訊息佇列

  • 安裝Nsq

            1.安裝包管理器

go get github.com/tools/godep

            2.安裝依賴包

go get github.com/bmizerany/assert

            3.安裝Nsq

https://github.com/nsqio/nsq/releases
  • Producer

type Producer struct {
    P  *nsq.Producer
    ip string
}

// 初始化生產者,最好是在專案初始化的時候就呼叫,否則可能會出現orm註冊model衝突的情況
func InitProducer (addr string, mysql string) (wr *Producer) {
    // 註冊資料庫
    _, err := orm.GetDB("default_nsq")
    if err != nil {
        fmt.Println("orm.RegisterDataBase = ", mysql)
        err = orm.RegisterDataBase("default_nsq", "mysql", mysql, 10, 10)
    }
    if err != nil {
        panic("註冊資料庫失敗:" + err.Error())
    }

    // 生產者配置,這裡使用基本配置
    cfg := nsq.NewConfig()
    cfg.AuthSecret = ""                  // nsq認證金鑰,暫時不需要
    p, err := nsq.NewProducer(addr, cfg) // 新建一個生產者
    if err != nil {
        panic(err)
    }

    // 設定日誌級別
    p.SetLogger(log.New(os.Stdout, "nsq producer:", 0), nsq.LogLevelInfo)
    wr = &Producer{P: p}

    // 獲取生產者ip
    ip, err := getLocalIp()
    if err != nil {
        panic("nsq InitProducer:" + err.Error())
    }
    wr.ip = ip

    return wr
}

func (p *Producer) Stop() {
    p.P.Stop()
}

// 釋出訊息
func (p *Producer) Publish(topic string, body []byte) error {
    // 不能釋出空位元組陣列,否則會導致error
    if len(body) == 0 {
        return nil
    }
    go p.PublishLog(topic, uuid, body)
    return p.P.Publish(topic, body)
}

// 新增日誌 (需要編寫models)
func (p *Producer) PublishLog(topic, uuid string, body []byte) (int64, error) {
    log := &models.NsqPublishLog{}
    log.Message = string(body)
    log.NsqdUrl = p.P.String()
    log.Topic = topic
    log.ProducerIp = p.ip
    log.MessageId = uuid
    return models.AddNsqPublishLog(log)
}
  • Consumer

type HandlerRegist struct {
    h       nsq.Handler
    topic   string
    channel string
    nsqd    string // address
    ip      string
}

// 初始化消費者 最好是在專案初始化的時候就呼叫,否則可能會出現orm註冊model衝突的情況
func InitConsumer(rg *HandlerRegist, mysql string) *nsq.Consumer {
    // 註冊資料庫
    _, err := orm.GetDB("default_nsq")
    if err != nil {
        fmt.Println("orm.RegisterDataBase = ", mysql)
        err = orm.RegisterDataBase("default_nsq", "mysql", mysql, 10, 10)
    }
    if err != nil {
    panic("InitConsumer註冊資料庫失敗:" + err.Error())
    }

    // 消費者配置,這裡使用基本配置
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second                // 設定重連時間
    cfg.AuthSecret = ""                                  // nsq認證金鑰,暫時不需要
    c, err := nsq.NewConsumer(rg.topic, rg.channel, cfg) // 新建一個消費者
    if err != nil {
        panic(err)
    }

    // 設定日誌級別
    c.SetLogger(log.New(os.Stdout, "nsq consumer:", 0), nsq.LogLevelInfo)

    // 獲取消費者ip
    ip, err := getLocalIp()
    if err != nil {
        panic("nsq NewNsqConsumer:" + err.Error())
    }
    rg.ip = ip

    // 新增消費者介面
    c.AddHandler(rg)
    // 建立一個nsqd連線
    if err := c.ConnectToNSQD(rg.nsqd); err != nil {
        panic(err)
    }
    // 建立多個nsqd連線
    if err := c.ConnectToNSQDs([]string{"address1", "address2"}); err != nil {
        panic(err)
    }
    // 建立NSQLookupd連線
    if err := c.ConnectToNSQLookupd(rg.nsqd); err != nil {
        panic(err)
    }
    return c
}

// c.Stop()

// 處理訊息
func (rg *HandlerRegist) HandleMessage(message *nsq.Message) error {
	go rg.ConsumeLog(message) 
	return rg.h.HandleMessage(message)
}

// 新增日誌 (需要編寫models)
func (rg *HandlerRegist) ConsumeLog(message *nsq.Message) (int64, error) {
	log := &models.NsqConsumeLog{}
	log.ConsumerIp = rg.ip
	log.NsqdUrl = rg.nsqd
	log.Topic = rg.topic
	log.Channel = rg.channel
	log.Message = string(message.Body)
	log.MessageId = getMessageId(message.Body)
	return models.AddNsqConsumeLog(log)
}

func getMessageId(body []byte) string {
	m := map[string]interface{}{}
	json.Unmarshal(body, &m)
	if v, ok := m["nsq_msg_uuid"].(string); ok {
		return v
	}
	return ""
}
  • IP

func getLocalIp() (string, error) {
    addrs, err := net.InterfaceAddrs()

    if err != nil {
        fmt.Println("獲取ip地址失敗")
        return "", err
    }
    var ip []string
    for _, address := range addrs {
        // 檢查ip地址判斷是否迴環地址
        if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
            if ipnet.IP.To4() != nil {
                ip = append(ip, ipnet.IP.String())
            }
        }
    }
    return strings.Join(ip, ";"), nil
}

相關推薦

Nsq訊息佇列

安裝Nsq             1.安裝包管理器 go get github.com/tools/godep             2.安裝依賴包 go get github.com/bmizerany/assert             3.安裝Ns

Linux安裝及部署NSQ訊息佇列

1、NSQ知識點說明 NSQ是一個基於Go語言的開源分散式實時訊息平臺,NSQ可用於大規模系統的實時訊息服務,它的設計目標是為在分散式環境下提供一個強大的去除中心化的分散式服務架構,可以每天處理數以億計的實時訊息 Go語言,據說Go語言在處理高併發方面很強大,至於有

剖析nsq訊息佇列(一) 簡介及去中心化實現原理

分散式訊息佇列nsq,簡單易用,去中心化的設計使nsq更健壯,nsq充分利用了go語言的goroutine和channel來實現的訊息處理,程式碼量也不大,讀不了多久就沒了。後期的文章我會把nsq的原始碼分析給大家看。 主要的分析路線如下 分析nsq的整體框架結構,分析如何做到的無中心化分散式拓撲結構,如何

剖析nsq訊息佇列(二) 去中心化原始碼解析

在上一篇帖子剖析nsq訊息佇列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接連線,還有一種是通過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq實現去中心化的原始碼和其中的業物邏輯展示給大家看一下。 nsqd和nsqlook

剖析nsq訊息佇列(三) 訊息傳輸的可靠性和持久化[二]diskqueue

上一篇主要說了一下nsq是如何保證訊息被消費端成功消費,大概提了一下訊息的持久化,--mem-queue-size 設定為 0,所有的訊息將會儲存到磁碟。 總有人說nsq的持久化問題,消除疑慮的方法就是閱讀原碼做benchmark測試,個人感覺nsq還是很靠譜的。 nsq自己實現了一個先進先出的訊息檔案佇列g

剖析nsq訊息佇列(四) 訊息的負載處理

剖析nsq訊息佇列-目錄 實際應用中,一部分服務叢集可能會同時訂閱同一個topic,並且處於同一個channel下。當nsqd有訊息需要傳送給訂閱客戶端去處理時,發給哪個客戶端是需要考慮的,也就是我要說的訊息的負載。 如果不考慮負載情況,把隨機的把訊息傳送到某一個客服端去處理訊息,如果機器的效能不同,可能

訊息佇列 nsq 使用筆記

在遠端機器 192.168.1.16 上啟動 nsqlookupd, nsqadminnohup ./nsqlookupd &nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 &nsqlookupd

nsq 優秀的訊息佇列

簡介NSQ是Go語言編寫的,開源的分散式訊息佇列中介軟體,其設計的目的是用來大規模地處理每天數以十億計級別的訊息。NSQ 具有分散式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證訊息的可靠傳遞的特徵,是一個成熟的、已在大規模生成環境下應用的產品。NS

java中JMS訊息佇列初始

1.什麼是訊息佇列:     JMS是一個訊息服務的標準或者說是規範,允許應用程式元件基於JavaEE平臺建立、傳送、接收和讀取訊息。它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性。 2.JMS基本概念:     JMS是ja

訊息佇列總結

     前言:關於訊息佇列應該大家都不陌生,在實際的專案中訊息佇列也無處不在,今天我和大家分享一下關於訊息佇列的問題。 1、訊息佇列定義 訊息佇列大家又經常稱為MQ(message queue),從字面的含義來看就是一個存放訊息的容器。 2、訊息佇列應用場景 2.1、非

訊息佇列順序

訊息佇列順序 https://www.cnblogs.com/LipeiNet/p/9877189.html 訊息佇列總結 前言:關於訊息佇列應該大家都不陌生,在實際的專案中訊息佇列也無處不在,今天我和大家分享一下關於訊息佇列的問題。 1、訊息佇列定義 訊息佇列大家又經常稱為MQ(message que

初識訊息佇列——WebSphere MQ入門

訊息佇列是什麼 訊息佇列對於我們來說應該並不陌生,訊息佇列(Message Queue,簡稱MQ),首先它是個佇列,先進先出。佇列裡面放的是訊息,訊息則指的是兩個獨立的系統之間傳遞的資料,這兩個系統可以是異構的,可以在不同的作業系統上,只需要寫一段程式碼呼叫一下提供的API既可以傳送

RocketMQ中介軟體訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

一、專案引用 <dependency>     <groupId>com.foriseland.fjf.mq</groupId>     <artifactI

Kafka-API中介軟體MQ訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

一、 Maven依賴 <dependency> <groupId>com.foriseland.fjf.mq</groupId> <artifactId>fjf-mq-kafka</artifactId> &

SpringBoot整合ActiveMQ訊息佇列和雙向佇列、點對點與釋出訂閱

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。 &nbs

Qt應用Redis實現訊息佇列

    類似BS模式,客戶端傳送任務請求給服務端,服務端將處理結果返回給客戶端。 redis負責訊息的儲存和轉發。 模擬病人掛號看病,Patient程序進行掛號,Doctor程序進行看病 ,程式程式碼如下: /////////////

springBoot+ActiveMQ訊息佇列安裝測試

1.ActiveMQ下載安裝(我使用的是5.14.0版本): 下載連結地址: http://activemq.apache.org/activemq-5140-release.html 2.1 Windows平臺MQ啟動(根據自己不同的計算機配置選擇32位或64位): &

java面試—訊息佇列ActiveMQ

1.如何使用ActiveMq解決分散式事物     在應用中,都會有使用者註冊功能:收集使用者錄入資訊,儲存到資料庫—向用戶的手機或郵箱發驗證碼······     在傳統集中式架構,實現功能:開啟一個本地事物,往本地資料庫中插入一條使

滴滴出行基於RocketMQ構建企業級訊息佇列服務的實踐

本文整理自滴滴出行訊息佇列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。通過本文,您將瞭解到滴滴出行: 在訊息佇列技術選型方面的思考; 為什麼選擇 RocketMQ 作為出行業務的訊息佇列解決方案; 如何構建自己的訊息佇列服務; 在 RocketMQ

訊息佇列之RabbitMQ - 簡介和安裝

       訊息佇列:是簡單的生產者和消費者模式,它的出現是讓各個服務板塊之間解耦和訊息通知。比如,我們一般生成服務板塊中的資料存在有:資料庫,靜態檔案,搜尋系統,hdfs等,那麼如果資料庫中的資料發生了變化,怎麼把這個訊息推送給其他的資料儲存單元呢?如果單