Hadoop Source Code Analysis: The FileSystem Class

1. The org.apache.hadoop.conf package

The org.apache.hadoop.conf package lives in the hadoop-common module.

1.1 The Configurable interface

package org.apache.hadoop.conf;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {

  /** Set the configuration to be used by this object. */
  void setConf(Configuration conf);

  /** Return the configuration used by this object. */
  Configuration getConf();
}

1.2 The Configured class

package org.apache.hadoop.conf;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/** Base class for things that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configured implements Configurable {

  private Configuration conf;

  /** Construct a Configured. */
  public Configured() {
    this(null);
  }

  /** Construct a Configured. */
  public Configured(Configuration conf) {
    setConf(conf);
  }

  // inherit javadoc
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  // inherit javadoc
  @Override
  public Configuration getConf() {
    return conf;
  }
}
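Configurable and Configured form a small injection pattern. An object that needs settings exposes setConf()/getConf(), and Configured stores the configuration so subclasses only have to call getConf(). The following minimal sketch shows the same pattern in plain Java, so it runs without Hadoop on the classpath. SimpleConf, SimpleConfigurable, SimpleConfigured, BufferSizer and the key example.buffer.size are all hypothetical stand-ins, not Hadoop types or settings. SimpleConf.set() trims the name, as the Javadoc of Configuration.set() in 1.3 describes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: a plain string map.
class SimpleConf {
    private final Map<String, String> props = new HashMap<>();

    // The name is trimmed before it is stored.
    void set(String name, String value) {
        props.put(name.trim(), value);
    }

    int getInt(String name, int defaultValue) {
        String v = props.get(name);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }
}

// Mirrors the Configurable contract: something that can be handed a configuration.
interface SimpleConfigurable {
    void setConf(SimpleConf conf);
    SimpleConf getConf();
}

// Mirrors Configured: stores the configuration so subclasses do not have to.
class SimpleConfigured implements SimpleConfigurable {
    private SimpleConf conf;

    SimpleConfigured() { this(null); }

    SimpleConfigured(SimpleConf conf) { setConf(conf); }

    @Override public void setConf(SimpleConf conf) { this.conf = conf; }

    @Override public SimpleConf getConf() { return conf; }
}

// Hypothetical user class: reads its settings through the inherited getConf().
class BufferSizer extends SimpleConfigured {
    BufferSizer(SimpleConf conf) { super(conf); }

    int bufferSize() {
        return getConf().getInt("example.buffer.size", 4096);
    }
}
```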

1.3 The Configuration class

package org.apache.hadoop.conf;

import ...

@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configuration implements Iterable<Map.Entry<String,String>>,
                                      Writable {
  ...

  private static class Resource {
    private final Object resource;
    private final String name;

    public Resource(Object resource) {
      this(resource, resource.toString());
    }

    public Resource(Object resource, String name) {
      this.resource = resource;
      this.name = name;
    }

    public String getName(){
      return name;
    }

    public Object getResource() {
      return resource;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** 
   * Set the <code>value</code> of the <code>name</code> property. If 
   * <code>name</code> is deprecated or there is a deprecated name associated to it,
   * it sets the value to both names. Name will be trimmed before put into
   * configuration.
   * 
   * @param name property name.
   * @param value property value.
   */
  public void set(String name, String value) {
    set(name, value, null);
  }

  /**
   * Add a configuration resource. 
   * 
   * The properties of this resource will override properties of previously 
   * added resources, unless they were marked <a href="#Final">final</a>. 
   * 
   * @param name resource to be added, the classpath is examined for a file 
   *             with that name.
   */
  public void addResource(String name) {
    addResourceObject(new Resource(name));
  }
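The Javadoc of addResource() states the layering rule. A resource added later overrides properties from earlier resources, except for properties that were marked final. Below is a minimal plain-Java sketch of that rule. It assumes each resource has already been parsed into name/value pairs plus a set of final names. The real class reads XML resources such as core-site.xml. LayeredConf and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of Configuration's override rule; not the Hadoop implementation.
class LayeredConf {
    private final Map<String, String> props = new HashMap<>();
    private final Set<String> finalNames = new HashSet<>();

    // Apply one resource on top of everything added so far.
    void addResource(Map<String, String> resource, Set<String> markedFinal) {
        for (Map.Entry<String, String> e : resource.entrySet()) {
            String name = e.getKey();
            if (finalNames.contains(name)) {
                continue; // an earlier resource declared this property final: keep its value
            }
            props.put(name, e.getValue());
            if (markedFinal.contains(name)) {
                finalNames.add(name);
            }
        }
    }

    String get(String name) {
        return props.get(name);
    }
}
```

With two resources applied in order, the later value of fs.defaultFS wins, while a property the first resource marked final keeps its first value.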

2. The org.apache.hadoop.fs package

The org.apache.hadoop.fs package also lives in the hadoop-common module.

2.1 FileSystem

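Hadoop has an abstract notion of a file system, and HDFS is only one of its implementations. This abstract file system is defined by the abstract class org.apache.hadoop.fs.FileSystem. The class extends org.apache.hadoop.conf.Configured and implements the java.io.Closeable interface. It provides a rich set of methods for operating on a file system, such as creating directories, deleting files, and renaming.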

package org.apache.hadoop.fs;

import ...

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {

  //"fs.defaultFS"
  public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  //"file:///";
  public static final String DEFAULT_FS = CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;


  /**
   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
   */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDirDefault());
  }

  /** create a directory with the provided permission
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   * 
   * @see #create(FileSystem, Path, FsPermission)
   * 
   * @param fs file system handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
  throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }

}
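The Javadoc of the static mkdirs(fs, dir, permission) says the directory ends up with exactly the supplied permission, "not permission&~umask". The method therefore creates the directory first and calls setPermission() afterwards. A permission given at creation time is normally filtered through the client umask, whereas setPermission() applies the bits as given. The following sketch shows the arithmetic with plain octal ints. 022 is used only as a typical umask value, and PermissionMath and its methods are hypothetical helpers.

```java
// Hypothetical helpers illustrating the permission arithmetic from the mkdirs Javadoc.
final class PermissionMath {
    private PermissionMath() {}

    // Permission a directory gets when the requested bits are filtered by the umask.
    static int applyUmask(int requested, int umask) {
        return requested & ~umask;
    }

    // Render the low 9 permission bits as rwx triplets, e.g. 0755 -> "rwxr-xr-x".
    static String toSymbolic(int perm) {
        String letters = "rwx";
        StringBuilder sb = new StringBuilder();
        for (int i = 8; i >= 0; i--) {
            boolean set = (perm & (1 << i)) != 0;
            sb.append(set ? letters.charAt((8 - i) % 3) : '-');
        }
        return sb.toString();
    }
}
```

With umask 022, requesting 0770 at creation time yields rwxr-x---. The create-then-setPermission sequence instead leaves the directory at exactly 0770 (rwxrwx---).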

2.2 DistributedFileSystem, a subclass of FileSystem

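DistributedFileSystem is the subclass that implements the abstract FileSystem class for the distributed file system (DFS). End-user code interacts with HDFS through this class.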

package org.apache.hadoop.hdfs;
import ...
/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *
 *****************************************************************/
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;
  //  "/user"
  private String homeDirPrefix = DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;

  ...
}

2.3 How a FileSystem object is created

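Hadoop supports many kinds of file systems. How, then, does Hadoop get from a FileSystem reference to the actual DistributedFileSystem? The rest of this section walks through the source code step by step to trace the creation process.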

(1) A small program that creates a FileSystem

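import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs { // class name chosen for illustration
    public static void main(String[] args) throws Exception {
        // local file path
        String local = "D:\\word2.txt";
        // destination on HDFS; 192.168.80.131:9000 is the NameNode address
        String dest = "hdfs://192.168.80.131:9000/user/root/input/word2.txt";
        Configuration cfg = new Configuration();
        // obtain the FileSystem for the hdfs:// URI, acting as user "root"
        FileSystem fs = FileSystem.get(URI.create(dest), cfg, "root");
        fs.copyFromLocalFile(new Path(local), new Path(dest));
        fs.close();
    }
}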

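(2) Start from the get() method in this program
Stepping into FileSystem's get(final URI uri, final Configuration conf, final String user) method shows two things. First, it obtains a UserGroupInformation for the given user, taking the Kerberos ticket cache path from the configuration. Second, it calls get(URI uri, Configuration conf) inside ugi.doAs(...), that is, as that user.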

  /**
   * Get a filesystem instance based on the uri, the passed
   * configuration and the user
   * @param uri of the filesystem
   * @param conf the configuration to use
   * @param user to perform the get as
   * @return the filesystem instance
   * @throws IOException
   * @throws InterruptedException
   */
  public static FileSystem get(final URI uri, final Configuration conf,
        final String user) throws IOException, InterruptedException {
    String ticketCachePath =
      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
    UserGroupInformation ugi =
        UserGroupInformation.getBestUGI(ticketCachePath, user);
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }
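Because get(uri, conf) is called inside ugi.doAs(...), everything in the action runs as the requested user. Any code that asks for the current user during the lookup therefore sees that user. The real UserGroupInformation achieves this with a JAAS Subject. The following sketch substitutes a ThreadLocal purely to show the scoping: run as the user, then restore the previous identity. UserContext and its methods are hypothetical, not Hadoop APIs.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for UserGroupInformation.doAs: the real class runs the action
// under a JAAS Subject; this sketch only keeps the "current user" in a ThreadLocal.
final class UserContext {
    private static final ThreadLocal<String> CURRENT =
            ThreadLocal.withInitial(() -> System.getProperty("user.name"));

    private UserContext() {}

    static String currentUser() {
        return CURRENT.get();
    }

    // Run the action as the given user, then restore the previous user.
    static <T> T doAs(String user, Supplier<T> action) {
        String previous = CURRENT.get();
        CURRENT.set(user);
        try {
            return action.get();
        } finally {
            CURRENT.set(previous);
        }
    }
}
```

Inside doAs("root", ...), currentUser() returns "root", and the previous user is restored afterwards. This is why FileSystem.get(uri, conf, "root") and a plain get(uri, conf) can resolve to different cached instances: the cache key described in step (4) records the user.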

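(3) Step into get(URI uri, Configuration conf)
The code below shows that get() does not create a new FileSystem object on every call. Unless caching is disabled for the scheme, it obtains the FileSystem object from a cache.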

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    // No scheme and no authority: return the file system named by fs.defaultFS, or the local file:/// file system if it is not set
    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }
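    // Scheme but no authority: read the default URI from fs.defaultFS (file:/// if unset); if it has the same scheme and an authority, use the default URI instead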
    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
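    // disableCacheName is the property that disables caching for this scheme (fs.<scheme>.impl.disable.cache).
    // If it is true, the cache is bypassed and createFileSystem() builds a new instance; the default, false, means the cache is used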
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }
    // Our conf does not set the cache-disabling property, so execution continues in CACHE.get()
    return CACHE.get(uri, conf);
  }
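You can trace the branching in get(URI, Configuration) without a cluster. The following sketch is a simplified, hypothetical re-implementation, and GetRouting.route is not a Hadoop method. The configuration is a plain Map, and fs.defaultFS falls back to file:///. Instead of returning a FileSystem, the method returns a label naming the path the real method would take.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch of the decisions made in FileSystem.get(URI, Configuration).
// Returns a description of the route instead of a real FileSystem.
final class GetRouting {
    private GetRouting() {}

    static String route(URI uri, Map<String, String> conf) {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {
            return "default FS";                              // get(conf)
        }
        if (scheme != null && authority == null) {
            URI defaultUri = URI.create(conf.getOrDefault("fs.defaultFS", "file:///"));
            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                return "default URI " + defaultUri;            // get(defaultUri, conf)
            }
        }
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        if (Boolean.parseBoolean(conf.getOrDefault(disableCacheName, "false"))) {
            return "new instance";                             // createFileSystem(uri, conf)
        }
        return "cache";                                        // CACHE.get(uri, conf)
    }
}
```

Suppose fs.defaultFS is set to hdfs://192.168.80.131:9000. The URI hdfs:///user/root (scheme but no authority) is then redirected to the default URI. The full URI from the sample program in (1) goes to the cache.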

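(4) Step into CACHE.get(URI uri, Configuration conf)
CACHE is a static field of FileSystem. Its type, Cache, is a class nested inside FileSystem. In this get() method, uri and conf are combined into a Key, which records the user's identity and the file system being accessed.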

  /** Caching FileSystem objects */
  static class Cache {
    private final ClientFinalizer clientFinalizer = new ClientFinalizer();

    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
    private final Set<Key> toAutoClose = new HashSet<Key>();

    /** A variable that makes all objects in the cache unique */
    private static AtomicLong unique = new AtomicLong(1);

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      return getInternal(uri, conf, key);
    }
    ...
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
      FileSystem fs;
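      // This is the caching mechanism: the first time a user enters this method the map is empty. When the same user
      // enters again and accesses the same URI, the file system comes straight from the map and is not initialized again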
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      // Cache miss: the core call that creates the file system (made outside the lock)
      fs = createFileSystem(uri, conf);
      synchronized (this) { // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty()
                && !ShutdownHookManager.get().isShutdownInProgress()) {
          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
        }
        fs.key = key;
        // On the user's first visit the map is empty; here the key built in get() and the new file system are stored in the map as a key-value pair
        map.put(key, fs);
        if (conf.getBoolean("fs.automatic.close", true)) {
          toAutoClose.add(key);
        }
        return fs;
      }
    }
    ...
  }
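The central idea in getInternal() is a three-step sequence: check, create outside the lock, then re-check. The expensive createFileSystem() call does not hold the cache lock. If another thread inserted an instance for the same key in the meantime, the new instance is closed and the existing one is returned. The following sketch reproduces that sequence in plain Java. CacheKey, FakeFs and FsCache are hypothetical. CacheKey keeps only a scheme, an authority and a user name. The real Cache.Key also stores the UserGroupInformation object and a unique value (see the AtomicLong above). Lower-casing scheme and authority mirrors what Cache.Key does. Shutdown hooks and fs.automatic.close are left out.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical cache key: scheme + authority + user.
final class CacheKey {
    final String scheme;
    final String authority;
    final String user;

    CacheKey(String scheme, String authority, String user) {
        // Null becomes "" and case is ignored for scheme and authority.
        this.scheme = scheme == null ? "" : scheme.toLowerCase(Locale.ROOT);
        this.authority = authority == null ? "" : authority.toLowerCase(Locale.ROOT);
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CacheKey)) return false;
        CacheKey k = (CacheKey) o;
        return scheme.equals(k.scheme) && authority.equals(k.authority)
                && Objects.equals(user, k.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, user);
    }
}

// Hypothetical file-system handle that only records whether it was closed.
class FakeFs {
    volatile boolean closed;

    void close() { closed = true; }
}

final class FsCache {
    private final Map<CacheKey, FakeFs> map = new HashMap<>();

    FakeFs get(CacheKey key, Supplier<FakeFs> factory) {
        FakeFs fs;
        synchronized (this) {           // 1. look up under the lock
            fs = map.get(key);
        }
        if (fs != null) {
            return fs;                  // cache hit: no new initialization
        }
        fs = factory.get();             // 2. create outside the lock (may be slow)
        synchronized (this) {           // 3. re-check: another thread may have won the race
            FakeFs old = map.get(key);
            if (old != null) {
                fs.close();             // discard our duplicate
                return old;
            }
            map.put(key, fs);
            return fs;
        }
    }

    synchronized int size() { return map.size(); }
}
```

Two lookups for the same scheme, authority and user share one instance regardless of scheme case. A different user gets its own instance.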

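(5) Step into getInternal(URI uri, Configuration conf, Key key)
This method also belongs to the nested Cache class, and its code appears above. If the key is already present, it returns the cached instance. Otherwise it calls createFileSystem(uri, conf) and stores the result in the map.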

(6) Step into createFileSystem(URI uri, Configuration conf)
This method is responsible for creating the concrete file system instance.

  private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
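    // Look up the FileSystem implementation class for the URI's scheme (for hdfs:// this is
    // DistributedFileSystem); the lookup is keyed by the scheme, not by the value of fs.defaultFS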
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    // Instantiate that class via reflection; ReflectionUtils.newInstance also passes conf to setConf()
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    // The new fs object is not usable yet: initialize() must be called to set it up
    fs.initialize(uri, conf);
    return fs;
  }
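createFileSystem() follows a common plugin pattern. It maps the URI scheme to a class name, instantiates the class reflectively through its no-argument constructor, and then calls initialize(). The following sketch shows that pattern in plain Java. MiniFileSystem, MiniDistributedFileSystem and MiniFactory are hypothetical, and only a fs.<scheme>.impl lookup is modeled. The real getFileSystemClass() can also fall back to implementations registered through java.util.ServiceLoader. Plain newInstance() stands in for ReflectionUtils.newInstance(), which additionally passes conf to setConf().

```java
import java.net.URI;
import java.util.Map;

// Hypothetical minimal base class standing in for org.apache.hadoop.fs.FileSystem.
abstract class MiniFileSystem {
    protected URI uri;

    void initialize(URI uri, Map<String, String> conf) {
        this.uri = uri;
    }
}

// Hypothetical implementation class, registered in the map under fs.hdfs.impl.
class MiniDistributedFileSystem extends MiniFileSystem {
    boolean subclassInitialized;

    @Override
    void initialize(URI uri, Map<String, String> conf) {
        super.initialize(uri, conf);    // same order as DistributedFileSystem.initialize
        subclassInitialized = true;     // shows the override, not the base method, ran
    }
}

final class MiniFactory {
    private MiniFactory() {}

    // Scheme -> implementation class, via the "fs.<scheme>.impl" entry.
    static Class<? extends MiniFileSystem> getFileSystemClass(String scheme, Map<String, String> conf) {
        String className = conf.get("fs." + scheme + ".impl");
        if (className == null) {
            throw new IllegalArgumentException("No FileSystem for scheme: " + scheme);
        }
        try {
            return Class.forName(className).asSubclass(MiniFileSystem.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class " + className + " not found", e);
        }
    }

    static MiniFileSystem createFileSystem(URI uri, Map<String, String> conf) {
        Class<? extends MiniFileSystem> clazz = getFileSystemClass(uri.getScheme(), conf);
        MiniFileSystem fs;
        try {
            fs = clazz.getDeclaredConstructor().newInstance();   // reflective, no-arg constructor
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
        fs.initialize(uri, conf);                                 // dynamically dispatched
        return fs;
    }
}
```

The subclassInitialized flag illustrates the point made in step (7). The factory only knows the base type, yet the subclass's initialize() is the one that runs.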

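(7) Step into initialize(URI uri, Configuration conf)
The object being initialized here is a DistributedFileSystem. The initialize() call is therefore dispatched to the subclass override, so we must read DistributedFileSystem's initialize(URI uri, Configuration conf) method rather than the one in FileSystem.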

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);
    // Get the NameNode host name
    String host = uri.getHost();
    if (host == null) {
      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
    }
    homeDirPrefix = conf.get(
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,    //"dfs.user.home.dir.prefix"
        DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);// "/user"
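    // Initialize dfs, uri and workingDir.
    // The most important is dfs, a DFSClient: as the name suggests, it is the client that talks to the NameNode.
    // Internally it holds an RPC proxy used to fetch information from the NameNode remotely; it is a complex object.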
    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
  }
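Two of the fields set in initialize() involve only simple string work. The uri field keeps just the scheme and authority of the URI passed in. The workingDir field starts as the home directory, built from homeDirPrefix ("/user" by default) and the user name. Below is a hypothetical sketch of both computations; DfsInitSketch is not a Hadoop class. It throws IllegalArgumentException where the real method throws IOException. It also returns a bare path, whereas Hadoop's getHomeDirectory(), to our understanding, additionally qualifies the path with the file system URI.

```java
import java.net.URI;

// Hypothetical sketch of values DistributedFileSystem.initialize() derives from its inputs.
final class DfsInitSketch {
    private DfsInitSketch() {}

    // Keep only scheme and authority, as in URI.create(uri.getScheme() + "://" + uri.getAuthority()).
    static URI fsUri(URI uri) {
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Incomplete HDFS URI, no host: " + uri);
        }
        return URI.create(uri.getScheme() + "://" + uri.getAuthority());
    }

    // Assumed shape of the home directory: <homeDirPrefix>/<user name>.
    static String homeDirectory(String homeDirPrefix, String userName) {
        return homeDirPrefix + "/" + userName;
    }
}
```

Consider the sample program in (1), which runs as root against hdfs://192.168.80.131:9000. It yields the file system URI hdfs://192.168.80.131:9000 and the working directory /user/root, which contains the destination path /user/root/input/word2.txt.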