大型網際網路高併發解決方案之訊息中介軟體技術-activeMQ詳解
點選上方藍字關注的都是靚仔和仙女
概述
ActiveMQ是Apache所提供的一個開源的訊息系統,完全採用Java來實現,因此,它能很好地支援J2EE提出的JMS(Java Message Service,即Java訊息服務)規範。JMS是一組Java應用程式介面,它提供訊息的建立、傳送、讀取等一系列服務。JMS提供了一組公共應用程式介面和響應的語法,類似於Java資料庫的統一訪問介面JDBC,它是一種與廠商無關的API,使得Java程式能夠與不同廠商的訊息元件很好地進行通訊。
JMS支援兩種訊息傳送和接收模型。一種稱為P2P(Ponit to Point)模型,即採用點對點的方式傳送訊息。P2P模型是基於佇列的,訊息生產者傳送訊息到佇列,訊息消費者從佇列中接收訊息,佇列的存在使得訊息的非同步傳輸稱為可能,P2P模型在點對點的情況下進行訊息傳遞時採用。
另一種稱為Pub/Sub(Publish/Subscribe,即釋出-訂閱)模型,釋出-訂閱模型定義瞭如何向一個內容節點發布和訂閱訊息,這個內容節點稱為topic(主題)。主題可以認為是訊息傳遞的中介,訊息釋出這將訊息釋出到某個主題,而訊息訂閱者則從主題訂閱訊息。主題使得訊息的訂閱者與訊息的釋出者互相保持獨立,不需要進行接觸即可保證訊息的傳遞,釋出-訂閱模型在訊息的一對多廣播時採用。
ActiveMQ的安裝
下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對linux系統進行闡述,當然ActiveMQ也有win版的,這裡就不贅述了),可以去官網下載,也可以在下方留言區留下你的郵箱,博主會發給你的~
下載之後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gz
ActiveMQ目錄內容有:
-
bin目錄包含ActiveMQ的啟動指令碼
-
conf目錄包含ActiveMQ的所有配置檔案
-
data目錄包含日誌檔案和永續性訊息資料
-
example: ActiveMQ的示例
-
lib: ActiveMQ執行所需要的lib
-
webapps: ActiveMQ的web控制檯和一些相關的demo
執行命令:activemq start(在activemq/bin下執行)
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')
檢視activemq是否執行命令:ps -aux | grep activemq
shr 986 1.2 9.7 1281720 201936 pts/5 Sl 19:43 0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr 1501 0.0 0.0 5176 724 pts/5 S+ 20:06 0:00 grep activemq
關閉命令: activemq stop
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre
Heap sizes: current=63232k free=62218k max=932096k
JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:
[/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED
ActiveMQ的預設服務埠為61616,這個可以在conf/activemq.xml配置檔案中修改:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
案例
在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個jar包:activemq-all-5.13.2.jar,引入這個jar到你的專案中即可開始編寫案例程式碼。
博主的activemq伺服器地址為10.10.195.187,這個在下面程式碼中會有體現。
按照JMS的規範,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來建立connection.在這個基礎之上我們再建立session, destination, producer和consumer。因此主要的幾個步驟如下:
-
獲得JMS connection factory. 通過我們提供特定環境的連線資訊來構造factory。
-
利用factory構造JMS connection
-
啟動connection
-
通過connection建立JMS session.
-
指定JMS destination.
-
建立JMS producer或者建立JMS message並提供destination.
-
建立JMS consumer或註冊JMS message listener.
-
傳送和接收JMS message.
-
關閉所有JMS資源,包括connection, session, producer, consumer等。
下面來看程式碼舉例(P2P式)。
通過Java實現的基於ActiveMQ的請求提交:
package com.zzh.activemq;
import java.io.Serializable;
import java.util.HashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class RequestSubmit
{
//訊息傳送者
private MessageProducer producer;
//一個傳送或者接受訊息的執行緒
private Session session;
public void init() throws Exception
{
//ConnectionFactory連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://10.10.195.187:61616");
//Connection:JMS客戶端到JMS Provider的連線,從構造工廠中得到連線物件
Connection connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取連線操作
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destinatin = session.createQueue("RequestQueue");
//得到訊息生成(傳送)者
producer = session.createProducer(destinatin);
//設定不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception
{
ObjectMessage message = session.createObjectMessage(requestParam);
producer.send(message);
session.commit();
}
public static void main(String[] args) throws Exception{
RequestSubmit submit = new RequestSubmit();
submit.init();
HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();
requestParam.put("朱小廝", "zzh");
submit.submit(requestParam);
}
}
建立Session時有兩個非常重要的引數,第一個boolean型別的引數用來表示是否採用事務訊息。如果是事務訊息,對於的引數設定為true,此時訊息的提交自動有comit處理,訊息的回滾則自動由rollback處理。加入訊息不是事務的,則對應的該引數設定為false,此時分為三種情況:
-
Session.AUTO_ACKNOWLEDGE表示Session會自動確認所接收到的訊息。
-
Session.CLIENT_ACKNOWLEDGE表示由客戶端程式通過呼叫訊息的確認方法來確認所接收到的訊息。
-
Session.DUPS_OK_ACKNOWLEDGE使得Session將“懶惰”地確認訊息,即不會立即確認訊息,這樣有可能導致訊息重複投遞。
提供Java實現的基於ActiveMQ的請求處理:
package com.zzh.activemq;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class RequestProcessor
{
public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception
{
System.out.println("requestHandler....."+requestParam.toString());
for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet())
{
System.out.println(entry.getKey()+":"+entry.getValue());
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://10.10.195.187:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("RequestQueue");
//訊息消費(接收)者
MessageConsumer consumer = session.createConsumer(destination);
RequestProcessor processor = new RequestProcessor();
while(true)
{
ObjectMessage message = (ObjectMessage) consumer.receive(1000);
if(null != message)
{
System.out.println(message);
HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();
processor.requestHandler(requestParam);
}
else
{
break;
}
}
}
}
輸出結果:
ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{朱小廝=zzh}
朱小廝:zzh
可以通過頁面檢視佇列的使用情況,在瀏覽器中輸入http://10.10.195.187:8161/admin/queues.jsp,使用者名稱和密碼都是:admin,看到以下頁面:
這個是在jetty伺服器下跑的,可以修改conf/jetty.xml來修改相關jetty配置。
上面的例子是關於P2P模式的,不過有個不妥之處,就是沒有資源的釋放。下面舉一個Pub/Sub模式的。
通過JMS建立ActiveMQ的topic,並給topic傳送訊息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;
public class TopicRequest
{
//訊息傳送者
private MessageProducer producer;
//一個傳送或者接受訊息的執行緒
private Session session;
//Connection:JMS客戶端到JMS Provider的連線
private Connection connection;
public void init() throws Exception
{
//ConnectionFactory連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://10.10.195.187:61616");
//從構造工廠中得到連線物件
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取連線操作
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
producer = session.createProducer(topic);
//設定不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void submit(String mess) throws Exception
{
TextMessage message = session.createTextMessage();
message.setText(mess);
producer.send(message);
}
public void close()
{
try
{
if(session != null)
session.close();
if(producer != null)
producer.close();
if(connection !=null )
connection.close();
}
catch (JMSException e)
{
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception
{
TopicRequest topicRequest = new TopicRequest();
topicRequest.init();
topicRequest.submit("I'm first");
topicRequest.close();
}
}
訊息傳送到對應的topic後,需要將listener註冊到需要訂閱的topic上,以便能夠接收該topic的訊息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicReceive
{
private MessageConsumer consumer;
private Session session;
public void init() throws Exception
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://10.10.195.187:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MessageTopic");
consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message)
{
TextMessage tm = (TextMessage) message;
System.out.println(tm);
try
{
System.out.println(tm.getText());
}
catch (JMSException e)
{
e.printStackTrace();
}
}
});
}
public static void main(String[] args) throws Exception
{
TopicReceive receive = new TopicReceive();
receive.init();
}
}
輸出結果:
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first
想更加詳細,更加深入的瞭解activeMQ嗎?
在這裡部落告訴大家一個小祕密
今晚8:30
動腦學院 jack大神
將在騰訊課堂 動腦學院 免費Java公開課中
給大家詳細講解
《大型網際網路高併發解決方案之訊息中介軟體技術-activeMQ》
你只需要在今晚8:30的時候
點選文章最末 閱讀原文
即可進行觀看
推薦閱讀
推薦程式設計師必備微訊號
▼
相關推薦
大型網際網路高併發解決方案之訊息中介軟體技術-activeMQ詳解
點選上方藍字關注的都是靚仔和仙女 概述 ActiveMQ是Apache所提供的一個開源的訊息系統,完全採用Java來實現,因此,它能很好地支援J2EE提出的JMS(Java Message Service,即Java訊息服務)規範。JMS是一組Java應用程式介面,它提供訊息的建立、傳送、讀取等一系列服
Java高併發解決方案之非同步處理
(() -> { // 請求1 CompletableFuture<List<Integer>> completionStage1 = CompletableFuture.supplyAsync(() -> { //
高併發解決方案之負載均衡
1.什麼是負載均衡? 當一臺伺服器的效能達到極限時,我們可以使用伺服器叢集來提高網站的整體效能。那麼,在伺服器叢集中,需要有一臺伺服器充當排程者的角色,使用者的所有請求都會首先由它接收,排程者再根據每
java系統高併發解決方案之圖片伺服器分離
說明一下: 1、圖片服務通過lvs作為入口,處理能力上還是有保障的。 2、利用nginx直接對外服務,不必用squid。 3、圖中的紅線是指主nginx會將/2006和/2007年的圖片分別代理到兩臺存檔伺服器,如果發現主nginx的cpu佔用比較大,那麼可以考慮使用nginx的proxy_store將圖片存
大型網站應用之海量資料和高併發解決方案總結
一、網站應用背景 開發一個網站的應用程式,當用戶規模比較小的時候,使用簡單的:一臺應用伺服器+一臺資料庫伺服器+一臺檔案伺服器,這樣的話完全可以解決一部分問題,也可以通過堆硬體的方式來提高網站應用的訪問效能,當然,也要考慮成本的問題。 當問題的規模在經濟條件下通過堆硬體的
大型網站應用之海量資料和高併發解決方案總結一二
一、網站應用背景 開發一個網站的應用程式,當用戶規模比較小的時候,使用簡單的:一臺應用伺服器+一臺資料庫伺服器+一臺檔案伺服器,這樣的話完全可以解決一部分問題,也可以通過堆硬體的方式來提高網站應用的訪問效能,當然,也要考慮成本的問題。 當問題的規模在經濟條件
大規模分散式應用之海量資料和高併發解決方案總結視訊教程網盤
大規模分散式應用之海量資料和高併發解決方案總結視訊教程網盤 39套Java架構師,高併發,高效能,高可用,分散式,叢集,電商,快取,微服務,微信支付寶支付,公眾號開發,java8新特性,P2P金融專案,程式設計,功能設計,資料庫設計,第三方支付,web安全,效能調優,設計模式,資料結構,併發程式
大規模分散式應用之海量資料和高併發解決方案總結
一、網站應用背景 開發一個網站的應用程式,當用戶規模比較小的時候,使用簡單的:一臺應用伺服器+一臺資料庫伺服器+一臺檔案伺服器,這樣的話完全可以解決一部分問題,也可以通過堆硬體的方式來提高網站應用的訪問效能,當然,也要考慮成本的問題。 當問題的規模在經濟條件下通過堆硬體的
高併發解決方案(負載均衡)
1,什麼是負載均衡? 當一臺伺服器的效能達到極限時,我們可以使用伺服器叢集來提高網站的整體效能。那麼,在伺服器叢集中,需要有一臺伺服器充當排程者的角色,使用者的所有請求都會首先由它接收,排程者再根據每臺伺服器的負載情況將請求分配給某一臺後端伺服器去處理。 那麼在這個過程中,排程者如何合理分配
高併發解決方案 -負載均衡
上一篇文章說過會轉載一篇負載均衡的介紹方面的文章,就是下面這個了~~~ 什麼是負載均衡? 當一臺伺服器的效能達到極限時,我們可以使用伺服器叢集來提高網站的整體效能。那麼,在伺服器叢集中,需要有一臺伺服器充當排程者的角色,使用者的所有請求都會首先由它接收,排程者再根據每臺伺服器的負載情
Java高併發解決方案
Java高併發,如何解決,什麼方式解決 對於我們開發的網站,如果網站的訪問量非常大的話,那麼我們就需要考慮相關的併發訪問問題了。而併發問題是絕大部分的程式設計師頭疼的問題, 但話又說回來了,既然逃避不掉,那我們就坦然面對吧~今天就讓我們一起來研究一下常見的併
長文慎入-探索Java併發程式設計與高併發解決方案[轉]
轉自:https://yq.aliyun.com/articles/636038 所有示例程式碼,請見/下載於https://github.com/Wasabi1234/concurrency 高併發處理的思路及手段
小程式video層級過高 !解決方案之------- cover-view!
在小程式用應用video map等元件時會發現由於這些元件層級過高會覆蓋到我們的導航欄!即使我們設定了定位給z-index 9999, 但是在真機測試時依然會失效!那麼就要用到元件cover-view 用cover-view寫導航欄就可以覆蓋到video上面了!但是有幾個小坑要注意! 1
高併發解決方案-mysql篇
1、mysql篇 高併發大多的瓶頸在後臺,在儲存,mysql的正常的優化方案如下: 1)程式碼中sql語句優化 2)資料庫欄位優化,索引優化 3)加快取,redis/memcache等 4)主從,讀寫分離 5)分割槽表 6)垂直拆分,解耦模組 7)水平切分 點評: 1、1
熱點賬戶高併發解決方案
背景:2018年初,直播答題風靡全國。來的快,去的也快,抖音突然崛起,具有了挑戰微信的實力。 我司與頭條合作,負責頭條的紅包雨業務。頭條要求,200tps,最後最高達到140tps。 自此之後,公司開始了賬戶優化。 此前已經對第三方支付的賬戶進行了詳
分散式事務解決方案之訊息最終一致性(可靠訊息服務)下篇
背景:1.支付成功 通知訂單完成2.訂單完成,通知會計記賬上游訂單服務,必須開放可查詢訂單狀態介面,判斷訊息是否可以傳送下游會計消費成功後,必須回撥訊息服務,ACK操作(約束:冪等性。 例如:訊息id等)流程:訂單服務: 預儲存訊息 -> 訂單完成 ->
【高併發解決方案】高併發解決方案彙總
第1章 引言隨著網際網路應用的廣泛普及,海量資料的儲存和訪問成為了系統設計的瓶頸問題。對於一個大型的網際網路應用,每天幾十億的PV無疑對資料庫造成了相當高的負載。對於系統的穩定性和擴充套件性造成了極大的問題。通過資料切分來提高網站效能,橫向擴充套件資料層已經成為架構研發人員首選的方式。水平切分資料庫:可以降低
java系統高併發解決方案(轉載)
package com.jb.y2t034.thefifth.web.servlet; import java.io.ByteArrayOutputStream; import java.io.FileOutputStream; import java.io.IOException; impo
Java高併發解決方案(參考文)
對於我們開發的網站,如果網站的訪問量非常大的話,那麼我們就需要考慮相關的併發訪問問題了。而併發問題是絕大部分的程式設計師頭疼的問題,但話又說回來了,既然逃避不掉,那我們就坦然面對吧~今天就讓我們一起來研究一下常見的併發和同步吧。 為了更好的理解併發和同步,我們需要先明白兩個重要的概念:同步和
併發程式設計與高併發解決方案學習(併發程式設計初體驗)
以下都是發生執行緒安全的案例: 模擬5000個請求,併發數200 package vip.fkandy.chapter02; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDown