自定義Source實現斷點續傳
在Flume進行實時資料的採集過程中,可能會出現宕機的情況,再重新工作時不能斷點續傳,導致資料丟失或資料讀取重複而浪費資源。自定義Source可以解決這個問題。
1)寫一個類繼承AbstractSource抽象類並實現Configurable介面和EventDrivenSource介面。
2)宣告自定義source中的引數:filePath(檔案路徑),offsetPath(存放偏移量檔案的路徑),interval(檔案中沒有新內容寫入時sleep的引數),charset(字符集編碼)。
3)重寫Configurable的抽象方法configure,獲取source中的引數值。
4)寫一個synchronized修飾的start方法,建立執行緒池、建立實現了Runnerble的介面,將executor放入執行緒池中,呼叫其父類的start()方法。
5)寫一個synchronized修飾的stop方法,呼叫其父類的stop()方法,在停止flume時執行一次。
6)寫一個靜態內部類實現Runnable介面。定義偏移量的初始值0LWie內部類的成員變數。類中是實現Runnable介面方法的構造方法,裡面寫斷點續傳邏輯:判斷是否有偏移量檔案(如果不存在就建立一個);如果存在的話就讀取檔案中是否記錄了偏移量,將偏移量轉為Long型別;如果記錄了偏移量,就接著偏移量位置讀,沒有的話就從偏移量的初始值開始讀。
7)重寫實現Runnable介面類的run方法:定期讀取檔案,判斷是否有新內容,如果有的話將資料封裝到Event物件中,傳送給Channel,獲取最新偏移量值,再更新偏移量;如果日誌檔案中沒有新的內容,就讓執行緒sleep一下。
詳細程式碼如下:
package flume; import org.apache.commons.io.FileUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder;import org.apache.flume.source.AbstractSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 自定義source,實現多點續傳 */
public class MyFileSource extends AbstractSource implements Configurable,EventDrivenSource { private static final Logger logger = LoggerFactory.getLogger(MyFileSource.class); private String filePath; private String offsetPath; private Long interval; private String charset; private ExecutorService executor; private MyRunnable myRunner; public void configure(Context context) { filePath = context.getString("filePath"); offsetPath = context.getString("offsetPath"); interval = context.getLong("interval",1000L); charset = context.getString("charset","UTF-8"); } public synchronized void start(){ //建立一個執行緒池 | 得到一個Channel物件 executor = Executors.newSingleThreadExecutor(); ChannelProcessor channelProcessor = getChannelProcessor(); //new一個Runnable的物件 myRunner = new MyRunnable(filePath,offsetPath,interval,charset,channelProcessor); //將exector放入執行緒池中 executor.execute(myRunner); super.start(); }
public synchronized void stop() { //停掉執行緒 myRunner.setFlag(false); //停掉執行緒池 executor.shutdown(); while (!executor.isTerminated()) { logger.debug("Waiting for exec executor service to stop"); try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting."); Thread.currentThread().interrupt(); } } super.stop(); } public static class MyRunnable implements Runnable{ private String filePath; private String offsetPath; private Long interval; private String charset; private ChannelProcessor channelProcessor; private Long offset = 0L; private File osfile; private boolean flag = true; private RandomAccessFile raf; public MyRunnable(String filePath, String offsetPath, Long interval, String charset, ChannelProcessor channelProcessor) { this.filePath = filePath; this.offsetPath = offsetPath; this.interval = interval; this.charset = charset; this.channelProcessor = channelProcessor; //把偏移量檔案裝進File物件裡 osfile = new File(offsetPath); //判斷是否有偏移量檔案,如果不存在就建立一個 if(!osfile.exists()){ try { osfile.createNewFile(); } catch (IOException e) { logger.debug("create osfile error",e); } } //存在的話,判斷檔案裡有沒有內容,先得到檔案中的內容轉為String try { String offsetStr = FileUtils.readFileToString(osfile); //如果有內容的話,就把內容轉為Long型別,沒有的話就初值 if(offsetStr!=null && !"".equals(offsetStr)){ offset = Long.parseLong(offsetStr); } } catch (IOException e) { logger.debug("read offset error",e); } //如果有偏移量,就接著讀,沒有的話就從頭讀 new一個隨機讀取檔案內容的物件 try { raf = new RandomAccessFile(filePath, "r"); raf.seek(offset); } catch (FileNotFoundException e) { logger.debug("file not found error",e); } catch (IOException e) { logger.debug("read offset error",e); } } public void run() { while(flag) { //定期讀取檔案,是否有新內容 try { String line = raf.readLine(); //將資料封裝進event物件 if(line!=null){ Event event = EventBuilder.withBody(line, Charset.forName(charset)); //event物件傳送給channel channelProcessor.processEvent(event); //獲取新的偏移量,再更新偏移量 offset = raf.getFilePointer(); FileUtils.writeStringToFile(osfile, offset + ""); }else { Thread.sleep(interval); } } catch (IOException e) { logger.debug("read line error",e); } catch (InterruptedException e) { logger.debug("thread sleep error",e); } } } public void setFlag(boolean flag){ this.flag = flag; } } }
相關推薦
自定義Source實現斷點續傳
在Flume進行實時資料的採集過程中,可能會出現宕機的情況,再重新工作時不能斷點續傳,導致資料丟失或資料讀取重複而浪費資源。自定義Source可以解決這個問題。1)寫一個類繼承AbstractSource抽象類並實現Configurable介面和EventDr
用AsyncTask實現斷點續傳
asynctask實現文件下載與斷點續傳 在學習四大組件之一的service時,正好可以利用asyncTask 和OKhttp來進行斷點續傳,並在手機的前臺顯示下載進度。 嘗試下載的是Oracle官網上的jdk1.7 在AS中使用OKhttp,只需要簡單的在app/build.grad
rsync 實現斷點續傳
direct pan 悲劇 文件 byte font scp命令 size receive Linux 主機之間即時傳送文件,scp命令大家都很熟悉但當要傳送的文件較大,過程中如果網絡中斷了,就比較悲劇了。這時候可以考慮使用rsync命令替代scp,實現斷點續傳文件。 試驗
用Java實現斷點續傳 (HTTP)
斷點續傳的原理 其實斷點續傳的原理很簡單,就是在 Http 的請求上和一般的下載有所不同而已。 打個比方,瀏覽器請求伺服器上的一個文時,所發出的請求如下: 假設伺服器域名為 www.sjtu.edu.cn,檔名為 down.zip。 GET /down.zip HTTP/1
5.2 SpringBoot實現斷點續傳功能 > 我的程式猿之路:第四十二章
功能使用webuploader元件分片下載檔案 文件地址: http://fex.baidu.com/webuploader/document.html 從 http://fex.baidu.com/webuploader/download.html中下載 用到的是:
iOS實現斷點續傳
網路下載是我們在專案中經常要用到的功能,如果是小檔案的下載,比如圖片和文字之類的,我們可以直接請求源地址,然後一次下載完畢。但是如果是下載較大的音訊和視訊檔案,不可能一次下載完畢,使用者可能下載一段時間,關閉程式,回家接著下載。這個時候,就需要實現斷點續傳的功能。讓使用者可以隨時暫停下載,下次開始下
java多執行緒實現斷點續傳下載
public class DownloadThread extends Thread {private int id;private int startindex;private int endindex;private String path;static int threadfinishedcount=0
flume高併發優化——(8)多檔案source擴充套件斷點續傳
在很多情況下,我們為了不丟失資料,一般都會為資料收集端擴充套件斷點續傳,而隨著公司日誌系統的完善,我們在原有的基礎上開發了斷點續傳的功能,以下是思路,大家共同討論:核心流程圖: 原始碼:/* * 作者:許恕
Python實現斷點續傳下載檔案,大檔案下載還怕下載到一半就斷了嗎?不存在!
這篇部落格簡單介紹python斷點續傳下載檔案,並加入花哨的下載進度顯示方法,涉及Python檔案操作的技巧,和一些函式庫的使用。 環境 Python 3.6 requests模組 對應檔案的下載連結 (要下載的檔案必須支援斷點續傳) (是不是很少東西
簡單實現斷點續傳+MVP+Retrofit+RxJava
依賴: compile 'com.squareup.retrofit2:retrofit:2.0.1' compile 'com.squareup.retrofit2:converter-gson:2.0.1' compile 'com.s
flink1.7自定義source實現
flink讀取source data 資料的來源是flink程式從中讀取輸入的地方。我們可以使用StreamExecutionEnvironment.addSource(sourceFunction)將源新增到程式中。 flink附帶大量預先實現好的各種讀取資料來源的函式,也可以通過為非並行源去實現Sour
Java 實現斷點續傳 (HTTP)
公司需要用Java做斷點續傳的實現,沒有接觸過,不過根據自己的理解就是檔案接著上次傳輸的繼續完成傳輸,具體的操作看到IBM這位仁兄的例子。 1、斷點續傳的原理 其實斷點續傳的原理很簡單,就是在 Http 的請求上和一般的下載有所不同而已。 打個比方,瀏覽器請求伺服
http range實現斷點續傳(斷點續載)
使用http range實現斷點續傳(伺服器端): public void downRangeFile( File downloadFile, HttpServletResponse response,
libcurl實現斷點續傳
實現斷點續傳主要就是通過curl_easy_setopt設定好CURLOPT_RESUME_FROM_LARGE屬性完成 首先獲取已下載檔案大小,然後設定CURLOPT_RESUME_FROM_LARGE屬性,從指定位元組開始下載 #include <sys/sta
C#檔案下載(實現斷點續傳)
1 public class WebDown 2 { 4 /// 下載檔案方法 6 /// 檔案儲存路徑和檔名 7 /// 返回伺服器檔名 9 public static bool DeownloadFil
用 Java 實現斷點續傳 (HTTP)
/* /* * SiteFileFetch.java */ package NetFox; import java.io.*; import java.net.*; public class SiteFileFetch extends Thread { Sit
Android中自定義MultipartEntity實現檔案上傳以及使用Volley庫實現檔案上傳
最近在參加CSDN部落格之星,希望大家給投一票,謝謝啦~ 點這裡投我一票吧~前言在開發當中,我們常常需要實現檔案上傳,比較常見的就是圖片上傳,比如修改個頭像什麼的。但是這個功能在Android和iOS中都沒有預設的實現類,對於And
iOS-AFNetworking實現斷點續傳功能
上一節說到利用系統原生的方法實現斷點續傳功能,這次我們說下當下最時尚的第三方網路請求庫AFNetworking,利用AFNet實現斷點續傳。其實原理是和NSURLSessionDataTask一樣的。因為我們都知道AFNet在3.0之後是封裝的NSURLSess
FTP下用reget實現斷點續傳 (FTP下get顯示進度)
同事在Linux通過FTP獲取一個1.3G的大檔案,傳了一個上午,結果在1.1G左右的時候,資料鏈路斷開,ftp命令假死狀態,檔案大小不在增加。如果重新匯入,下午肯定完不成了,得考慮斷點續傳,以前都是通過windows下的工具完成的,目前的問題是:在Unix下如何處理呢?方法
python實現斷點續傳下載檔案
最近的任務裡有一個功能是要我從日誌伺服器實時跟新日誌到本地,日誌在不斷新增內容就需要我隔一段時間從上次下載的位置繼續下載,並寫入本地檔案上次寫完的位置後面。 headers = {'Range': 'bytes=%d-' % local_f