How to upload and download files to Hadoop HDFS through a REST interface in Spring Boot
This article demonstrates, with working code, how to upload files to and download files from Hadoop HDFS through a RESTful API, using the Hadoop client inside a Spring Boot application.
There are a few dependency pitfalls along the way; watch out for them.
Prerequisites:
If your code runs on Windows and connects to a Hadoop cluster (2.7.x or later) on Linux, the following setup is needed first.
1: Download the Windows Hadoop binaries (winutils) below.
Download the whole directory and put it somewhere, for example D:\thb\tools\hadoop-2.8.1-bin-master, then set the environment variables:
HADOOP_HOME=D:\thb\tools\hadoop-2.8.1-bin-master
PATH=%PATH%;%HADOOP_HOME%\bin
If you are not sure how to set system environment variables on Windows, look it up online.
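If you would rather not change system settings, a common alternative is to set hadoop.home.dir from code before any Hadoop class is loaded. A minimal sketch (the path is the example directory above; the class is hypothetical):

public class Application {
    public static void main(String[] args) {
        // Must run before the first Hadoop class is touched, otherwise the
        // winutils.exe lookup has already happened and failed.
        System.setProperty("hadoop.home.dir", "D:\\thb\\tools\\hadoop-2.8.1-bin-master");
        // ... then start Spring Boot as usual, e.g. SpringApplication.run(Application.class, args);
    }
}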
2: pom.xml (mind the hadoop-client, commons-io and commons-fileupload versions, otherwise compilation or execution will fail)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.restdocs</groupId>
        <artifactId>spring-restdocs-mockmvc</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
    <dependency>
        <groupId>commons-fileupload</groupId>
        <artifactId>commons-fileupload</artifactId>
        <version>1.3.3</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
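The controller below reads three custom properties via @Value, and file uploads usually need the multipart size limits raised. A minimal application.properties sketch (the keys match the code; the values are placeholders you must adapt):

# custom properties read by FileController (values are examples only)
fileUpload.path=D:/thb/upload/
fileUpload.tmpDir=D:/thb/tmp/
hdfs.URILocation=hdfs://192.168.1.100:9000/upload
# raise multipart limits (Spring Boot 1.x keys; on 2.x use spring.servlet.multipart.* instead)
spring.http.multipart.max-file-size=100MB
spring.http.multipart.max-request-size=100MB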
The code is as follows.
FileController.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.util.List;

@RestController
@RequestMapping("/FileMgn")
public class FileController {

    private static final Logger log = LoggerFactory.getLogger(FileController.class);

    @Value("${fileUpload.path}")
    private String FILE_PATH;

    @Value("${hdfs.URILocation}")
    private String HDFS_URI;

    @Value("${fileUpload.tmpDir}")
    private String TMPDIR;

    @RequestMapping(value = "/HDFSFileUpload")
    public String HDFSupload(@RequestParam("file") MultipartFile file) {
        String fileName = null;
        try {
            if (file.isEmpty()) {
                log.error("file is empty!");
                return "{\"ret\":\"200\",\"msg\":\"file is required\"}";
            }
            fileName = file.getOriginalFilename();
            File dest = new File(TMPDIR + fileName);
            if (!dest.getParentFile().exists()) {
                dest.getParentFile().mkdirs(); // create parent directories recursively if missing
            }
            // save to the tmp directory first, then upload to HDFS
            file.transferTo(dest);
            HdfsFileSystem.copyFileToHDFSByFileObj(dest, HDFS_URI + "/" + fileName);
            dest.delete();
            log.info(fileName + " uploaded to HDFS successfully");
            return "{\"ret\":\"200\",\"msg\":\"" + fileName + " uploaded successfully\"}";
        } catch (IllegalStateException e) {
            log.error("exception happens: " + e.getMessage());
        } catch (IOException e) {
            log.error("exception happens: " + e.getMessage());
        }
        log.error("uploading " + fileName + " to HDFS failed");
        return "{\"ret\":\"200\",\"msg\":\"" + fileName + " upload failed\"}";
    }

    @PostMapping("/HDFSBatchUpload")
    public String handleHDFSFileUpload(HttpServletRequest request) {
        List<MultipartFile> files = ((MultipartHttpServletRequest) request).getFiles("file");
        for (MultipartFile file : files) {
            if (file.isEmpty()) {
                log.error(file.getOriginalFilename() + " content is empty. Skipping it");
                continue;
            }
            try {
                String fileName = file.getOriginalFilename();
                File dest = new File(TMPDIR + fileName);
                if (!dest.getParentFile().exists()) {
                    dest.getParentFile().mkdirs();
                }
                file.transferTo(dest);
                HdfsFileSystem.copyFileToHDFSByFileObj(dest, HDFS_URI + "/" + fileName);
                dest.delete();
                log.info("file " + fileName + " uploaded to HDFS successfully.");
            } catch (Exception e) {
                log.error(file.getOriginalFilename() + " upload failed. exception: " + e.getMessage());
                return "{\"ret\":\"200\",\"msg\":\"" + file.getOriginalFilename() + " uploaded to HDFS failed.\"}";
            }
        }
        return "{\"ret\":\"200\",\"msg\":\"files batch-uploading succeeded\"}";
    }

    @RequestMapping(value = "/HDFSFileDownload")
    public String downloadHDFSFile(HttpServletRequest request, HttpServletResponse response) {
        String fileName;
        if ("GET".equals(request.getMethod())) {
            fileName = request.getParameter("file");
        } else {
            // POST is not supported yet
            log.error("POST method not supported");
            return "{\"ret\":\"200\",\"msg\":\"POST not supported yet\"}";
        }
        if (null == fileName) {
            log.error("file name is null!");
            return "{\"ret\":\"200\",\"msg\":\"file name is required!\"}";
        }
        if (!HdfsFileSystem.exists(HDFS_URI, fileName)) {
            log.error(HDFS_URI + "/" + fileName + " does not exist in HDFS");
            return "{\"ret\":\"200\",\"msg\":\"" + fileName + " does not exist\"}";
        }
        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
        response.setHeader("Pragma", "no-cache");
        response.setHeader("Expires", "0");
        response.setHeader("charset", "utf-8");
        response.setContentType("application/force-download"); // force the browser to download rather than open the file
        response.setHeader("Content-Transfer-Encoding", "binary");
        response.setHeader("Content-Disposition", "attachment;filename=\"" + fileName + "\"");
        try {
            OutputStream os = response.getOutputStream();
            HdfsFileSystem.downloadFromHDFS(HDFS_URI, fileName, os);
        } catch (IOException e) {
            log.error(fileName + " downloading failed. exception:" + e.getMessage());
            return "{\"ret\":\"200\",\"msg\":\"" + fileName + " download failed\"}";
        }
        log.info(fileName + " downloaded successfully");
        return "{\"ret\":\"200\",\"msg\":\"" + fileName + " download succeeded\"}";
    }
}
HdfsFileSystem.java
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HdfsFileSystem {

    private static final Logger log = LoggerFactory.getLogger(HdfsFileSystem.class);

    public static void copyFileToHDFSByName(Configuration conf, String hdfsUri, String localFileName, String remoteFileName) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsUri), conf);
        fs.copyFromLocalFile(new Path(localFileName), new Path(remoteFileName));
        log.info("copy from local file:" + localFileName + " to HDFS file:" + remoteFileName + " done.");
        fs.close();
    }

    public static void copyFileToHDFSByFileObj(File localPath, String hdfsPath) throws IOException {
        if (null == localPath || null == hdfsPath || hdfsPath.isEmpty()) {
            log.warn("copyFileToHDFSByFileObj: localPath and hdfsPath are required");
            return;
        }
        InputStream in = null;
        FSDataOutputStream out = null;
        try {
            Configuration conf = new Configuration();
            FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
            out = fileSystem.create(new Path(hdfsPath));
            in = new BufferedInputStream(new FileInputStream(localPath));
            IOUtils.copyBytes(in, out, 4096, false);
            out.hsync(); // flush to the datanodes before closing
        } finally {
            IOUtils.closeStream(out);
            IOUtils.closeStream(in);
        }
    }

    /*
     * Download the HDFS file at the given URI to a local file.
     */
    public static void downloadFromHDFS(Configuration conf, String uri, String remoteFileName, String localFileName) throws IOException {
        Path path = new Path(remoteFileName);
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        fs.copyToLocalFile(path, new Path(localFileName));
        fs.close();
        log.info("downloading file from " + remoteFileName + " to " + localFileName + " succeeded");
    }

    /*
     * Download the HDFS file at the given URI to an output stream, e.g. an HTTP response.
     */
    public static void downloadFromHDFS(String uri, String HDFSFileName, OutputStream localFileOutput) throws IOException {
        Configuration config = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), config);
        InputStream is = fs.open(new Path(uri + "/" + HDFSFileName));
        IOUtils.copyBytes(is, localFileOutput, 4096, true); // this call closes both streams itself
        log.info("downloading HDFS file " + HDFSFileName + " succeeded");
        fs.close();
    }

    /*
     * Check whether the HDFS file exists under the given URI.
     */
    public static boolean exists(String HDFSUri, String HDFSFileName) {
        Configuration conf = new Configuration();
        boolean fileExists = false;
        try {
            FileSystem fileSystem = FileSystem.get(URI.create(HDFSUri), conf);
            fileExists = fileSystem.exists(new Path(HDFSUri + "/" + HDFSFileName));
        } catch (IOException e) {
            log.error("hdfs:exists() exception occurs. exception:" + e.getMessage());
            return fileExists;
        }
        log.info("HDFS URI:" + HDFSUri + ", fileName:" + HDFSFileName + " exists? " + fileExists);
        return fileExists;
    }
}
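As a quick standalone sanity check of the helper class, a minimal sketch (the NameNode URI and file names below are placeholders, not values from this article):

public class HdfsFileSystemDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical HDFS URI and local file; replace with your own.
        String hdfsUri = "hdfs://192.168.1.100:9000/upload";
        java.io.File local = new java.io.File("D:/thb/tmp/a.txt");
        // upload the local file, then verify it landed in HDFS
        HdfsFileSystem.copyFileToHDFSByFileObj(local, hdfsUri + "/a.txt");
        System.out.println("a.txt exists in HDFS? " + HdfsFileSystem.exists(hdfsUri, "a.txt"));
    }
}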