Hadoop檔案儲存系統-HDFS詳解以及java程式設計實現
前言
這是關於Hadoop的系列文章。
背景
我們在本系列的第一篇文章的時候就談到過,面對海量資料,我們最為缺乏的就是對大資料量的儲存能力以及處理能力。而這兩種能力在Hadoop的體現分別就是HDFS以及map-reduce。今天,我們就來看看Hadoop為我們帶來的這個分散式檔案系統。
猜想
實際上,我們在未能接觸HDFS之前可以猜測一些它會是什麼樣子的?現有的linux檔案系統也很好用,所以我們猜想,它也將會保留一部分的當前檔案系統的特性,比如說採取目錄的方式、linux的一些命令的保留、甚至於說一些常用的指令的完全相容,當然,我覺得一定也會有新的東西的加入。比如說對於分散式的支援,我們在第一篇文章其實有談到過HDFS在儲存的時候的幾個角色。NameNode負責文佳資訊的索引,而DataNode則負責處理資料的實際儲存。當然,這裡的模型只是簡化的處理模型,但其實我們如果只是使用HDFS的話,我們不care這樣的細節。
HDFS為我們帶來了什麼?
HDFS為我們的實際業務應用帶來的最強大的能力應該就是對於大資料量的儲存能力,而且這種能力的獲得並沒有讓我們的業務的複雜性提高,也就是說分散式的儲存對於我們而言是透明的,這是我們所希望看到的。正因為如此,它才顯得非常的簡單易用。因為我們可以把我們的注意力一致的集中在我們的儲存上,而不是該怎麼去協調節點間的資料,該怎麼去防止資料丟失。可以說,HDFS為我們帶來了簡單易用的分散式儲存。而且他和普通的檔案系統在別的方面並無二致。
程式設計實現
我們知道,任何程式同文件系統之間的操作都離不開IO,而這些互動的主要無非就集中在如下的幾點,那麼我們就可以來看到我們需要做的事情:
好了,那下面就讓我們愉快的開始寫程式碼吧?我們首先需要搞清楚的是我們需要依賴的jar,如下所示:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.1</version>
</dependency>
好了,看完了依賴,我們接下來就是要開始我們的程式碼編寫了。我們查詢到FileSystem這個類是最為最重要的一個類。我們點進去看看註釋:
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
* may be implemented as a distributed filesystem, or as a "local"
* one that reflects the locally-connected disk. The local version
* exists for small Hadoop instances and for testing.
*
* <p>
*
* All user code that may potentially use the Hadoop Distributed
* File System should be written to use a FileSystem object. The
* Hadoop DFS is a multi-machine system that appears as a single
* disk. It's useful because of its fault tolerance and potentially
* very large capacity.
*
* <p>
* The local implementation is {@link LocalFileSystem} and distributed
* implementation is DistributedFileSystem.
*****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {
這是一個抽象類,無法例項化。上面的註釋的大意是Hadoop的HDFS是一個分散式檔案系統,想要在這個檔案系統上用java做點事情,就必須使用這個類。然後我們繼續往下看。
/**
* Get a filesystem instance based on the uri, the passed
* configuration and the user
* @param uri of the filesystem
* @param conf the configuration to use
* @param user to perform the get as
* @return the filesystem instance
* @throws IOException
* @throws InterruptedException
*/
public static FileSystem get(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
/**
* Returns the configured filesystem implementation.
* @param conf the configuration to use
*/
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
/** Get the default filesystem URI from a configuration.
* @param conf the configuration to use
* @return the uri of the default filesystem
*/
public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
}
/** Set the default filesystem URI in a configuration.
* @param conf the configuration to alter
* @param uri the new default filesystem uri
*/
public static void setDefaultUri(Configuration conf, URI uri) {
conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
}
/** Set the default filesystem URI in a configuration.
* @param conf the configuration to alter
* @param uri the new default filesystem uri
*/
public static void setDefaultUri(Configuration conf, String uri) {
setDefaultUri(conf, URI.create(fixName(uri)));
}
我們可以看的很清楚的是其提供了多個靜態方法供我們使用,然後我們就可以採取其中的某一個方法得到一個這個抽象類的例項。當然,我們要選就選擇最簡單的。只需要一個conf的那個方法。於是,我們現在就可以開始寫程式了!最後完成的成果如下:
package com.weomob.hadoop.com.henry.hadoop.hdfs;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
/**
* hdfs util 介面
* 使用者:hadoop
* 日期:6/6/2017
* 時間:9:32 PM
*/
@Slf4j
public class HDFSUtil {
/**
* 新建檔案
*
* @param configuration
* @param filePath
* @param data
* @throws IOException
*/
public static void createFile(Configuration configuration, String filePath, byte[] data) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path(filePath));
fsDataOutputStream.write(data);
fsDataOutputStream.close();
fileSystem.close();
}
/**
* 建立資料夾
*/
public static void mkDirs(Configuration configuration, String filePath) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.mkdirs(new Path(filePath));
fileSystem.close();
}
/**
* 刪除文佳 true 為遞迴刪除 否則為非遞迴
*/
public static void deleteFile(Configuration configuration, String filePath, boolean isReturn) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
boolean delete = fileSystem.delete(new Path(filePath), isReturn);
if (!delete) {
throw new RuntimeException("刪除失敗");
}
log.error("wocao");
fileSystem.close();
}
/**
* 讀取檔案內容
*
* @author fulei.yang
*/
public static String readFile(Configuration conf, String filePath)
throws IOException {
String res = null;
FileSystem fs = null;
FSDataInputStream inputStream = null;
ByteArrayOutputStream outputStream = null;
try {
fs = FileSystem.get(conf);
inputStream = fs.open(new Path(filePath));
outputStream = new ByteArrayOutputStream(inputStream.available());
IOUtils.copyBytes(inputStream, outputStream, conf);
res = outputStream.toString();
} finally {
if (inputStream != null)
IOUtils.closeStream(inputStream);
if (outputStream != null)
IOUtils.closeStream(outputStream);
}
return res;
}
/**
* 從本地上傳檔案到HDFS
*
* @param configuration
* @throws IOException
*/
public static void uploadFile(Configuration configuration, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.copyFromLocalFile(new Path(localFilePath), new Path(remoteFilePath));
fileSystem.close();
}
/**
* 判斷目錄是否存在
*/
public static boolean fileExists(Configuration configuration, String filePath) throws IOException {
FileSystem fileSystem = FileSystem.get(configuration);
return fileSystem.exists(new Path(filePath));
}
}
歐漏,這和我們上面說畫的圖幾乎別無二致,當然,還有更多的可用方法大家可以去探尋。這是非常有意思的一個過程。
總結
通過這篇文章我們主要是想傳達HDFS是一個分散式的檔案系統,但是他的分散式的特性被Hadoop的開發者巧妙的隱藏了。也就是對於我們而言幾乎是透明的,我們只需要拿到我們想要的東西,按照一個使用普通檔案系統的方式去使用它即可。
當然,我們還重點的寫了一個簡單的client,這樣的話,我們就可以寫入東西到叢集和從叢集讀取檔案了。