1. 程式人生 > >balance transfer 解析及api深度追蹤(三)加入通道

balance transfer 解析及api深度追蹤(三)加入通道

// Section 1: code walkthrough
//
// Module preamble of the sample application's join-channel script: loads the
// Node.js built-ins, the Fabric client classes and the sample's own helper,
// then sets up the module-level state shared by joinChannel().
const util = require('util');
const path = require('path');
const fs = require('fs');

const Peer = require('fabric-client/lib/Peer.js');
const EventHub = require('fabric-client/lib/EventHub.js');
// Reassigned by joinChannel() each time a new transaction id is created.
let tx_id = null;
let nonce = null;
// NOTE(review): the scraped text showed the path as "…/config.json"; restored
// to the parent-directory form "../config.json" — confirm against the project layout.
const config = require('../config.json');
const helper = require('./helper.js');
const logger = helper.getLogger('Join-Channel');

//helper.hfc.addConfigFile(path.join(__dirname, 'network-config.json'));
const ORGS = helper.ORGS;
// Every event hub connected by joinChannel(); disconnected again when a join finishes.
const allEventhubs = [];

// Fetch the channel's genesis block from the orderer, then ask the target peers to join the channel

/**
 * Acting as the organization's admin, fetch the channel's genesis block and
 * then ask the given peers to join the channel.
 *
 * The returned promise settles only after the join proposal has been answered
 * AND every event hub has reported the channel's config block (or timed out
 * after config.eventWaitTime milliseconds).
 *
 * @param {string} channelName - Name of the channel to join.
 * @param {string[]} peers - Names of the peers that should join.
 * @param {string} username - User name, used only in error messages.
 * @param {string} org - Organization the peers belong to.
 * @returns {Promise<{success: boolean, message: string}>} Resolves with a
 *   success record; rejects with an Error describing the failure.
 */
var joinChannel = function(channelName, peers, username, org) {
	// Prefer the stack trace when there is one; tolerate non-Error rejections.
	const describeError = (err) => (err && err.stack ? err.stack : err);

	// Log the outcome and disconnect every event hub that is still connected.
	const closeConnections = (isSuccess) => {
		if (isSuccess) {
			logger.debug('\n============ Join Channel is SUCCESS \n');
		} else {
			logger.debug('\n!!! ERROR: Join Channel FAILED !!!\n');
		}
		logger.debug('');
		for (const eventhub of allEventhubs) {
			if (eventhub && eventhub.isconnected()) {
				//logger.debug('Disconnecting the event hub');
				eventhub.disconnect();
			}
		}
	};

	// Step 3.2: wait until the event hub delivers the config block of the
	// channel being joined, or reject once the configured wait time elapses.
	const waitForJoinBlock = (eh) => new Promise((resolve, reject) => {
		const waitMs = parseInt(config.eventWaitTime, 10);
		const handle = setTimeout(() => {
			// Reject with a real Error so downstream handlers can read err.stack.
			reject(new Error(util.format(
				'Timed out after %s ms waiting for the config block of channel \'%s\'',
				waitMs, channelName)));
		}, waitMs);
		eh.registerBlockEvent((block) => {
			// In real-world situations a peer may be on more than one channel, so
			// check that this block came from the channel the peer was asked to join.
			// A config block must contain exactly one transaction; ignore any other
			// block and keep the timer armed so the promise cannot stay pending forever.
			if (block.data.data.length !== 1) {
				return;
			}
			clearTimeout(handle);
			const channel_header = block.data.data[0].payload.header.channel_header;
			// Step 3.3: continue only if the block belongs to this channel.
			if (channel_header.channel_id === channelName) {
				resolve();
			} else {
				reject(new Error(util.format(
					'Received a config block for channel \'%s\' while joining channel \'%s\'',
					channel_header.channel_id, channelName)));
			}
		});
	});

	//logger.debug('\n Join Channel ============\n')
	logger.info(util.format(
		'Calling peers in organization "%s" to join the channel', org));
	const client = helper.getClientForOrg(org);
	const channel = helper.getChannelForOrg(org);
	let eventhubs = [];

	// Step 1: as the admin, fetch the genesis block (a peer needs it to join).
	return helper.getOrgAdmin(org).then(() => {
		logger.info(util.format('received member object for admin of the organization "%s": ', org));
		tx_id = client.newTransactionID();
		const request = {
			txId: tx_id
		};
		return channel.getGenesisBlock(request); // traced in (1)
	}).then((genesis_block) => {
		// Step 2: build the join-channel request.
		tx_id = client.newTransactionID();
		const request = {
			targets: helper.newPeers(peers, org), // traced in (2)
			txId: tx_id,
			block: genesis_block
		};
		// Step 3.1: connect the event hubs that will report the join.
		eventhubs = helper.newEventHubs(peers, org); // traced in (2)
		for (const eh of eventhubs) {
			eh.connect();
			allEventhubs.push(eh);
		}
		// Step 3.4: one pending promise per event hub.
		const eventPromises = eventhubs.map((eh) => waitForJoinBlock(eh));
		// Step 4: send the join request.
		const sendPromise = channel.joinChannel(request); // traced in (3)
		// Step 5: the proposal and all event promises succeed or fail together.
		return Promise.all([sendPromise].concat(eventPromises));
	}, (err) => {
		// Parenthesised via describeError(): the original `'msg' + err.stack ? a : b`
		// evaluated the concatenation as the condition and dropped the prefix.
		const message = 'Failed to enroll user \'' + username + '\' due to error: ' + describeError(err);
		logger.error(message);
		throw new Error(message);
	}).then((results) => {
		// Step 7: results[0] is what sendPromise resolved with; judge it by status.
		logger.debug(util.format('Join Channel R E S P O N S E : %j', results));
		const first = results[0] && results[0][0];
		if (first && first.response && first.response.status === 200) {
			// Joined successfully.
			const message = util.format(
				'Successfully joined peers in organization %s to the channel \'%s\'',
				org, channelName);
			logger.info(message);
			closeConnections(true);
			return {
				success: true,
				message: message
			};
		}
		// Join failed.
		logger.error(' Failed to join channel');
		closeConnections();
		throw new Error('Failed to join channel');
	}, (err) => {
		const message = 'Failed to join channel due to error: ' + describeError(err);
		logger.error(message);
		closeConnections();
		throw new Error(message);
	});
};

// Section 2: API deep trace
// (1)

  • @typedef {Object} OrdererRequest * @property {TransactionID} txId - Optional. Object with the transaction id and nonce * @property {Orderer} orderer - Optional. The orderer instance or string name * of the orderer to retrieve genesis block from / /* * A channel’s first block is called the “genesis block”. This block captures the * initial channel configuration. For a peer node to join the channel, it must be * provided the genesis block. This method must be called before calling * [joinChannel()]{@link Channel#joinChannel}. * * @param {OrdererRequest} request - Optional - A transaction ID object * @returns {Promise} A Promise for an encoded protobuf “Block” */ getGenesisBlock(request) { logger.debug(‘getGenesisBlock - start’); if (!request) { request = {}; } // verify that we have an orderer configured var orderer = this._clientContext.getTargetOrderer(request.orderer, this._orderers, this._name); var signer = null; var tx_id = request.txId; if (!tx_id) { signer = this._clientContext._getSigningIdentity(true); tx_id = new TransactionID(signer, true); } else { signer = this._clientContext._getSigningIdentity(tx_id.isAdmin()); } // now build the seek info, will be used once the channel is created // to get the genesis block back // build start var seekSpecifiedStart = new _abProto.SeekSpecified(); seekSpecifiedStart.setNumber(0); var seekStart = new _abProto.SeekPosition(); seekStart.setSpecified(seekSpecifiedStart); // build stop var seekSpecifiedStop = new _abProto.SeekSpecified(); seekSpecifiedStop.setNumber(0); var seekStop = new _abProto.SeekPosition(); seekStop.setSpecified(seekSpecifiedStop); // seek info with all parts var seekInfo = new _abProto.SeekInfo(); seekInfo.setStart(seekStart); seekInfo.setStop(seekStop); seekInfo.setBehavior(_abProto.SeekInfo.SeekBehavior.BLOCK_UNTIL_READY); // build the header for use with the seekInfo payload var seekInfoHeader = clientUtils.buildChannelHeader( _commonProto.HeaderType.DELIVER_SEEK_INFO, this._name, 
tx_id.getTransactionID(), this._initial_epoch, null, clientUtils.buildCurrentTimestamp(), orderer.getClientCertHash() ); var seekHeader = clientUtils.buildHeader(signer, seekInfoHeader, tx_id.getNonce()); var seekPayload = new _commonProto.Payload(); seekPayload.setHeader(seekHeader); seekPayload.setData(seekInfo.toBuffer()); var seekPayloadBytes = seekPayload.toBuffer(); let sig = signer.sign(seekPayloadBytes); let signature = Buffer.from(sig); // building manually or will get protobuf errors on send var envelope = { signature: signature, payload: seekPayloadBytes }; return orderer.sendDeliver(envelope); } (2) function newRemotes(names, forPeers, userOrg) { let client = getClientForOrg(userOrg); let targets = []; // find the peer that match the names for (let idx in names) { let peerName = names[idx]; if (ORGS[userOrg].peers[peerName]) { // found a peer matching the name let data = fs.readFileSync(path.join(__dirname, ORGS[userOrg].peers[peerName][‘tls_cacerts’])); let grpcOpts = { pem: Buffer.from(data).toString(), ‘ssl-target-name-override’: ORGS[userOrg].peers[peerName][‘server-hostname’] }; if (forPeers) { targets.push(client.newPeer(ORGS[userOrg].peers[peerName].requests, grpcOpts)); } else { let eh = client.newEventHub()//尋找到peer節點,; eh.setPeerAddr(ORGS[userOrg].peers[peerName].events, grpcOpts); targets.push(eh); } } } if (targets.length === 0) { logger.error(util.format(‘Failed to find peers matching the names %s’, names)); } return targets; (3)
    • For a peer node to become part of a channel, it must be sent the genesis
      • block, as explained [here]{@link Channel#getGenesisBlock}. This method
      • sends a join channel proposal to one or more endorsing peers.
      • @param {JoinChannelRequest} request
      • @param {Number} timeout - A number indicating milliseconds to wait on the
      •                          response before rejecting the promise with a
        
      •                          timeout error. This overrides the default timeout
        
      •                          of the {@link Peer} instance(s) and the global timeout in the config settings.
        
      • @returns {Promise} A Promise for an array of {@link ProposalResponse} from the target peers */ joinChannel(request, timeout) { logger.debug(‘joinChannel - start’); var errorMsg = null; // verify that we have targets (Peers) to join this channel // defined by the caller if (!request) { errorMsg = ‘Missing all required input request parameters’; } // verify that we have transaction id else if (!request.txId) { errorMsg = ‘Missing txId input parameter with the required transaction identifier’; } else if (!request.block) { errorMsg = ‘Missing block input parameter with the required genesis block’; } if (errorMsg) { logger.error('joinChannel - error ’ + errorMsg); throw new Error(errorMsg); } var targets = this._getTargets(request.targets); //no role, will get all peers var signer = this._clientContext._getSigningIdentity(request.txId.isAdmin()); var chaincodeInput = new _ccProto.ChaincodeInput(); var args = []; args.push(Buffer.from(‘JoinChain’, ‘utf8’)); args.push(request.block.toBuffer()); chaincodeInput.setArgs(args); var chaincodeID = new _ccProto.ChaincodeID(); chaincodeID.setName(Constants.CSCC); var chaincodeSpec = new _ccProto.ChaincodeSpec(); chaincodeSpec.setType(_ccProto.ChaincodeSpec.Type.GOLANG); chaincodeSpec.setChaincodeId(chaincodeID); chaincodeSpec.setInput(chaincodeInput); var channelHeader = clientUtils.buildChannelHeader( _commonProto.HeaderType.ENDORSER_TRANSACTION, ‘’, request.txId.getTransactionID(), null, //no epoch Constants.CSCC, clientUtils.buildCurrentTimestamp(), targets[0].getClientCertHash() ); var header = clientUtils.buildHeader(signer, channelHeader, request.txId.getNonce()); var proposal = clientUtils.buildProposal(chaincodeSpec, header); var signed_proposal = clientUtils.signProposal(signer, proposal); return clientUtils.sendPeersProposal(targets, signed_proposal, timeout)(4) .then( function (responses) { return Promise.resolve(responses); } ).catch( function (err) { logger.error(‘joinChannel - Failed Proposal. 
Error: %s’, err.stack ? err.stack : err); return Promise.reject(err); } ); } (4)
  • This function will return one Promise when sending a proposal to many peers */ module.exports.sendPeersProposal = function (peers, proposal, timeout) { let targets = peers; if (!Array.isArray(peers)) { targets = [peers]; } // make function to return an individual promise const fn = function (peer) { return new Promise(function (resolve, reject) { peer.sendProposal(proposal, timeout)(5).then( function (result) { resolve(result); } ).catch( function (err) { logger.error(‘sendPeersProposal - Promise is rejected: %s’, err.stack ? err.stack : err); return reject(err); } ); }); };

(5) /** * Send an endorsement proposal to an endorser. This is used to call an * endorsing peer to execute a chaincode to process a transaction proposal, * or runs queries. * * @param {Proposal} proposal - A protobuf encoded byte array of type * [Proposal]{@link https://github.com/hyperledger/fabric/blob/v1.0.0/protos/peer/proposal.proto#L134} * @param {Number} timeout - A number indicating milliseconds to wait on the * response before rejecting the promise with a * timeout error. This overrides the default timeout * of the Peer instance and the global timeout in the config settings. * @returns {Promise} A Promise for a {@link ProposalResponse} */ sendProposal(proposal, timeout) { logger.debug(‘Peer.sendProposal - Start’); let self = this; let rto = self._request_timeout; if (typeof timeout === ‘number’) rto = timeout; if(!proposal) { return Promise.reject(new Error(‘Missing proposal to send to peer’)); } // Send the transaction to the peer node via grpc // The rpc specification on the peer side is: // rpc ProcessProposal(Proposal) returns (ProposalResponse) {} return new Promise(function(resolve, reject) { var send_timeout = setTimeout(function(){ logger.error(‘sendProposal - timed out after:%s’, rto); return reject(new Error(‘REQUEST_TIMEOUT’)); }, rto); self._endorserClient.processProposal(proposal, function(err, proposalResponse) { clearTimeout(send_timeout); if (err) { logger.debug(‘Received proposal response from: %s status: %s’,self._url, err); if(err instanceof Error) { reject(err); } else { reject(new Error(err)); } } else { if (proposalResponse) { logger.debug(‘Received proposal response from peer “%s”: status - %s’, self._url, proposalResponse.response.status); resolve(proposalResponse); } else { logger.error(‘GRPC client failed to get a proper response from the peer “%s”.’, self._url); reject(new Error(util.format(‘GRPC client failed to get a proper response from the peer “%s”.’, self._url))); } } }); }); }