1. 程式人生 > >storm+kafka整合異常處理

storm+kafka整合異常處理

1 拷貝kafka依賴jar包到storm lib
[[email protected] libs]# cp kafka_2.10-0.8.2.1.jar /opt/apache-storm-0.9.5/lib/
[[email protected] libs]# cp scala-library-2.10.4.jar /opt/apache-storm-0.9.5/lib/
[[email protected] libs]# cp metrics-core-2.2.0.jar /opt/apache-storm-0.9.5/lib/
[[email protected] libs]# cp zkclient-0.3.jar /opt/apache-storm-0.9.5/lib/
[
[email protected]
libs]# cp log4j-1.2.16.jar /opt/apache-storm-0.9.5/lib/
[[email protected] libs]# cp slf4j-api-1.7.6.jar /opt/apache-storm-0.9.5/lib/
[[email protected] libs]# cp jopt-simple-3.2.jar /opt/apache-storm-0.9.5/lib/
2 執行storm jar命令
報錯資訊:
Exception in thread "main" java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
        at java.lang.Class.getMethod0(Class.java:2856)
        at java.lang.Class.getMethod(Class.java:1668)
        at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

原因:
server端沒有storm-kafka相關jar包
從本地maven庫找到拷過去


5372 [Thread-11-kafka-reader] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.5.jar:0.9.5]
        at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
        at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
        at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
        ... 5 common frames omitted

從本地maven庫找到curator-client-2.7.1.jar curator-framework-2.7.1.jar拷過去
java.lang.NoClassDefFoundError: com/google/common/base/Preconditions
        at org.apache.curator.ensemble.fixed.FixedEnsembleProvider.<init>(FixedEnsembleProvider.java:39) ~[curator-client-2.7.1.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory$Builder.connectString(CuratorFrameworkFactory.java:193) ~[curator-framework-2.7.1.jar:na]
        at org.apache.curator.framework.CuratorFrameworkFactory.newClient(CuratorFrameworkFactory.java:94) ~[curator-framework-2.7.1.jar:na]
        at storm.kafka.ZkState.newCurator(ZkState.java:45) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.ZkState.<init>(ZkState.java:61) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) ~[storm-kafka-0.9.5.jar:0.9.5]
        at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
        at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassNotFoundException: com.google.common.base.Preconditions
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_79]
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_79]
        at java.security.AccessController.doPrivileged(Native Method) ~[na:1.7.0_79]
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354) ~[na:1.7.0_79]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_79]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) ~[na:1.7.0_79]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_79]
需要guava-11.0.2.jar,從hadoop home下的share=common-lib找的

又報zookeeper錯誤,將zookeeper-3.4.6.jar放進去
又報下列錯誤:
5481 [Thread-13-kafka-reader] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
        at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.5.jar:0.9.5]
        at backtype.storm.daemon.executor$fn__6579$fn__6594.invoke(executor.clj:522) ~[storm-core-0.9.5.jar:0.9.5]
        at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
        at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.5.jar:0.9.5]
        at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.5.jar:0.9.5]
        ... 7 common frames omitted
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-test-topic5/partitions
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
        at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590) ~[zookeeper-3.4.6.jar:3.4.6-1569965]
        at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) ~[curator-framework-2.7.1.jar:na]
        at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203) ~[curator-framework-2.7.1.jar:na]
        at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.7.1.jar:na]
        at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:200) ~[curator-framework-2.7.1.jar:na]
        at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) ~[curator-framework-2.7.1.jar:na]
        at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[curator-framework-2.7.1.jar:na]
        at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:91) ~[storm-kafka-0.9.5.jar:0.9.5]
        ... 8 common frames omitted

原因:KeeperErrorCode = NoNode for /brokers
配置kafka時,如果使用zookeeper create /kafka建立了節點,kafka與storm整合時new ZkHosts(zks) 需要改成 new ZkHosts(zks,”/kafka/brokers”),不然會報java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。
storm-kafka外掛預設kafka的 zk_path如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;
改一下程式碼:

相關推薦

storm+kafka整合異常處理

1 拷貝kafka依賴jar包到storm lib [[email protected] libs]# cp kafka_2.10-0.8.2.1.jar /opt/apache-storm-0.9.5/lib/ [[email protected] libs]# cp scala-lib

Storm-kafka整合——1.1.0版本storm中tuple取KafkaSpout資料詳解

問題描述:KafkaSpout拉取kafka topic資料,下一級bolt從kafkaspout獲取資料,tuple到底採用什麼方法取出spout中的訊息呢?KafkaSpout建立:/* *根據資料來源topic和zk_id建立並返回kafkaSpout * */ pub

Storm--kafka整合

Bolt1: package com.jiangnan.storm.kafka; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.storm.tas

Kafka消費異常處理

異常org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned

spring-mybatis整合項目 異常處理

pat 失敗 encoding endpoint per ans 分隔 ctp 技術分享 java.lang.reflect.InvocationTargetException at java.base/jdk.internal.reflect.NativeMet

springboot(3)——整合freemarker模板、AOP統一處理、全域性異常處理

《三》、整合freemarker模板、AOP統一處理、全域性異常處理 一、整合freemarker模板引擎 1、引入freemarker依賴 <dependency> <groupId>org.springframe

Java框架-SpringMVC統一異常處理、ssm框架整合

1. SpringMVC中異常處理 1.1 各層處理異常原則即實現方法 1.1.1 各層處理異常原則 dao:不處理,拋異常; service:不處理,拋異常; controller/servlet:必須處理,否則錯誤資訊將直接在瀏覽器顯示給使用者看。 1.1.2 異

Spring Boot 全域性異常處理 與 Hibernate Validator校驗框架整合

Hibernate Validator校驗框架的使用 Spring boot已經集成了hibernate-validator,不需要引入maven,其他框架也可以自己引入: <dependency> <groupId>org.h

【SpringMVC整合MyBatis】springmvc異常處理-全域性異常處理器開發

異常處理 1.異常處理思路 系統中異常包括兩類:預期異常和執行時異常RuntimeException,前者通過捕獲異常從而獲取異常資訊,後者主要通過規範程式碼開發、測試通過手段減少執行時異常的發生。 系統的dao、service、controller出現都通過throws E

大資料處理框架之:Storm + Kafka + zookeeper 叢集

Storm kafka zookeeper 叢集 我們知道storm的作用主要是進行流式計算,對於源源不斷的均勻資料流流入處理是非常有效的,而現實生活中大部分場景並不是均勻的資料流,而是時而多時而少的資料流入,這種情況下顯然用批量處理是不合適的,如果使用storm做實時計算的話可能因為資

flume+kafka+storm整合使用

Flume-ng Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。        不過這裡寫寫自己的見解 這個是flume的架構圖  從上圖可以看到幾個名詞: Agent: 一個Agent包含Source、Channel、Sink和其他的元件

Storm Kafka Integration (0.10.x+)官方文件翻譯:stormkafka整合

Storm Kafka Integration (0.10.x+) 相容性 Apache Kafka版本0.10以上 向kafka寫資料作為拓撲的一部分 你可以建立一個org.apache.storm.kafka.bolt.KafkaBolt的例項,

Amqp整合com.rabbitmq.client.ShutdownSignalException: channel error; protocol method異常處理

java.io.IOException at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.ja

Storm系列(六)stormkafka整合

使用kafka-client jar進行Storm Apache Kafka整合 這包括新的Apache Kafka消費者API。相容性 Apache Kafka版本0.10起 引入jar包 <dependency> <groupId>org.apache.st

springboot整合mysql時報CLIENT_PLUGIN_AUTH is required異常處理

關於解決springboot在整合MySQL時,操作資料庫時如下異常: java.sql.SQLNonTransientConnectionException: CLIENT_PLUGIN_AUTH is required at com.mysql.cj.jdbc.exceptions

stormkafka整合

storm和kafka整合 依賴 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> &

storm-kafka spout獲取資料的異常解決辦法

20391 [Thread-8-kfkaSpout] WARN storm.kafka.KafkaUtils - Got fetch request with offset out of range: [140885176]; retrying with default

StormKafka 整合

這裡的目標是kafka 負責生產資料,storm 消費資料並將結果輸出 這裡用的是引進別人家寫的整合程式碼,因為使用的人也比較多,下面是專案地址 下載、解壓以及將這個目錄下的程式碼新增進專案 將kafka 和 storm 的JAR 新增進專案,作為依賴jar 包 然

2017-08-14 flume+kafka+storm+hdfs整合

基礎環境: Redhat 5.5 64位(我這裡是三臺虛擬機器h40,h41,h42) myeclipse 8.5 jdk1.7.0_25 zookeeper-3.4.5叢集 apache-storm-0.9.5叢集 kafka_2.1

2+6多機安裝部署、部分異常處理以及使用configtxlator對區塊基本資訊查詢(kafka共識,手動非docker方式)

根據蒐集的資料安裝測試並在安裝測試過程中整理的文件,如有不足希望不吝賜教。 本文介紹CentOS7中hyperledger-fabric1.1多機部署使用kafka共識非docker方式,大體上與之前solo共識的步驟類似,(solo共識:《CentOS7中hyperle