1. 程式人生 > >發表在IBM Developworks上的文章,Spark Streaming 圖片處理案例介紹

發表在IBM Developworks上的文章,Spark Streaming 圖片處理案例介紹

插播小廣告,本人的《大話Java效能優化》一書已經在亞馬遜、京東、噹噹、天貓出售,提前謝謝大家支援。

原文地址:http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-streaming-picture/

流式處理框架特徵

流式處理框架的特徵主要有以下五個方面。

1. 強實時處理

流式處理需要確保資料的實時產生、實時計算,此外,也需要確保處理結果的實時傳送。大多數流式處理架構多采用記憶體計算方式,即當資料到達後直接在記憶體中計算,只有少量資料會被儲存到硬碟,或者乾脆不儲存資料。這樣的系統架構可以確保我們能夠提供低延遲計算能力,可以快速地進行資料計算,在資料較短的時間內完成計算,體現資料的有用性。對於時效性特別短、潛在價值又很大的資料可以優先計算。

2. 高容錯能力

由於資料很容易丟失,這就需要系統具有一定的容錯能力,要充分地利用好僅有的一次資料計算機會,儘可能全面、準確、有效地從資料流中得出有價值的資訊。

3. 動態變化

一般採用流式處理架構的應用場景都存在資料速率不固定的情況,即可能存在前一時刻資料速率和後一時刻資料速率有較大的差異。這樣的需求要求系統具有很好的可伸縮性,能夠動態適應流入的資料流,具有很強的系統計算能力和大資料流量動態匹配的能力。一方面,在高資料流速的情況下,保證不丟棄資料,或者識別並選擇性地丟棄部分不重要的資料;另一方面,在低資料速率的情況下,保證不會太久或過多地佔用系統資源。

4. 多資料來源

由於可能存在很多的資料來源,而且各資料來源、資料流之間又可能是相互獨立的,所以無法保證資料是有序的,這就需要系統在資料計算過程中具有很好的資料分析和發現規律的能力,不能過多地依賴資料流間的內在邏輯或者資料流內部的內在邏輯。

5. 高可擴充套件

由於資料是實時產生、動態增加的,即只要資料來源處於活動狀態,資料就會一直產生和持續增加下去。可以說,潛在的資料量是無限的,無法用一個具體確定的資料實現對其進行量化。系統在資料計算過程中,無法儲存全部資料。由於硬體中沒有足夠大的空間來儲存這些無限增長的資料,也沒有合適的軟體來有效地管理這麼多資料。

流式處理框架技術需求

針對具有強實時處理、高容錯能力、動態變化、多資料來源、高可擴充套件等特徵的流式處理框架需求,那麼理想的流式處理框架應該表現出低延遲、高吞吐、持續穩定執行和彈性可伸縮等特性,這需要系統設計架構、任務執行方式、高可用性技術等關鍵技術的合理規劃和良好設計。

  • 系統設計架構

系統架構是系統中各子系統間的組合方式,流式處理框架需要選擇特定的系統架構進行流式計算任務的部署。當前,針對流式處理框架較為流行的系統架構主要有無中心節點的 point-point 架構和有中心節點的 Master-Slaves 架構兩種。

(1) 對稱式架構。如圖 1 所示,系統中各個節點的作用是完全相同的,即所有節點之間互相可以做備份,這樣整個系統具有良好的可伸縮性。但是由於不存在中心節點,因此在資源排程、系統容錯、負載均衡等方面需要通過分散式協議幫助實現。目前商業產品 S4、Puma 屬於這類架構,S4 通過 Zookeeper 實現系統容錯、負載均衡等功能。

圖 1. 無中心節點架構
圖 1. 無中心節點架構

(2) 主從式系統架構。如圖 2 所示,系統存在一個主節點和多個從節點。主節點負責系統資源的管理和任務的協調,並完成系統容錯、負載均衡等方面的工作,從節點負責接收來自於主節點的任務,並在計算完成後進行反饋。各從節點間可以選擇是否資料往來,但是系統的整體執行狀態依賴主節點控制。Storm、Spark Streaming 屬於這種架構。

圖 2. 有中心節點架構
圖 2. 有中心節點架構
  • 任務執行方式

任務執行方式是指完成有向任務圖到物理計算節點的部署之後,各個計算節點之間的資料傳輸方式。資料的傳輸方式分為主動推送方式和被動拉取方式兩種。

(1) 主動推送方式。在上游節點產生或計算完資料後,主動將資料傳送到相應的下游節點,其本質是讓相關資料主動尋找下游的計算節點,當下遊節點報告發生故障或負載過重時,將後續資料流推送到其他相應節點。主動推送方式的優勢在於資料計算的主動性和及時性,但由於資料是主動推送到下游節點,往往不會過多地考慮到下游節點的負載狀態、工作狀態等因素,可能會導致下游部分節點負載不夠均衡;

(2) 被動拉取方式。只有下游節點顯式進行資料請求,上游節點才會將資料傳輸到下游節點,其本質是讓相關資料被動地傳輸到下游計算節點。被動拉取方式的優勢在於下游節點可以根據自身的負載狀態、工作狀態適時地進行資料請求,但上游節點的資料可能未必得到及時的計算。

大資料流式計算的實時性要求較高,資料需要得到及時處理,往往選擇主動推送的資料傳輸方式。當然,主動推送方式和被動拉取方式不是完全對立的,也可以將兩者進行融合,從而在一定程度上實現更好的效果。

  • 高可用性技術

流式計算框架的高可用性是通過狀態備份和故障恢復策略實現的。當故障發生後,系統根據預先定義的策略進行資料的重放和恢復。按照實現策略,可以被細分為被動等待 (passive standby)、主動等待 (active standby) 和上游備份 (upstream backup) 這 3 種策略。

(1) 被動等待策略

圖 3 所示,主節點 B 進行資料計算,副本節點 B’處於待命狀態,系統會定期地將主節點 B 上的最新的狀態備份到副本節點 B’上。出現故障時,系統從備份資料中進行狀態恢復。被動等待策略支援資料負載較高、吞吐量較大的場景,但故障恢復時間較長,可以通過對備份資料的分散式儲存縮短恢復時間。該方式更適合於精確式資料恢復,可以很好地支援不確定性應用計算,在當前流式資料計算中應用最為廣泛。

圖 3. 被動等待策略
圖 3. 被動等待策略

(2) 主動等待策略

圖 4 所示,系統在為主節點 B 傳輸資料的同時,也為副本節點 B’傳輸一份資料副本。以主節點 B 為主進行資料計算,當主節點 B 出現故障時,副本節點 B’完全接管主節點 B 的工作,主副節點需要分配同樣的系統資源。該種方式故障恢復時間最短,但資料吞吐量較小,也浪費了較多的系統資源。在廣域網環境中,系統負載往往不是過大時,主動等待策略是一個比較好的選擇,可以在較短的時間內實現系統恢復。

圖 4. 主動等待策略
圖 4. 主動等待策略

(3) 上游備份策略

每個主節點均記錄其自身的狀態和輸出資料到日誌檔案,當某個主節點 B 出現故障後,上游主節點會重放日誌檔案中的資料到相應副本節點 B’中進行資料的重新計算。上游備份策略所佔用的系統資源最小,在無故障期間,由於副本節點 B’保持空閒狀態,資料的執行效率很高。但由於其需要較長的時間進行恢復狀態的重構,故障的恢復時間往往較長,如需要恢復時間視窗為 30 分鐘的聚類計算,就需要重放該 30 分鐘內的所有元組。可見,於系統資源比較稀缺、運算元狀態較少的情況,上游備份策略是一個比較好的選擇方案。如圖 5 和圖 6 所示。

圖 5. 上游備份策略 1
圖 4. 主動等待策略
圖 6. 上游備份策略 2
圖 6. 上游備份策略 2

Spark Streaming 所處地位

Spark Streaming 是 Spark 的擴充套件,專門用來實現流式分析方式處理資料。Spark Streaming 支援 Kafka、Flume、Twitter、ZeroMQ、Kinesis、TCP Sockets 等多種資料來源。此外,也可以使用一個複雜的演算法,如 map、reduce、join、window,這些來處理資料。處理完的資料可以被髮送給檔案系統、資料庫、其他第三方。圖 7 引用自 Spark Streaming 官網,比較好地描述了 Spark Streaming 的地位。

圖 7. Spark Streaming 地位
圖 7. Spark Streaming 地位

Spark Streaming 接收輸出資料流,然後將這些資料分割後放入批處理流程 (batches),Spark 引擎稍後會處理這些資料,最終生成計算結果併發送到外部系統。

筆者的前一篇文章已經詳細地通過 WordCount 示例介紹了 Spark Streaming 的執行次序、基本架構、RDD 概念,請讀者參閱文章《Spark Streaming 新手指南》。

Spark Streaming 應用例項

我們以一個流式處理圖片的例子作為本文的例項。我們把圖片檔案通過基於 Spark Streaming 的程式讀取成資料流,重新將資料流寫成圖片檔案並存儲在檔案系統上。

整個程式的流程圖如圖 8 所示。

圖 8. 圖片處理程式流程圖
圖 8. 圖片處理程式流程圖

如圖 8 所示,第一步我們需要實現一個服務,該服務不停地向 HDFS 檔案系統裡寫入圖片檔案,這些圖片檔案後續會被用來當作資料來源的原始資料,並被進行處理。程式碼如清單 1 所示。

清單 1. 迴圈寫入圖片檔案程式碼
public ServerSocket getServerSocket(int port){
ServerSocket server=null;
try {
server = new ServerSocket();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return server;
}

public void sendData(String path,ServerSocket server){
OutputStream out=null;
FileInputStream in=null;
BufferedOutputStream bf =null;
try {
out = server.accept().getOutputStream();
File file = new File(path);
in = new FileInputStream(file);
bf = new BufferedOutputStream(out);
byte[] bt = new byte[(int)file.length()];
in.read(bt);
bf.write(bt);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(in!=null){
try {
in.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(bf!=null){
try {
bf.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(out!=null){
try {
out.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(!server.isClosed()){
try {
server.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
if(args.length<4){
System.err.println("Usage:server3 <port> <file or dir> <send-times> <sleep-time(ms)>");
System.exit(1);
}

Map<Integer, String> fileMap = null;

Server s = new Server();
for (int i = 0; i < Integer.parseInt(args[2]) ; i++) {
ServerSocket server =null;
while(server==null){
server = s.getServerSocket(Integer.parseInt(args[0]));
try {
Thread.sleep(Integer.parseInt(args[3]));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
while(!server.isBound()){
try {
server.bind(new InetSocketAddress(Integer.parseInt(args[0])));
System.out.println("第"+(i+1)+"個服務端繫結成功");
Thread.sleep(Integer.parseInt(args[3]));
} catch (NumberFormatException | IOException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

fileMap = s.getFileMap(args[1]);

System.out.println("fileMap.size="+fileMap.size());
//System.out.println("fileMap="+fileMap);

s.sendData(fileMap.get(s.getNum(0, fileMap.size()-1)), server);
//s.sendData(args[1], server);
}
}


public Map<Integer, String> getMap(String dir,Map<Integer, String> fileMap){
File file = new File(dir);
if(file.isFile()){
if(file.getName().endsWith(".jpg")||file.getName().endsWith(".bmp")|file.getName().
                                            endsWith(".JPG")||file.getName().endsWith(".BMP")){
if(file.length()<1024*1024*2){
fileMap.put(fileMap.size(),file.getAbsolutePath());
}
}else{
}
}
if(file.isDirectory()){
File[] files = file.listFiles();
for (int j = 0; j < files.length; j++) {
getMap(files[j].getAbsolutePath(), fileMap);
}
}
return fileMap;
}

public Map<Integer, String> getFileMap(String dir){
Map<Integer, String> fileMap = new HashMap<Integer, String>();
return getMap(dir, fileMap);
}

public int getNum(int offset,int max){
int i = offset+(int)(Math.random()*max);
if(i>max){
return i-offset;
}else{
return i;
}
}

接下來開啟一個程式,實現開啟 Socket 監聽,從指定埠讀取圖片檔案,這裡使用的是 Spark Streaming 的 socketStream 方法獲取資料流。程式程式碼是用 Scala 語言編寫的,如清單 4 所示。

清單 2. 讀取檔案
val s = new SparkConf().setAppName("face")
 val sc = new SparkContext(s)
 val ssc = new StreamingContext(sc, Seconds(args(0).toInt))
 val img = new ImageInputDStream(ssc, args(1), args(2).toInt, 
                   StorageLevel.MEMORY_AND_DISK_SER)//呼叫重寫的 ImageInputDStream 方法讀取圖片
 val imgMap = img.map(x => (new Text(System.currentTimeMillis().toString), x))
 imgMap.saveAsNewAPIHadoopFiles("hdfs://spark:9000/image/receiver/img", "", classOf[Text], 
           classOf[BytesWritable], classOf[ImageFileOutputFormat],
                       ssc.sparkContext.hadoopConfiguration)//呼叫 ImageFileOutputFormat 方法寫入圖片

 imgMap.map(x => (x._1, {
 if (x._2.getLength > 0) imageModel(x._2) else "-1"
 }))//獲取 key 的值,即圖片
 .filter(x => x._2 != "0" && x._2 != "-1")
 .map(x => "{time:" + x._1.toString +","+ x._2 + "},").print()

 ssc.start()
ssc.awaitTermination()

清單 2 程式碼設定 Spark 上下文環境,設定了每隔多少時間 (使用者輸入的第一個引數,單位:秒) 讀取一次資料來源,然後開始呼叫重寫的方法讀入圖片,我們需要對圖片進行分析,分析過程不是本程式關注的重點,這裡忽略,讀者可以自己網上搜索圖片分析的開源庫,匯入即可實現圖片分析功能。

清單 3 當中自己定義了一個 Scala 類 ImageInputDStream,用於載入 Java 的讀入圖片類。

清單 3. Scala 實現讀取檔案
class ImageInputDStream(@transient ssc_ : StreamingContext,host: String,port: 
                          Int,storageLevel: StorageLevel) extends 
                                                ReceiverInputDStream[BytesWritable](ssc_) with Logging{
 override def getReceiver(): Receiver[BytesWritable] = {
 new ImageRecevier(host,port,storageLevel)
 }
}


class ImageRecevier(host: String,port: Int,storageLevel: StorageLevel) extends 
                                        Receiver[BytesWritable](storageLevel) with Logging{
 override def onStart(): Unit = {
 new Thread("Image Socket"){
 setDaemon(true)
 override def run(): Unit = {
 receive()
 }
 }.start()
 }

 override def onStop(): Unit = {

 }

 def receive(): Unit ={
 var socket:Socket=null
 var in:InputStream =null
 try{
 logInfo("Connecting to " + host + ":" + port)
 socket = new Socket(host, port)
 logInfo("Connected to " + host + ":" + port)
 in= socket.getInputStream
 val buf = new ArrayBuffer[Byte]()
 var bytes = new Array[Byte](1024)
 var len = 0
 while(-1 < len){
 len=in.read(bytes)
 if(len > 0){
 buf ++=bytes
 }
 }
 val bw = new BytesWritable(buf.toArray)
 logError("byte:::::"+ bw.getLength)
 store(bw)
 logInfo("Stopped receiving")
 restart("Retrying connecting to " + host + ":" + port)
 }catch {
 case e: java.net.ConnectException =>
 restart("Error connecting to " + host + ":" + port, e)
 case t: Throwable =>
 restart("Error receiving data", t)
 }finally {
 if(in!=null){
 in.close()
 }
 if (socket != null) {
 socket.close()
 logInfo("Closed socket to " + host + ":" + port)
 }
 }
 }

清單 2 裡面定義了寫回圖片檔案時需要呼叫 ImageFileOutputFormat 類,這個類繼承了 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat 類,通過緩衝讀取的方式加快資料讀取。程式碼如清單 4 所示。

清單 4. 寫入檔案
public class ImageFileOutFormat extends FileOutputFormat<Text,BytesWritable> {
 @Override
 public RecordWriter<Text, BytesWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
                                                 throws IOException, InterruptedException {
 Configuration configuration = taskAttemptContext.getConfiguration();
 Path path = getDefaultWorkFile(taskAttemptContext, "");
 FileSystem fileSystem = path.getFileSystem(configuration);
 FSDataOutputStream out = fileSystem.create(path,false);
 return new ImageFileRecordWriter(out);
 }


 protected class ImageFileRecordWriter extends RecordWriter<Text, BytesWritable>{

 protected DataOutputStream out;
 private final byte[] keyValueSeparator;
 private static final String colon=",";

 public ImageFileRecordWriter(DataOutputStream out){
 this(colon,out);
 }

 public ImageFileRecordWriter(String keyValueSeparator,DataOutputStream out) {
 this.out=out;
 this.keyValueSeparator = keyValueSeparator.getBytes();
 }

 @Override
 public void write(Text text, BytesWritable bytesWritable) throws IOException, InterruptedException {
 if(bytesWritable!=null){
 out.write(bytesWritable.getBytes());
 }
 }

 @Override
 public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
 out.close();
 }
 }
}

通過清單 1-4 的程式,我們可以實現讀入圖片檔案->對圖片進行一些業務處理->寫回分析成果物 (文字資訊、圖片)。

結束語

通過本文的學習,讀者可以大致瞭解流式處理框架的設計原理、Spark Streaming 的工作原理,並通過一個讀取、分析、寫入圖片的示例幫助讀者進行加深瞭解。目前市面上釋出的 Spark 中文書籍對於初學者來說大多較為難讀懂,更沒有專門針對 Spark Streaming 的文章。作者力求推出一系列 Spark 文章,讓讀者能夠從實際入手的角度來了解 Spark Streaming。後續除了應用之外的文章,還會致力於基於 Spark 及 Spark Streaming 的系統架構、原始碼解釋等方面的文章釋出。


相關推薦

發表IBM Developworks文章Spark Streaming 圖片處理案例介紹

插播小廣告,本人的《大話Java效能優化》一書已經在亞馬遜、京東、噹噹、天貓出售,提前謝謝大家支援。 原文地址:http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-streaming-picture/

Spark Streaming 圖片處理案例介紹

前文回顧 前文《Spark Streaming 新手指南》介紹了 Spark Streaming 的基本工作原理,並以 WordCount 示例進行解釋。此外,針對 Spark Streaming 的優缺點也做了一些描述。 本文重點主要是解釋流式處理架構的

Apache 流框架 FlinkSpark StreamingStorm對比分析(2)

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 2.Spark Streaming架構及特性分析 2.1 基本架構 基於是spark core的spark streaming架構。 Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處

kafka(六):與spark streaming對接spark streaming接收kafka資料來源

1.功能實現 spark streaming從kafka接收資料,有兩種方式,receiver和direct兩種方式。 2.pom依賴 針對kafka_2.10-0.8.2.1版本         <!-- https

Apache 流框架 FlinkSpark StreamingStorm對比分析(二)

本文由 網易雲 釋出2.Spark Streaming架構及特性分析2.1 基本架構基於是spark core的spark streaming架構。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Str

Apache 流框架 FlinkSpark StreamingStorm對比分析(一)

本文由 網易雲 釋出1.Flink架構及特性分析Flink是個相當早的專案,開始於2008年,但只在最近才得到注意。Flink是原生的流處理系統,提供high level的API。Flink也提供 API來像Spark一樣進行批處理,但兩者處理的基礎是完全不同的。Flink把

FlinkSpark StreamingStorm對比分析

1.Flink架構及特性分析Flink是個相當早的專案,開始於2008年,但只在最近才得到注意。Flink是原生的流處理系統,提供high level的API。Flink也提供 API來像Spark一樣進行批處理,但兩者處理的基礎是完全不同的。Flink把批處理當作流處理中的

關於IBM developworks的tomcat comet示例

IBM developworks網站上這個示例的網址:使用 Java 實現 Comet 風格的 Web 應用, 我們都知道servlet3.0添加了類似comet風格的非同步功能,但是至今除了glassfish3以外,我還沒發現其它實現servlet3.0的web容器。所以

導讀:有多少人知道連結的生命週期有多久?《紐約時報》(New York Times)的專利作家尼克·畢爾頓(Nick Bilton)今天發表了一篇文章引用了網際網路連線分析機構BitLy提供的分析報告

 導讀:有多少人知道連結的生命週期有多久?《紐約時報》(New York Times)的專利作家尼克·畢爾頓(Nick Bilton)今天發表了一篇文章,引用了網際網路連線分析機構BitLy提供的分析報告,向外界展示了網際網路上鍊接的短暫生命週期。   以下為分析全文:  

一篇文章學會spark-streaming

1.什麼是spark-streaming? 實際生產中會有許多應用到實時處理的場景,比如:實時監測頁面點選,實時監測系統異常,實時監測來自於外部的攻擊。針對這些場景,twitter研發了實時資料處理工具storm,並在後來開源。spark針對這些場景設計了spark

Spark Streaming實時處理應用

1 框架一覽   事件處理的架構圖如下所示。 2 優化總結   當我們第一次部署整個方案時,kafka和flume元件都執行得非常好,但是spark streaming應用需要花費4-8分鐘來處理單個batch。這個延遲的原因有兩點,一是我們使用DataFrame來強化資料,而強化資料需要從h

spark streaming的入門案例

1, spark streaming: tcp 源 maven依賴: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spa

Spark 定製版:004~Spark Streaming事務處理徹底掌握

本講內容: a. Exactly Once b. 輸出不重複 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧: 上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原

Spark版本定製4-Spark Streaming事務處理徹底理解

本講內容: a. Exactly Once  b. 輸出不重複 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧: 上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原始碼進行了詳細

spark streaming 同時處理兩個不同kafka叢集的資料

如題,總是不那麼完美,要處理的資料在兩個不同的kafka叢集裡面,日子得過,問題也得解決,我們建立兩個DStream,連線兩個不同的kafka叢集的不同topic,然後再把這兩個DStream union在一起處理,程式碼如下: package com.king

串列埠高效能處理串列埠資料按位讀取處理案例

在現在的工控或者家用裝置通訊專案中,用到很多串列埠或者類串列埠通訊協議,其中 很多協議需要讀取操作,在讀取中為了防止阻塞,提高處理效能,縮短處理時間經常用到 select 函式來 讀取串列埠資料,select 是linux 真是個 神器啊,監控某一個檔案或者裝置,當有緩衝過來即可處理,而為了試用不同協議的長短

spark streaming應用提交到yarn一直處於ACCEPTED狀態也未報錯

原因已經找到,這裡做個記錄,防止下次再犯類似的錯誤。實際上是因為程式碼中將執行模式設定為本地模式,在提交到yarn上後driver端的程式碼正常執行,並且也正常運行了很多批次。但由於是本地模式,所以driver不會向resourcemanager申請資源,所以也就不會向rm註

知乎一位朋友總結的特別好的spark文章很不錯以轉載!

private def addPendingTask(index: Int, readding: Boolean = false) { // Utility method that adds `index` to a list only if readding=false or it's not alr

Spark Streaming從Kafka中獲取數據並進行實時單詞統計統計URL出現的次數

scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka

緊接著文章既然

sco style () back -c 自己 new spa sta 註意描述:一個是插入隊列(1是可以插入尾部eg一遍的序列尾部追加,2是可以插入中間eg優先隊列) 而移除的描素都是刪除頭部 import java.util.Comparator; impor