微服務實踐之分散式定時任務
阿新 • • 發佈:2021-02-01
承接上篇:上篇文章講到改造 `go-zero` 生成的 `app module` 中的 `gateway & RPC` 。本篇講講如何接入 **非同步任務** 以及 **log的使用**。
## Delay Job
日常任務開放中,我們會有很多非同步、批量、定時、延遲任務要處理,go-zero中有 `go-queue`,推薦使用 `go-queue` 去處理,`go-queue` 本身也是基於 `go-zero` 開發的,其本身是有兩種模式:
- `dq `: 依賴於` beanstalkd` ,分散式,可儲存,延遲、定時設定,關機重啟可以重新執行,訊息會丟失,使用非常簡單,go-queue中使用了redis setnx保證了每個訊息只被消費一次,使用場景主要是用來做日常任務使用
- `kq`:依賴於 `kafka` ,這個就不多介紹啦,大名鼎鼎的 `kafka` ,使用場景主要是做日誌用
我們主要說一下dq,kq使用也一樣的,只是依賴底層不同,如果沒使用過beanstalkd,沒接觸過beanstalkd的可以先google一下,使用起來還是挺容易的。
我在jobs下使用goctl新建了一個message-job.api服務
```
info(
title: //訊息任務
desc: // 訊息任務
author: "Mikael"
email: "[email protected]"
)
type BatchSendMessageReq {}
type BatchSendMessageResp {}
service message-job-api {
@handler batchSendMessageHandler // 批量傳送簡訊
post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
}
```
因為不需要使用路由,所以handler下的routes.go被我刪除了,在handler下新建了一個jobRun.go,內容如下:
```go
package handler
import (
"fishtwo/lib/xgo"
"fishtwo/app/jobs/message/internal/svc"
)
/**
* @Description 啟動job
* @Author Mikael
* @Date 2021/1/18 12:05
* @Version 1.0
**/
func JobRun(serverCtx *svc.ServiceContext) {
xgo.Go(func() {
batchSendMessageHandler(serverCtx)
//...many job
})
}
```
其實xgo.Go就是 `go batchSendMessageHandler(serverCtx)` ,封裝了一下go攜程,防止野生goroutine panic
然後修改一下啟動檔案message-job.go
```go
package main
import (
"flag"
"fmt"
"fishtwo/app/jobs/message/internal/config"
"fishtwo/app/jobs/message/internal/handler"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/rest"
)
var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
server := rest.MustNewServer(c.RestConf)
defer server.Stop()
handler.JobRun(ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
```
主要是handler.RegisterHandlers(server, ctx) 修改為handler.JobRun(ctx)
接下來,我們就可以引入dq了,首先在etc/xxx.yaml下新增dqConf
```
.....
DqConf:
Beanstalks:
- Endpoint: 127.0.0.1:7771
Tube: tube1
- Endpoint: 127.0.0.1:7772
Tube: tube2
Redis:
Host: 127.0.0.1:6379
Type: node
```
我這裡本地用不同埠,模擬開了2個節點,7771、7772
在internal/config/config.go新增配置解析物件
```go
type Config struct {
....
DqConf dq.DqConf
}
```
修改handler/batchsendmessagehandler.go
```go
package handler
import (
"context"
"fishtwo/app/jobs/message/internal/logic"
"fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/logx"
)
func batchSendMessageHandler(ctx *svc.ServiceContext){
rootCxt:= context.Background()
l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
err := l.BatchSendMessage()
if err != nil{
logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
}
}
```
修改logic下batchsendmessagelogic.go,寫我們的consumer消費邏輯
```go
package logic
import (
"context"
"fishtwo/app/jobs/message/internal/svc"
"fmt"
"github.com/tal-tech/go-zero/core/logx"
)
type BatchSendMessageLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
return BatchSendMessageLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *BatchSendMessageLogic) BatchSendMessage() error {
fmt.Println("job BatchSendMessage start")
l.svcCtx.Consumer.Consume(func(body []byte) {
fmt.Printf("job BatchSendMessage %s \n" + string(body))
})
fmt.Printf("job BatchSendMessage finish \n")
return nil
}
```
這樣就大功告成了,啟動message-job.go就ok課
```
go run message-job.go
```
之後我們就可以在業務程式碼中向dq新增任務,它就可以自動消費了
producer.Delay 向dq中投遞5個延遲任務:
```go
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:7771",
Tube: "tube1",
},
{
Endpoint: "localhost:7772",
Tube: "tube2",
},
})
for i := 1000; i < 1005; i++ {
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
if err != nil {
fmt.Println(err)
}
}
```
`producer.At` 可以指定某個時間執行,非常好用,感興趣的朋友自己可以研究下。
## 錯誤日誌
在前面說到gateway改造時候,如果眼神好的童鞋,在上面的httpresult.go中已經看到了log的身影:
![](https://img2020.cnblogs.com/other/14470/202102/14470-20210201101556537-1380481275.jpg)
我們在來看下rpc中怎麼處理的
![](https://img2020.cnblogs.com/other/14470/202102/14470-20210201101556908-1840285640.jpg)
是的,我在每個rpc啟動的main中加入了grpc攔截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那讓我們看看grpc攔截器裡面做了什麼
![](https://img2020.cnblogs.com/other/14470/202102/14470-20210201101557412-685371255.jpg)
然後我程式碼裡面使用github/pkg/errors這個包去處理錯誤的,這個包還是很好用的
![](https://img2020.cnblogs.com/other/14470/202102/14470-20210201101557728-860077802.jpg)
![](https://img2020.cnblogs.com/other/14470/202102/14470-20210201101558258-830704313.jpg)
所以呢:
我們在 `grpc` 中列印日誌 `logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)`;
在`api` 中列印日誌 `logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)`
`go-zero` 中列印日誌,使用` logx.WithContext `會把trace-id帶入,這樣一個請求下來,比如
````
user-api --> user-srv --> message-srv
````
那如果 `messsage-srv` 出錯,他們三個是同一個 `trace-id` ,是不是就可以在elk通過輸入這個trace-id一次性搜尋出來這條請求報錯堆疊資訊呢?當然你也可以接入 `jaeger、zipkin、skywalking` 等,這個我暫時還沒接入。
## 框架地址
[https://github.com/tal-tech/go-zero](https://github.com/tal-tech/go-zero)
歡迎使用 go-zero 並 **star** 支援我們!