1. 程式人生 > >DataX安裝部署-Reader外掛二次開發

DataX安裝部署-Reader外掛二次開發

DataX

DataX 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構資料來源之間高效的資料同步功能。

DataX詳細介紹

DataX安裝部署及小試 1.下載壓縮包: 下載頁面地址:https://github.com/alibaba/DataX 在頁面中【Quick Start】--->【Download DataX下載地址】進行下載。下載後的包名:datax.tar.gz。解壓後{datax}目錄下有{bin conf job lib log log_perf plugin script tmp}幾個目錄。

2.安裝 將下載後的壓縮包直接解壓後可用,前提是對應的java及python環境滿足要求。

System Requirements:

  • Linux
  • JDK(1.6以上,推薦1.6)
  • Python(推薦Python2.6.X)一定要為python2,因為後面執行datax.py的時候,裡面的python的print會執行不了,導致執行不成功,會提示你print語法要加括號,python2中加不加都行 python3中必須要加,否則報語法錯,因為執行過程通過python指令碼執行,所以python3環境報錯無法執行。
  • Apache Maven 3.x (Compile DataX)

3.測試 配置測試樣例:下面我們配置一組 從TXT文字到另一個TXT文字。  

第一步、建立作業的配置檔案(json格式) 在bin目錄執行 python datax.py -r {your_reader} -w {your_writer}

{your_reader} 為datax\plugin\reader 目錄下的外掛名字,到包名就可以,{your_writer}相同;

如圖:txtfilereaderTest是我二次開發之後的包,在datax\plugin\reader目錄下;

執行python datax.py -r txtfilereaderTest -w txtFileWriter;會生成下面紅框json模板,只需要將其複製出來,在datax\job\目錄下新建一個newJob.josn 然後內容使用該模板。

自己配置的模板如下:

  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                "reader": {
  6.                     "name": "Test",
  7.                     "parameter": {
  8.                         "column": [
  9.                         {
  10.                              "index": 0,
  11.                               "type": "string"
  12.                         },
  13.                         {
  14.                              "index": 1,
  15.                               "type": "string"
  16.                         }
  17.                         ],
  18.                         "encoding": "utf-8",
  19.                         "fieldDelimiter": ",",
  20.                         "skipHeader": "True",
  21.                         "path": ["E:/python_project/BigData-Base/etl/test/ADDRESS02.txt"]
  22.                     }
  23.                 },
  24.                 "writer": {
  25.                     "name": "txtfilewriter",
  26.                     "parameter": {
  27.                         "dateFormat": "yyyy-MM-dd",
  28.                         "fieldDelimiter": ",",
  29.                         "fileName": "ADDRESS.txt",
  30.                         "path": "E:/datax/job/",
  31.                         "writeMode": "append"
  32.                     }
  33.                 }
  34.             }
  35.         ],
  36.         "setting": {
  37.             "speed": {
  38.                 "channel": "1"
  39.             }
  40.         }
  41.     }
  42. }

模板具體內容怎麼配置可參考:https://github.com/alibaba/DataX ,官方每個包內都有個doc資料夾,裡面專門有配置引數說明,按照說明配置即可。

啟動:python datax.py {your_file} 如果是Windows則要寫絕對路徑。

python datax.py E:\datax\job\txtjsonTest.json

windows下亂碼修復: 我把這個工具遷移到一臺windows主機上使用時候看到控制檯友好的中文提示居然都變成了亂碼了(話說有中文提示也是我選擇他很重要的理由啊)。還好官方也給出瞭解決方案: 開啟CMD.exe命令列視窗 通過 chcp命令改變內碼表,UTF-8的內碼表為65001 chcp 65001 執行該操作後,內碼表就被變成UTF-8了。但是,在視窗中仍舊不能正確顯示UTF-8字元。 修改視窗屬性,改變字型,在命令列標題欄上點選右鍵,選擇"屬性"->"字型",將字型修改為True Type字型"Lucida Console",然後點選確定將屬性應用到當前視窗。 執行:

型別 資料來源 Reader(讀) Writer(寫) 文件
RDBMS 關係型資料庫 MySQL  、
Oracle         √         √      、
SQLServer  、
PostgreSQL  、
DRDS  、
通用RDBMS(支援所有關係型資料庫)  、
阿里雲數倉資料儲存 ODPS  、
ADS
OSS  、
OCS  、
NoSQL資料儲存 OTS  、
Hbase0.94  、
Hbase1.1  、
Phoenix4.x  、
MongoDB  、
Hive  、
無結構化資料儲存 TxtFile  、
FTP  、
HDFS  、
Elasticsearch

也可以參考其他部落格,測試例子基本相同,實際使用根據自己需求調整配置檔案即可,目前DataX已經迭代到了3.0版本,開源部分目前是單機版本,不過阿里內部已經可以叢集運行了,3.0中也有所體現,未來說不定叢集版本也會開源呢。

列舉一下官方文件:

DataX外掛二次開發

如果官方提供的外掛沒有自己需要用的怎麼辦,就需要自己開發需要的外掛了,可參考官方DataX外掛開發寶典。

我做了一個Reader外掛,讀取TXT文件的時候清除單引號的ETLdemo。

1. 建立一個maven工程。

模板:只需要改artifactId,name其他內容不變。

pom.xml內容,紅色標記為自己檔案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
      <groupId>com.alibaba.datax</groupId>
      <artifactId>datax-all</artifactId>
      <version>0.0.1-SNAPSHOT</version>
   </parent>
   <artifactId>Test</artifactId>
   <name>Test</name>
   <description>txtFilereaderTest,並可以根據使用者配置的型別進行型別轉換,建議開發、測試環境使用。</description>
   <packaging>jar</packaging>

   <dependencies>
      <dependency>
         <groupId>com.alibaba.datax</groupId>
         <artifactId>datax-common</artifactId>
         <version>${datax-project-version}</version>
         <exclusions>
            <exclusion>
               <artifactId>slf4j-log4j12</artifactId>
               <groupId>org.slf4j</groupId>
            </exclusion>
         </exclusions>
      </dependency>
      <dependency>
         <groupId>com.alibaba.datax</groupId>
         <artifactId>plugin-unstructured-storage-util</artifactId>
         <version>${datax-project-version}</version>
      </dependency>
      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
      </dependency>
      <dependency>
         <groupId>ch.qos.logback</groupId>
         <artifactId>logback-classic</artifactId>
      </dependency>
      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
            <version>16.0.1</version>
      </dependency>
   </dependencies>

   <build>
      <plugins>
         <!-- compiler plugin -->
         <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
               <encoding>${project-sourceEncoding}</encoding>
            </configuration>
         </plugin>
         <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
               <descriptors>
                  <descriptor>src/main/assembly/package.xml</descriptor>
               </descriptors>
               <finalName>datax</finalName>
            </configuration>
            <executions>
               <execution>
                  <id>dwzip</id>
                  <phase>package</phase>
                  <goals>
                     <goal>single</goal>
                  </goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>

</project>

2. 建立如下檔案樹

package.xml 檔案為最後打包檔案

它會讀取模板配置:plugin.json;plugin_job_template.json

需要自行修改的已標紅,其他目錄級別不可變。

<assembly
   xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
   <id></id>
   <formats>
      <format>dir</format>
   </formats>
   <includeBaseDirectory>false</includeBaseDirectory>
   <fileSets>
      <fileSet>
         <directory>src/main/resources</directory>
         <includes>
            <include>plugin.json</include>
            <include>plugin_job_template.json</include>
         </includes>
         <outputDirectory>plugin/reader/txtFilereaderTest</outputDirectory>
      </fileSet>
      <fileSet>
         <directory>target/</directory>
         <includes>
            <include>txtFilereaderTest-0.0.1-SNAPSHOT.jar</include>
         </includes>
         <outputDirectory>plugin/reader/txtFilereaderTest</outputDirectory>
      </fileSet>
   </fileSets>

   <dependencySets>
      <dependencySet>
         <useProjectArtifact>false</useProjectArtifact>
         <outputDirectory>plugin/reader/txtFilereaderTest/libs</outputDirectory>
         <scope>runtime</scope>
      </dependencySet>
   </dependencySets>
</assembly>

 plugin.json檔案:

class:你自己的class檔案目錄,我的是reader外掛,所以在reader目錄下的txtFilereaderTest包裡的Test類。

{
    "name": "Test",
    "class": "com.alibaba.datax.plugin.reader.txtFilereaderTest.Test",
    "description": "useScene: test. mechanism: use datax framework to transport data from txt file. warn: The more you know about the data, the less problems you encounter.",
    "developer": "alibaba"
}

plugin_job_template.json檔案:

該檔案用於開始生成模板的檔案,reader外掛與writer外掛各有不同,可參考官方不同包下的doc資料夾裡的說明。

{
    "name": "Test",
    "parameter": {
        "path": [],
        "encoding": "",
        "column": [],
        "fieldDelimiter": ""
    }
}

剩下的如何編寫邏輯就要自己實現了,可以先參考官方說明:DataX外掛開發寶典 

我這裡簡單實現一下去除文字中單引號,部分程式碼如下:

重寫startRead方法,並且在檔案輸入流中對讀入的文字進行字串替換處理,將“ ‘北京‘ ”,處理為:“北京”;

自己要實現的所有邏輯都在此編寫。

@Override
    public void startRead(RecordSender recordSender) {
        LOG.debug("start read source files...");
        for (String fileName : this.sourceFiles) {
            LOG.info(String.format("reading file : [%s]", fileName));
            InputStream inputStream;
            InputStream inputStreamEtl;
            try {
                inputStream = new FileInputStream(fileName);
                try {
                    inputStreamEtl =  string_InputStream(inputStream_String(inputStream));
                    com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil.readFromStream(inputStreamEtl,
                            fileName, this.readerSliceConfig, recordSender,
                            this.getTaskPluginCollector());
                    recordSender.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } catch (FileNotFoundException e) {
                // warn: sock 檔案無法read,能影響所有檔案的傳輸,需要使用者自己保證
                String message = String
                        .format("找不到待讀取的檔案 : [%s]", fileName);
                LOG.error(message);
                throw DataXException.asDataXException(
                        TxtFileReaderErrorCode.OPEN_FILE_ERROR, message);
            }
        }
        LOG.debug("end read source files...");
    }

}

/**
 * inputStream to String
 * @param in
 * @return "去除字串中引號"
 * @throws Exception
 */
public static String inputStream_String(InputStream in) throws Exception
{
    java.io.ByteArrayOutputStream swapStream = new java.io.ByteArrayOutputStream();
    int ch;
    while ((ch = in.read()) != -1) {
        swapStream.write(ch);
    }
    /**
     * ETL: 去除字元之間的單引號
     * '北京', '大興',
     * 北京, 大興,
     */
    return swapStream.toString().replace("'","");
}

/**
 * String to inputStream
 * @param str
 * @return
 * @throws Exception
 */
public static ByteArrayInputStream string_InputStream(String str) throws Exception{
    ByteArrayInputStream stream= new ByteArrayInputStream(str.getBytes());
    return stream;
}

3. 打包部署 

編寫好的檔案按照上圖檔案樹放置,然後新增自己外掛到Git clone下的原始碼最外層package.xml和pom.xml 檔案。

package.xml

directory:為你的外掛包名,不是類名

<fileSet>
    <directory>txtFilereaderTest/target/datax/</directory>
    <includes>
        <include>**/*.*</include>
    </includes>
    <outputDirectory>datax</outputDirectory>
</fileSet>

pom.xml

同上,新增自己包名,新增至reader樹內,我使用idea開發,所以maven工程可直接打包。

注意:該pom檔案用maven打包會將所有外掛都重新打一次包,如果只想打自己的包,將其他reader和writer包都註釋掉,不然需要好久

<module>txtFileReaderTest</module>

4. 打包成功 

應該包含以下目錄及檔案:

libs為所有依賴,另外兩個json檔案上面提過,一個載入class檔案,一個生成配置;

執行:python datax.py -r txtfilereader -w txtFileWriter;

#### 據說功能很強大,還在摸索中