1. 程式人生 > >利用JAVA API遠程進行HDFS的相關操作

利用JAVA API遠程進行HDFS的相關操作

測試 call 文件重命名 spa getname created ces 一段時間 action

學習HDFS有一段時間了,現在把自己總結的HDFS的相關操作代碼展示給大家。

主要有HDFS的增刪改查,文件的追加,windows本地文件的上傳,hdfs文件的下載,文件重命名,創建目錄,文件是否存在等操作。

準備工作:我是用maven搭配的環境,下面用到了單元測試@Test,需要在pom.xml文件內添加junit的依賴

  1 package hdfs;
  2 
  3 import java.io.*;
  4 import java.security.PrivilegedExceptionAction;
  5 
  6 import org.apache.hadoop.conf.Configuration;
7 import org.apache.hadoop.fs.FSDataOutputStream; 8 import org.apache.hadoop.fs.FileStatus; 9 import org.apache.hadoop.fs.FileSystem; 10 import org.apache.hadoop.fs.Path; 11 import org.apache.hadoop.io.IOUtils; 12 import org.apache.hadoop.security.UserGroupInformation; 13 import org.junit.Test;
14 15 16 public class OperatingFiles { 17 // initialization 18 //讀取配置文件 19 static Configuration conf = new Configuration(); 20 static FileSystem hdfs; 21 22 static {
        //root是你主節點虛機的用戶名
23 UserGroupInformation ugi = UserGroupInformation 24 .createRemoteUser("root");
25 try { 26 ugi.doAs(new PrivilegedExceptionAction<Void>() { 27 public Void run() throws Exception { 28 Configuration conf = new Configuration();
               //"hdfs://lyz01:9000/"對應的是你自己的網址
29 conf.set("fs.default.name", "hdfs://lyz01:9000/"); 30 //conf.set("hadoop.job.ugi", "root"); 31 //以下兩行是支持 hdfs的追加 功能的:hdfs.append() 32 conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); 33 conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true"); 34 Path path = new Path("hdfs://lyz01:9000/"); 35 //如果在本地測試,需要使用此種方法獲取文件系統 36 hdfs = FileSystem.get(path.toUri(), conf); 37 //hdfs = path.getFileSystem(conf); // 這個也可以 38 //如果在Hadoop集群下運行,使用此種方法可以直接獲取默認文件系統 39 //hdfs = FileSystem.get(conf); //這個不行,這樣得到的hdfs所有操作都是針對本地文件系統,而不是針對hdfs的,原因不太清楚 40 return null; 41 } 42 }); 43 } catch (IOException e) { 44 // TODO Auto-generated catch block 45 e.printStackTrace(); 46 } catch (InterruptedException e) { 47 // TODO Auto-generated catch block 48 e.printStackTrace(); 49 } 50 } 51 52 // 創建hdfs目錄 53 @Test 54 public void createDir() throws IOException { 55 String dir = "/test2/"; 56 Path path = new Path(dir); 57 if (hdfs.exists(path)) { 58 System.out.println("dir \t" + conf.get("fs.default.name") + dir 59 + "\t already exists"); 60 return; 61 } 62 hdfs.mkdirs(path); 63 System.out.println("new dir \t" + conf.get("fs.default.name") + dir); 64 } 65 66 // 文件重命名 67 @Test 68 public void renameFile() throws IOException{ 69 String oldName = "/reduceJoin/2.txt"; 70 String newName = "/reduceJoin/tb_b.txt"; 71 Path oldPath = new Path(oldName); 72 Path newPath = new Path(newName); 73 if (hdfs.exists(oldPath)){ 74 hdfs.rename(oldPath,newPath); 75 System.out.println("rename成功!"); 76 }else{ 77 System.out.println("文件不存在!rename失敗!"); 78 } 79 } 80 81 // 讀取文件 82 @Test 83 public void readFile() throws IOException{ 84 String uri = "/output2017_11_12_12_57_04/part-r-00000"; 85 //判斷文件是否存在 86 if(!hdfs.exists(new Path(uri))){ 87 System.out.println("Error ; the file not exists."); 88 return; 89 } 90 InputStream in = null; 91 try { 92 in = hdfs.open(new Path(uri)); 93 //BufferedReader bf =new BufferedReader(new InputStreamReader(in,"GB2312"));//防止中文亂碼 94 //復制到標準輸出流 95 IOUtils.copyBytes(in, System.out, 4096,false); 96 /*String line = null; 97 while((line = bf.readLine()) != null){ 98 System.out.println(line); 99 }*/ 100 } catch (Exception e) { 101 e.printStackTrace(); 102 }finally{ 103 IOUtils.closeStream(in); 104 } 105 } 106 107 // 從本地往HDFS上傳文件 108 @Test 109 public void copyFile() throws IOException { 110 String localSrc = "D:/group_max.txt"; 111 String hdfsDst = "/group/"; 112 Path src = new Path(localSrc); 113 Path dst = new Path(hdfsDst); 114 //本地文件不存在 115 if (!(new File(localSrc)).exists()) { 116 System.out.println("Error: local dir \t" + localSrc 117 + "\t not exists."); 118 return; 119 } 120 //hdfs路徑不存在 121 if (!hdfs.exists(dst)) { 122 System.out.println("Error: dest dir \t" + dst.toUri() 123 + "\t not exists."); 124 return; 125 } 126 String dstPath = dst.toUri() + "/" + src.getName(); 127 //System.out.println(dstPath);// "/test1/3931.jpg" 128 //判斷上傳的文件 hdfs的目錄下是否存在 129 if (hdfs.exists(new Path(dstPath))) { 130 System.out.println("Warn: dest file \t" + dstPath 131 + "\t already exists."); 132 }else{ 133 //本地文件上傳hdfs 134 hdfs.copyFromLocalFile(src, dst); 135 // list all the files in the current direction 136 //遍歷文件 137 FileStatus files[] = hdfs.listStatus(dst); 138 System.out.println("Upload to \t" + conf.get("fs.default.name") 139 + hdfsDst); 140 for (FileStatus file : files) { 141 System.out.println(file.getPath()); 142 } 143 } 144 } 145 146 // 從HDFS 下載文件 到本地 147 @Test 148 public void downloadFile() throws IllegalArgumentException,IOException{ 149 String hdfsDst = "/test2/2_1"; 150 String localSrc = "D:/hadfs"; 151 Path dst = new Path(hdfsDst); 152 Path src = new Path(localSrc); 153 //本地的路徑 + hdfs下載的文件名 154 String localFile = localSrc + "/" + dst.getName(); 155 //如果HDFS路徑不存在 156 if(!hdfs.exists(dst.getParent())){ 157 System.out.println("Error : the HDFS directory:\t" + dst.getParent() + "\tdoes not exist. Please check it!"); 158 return; 159 } 160 //如果本地目錄不存在,則創建 161 if(!new File(localSrc).exists()){ 162 new File(localSrc).mkdirs(); 163 System.out.println("Warn : The local directory does not exist. It has been automatically created for you!"); 164 } 165 // 如果本地文件存在 166 if(new File(localFile).exists()){ 167 System.out.println("Error : the localSrc: \t" + localFile + "\t already exists."); 168 return; 169 } 170 //如果HDFS文件不存在 171 if(!hdfs.exists(new Path(hdfsDst))){ 172 System.out.println("Error : the HDFS file: \t" + hdfsDst + "\t not exists."); 173 }else{ 174 //HDFS下載文件到本地 175 hdfs.copyToLocalFile(false,dst,src,true); 176 System.out.println("successful :download successful! please look at: \t" + localSrc); 177 } 178 } 179 180 181 // create a new file 182 @Test 183 public void createFile() 184 throws IOException { 185 String fileName = "/test3/b.txt"; 186 String fileContent = ""; 187 Path dst = new Path(fileName); 188 //判斷 新建的文件在hdfs上是否存在 189 if(hdfs.exists(dst)){ 190 System.out.println("Error : the hdfs file exists."); 191 }else { 192 byte[] bytes = fileContent.getBytes(); 193 FSDataOutputStream output = hdfs.create(dst); 194 output.write(bytes); 195 System.out.println("new file \t" + conf.get("fs.default.name") 196 + fileName); 197 } 198 } 199 200 // 追加內容到文件 201 @Test 202 public void appendFile() 203 throws IOException { 204 String fileName = "/test2/file2.txt"; 205 String fileContent = "你好 世界"; 206 Path dst = new Path(fileName); 207 byte[] bytes = fileContent.getBytes(); 208 //如果文件不存在 209 if (!hdfs.exists(dst)) { 210 System.out.println("Error : the file not exists"); 211 return; 212 } 213 FSDataOutputStream output = hdfs.append(dst); 214 output.write(bytes); 215 System.out.println("successful: append to file \t" + conf.get("fs.default.name") 216 + fileName); 217 } 218 219 220 // 列出所有文件 221 @Test 222 public void listFiles() throws IOException { 223 String dirName = "/test1"; 224 Path f = new Path(dirName); 225 FileStatus[] status = hdfs.listStatus(f); 226 System.out.println(dirName + " has all files:"); 227 if (status.length == 0) { 228 System.out.println("nothing !"); 229 } else { 230 for (int i = 0; i < status.length; i++) { 231 System.out.println(status[i].getPath().toString()); 232 } 233 } 234 } 235 236 // 判斷文件是否存在,存在即刪除 237 @Test 238 public void deleteFile() throws IOException { 239 String fileName = "/test2"; 240 Path f = new Path(fileName); 241 boolean isExists = hdfs.exists(f); 242 if (isExists) { // if exists, delete 243 boolean isDel = hdfs.delete(f, true); 244 System.out.println(fileName + " delete? \t" + isDel); 245 } else { 246 System.out.println(fileName + " exist? \t" + notExists); 247 } 248 } 249 }

利用JAVA API遠程進行HDFS的相關操作