1. 程式人生 > >Java 實現對 OpenTSDB插入資料,查詢資料

Java 實現對 OpenTSDB插入資料,查詢資料

Java 實現對 OpenTSDB插入資料,查詢資料

2018年02月27日 17:20:00 楊鑫newlfe 閱讀數:901更多

所屬專欄: Java學習筆記 資料庫學習筆記

 版權宣告:學習交流為主,未經博主同意禁止轉載,禁止用於商用。 https://blog.csdn.net/u012965373/article/details/79390933

OpenTSDBUtil

 

 

package com.xxx.util;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.opentsdb.client.ExpectResponse;
import org.opentsdb.client.HttpClient;
import org.opentsdb.client.HttpClientImpl;
import org.opentsdb.client.builder.MetricBuilder;
import org.opentsdb.client.request.Filter;
import org.opentsdb.client.request.Query;
import org.opentsdb.client.request.QueryBuilder;
import org.opentsdb.client.request.SubQueries;
import org.opentsdb.client.response.Response;
import org.opentsdb.client.response.SimpleHttpResponse;
import org.opentsdb.client.util.Aggregator;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.*;


/**
* @author yangxin-ryan */ public class OpenTSDBUtil { private static Logger LOG = LoggerFactory.getLogger(OpenTSDBUtil.class); private static String openTSDBUrl = "http://xxxxxxx:4242"; private HttpClient httpClient; /** * 建構函式 */ public OpenTSDBUtil() { this.httpClient = new HttpClientImpl(openTSDBUrl
); } /** * 寫入資料 * @param metric 指標 * @param timestamp 時間點 * @param value * @param tagMap * @return * @throws Exception */ public boolean putData (String metric, Date timestamp, Long value, Map<String, String> tagMap) throws Exception { long timeSecs = timestamp.getTime() / 1000; return this.putData(metric, timeSecs, value, tagMap); } /**
* 寫入資料 * @param metric 指標 * @param timestamp 時間點 * @param value * @param tagMap * @return * @throws Exception */ public boolean putData (String metric, Date timestamp, Double value, Map<String, String> tagMap) throws Exception { long timeSecs = timestamp.getTime() / 1000; return this.putData(metric, timeSecs, value, tagMap); } /** * 寫入資料 * @param metric 指標 * @param timestamp 轉換為秒的時間點 * @param value * @param tagMap * @return * @throws Exception */ public boolean putData (String metric, long timestamp, Long value, Map<String, String> tagMap) throws Exception { MetricBuilder builder = MetricBuilder.getInstance(); builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap); try{ LOG.debug("write quest: {}", builder.build()); Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY); LOG.debug("response.statusCode: {}", response.getStatusCode()); return response.isSuccess(); } catch (Exception error){ LOG.error("put data to opentsdb error: " , error); error.printStackTrace(); throw error; } } /** * 寫入資料 * @param metric 指標 * @param timestamp 轉化為秒的時間點 * @param value * @param tagMap * @return * @throws Exception */ public boolean putData (String metric, long timestamp, Double value, Map<String, String> tagMap) throws Exception { MetricBuilder builder = MetricBuilder.getInstance(); builder.addMetric(metric).setDataPoint(timestamp, value).addTags(tagMap); try { LOG.debug("write quest: {} ", builder.build()); Response response = httpClient.pushMetrics(builder, ExpectResponse.SUMMARY); LOG.debug("response.statueCpde: {}", response.getStatusCode()); return response.isSuccess(); } catch (Exception error) { LOG.error("put data to opentsdb error: ", error); LOG.error(error.getMessage()); error.printStackTrace(); throw error; } } /** * 查詢資料, 返回的資料為json格式,結構為: * "[ " { " metric: mysql.innodb.row_lock_time, " tags: { " host: web01, " dc: beijing " }, " aggregateTags: [], " dps: { " 1435716527: 1234, " 1435716529: 2345 " } " }, " { " metric: mysql.innodb.row_lock_time, " tags: { " host: web02, " dc: beijing " }, " aggregateTags: [], " dps: { " 1435716627: 3456 " } " } "]"; * @param metric 要查詢的指標 * @param tagk tagk * @param tagvFtype tagv分過濾規則 * @param tagvFilter tagv的匹配規則 * @param aggregator 查詢的聚合型別,如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM * @param downsample 取樣的時間粒度,如: 1s, 2m, 1h, 1d, 2d * @param startTime 查詢開始時間,時間格式為 yyyy-MM-dd HH:mm:ss * @param endTime 查詢結束時間,時間格式為 yyyy-MM-dd HH:mm:ss * @return * @throws IOException */ public String getData (String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample, String startTime, String endTime) throws IOException { QueryBuilder queryBuilder = QueryBuilder.getInstance(); Query query = queryBuilder.getQuery(); query.setStart(DateTimeUtil.parse(startTime, "yyyy-MM-dd HH:mm:ss") / 1000); query.setEnd(DateTimeUtil.parse(endTime, "yyyy-MM-dd HH:mm:ss") / 1000); List<SubQueries> sqList = new ArrayList<>(); SubQueries sq = new SubQueries(); sq.setMetric(metric); sq.setAggregator(aggregator); List<Filter> filters = new ArrayList<>(); Filter filter = new Filter(); filter.setTagk(tagk); filter.setType(tagvFtype); filter.setFilter(tagvFilter); filter.setGroupBy(Boolean.TRUE); filters.add(filter); sq.setFilters(filters); sq.setDownsample(downsample + "-" + aggregator); sqList.add(sq); query.setQueries(sqList); try{ LOG.debug("query rqeust: {}", queryBuilder.build()); // 查詢校驗 SimpleHttpResponse spHttpResponse = httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL); LOG.debug("response.content: {}", spHttpResponse.getContent()); if (spHttpResponse.isSuccess()) { return spHttpResponse.getContent(); } return null; } catch (IOException ioe) { LOG.error("get data from opentsdb error: ", ioe); throw ioe; } } /** * 查詢資料,返回的資料為json格式 * @param metric 要查詢的指標 * @param filter 查詢過濾的條件,原來使用的tags在v2.2後已不適用 * filter.setType(): 設定過濾型別, 如: wildcard, regexp * filter.setTagk(): 設定tag * filter.setFilter(): 根據type設定tagv的過濾表示式, 如: hqdApp|hqdWechat * filter.setGroupBy():設定成true, 不設定或設定成false會導致讀超時 * @param aggregator 查詢的聚合型別, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM * @param downsample 取樣的時間粒度, 如: 1s, 2m, 1h, 1d, 2d * @param startTime 查詢開始時間,時間格式為:yyyy-MM-dd HH:mm:ss * @param endTime 查詢結束時間,時間格式為: yyyy-MM-dd HH:mm:ss * @return * @throws IOException */ public String getData (String metric, Filter filter, String aggregator, String downsample, String startTime, String endTime) throws IOException{ QueryBuilder queryBuilder = QueryBuilder.getInstance(); Query query = queryBuilder.getQuery(); query.setStart(DateTimeUtil.parse(startTime, "yyyy-MM-dd HH:mm:ss") / 1000); query.setEnd(DateTimeUtil.parse(endTime, "yyyy-MM-dd HH:mm:ss") / 1000); List<SubQueries> sqList = new ArrayList<>(); SubQueries sq = new SubQueries(); sq.addMetric(metric); sq.addAggregator(aggregator); List<Filter> filters = new ArrayList<>(); filters.add(filter); sq.setFilters(filters); sq.setDownsample(downsample + "-" + aggregator); sqList.add(sq); query.setQueries(sqList); try{ LOG.debug("query request: {}", queryBuilder.build()); SimpleHttpResponse spHttpResponse =httpClient.pushQueries(queryBuilder, ExpectResponse.DETAIL); LOG.debug("response.content: {}", spHttpResponse.getContent()); if (spHttpResponse.isSuccess()){ return spHttpResponse.getContent(); } return null; } catch (IOException e){ LOG.error("get data from opentsdb error: ", e); throw e; } } /** * 查詢時間,返回tags與時序值的對映:Map<tags, Map<時間點,value>> * @param metric 查詢指標 * @param tagk tagk * @param tagvFtype tagv的過濾規則 * @param tagvFilter tagv的匹配自字元 * @param aggregator 查詢的聚合型別,如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM * @param downsample 取樣的時間粒度,如:1s, 2m, 1h, 1d, 2d * @param startTime 查詢開始時間,時間格式為yyyy-MM-dd HH:mm:ss * @param endTime 查詢結束時間,時間格式為yyyy-MM-dd HH:mm:ss * @param retTimeFmt 返回的結果集中,時間點的格式,如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等 * @return Map<tags, Map<時間點, value>> * @throws IOException */ public Map<String, Map<String, Object>> getData(String metric, String tagk, String tagvFtype, String tagvFilter, String aggregator, String downsample, String startTime, String endTime, String retTimeFmt) throws IOException { String resContent = this.getData(metric, tagk, tagvFtype, tagvFilter, aggregator, downsample, startTime, endTime); return this.convertContentToMap(resContent, retTimeFmt); } /** * 查詢資料,返回tags與時序值的對映: Map<tags, Map<時間點,value>> * @param metric 要查詢的指標 * @param filter 查詢過濾的條件,原來使用的tags在v2.2後已不適用 * filter.setType(): 設定過濾型別, 如: wildcard, regexp * filter.setTagk(): 設定tag * filter.setFilter(): 根據type設定tagv的過濾表示式, 如: hqdApp|hqdWechat * filter.setGroupBy():設定成true, 不設定或設定成false會導致讀超時 * @param aggregator 查詢的聚合型別, 如: OpentsdbClient.AGGREGATOR_AVG, OpentsdbClient.AGGREGATOR_SUM * @param downsample 取樣的時間粒度, 如:1s, 2m, 1h, 1d, 2d * @param startTime 查詢開始時間,時間格式為:yyyy-MM-dd HH:mm:ss * @param endTime 查詢結束時間,時間格式為:yyyy-MM-dd HH:mm:ss * @param retTimeFmt 返回的結果集中,時間點的格式,如:yyyy-MM-dd HH:mm:ss 或 yyyyMMddHH 等 * @return Map<String, Map<String, Object>> * @throws IOException */ public Map<String, Map<String, Object>> getData(String metric, Filter filter, String aggregator, String downsample, String startTime, String endTime, String retTimeFmt) throws IOException{ String resContent = this.getData(metric, filter, aggregator, downsample, startTime, endTime); return this.convertContentToMap(resContent, retTimeFmt); } public Map<String, Map<String, Object>> convertContentToMap(String resContent, String retTimeFmt) { Map<String, Map<String, Object>> tagsValuesMap = new HashMap<>(); if (resContent == null || "".equals(resContent.trim())) { return tagsValuesMap; } JSONArray array = (JSONArray) JSONObject.parse(resContent); if (array != null) { for (int i = 0; i < array.size(); i++){ JSONObject obj = (JSONObject) array.get(i); JSONObject tags = (JSONObject) obj.get("tags"); JSONObject dps = (JSONObject) obj.get("dps"); Map<String, Object> timeValueMap = new HashMap<>(); for (Iterator<String> it = dps.keySet().iterator(); it.hasNext();){ String timestamp = it.next(); Date datetime = new Date(Long.parseLong(timestamp) * 1000); timeValueMap.put(DateTimeUtil.format(datetime, retTimeFmt), dps.get(timestamp)); } tagsValuesMap.put(tags.toString(), timeValueMap); } } return tagsValuesMap; } public static void main(String[] args) { OpenTSDBUtil openTSDBUtil = new OpenTSDBUtil(); try{ // write Map<String, String> tagMap = new HashMap<>(); tagMap.put("ch2", "ofo-App"); openTSDBUtil.putData("metric-yangxin", 1519698960307L, 210l, tagMap); // read Filter filter = new Filter(); filter.setType("regexp"); filter.setTagk("ch2"); filter.setFilter("ofo-App"); filter.setGroupBy(Boolean.TRUE); String resContent = openTSDBUtil.getData("metric-yangxin", filter, Aggregator.avg.name(), "1h", "2016-06-27 12:00:00", "2018-06-30 13:00:00"); System.out.println(resContent); }catch (Exception e){ e.printStackTrace(); } } }

 

DateTimeUtil:(此處我忽略不規範的時間戳):

 

package com.xxx.util;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;


/**
 * @author yangxin-ryan
 */
public class DateTimeUtil {

    public static Long parse(String startTime, String timeFormat){
        System.out.println(startTime);
        return Timestamp.valueOf(startTime).getTime();
    }

    public static String format(Date datetime, String retTimeFmt) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(retTimeFmt);
        String date = simpleDateFormat.format(datetime);
        return Timestamp.valueOf(date).toString();
    }
}

 

結果展示: