訊息中介軟體之ActiveMQ傳輸檔案
Activemq傳輸檔案
我用的是MQ訊息中介軟體activemq來傳輸檔案的,當然還有很多其他的方式,選擇哪種方式還請自行搜尋各個訊息中介軟體的特點來定。
ActiveMQ傳輸檔案的方式有byteMessage、StreamMessage、BlobMessage。其中bytemessage和streammessage如果不加工處理的話,只能傳輸小檔案,小於100M的檔案應該都可以傳,blobmessage可以傳輸較大的檔案,可以用activemq自帶的fileserver來做檔案的伺服器,但是自activemq5.14之後,因為fileserver的漏洞官方就取消了這一模組,所以想用blobmessage的話,還要自己搭建檔案伺服器如ftp。我這裡主要跟大家講一下byteMessage
先說一下我下面這段程式碼的主要功能吧,功能的主要流程是監聽windows下的某一目錄,如果有檔案(包括新檔案和舊檔案)就將其移動到另一目錄下。
Main方法:初始化訊息生產者,初始化訊息消費者建立消費者執行緒監聽訊息佇列,有檔案的話建立訊息生產者執行緒將檔案以位元組的形式傳送給訊息佇列。
程式碼如下:
publicclassDirTest {
public static void main(String[] args)throwsJMSException {
ExecutorService cachedThreadPool= Executors.newCachedThreadPool
//初始化主類物件
DirTest main=newDirTest();
//初始化生產者
Producer producter = newProducer();
producter.init();
//初始化消費者物件
Consumer comsumer = newConsumer();
comsumer.init();
//從快取執行緒池取出執行緒,啟動消費執行緒,進行移動檔案
cachedThreadPool.execute(newRunnable() {
public void run() {
try {
comsumer.getMessage("sweep");
} catch(JMSException e){
e.printStackTrace();
}
}
});
// new Thread(main.newConsumerMq(comsumer)).start();
//如果目錄下有檔案先移出來
String path="D:\\listener";
File file=newFile(path);
for (File temp:file.listFiles()){
if (temp.isFile()){
//建立生產者執行緒
cachedThreadPool.execute(newRunnable() {
@Override
public void run() {
producter.sendMessage("sweep",temp.getName());
}
});
// new Thread(main.newProductorMq(producter,temp.getName())).start();
}
}
//監聽目錄
try (WatchService ws = FileSystems.getDefault().newWatchService()){
Path dirToWatch = Paths.get(path);
dirToWatch.register(ws,ENTRY_CREATE,ENTRY_MODIFY,ENTRY_DELETE);
while (true) {
WatchKey key = ws.take();
for (WatchEvent<?> event :key.pollEvents()) {
Kind<?>eventKind = event.kind();
if (eventKind == OVERFLOW) {
System.out.println("Event overflow occurred");
continue;
}
WatchEvent<Path> currEvent = (WatchEvent<Path>) event;
Path dirEntry = currEvent.context();
if ("CREATE".equals(eventKind.toString().substring(6,12))){
// System.out.println("目錄新成員:"+dirEntry.toString());
//建立執行緒將新件名傳送至訊息佇列
cachedThreadPool.execute(newRunnable() {
@Override
public void run() {
producter.sendMessage("sweep",dirEntry.toString());
}
});
}
}
boolean isKeyValid = key.reset();
if (!isKeyValid) {
System.out.println("No longer watching "+ dirToWatch);
break;
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
訊息生產者:主要負責傳送檔案位元組流給訊息佇列,並加入回執地址引數,並接收回執訊息。程式碼如下:
/** * 訊息佇列訊息生產者 */ public class Producer { //ActiveMq 的預設使用者名稱 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的預設登入密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的連結地址 private static final String BROKEN_URL = "tcp://ip地址:61616"; //連結工廠 ConnectionFactory connectionFactory; //連結物件 ActiveMQConnection connection; //事務管理 ActiveMQSession session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); ExecutorService threadPool = Executors.newFixedThreadPool(8); /** * 初始化訊息生產者 * @throws JMSException */ public void init() throws JMSException { try { //建立一個連結工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( USERNAME,PASSWORD,BROKEN_URL ); //從工廠中建立一個連結 connection = (ActiveMQConnection)connectionFactory.createConnection(); //開啟連結 connection.start(); //建立一個事務(這裡通過引數可以設定事務的級別) session=(ActiveMQSession) connection.createSession( false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } /** * 將訊息傳送至訊息佇列 * @param disname * @param text */ public void sendMessage(String disname,String text) { InputStream is=null; try { //建立一個點對點訊息佇列,佇列名 Destination queue = session.createQueue(disname); //訊息生產者 MessageProducer messageProducer = null; //設定訊息生產者 if(threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } File file=new File("D:\\listener\\"+text); while (!file.renameTo(file)){ //當該檔案正在被操作時 Thread.sleep(1000); } MessageProducer producer = session.createProducer(queue); BytesMessage bytesMessage=session.createBytesMessage(); is = new FileInputStream(file); // 讀取資料到byte陣列中 byte[] buffer=new byte[is.available()]; is.read(buffer); bytesMessage.writeBytes(buffer); //建立一個回執地址 Destination reback=session.createQueue("reback"); MessageConsumer reConsumer=session.createConsumer(reback); reConsumer.setMessageListener(new reListener(session)); //將回執地址寫到訊息 bytesMessage.setJMSReplyTo(reback); bytesMessage.setStringProperty("FileName",text); producer.send(bytesMessage); is.close(); //刪除檔案 if (file.exists()&&file.isFile()){ file.delete(); } System.out.println(file.getName()+"傳送訊息成功!"); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally { } } private class reListener implements MessageListener { ActiveMQSession session=null; public reListener(ActiveMQSession session) { this.session=session; } @Override public void onMessage(Message message) { threadPool.execute(new Runnable() { @Override public void run() { TextMessage tx= (TextMessage) message; try { System.out.println(tx.getText()+"----"); } catch (JMSException e) { e.printStackTrace(); } } }); } } }
訊息消費者:接收訊息佇列中的檔案流資訊,並寫入本地,將處理後的訊息回執給生產者。程式碼如下:
/** * 訊息佇列消費者 */ public class Consumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = "tcp://10.66.10.36:61616"; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>(); /** * 初始化訊息佇列消費者連線池、事務 */ public void init(){ try { connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,"tcp://10.66.10.36:61616"); connection = connectionFactory.createConnection(); connection.start(); //建立事務 session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } /** * 建立點對點消費者訊息佇列,通過MessageListener監聽訊息佇列 * @param disname * @throws JMSException */ public void getMessage(String disname) throws JMSException { //建立點對點訊息佇列 Destination queue = session.createQueue(disname); MessageConsumer consumer = null; //設定訊息佇列消費方式(這裡是訊息監聽器) if (threadLocal.get() != null) { consumer = threadLocal.get(); } else { consumer = session.createConsumer(queue); consumer.setMessageListener(new Listener(session)); threadLocal.set(consumer); } } }
Listener監聽類:
/** * 訊息佇列監聽器 */ public class Listener implements MessageListener { private Session session; public Listener(Session session){ this.session=session; } //建立一個可重用固定執行緒數的執行緒池,以共享的無界佇列方式來執行這些執行緒。 private ExecutorService threadPool = Executors.newFixedThreadPool(8); private long startTime=System.currentTimeMillis(); @Override public void onMessage(Message message) { threadPool.execute(new Runnable() { @Override public void run() { String fileName=""; FileOutputStream out=null; try { if (message instanceof BytesMessage) { BytesMessage bytesMessage= (BytesMessage) message; fileName=bytesMessage.getStringProperty("FileName"); out=new FileOutputStream("D:\\sink\\"+fileName); byte[] bytes=new byte[1024]; int len=0; while ((len=bytesMessage.readBytes(bytes))!=-1){ out.write(bytes,0,len); } //獲得回執地址 Destination recall_destination = message.getJMSReplyTo(); // 建立回執訊息 TextMessage textMessage = session.createTextMessage(fileName+"已處理完畢"); // 以上收到訊息之後,從新建立生產者,然後在回執過去 MessageProducer producer = session.createProducer(recall_destination); producer.send(textMessage); } } catch (JMSException e) { //遞迴 if (System.currentTimeMillis()-startTime<=15000){ onMessage(message); } else{ System.out.println(fileName+"檔案傳輸失敗,有可能是檔案過大!"); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { System.out.println(fileName+"傳輸失敗!"); }finally { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } }