1. 程式人生 > >java整合spring和hadoop HDFS

java整合spring和hadoop HDFS

首先新增 hadoop配置檔案 hbase-site.xml  ,這裡只需要配置zk的地址和埠。

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>127.0.0.1</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>


然後在spring配置檔案中配置下面資訊,讓spring幫我們注入hadoopConfiguration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
    xmlns:cache="http://www.springframework.org/schema/cache" xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd  
     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd 
     http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd 
     http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
 
    <!-- 預設的hadoopConfiguration,預設ID為hadoopConfiguration,且對於hadoopFile等不需指定ref,自動注入hadoopConfiguration -->
    <hdp:configuration resources="classpath:hadoop/hbase-site.xml" />
    <hdp:hbase-configuration configuration-ref="hadoopConfiguration" />
 
    <!-- hadoop hdfs 操作類FileSystem,用來讀寫HDFS檔案 -->
    <hdp:file-system id="hadoopFile" configuration-ref="hadoopConfiguration" />
 
    <!-- 配置HbaseTemplate -->
    <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
        <property name="configuration" ref="hbaseConfiguration" />
    </bean>
 
 
</beans>


配置比較簡單,下面是關鍵的HDFS操作類

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
 
/**
 * HDFS操作類
 * 
 * @author kokJuis
 * @version 1.0
 * @date 2016-12-12
 * @email [email protected]
 */
public class HDFSUtil {
 
    private HDFSUtil() {
    }
 
    // hadoop fs的配置檔案
    static Configuration conf = new Configuration(true);
    static {
        // 指定hadoop fs的地址
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
    }
 
    /**
     * 判斷路徑是否存在
     * 
     * @param conf
     * @param path
     * @return
     * @throws IOException
     */
    public static boolean exits(String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }
 
    /**
     * 建立檔案
     * 
     * @param conf
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public static void createFile(String filePath, byte[] contents)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents);
        outputStream.close();
        fs.close();
    }
 
    /**
     * 建立檔案
     * 
     * @param conf
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(String filePath, String fileContent)
            throws IOException {
        createFile(filePath, fileContent.getBytes());
    }
 
    /**
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(String localFilePath,
            String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(false, true, localPath, remotePath);
        fs.close();
    }
 
    /**
     * 刪除目錄或檔案
     * 
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(String remoteFilePath, boolean recursive)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }
 
    /**
     * 刪除目錄或檔案(如果有子目錄,則級聯刪除)
     * 
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(String remoteFilePath) throws IOException {
        return deleteFile(remoteFilePath, true);
    }
 
    /**
     * 檔案重新命名
     * 
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(String oldFileName, String newFileName)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }
 
    /**
     * 建立目錄
     * 
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(String dirName) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path dir = new Path(dirName);
        boolean result = false;
        if (!fs.exists(dir)) {
            result = fs.mkdirs(dir);
        }
        fs.close();
        return result;
    }
 
    /**
     * 列出指定路徑下的所有檔案(不包含目錄)
     * 
     * @param conf
     * @param basePath
     * @param recursive
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(String basePath,
            boolean recursive) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs
                .listFiles(new Path(basePath), recursive);
 
        return fileStatusRemoteIterator;
    }
 
    /**
     * 列出指定路徑下的檔案(非遞迴)
     * 
     * @param conf
     * @param basePath
     * @return
     * @throws IOException
     */
    public static RemoteIterator<LocatedFileStatus> listFiles(String basePath)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(
                new Path(basePath), false);
        fs.close();
        return remoteIterator;
    }
 
    /**
     * 列出指定目錄下的檔案\子目錄資訊(非遞迴)
     * 
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(String dirPath) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath));
        fs.close();
        return fileStatuses;
    }
 
    /**
     * 讀取檔案內容
     * 
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     */
    public static byte[] readFile(String filePath) throws IOException {
        byte[] fileContent = null;
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(filePath);
        if (fs.exists(path)) {
            InputStream inputStream = null;
            ByteArrayOutputStream outputStream = null;
            try {
                inputStream = fs.open(path);
                outputStream = new ByteArrayOutputStream(
                        inputStream.available());
                IOUtils.copyBytes(inputStream, outputStream, conf);
                fileContent = outputStream.toByteArray();
            } finally {
                IOUtils.closeStream(inputStream);
                IOUtils.closeStream(outputStream);
                fs.close();
            }
        }
        return fileContent;
    }
 
    /**
     * 下載 hdfs上的檔案
     * 
     * @param conf
     * @param uri
     * @param remote
     * @param local
     * @throws IOException
     */
    public static void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from" + remote + " to " + local);
        fs.close();
    }

這裡有一點需要注意的,就是在你使用HDFS的時候,要顯示指定hadoop fs的地址,如果不指定fs的地址,雖然也能上傳,但實際並沒有使用到HDFS。

下面是一個下載檔案的示例


--------------------- 
作者:西門吹水_ 
來源:CSDN 
原文:https://blog.csdn.net/KokJuis/article/details/53586406 
版權宣告:本文為博主原創文章,轉載請附上博文連結!