Storm Series (7): Integrating Storm with Redis

1. Introduction

Storm-Redis provides integration between Storm and Redis. To use it, you only need to add the corresponding dependency:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
    <type>jar</type>
</dependency> 

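Storm-Redis uses Jedis as its Redis client and provides the following three basic Bolt implementations:

  • RedisLookupBolt: looks up data in Redis;
  • RedisStoreBolt: stores data in Redis;
  • RedisFilterBolt: filters for data that matches a condition.

RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all extend the abstract class AbstractRedisBolt. You can extend this class yourself to implement a custom RedisBolt with additional functionality.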

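2. Integration Example

2.1 Project Structure

This section starts with an integration example: count word frequencies and store the final results in Redis. The project structure is as follows:

[Figure: project structure]

Example source code: storm-redis-integration

2.2 Project Dependencies

The main dependencies are: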

<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>

2.3 DataSourceSpout

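import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Data source that produces sample lines for the word count
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Generate one line of simulated data per second
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Simulated data: a random-length, tab-separated prefix of the shuffled word list
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        // nextInt(n) returns 0..n-1, so endIndex falls in 1..n
        int endIndex = random.nextInt(list.size()) + 1;
        return String.join("\t", list.subList(0, endIndex));
    }

}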

The generated sample data looks like this:

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
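The generation logic can be tried outside Storm. The following is a minimal JDK-only sketch of the same idea (shuffle the word list, then join a random-length prefix with tabs). The class name SampleLineGenerator and the method nextLine are illustrative and do not appear in the example project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** JDK-only sketch of the sample-line generator; names are illustrative. */
class SampleLineGenerator {

    private static final List<String> WORDS =
            Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    /** Shuffles a copy of the word list and joins a random-length prefix with tabs. */
    static String nextLine(Random random) {
        List<String> shuffled = new ArrayList<>(WORDS);
        Collections.shuffle(shuffled, random);
        // nextInt(n) returns 0..n-1, so adding 1 yields a prefix of 1..n words.
        int endIndex = random.nextInt(shuffled.size()) + 1;
        return String.join("\t", shuffled.subList(0, endIndex));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(nextLine(random));
        }
    }
}
```

Each line contains between one and six distinct words, which matches the shape of the sample output above.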

2.4 SplitBolt

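import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Splits each line on the given delimiter
 */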
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word, String.valueOf(1)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.5 CountBolt

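import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Counts word frequencies
 */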
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Emit the word with its updated running total
        collector.emit(new Values(word, String.valueOf(count)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
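To see what the two bolts emit, the split and count steps can be modelled without Storm. The sketch below is JDK-only and assumes a single counting task, as in the example topology; SplitCountSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** JDK-only model of the SplitBolt -> CountBolt data flow; no Storm classes are involved. */
class SplitCountSketch {

    private final Map<String, Integer> counts = new HashMap<>();

    /** Mirrors SplitBolt.execute: one word per tab-separated token. */
    static String[] split(String line) {
        return line.split("\t");
    }

    /** Mirrors CountBolt.execute: bump the running total and return the emitted pair. */
    String[] count(String word) {
        int updated = counts.merge(word, 1, Integer::sum);
        return new String[] {word, String.valueOf(updated)};
    }

    /** Feeds lines through both steps and collects every emitted (word, count) pair. */
    List<String[]> process(List<String> lines) {
        List<String[]> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : split(line)) {
                emitted.add(count(word));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        SplitCountSketch sketch = new SplitCountSketch();
        for (String[] pair : sketch.process(Arrays.asList("Spark\tHBase", "HBase\tStorm"))) {
            System.out.println(pair[0] + " -> " + pair[1]);
        }
    }
}
```

For the two input lines in main, the emitted pairs are Spark -> 1, HBase -> 1, HBase -> 2, Storm -> 1. Every occurrence produces a new pair carrying the latest total, so a downstream store only needs to overwrite the previous value.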

2.6 WordCountStoreMapper

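Implement the RedisStoreMapper interface to define how a tuple maps to data in Redis: which tuple field is used as the key, which field is used as the value, and which Redis data structure the data is stored in.

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;

/**
 * Defines the mapping between a tuple and the data in Redis
 */
public class WordCountStoreMapper implements RedisStoreMapper {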
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}

2.7 WordCountToRedisApp

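import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Counts word frequencies and stores the results in Redis
 */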
public class WordCountToRedisApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String STORE_BOLT = "storeBolt";

    // In real projects these parameters can be passed in from outside to make the program more flexible
    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
        // save to redis
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);

        // If the argument "cluster" is passed in, submit to a cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp",
                    new Config(), builder.createTopology());
        }
    }
}
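The comment on REDIS_HOST and REDIS_PORT suggests passing these values in from outside. One possible way to do that is sketched below with plain Java. The argument layout (run mode first, then host, then port), the default host, and the class name RedisEndpointArgs are all assumptions made for illustration; the example project does not define them.

```java
/** Sketch of reading the Redis endpoint from program arguments instead of constants. */
class RedisEndpointArgs {

    static final String DEFAULT_HOST = "127.0.0.1"; // assumed default, not from the article
    static final int DEFAULT_PORT = 6379;

    final String host;
    final int port;

    private RedisEndpointArgs(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Assumed layout: args[0] = run mode, args[1] = Redis host, args[2] = Redis port. */
    static RedisEndpointArgs parse(String[] args) {
        String host = args.length > 1 ? args[1] : DEFAULT_HOST;
        int port = args.length > 2 ? Integer.parseInt(args[2]) : DEFAULT_PORT;
        return new RedisEndpointArgs(host, port);
    }

    public static void main(String[] args) {
        RedisEndpointArgs endpoint = parse(args);
        System.out.println(endpoint.host + ":" + endpoint.port);
    }
}
```

The parsed host and port could then be handed to JedisPoolConfig.Builder in place of the two constants.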

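2.8 Running and Testing

You can run the topology directly in local mode, or package it and submit it to a cluster. The source code in this repository is packaged with maven-shade-plugin by default; the packaging command is:

# mvn clean package -Dmaven.test.skip=true

After the topology starts, check the data in Redis:

[Figure: data stored in Redis]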

3. How storm-redis Works

3.1 AbstractRedisBolt

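RedisLookupBolt, RedisStoreBolt, and RedisFilterBolt all extend the abstract class AbstractRedisBolt. Like the Bolts we write ourselves, AbstractRedisBolt indirectly extends BaseRichBolt (through BaseTickTupleAwareRichBolt).

The most important method in AbstractRedisBolt is prepare. It uses the externally supplied Jedis configuration (jedisPoolConfig or jedisClusterConfig) to create a JedisCommandsInstanceContainer, the container that manages Jedis instances.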

public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
    protected OutputCollector collector;

    private transient JedisCommandsInstanceContainer container;

    private JedisPoolConfig jedisPoolConfig;
    private JedisClusterConfig jedisClusterConfig;

   ......
   
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        // FIXME: stores map (stormConf), topologyContext and expose these to derived classes
        this.collector = collector;

        if (jedisPoolConfig != null) {
            this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
        } else if (jedisClusterConfig != null) {
            this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

  .......
}
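The selection logic in prepare can be modelled in isolation. The sketch below is JDK-only: PoolConfig, ClusterConfig, and CommandsContainer are simplified stand-ins invented for this illustration, not the storm-redis classes, and the containers only describe themselves instead of opening connections.

```java
import java.util.Arrays;
import java.util.List;

/** JDK-only model of how prepare() picks a container from whichever config was supplied. */
class ContainerSelectionSketch {

    /** Stand-in for JedisCommandsInstanceContainer. */
    interface CommandsContainer {
        String describe();
    }

    /** Stand-in for JedisPoolConfig: a single Redis endpoint. */
    static final class PoolConfig {
        final String host;
        final int port;

        PoolConfig(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Stand-in for JedisClusterConfig: a list of cluster nodes. */
    static final class ClusterConfig {
        final List<String> nodes;

        ClusterConfig(List<String> nodes) {
            this.nodes = nodes;
        }
    }

    static CommandsContainer build(PoolConfig config) {
        return () -> "pool:" + config.host + ":" + config.port;
    }

    static CommandsContainer build(ClusterConfig config) {
        return () -> "cluster:" + config.nodes;
    }

    /** Same decision order as prepare(): pool config first, then cluster config, else fail. */
    static CommandsContainer select(PoolConfig pool, ClusterConfig cluster) {
        if (pool != null) {
            return build(pool);
        } else if (cluster != null) {
            return build(cluster);
        }
        throw new IllegalArgumentException("Jedis configuration not found");
    }

    public static void main(String[] args) {
        System.out.println(select(new PoolConfig("127.0.0.1", 6379), null).describe());
        System.out.println(select(null,
                new ClusterConfig(Arrays.asList("10.0.0.1:7000", "10.0.0.2:7000"))).describe());
    }
}
```

Because the container is created in prepare rather than in the constructor, it is built on the worker after the bolt has been deserialized, which is consistent with the container field being marked transient in the class above.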

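The build() methods of JedisCommandsContainerBuilder, which return a JedisCommandsInstanceContainer, are shown below. They simply create a JedisPool or JedisCluster and pass it into the container.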

public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
    JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
    return new JedisContainer(jedisPool);
}

public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
    JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
    return new JedisClusterContainer(jedisCluster);
}

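3.2 RedisStoreBolt and RedisLookupBolt

The most important method in RedisStoreBolt is process. It obtains the key and value from storeMapper and, depending on the storage type dataType, calls the corresponding jedisCommand method to store them.

RedisLookupBolt is implemented in much the same way: it obtains the key from lookupMapper and performs a lookup.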

public class RedisStoreBolt extends AbstractRedisBolt {
    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }
       
  
    @Override
    public void process(Tuple input) {
        String key = storeMapper.getKeyFromTuple(input);
        String value = storeMapper.getValueFromTuple(input);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    jedisCommand.set(key, value);
                    break;

                case LIST:
                    jedisCommand.rpush(key, value);
                    break;

                case HASH:
                    jedisCommand.hset(additionalKey, key, value);
                    break;

                case SET:
                    jedisCommand.sadd(key, value);
                    break;

                case SORTED_SET:
                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
                    break;

                case HYPER_LOG_LOG:
                    jedisCommand.pfadd(key, value);
                    break;

                case GEO:
                    String[] array = value.split(":");
                    if (array.length != 2) {
                        throw new IllegalArgumentException("value structure should be longitude:latitude");
                    }

                    double longitude = Double.valueOf(array[0]);
                    double latitude = Double.valueOf(array[1]);
                    jedisCommand.geoadd(additionalKey, longitude, latitude, key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            returnInstance(jedisCommand);
        }
    }

     .........
}
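One detail in process is easy to miss: for STRING, LIST, and SET the tuple key is the Redis key, whereas for HASH the additional key names the hash and the tuple key becomes a field inside it. The JDK-only sketch below models this for four of the types, with Redis replaced by in-memory maps; StoreDispatchSketch and its fields are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** JDK-only model of the dataType switch in process(); Redis is replaced by maps. */
class StoreDispatchSketch {

    enum DataType { STRING, LIST, HASH, SET }

    final Map<String, String> strings = new HashMap<>();
    final Map<String, List<String>> lists = new HashMap<>();
    final Map<String, Map<String, String>> hashes = new HashMap<>();
    final Map<String, Set<String>> sets = new HashMap<>();

    void store(DataType dataType, String additionalKey, String key, String value) {
        switch (dataType) {
            case STRING: // models SET key value
                strings.put(key, value);
                break;
            case LIST:   // models RPUSH key value
                lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
                break;
            case HASH:   // models HSET additionalKey key value
                hashes.computeIfAbsent(additionalKey, k -> new HashMap<>()).put(key, value);
                break;
            case SET:    // models SADD key value
                sets.computeIfAbsent(key, k -> new HashSet<>()).add(value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + dataType);
        }
    }

    public static void main(String[] args) {
        StoreDispatchSketch redis = new StoreDispatchSketch();
        redis.store(DataType.HASH, "wordCount", "Spark", "1");
        redis.store(DataType.HASH, "wordCount", "Spark", "2"); // overwrites the field
        System.out.println(redis.hashes);
    }
}
```

With the HASH mapping used by WordCountStoreMapper, every new count for a word overwrites the same field of the wordCount hash, so Redis always holds the latest total.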

3.3 JedisCommands

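The JedisCommands interface defines the Redis client commands. It has three implementations: Jedis, JedisCluster, and ShardedJedis. Storm mainly uses the first two; which one executes a command depends on whether jedisPoolConfig or jedisClusterConfig was passed in.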

3.4 RedisMapper and TupleMapper

RedisMapper and TupleMapper define how data is mapped between a tuple and Redis.

1. TupleMapper

TupleMapper defines two methods:

  • getKeyFromTuple(ITuple tuple): which field of the tuple is used as the key;

  • getValueFromTuple(ITuple tuple): which field of the tuple is used as the value.

2. RedisMapper

RedisMapper defines getDataTypeDescription(), which returns the data type description. The RedisDataType enum inside RedisDataTypeDescription lists all available Redis data types:

public class RedisDataTypeDescription implements Serializable {

    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
    ......
}

3. RedisStoreMapper

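RedisStoreMapper extends both the TupleMapper and RedisMapper interfaces. It is used when storing data and defines no additional methods.

4. RedisLookupMapper

RedisLookupMapper extends both the TupleMapper and RedisMapper interfaces, and adds two methods:

  • declareOutputFields, which declares the output fields;
  • toTuple, which assembles the lookup result into a list of Storm Values to be emitted.

The following example takes the word field of the input tuple as the key, looks it up with RedisLookupBolt, then assembles the key and the returned value into Values and emits them to the next processing unit.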

class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
} 
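The read path can be modelled in the same JDK-only style. In the sketch below a Map stands in for the wordCount hash, and lookup plays the combined role of the hash read and toTuple. The class name LookupSketch is hypothetical, and returning null for a missing field is a property of this sketch, not a statement about how RedisLookupBolt treats missing keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** JDK-only model of a hash lookup followed by toTuple; not the storm-redis implementation. */
class LookupSketch {

    /** Stand-in for the Redis hash named "wordCount". */
    final Map<String, String> wordCountHash = new HashMap<>();

    /** Reads the field for the word and wraps (word, value) as one row to emit. */
    List<List<Object>> lookup(String word) {
        String value = wordCountHash.get(word); // null when the field is absent in this sketch
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList(word, value));
        return rows;
    }

    public static void main(String[] args) {
        LookupSketch sketch = new LookupSketch();
        sketch.wordCountHash.put("Spark", "3");
        System.out.println(sketch.lookup("Spark"));
    }
}
```

The row has two positions, matching the two output fields ("wordName", "count") declared by the mapper above.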

5. RedisFilterMapper

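RedisFilterMapper extends both the TupleMapper and RedisMapper interfaces. It is used when querying data and defines a declareOutputFields method that declares the output fields, as in the following implementation: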

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("wordName", "count"));
}
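As a rough illustration of filtering, the JDK-only sketch below forwards only the words that already exist as fields in a hash. It assumes that the filter condition is field existence; FilterSketch is a hypothetical name and a Map stands in for the Redis hash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** JDK-only model of existence-based filtering; the Redis hash is replaced by a Map. */
class FilterSketch {

    /** Keeps only the words that exist as fields in the given hash. */
    static List<String> filter(List<String> words, Map<String, String> hash) {
        List<String> forwarded = new ArrayList<>();
        for (String word : words) {
            if (hash.containsKey(word)) { // plays the role of an existence check on the field
                forwarded.add(word);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<String, String> wordCount = new HashMap<>();
        wordCount.put("Spark", "3");
        System.out.println(filter(Arrays.asList("Spark", "Kafka"), wordCount));
    }
}
```

Words that are not present in the hash are simply dropped instead of being passed downstream.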

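4. Word Count with a Custom RedisBolt

4.1 How It Works

The custom RedisBolt relies on the Redis hash command hincrby key field increment to count words. hincrby increments a hash field by the given amount; if the field does not exist, it is first created with the value 0 and then incremented. This makes word counting straightforward. In Redis, hincrby behaves as follows: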

redis>  HSET myhash field 5
(integer) 1
redis>  HINCRBY myhash field 1
(integer) 6
redis>  HINCRBY myhash field -1
(integer) 5
redis>  HINCRBY myhash field -10
(integer) -5
redis> 
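The same semantics can be reproduced with a small JDK-only model, which also shows the case the session above does not cover: incrementing a field that does not exist yet. HincrbySketch is a hypothetical class for illustration; it is not a Redis client.

```java
import java.util.HashMap;
import java.util.Map;

/** JDK-only model of HINCRBY semantics on a single hash; not a Redis client. */
class HincrbySketch {

    private final Map<String, Long> hash = new HashMap<>();

    /** Models HSET: overwrite the field with the given value. */
    void hset(String field, long value) {
        hash.put(field, value);
    }

    /** Models HINCRBY: a missing field counts as 0 before the increment is applied. */
    long hincrBy(String field, long increment) {
        return hash.merge(field, increment, Long::sum);
    }

    public static void main(String[] args) {
        HincrbySketch myhash = new HincrbySketch();
        myhash.hset("field", 5);
        System.out.println(myhash.hincrBy("field", 1));   // 6
        System.out.println(myhash.hincrBy("field", -1));  // 5
        System.out.println(myhash.hincrBy("field", -10)); // -5
        System.out.println(myhash.hincrBy("Storm", 1));   // 1, the field did not exist before
    }
}
```

Because the increment happens inside Redis, a bolt that calls hincrby with an increment of 1 for every word does not need to keep its own in-memory counts.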

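4.2 Project Structure

[Figure: project structure]

4.3 Implementing the Custom RedisBolt

import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.JedisCommands;

/**
 * Custom RedisBolt that counts words with the Redis hash command hincrby key field increment
 */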
public class RedisCountStoreBolt extends AbstractRedisBolt {

    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;
        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    @Override
    protected void process(Tuple tuple) {
        String key = storeMapper.getKeyFromTuple(tuple);
        String value = storeMapper.getValueFromTuple(tuple);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();
            if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
                jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
            } else {
                throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
            }

            collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        } finally {
            returnInstance(jedisCommand);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

4.4 CustomRedisCountApp

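import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Counts word frequencies with the custom RedisBolt
 */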
public class CustomRedisCountApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String STORE_BOLT = "storeBolt";

    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // save to redis and count
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);

        // If the argument "cluster" is passed in, submit to a cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalCustomRedisCountApp",
                    new Config(), builder.createTopology());
        }
    }
}

