
How to upload and download files to Hadoop HDFS via REST APIs in Spring Boot

This article demonstrates, with working code, how to use the Hadoop client in a Spring Boot application to upload files to and download files from Hadoop HDFS through a RESTful API.

There are a few dependency pitfalls along the way; watch out for them.

Prerequisites:

If your code runs on Windows and connects to a Hadoop cluster (2.7.x or later) on Linux, the following setup is required.

1: Download the Windows Hadoop binaries (the winutils build matching your Hadoop version).

Download the whole directory and put it somewhere local, for example D:\thb\tools\hadoop-2.8.1-bin-master, then set the environment variables:

HADOOP_HOME=D:\thb\tools\hadoop-2.8.1-bin-master

PATH=%PATH%;%HADOOP_HOME%\bin

(winutils.exe and hadoop.dll live under bin, so bin must be on PATH.) Search online for how to set system environment variables on Windows if you are unsure.
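
If you would rather not touch system settings, the hadoop.home.dir system property can be set in code instead; the Hadoop client falls back to it when HADOOP_HOME is unset. A minimal sketch, assuming the binaries live at the path above (the Application class name is illustrative):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        // Must be set before the first Hadoop class loads; the Hadoop client
        // reads this property when the HADOOP_HOME environment variable is unset.
        System.setProperty("hadoop.home.dir", "D:\\thb\\tools\\hadoop-2.8.1-bin-master");
        SpringApplication.run(Application.class, args);
    }
}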

2: pom.xml (mind the versions of hadoop-client, commons-io and commons-fileupload, otherwise you will hit compile-time or runtime errors)

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.restdocs</groupId>
            <artifactId>spring-restdocs-mockmvc</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
        <dependency>
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
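
The controller below reads three keys from application.properties via @Value; all three must be present or the application context will fail to start. A minimal example configuration, with placeholder values (adjust the directories and the NameNode address to your environment):

# local directory where uploads are staged before being copied to HDFS
fileUpload.tmpDir=D:/tmp/upload/
# additional local file directory, read into the controller's FILE_PATH field
fileUpload.path=D:/tmp/files/
# base HDFS URI (example NameNode address)
hdfs.URILocation=hdfs://192.168.1.100:9000/upload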

The code is as follows.

FileController.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.List;
@RestController
@RequestMapping("/FileMgn")
public class FileController {
    private static final Logger log = LoggerFactory.getLogger(FileController.class);

    @Value("${fileUpload.path}")
    private String FILE_PATH;

    @Value("${hdfs.URILocation}")
    private String HDFS_URI;

    @Value("${fileUpload.tmpDir}")
    private String TMPDIR;

    @RequestMapping(value = "/HDFSFileUpload")
    public String HDFSupload(@RequestParam("file") MultipartFile file) {
        String fileName = null;
        try {
            if (file.isEmpty()) {
                log.error("uploaded file is empty!");
                String retmsg = "{\"ret\":\"200\",\"msg\":\"a non-empty file is required\"}";
                return retmsg;
            }
            fileName = file.getOriginalFilename();
            String path = TMPDIR + fileName;
            File dest = new File(path);
            if (!dest.getParentFile().exists()) {
                dest.getParentFile().mkdirs(); // create parent directories recursively if absent
            }
            // Save to the tmp directory first, then upload to HDFS.
            file.transferTo(dest);
            HdfsFileSystem.copyFileToHDFSByFileObj(dest, HDFS_URI + "/" + fileName);
            dest.delete();
            log.info(fileName + " uploaded to HDFS successfully");
            String retmsg = "{\"ret\":\"200\",\"msg\":\"" + fileName + " uploaded successfully" + "\"}";
            return retmsg;
        } catch (IllegalStateException e) {
            log.error(" exception happens: " + e.getMessage());
        } catch (IOException e) {
            log.error(" exception happens: " + e.getMessage());
        }
        log.error("uploading " + fileName + " to HDFS fails");
        String retmsg = "{\"ret\":\"200\",\"msg\":\"" + fileName + " upload fails" + "\"}";
        return retmsg;
    }
    @PostMapping("/HDFSBatchUpload")
    public String handleHDFSFileUpload(HttpServletRequest request) {
        List<MultipartFile> files = ((MultipartHttpServletRequest) request).getFiles("file");
        MultipartFile file = null;
        for (int i = 0; i < files.size(); ++i) {
            file = files.get(i);
            if (!file.isEmpty()) {
                try {
                    String fileName = file.getOriginalFilename();
                    String path = TMPDIR + fileName;
                    File dest = new File(path);
                    if (!dest.getParentFile().exists()) {
                        dest.getParentFile().mkdirs();
                    }
                    file.transferTo(dest);
                    HdfsFileSystem.copyFileToHDFSByFileObj(dest, HDFS_URI + "/" + fileName);
                    dest.delete();
                    log.info("file " + fileName + " uploaded to HDFS successfully.");
                } catch (Exception e) {
                    log.error(file.getOriginalFilename() + " upload fails. exception: " + e.getMessage());
                    String retmsg = "{\"ret\":\"200\",\"msg\":\"" + file.getOriginalFilename() + " upload to HDFS failed.\"}";
                    return retmsg;
                }
            } else {
                log.error(file.getOriginalFilename() + " content is empty. Skipping it");
            }
        }
        String retmsg = "{\"ret\":\"200\",\"msg\":\"" + "files batch-uploading succeeds\"}";
        return retmsg;
    }

    @RequestMapping(value = "/HDFSFileDownload")
    public String downloadHDFSFile(HttpServletRequest request, HttpServletResponse response) {
        String fileName = null;
        String submitMethod = request.getMethod();
        if (submitMethod.equals("GET")) {
            fileName = request.getParameter("file");
        } else {  // POST method not supported yet
            log.error("POST method not supported");
            String retmsg = "{\"ret\":\"200\",\"msg\":\"" + "POST not supported yet\"}";
            return retmsg;
        }
        if (null == fileName) {
            log.error("file name is null!");
            String retmsg = "{\"ret\":\"200\",\"msg\":\"" + "file name is required!\"}";
            return retmsg;
        }
        if (!HdfsFileSystem.exists(HDFS_URI, fileName)) {
            log.error(HDFS_URI + "/" + fileName + " not exist in HDFS ");
            String retmsg = "{\"ret\":\"200\",\"msg\":\"" + fileName + " not exist\"}";
            return retmsg;
        }
        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
        response.setHeader("Pragma", "no-cache");
        response.setHeader("Expires", "0");
        response.setHeader("charset", "utf-8");
        response.setContentType("application/force-download");//force not to open downloaded file
        response.setHeader("Content-Transfer-Encoding", "binary");
        response.setHeader("Content-Disposition", "attachment;filename=\"" + fileName + "\"");
        OutputStream os = null;
        try {
            os = response.getOutputStream();
            HdfsFileSystem.downloadFromHDFS(HDFS_URI, fileName, os);
        } catch (IOException e) {
            String retmsg = "{\"ret\":\"200\",\"msg\":\"" + fileName + " download failed \"}";
            log.error(fileName + " downloading failed. exception:" + e.getMessage());
            return retmsg;
        }
        log.info(fileName + " downloaded successfully");
        String retmsg = "{\"ret\":\"200\",\"msg\":\"" + fileName + " downloaded successfully\"}";
        return retmsg;
    }
}
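
To exercise the endpoints, here is a client-side sketch using Spring's RestTemplate (the localhost URL, port and file path are assumptions; adjust them to your deployment):

import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class UploadClient {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        // The part name must match @RequestParam("file") in FileController.
        MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
        body.add("file", new FileSystemResource("D:/tmp/test.txt")); // example file
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.MULTIPART_FORM_DATA);
        String resp = rest.postForObject("http://localhost:8080/FileMgn/HDFSFileUpload",
                new HttpEntity<>(body, headers), String.class);
        System.out.println(resp);
        // Download: GET http://localhost:8080/FileMgn/HDFSFileDownload?file=test.txt
    }
}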

HdfsFileSystem.java

import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsFileSystem {
    private static final Logger log = LoggerFactory.getLogger(HdfsFileSystem.class);
    
    public static void copyFileToHDFSByName(Configuration conf , String hdfsUri, String localFileName, String remoteFileName) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsUri), conf);
        fs.copyFromLocalFile(new Path(localFileName), new Path(remoteFileName));
        log.info("copy from local file:" + localFileName + " to HDFS file:" + remoteFileName + " done.");
        fs.close();
    }
    public static void copyFileToHDFSByFileObj(File localPath, String hdfsPath) throws IOException {
        if (null == localPath || null == hdfsPath || hdfsPath.isEmpty()) {
            log.warn("copyFileToHDFSByFileObj: localPath and hdfsPath are required");
            return;
        }
        InputStream in = null;
        try {
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
            FSDataOutputStream out = fileSystem.create(new Path(hdfsPath));
            in = new BufferedInputStream(new FileInputStream(localPath));
            // false: keep the input stream open here; it is closed in the finally block below
            IOUtils.copyBytes(in, out, 4096, false);
            out.hsync();
            out.close();
        } finally {
            IOUtils.closeStream(in);
        }
    }
    /*
     * Download the HDFS file at the given URI to a local file.
     */
    public static void downloadFromHDFS(Configuration conf, String uri, String remoteFileName, String localFileName) throws IOException {
        Path path = new Path(remoteFileName);
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        fs.copyToLocalFile(path, new Path(localFileName));
        fs.close();
        log.info("downloading file from " + remoteFileName + " to " + localFileName + " succeeded");
    }
    /*
     * Stream the HDFS file under the given URI to the provided output stream.
     */
    public static void downloadFromHDFS(String uri, String HDFSFileName, OutputStream localFileOutPut) throws IOException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);
        InputStream is = fs.open(new Path(uri + "/" + HDFSFileName));
        IOUtils.copyBytes(is, localFileOutPut, 4096, true); // true: copyBytes closes both streams itself
        log.info("downloading HDFS file " + HDFSFileName + " succeeded");
        fs.close();
    }
    /*
     * Check whether the HDFS file exists under the given URI.
     */
    public static boolean exists(String HDFSUri, String HDFSFileName) {
        Configuration conf = new Configuration();
        boolean fileExists = false;
        try {
            FileSystem fileSystem = FileSystem.get(URI.create(HDFSUri), conf);
            fileExists = fileSystem.exists(new Path(HDFSUri + "/" + HDFSFileName));
        } catch (IOException e) {
            log.error("HdfsFileSystem.exists() exception occurs. exception:" + e.getMessage());
            return fileExists;
        }

        log.info("HDFS URI:" + HDFSUri + ", fileName:" + HDFSFileName + " exists ? " + fileExists);
        return fileExists;
    }
}
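
To sanity-check the helper class outside the web layer, a minimal sketch (the local path and HDFS URI are placeholders for your environment):

import java.io.File;
import java.io.FileOutputStream;

public class HdfsFileSystemDemo {
    public static void main(String[] args) throws Exception {
        String hdfsUri = "hdfs://192.168.1.100:9000/upload"; // example NameNode URI
        File local = new File("D:/tmp/test.txt");            // example local file

        // Upload, verify, then stream the file back into a local copy.
        HdfsFileSystem.copyFileToHDFSByFileObj(local, hdfsUri + "/" + local.getName());
        System.out.println("exists: " + HdfsFileSystem.exists(hdfsUri, local.getName()));
        try (FileOutputStream out = new FileOutputStream("D:/tmp/test-copy.txt")) {
            HdfsFileSystem.downloadFromHDFS(hdfsUri, local.getName(), out);
        }
    }
}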

Some useful references