Hadoop 2.6.0 Source Code Analysis: The Client (Part 2: Opening an HDFS File for Reading)
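Before an HDFS file can be read, it has to be opened. The call is made on an org.apache.hadoop.fs.FileSystem reference, but the object actually created is an org.apache.hadoop.hdfs.DistributedFileSystem, which is a subclass of FileSystem. Calling FSDataInputStream open(Path f, int bufferSize) through the parent type therefore dispatches to the override in the subclass, that is, to FSDataInputStream open(Path f, int bufferSize) in org.apache.hadoop.hdfs.DistributedFileSystem. The code of that method is: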
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  // Increment the read-operation counter
  statistics.incrementReadOps(1);
  // Turn a relative path into an absolute Path
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}
Start with the first statement of the method body and look at statistics. It is a member variable of FileSystem, declared as follows:
protected Statistics statistics;
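Statistics is a nested class of FileSystem that collects usage counters.
The field is assigned in public void initialize(URI name, Configuration conf) throws IOException. This method is called to initialize the org.apache.hadoop.hdfs.DistributedFileSystem object after it has been created. The code is: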
/** Called after a new FileSystem instance is constructed.
 * @param name a uri whose authority section names the host, port, etc.
 *   for this FileSystem
 * @param conf the configuration
 */
public void initialize(URI name, Configuration conf) throws IOException {
  statistics = getStatistics(name.getScheme(), getClass());
  resolveSymlinks = conf.getBoolean(
      CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
      CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
}
Stepping into getStatistics, the code is:
/**
* Get the statistics for a particular file system
* @param cls the class to lookup
* @return a statistics object
*/
//This method is synchronized; scheme is the URI scheme (e.g. http, https, hdfs)
//cls is the Class object of the FileSystem implementation
public static synchronized
Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
/* Recording statistics per a FileSystem class
//statisticsTable is a static member variable of FileSystem:
private static final Map<Class<? extends FileSystem>, Statistics>
statisticsTable = new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
*/
//Check whether the map already has an entry keyed by this Class object
Statistics result = statisticsTable.get(cls);
//If it does not
if (result == null) {
//create a new Statistics object
result = new Statistics(scheme);
//and associate it with the Class object
statisticsTable.put(cls, result);
}
//Otherwise return the existing object
return result;
}
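The lookup-or-create pattern in getStatistics can be tried in isolation. The following is a minimal sketch, not Hadoop code: ReadStats and StatsRegistry are hypothetical stand-ins for FileSystem.Statistics and the static statisticsTable, and only the read-operation counter is modelled.

```java
import java.util.IdentityHashMap;
import java.util.Map;

class StatsRegistryDemo {
    public static void main(String[] args) {
        ReadStats a = StatsRegistry.getStatistics("hdfs", String.class);
        ReadStats b = StatsRegistry.getStatistics("hdfs", String.class);
        a.incrementReadOps(1);
        b.incrementReadOps(1);
        // Both references point at the same counter object
        System.out.println((a == b) + " " + a.getReadOps());
    }
}

/** Hypothetical stand-in for FileSystem.Statistics: counts read operations only. */
class ReadStats {
    private final String scheme;
    private long readOps;

    ReadStats(String scheme) { this.scheme = scheme; }

    synchronized void incrementReadOps(int count) { readOps += count; }
    synchronized long getReadOps() { return readOps; }
    String getScheme() { return scheme; }
}

/** Hypothetical registry: one ReadStats per class, keyed by object identity. */
class StatsRegistry {
    private static final Map<Class<?>, ReadStats> TABLE = new IdentityHashMap<>();

    static synchronized ReadStats getStatistics(String scheme, Class<?> cls) {
        ReadStats result = TABLE.get(cls);
        if (result == null) {
            // First request for this class: create and remember the counter
            result = new ReadStats(scheme);
            TABLE.put(cls, result);
        }
        return result;
    }
}
```

Because the table is keyed by class, every instance of the same file system class shares one counter object, which is why the method has to be synchronized.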
Back in open, the analysis continues with the following code:
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
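This first creates an anonymous inner class (an anonymous inner class must extend a class or implement an interface); here it extends FileSystemLinkResolver<FSDataInputStream>. The next step is the call to resolve. The anonymous class does not override resolve, so the parent class's implementation runs. Its code is: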
/**
* Attempt calling overridden {@link #doCall(Path)} method with
* specified {@link FileSystem} and {@link Path}. If the call fails with an
* UnresolvedLinkException, it will try to resolve the path and retry the call
* by calling {@link #next(FileSystem, Path)}.
* @param filesys FileSystem with which to try call
* @param path Path with which to try call
* @return Generic type determined by implementation
* @throws IOException
*/
public T resolve(final FileSystem filesys, final Path path)
throws IOException {
int count = 0;
T in = null;
Path p = path;
// Assumes path belongs to this FileSystem.
// Callers validate this by passing paths through FileSystem#checkPath
FileSystem fs = filesys;
for (boolean isLink = true; isLink;) {
try {
//Call doCall; this dispatches to the doCall override in the anonymous
//subclass of FileSystemLinkResolver<FSDataInputStream>
in = doCall(p);
isLink = false;
} catch (UnresolvedLinkException e) {//doCall hit a symlink it could not resolve
if (!filesys.resolveSymlinks) {//the path contains a symlink, but symlink
//resolution is switched off in the configuration, so report an error
throw new IOException("Path " + path + " contains a symlink"
+ " and symlink resolution is disabled ("
+ CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY
+ ").", e);
}
if (!FileSystem.areSymlinksEnabled()) {
throw new IOException("Symlink resolution is disabled in" +
" this version of Hadoop.");
}
//FsConstants.MAX_PATH_LINKS is the maximum number of symlinks that will be followed
if (count++ > FsConstants.MAX_PATH_LINKS) {
throw new IOException("Possible cyclic loop while " +
"following symbolic link " + path);
}
// Resolve the first unresolved path component
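//Resolve the first path component that could not be resolved.
//fs.getUri() returns the URI of the file system (the namenode address); p is the
//Path object that holds the path of the file to open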
p = FSLinkResolver.qualifySymlinkTarget(fs.getUri(), p,
filesys.resolveLink(p));
fs = FileSystem.getFSofPath(p, filesys.getConf());
// Have to call next if it's a new FS
if (!fs.equals(filesys)) {
return next(fs, p);
}
// Else, we keep resolving with this filesystem
}
}
// Successful call, path was fully resolved
return in;
}
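The retry loop in resolve can be reduced to a small, self-contained sketch. Everything here is hypothetical: SimpleResolver, LinkEncountered and the in-memory link table are illustration-only stand-ins, MAX_LINKS is an assumed limit, and the cross-filesystem next(...) branch is left out.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class LinkResolverDemo {
    public static void main(String[] args) throws IOException {
        Map<String, String> links = new HashMap<>();
        links.put("/a", "/b");      // /a is a symlink to /b
        links.put("/b", "/data");   // /b is a symlink to /data
        System.out.println(new SimpleResolver(links).resolve("/a"));
    }
}

/** Hypothetical stand-in for UnresolvedLinkException. */
class LinkEncountered extends Exception {
    LinkEncountered(String path) { super(path); }
}

class SimpleResolver {
    static final int MAX_LINKS = 32; // assumed limit, for illustration only
    private final Map<String, String> links;

    SimpleResolver(Map<String, String> links) { this.links = links; }

    /** The operation being attempted; it fails when the path is a symlink. */
    String doCall(String path) throws LinkEncountered {
        if (links.containsKey(path)) {
            throw new LinkEncountered(path);
        }
        return "opened " + path;
    }

    /** Try the call; on a symlink, follow one hop and try again. */
    String resolve(String path) throws IOException {
        String p = path;
        int count = 0;
        while (true) {
            try {
                return doCall(p);
            } catch (LinkEncountered e) {
                if (count++ > MAX_LINKS) {
                    throw new IOException(
                        "Possible cyclic loop while following symbolic link " + path);
                }
                p = links.get(p); // resolve one level of the link
            }
        }
    }
}
```

The counter is what protects against link cycles: each hop is cheap, so without a bound a pair of links pointing at each other would loop forever.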
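This code relies on HDFS symbolic links (soft links), which behave much like symbolic links on Linux; the concept is not explained again here. Next, look at filesys.resolveLink(p). The call dispatches to the override in the DistributedFileSystem class, which resolves the symlink. The code is: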
@Override
protected Path resolveLink(Path f) throws IOException {
statistics.incrementReadOps(1);
//getPathName extracts the file path; getLinkTarget resolves the symlink
String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
if (target == null) {
throw new FileNotFoundException("File does not exist: " + f.toString());
}
return new Path(target);
}
Stepping into getLinkTarget, the code is:
/**
* Resolve the *first* symlink, if any, in the path.
*
* @see ClientProtocol#getLinkTarget(String)
*/
public String getLinkTarget(String path) throws IOException {
checkOpen();
try {
//namenode is a proxy object, so this call is ultimately executed by the remote NameNode
return namenode.getLinkTarget(path);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class);
}
}
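Where the namenode proxy object comes from was covered in an earlier part of this series; the figures below summarize it:
[Figure: DFSClient.jpeg]
[Figure: drop.jpeg, referenced from the figure above]
That concludes the recap of where the namenode proxy comes from. The analysis continues below.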
namenode.getLinkTarget ends up in getLinkTarget of the ClientNamenodeProtocolTranslatorPB class, shown here:
@Override
public String getLinkTarget(String path) throws AccessControlException,
FileNotFoundException, IOException {
GetLinkTargetRequestProto req = GetLinkTargetRequestProto.newBuilder()
.setPath(path).build();
try {
GetLinkTargetResponseProto rsp = rpcProxy.getLinkTarget(null, req);
return rsp.hasTargetPath() ? rsp.getTargetPath() : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
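Here rpcProxy is a proxy object for ClientNamenodeProtocolPB. When an interface method is called on the proxy, the call first goes through the invoke method of org.apache.hadoop.ipc.ProtobufRpcEngine.Invoker. invoke serializes the call information and sends it to the remote NameNode over RPC. The NameNode deserializes the message, works out which method is being requested, calls it, serializes the result, and returns the serialized data to the client over RPC. The code of invoke is: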
/**
* This is the client side invoker of RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in case protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods, must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
if (args.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
if (args[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
TraceScope traceScope = null;
// if Tracing is on then start a new span for this rpc.
// guard it in the if statement to make sure there isn't
// any extra string manipulation.
if (Trace.isTracing()) {
traceScope = Trace.startSpan(
method.getDeclaringClass().getCanonicalName() +
"." + method.getName());
}
//Build the request header, which records which method of which protocol interface is being called
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Call -> " +
remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString((Message) args[1]) + "}");
}
//Get the request message (the argument of the call), e.g. RenameRequestProto
Message theRequest = (Message) args[1];
final RpcResponseWrapper val;
try {
//Send the request through the RPC Client
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
remoteId + ": " + method.getName() +
" {" + e + "}");
}
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation(
"Call got exception: " + e.getMessage());
}
throw new ServiceException(e);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
Message prototype = null;
try {
//Get the prototype of the return type, e.g. RenameResponseProto
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
//Deserialize the response bytes into the response message
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
remoteId + ": " + method.getName() +
" {" + TextFormat.shortDebugString(returnMessage) + "}");
}
} catch (Throwable e) {
throw new ServiceException(e);
}
//Return the result
return returnMessage;
}
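The way a call on an interface ends up in invoke can be reproduced with the JDK's java.lang.reflect.Proxy. The sketch below is an illustration under assumptions: LinkTargetProtocol and RecordingInvoker are hypothetical names, and instead of serializing the call and sending it over the network, the handler records it and returns a canned value.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxyDemo {
    public static void main(String[] args) {
        RecordingInvoker invoker = new RecordingInvoker();
        LinkTargetProtocol proxy = RecordingInvoker.newProxy(invoker);
        System.out.println(proxy.getLinkTarget("/a"));
        System.out.println(invoker.calls);
    }
}

/** Hypothetical protocol interface, loosely modelled on getLinkTarget. */
interface LinkTargetProtocol {
    String getLinkTarget(String path);
}

/** Every method called on the proxy is routed to invoke(...). */
class RecordingInvoker implements InvocationHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // A real RPC invoker would serialize the method name and arguments
        // here and send them to the server; this sketch only records them.
        calls.add(method.getName() + "(" + args[0] + ")");
        return "/resolved" + args[0];
    }

    static LinkTargetProtocol newProxy(RecordingInvoker invoker) {
        return (LinkTargetProtocol) Proxy.newProxyInstance(
            LinkTargetProtocol.class.getClassLoader(),
            new Class<?>[] {LinkTargetProtocol.class},
            invoker);
    }
}
```

The caller only sees the interface; which method was called and with what arguments is available to the handler through the Method object and the args array, which is all an RPC layer needs to build a request.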
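Once the method and its arguments have been wrapped in the corresponding objects, the request is sent to the remote NameNode over RPC. The code is:
//Send the request through the RPC Client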
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
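RPC.RpcKind is an enum whose values identify the serialization mechanism; RPC.RpcKind.RPC_PROTOCOL_BUFFER selects the serialization used by the ProtobufRpcEngine class. new RpcRequestWrapper(rpcRequestHeader, theRequest) stores the method information and the arguments in an RpcRequestWrapper object. Before going further, consider where client comes from. It is an org.apache.hadoop.ipc.Client object, whose constructor is: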
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
//valueClass is the Class object of a type that implements Writable
//conf is the configuration object
//factory is a factory object that creates Socket objects
public Client(Class<? extends Writable> valueClass, Configuration conf,
SocketFactory factory) {
this.valueClass = valueClass;
this.conf = conf;
this.socketFactory = factory;
this.connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY,
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
Following the logic covered earlier, factory is obtained from NetUtils.getDefaultSocketFactory(conf). The code of that function is:
/**
* Get the default socket factory as specified by the configuration
* parameter <tt>hadoop.rpc.socket.factory.default</tt>
*
* @param conf the configuration
* @return the default socket factory as specified in the configuration or
* the JVM default socket factory if the configuration does not
* contain a default socket factory property.
*/
public static SocketFactory getDefaultSocketFactory(Configuration conf) {
String propValue = conf.get(
CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
CommonConfigurationKeysPublic.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_DEFAULT);
if ((propValue == null) || (propValue.length() == 0))
return SocketFactory.getDefault();
return getSocketFactoryFromProperty(conf, propValue);
}
The function returns a factory object. The factory creates the Socket objects that the client uses to communicate with the remote NameNode. The code that creates a Socket is:
@Override
public Socket createSocket() throws IOException {
/*
* NOTE: This returns an NIO socket so that it has an associated
* SocketChannel. As of now, this unfortunately makes streams returned
* by Socket.getInputStream() and Socket.getOutputStream() unusable
* (because a blocking read on input stream blocks write on output stream
* and vice versa).
*
* So users of these socket factories should use
* NetUtils.getInputStream(socket) and
* NetUtils.getOutputStream(socket) instead.
*
* A solution for hiding from this from user is to write a
* 'FilterSocket' on the lines of FilterInputStream and extend it by
* overriding getInputStream() and getOutputStream().
*/
return SocketChannel.open().socket();
}
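The note in the comment about the socket having an associated SocketChannel can be checked with plain JDK calls. This is a small sketch; SocketKinds is a hypothetical helper name, and no connection is made to any server.

```java
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;

class SocketChannelDemo {
    public static void main(String[] args) throws IOException {
        System.out.println("NIO socket has channel:   " + SocketKinds.nioSocketHasChannel());
        System.out.println("plain socket has channel: " + SocketKinds.plainSocketHasChannel());
    }
}

class SocketKinds {
    /** A socket obtained from SocketChannel.open() keeps a reference to its channel. */
    static boolean nioSocketHasChannel() throws IOException {
        try (SocketChannel channel = SocketChannel.open()) {
            Socket socket = channel.socket();
            return socket.getChannel() == channel;
        }
    }

    /** A socket created with new Socket() has no channel. */
    static boolean plainSocketHasChannel() throws IOException {
        try (Socket socket = new Socket()) {
            return socket.getChannel() != null;
        }
    }
}
```

Only the channel-backed socket can later be used with selectors and timeouts through its channel, which is the reason the factory does not simply call new Socket().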
The java.nio.channels.SocketChannel class is provided by Java NIO, which a later part of this series covers.
Continuing, we step into client.call. The code is:
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
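//Build the Call object, wrapping the serialization kind and the method and argument information
final Call call = createCall(rpcKind, rpcRequest);
//Get a Connection: look it up in the cache by remoteId first; if none is found, create one and store it in the cache for later reuse.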
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
if (interrupted) {
// set the interrupt flag now that we are done waiting
Thread.currentThread().interrupt();
}
//If the Call object recorded an error, throw it
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // local exception
InetSocketAddress address = connection.getRemoteAddress();
throw NetUtils.wrapException(address.getHostName(),
address.getPort(),
NetUtils.getHostname(),
0,
call.error);
}
} else {//the call succeeded
//The server sent back a response; return the RPC response stored in the Call object
return call.getRpcResponse();
}
}
}
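The waiting part of call is the classic wait/notify handshake between the calling thread and the thread that receives the response. A minimal sketch follows, assuming a hypothetical PendingCall class in place of Hadoop's Call and a plain String in place of the RPC response.

```java
class PendingCallDemo {
    public static void main(String[] args) throws Exception {
        PendingCall call = new PendingCall();
        // Stands in for the connection thread that receives the response
        Thread receiver = new Thread(() -> call.complete("response"));
        receiver.start();
        System.out.println(call.await());
    }
}

/** Hypothetical, simplified counterpart of the Call object the client waits on. */
class PendingCall {
    private boolean done;
    private String response;
    private Exception error;

    /** Called by the receiving thread when the response has arrived. */
    synchronized void complete(String value) {
        response = value;
        done = true;
        notifyAll();
    }

    /** Called by the receiving thread when the call failed. */
    synchronized void fail(Exception e) {
        error = e;
        done = true;
        notifyAll();
    }

    /** Called by the requesting thread; blocks until complete or fail has run. */
    synchronized String await() throws Exception {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException ie) {
                interrupted = true; // remember the interrupt, keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
        }
        if (error != null) {
            throw error;
        }
        return response;
    }
}
```

The while loop around wait() matters: a thread can wake up without the flag having been set, so the condition is rechecked after every wake-up.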
call invokes createCall, whose code is:
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
return new Call(rpcKind, rpcRequest);
}
This function creates a Call object. The Call constructor is:
private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;//serialization kind
this.rpcRequest = param;//object wrapping the method and its arguments
//callId is declared as
//private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
final Integer id = callId.get();
if (id == null) {
this.id = nextCallId();
} else {
callId.set(null);
this.id = id;
}
//retryCount is declared as
//private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
}
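The id selection in the constructor (use a value preset for the current thread if there is one, otherwise take the next value from a counter) can be shown in a few lines. This is a sketch under assumptions: CallIds is a hypothetical class, and the counter is a plain AtomicInteger rather than whatever nextCallId() does internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CallIdDemo {
    public static void main(String[] args) {
        System.out.println(CallIds.idForNewCall()); // taken from the counter
        CallIds.setCallId(42);
        System.out.println(CallIds.idForNewCall()); // the preset value
        System.out.println(CallIds.idForNewCall()); // counter again
    }
}

class CallIds {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    // Per-thread slot: a value set by one thread is invisible to other threads
    private static final ThreadLocal<Integer> CALL_ID = new ThreadLocal<>();

    /** Preset the id that the next call made by the current thread will use. */
    static void setCallId(int id) {
        CALL_ID.set(id);
    }

    static int idForNewCall() {
        Integer preset = CALL_ID.get();
        if (preset == null) {
            return COUNTER.getAndIncrement();
        }
        CALL_ID.set(null); // a preset id is used once, then cleared
        return preset;
    }
}
```

Clearing the slot after use keeps a preset id from leaking into later, unrelated calls made by the same thread.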
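For background on ThreadLocal, see the companion article "ThreadLocal Class Explained".
For background on Java reference types, see the companion article "Java References Explained".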
Back in call, the next statement to execute is:
connection.sendRpcRequest(call);
Stepping into sendRpcRequest, the code is:
/** Initiates a rpc call by sending the rpc request to the remote server.
* Note: this is not called from the Connection thread, but by other
* threads.
* @param call - the rpc request
*/
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
//If the connection is already marked for closing, return immediately and skip the code below
if (shouldCloseConnection.get()) {
return;
}
// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
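//Build the RPC request header first
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
//Write the RPC request header to the buffer (length-delimited)
header.writeDelimitedTo(d);
//Write the RPC request (request metadata and arguments) to the buffer
call.rpcRequest.write(d);
//Send the request through a thread pool. On the wire it has three parts: 1. total length 2. RPC request header 3. RPC request (request metadata and arguments)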
synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest (request metadata + arguments)
out.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
//If sending fails, close the connection
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
//release the buffer allocated above
IOUtils.closeStream(d);
}
}
});
//Wait for the send task to finish
try {
//This blocks until the send task has completed
senderFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
//Rethrow any exception raised by the send task
// cause should only be a RuntimeException as the Runnable above
// catches IOException
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("unexpected checked exception", cause);
}
}
}
}
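The wire layout described in the comments (a total length, then a length-delimited header, then the request) can be sketched with the JDK's data streams. This is a simplified illustration, not the Hadoop format: Framing is a hypothetical class, the header and body are plain byte arrays rather than protobuf messages, and the header's length prefix is a single byte instead of a protobuf varint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class FramingDemo {
    public static void main(String[] args) throws IOException {
        byte[] frame = Framing.frame(
            "header".getBytes(StandardCharsets.UTF_8),
            "request".getBytes(StandardCharsets.UTF_8));
        String[] parts = Framing.parse(frame);
        System.out.println(frame.length + " bytes: " + parts[0] + " / " + parts[1]);
    }
}

class Framing {
    /** Frame = 4-byte total length, then [1-byte header length + header], then body. */
    static byte[] frame(byte[] header, byte[] body) throws IOException {
        if (header.length > 255) {
            throw new IllegalArgumentException("header too long for this sketch");
        }
        // Step 1: serialize header and body into an in-memory buffer
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        DataOutputStream d = new DataOutputStream(payload);
        d.writeByte(header.length); // simplified length prefix (assumption)
        d.write(header);
        d.write(body);
        d.flush();

        // Step 2: write the total length followed by the buffered bytes
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        out.writeInt(payload.size());
        payload.writeTo(out);
        out.flush();
        return wire.toByteArray();
    }

    /** Reverse of frame: returns {header, body} decoded as UTF-8 strings. */
    static String[] parse(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        int total = in.readInt();
        int headerLength = in.readUnsignedByte();
        byte[] header = new byte[headerLength];
        in.readFully(header);
        byte[] body = new byte[total - 1 - headerLength];
        in.readFully(body);
        return new String[] {
            new String(header, StandardCharsets.UTF_8),
            new String(body, StandardCharsets.UTF_8)};
    }
}
```

Serializing into a buffer first is what makes the leading length possible: the total size is only known once both parts have been written.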
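The client sends the request to the NameNode over RPC; the NameNode performs the requested operation and returns the result to the client. Back in open, resolve ends up calling the doCall method of the anonymous class: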
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
final DFSInputStream dfsis =
dfs.open(getPathName(p), bufferSize, verifyChecksum);
return dfs.createWrappedInputStream(dfsis);
}
This method calls dfs.open, which creates a DFSInputStream object. The constructor is:
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy();
openInfo();
}
The last thing the constructor does is call openInfo, whose code is:
/**
* Grab the open-file info from namenode
*/
synchronized void openInfo() throws IOException, UnresolvedLinkException {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
// locations will not be available with NN for getting the length. Lets
// retry for 3 times to get the length.
if (lastBlockBeingWrittenLength == -1) {
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
} else {
break;
}
retriesForLastBlockLength--;
}
if (retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
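The retry logic of openInfo, fetching a value that may temporarily be -1 and retrying a bounded number of times, can be sketched on its own. LastBlockLength and its parameters are hypothetical; the fetch is an arbitrary LongSupplier rather than a NameNode call. Unlike the listing above, this sketch decides success by looking at the fetched value rather than at the remaining retry count, which is a choice made for the sketch.

```java
import java.io.IOException;
import java.util.function.LongSupplier;

class RetryDemo {
    public static void main(String[] args) throws IOException {
        int[] attempts = {0};
        // Fails twice with -1, then reports a length of 100
        LongSupplier flaky = () -> attempts[0]++ < 2 ? -1L : 100L;
        System.out.println(LastBlockLength.fetchWithRetry(flaky, 3, 10));
    }
}

class LastBlockLength {
    /**
     * Calls fetch once, then up to `retries` more times while it returns -1,
     * sleeping intervalMs between attempts.
     */
    static long fetchWithRetry(LongSupplier fetch, int retries, long intervalMs)
            throws IOException {
        long length = fetch.getAsLong();
        while (length == -1 && retries > 0) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting to retry", e);
            }
            length = fetch.getAsLong();
            retries--;
        }
        if (length == -1) {
            throw new IOException("Could not obtain the last block locations.");
        }
        return length;
    }
}
```

Reading the listing above literally, a fetch that succeeds on the very last retry would leave the counter at 0 and still reach the `== 0` check; testing the value avoids that edge case.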
This function fetches information about the file being opened from the remote NameNode. Stepping into fetchLocatedBlocksAndGetLastBlockLength(), the code is:
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0) {
// if the length is zero, then no data has been written to
// datanode. So no need to wait for the locations.
return 0;
}
return -1;
}
final long len = readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
currentNode = null;
return lastBlockBeingWrittenLength;
}
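The consistency check between the cached block list and the freshly fetched one compares the two lists pairwise only as far as both have entries. A minimal sketch, assuming a hypothetical BlockListCheck class and blocks represented by plain Long ids:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BlockListCheck {
    /** True if the lists agree on every position that exists in both. */
    static boolean samePrefix(List<Long> oldIds, List<Long> newIds) {
        Iterator<Long> oldIter = oldIds.iterator();
        Iterator<Long> newIter = newIds.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                return false; // a block that was already known has changed
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(samePrefix(Arrays.asList(1L, 2L), Arrays.asList(1L, 2L, 3L)));
        System.out.println(samePrefix(Arrays.asList(1L, 2L), Arrays.asList(1L, 9L)));
    }
}
```

Because the loop stops at the shorter list, a file that has merely grown by new blocks passes the check; only a change to a block at an already known position is rejected.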
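dfsClient.getLocatedBlocks(src, 0) fetches the block information of the file being opened, in preparation for the reads that follow. To summarize: open retrieves from the NameNode the block information of the specified file, and with that information the client can send read requests to the datanodes. The read and write paths themselves are covered in later parts.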