1. 程式人生 > >storm中幾個小技巧

storm中幾個小技巧

storm 中小技巧

1、 TimeCacheMap過期快取,採用桶實現,鎖的粒度小,O(1)時間內完成鎖操作,因此,大部分時間內都可以進行get和put操作。幾乎所有的操作都是相對於桶數目線性的(O(numBuckets))。唯一的問題是快取中可能存在過期的資料,也就是說真正的超時時間介於expirationSecs和expirationSecs * (1 + 1 / (numBuckets-1))之間。

 public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
        if(numBuckets<2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        _buckets = new LinkedList<HashMap<K, V>>();
        for(int i=0; i<numBuckets; i++) {
            _buckets.add(new HashMap<K, V>());
        }


        _callback = callback;
        final long expirationMillis = expirationSecs * 1000L;
        final long sleepTime = expirationMillis / (numBuckets-1);
        _cleaner = new Thread(new Runnable() {
            @SuppressWarnings("unchecked")
			public void run() {
                try {
                    while(true) {
                        Map<K, V> dead = null;
                        Time.sleep(sleepTime);
                        synchronized(_lock) {
                            dead = _buckets.removeLast();
                            _buckets.addFirst(new HashMap<K, V>());
                        }
                        if(_callback!=null) {
                            for(Entry<K, V> entry: dead.entrySet()) {
                                _callback.expire(entry.getKey(), entry.getValue());
                            }
                        }
                    }
                } catch (InterruptedException ex) {

                }
            }
        });
        _cleaner.setDaemon(true);
        _cleaner.start();
    }
2. 對元組的序列化採用kryo,並且使用了動態池的概念。這樣避免了大量物件建立和銷燬的開銷,增加了效率
public class ThreadResourceManager<T> {
    public static interface ResourceFactory<X> {
        X makeResource();
    }
    
    ResourceFactory<T> _factory;
    ConcurrentLinkedQueue<T> _resources = new ConcurrentLinkedQueue<T>();
    
    public ThreadResourceManager(ResourceFactory<T> factory) {
        _factory = factory;
    }
    
    public T acquire() {
        T ret = _resources.poll();
        if(ret==null) {
            ret = _factory.makeResource();
        }
        return ret;
    }
    
    public void release(T resource) {
        _resources.add(resource);
    }
}
3. 對於supervisor程序和worker程序都在本地儲存一個kv庫,用於同步。kv實現為LocalState,一個基於map實現, 每次讀寫都需要直接操作磁碟的簡單資料庫。每次put資料時都要序列化資料,在序列化資料過程中,為了同步和原子性,採用了VersionedStore類,就是每次更新過程中不會去update現有的檔案, 而是不斷的產生遞增version的檔案, 故每一批更新都會產生一個新的檔案。
 VersionedStore類中採用兩種檔案實現原子性和同步性:
VersionFile, _root + version, 真正的資料儲存檔案 
TokenFile, _root + version + “.version”, 標誌位檔案, 標誌version檔案是否完成寫操作, 以避免讀到正在更新的檔案 
private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
    byte[] toWrite = Utils.serialize(val);//序列化要寫入的資料 
    String newPath = _vs.createVersion();//建立新的versionfile的path
    FileUtils.writeByteArrayToFile(new File(newPath), toWrite);//資料寫入versionfile 
    _vs.succeedVersion(newPath);//呼叫succeedVersion, 建立tokenfile以標誌versionfile的寫入完成 
    if(cleanup) _vs.cleanup(4);//清除舊版本, 只保留4個版本
}
4. 待續