Mysql 流增量寫入 Hdfs(二) --Storm + hdfs 的流式處理
一. 概述
上一篇我們介紹瞭如何將資料從 mysql 拋到 kafka,這次我們就專注於利用 storm 將資料寫入到 hdfs 的過程,由於 storm 寫入 hdfs 的可定製東西有些多,我們先不從 kafka 讀取,而先自己定義一個 Spout 資料充當資料來源,下章再進行整合。這裡預設你是擁有一定的 storm 知識的基礎,起碼知道 Spout 和 bolt 是什麼。
寫入 hdfs 可以有以下的定製策略:
- 自定義寫入檔案的名字
- 定義寫入內容格式
- 滿足給定條件後更改寫入的檔案
- 更改寫入檔案時觸發的 Action
本篇會先說明如何用 storm 寫入 HDFS,寫入過程一些 API 的描述,以及最後給定一個例子:
storm 每接收到 10 個 Tuple 後就會改變 hdfs 寫入檔案,新檔案的名字就是第幾次改變。
ps:storm 版本:1.1.1 。Hadoop 版本:2.7.4 。
接下來我們首先看看 Storm 如何寫入 HDFS 。
二. Storm 寫入 HDFS
Storm 官方有提供了相應的 API 讓我們可以使用。可以通過建立 HdfsBolt 以及定義相應的規則,即可寫入 HDFS。
首先通過 maven 配置依賴以及外掛。
<properties> <storm.version>1.1.1</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency> <!--hadoop模組--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-hdfs --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId> <version>1.1.1</version> <!--<scope>test</scope>--> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>com.learningstorm.kafka.KafkaTopology</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
這裡要提一下,如果要打包部署到叢集上的話,打包的外掛需要使用 maven-shade-plugin 這個外掛,然後使用 maven Lifecycle 中的 package 打包。而不是用 Maven-assembly-plugin 外掛進行打包。
因為使用 Maven-assembly-plugin 的時候,會將所有依賴的包unpack,然後在pack,這樣就會出現,同樣的檔案被覆蓋的情況。釋出到叢集上的時候就會報 No FileSystem for scheme: hdfs 的錯 。
然後是使用 HdfsBolt 寫入 Hdfs。這裡來看看官方文件中的例子吧。
// 使用 "|" 來替代 ",",來進行字元分割 RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // 每輸入 1k 後將內容同步到 Hdfs 中 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 當檔案大小達到 5MB ,轉換寫入檔案,即寫入到一個新的檔案中 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); //當轉換寫入檔案時,生成新檔案的名字並使用 FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/foo/"); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://localhost:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); //生成該 bolt topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
到這裡就結束了。可以將 HdfsBolt 當作一個 Storm 中特殊一些的 bolt 即可。這個 bolt 的作用即使根據接收資訊寫入 Hdfs。
而在新建 HdfsBolt 中,Storm 為我們提供了相當強的靈活性,我們可以定義一些策略,比如當達成某個條件的時候轉換寫入檔案,新寫入檔案的名字,寫入時候的分隔符等等。
如果選擇使用的話,Storm 有提供部分介面供我們使用,但如果我們覺得不夠豐富也可以自定義相應的類。下面我們看看如何控制這些策略吧。
RecordFormat
這是一個介面,允許你自由定義接收到內容的格式。
public interface RecordFormat extends Serializable { byte[] format(Tuple tuple); }
Storm 提供了 DelimitedRecordFormat ,使用方法在上面已經有了。這個類預設的分割符是逗號",",而你可以通過 withFieldDelimiter 方法改變分隔符。
如果你的初始分隔符不是逗號的話,那麼也可以重寫寫一個類實現 RecordFormat 介面即可。
FileNameFormat
同樣是一個介面。
public interface FileNameFormat extends Serializable { void prepare(Map conf, TopologyContext topologyContext); String getName(long rotation, long timeStamp); String getPath(); }
Storm 所提供的預設的是 org.apache.storm.hdfs.format.DefaultFileNameFormat 。預設人使用的轉換檔名有點長,格式是這樣的:
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例如:
MyBolt-5-7-1390579837830.txt
預設情況下,字首是空的,擴充套件標識是".txt"。
SyncPolicy
同步策略允許你將 buffered data 緩衝到 Hdfs 檔案中(從而client可以讀取資料),通過實現org.apache.storm.hdfs.sync.SyncPolicy 介面:
public interface SyncPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
FileRotationPolicy
這個介面允許你控制什麼情況下轉換寫入檔案。
public interface FileRotationPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); }
Storm 有提供三個實現該介面的類:
-
最簡單的就是不進行轉換的org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy ,就是什麼也不幹。
-
通過檔案大小觸發轉換的 org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy。
-
通過時間條件來觸發轉換的 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy。
如果有更加複雜的需求也可以自己定義。
RotationAction
這個主要是提供一個或多個 hook ,可加可不加。主要是在觸發寫入檔案轉換的時候會啟動。
public interface RotationAction extends Serializable { void execute(FileSystem fileSystem, Path filePath) throws IOException; }
三.實現一個例子
瞭解了上面的情況後,我們會實現一個例子,根據寫入記錄的多少來控制寫入轉換(改變寫入的檔案),並且轉換後文件的名字表示當前是第幾次轉換。
首先來看看 HdfsBolt 的內容:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" "); // sync the filesystem after every 1k tuples SyncPolicy syncPolicy = new CountSyncPolicy(1000); //FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB); /** rotate file with Date,every month create a new file * format:yyyymm.txt */ FileRotationPolicy rotationPolicy = new CountStrRotationPolicy(); FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/"); RotationAction action = new NewFileAction(); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://127.0.0.1:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy) .addRotationAction(action);
然後分別來看各個策略的類。
FileRotationPolicy
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; import java.text.SimpleDateFormat; import java.util.Date; /** * 計數以改變Hdfs寫入檔案的位置,當寫入10次的時候,則更改寫入檔案,更改名字取決於 “TimesFileNameFormat” * 這個類是執行緒安全 */ public class CountStrRotationPolicy implements FileRotationPolicy { private SimpleDateFormat df = new SimpleDateFormat("yyyyMM"); private String date =null; private int count = 0; public CountStrRotationPolicy(){ this.date =df.format(new Date()); //this.date = df.format(new Date()); } /** * Called for every tuple the HdfsBolt executes. * * @param tupleThe tuple executed. * @param offset current offset of file being written * @return true if a file rotation should be performed */ @Override public boolean mark(Tuple tuple, long offset) { count ++; if(count == 10) { System.out.print("num :" +count + ""); count = 0; return true; } else { return false; } } /** * Called after the HdfsBolt rotates a file. */ @Override public void reset() { } @Override public FileRotationPolicy copy() { return new CountStrRotationPolicy(); } }
FileNameFormat
import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.task.TopologyContext; import java.util.Map; /** * 決定重新寫入檔案時候的名字 * 這裡會返回是第幾次轉換寫入檔案,將這個第幾次做為檔名 */ public class TimesFileNameFormat implements FileNameFormat { //預設路徑 private String path = "/storm"; //預設字尾 private String extension = ".txt"; private Long times = new Long(0); public TimesFileNameFormat withPath(String path){ this.path = path; return this; } @Override public void prepare(Map conf, TopologyContext topologyContext) { } @Override public String getName(long rotation, long timeStamp) { times ++ ; //返回檔名,檔名為更換寫入檔案次數 return times.toString() + this.extension; } public String getPath(){ return this.path; } }
RotationAction
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; /** 當轉換寫入檔案時候呼叫的 hook ,這裡僅寫入日誌。 */ public class NewFileAction implements RotationAction { private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class); @Override public void execute(FileSystem fileSystem, Path filePath) throws IOException { LOG.info("Hdfs change the written file!!"); return; } }
OK,這樣就大功告成了。通過上面的程式碼,每接收到 10 個 Tuple 後就會轉換寫入檔案,新檔案的名字就是第幾次轉換。
完整程式碼包括一個隨機生成字串的 Spout ,可以到我的 github 上檢視。
ofollow,noindex" target="_blank">StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
更多幹貨,歡迎關注公眾號, 哈爾的資料城堡 。
