java hadoop hdfs 上寫檔案
阿新 • 發佈:2019-01-24
專案中會用到往hdfs 上寫檔案 ,為下面kafka 往hdfs 上寫檔案做基礎。
例項如下:
1、配置檔案:com/xiefg/config/system.properties
#以下是安裝 hadoop 配置檔案的路徑
core.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/core-site.xml
hdfs.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/hdfs-site.xml
yarn.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/yarn-site.xml
mapred.path=/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/lib/hadoop/etc/hadoop/mapred-site.xml
2、讀取配置檔案的工具類:
3、hdfs 檔案工具類package com.xiefg.util; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.PropertyResourceBundle; import java.util.ResourceBundle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; /*** * * @ClassName: PropertiesUtils * @Description: TODO * @author Comsys-xiefg * @date 2017年2月5日 下午2:41:52 * */ public class PropertiesUtils { private static ResourceBundle resources=null; public static String HDFS_PATH = null; public static String YARN_PATH = null; public static String CORE_PATH = null; public static String MAPRED_PATH = null; static{ InputStream in; try { /*String config_path = System.getProperty("user.dir") + "/config/system.properties"; in = new BufferedInputStream(new FileInputStream(config_path));*/ in=Thread.currentThread().getContextClassLoader().getResourceAsStream( "com/xiefg/config/system.properties"); resources = new PropertyResourceBundle(in); //初始化hadoop配置 initHadoopConfig(); } catch (Exception e) { e.printStackTrace(); } } /** * * @Title: initHadoopConfig * @Description: 初始化hadoop 配置 * @return void 返回型別 * @throws */ public static void initHadoopConfig(){ HDFS_PATH = resources.getString("hdfs.path"); YARN_PATH = resources.getString("yarn.path"); CORE_PATH = resources.getString("core.path"); MAPRED_PATH = resources.getString("mapred.path"); } public static Configuration getHDFSConf() { Configuration conf = new Configuration(); conf.addResource(new Path(HDFS_PATH)); conf.addResource(new Path(CORE_PATH)); conf.addResource(new Path(MAPRED_PATH)); conf.addResource(new Path(YARN_PATH)); return conf; } /** * 獲取指定屬性值 * @param property 屬性名 * @return */ public static String getPropertiesValue(String property) { String val = ""; try { val = resources.getString(property); } catch (Exception e) { // ignore e.printStackTrace(); } return val; } public static void main(String[] args) { Configuration conf = PropertiesUtils.getHDFSConf(); try 
{ HdfsFileUtil.appendFile(conf, "/test/kafka", "kafka test to hdfs xiefg".getBytes()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } //System.out.println(PropertiesUtils.getPropertiesValue(KafkaProperties.ZK)); } }
/** * */ package com.xiefg.util; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; /** * * @ClassName: HdfsFileUtil * @Description: TODO * @author Comsys-xiefg * @date 2017年2月6日 上午10:23:13 * */ public class HdfsFileUtil { /*** * * @Title: deleteHfile * @Description: 刪除hdfs指定目錄的檔案 * @param conf * @param ioPath * @throws IOException 設定檔案 * @return void 返回型別 * @throws */ public static void deleteHfile(Configuration conf, String ioPath) throws IOException { FileSystem fileSystem=null; try{ fileSystem = FileSystem.get(conf); fileSystem.delete(new Path(ioPath), true); }catch(Exception e){ e.printStackTrace(); }finally{ try {fileSystem.close();} catch (IOException e) {} } } /** * * @Title: createFile * @Description:寫檔案 * @param conf * @param file * @param content * @throws IOException 設定檔案 * @return void 返回型別 * @throws */ public static void createFile(Configuration conf,String file, String content) throws IOException { FileSystem fs = FileSystem.get(conf); byte[] buff = content.getBytes(); FSDataOutputStream os = null; try { os = fs.create(new Path(file)); os.write(buff, 0, buff.length); os.flush(); System.out.println("Create: " + file); } finally { if (os != null) os.close(); } fs.close(); } /** * * @Title: appendFile * @Description: 建立檔案並追加內容 * @param conf * @param file * @param buff * @throws IOException 設定檔案 * @return void 返回型別 * @throws */ public static void appendFile(Configuration conf,String file,byte[] buff) throws IOException { FileSystem fs = FileSystem.get(conf); Path path=new Path(file); if (!fs.exists(path)) { createFile(conf,file,new String(buff,"UTF-8")); }else{ FSDataOutputStream os = null; try { os = fs.append(path); os.write(buff, 0, buff.length); os.flush(); //System.out.println("Create: " + file); } finally { if (os != null) os.close(); fs.close(); } } } /** * * @Title: isExist * @Description: 判斷檔案是否存在 * 
@param path * @param conf * @return * @throws IOException 設定檔案 * @return boolean 返回型別 * @throws */ public static boolean isExist(String path,Configuration conf) throws IOException { FileSystem fs=null; Boolean isexists = null; try { Path p = new Path(path); fs = p.getFileSystem(conf); isexists=fs.exists(p); }catch (Exception e) { e.printStackTrace(); }finally{ fs.close(); } return isexists; } public static void main(String[] args) throws Exception { } }
通過打 jar 包,放到 hadoop 平臺上執行以下命令:
hadoop jar /usr/etl.jar com.xiefg.util.PropertiesUtils
執行結果可以看到 hdfs 生成檔案目錄 和內容