
Importing and Exporting Data with the Sqoop Java API

After an afternoon of fiddling I finally got this working, so here is a summary.

Project dependency:

<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.7</version>
</dependency>
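
Note that sqoop-client is only a thin REST client: it needs a running Sqoop 2 server to talk to. Before creating any links it is worth confirming that the server is reachable. Here is a minimal sketch, assuming the client.getConnectors() call from the sqoop-client API (the exact MConnector getters may differ across 1.99.x versions):

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnector;

public class SqoopPing {
    public static void main(String[] args) {
        // Point the client at the Sqoop 2 server's REST endpoint.
        SqoopClient client = new SqoopClient("http://localhost:12000/sqoop/");
        // Listing the registered connectors confirms connectivity and shows the
        // connector names used below ("generic-jdbc-connector", "hdfs-connector").
        for (MConnector connector : client.getConnectors()) {
            System.out.println(connector.getUniqueName());
        }
    }
}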

The pitfalls I ran into are all noted as comments in the code. Below is the Java code that drives Sqoop:

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Arrays;
import java.util.UUID;

public class SqoopDataModel {

    // Static client object, initialized in main().
    static SqoopClient client;

    // Create the JDBC (MySQL) link.
    public static MLink createMysqlLink() {
        // Use the built-in generic JDBC connector.
        MLink link = client.createLink("generic-jdbc-connector");
        // Generate a random name (a fixed name works too); it is needed later when creating the job.
        link.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 4));
        link.setCreationUser("wangwang");

        // Get the link config object and set the JDBC connection URI, driver, username and password.
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost:3306/db1");
        linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        linkConfig.getStringInput("linkConfig.username").setValue("root");
        linkConfig.getStringInput("linkConfig.password").setValue("wuwenwang00oo");
        // identifierEnclose must be set explicitly: it is the delimiter the connector
        // wraps SQL identifiers in, and the default double quote causes a MySQL error.
        // Here I set it to a space.
        linkConfig.getStringInput("dialect.identifierEnclose").setValue(" ");

        // Save the link.
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }


    /**
     * Create the HDFS link.
     */
    public static MLink createHdfsLink() {
        // Use the built-in HDFS connector.
        MLink link = client.createLink("hdfs-connector");
        link.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 4));
        link.setCreationUser("wangwang");

        // Get the link config object and set the HDFS URI and the Hadoop configuration directory.
        MLinkConfig linkConfig = link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://localhost:9000/");
        linkConfig.getStringInput("linkConfig.confDir").setValue("/Users/wangwang/softdir/hadoop-2.8.5/etc/hadoop");

        // Save the link.
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }
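
    // The string keys used above ("linkConfig.uri", "linkConfig.confDir", ...) are
    // easy to get wrong. They can be discovered at runtime by walking the link's
    // config model. A sketch; it assumes the MConfig/MInput getters of the
    // org.apache.sqoop.model package, so treat the exact method names as unverified.
    static void printLinkInputs(MLink link) {
        for (MConfig config : link.getConnectorLinkConfig().getConfigs()) {
            for (MInput<?> input : config.getInputs()) {
                System.out.println(input.getName());
            }
        }
    }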

    /**
     * Job: MySQL to HDFS.
     * @param fromLink source link
     * @param toLink   destination link
     * @return the job name, or null on failure
     */
    public static String createMysqlToHdfsJob(MLink fromLink, MLink toLink) {
        // Create the job; the first argument is the source link name, the second the destination link name.
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("wangwang-job" + UUID.randomUUID());
        job.setCreationUser("wangwang");

        // Get the source config fromJobConfig and set the database name, table name and column list.
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("db1");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("t_user");
        fromJobConfig.getListInput("fromJobConfig.columnList").setValue(Arrays.asList("id", "user_name", "passwd"));

        // Get the destination config and set the output directory, output format,
        // compression, and whether to override null values.
        MToConfig toJobConfig = job.getToJobConfig();
        // To avoid deleting the output directory before each run, I append a random
        // suffix so every run gets a distinct path: Sqoop requires that the HDFS
        // output directory does not already exist.
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqooptest" + UUID.randomUUID());
        // If no output format is specified, the job fails with:
        // Caused by: org.apache.sqoop.common.SqoopException: MAPRED_EXEC_0013:Cannot write to the data writer
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
        toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);

        // Get the driver config and set the number of map tasks (extractors).
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        // Save the job.
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }

    /**
     * Job: HDFS to MySQL.
     * @param fromLink source link
     * @param toLink   destination link
     * @return the job name, or null on failure
     */
    public static String createHdfsToMysqlJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("wangwang" + UUID.randomUUID());
        job.setCreationUser("wangwang");

        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/sqoopDir");

        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("t_user");
        // Do not specify the table columns here, or the generated SQL is invalid and fails with:
        // GENERIC_JDBC_CONNECTOR_0002:Unable to execute the SQL
        // toJobConfig.getListInput("toJobConfig.columnList")
        //         .setValue(Arrays.asList("id", "user_name", "passwd"));

        MDriverConfig driverConfig = job.getDriverConfig();
        // Set the number of map tasks; watching the MapReduce job shows there is no reduce phase at all.
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);
        // Do not set the number of loaders (reducers), or the job fails with:
        // No data available in table
        // driverConfig.getIntegerInput("throttlingConfig.numLoaders").setValue(10);

        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }
    }

    // Start the job and print the submission details.
    static void startJob(String jobName) {
        // Job start
        MSubmission submission = client.startJob(jobName);
        System.out.println("Job Submission Status : " + submission.getStatus());
        if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }

        System.out.println("Hadoop job id :" + submission.getExternalJobId());
        System.out.println("Job link : " + submission.getExternalLink());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
    }
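
    // startJob() only submits the job; the transfer itself runs asynchronously on
    // the cluster, so the counters above may still be empty when it returns. A
    // minimal polling sketch using client.getJobStatus() from the Sqoop 2 client
    // API (the 2-second interval is an arbitrary choice):
    static void waitForJob(String jobName) throws InterruptedException {
        MSubmission submission = client.getJobStatus(jobName);
        while (submission.getStatus().isRunning()) {
            System.out.println(String.format("Progress : %.2f %%", submission.getProgress() * 100));
            Thread.sleep(2000);
            submission = client.getJobStatus(jobName);
        }
        System.out.println("Final status : " + submission.getStatus());
        // A stuck job can be cancelled with client.stopJob(jobName).
    }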



    public static void main(String[] args) {
        String url = "http://localhost:12000/sqoop/";
        client = new SqoopClient(url);
        System.out.println(client);
        MLink mysqlLink = createMysqlLink();
        MLink hdfsLink = createHdfsLink();
        // Import the data into HDFS:
        // startJob(createMysqlToHdfsJob(mysqlLink, hdfsLink));
        // Export the data back to MySQL:
        startJob(createHdfsToMysqlJob(hdfsLink, mysqlLink));
    }
}
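
One last note: because every run generates randomly named links and jobs, the Sqoop server accumulates objects over time. They can be removed through the same client. A minimal cleanup sketch; deleteJob()/deleteLink() are SqoopClient methods, while the helper itself is hypothetical and would be added to SqoopDataModel:

    static void cleanup(String jobName, MLink... links) {
        // Delete the job first; a link that is still referenced by a job
        // typically cannot be removed.
        client.deleteJob(jobName);
        for (MLink link : links) {
            client.deleteLink(link.getName());
        }
    }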