1. 程式人生 > >物聯網時代-跟著Thingsboard學IOT架構-HTTP裝置協議及API相關限制

物聯網時代-跟著Thingsboard學IOT架構-HTTP裝置協議及API相關限制

 

 

thingsboard官網: https://thingsboard.io/

thingsboard GitHub: https://github.com/thingsboard/thingsboard

thingsboard提供的體驗地址: http://demo.thingsboard.io/

 

BY Thingsboard team

以下內容是在原文基礎上演繹的譯文。除非另行註明,頁面上所有內容採用知識共享-署名(CC BY 2.5 AU)協議共享。

原文地址: ThingsBoard API參考:HTTP裝置API


 

HTTP

協議介紹

HTTP是可用於IoT應用程式的通用網路協議。您可以在此處找到有關HTTP的更多資訊。HTTP協議是基於TCP的,並使用請求 - 響應模型。當然它的缺點也極為明顯,HTTP對於嵌入式裝置來說太重了,也不靈活。

 

協議特點

  1. 支援客戶/伺服器模式。

  2. 簡單快速: 客戶向伺服器請求服務時,只需傳送請求方法和路徑。請求方法常用的有GET、PUT、POST。每種方法規定了客戶與伺服器聯絡的型別不同。由於HTTP協議簡單,使得HTTP伺服器的程式規模小,因此通訊速度很快。

  3. 靈活: HTTP允許傳輸任意型別的資料物件。正在傳輸的型別由Content-Type加以標記。

  4. 無連線:無連線的含義是限制每次連線只處理一個請求。伺服器處理完客戶的請求,並收到客戶的應答後,即斷開連線。採用這種方式可以節省傳輸時間。

  5. 無狀態:HTTP協議是無狀態協議。無狀態是指協議對於事務處理沒有記憶能力。缺少狀態意味著如果後續處理需要前面的資訊,則它必須重傳,這樣可能導致每次連線傳送的資料量增大。另一方面,在伺服器不需要先前資訊時它的應答就較快。

 

 

客戶端設定

  • curl

  • Postman

 

Thingsboard的HTTP傳輸協議架構

因為Thingsboard最新release,是基於微服務架構,不利用單獨理解程式碼。

Thingsboard CoAP裝置傳輸協議原始碼:https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/http

本文基於上面原始碼後,剔除相關的安全驗證和處理之後搭建簡易的講解專案:

https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP

 


Spring Boot框架

Thingsboard的HTTP裝置傳輸協議是基於Spring Boot。

Spring Boot 是 Spring 的子專案,正如其名字,提供 Spring 的引導( Boot )的功能。

通過 Spring Boot ,我們開發者可以快速配置 Spring 專案,引入各種 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而無需不斷重複編寫繁重的 Spring 配置,降低了 Spring 的使用成本。

猶記當年,Spring XML 為主的時代,大晚上各種搜尋 Spring 的配置,苦不堪言。現在有了 Spring Boot 之後,生活真美好。

Spring Boot 提供了各種 Starter 啟動器,提供標準化的預設配置。例如:

  • spring-boot-starter-web 啟動器,可以快速配置 Spring MVC 。

  • mybatis-spring-boot-starter 啟動器,可以快速配置 MyBatis 。

並且,Spring Boot 基本已經一統 Java 專案的開發,大量的開源專案都實現了其的 Starter 啟動器。例如:

  • incubator-dubbo-spring-boot-project 啟動器,可以快速配置 Dubbo 。

  • rocketmq-spring-boot-starter 啟動器,可以快速配置 RocketMQ 。

 

專案解讀

專案結構

 ├── java
 │   └── com
 │       └── sanshengshui
 │           └── http
 │               ├── controller
 │               │   └── DeviceApiController.java    // 裝置傳輸API介面
 │               ├── HttpApiServer.java  //專案啟動主類
 │               └── quota   //API限制類包
 │                   ├── AbstractQuotaService.java   //抽象限制服務類
 │                   ├── Clock.java      //時鐘類
 │                   ├── host
 │                   │   ├── HostIntervalRegistryCleaner.java    //主機API清理器
 │                   │   ├── HostIntervalRegistryLogger.java  //主機API記錄器
 │                   │   ├── HostRequestIntervalRegistry.java    //主機API請求登錄檔
 │                   │   ├── HostRequestLimitPolicy.java  //主機API請求限制條件
 │                   │   └── HostRequestsQuotaService.java    //主機請求限制開關
 │                   ├── inmemory
 │                   │   ├── IntervalCount.java   //間歇計數
 │                   │   ├── IntervalRegistryCleaner.java    //時間間隔內登錄檔清理器
 │                   │   ├── IntervalRegistryLogger.java     //時間間隔內登錄檔記錄器
 │                   │   └── KeyBasedIntervalRegistry.java    //基礎API請求邏輯
 │                   ├── QuotaService.java   //限制服務類
 │                   └── RequestLimitPolicy.java //請求限制策略
 └── resources
     └── application.yml

 


 ​

 

專案程式碼

引入依賴

 <dependencies>
         <dependency>
             <groupId>com.sanshengshui</groupId>
             <artifactId>IOT-Guide-TSL</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
 </dependencies>

 

  • Spring Boot提供的web框架基於Tomcat,可以通過引入spring-boot-starter-web來配置依賴關係。

  • commons-lang3guava用於API請求限制服務。

 

引數配置

 server:
   port: 8080
 ​
 ​
 http:
   request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"
 ​
 ​
 quota:
   host:
     limit: "${QUOTA_HOST_LIMIT:10}"
     intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
     ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
     cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
     enabled: "${QUOTA_HOST_ENABLED:true}"
     whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
     blacklist: "${QUOTA_HOST_BLACKLIST:}"
     log:
       topSize: 10
       intervalMin: 2

 

  • server.port: 8080: 伺服器啟動繫結的埠,預設情況下是:8080。

  • http.request_timeout : 請求超時時間,此處設定為60000。

  • quota.host.limitquota.host.intervalMs: 分別為API請求限額數和單位時間。此處為了驗證方便,設定為10次和60s,即60s內API請求限額數為10次。

  • quota.host.cleanPeriodMsquota.host.ttlMs : 分別為清理週期時間和TTL時間。

  • quota.host.enabledquota.host.whitelistquota.host.blacklist分別表示API請求開關、白名單及黑名單。

  • quota.host.log.topSizequota.host.log.intervalMin: 指的是快取記憶體中的(近似)最大條目數和間隔時間。

 

API限制服務類

 

KeyBasedIntervalRegistry:基礎API請求邏輯

 
 1 package com.sanshengshui.http.quota.inmemory;
 2  ​
 3  import com.google.common.collect.Sets;
 4  import lombok.extern.slf4j.Slf4j;
 5  import org.apache.commons.lang3.StringUtils;
 6  ​
 7  import java.util.Map;
 8  import java.util.Set;
 9  import java.util.concurrent.ConcurrentHashMap;
10  import java.util.stream.Collectors;
11  ​
12  /**
13   * @author james mu
14   * @date 2019/8/10 下午4:50
15   */
16  @Slf4j
17  public class KeyBasedIntervalRegistry {
18  ​
19      private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
20      private final long intervalDurationMs;
21      private final long ttlMs;
22      private final Set<String> whiteList;
23      private final Set<String> blackList;
24  ​
25      public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
26          this.intervalDurationMs = intervalDurationMs;
27          this.ttlMs = ttlMs;
28          this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
29          this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));
30  ​
31      }
32  ​
33      private void validate(String name) {
34          if (ttlMs < intervalDurationMs) {
35              log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
36          }
37          log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
38          log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
39      }
40  ​
41      public long tick(String clientHostId) {
42          IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
43          long currentCount = intervalCount.resetIfExpiredAndTick();
44          if (whiteList.contains(clientHostId)) {
45              return 0;
46          } else if (blackList.contains(clientHostId)) {
47              return Long.MAX_VALUE;
48          }
49          return currentCount;
50      }
51  ​
52      public void clean() {
53          hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
54      }
55  ​
56      public Map<String, Long> getContent() {
57          return hostCounts.entrySet().stream()
58                  .collect(
59                          Collectors.toMap(
60                                  Map.Entry:: getKey,
61                                  interval -> interval.getValue().getCount()
62                          )
63                  );
64      }
65  }
66  ​
67  

 

  • validate(string name): 要求ttlMs<intervalDurationMs,並打印出API請求的黑名單和白名單。

  • 第42行通過computeIfAbsent函式對map中不存在key時的處理,在這裡通過新建intervalCount(intervalDurationMs)的方式來處理。

  • 第43行通過intervalCount的resetIfExpiredAndTick()對時間間隔內進行計數。

  • 第44-48行通過判斷API請求客戶端地址是否在黑白名單中,如果在白名單,返回0,如果在黑名單中,返回Long.MAX_VALUE

  • clean()為通過時間間隔內是否大於ttlMs來過濾集合中的元素。

  • getContent()為遍歷hostCounts中的客戶端地址的IntervalCount。

 

IntervalCount: 間歇時間內計數

 1  package com.sanshengshui.http.quota.inmemory;
 2  ​
 3  import com.sanshengshui.http.quota.Clock;
 4  ​
 5  import java.util.concurrent.atomic.LongAdder;
 6  ​
 7  /**
 8   * @author james mu
 9   * @date 19-8-9 下午16:50
10   */
11  public class IntervalCount {
12  ​
13      private final LongAdder addr = new LongAdder();
14      private final long intervalDurationMs;
15      private volatile long startTime;
16      private volatile long lastTickTime;
17  ​
18      public IntervalCount(long intervalDurationMs) {
19          this.intervalDurationMs = intervalDurationMs;
20          startTime = Clock.millis();
21      }
22  ​
23      //計數或時間過期後重置時間
24      public long resetIfExpiredAndTick(){
25          if (isExpired()){
26              reset();
27          }
28          tick();
29          return addr.sum();
30      }
31  ​
32     //計算已過時間
33      public long silenceDuration() {
34          return Clock.millis() - lastTickTime;
35      }
36  ​
37      public long getCount() {
38          return addr.sum();
39      }
40  ​
41     //計數操作,累加一
42      private void tick() {
43          addr.add(1);
44          lastTickTime = Clock.millis();
45      }
46  ​
47     //重置計數時間
48      private void reset() {
49          addr.reset();
50          lastTickTime = Clock.millis();
51      }
52  ​
53  //判斷間隔時間是否失效
54      private boolean isExpired() {
55          return (Clock.millis() - startTime) > intervalDurationMs;
56      }
57  ​
58  }
59  ​

 

 

剩下的處理類,留給讀者去自己研究了!

  1. 主機API清理器: HostIntervalRegistryCleaner注入quota.host.cleanPeriodMs並繼承抽象類IntervalRegistryCleaner

  2. 主機API記錄器: HostIntervalRegistryLogger注入quota.host.log.topSize和quota.host.log.intervalMin並繼承IntervalRegistryLogger

  3. 主機API請求登錄檔: HostRequestIntervalRegistry注入quota.host.intervalMs、quota.host.ttlMs、quota.host.whitelist和quota.host.blacklist並繼承KeyBasedIntervalRegistry

  4. 主機API請求限制條件: HostRequestLimitPolicy注入quota.host.limit並繼承RequestLimitPolicy

  5. 主機請求限制開關: HostRequestsQuotaService注入quota.host.enabled並繼承AbstractQuotaService

 

屬性API和遙測資料上傳API

 
 1 @RestController
 2  @RequestMapping("/api/v1")
 3  @Slf4j
 4  public class DeviceApiController {
 5      
 6      @Autowired(required = false)
 7      private HostRequestsQuotaService quotaService;//API限制服務類
 8      
 9      @RequestMapping(value = "/attributes",method = RequestMethod.POST)
10      public DeferredResult<ResponseEntity> postDeviceAttributes(
11              @RequestBody String json, HttpServletRequest request) {
12          DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
13          if (quotaExceeded(request, responseWriter)) {
14              return responseWriter;
15          }
16          responseWriter.setResult(new ResponseEntity<>(HttpStatus.ACCEPTED));
17          Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(json)).getAttributes();
18          for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
19              System.out.println("屬性名="+attributeKvEntry.getKey()+" 屬性值="+attributeKvEntry.getValueAsString());
20          }
21          return responseWriter;
22      }
23      
24      @RequestMapping(value = "/telemetry",method = RequestMethod.POST)
25      public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request){
26          DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
27          if (quotaExceeded(request, responseWriter)) {
28              return responseWriter;
29          }
30          responseWriter.setResult(new ResponseEntity(HttpStatus.ACCEPTED));
31          Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData();
32          for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
33              System.out.println("key= " + entry.getKey());
34              for (KvEntry kvEntry: entry.getValue()) {
35                  System.out.println("屬性名="+kvEntry.getKey()+ " 屬性值="+kvEntry.getValueAsString());
36              }
37          }
38          return responseWriter;
39      }
40  }

 

 

專案演示

遙測上傳API

要將遙測資料釋出到伺服器節點,請將POST請求傳送到以下URL:

 http://localhost:8080/api/v1/telemetry

 

最簡單的支援資料格式是:

 {"key1":"value1", "key2":"value2"}

 

要麼

 [{"key1":"value1"}, {"key2":"value2"}]

 

請注意,在這種情況下,伺服器端時間戳將分配給上傳的資料!

如果您的裝置能夠獲取客戶端時間戳,您可以使用以下格式:

 
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

 

在上面的示例中,我們假設“1451649600512”是具有毫秒精度的unix時間戳。例如,值'1451649600512'對應於'Fri,2016年1月1日12:00:00.512 GMT'

例子:

 curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
 C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
 Note: Unnecessary use of -X or --request, POST is already inferred.
 *   Trying ::1...
 * TCP_NODELAY set
 * Connected to localhost (::1) port 8080 (#0)
 > POST /api/v1/telemetry HTTP/1.1
 > Host: localhost:8080
 > User-Agent: curl/7.55.1
 > Accept: */*
 > Content-Type:application/json
 > Content-Length: 63
 >
 * upload completely sent off: 63 out of 63 bytes
 < HTTP/1.1 202
 < Content-Length: 0
 < Date: Sun, 18 Aug 2019 16:16:07 GMT
 <
 * Connection #0 to host localhost left intact

 

結果:

 key= 1566144967139
 屬性名=stringKey 屬性值=value1
 屬性名=booleanKey 屬性值=true
 屬性名=doubleKey 屬性值=42.0
 屬性名=longKey 屬性值=73

 

 

屬性API

屬性API允許裝置

  • 將客戶端裝置屬性上載到伺服器。

  • 從伺服器請求客戶端和共享裝置屬性。

將屬性更新發布到伺服器

要將客戶端裝置屬性發布到ThingsBoard伺服器節點,請將POST請求傳送到以下URL:

http://localhost:8080/api/v1/attributes
 例子:
 
curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
 C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
 Note: Unnecessary use of -X or --request, POST is already inferred.
 *   Trying ::1...
 * TCP_NODELAY set
 * Connected to localhost (::1) port 8080 (#0)
 > POST /api/v1/attributes HTTP/1.1
 > Host: localhost:8080
 > User-Agent: curl/7.55.1
 > Accept: */*
 > Content-Type:application/json
 > Content-Length: 63
 >
 * upload completely sent off: 63 out of 63 bytes
 < HTTP/1.1 202
 < Content-Length: 0
 < Date: Sun, 18 Aug 2019 16:21:00 GMT
 <
 * Connection #0 to host localhost left intact

 

結果:

 屬性名=stringKey 屬性值=value1
 屬性名=booleanKey 屬性值=true
 屬性名=doubleKey 屬性值=42.0
 屬性名=longKey 屬性值=73

 

API限額服務

為了演示方便,我們設定60s內最多API請求測試為10次,現在我們使用遙測上傳API連續發起介面呼叫,我們會發現如下的情況出現:

 屬性名=longKey 屬性值=73
 屬性名=stringKey 屬性值=value1
 屬性名=booleanKey 屬性值=true
 屬性名=doubleKey 屬性值=42.0
 屬性名=longKey 屬性值=73
 2019-08-19 00:26:25.696  WARN 16332 --- [nio-8080-exec-1] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
 2019-08-19 00:26:26.402  WARN 16332 --- [nio-8080-exec-2] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect

 

這說明了我們的API限額服務起了作用,當然你也可以測試黑白名單等功能。

 

當在真實情況下,通常的API限額會很大,我這裡提供了一個gatling自動化測試來提供介面測試。地址為:https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP-Test

關於gatling的其他資訊,大家可以參考:

  • 負載,效能測試工具-Gatling

  • Gatling簡單測試SpringBoot工程

 

到此,物聯網時代,相信大家對IOT架構下的HTTP協議和API相關限制有所瞭解了,感謝大家的閱