Java 實現HDFS檔案上傳

Java 實現HDFS檔案上傳

HDFS上傳檔案大致分為三種:

1)直接呼叫API

2)使用webhdfs

3)httpfs

以下對前兩種進行簡要說明

1)直接呼叫API(直接上程式碼)

public void hdfsUpload(String srcPath) throws IOException,URISyntaxException {
    Configuration conf = new Configuration();
FileSystem fs = null;
conf.set("hadoop.security.authentication", "kerberos");
String userName = "hochoy"
;// kerberos 認證的username,可配置在配置檔案中 Connection2hbase.getHbaseConfig().get("kerberos.username"); String keytab = "/usr/lib/hochoy.keytab";// kerberos 認證的keytab,配置在配置檔案中,存放於具體目錄 Connection2hbase.getHbaseConfig().get("kerberos.keytab"); URI urlHdfs = new URI("hdfs://nameservice1:8020"); String url17monipdb = "/user/hochoy"
; UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(userName,keytab); //kerberos 認證 fs = FileSystem.get(urlHdfs,conf); if (fs.exists(new Path(url17monipdb + "/17monipdb.dat"))){ //rename 及linux中的cp ,檔案拷貝 fs.rename(new Path(url17monipdb + "/17monipdb.dat"),new
Path(url17monipdb + "/17monipdb.dat"+".bak"+new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()))); } //呼叫API上傳檔案 fs.copyFromLocalFile(false,true,new Path(srcPath),new Path(url17monipdb+"/17monipdb.dat")); fs.close(); }

2)使用webhdfs

linux語句

http://ip:14000/webhdfs/v1/user/hochoy/17monipdb.dat?user.name=hochoy&op=GETFILESTATUS



獲取檔案狀態:
curl -i "http://ip:14000/webhdfs/v1/user/hochoy/readme.txt?user.name=hochoy&op=GETFILESTATUS"

curl -i "http://192.168.1.32:50070/webhdfs/v1/user/hochoy/readme.txt?user.name=hochoy&op=GETFILESTATUS"


===========================


獲取檔案資訊: curl -i -L "http://ip:14000/webhdfs/v1/user/hochoy/readme.txt?user.name=hochoy&op=OPEN"

建立目錄: curl -i -X PUT  "http://ip:14000/webhdfs/v1/user/hochoy/hochoy?user.name=hochoy&op=MKDIRS"
====================================

建立檔案:
curl -i -X PUT "http://ip:50070/webhdfs/v1/user/hochoy/hochoy/work.sh?user.name=hochoy&op=CREATE"
curl -i -X PUT "http://192.168.1.32:50070/webhdfs/v1/user/hochoy/hochoy/work.sh?user.name=hochoy&op=CREATE"


傳送資料到指定檔案
curl -i -X PUT -T work.sh  "http://tw-master:50075/webhdfs/v1/user/hochoy/hochoy/work.sh?user.name=hochoy&op=CREATE&namenoderpcaddress=nameservice1&overwrite=true"
curl -i -X PUT -T work.sh  "http://tw-slave01:50075/webhdfs/v1/user/hochoy/hochoy/work.sh?user.name=hochoy&op=CREATE&namenoderpcaddress=nameservice1&overwrite=true"
 
 curl -i -X PUT "http://ip:50070/webhdfs/v1/user/hochoy/readme.txt?user.name=hochoy&op=CREATE"
curl -i -X PUT -T work.sh  "http://tw-master:50075/webhdfs/v1/user/hochoy/readme.txt?op=CREATE&user.name=hochoy&namenoderpcaddress=nameservice1&overwrite=true"
 
檢視檔案內容
curl -i -L "http://ip:14000/webhdfs/v1/user/hochoy/readme.txt?user.name=hochoy&op=OPEN"

程式碼

/**
 * Created by hochoy on 2018/5/7.
 */
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.*;
/**
 * Uploads files to HDFS and queries file status through the WebHDFS REST API
 * ({@code http://host:port/webhdfs/v1/...}), without requiring Hadoop client jars.
 */
public class HDFSOperating {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Streams {@code stream} to an HDFS file via a WebHDFS request.
     *
     * @param webhdfs      WebHDFS base URL, e.g. {@code http://ip:50075/webhdfs/v1}
     * @param stream       the InputStream of the file to upload; always closed on return
     * @param hdfsFilePath target HDFS path + filename, e.g. {@code /user/hochoy/readme.txt}
     * @param op           WebHDFS operation, e.g. {@code CREATE}
     * @param parameters   extra query parameters (user.name, overwrite, ...), may be null
     * @param method       HTTP method, e.g. {@code PUT}
     * @throws IOException if closing {@code stream} fails
     */
    public void uploadFile(String webhdfs, InputStream stream, String hdfsFilePath, String op, Map<String, String> parameters, String method) throws IOException {
        try {
            HttpURLConnection con = getConnection(webhdfs, hdfsFilePath, op, parameters, method);
            byte[] buffer = new byte[1024];
            int read;
            while ((read = stream.read(buffer, 0, buffer.length)) > 0) {
                con.getOutputStream().write(buffer, 0, read);
            }
            // Reading the response forces the request to complete server-side.
            con.getInputStream();
            con.disconnect();
        } catch (IOException e) {
            // Original logged only e.getMessage() at INFO and printStackTrace()'d;
            // log the full stack trace at ERROR with request context instead.
            logger.error("Failed to upload to {}{}", webhdfs, hdfsFilePath, e);
        } finally {
            stream.close(); // guaranteed even if a non-IOException escapes above
        }
    }

    /**
     * Fetches the FileStatus of an HDFS file via WebHDFS, trying each endpoint in
     * {@code webhdfs} until one responds (e.g. active vs. standby NameNode).
     *
     * @param webhdfs      candidate WebHDFS base URLs
     * @param hdfsFilePath HDFS path + filename
     * @param op           WebHDFS operation, normally {@code GETFILESTATUS}
     * @param parameters   extra query parameters, may be null
     * @param method       HTTP method, normally {@code GET}
     * @return the flattened "FileStatus" JSON object; empty if no endpoint responded
     */
    public Map<String, Object> getFileStatus(String[] webhdfs, String hdfsFilePath, String op, Map<String, String> parameters, String method) {
        Map<String, Object> fileStatus = new HashMap<String, Object>();
        HttpURLConnection connection = null;
        for (String url : webhdfs) {
            try {
                HttpURLConnection candidate = getConnection(url, hdfsFilePath, op, parameters, method);
                if (candidate.getInputStream() != null) {
                    connection = candidate;
                    break;
                }
            } catch (IOException e) {
                // Original logged an empty message; record which endpoint failed.
                logger.error("WebHDFS endpoint {} unreachable, trying next", url, e);
            }
        }
        if (connection == null) {
            // No endpoint answered. The original fell through and caught the resulting
            // NullPointerException; return the empty map explicitly instead.
            return fileStatus;
        }

        StringBuilder sb = new StringBuilder();
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    sb.append(line);
                }
            } finally {
                reader.close(); // original leaked the reader if readLine threw
            }
            JSONObject root = JSONObject.fromObject(sb.toString());
            JSONObject status = root.getJSONObject("FileStatus");
            Iterator keys = status.keys();
            while (keys.hasNext()) {
                String key = keys.next().toString();
                fileStatus.put(key, status.get(key).toString());
            }
        } catch (IOException e) {
            logger.error(Constants.EXCEPTION_WAS_CAUGHT, e);
        }
        return fileStatus;
    }

    /**
     * Builds and opens an HttpURLConnection for a WebHDFS request.
     *
     * @param strurl     WebHDFS base URL like {@code http://ip:port/webhdfs/v1}; port usually 50070 or 14000
     * @param path       HDFS path + filename, e.g. {@code /user/razor/readme.txt}
     * @param op         the operation, e.g. GETFILESTATUS, OPEN, MKDIRS, CREATE
     * @param parameters other query parameters if needed, may be null
     * @param method     HTTP method, e.g. GET, POST, PUT
     * @return the opened connection, or null if opening it failed (logged)
     */
    public HttpURLConnection getConnection(String strurl, String path, String op, Map<String, String> parameters, String method) {
        HttpURLConnection con = null;
        StringBuilder sb = new StringBuilder();
        sb.append(strurl).append(path).append("?op=").append(op);
        if (parameters != null) {
            for (Map.Entry<String, String> entry : parameters.entrySet()) {
                sb.append('&').append(entry.getKey()).append('=').append(entry.getValue());
            }
        }
        try {
            URL url = new URL(sb.toString());
            con = (HttpURLConnection) url.openConnection();
            con.setRequestMethod(method);
            con.setRequestProperty("accept", "*/*");
            con.setRequestProperty("connection", "Keep-Alive");
            // Original defined a correct "Mozilla/4.0 ..." string but sent a typo'd
            // "ozilla/4.0 ..." copy; send the well-formed User-Agent.
            con.setRequestProperty("User-Agent", "Mozilla/4.0 (compatible; MSIE 5.0; Windows NT; DigExt)");
            con.setDoOutput(true); // we write the request body for CREATE uploads
            con.setUseCaches(false);
        } catch (IOException e) {
            logger.error(Constants.EXCEPTION_WAS_CAUGHT, e);
        }
        return con;
    }
}

3)httpfs後續更新