storm中幾個小技巧
阿新 • • 發佈:2019-02-11
storm 中小技巧
1、 TimeCacheMap過期快取,採用桶實現,鎖的粒度小,O(1)時間內完成鎖操作,因此,大部分時間內都可以進行get和put操作。幾乎所有的操作都是相對於桶數目線性的(O(numBuckets))。唯一的問題是快取中可能存在過期的資料,也就是說真正的超時時間介於expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之間。
2. 對元組的序列化採用kryo,並且使用了動態池的概念。這樣避免了大量物件建立和銷燬的開銷,增加了效率public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if(numBuckets<2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } _buckets = new LinkedList<HashMap<K, V>>(); for(int i=0; i<numBuckets; i++) { _buckets.add(new HashMap<K, V>()); } _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { @SuppressWarnings("unchecked") public void run() { try { while(true) { Map<K, V> dead = null; Time.sleep(sleepTime); synchronized(_lock) { dead = _buckets.removeLast(); _buckets.addFirst(new HashMap<K, V>()); } if(_callback!=null) { for(Entry<K, V> entry: dead.entrySet()) { _callback.expire(entry.getKey(), entry.getValue()); } } } } catch (InterruptedException ex) { } } }); _cleaner.setDaemon(true); _cleaner.start(); }
3. 對於supervisor程序和worker程序都在本地儲存一個kv庫,用於同步。kv實現為LocalState,一個基於map實現, 每次讀寫都需要直接操作磁碟的簡單資料庫。每次put資料時都要序列化資料,在序列化資料過程中,為了同步和原子性,採用了VersionedStore類,就是每次更新過程中不會去update現有的檔案, 而是不斷的產生遞增version的檔案, 故每一批更新都會產生一個新的檔案。public class ThreadResourceManager<T> { public static interface ResourceFactory<X> { X makeResource(); } ResourceFactory<T> _factory; ConcurrentLinkedQueue<T> _resources = new ConcurrentLinkedQueue<T>(); public ThreadResourceManager(ResourceFactory<T> factory) { _factory = factory; } public T acquire() { T ret = _resources.poll(); if(ret==null) { ret = _factory.makeResource(); } return ret; } public void release(T resource) { _resources.add(resource); } }
VersionedStore類中採用兩種檔案實現原子性和同步性:
VersionFile, _root + version, 真正的資料儲存檔案
TokenFile, _root + version + “.version”, 標誌位檔案, 標誌version檔案是否完成寫操作, 以避免讀到正在更新的檔案
4. 待續private void persist(Map<Object, Object> val, boolean cleanup) throws IOException { byte[] toWrite = Utils.serialize(val);//序列化要寫入的資料 String newPath = _vs.createVersion();//建立新的versionfile的path FileUtils.writeByteArrayToFile(new File(newPath), toWrite);//資料寫入versionfile _vs.succeedVersion(newPath);//呼叫succeedVersion, 建立tokenfile以標誌versionfile的寫入完成 if(cleanup) _vs.cleanup(4);//清除舊版本, 只保留4個版本 }