A look at batch partitioning and aggregation in Storm Trident
```java
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))                  // repartition each batch by user before b-0
        .partitionAggregate(new Fields("user","score","batchId"),new OriginUserCountAggregator(),new Fields("result","aggBatchId")) // b-0
        .parallelismHint(3)                               // 3 tasks for b-0
        .global()
        .aggregate(new Fields("result","aggBatchId"),new AggAgg(),new Fields("agg")) // init in b-1, final combine in b-2
        .each(new Fields("agg"),new PrintEachFunc(),new Fields());                 // b-2
```
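- This topology ends up as 3 bolts: b-0, b-1 and b-2
- b-0 mainly runs partitionAggregate. Its parallelismHint is 3
- b-1 mainly runs the CombinerAggregator's init, and the log below shows that it also combines the partial results it receives. Its parallelismHint is 1. Because its upstream bolt has 3 tasks, tracked.condition.expectedTaskReports in its TridentBoltExecutor is 3. It therefore has to wait until the aggregated data from all three tasks has arrived before it can finishBatch
- b-2 mainly runs the CombinerAggregator's combine and the each operation
- Overall, partitionBy splits one batch from the spout into 3 sub-batches at b-0. b-1 calls finishBatch only after it has aggregated all 3 sub-batches. b-2 then performs the final aggregation on the result produced by b-1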
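The split-then-merge flow above can be modeled without Storm. The sketch below is a plain-Java illustration, not Trident code. PartitionAggregateSketch and its methods are hypothetical names. Routing by hashCode() modulo the task count is an assumption that stands in for whatever hash partitionBy actually uses.

The sketch splits one batch into 3 sub-batches and counts each one locally (the b-0 role). It merges the partial results only once all 3 have arrived (the b-1/b-2 role).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Storm dependency) of the b-0 -> b-1 -> b-2 flow. All names are hypothetical.
public class PartitionAggregateSketch {

    // Assumption: route by hashCode modulo task count; Storm's partitionBy also
    // hashes the selected fields, but its exact hash function is not reproduced here.
    public static int partitionFor(String user, int numTasks) {
        return Math.floorMod(user.hashCode(), numTasks);
    }

    // Like partitionBy(new Fields("user")): one batch becomes numTasks sub-batches.
    public static List<List<String>> split(List<String> batch, int numTasks) {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            parts.add(new ArrayList<>());
        }
        for (String user : batch) {
            parts.get(partitionFor(user, numTasks)).add(user);
        }
        return parts;
    }

    // Like b-0's partitionAggregate: count users inside one sub-batch only.
    // An empty sub-batch still produces an (empty) partial result.
    public static Map<String, Integer> countPartition(List<String> subBatch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String user : subBatch) {
            counts.merge(user, 1, Integer::sum);
        }
        return counts;
    }

    // Like b-1/b-2: merge the partial results, but only once every upstream
    // task has reported (the role played by expectedTaskReports).
    public static Map<String, Integer> mergeAll(List<Map<String, Integer>> partials, int expectedTaskReports) {
        if (partials.size() != expectedTaskReports) {
            throw new IllegalStateException(
                    "batch not complete: " + partials.size() + "/" + expectedTaskReports + " reports");
        }
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((user, n) -> total.merge(user, n, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("nickt1", "nickt2", "nickt3");
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> subBatch : split(batch, 3)) {
            partials.add(countPartition(subBatch));
        }
        System.out.println(mergeAll(partials, 3)); // {nickt1=1, nickt2=1, nickt3=1}
    }
}
```

Whichever task a user lands on, the merged totals come out the same. What the merge step needs is a guarantee that all partials are in.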
Sample log
```
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt1, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt2, 1]
23:22:00.718 [Thread-49-spout-spout1-executor[11 11]] INFO com.example.demo.trident.batch.DebugFixedBatchSpout - batchId:1,emit:[nickt3, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt2, 1, 1]
23:22:00.720 [Thread-45-b-0-executor[8 8]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt2=1}}
23:22:00.722 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null init map, aggBatchId:1:0
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt1, 1, 1]
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null aggregate batch:1,tuple:[nickt3, 1, 1]
23:22:00.723 [Thread-22-b-0-executor[7 7]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt1=1}}
23:22:00.723 [Thread-29-b-0-executor[6 6]] INFO com.example.demo.trident.OriginUserCountAggregator - null complete agg batch:1:0,val:{1={nickt3=1}}
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt2=1}}, 1:0]
23:22:00.724 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt2=1}}
23:22:00.726 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt3=1}}, 1:0]
23:22:00.727 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt2=1}},val2:{1={nickt3=1}}
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - init tuple:[{1={nickt1=1}}, 1:0]
23:22:00.728 [Thread-36-b-1-executor[9 9]] INFO com.example.demo.trident.AggAgg - combine val1:{1={nickt3=1, nickt2=1}},val2:{1={nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - zero called
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.AggAgg - combine val1:{},val2:{1={nickt3=1, nickt2=1, nickt1=1}}
23:22:00.731 [Thread-31-b-2-executor[10 10]] INFO com.example.demo.trident.PrintEachFunc - null each tuple:[{1={nickt3=1, nickt2=1, nickt1=1}}]
```
- Note that Storm's thread names already include the bolt names, e.g. b-0, b-1 and b-2
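The source of AggAgg is not shown, so the following is only a guess that fits the log. It mirrors the zero / init / combine methods of Trident's CombinerAggregator through a hypothetical MiniCombiner interface. Unlike the real interface, its init takes the value directly instead of a TridentTuple. A nested-map merge produces values shaped like {1={nickt3=1, nickt2=1}}.

partialAggregate follows b-1's sequence in the log: one zero, then init plus combine for each input. finalAggregate follows b-2's sequence: one zero, then combine only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reconstruction: AggAgg's source is not shown in the article.
public class CombinerSketch {

    // Same three methods as Trident's CombinerAggregator<T>, except that init
    // takes the value directly instead of a TridentTuple (simplification).
    public interface MiniCombiner<T> {
        T init(T value);
        T combine(T val1, T val2);
        T zero();
    }

    // batchId -> (user -> count), e.g. {1={nickt2=1}} in the log.
    public static final class NestedCountMerge implements MiniCombiner<Map<Object, Map<String, Integer>>> {
        public Map<Object, Map<String, Integer>> init(Map<Object, Map<String, Integer>> value) {
            return value;
        }

        public Map<Object, Map<String, Integer>> zero() {
            return new HashMap<>();
        }

        public Map<Object, Map<String, Integer>> combine(Map<Object, Map<String, Integer>> val1,
                                                         Map<Object, Map<String, Integer>> val2) {
            Map<Object, Map<String, Integer>> out = new HashMap<>();
            // Copy val1 so neither input is mutated, then add val2's counts into the copy.
            val1.forEach((batch, counts) -> out.put(batch, new HashMap<>(counts)));
            val2.forEach((batch, counts) -> counts.forEach((user, n) ->
                    out.computeIfAbsent(batch, k -> new HashMap<>()).merge(user, n, Integer::sum)));
            return out;
        }
    }

    // b-1's pattern in the log: zero() once, then init + combine for each partial result.
    public static <T> T partialAggregate(MiniCombiner<T> agg, List<T> inputs) {
        T acc = agg.zero();
        for (T input : inputs) {
            acc = agg.combine(acc, agg.init(input));
        }
        return acc;
    }

    // b-2's pattern in the log: zero() once, then combine with already-combined values.
    public static <T> T finalAggregate(MiniCombiner<T> agg, List<T> combined) {
        T acc = agg.zero();
        for (T value : combined) {
            acc = agg.combine(acc, value);
        }
        return acc;
    }

    // Builds a one-user partial result such as {1={nickt2=1}}.
    public static Map<Object, Map<String, Integer>> single(Object batchId, String user) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put(user, 1);
        Map<Object, Map<String, Integer>> m = new HashMap<>();
        m.put(batchId, counts);
        return m;
    }

    public static void main(String[] args) {
        NestedCountMerge agg = new NestedCountMerge();
        // The three partial results from the b-0 tasks, in the order b-1 received them.
        List<Map<Object, Map<String, Integer>>> fromB0 = Arrays.asList(
                single(1, "nickt2"), single(1, "nickt3"), single(1, "nickt1"));
        Map<Object, Map<String, Integer>> fromB1 = partialAggregate(agg, fromB0);
        System.out.println(finalAggregate(agg, Arrays.asList(fromB1)));
    }
}
```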
TridentBoltExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
```java
public void execute(Tuple tuple) {
    if(TupleUtils.isTick(tuple)) {
        long now = System.currentTimeMillis();
        if(now - _lastRotate > _messageTimeoutMs) {
            _batches.rotate();
            _lastRotate = now;
        }
        return;
    }
    String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
    if(batchGroup==null) {
        // this is so we can do things like have simple DRPC that doesn't need to use batch processing
        _coordCollector.setCurrBatch(null);
        _bolt.execute(null, tuple);
        _collector.ack(tuple);
        return;
    }
    IBatchID id = (IBatchID) tuple.getValue(0);
    //get transaction id
    //if it already exists and attempt id is greater than the attempt there

    TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
    //if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
    //    System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
    //            + " (" + _batches.size() + ")" +
    //            "\ntuple: " + tuple +
    //            "\nwith tracked " + tracked +
    //            "\nwith id " + id +
    //            "\nwith group " + batchGroup
    //            + "\n");
    //
    //}
    //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

    // this code here ensures that only one attempt is ever tracked for a batch, so when
    // failures happen you don't get an explosion in memory usage in the tasks
    if(tracked!=null) {
        if(id.getAttemptId() > tracked.attemptId) {
            _batches.remove(id.getId());
            tracked = null;
        } else if(id.getAttemptId() < tracked.attemptId) {
            // no reason to try to execute a previous attempt than we've already seen
            return;
        }
    }
    if(tracked==null) {
        tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
        _batches.put(id.getId(), tracked);
    }
    _coordCollector.setCurrBatch(tracked);

    //System.out.println("TRACKED: " + tracked + " " + tuple);

    TupleType t = getTupleType(tuple, tracked);
    if(t==TupleType.COMMIT) {
        tracked.receivedCommit = true;
        checkFinish(tracked, tuple, t);
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else {
        tracked.receivedTuples++;
        boolean success = true;
        try {
            _bolt.execute(tracked.info, tuple);
            if(tracked.condition.expectedTaskReports==0) {
                success = finishBatch(tracked, tuple);
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
        }
        if(success) {
            _collector.ack(tuple);
        } else {
            _collector.fail(tuple);
        }
    }
    _coordCollector.setCurrBatch(null);
}

private void failBatch(TrackedBatch tracked, FailedException e) {
    if(e!=null && e instanceof ReportedFailedException) {
        _collector.reportError(e);
    }
    tracked.failed = true;
    if(tracked.delayedAck!=null) {
        _collector.fail(tracked.delayedAck);
        tracked.delayedAck = null;
    }
}

private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
    if(tracked.failed) {
        failBatch(tracked);
        _collector.fail(tuple);
        return;
    }
    CoordCondition cond = tracked.condition;
    boolean delayed = tracked.delayedAck==null &&
                          (cond.commitStream!=null && type==TupleType.COMMIT
                           || cond.commitStream==null);
    if(delayed) {
        tracked.delayedAck = tuple;
    }
    boolean failed = false;
    if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
        if(tracked.receivedTuples == tracked.expectedTupleCount) {
            finishBatch(tracked, tuple);
        } else {
            //TODO: add logging that not all tuples were received
            failBatch(tracked);
            _collector.fail(tuple);
            failed = true;
        }
    }

    if(!delayed && !failed) {
        _collector.ack(tuple);
    }
}
```
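- If no TrackedBatch exists yet, execute creates one, and creating it calls _bolt.initBatchState
- For a normal tuple, execute first runs _bolt.execute(tracked.info, tuple) and then calls _collector.ack. If _bolt.execute throws a FailedException, failBatch is called right away and marks tracked.failed as true. The tuple itself is still acked, because success stays true. Later, when checkFinish runs after all of the batch's tuples have been sent and received, it sees tracked.failed and calls _collector.fail
- The _bolt here comes in two kinds: TridentSpoutExecutor and SubtopologyBolt. For a TridentSpoutExecutor, tracked.condition.expectedTaskReports is 0. Each tuple it receives is really an instruction to emit a batch, and finishBatch is called immediately after _bolt.execute. For a SubtopologyBolt, tracked.condition.expectedTaskReports is nonzero. It has to wait for the final [id,count] instructions before checkFinish can complete the batch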
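To make the counting concrete, here is a simplified, Storm-free model of the bookkeeping in execute and checkFinish. BatchTrackerSketch and its methods are hypothetical, and the field names follow TrackedBatch. The model makes three simplifications:

- It ignores commit streams and behaves as if receivedCommit were always true.
- It drops the tracker once a batch is decided. This is the sketch's own choice.
- It models only the newest-attempt rule for retries.

With expectedTaskReports = 3 (like b-1), the batch completes on the third [id,count] tuple. It fails if fewer data tuples arrived than the counts announced. With 0 (like a spout executor), every data tuple finishes the batch immediately.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free model of TridentBoltExecutor's batch-completion bookkeeping.
// Commit streams are not modeled: the sketch behaves as if receivedCommit were always true.
public class BatchTrackerSketch {

    public enum Result { PENDING, FINISHED, FAILED, IGNORED }

    static final class Tracked {
        final int attemptId;
        int receivedTuples;      // data tuples seen so far
        int reportedTasks;       // upstream tasks that sent [id,count]
        int expectedTupleCount;  // sum of the counts in those [id,count] tuples

        Tracked(int attemptId) {
            this.attemptId = attemptId;
        }
    }

    private final Map<Long, Tracked> batches = new HashMap<>();
    private final int expectedTaskReports; // 0 for a spout executor, number of upstream tasks otherwise

    public BatchTrackerSketch(int expectedTaskReports) {
        this.expectedTaskReports = expectedTaskReports;
    }

    // Keep only the newest attempt per transaction id; tuples of older attempts are ignored.
    private Tracked track(long txId, int attemptId) {
        Tracked t = batches.get(txId);
        if (t != null && attemptId > t.attemptId) {
            batches.remove(txId);
            t = null;
        } else if (t != null && attemptId < t.attemptId) {
            return null;
        }
        if (t == null) {
            t = new Tracked(attemptId);
            batches.put(txId, t);
        }
        return t;
    }

    // A normal data tuple of the batch.
    public Result onData(long txId, int attemptId) {
        Tracked t = track(txId, attemptId);
        if (t == null) return Result.IGNORED;
        t.receivedTuples++;
        if (expectedTaskReports == 0) { // spout case: finish right after execute
            batches.remove(txId);
            return Result.FINISHED;
        }
        return Result.PENDING;
    }

    // A COORD tuple [id, count] sent by one upstream task when its part of the batch is done.
    public Result onCoord(long txId, int attemptId, int count) {
        Tracked t = track(txId, attemptId);
        if (t == null) return Result.IGNORED;
        t.reportedTasks++;
        t.expectedTupleCount += count;
        if (t.reportedTasks < expectedTaskReports) return Result.PENDING;
        batches.remove(txId);
        // Every upstream task has reported: finish only if no tuple went missing.
        return t.receivedTuples == t.expectedTupleCount ? Result.FINISHED : Result.FAILED;
    }

    public static void main(String[] args) {
        BatchTrackerSketch b1 = new BatchTrackerSketch(3); // like b-1: three b-0 tasks upstream
        for (int i = 0; i < 3; i++) b1.onData(1L, 0);
        System.out.println(b1.onCoord(1L, 0, 1)); // PENDING
        System.out.println(b1.onCoord(1L, 0, 1)); // PENDING
        System.out.println(b1.onCoord(1L, 0, 1)); // FINISHED
    }
}
```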
TridentSpoutExecutor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutExecutor.java
```java
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector) {
    _emitter = _spout.getEmitter(_txStateId, conf, context);
    _collector = new AddIdCollector(_streamName, collector);
}

@Override
public void execute(BatchInfo info, Tuple input) {
    // there won't be a BatchInfo for the success stream
    TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
    if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
        if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
            ((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
            _activeBatches.remove(attempt.getTransactionId());
        } else {
             throw new FailedException("Received commit for different transaction attempt");
        }
    } else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
        // valid to delete before what's been committed since
        // those batches will never be accessed again
        _activeBatches.headMap(attempt.getTransactionId()).clear();
        _emitter.success(attempt);
    } else {
        _collector.setBatch(info.batchId);
        _emitter.emitBatch(attempt, input.getValue(1), _collector);
        _activeBatches.put(attempt.getTransactionId(), attempt);
    }
}

@Override
public void finishBatch(BatchInfo batchInfo) {
}

@Override
public Object initBatchState(String batchGroup, Object batchId) {
    return null;
}
```
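- TridentSpoutExecutor uses an AddIdCollector. Its initBatchState and finishBatch methods are both no-ops
- execute handles three cases separately: COMMIT_STREAM_ID, SUCCESS_STREAM_ID, and ordinary streams
- A tuple arriving on an ordinary stream is an instruction to emit a batch, so execute calls _emitter.emitBatch to emit that batch's tuples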
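The dispatch in execute can be sketched with a TreeMap. Because keys are sorted, headMap(txId).clear() drops every older transaction in one call. SpoutDispatchSketch is hypothetical. Its stream names and its Attempt class are simplified stand-ins for MasterBatchCoordinator's stream ids and TransactionAttempt. An IllegalStateException plays the part of FailedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;

// Storm-free sketch of the three-way dispatch in TridentSpoutExecutor.execute.
// Stream names and Attempt are simplified stand-ins (assumptions), not Storm types.
public class SpoutDispatchSketch {

    public static final String COMMIT = "commit";   // stands in for COMMIT_STREAM_ID
    public static final String SUCCESS = "success"; // stands in for SUCCESS_STREAM_ID
    public static final String BATCH = "batch";     // any other stream: "emit this batch"

    public static final class Attempt {
        final long txId;
        final int attemptId;

        public Attempt(long txId, int attemptId) {
            this.txId = txId;
            this.attemptId = attemptId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Attempt)) return false;
            Attempt a = (Attempt) o;
            return a.txId == txId && a.attemptId == attemptId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(txId, attemptId);
        }
    }

    // txId -> attempt currently emitted; sorted, so headMap(txId) means "all older txIds".
    public final TreeMap<Long, Attempt> activeBatches = new TreeMap<>();
    public final List<String> actions = new ArrayList<>();

    public void execute(String stream, Attempt attempt) {
        if (stream.equals(COMMIT)) {
            if (attempt.equals(activeBatches.get(attempt.txId))) {
                actions.add("commit " + attempt.txId);
                activeBatches.remove(attempt.txId);
            } else {
                // The original throws FailedException here, which fails the batch.
                throw new IllegalStateException("Received commit for different transaction attempt");
            }
        } else if (stream.equals(SUCCESS)) {
            // Older transactions will never be accessed again (headMap excludes txId itself).
            activeBatches.headMap(attempt.txId).clear();
            actions.add("success " + attempt.txId);
        } else {
            actions.add("emitBatch " + attempt.txId);
            activeBatches.put(attempt.txId, attempt);
        }
    }

    public static void main(String[] args) {
        SpoutDispatchSketch s = new SpoutDispatchSketch();
        for (long tx = 1; tx <= 3; tx++) {
            s.execute(BATCH, new Attempt(tx, 0));
        }
        s.execute(SUCCESS, new Attempt(3, 0));
        System.out.println(s.activeBatches.keySet()); // [3]
    }
}
```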
SubtopologyBolt
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/SubtopologyBolt.java
```java
@Override
public Object initBatchState(String batchGroup, Object batchId) {
    ProcessorContext ret = new ProcessorContext(batchId, new Object[_nodes.size()]);
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchGroup)) {
        p.startBatch(ret);
    }
    return ret;
}

@Override
public void execute(BatchInfo batchInfo, Tuple tuple) {
    String sourceStream = tuple.getSourceStreamId();
    InitialReceiver ir = _roots.get(sourceStream);
    if(ir==null) {
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }
    ir.receive((ProcessorContext) batchInfo.state, tuple);
}

@Override
public void finishBatch(BatchInfo batchInfo) {
    for(TridentProcessor p: _myTopologicallyOrdered.get(batchInfo.batchGroup)) {
        p.finishBatch((ProcessorContext) batchInfo.state);
    }
}

protected static class InitialReceiver {
    List<TridentProcessor> _receivers = new ArrayList<>();
    RootFactory _factory;
    ProjectionFactory _project;
    String _stream;

    public InitialReceiver(String stream, Fields allFields) {
        // TODO: don't want to project for non-batch bolts...???
        // how to distinguish "batch" streams from non-batch streams?
        _stream = stream;
        _factory = new RootFactory(allFields);
        List<String> projected = new ArrayList<>(allFields.toList());
        projected.remove(0);
        _project = new ProjectionFactory(_factory, new Fields(projected));
    }

    public void receive(ProcessorContext context, Tuple tuple) {
        TridentTuple t = _project.create(_factory.create(tuple));
        for(TridentProcessor r: _receivers) {
            r.execute(context, _stream, t);
        }
    }

    public void addReceiver(TridentProcessor p) {
        _receivers.add(p);
    }

    public Factory getOutputFactory() {
        return _project;
    }
}
```
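- initBatchState creates a ProcessorContext and calls startBatch on every TridentProcessor of the batch group, in topological order (e.g. AggregateProcessor, EachProcessor)
- execute looks up the InitialReceiver for the tuple's source stream. Its receive method projects the tuple and passes it to the root TridentProcessors' execute (e.g. AggregateProcessor)
- finishBatch calls finishBatch on every TridentProcessor of the batch group, in topological order (e.g. AggregateProcessor, EachProcessor)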
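The call order can be illustrated with a toy pipeline. MiniProcessor, CountAgg and Each are hypothetical stand-ins for TridentProcessor, an aggregate processor and an each processor. A shared HashMap replaces ProcessorContext's per-processor state array.

runBatch calls startBatch on every processor in topological order and feeds input tuples only to the roots. It then calls finishBatch in the same order. That last step is where the aggregate pushes its result to the downstream processor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch of how SubtopologyBolt drives its processors for one batch.
// All types here are hypothetical stand-ins, not Trident classes.
public class SubtopologySketch {

    public interface MiniProcessor {
        void startBatch(Map<String, Object> ctx);
        void execute(Map<String, Object> ctx, String tuple);
        void finishBatch(Map<String, Object> ctx);
    }

    // Aggregate-like: keeps per-batch state and emits downstream only in finishBatch.
    public static final class CountAgg implements MiniProcessor {
        final MiniProcessor downstream;

        public CountAgg(MiniProcessor downstream) {
            this.downstream = downstream;
        }

        public void startBatch(Map<String, Object> ctx) { ctx.put("count", 0); } // reset per batch
        public void execute(Map<String, Object> ctx, String tuple) { ctx.put("count", (Integer) ctx.get("count") + 1); }
        public void finishBatch(Map<String, Object> ctx) { downstream.execute(ctx, "count=" + ctx.get("count")); }
    }

    // Each-like: handles every tuple handed to it.
    public static final class Each implements MiniProcessor {
        public final List<String> out = new ArrayList<>();

        public void startBatch(Map<String, Object> ctx) { }
        public void execute(Map<String, Object> ctx, String tuple) { out.add(tuple); }
        public void finishBatch(Map<String, Object> ctx) { }
    }

    // One batch: startBatch on all (initBatchState), input to roots only (execute),
    // then finishBatch on all (finishBatch), both loops in topological order.
    public static void runBatch(List<MiniProcessor> ordered, List<MiniProcessor> roots, List<String> tuples) {
        Map<String, Object> ctx = new HashMap<>(); // plays the role of ProcessorContext
        for (MiniProcessor p : ordered) p.startBatch(ctx);
        for (String t : tuples) {
            for (MiniProcessor r : roots) r.execute(ctx, t);
        }
        for (MiniProcessor p : ordered) p.finishBatch(ctx);
    }

    public static void main(String[] args) {
        Each each = new Each();
        CountAgg agg = new CountAgg(each);
        List<MiniProcessor> ordered = Arrays.asList(agg, each);
        List<MiniProcessor> roots = Collections.singletonList(agg);
        runBatch(ordered, roots, Arrays.asList("a", "b", "c"));
        runBatch(ordered, roots, Arrays.asList("d"));
        System.out.println(each.out); // [count=3, count=1]
    }
}
```

startBatch resets the count, so the second batch starts from zero instead of continuing from the first.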
WindowTridentProcessor
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java
```java
@Override
public void startBatch(ProcessorContext processorContext) {
    // initialize state for batch
    processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}

@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    // add tuple to the batch state
    Object state = processorContext.state[tridentContext.getStateIndex()];
    ((List<TridentTuple>) state).add(projection.create(tuple));
}

@Override
public void finishBatch(ProcessorContext processorContext) {

    Object batchId = processorContext.batchId;
    Object batchTxnId = getBatchTxnId(batchId);

    LOG.debug("Received finishBatch of : [{}] ", batchId);
    // get all the tuples in a batch and add it to trident-window-manager
    List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
    tridentWindowManager.addTuplesBatch(batchId, tuples);

    List<Integer> pendingTriggerIds = null;
    List<String> triggerKeys = new ArrayList<>();
    Iterable<Object> triggerValues = null;

    if (retriedAttempt(batchId)) {
        pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
        if (pendingTriggerIds != null) {
            for (Integer pendingTriggerId : pendingTriggerIds) {
                triggerKeys.add(triggerKey(pendingTriggerId));
            }
            triggerValues = windowStore.get(triggerKeys);
        }
    }

    // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
    if(triggerValues == null) {
        pendingTriggerIds = new ArrayList<>();
        Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
        LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
        try {
            Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
            List<Object> values = new ArrayList<>();
            StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
            while (pendingTriggersIter.hasNext()) {
                triggerResult = pendingTriggersIter.next();
                for (List<Object> aggregatedResult : triggerResult.result) {
                    String triggerKey = triggerKey(triggerResult.id);
                    triggerKeys.add(triggerKey);
                    values.add(aggregatedResult);
                    pendingTriggerIds.add(triggerResult.id);
                }
                pendingTriggersIter.remove();
            }
            triggerValues = values;
        } finally {
            // store inprocess triggers of a batch in store for batch retries for any failures
            if (!pendingTriggerIds.isEmpty()) {
                windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
            }
        }
    }

    collector.setContext(processorContext);
    int i = 0;
    for (Object resultValue : triggerValues) {
        collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
    }
    collector.setContext(null);
}
```
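The retry branch of finishBatch is the subtle part. The first attempt of a batch drains the pending triggers and records their ids under the batch's transaction id. A retried attempt therefore re-emits the same results instead of pulling new ones.

A minimal sketch of just that idea follows. WindowRetrySketch, its in-memory maps and the boolean retriedAttempt flag are assumptions. They stand in for windowStore, inprocessTriggerKey and retriedAttempt(batchId).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory sketch of the "replay in-process triggers on retry" idea.
public class WindowRetrySketch {

    public final Deque<Integer> pendingTriggers = new ArrayDeque<>();   // trigger ids not yet emitted
    public final Map<Integer, String> triggerResults = new HashMap<>(); // trigger id -> window result
    public final Map<Long, List<Integer>> inprocess = new HashMap<>();  // txId -> trigger ids it emitted

    public List<String> finishBatch(long txId, boolean retriedAttempt) {
        List<Integer> ids = retriedAttempt ? inprocess.get(txId) : null;
        if (ids == null) {
            // New batch (or nothing recorded for it): take every trigger pending right now.
            ids = new ArrayList<>();
            while (!pendingTriggers.isEmpty()) {
                ids.add(pendingTriggers.poll());
            }
            if (!ids.isEmpty()) {
                inprocess.put(txId, ids); // remembered so a retry re-emits the same results
            }
        }
        List<String> emitted = new ArrayList<>();
        for (Integer id : ids) {
            emitted.add(triggerResults.get(id));
        }
        return emitted;
    }

    public static void main(String[] args) {
        WindowRetrySketch w = new WindowRetrySketch();
        w.triggerResults.put(1, "w1");
        w.pendingTriggers.add(1);
        System.out.println(w.finishBatch(7L, false)); // [w1]
        w.triggerResults.put(2, "w2");
        w.pendingTriggers.add(2);
        System.out.println(w.finishBatch(7L, true));  // [w1], replayed; trigger 2 stays pending
        System.out.println(w.finishBatch(8L, false)); // [w2]
    }
}
```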
Other TridentProcessor implementations include, for example, ProjectedProcessor and PartitionPersistProcessor
Summary
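- TridentBoltExecutor automatically acks each tuple for you once it has been processed
- WindowTridentProcessor resets its state every time startBatch is called
- topology.trident.batch.emit.interval.millis defaults to 500 in defaults.yaml. It controls how often a batch can be emitted
- Tuples are handled in the order they were emitted. The last one is [id,count], which acts as the end-of-batch instruction and is used to detect and trigger completion of the batch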
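To illustrate what an emit interval means, here is a hypothetical throttle that lets a new batch start only when at least intervalMillis have passed since the previous one. It is not Storm's implementation, whose coordinator logic is more involved. The class and method names are invented for this sketch, and 500 simply mirrors the defaults.yaml value.

```java
// Hypothetical illustration of a batch emit interval; not Storm's coordinator code.
public class EmitIntervalSketch {
    private final long intervalMillis;
    private Long lastEmitMillis = null; // null until the first batch is emitted

    public EmitIntervalSketch(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Returns true (and records the time) if a new batch may be emitted at nowMillis.
    public boolean tryEmit(long nowMillis) {
        if (lastEmitMillis != null && nowMillis - lastEmitMillis < intervalMillis) {
            return false;
        }
        lastEmitMillis = nowMillis;
        return true;
    }

    public static void main(String[] args) {
        EmitIntervalSketch throttle = new EmitIntervalSketch(500); // the defaults.yaml value
        long[] ticks = {0, 200, 499, 500, 900, 1000};
        for (long t : ticks) {
            System.out.println(t + " -> " + throttle.tryEmit(t));
        }
    }
}
```

Passing the clock value in, instead of reading System.currentTimeMillis() inside, keeps the sketch deterministic and easy to test.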