1. 程式人生 > >Storm-Kafka使用筆記(一):Scheme和Mapper

Storm-Kafka使用筆記(一):Scheme和Mapper

一、背景

因為是專案驅動的去了解Storm-kafka,所以會由於分工去先了解某一部分,順序有點雜亂。

二、Storm-Kafka介紹

瞭解Storm的都知道,Storm拓撲中比較基本的就是Spout和Bolt,Spout作為資料發射源,可以從資料庫或者其他地方取得資料然後發射出去,Bolt就是中間一個個運算處理的元件,最後一個Bolt可以擔任資料處理結果彙總或者資料落地的角色。

三、Storm-Kafka為我們提供了什麼

最先了解的是KafkaSpout和KafkaBolt,顯而易見就是把我們上面說的Spout和最後一個Bolt的功用具體化,具體到KafkaSpout就是從Kafka取資料來源的Spout,而KafkaBolt就是把資料處理結果轉化為一定格式,傳送到Kafka的Bolt。

但是,在這之中我們需要有一些規約,就是Spout從Kafka拿到資料後我要怎麼處理轉換成Storm中的資料格式–Tuple,還有Bolt要怎麼把接收到的Tuple轉換成Kafka的格式傳送到Kafka,這就涉及到另外兩個基礎的概念,Scheme和Mapper,它們分別說明了我們上面的規約,並把資料進行轉換再返回。
Scheme就實現了從byte[]到其他格式的轉換(預設提供的是從位元組流到字串的轉換)。Mapper就實現了從Tuple到其他格式的轉換(預設提供的是從Tuple取Field為key的作為key返回,取Field為message的作為message轉換),也讓我們可以個性化實現(當然Storm-Kafka也提供了預設的)。

四、Mapper

Mapper介面:

package org.apache.storm.kafka.bolt.mapper;

import org.apache.storm.tuple.Tuple;

import java.io.Serializable;

/**
 * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message.
 * @param <K> type of key.
 * @param <V> type of value.
 */
public interface TupleToKafkaMapper<K,V> extends Serializable { K getKeyFromTuple(Tuple tuple); V getMessageFromTuple(Tuple tuple); }

可以看到Mapper接口裡面只有兩個方法,分別是獲取Key和Message,對應的就是傳送給Kafka的資訊裡的Key(可選)和Message。

Storm-Kafka提供預設的FieldNameBasedTupleToKafkaMapper是這樣的:

//返回Key和Msg,預設的Field是"key" "message",在new的時候可以自定義,也可以在方法裡面做封裝和拼接
package org.apache.storm.kafka.bolt.mapper;

import org.apache.storm.tuple.Tuple;

public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> {

    public static final String BOLT_KEY = "key";
    public static final String BOLT_MESSAGE = "message";
    public String boltKeyField;
    public String boltMessageField;

    public FieldNameBasedTupleToKafkaMapper() {
        this(BOLT_KEY, BOLT_MESSAGE);
    }

    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
        this.boltKeyField = boltKeyField;
        this.boltMessageField = boltMessageField;
    }

    @Override
    public K getKeyFromTuple(Tuple tuple) {
        //for backward compatibility, we return null when key is not present.
        return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null;
    }

    @Override
    public V getMessageFromTuple(Tuple tuple) {
        return (V) tuple.getValueByField(boltMessageField);
    }
}

然後,其實不難想到KafkaBolt裡面的操作,就是

//取出值然後封裝發出,非常簡單
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);

producer.send(new KeyedMessage(topic, key, message));

五、Scheme

Scheme主要負責定義如何從訊息流中解析所需資料。
Scheme介面:

public interface Scheme extends Serializable {  
    public List<Object> deserialize(byte[] ser);  
    public Fields getOutputFields();  
}  

包括反序列化的方法和輸出的欄位宣告。
Storm-Kafka自帶的StringScheme的實現:

public class StringScheme implements Scheme {
    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
    public static final String STRING_SCHEME_KEY = "str";

    public List<Object> deserialize(ByteBuffer bytes) {
        return new Values(deserializeString(bytes));
    }

    public static String deserializeString(ByteBuffer string) {
        if (string.hasArray()) {
            int base = string.arrayOffset();
            return new String(string.array(), base + string.position(), string.remaining());
        } else {
            return new String(Utils.toByteArray(string), UTF8_CHARSET);
        }
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}

其實就是返回了String和聲明瞭欄位”str”,只要在方法裡自定義對位元組流的操作,並聲明瞭欄位,就可以自定義自己的Scheme了。