1. 程式人生 > 如何在android中應用activemq

如何在android中應用activemq

      由於android專案中有大量功能需要用到訊息推送,而平臺端選擇了activemq來實現訊息推送,為了能使用訊息推送,決定研究一下如何在android端實現activemq的訊息推送。
       這個問題說難不難,說易也不易。平臺端開發人員選擇activemq,並且認為activemq是用java寫的,所以android使用它應該很容易,其實我很想告訴他們:"you are wrong!"。其實直接把activemq中的lib庫匯入android專案並不能正常使用,雖然編譯通過了,但是執行時會報一些類沒有找到的錯誤!在這個問題上我糾結了好久,網上也找過相關資料,但根本找不到。經過近一天的查詢、除錯,並到網路上找相關專案,最後終於讓我找到一個能用的專案,毫無疑問,是老外寫的:Android-Paho-Mqtt-Service。使用這個專案庫終於可以正常使用activemq的訊息推送了。
     activemq下載地址:http://activemq.apache.org/download-archives.html,我使用的是activemq5.9.1。為什麼不使用最新版本,因為我們伺服器就是使用的這個版本。

     Android-Paho-Mqtt-Service 下載地址:https://github.com/JesseFarebro/Android-Mqtt

     測試專案相關檔案

         1.MainActivity類

/**
 * Launcher activity of the sample application.
 *
 * <p>It only ties the lifetime of the MQTT connection to the lifetime of
 * this screen: the service is asked to start when the activity is created
 * and asked to stop when the activity is destroyed.
 */
public class MainActivity extends Activity {

    /**
     * Inflates the main layout and requests {@link MqttService} to start.
     *
     * @param savedInstanceState previously saved state, forwarded to the superclass
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // Sends the start action to the service via startService().
        MqttService.actionStart(this);
    }

    /**
     * Requests {@link MqttService} to stop once the activity has been torn down.
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();

        // Sends the stop action to the service via startService().
        MqttService.actionStop(this);
    }
}

2.MqttService類,這個類檔案是我從Android-Paho-Mqtt-Service專案中下載的,修改了伺服器地址

/**
 * Background service that keeps an MQTT connection to the broker open,
 * subscribes to a topic and publishes periodic keep-alive messages.
 *
 * <p>The service is driven through intents: use {@link #actionStart(Context)},
 * {@link #actionStop(Context)} and {@link #actionKeepalive(Context)}.
 * All blocking client calls (connect, subscribe, disconnect) are posted to a
 * dedicated handler thread; MQTT callbacks arrive on a thread owned by the
 * client library.
 */
public class MqttService extends Service implements MqttCallback
{
    public static final String DEBUG_TAG = "MqttService"; // Debug TAG

    private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]"; // Handler Thread ID

    private static final String MQTT_BROKER = "192.168.11.62"; // Broker URL or IP Address
    private static final int MQTT_PORT = 1883; // Broker Port

    public static final int MQTT_QOS_0 = 0; // QOS Level 0 ( Delivery Once no confirmation )
    public static final int MQTT_QOS_1 = 1; // QOS Level 1 ( Delivery at least Once with confirmation )
    public static final int MQTT_QOS_2 = 2; // QOS Level 2 ( Delivery only once with confirmation with handshake )

    private static final int MQTT_KEEP_ALIVE = 240000; // KeepAlive Interval in MS
    private static final String MQTT_KEEP_ALIVE_TOPIC_FORMAT = "/users/%s/keepalive"; // Topic format for KeepAlives
    private static final byte[] MQTT_KEEP_ALIVE_MESSAGE = { 0 }; // Keep Alive message to send
    private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0; // Default Keepalive QOS

    private static final boolean MQTT_CLEAN_SESSION = true; // Start a clean session?

    private static final String MQTT_URL_FORMAT = "tcp://%s:%d"; // URL Format normally don't change

    private static final String ACTION_START = DEBUG_TAG + ".START"; // Action to start
    private static final String ACTION_STOP = DEBUG_TAG + ".STOP"; // Action to stop
    private static final String ACTION_KEEPALIVE = DEBUG_TAG + ".KEEPALIVE"; // Action to keep alive used by alarm manager
    private static final String ACTION_RECONNECT = DEBUG_TAG + ".RECONNECT"; // Action to reconnect

    // Device ID Format, add any prefix you'd like.
    // Note from the original author: the client id is limited to 23 characters
    // and exceeding that limit makes the client fail (reported as an NPE).
    private static final String DEVICE_ID_FORMAT = "ljf_%s";

    // Written on the connection thread and read from other threads, hence volatile.
    private volatile boolean mStarted = false; // Is the Client started?
    private String mDeviceId; // Device ID, Secure.ANDROID_ID
    private Handler mConnHandler; // Separate Handler thread for networking
    private Handler mUiHandler; // Handler bound to the main looper, used for UI work

    private MqttDefaultFilePersistence mDataStore; // Defaults to FileStore
    private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore
    private MqttConnectOptions mOpts; // Connection Options

    private MqttTopic mKeepAliveTopic; // Keepalive topic of the CURRENT client, reset whenever the client changes

    // Replaced/cleared from several threads, hence volatile.
    private volatile MqttClient mClient; // Mqtt Client

    private AlarmManager mAlarmManager; // Alarm manager to perform repeating tasks
    private ConnectivityManager mConnectivityManager; // To check for connectivity changes

    private boolean mReceiverRegistered = false; // Is mConnectivityReceiver currently registered?

    /**
     * Start MQTT Client
     * @param ctx context to start the service with
     */
    public static void actionStart(Context ctx) {
        Intent i = new Intent(ctx, MqttService.class);
        i.setAction(ACTION_START);
        ctx.startService(i);
    }

    /**
     * Stop MQTT Client
     * @param ctx context to start the service with
     */
    public static void actionStop(Context ctx) {
        Intent i = new Intent(ctx, MqttService.class);
        i.setAction(ACTION_STOP);
        ctx.startService(i);
    }

    /**
     * Send a KeepAlive Message
     * @param ctx context to start the service with
     */
    public static void actionKeepalive(Context ctx) {
        Intent i = new Intent(ctx, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        ctx.startService(i);
    }

    /**
     * Initializes the DeviceId and most instance variables
     * including the connection handler, UI handler, datastore, Alarm Manager
     * and ConnectivityManager.
     */
    @Override
    public void onCreate() {
        super.onCreate();

        mDeviceId = String.format(DEVICE_ID_FORMAT,
                Secure.getString(getContentResolver(), Secure.ANDROID_ID));

        HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);
        thread.start();

        mConnHandler = new Handler(thread.getLooper());
        // MQTT callbacks do not run on the main thread; UI work is posted here.
        mUiHandler = new Handler(getMainLooper());

        try {
            mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
        } catch (MqttPersistenceException e) {
            Log.e(DEBUG_TAG, "File persistence unavailable, falling back to memory persistence", e);
            mDataStore = null;
            mMemStore = new MemoryPersistence();
        }

        mOpts = new MqttConnectOptions();
        mOpts.setCleanSession(MQTT_CLEAN_SESSION);
        // Do not set keep alive interval on mOpts we keep track of it with alarm's

        mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
        mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    }

    /**
     * Service onStartCommand
     * Handles the action passed via the Intent
     *
     * @param intent  the intent carrying the requested action; may be null
     * @param flags   start flags, forwarded to the superclass
     * @param startId start id, forwarded to the superclass
     * @return START_REDELIVER_INTENT
     */
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        super.onStartCommand(intent, flags, startId);

        // Guard against a null intent so the "no action" branch is reachable
        // instead of crashing with a NullPointerException.
        String action = (intent == null) ? null : intent.getAction();

        Log.i(DEBUG_TAG,"Received action of " + action);

        if (action == null) {
            Log.i(DEBUG_TAG,"Starting service with no action\n Probably from a crash");
        } else if (action.equals(ACTION_START)) {
            Log.i(DEBUG_TAG,"Received ACTION_START");
            start();
        } else if (action.equals(ACTION_STOP)) {
            stop();
        } else if (action.equals(ACTION_KEEPALIVE)) {
            keepAlive();
        } else if (action.equals(ACTION_RECONNECT)) {
            if (isNetworkAvailable()) {
                reconnectIfNecessary();
            }
        }

        return START_REDELIVER_INTENT;
    }

    /**
     * Attempts connect to the Mqtt Broker
     * and listen for Connectivity changes
     * via ConnectivityManager.CONNECTIVITY_ACTION BroadcastReceiver
     */
    private synchronized void start() {
        if (mStarted) {
            Log.i(DEBUG_TAG,"Attempt to start while already started");
            return;
        }

        if (hasScheduledKeepAlives()) {
            stopKeepAlives();
        }

        connect();

        registerConnectivityReceiver();
    }

    /**
     * Attempts to stop the Mqtt client
     * as well as halting all keep alive messages queued
     * in the alarm manager
     */
    private synchronized void stop() {
        // Always release the receiver, even if the connection never came up;
        // otherwise a failed start would leak the registration.
        unregisterConnectivityReceiver();

        if (!mStarted) {
            Log.i(DEBUG_TAG,"Attemtpign to stop connection that isn't running");
            return;
        }

        final MqttClient client = mClient;
        if (client == null) {
            // Started but the connection was already lost: nothing to
            // disconnect, just reset the state so start() works again.
            mStarted = false;
            mKeepAliveTopic = null;
            stopKeepAlives();
            return;
        }

        mConnHandler.post(new Runnable() {
            @Override
            public void run() {
                try {
                    client.disconnect();
                } catch (MqttException ex) {
                    Log.e(DEBUG_TAG, "Error while disconnecting from broker", ex);
                }
                mClient = null;
                mKeepAliveTopic = null;
                mStarted = false;

                stopKeepAlives();
            }
        });
    }

    /**
     * Connects to the broker with the appropriate datastore
     */
    private synchronized void connect() {
        String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);
        Log.i(DEBUG_TAG,"Connecting with URL: " + url);

        final MqttClient client;
        try {
            if (mDataStore != null) {
                Log.i(DEBUG_TAG,"Connecting with DataStore");
                client = new MqttClient(url, mDeviceId, mDataStore);
            } else {
                Log.i(DEBUG_TAG,"Connecting with MemStore");
                client = new MqttClient(url, mDeviceId, mMemStore);
            }
        } catch (MqttException e) {
            // Without a client there is nothing to connect; do not post the
            // connect task, it would only fail on the handler thread.
            Log.e(DEBUG_TAG, "Unable to create MQTT client for " + url, e);
            return;
        }

        mClient = client;
        // A topic handle belongs to the client that created it.
        mKeepAliveTopic = null;

        mConnHandler.post(new Runnable() {
            @Override
            public void run() {
                try {
                    // Install the callback first so that no message or
                    // connection-loss notification is missed.
                    client.setCallback(MqttService.this);

                    client.connect(mOpts);

                    client.subscribe("hello", 0);

                    mStarted = true; // Service is now connected

                    Log.i(DEBUG_TAG,"Successfully connected and subscribed starting keep alives");

                    startKeepAlives();
                } catch (MqttException e) {
                    Log.e(DEBUG_TAG, "Failed to connect or subscribe", e);
                }
            }
        });
    }

    /**
     * Schedules keep alives via a PendingIntent
     * in the Alarm Manager
     */
    private void startKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
        mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP,
                System.currentTimeMillis() + MQTT_KEEP_ALIVE,
                MQTT_KEEP_ALIVE, pi);
    }

    /**
     * Cancels the Pending Intent
     * in the alarm manager
     */
    private void stopKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
        mAlarmManager.cancel(pi);
    }

    /**
     * Publishes a KeepAlive to the topic
     * in the broker
     */
    private synchronized void keepAlive() {
        if (!isConnected()) {
            return;
        }

        try {
            sendKeepAlive();
        } catch (MqttConnectivityException ex) {
            Log.e(DEBUG_TAG, "Keepalive failed: not connected", ex);
            reconnectIfNecessary();
        } catch (MqttException ex) {
            // Also covers MqttPersistenceException, which was handled identically.
            Log.e(DEBUG_TAG, "Keepalive failed, stopping", ex);
            stop();
        }
    }

    /**
     * Checks the current connectivity
     * and reconnects if it is required.
     */
    private synchronized void reconnectIfNecessary() {
        if (mStarted && mClient == null) {
            connect();
        }
    }

    /**
     * Queries the NetworkInfo via ConnectivityManager
     * to return the current connected state
     * @return boolean true if we are connected false otherwise
     */
    private boolean isNetworkAvailable() {
        NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();

        return info != null && info.isConnected();
    }

    /**
     * Verifies the client State with our local connected state
     * @return true if its a match we are connected false if we aren't connected
     */
    private boolean isConnected() {
        // Read once: the field may be cleared concurrently by connectionLost().
        MqttClient client = mClient;
        if (client == null) {
            return false;
        }

        boolean clientConnected = client.isConnected();
        if (mStarted && !clientConnected) {
            Log.i(DEBUG_TAG,"Mismatch between what we think is connected and what is connected");
        }

        return mStarted && clientConnected;
    }

    /**
     * Registers the connectivity receiver unless it is already registered.
     */
    private void registerConnectivityReceiver() {
        if (!mReceiverRegistered) {
            registerReceiver(mConnectivityReceiver,
                    new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
            mReceiverRegistered = true;
        }
    }

    /**
     * Unregisters the connectivity receiver if it is currently registered.
     */
    private void unregisterConnectivityReceiver() {
        if (mReceiverRegistered) {
            unregisterReceiver(mConnectivityReceiver);
            mReceiverRegistered = false;
        }
    }

    /**
     * Receiver that listens for connectivity changes
     * via ConnectivityManager
     */
    private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {
        @Override
        public void onReceive(Context context, Intent intent) {
            Log.i(DEBUG_TAG,"Connectivity Changed...");
        }
    };

    /**
     * Sends a Keep Alive message to the keepalive topic of this device
     * (see MQTT_KEEP_ALIVE_MESSAGE and MQTT_KEEP_ALIVE_TOPIC_FORMAT).
     * @return MqttDeliveryToken specified token you can choose to wait for completion
     * @throws MqttConnectivityException if the client is not connected
     * @throws MqttPersistenceException propagated from the publish call
     * @throws MqttException propagated from the publish call
     */
    private synchronized MqttDeliveryToken sendKeepAlive()
            throws MqttConnectivityException, MqttPersistenceException, MqttException {
        MqttClient client = mClient;
        if (client == null || !isConnected()) {
            throw new MqttConnectivityException();
        }

        if (mKeepAliveTopic == null) {
            mKeepAliveTopic = client.getTopic(
                    String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORMAT, mDeviceId));
        }

        Log.i(DEBUG_TAG,"Sending Keepalive to " + MQTT_BROKER);

        MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
        message.setQos(MQTT_KEEP_ALIVE_QOS);

        return mKeepAliveTopic.publish(message);
    }

    /**
     * Queries for an existing keep alive PendingIntent to check if there is
     * a keep alive currently scheduled
     * @return true if there is currently one scheduled false otherwise
     */
    private synchronized boolean hasScheduledKeepAlives() {
        Intent i = new Intent();
        i.setClass(this, MqttService.class);
        i.setAction(ACTION_KEEPALIVE);
        // Must use getService(): the alarm is scheduled with a service
        // PendingIntent, a broadcast PendingIntent would never match it.
        PendingIntent pi = PendingIntent.getService(this, 0, i, PendingIntent.FLAG_NO_CREATE);

        return pi != null;
    }


    /**
     * Binding is not supported by this service.
     * @return always null
     */
    @Override
    public IBinder onBind(Intent arg0) {
        return null;
    }

    /**
     * Connectivity Lost from broker
     */
    @Override
    public void connectionLost(Throwable arg0) {
        stopKeepAlives();

        mClient = null;
        // The cached topic belongs to the client that just died.
        mKeepAliveTopic = null;

        if (isNetworkAvailable()) {
            reconnectIfNecessary();
        }
    }

    /**
     * Publish Message Completion
     */
    @Override
    public void deliveryComplete(MqttDeliveryToken arg0) {

    }

    /**
     * Received Message from broker
     */
    @Override
    public void messageArrived(MqttTopic topic, MqttMessage message)
            throws Exception {
        final String topicName = topic.getName();

        Log.i(DEBUG_TAG," Topic:\t" + topicName +
                " Message:\t" + new String(message.getPayload()) +
                " QoS:\t" + message.getQos());

        // This callback is not invoked on the main thread; a Toast has to be
        // shown from a thread with a Looper, so hand it to the main looper.
        mUiHandler.post(new Runnable() {
            @Override
            public void run() {
                Toast.makeText(MqttService.this, topicName, Toast.LENGTH_LONG).show();
            }
        });
    }

    /**
     * MqttConnectivityException Exception class
     */
    private static class MqttConnectivityException extends Exception {
        private static final long serialVersionUID = -7385866796799469420L;
    }
}

3.manifest 檔案
<!-- Manifest of the sample application: one launcher activity, one boot
     receiver and the MQTT background service. -->
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.example.activemqforandroid"
    android:versionCode="1"
    android:versionName="1.0" >

    <uses-sdk
        android:minSdkVersion="8"
        android:targetSdkVersion="19" />

    <!-- Pairs with the BOOT_COMPLETED receiver declared below. -->
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
   <!-- Do not forget to add the permissions below: the service opens a TCP
        connection to the broker and queries the active network state. -->
    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

    <application
        android:allowBackup="true"
        android:icon="@drawable/ic_launcher"
        android:label="@string/app_name"
        android:theme="@style/AppTheme" >
        <!-- Launcher activity; it starts and stops MqttService. -->
        <activity
            android:name="com.example.activemqforandroid.MainActivity"
            android:label="@string/app_name" >
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />

                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>

        <!-- Boot receiver. NOTE(review): MyReceiver is not listed in this
             article; presumably it restarts the service after boot - confirm
             against the downloadable project. -->
        <receiver
            android:name=".MyReceiver"
            android:enabled="true"
            android:exported="false" >
            <intent-filter>
                <action android:name="android.intent.action.BOOT_COMPLETED" />
            </intent-filter>
        </receiver>

   <!-- Do not forget to declare the service.
        NOTE(review): this name uses package com.ljf.mqservice, not the
        manifest package; it must match the package MqttService is actually
        declared in - confirm. -->
        <service
            android:name="com.ljf.mqservice.MqttService"
            android:exported="false" />
    </application>

</manifest>

測試專案原始碼下載地址:http://download.csdn.net/detail/junfeng120125/7582209