1. 程式人生 > >實時Android語音對講系統架構

實時Android語音對講系統架構

本文主要包含以下內容:

  1. AudioRecord、AudioTrack
  2. Speex編解碼
  3. Android語音對講系統架構

一、AudioRecord、AudioTrack

AudioRecorder和AudioTracker是Android中獲取實時音訊資料的介面。在網路電話、語音對講等場景中,由於實時性的要求,不能採用檔案傳輸,因此,MediaRecorder和MediaPlayer就無法使用。

AudioRecorder和AudioTracker是Android在Java層對libmedia庫的封裝,所以效率較高,適合於實時語音相關處理的應用。在使用時,AudioRecorder和AudioTracker的構造器方法入參較多,這裡對其進行詳細的解釋。

AudioRecord

public AudioRecord(int audioSource, int sampleRateInHz, int channelConfig, int audioFormat, int bufferSizeInBytes)

其中,audioSource表示錄音來源,在AudioSource中列舉了不同的音訊來源,包括:

AudioSource.DEFAULT:預設音訊來源
AudioSource.MIC:麥克風(常用)
AudioSource.VOICE_UPLINK:電話上行
AudioSource.VOICE_DOWNLINK:電話下行
AudioSource.VOICE_CALL:電話、含上下行
AudioSource.CAMCORDER:攝像頭旁的麥克風
AudioSource.VOICE_RECOGNITION:語音識別
AudioSource.VOICE_COMMUNICATION:語音通訊

這裡比較常用的有MICVOICE_COMMUNICATIONVOICE_CALL

sampleRateInHz表示取樣頻率。音訊的採集過程要經過抽樣量化編碼三步。抽樣需要關注抽樣率。聲音是機械波,其特徵主要包括頻率和振幅(即音調和音量),頻率對應時間軸線,振幅對應電平軸線。取樣是指間隔固定的時間對波形進行一次記錄,取樣率就是在1秒內採集樣本的次數。量化過程就是用數字表示振幅的過程。編碼是一個減少資訊量的過程,任何數字音訊編碼方案都是有損的。PCM編碼(脈衝編碼調製)是一種保真水平較高的編碼方式。在Android平臺,44100Hz是唯一目前所有裝置都保證支援的取樣頻率。但比如22050、16000、11025也在大多數裝置上得到支援。8000是針對某些低質量的音訊通訊使用的。

channelConfig表示音訊通道,即選擇單聲道、雙聲道等引數。系統提供的選擇如下:

public static final int CHANNEL_IN_DEFAULT = 1;
// These directly match native
public static final int CHANNEL_IN_LEFT = 0x4;
public static final int CHANNEL_IN_RIGHT = 0x8;
public static final int CHANNEL_IN_FRONT = 0x10;
public static final int CHANNEL_IN_BACK = 0x20;
public static final int CHANNEL_IN_LEFT_PROCESSED = 0x40;
public static final int CHANNEL_IN_RIGHT_PROCESSED = 0x80;
public static final int CHANNEL_IN_FRONT_PROCESSED = 0x100;
public static final int CHANNEL_IN_BACK_PROCESSED = 0x200;
public static final int CHANNEL_IN_PRESSURE = 0x400;
public static final int CHANNEL_IN_X_AXIS = 0x800;
public static final int CHANNEL_IN_Y_AXIS = 0x1000;
public static final int CHANNEL_IN_Z_AXIS = 0x2000;
public static final int CHANNEL_IN_VOICE_UPLINK = 0x4000;
public static final int CHANNEL_IN_VOICE_DNLINK = 0x8000;
public static final int CHANNEL_IN_MONO = CHANNEL_IN_FRONT;
public static final int CHANNEL_IN_STEREO = (CHANNEL_IN_LEFT | CHANNEL_IN_RIGHT);

常用的是CHANNEL_IN_MONOCHANNEL_IN_STEREO分別表示單通道輸入和左右兩通道輸入。

audioFormat指定返回音訊資料的格式,常見的選擇包括ENCODING_PCM_16BITENCODING_PCM_8BITENCODING_PCM_FLOATENCODING_PCM_16BIT表示PCM 16bits每個樣本,所有裝置保證支援。ENCODING_PCM_8BIT自然表示PCM 8bits每個樣本。ENCODING_PCM_FLOAT表示一個單精度浮點數表示一個樣本。

bufferSizeInBytes表示錄音時音訊資料寫入的buffer的大小。這個數值是通過另一個方法來獲取的:getMinBufferSizegetMinBufferSize是AudioRecord類的靜態方法,返回值就是bufferSizeInBytes。這裡我們來看下它的入參:

static public int getMinBufferSize(int sampleRateInHz, int channelConfig, int audioFormat)

sampleRateInHz, channelConfig, audioFormat三個引數與上面的含義完全一樣,代表錄音的取樣率、通道以及資料輸出的格式。綜上,AudioRecord的初始化方法如下:

// 獲取音訊資料緩衝段大小
inAudioBufferSize = AudioRecord.getMinBufferSize(
        Constants.sampleRateInHz, Constants.inputChannelConfig, Constants.audioFormat);
// 初始化音訊錄製
audioRecord = new AudioRecord(Constants.audioSource,
        Constants.sampleRateInHz, Constants.inputChannelConfig, Constants.audioFormat, inAudioBufferSize);

其中,引數設定如下:

// 取樣頻率,44100保證相容性
public static final int sampleRateInHz = 44100;
// 音訊資料格式:PCM 16位每個樣本,保證裝置支援。
public static final int audioFormat = AudioFormat.ENCODING_PCM_16BIT;

// 音訊獲取源
public static final int audioSource = MediaRecorder.AudioSource.MIC;
// 輸入單聲道
public static final int inputChannelConfig = AudioFormat.CHANNEL_IN_MONO;

AudioTrack

public AudioTrack(int streamType, int sampleRateInHz, int channelConfig, int audioFormat,
        int bufferSizeInBytes, int mode) throws IllegalArgumentException {
    this(streamType, sampleRateInHz, channelConfig, audioFormat,
            bufferSizeInBytes, mode, AudioManager.AUDIO_SESSION_ID_GENERATE);
}

與AudioRecord類似,AudioTrack的構造器方法依然有很多需要選擇的引數。其中,streamType表示音訊流播放型別,AudioManager中列出了可選的型別如下:

/** The audio stream for phone calls */
public static final int STREAM_VOICE_CALL = AudioSystem.STREAM_VOICE_CALL;
/** The audio stream for system sounds */
public static final int STREAM_SYSTEM = AudioSystem.STREAM_SYSTEM;
/** The audio stream for the phone ring */
public static final int STREAM_RING = AudioSystem.STREAM_RING;
/** The audio stream for music playback */
public static final int STREAM_MUSIC = AudioSystem.STREAM_MUSIC;
/** The audio stream for alarms */
public static final int STREAM_ALARM = AudioSystem.STREAM_ALARM;
/** The audio stream for notifications */
public static final int STREAM_NOTIFICATION = AudioSystem.STREAM_NOTIFICATION;
/** @hide The audio stream for phone calls when connected to bluetooth */
public static final int STREAM_BLUETOOTH_SCO = AudioSystem.STREAM_BLUETOOTH_SCO;
/** @hide The audio stream for enforced system sounds in certain countries (e.g camera in Japan) */
public static final int STREAM_SYSTEM_ENFORCED = AudioSystem.STREAM_SYSTEM_ENFORCED;
/** The audio stream for DTMF Tones */
public static final int STREAM_DTMF = AudioSystem.STREAM_DTMF;
/** @hide The audio stream for text to speech (TTS) */
public static final int STREAM_TTS = AudioSystem.STREAM_TTS;

常用的有STREAM_VOICE_CALLSTREAM_MUSIC等,需要根據應用特點進行選擇。

sampleRateInHzaudioFormat需與AudioRecord中的引數保持一致,這裡不再介紹。

channelConfig與AudioRecord中的引數保持對應,比如AudioRecord選擇了AudioFormat.CHANNEL_IN_MONO(單通道音訊輸入),這裡需要選擇AudioFormat.CHANNEL_OUT_MONO(單通道音訊輸出)。

bufferSizeInBytes表述音訊播放緩衝區大小,同樣,也需要根據AudioTrack的靜態方法getMinBufferSize來獲取。

static public int getMinBufferSize(int sampleRateInHz, int channelConfig, int audioFormat) 

sampleRateInHzchannelConfigaudioFormat三個引數與上面的含義完全一樣,代表輸出音訊的取樣率、通道以及資料輸出的格式。

最後說明下modeAudioManager.AUDIO_SESSION_ID_GENERATEmode代表音訊輸出的模式:MODE_STATICMODE_STREAM,分別表示靜態模式和流模式。AudioManager.AUDIO_SESSION_ID_GENERATE表示AudioSessionId,即AudioTrack依附到哪個音訊會話。

比如,要給AudioRecord添加回聲消除AcousticEchoCancelerAcousticEchoCanceler的構建方法create的入參就是sessionId,通過AudioRecord例項的getAudioSessionId()方法獲取。

綜上,AudioTrack的初始化方法如下:

public Tracker() {
    // 獲取音訊資料緩衝段大小
    outAudioBufferSize = AudioTrack.getMinBufferSize(
            Constants.sampleRateInHz, Constants.outputChannelConfig, Constants.audioFormat);
    // 初始化音訊播放
    audioTrack = new AudioTrack(Constants.streamType,
            Constants.sampleRateInHz, Constants.outputChannelConfig, Constants.audioFormat,
            outAudioBufferSize, Constants.trackMode);
}

其中,引數設定如下:

// 音訊播放端
public static final int streamType = AudioManager.STREAM_VOICE_CALL;
// 輸出單聲道
public static final int outputChannelConfig = AudioFormat.CHANNEL_OUT_MONO;
// 音訊輸出模式
public static final int trackMode = AudioTrack.MODE_STREAM;

二、Speex編解碼

Speex是一個聲音編碼格式,目標是用於網路電話、線上廣播使用的語音編碼,基於CELP(一種語音編碼演算法)開發,Speex宣稱可以免費使用,以BSD授權條款開放原始碼。

Speex是由C語言開發的音訊處理庫,在Android中使用,需要通過JNI來呼叫。因此,對NDK開發不熟悉的朋友,可以先了解下文件:向您的專案新增 C 和 C++ 程式碼

在Android Studio中使用C/C++庫有兩種方式:cmake和ndk-build。cmake是最新支援的方法,通過配置CMakeLists.txt檔案來實現;ndk-build是傳統的方式,通過配置Android.mk檔案來實現。具體語法參考相關文件,這裡不做深入介紹。配置完上述檔案之後,需要將Gradle關聯到原生庫,通過AS的Link C++ Project with Gradle功能實現。

完成上述配置之後,正式開始在Android中使用Speex進行音訊編解碼。主要包括以下步驟:

  1. 下載Speex原始碼。推薦使用Speex 1.2.0穩定版,由於目前Speex 已不再繼續維護,官方建議使用Opus。但在某些場合,使用Speex已然足夠滿足需求。

    Speex原始碼

  2. src/main下建立jni資料夾,將上述Speex原始碼中includelibspeex資料夾拷貝到jni資料夾下。
  3. 編寫Android.mk檔案和Application.mk檔案。Android.mk
LOCAL_PATH := $(call my-dir)
include $(CLEAR_VARS)
LOCAL_LDLIBS :=-llog
LOCAL_MODULE    := libspeex
LOCAL_CFLAGS = -DFIXED_POINT -DUSE_KISS_FFT -DEXPORT="" -UHAVE_CONFIG_H
LOCAL_C_INCLUDES := $(LOCAL_PATH)/include
LOCAL_SRC_FILES := speex_jni.cpp \
        ./libspeex/bits.c \
        ./libspeex/cb_search.c \
        ./libspeex/exc_10_16_table.c \
        ./libspeex/exc_10_32_table.c \
        ./libspeex/exc_20_32_table.c \
        ./libspeex/exc_5_256_table.c \
        ./libspeex/exc_5_64_table.c \
        ./libspeex/exc_8_128_table.c \
        ./libspeex/filters.c \
        ./libspeex/gain_table_lbr.c \
        ./libspeex/gain_table.c \
        ./libspeex/hexc_10_32_table.c \
        ./libspeex/hexc_table.c \
        ./libspeex/high_lsp_tables.c \
        ./libspeex/kiss_fft.c \
        ./libspeex/kiss_fftr.c \
        ./libspeex/lpc.c \
        ./libspeex/lsp_tables_nb.c \
        ./libspeex/lsp.c \
        ./libspeex/ltp.c \
        ./libspeex/modes_wb.c \
        ./libspeex/modes.c \
        ./libspeex/nb_celp.c \
        ./libspeex/quant_lsp.c \
        ./libspeex/sb_celp.c \
        ./libspeex/smallft.c \
        ./libspeex/speex_callbacks.c \
        ./libspeex/speex_header.c \
        ./libspeex/speex.c \
        ./libspeex/stereo.c \
        ./libspeex/vbr.c \
        ./libspeex/vorbis_psy.c \
        ./libspeex/vq.c \
        ./libspeex/window.c \
include $(BUILD_SHARED_LIBRARY)

Application.mk

APP_ABI := armeabi armeabi-v7a
  1. 新建speex_config_types.h檔案。在jnispeex原始碼目錄下的include/speex資料夾下,有一個speex_config_types.h.in檔案,在include/speex目錄下建立speex_config_types.h,把speex_config_types.h.in的內容拷貝過來,然後把@[email protected]改成short,把@[email protected]改成int,對應Java資料型別。這個檔案的內容如下:
#ifndef __SPEEX_TYPES_H__
#define __SPEEX_TYPES_H__
typedef short spx_int16_t;
typedef unsigned short spx_uint16_t;
typedef int spx_int32_t;
typedef unsigned int spx_uint32_t;
#endif
  1. 在Java層定義編解碼需要的介面。
public class Speex {
    static {
        try {
            System.loadLibrary("speex");
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
    public native int open(int compression);
    public native int getFrameSize();
    public native int decode(byte encoded[], short lin[], int size);
    public native int encode(short lin[], int offset, byte encoded[], int size);
    public native void close();
}
  1. 在C層實現上述方法(以encode為例)。
extern "C"
JNIEXPORT jint JNICALL Java_com_jd_wly_intercom_audio_Speex_encode
    (JNIEnv *env, jobject obj, jshortArray lin, jint offset, jbyteArray encoded, jint size) {

    jshort buffer[enc_frame_size];
    jbyte output_buffer[enc_frame_size];
    int nsamples = (size-1)/enc_frame_size + 1;
    int i, tot_bytes = 0;

    if (!codec_open)
        return 0;

    speex_bits_reset(&ebits);

    for (i = 0; i < nsamples; i++) {
        env->GetShortArrayRegion(lin, offset + i*enc_frame_size, enc_frame_size, buffer);
        speex_encode_int(enc_state, buffer, &ebits);
    }

    tot_bytes = speex_bits_write(&ebits, (char *)output_buffer, enc_frame_size);
    env->SetByteArrayRegion(encoded, 0, tot_bytes, output_buffer);

    return (jint)tot_bytes;
}
  1. 命令列到Android.mk資料夾下,執行命令ndk-build
D:\dev\study\intercom\WlyIntercom\app\src\main\jni>ndk-build
[armeabi] Compile++ thumb: speex <= speex_jni.cpp
[armeabi] Compile thumb  : speex <= bits.c
[armeabi] Compile thumb  : speex <= cb_search.c
[armeabi] Compile thumb  : speex <= exc_10_16_table.c
[armeabi] Compile thumb  : speex <= exc_10_32_table.c
[armeabi] Compile thumb  : speex <= exc_20_32_table.c
[armeabi] Compile thumb  : speex <= exc_5_256_table.c
[armeabi] Compile thumb  : speex <= exc_5_64_table.c
[armeabi] Compile thumb  : speex <= exc_8_128_table.c
[armeabi] Compile thumb  : speex <= filters.c
[armeabi] Compile thumb  : speex <= gain_table_lbr.c
[armeabi] Compile thumb  : speex <= gain_table.c
[armeabi] Compile thumb  : speex <= hexc_10_32_table.c
[armeabi] Compile thumb  : speex <= hexc_table.c
[armeabi] Compile thumb  : speex <= high_lsp_tables.c
[armeabi] Compile thumb  : speex <= kiss_fft.c
[armeabi] Compile thumb  : speex <= kiss_fftr.c
[armeabi] Compile thumb  : speex <= lpc.c
[armeabi] Compile thumb  : speex <= lsp_tables_nb.c
[armeabi] Compile thumb  : speex <= lsp.c
[armeabi] Compile thumb  : speex <= ltp.c
[armeabi] Compile thumb  : speex <= modes_wb.c
[armeabi] Compile thumb  : speex <= modes.c
[armeabi] Compile thumb  : speex <= nb_celp.c
[armeabi] Compile thumb  : speex <= quant_lsp.c
[armeabi] Compile thumb  : speex <= sb_celp.c
[armeabi] Compile thumb  : speex <= smallft.c
[armeabi] Compile thumb  : speex <= speex_callbacks.c
[armeabi] Compile thumb  : speex <= speex_header.c
[armeabi] Compile thumb  : speex <= speex.c
[armeabi] Compile thumb  : speex <= stereo.c
[armeabi] Compile thumb  : speex <= vbr.c
[armeabi] Compile thumb  : speex <= vorbis_psy.c
[armeabi] Compile thumb  : speex <= vq.c
[armeabi] Compile thumb  : speex <= window.c
[armeabi] StaticLibrary  : libstdc++.a
[armeabi] SharedLibrary  : libspeex.so
[armeabi] Install        : libspeex.so => libs/armeabi/libspeex.so

生成libs/armeabi/libspeex.so和對應的obj檔案,如需單獨使用,將上述過程生成的*.so包拷貝至jniLibs資料夾中。

  1. 最後,在Android中通過Java去呼叫encode方法進行音訊資料的編碼。
/**
 * 將raw原始音訊檔案編碼為Speex格式
 *
 * @param audioData 原始音訊資料
 * @return 編碼後的資料
 */
public static byte[] raw2spx(short[] audioData) {
    // 原始資料中包含的整數個encFrameSize
    int nSamples = audioData.length / encFrameSize;
    byte[] encodedData = new byte[((audioData.length - 1) / encFrameSize + 1) * encodedFrameSize];
    short[] rawByte;
    // 將原資料轉換成spx壓縮的檔案
    byte[] encodingData = new byte[encFrameSize];
    int readTotal = 0;
    for (int i = 0; i < nSamples; i++) {
        rawByte = new short[encFrameSize];
        System.arraycopy(audioData, i * encFrameSize, rawByte, 0, encFrameSize);
        int encodeSize = Speex.getInstance().encode(rawByte, 0, encodingData, rawByte.length);
        System.arraycopy(encodingData, 0, encodedData, readTotal, encodeSize);
        readTotal += encodeSize;
    }
    rawByte = new short[encFrameSize];
    System.arraycopy(audioData, nSamples * encFrameSize, rawByte, 0, audioData.length - nSamples * encFrameSize);
    int encodeSize = Speex.getInstance().encode(rawByte, 0, encodingData, rawByte.length);
    System.arraycopy(encodingData, 0, encodedData, readTotal, encodeSize);
    return encodedData;
}

這裡設定了每幀處理160個short型資料,壓縮比為5,每幀輸出為28個byte型資料。Speex壓縮模式特徵如下:

Speex壓縮模式特徵

原文綜合考慮音訊質量、壓縮比和演算法複雜度,最後選擇了Mode 5。

private static final int DEFAULT_COMPRESSION = 5;

三、Android語音對講專案系統架構

Android對講機系統架構

資料包要經過Record、Encoder、Transmission、Decoder、Play這一鏈條的處理,這種資料流轉就是對講機核心抽象。鑑於這種場景,本文的實現採用了責任鏈設計模式。責任鏈模式屬於行為型模式,表徵對物件的某種行為。

建立型模式,共五種:工廠方法模式、抽象工廠模式、單例模式、建造者模式、原型模式。結構型模式,共七種:介面卡模式、裝飾器模式、代理模式、外觀模式、橋接模式、組合模式、享元模式。行為型模式,共十一種:策略模式、模板方法模式、觀察者模式、迭代子模式、責任鏈模式、命令模式、備忘錄模式、狀態模式、訪問者模式、中介者模式、直譯器模式。

責任鏈設計模式的使用場景:在責任鏈模式裡,很多物件裡由每一個物件對其下家的引用而連線起來形成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個物件決定處理此請求。發出這個請求的客戶端並不知道鏈上的哪一個物件最終處理這個請求,這使得系統可以在不影響客戶端的情況下動態地重新組織和分配責任。下面來看下具體的程式碼:

首先定義一個JobHandler,代表每個物件,其中包含抽象方法handleRequest():

/**
 * 資料處理節點
 *
 * @param <I> 輸入資料型別
 * @param <O> 輸出資料型別
 * @author yanghao1
 */
public abstract class JobHandler<I, O> {

    private JobHandler<O, ?> nextJobHandler;

    public JobHandler<O, ?> getNextJobHandler() {
        return nextJobHandler;
    }

    public void setNextJobHandler(JobHandler<O, ?> nextJobHandler) {
        this.nextJobHandler = nextJobHandler;
    }

    public abstract void handleRequest(I audioData);

    /**
     * 釋放資源
     */
    public void free() {

    }
}

JobHandler<I, O>表示輸入資料型別為I,輸出型別為OnextJobHandler表示下一個處理請求的節點,其型別為JobHandler<O, ?>,即輸入資料型別必須為上一個處理節點的輸出資料型別。

繼承類必須實現抽象方法handleRequest(),引數型別為I,實現對資料包的處理。free()方法實現資源的釋放,繼承類可根據情況重寫該方法。這裡分別定義RecorderEncoderSenderReceiverDecoderTracker,均繼承自JobHandler

RecorderEncoderSender為例說明輸入側資料的處理(這裡僅列出部分程式碼,具體程式碼參考github地址):

/**
 * 音訊錄製資料格式ENCODING_PCM_16BIT,返回資料型別為short[]
 *
 * @author yanghao1
 */
public class Recorder extends JobHandler<short[], short[]> {

    @Override
    public void handleRequest(short[] audioData) {
        if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) {
            audioRecord.startRecording();
        }
        // 例項化音訊資料緩衝
        audioData = new short[inAudioBufferSize];
        audioRecord.read(audioData, 0, inAudioBufferSize);
        getNextJobHandler().handleRequest(audioData);
    }
}

Recorder完成音訊採集之後,通過getNextJobHandler()方法獲取對下一個處理節點的引用,然後呼叫其方法handleRequest(),並且入參型別為short[]Recorder的下一個處理節點是Encoder,在EncoderhandleRequest()方法中,實現音訊資料的編碼,其輸入型別為short[],輸出為byte[]

/**
 * 音訊編碼,輸入型別為short[],輸出為byte[]
 *
 * @author yanghao1
 */
public class Encoder extends JobHandler<short[], byte[]> {

    @Override
    public void handleRequest(short[] audioData) {
        byte[] encodedData = AudioDataUtil.raw2spx(audioData);
        getNextJobHandler().handleRequest(encodedData);
    }
}

Encoder的下一個處理節點是Sender,在SenderhandleRequest()方法中,通過多播(組播),將音訊編碼資料傳送給區域網內的其它裝置。

/**
 * UDP多播發送
 *
 * @author yanghao1
 */
public class Sender extends JobHandler<byte[], byte[]> {

    @Override
    public void handleRequest(byte[] audioData) {
        DatagramPacket datagramPacket = new DatagramPacket(
                audioData, audioData.length, inetAddress, Constants.MULTI_BROADCAST_PORT);
        try {
            multicastSocket.send(datagramPacket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

最後,在AudioInput類的建構函式中執行物件之間的關係:

/**
 * 音訊錄製、編碼、傳送執行緒
 *
 * @author yanghao1
 */
public class AudioInput implements Runnable {

    private Recorder recorder;
    private Encoder encoder;
    private Sender sender;
    private Handler handler;

    // 錄製狀態
    private boolean recording = false;

    public AudioInput(Handler handler) {
        this.handler = handler;
        initJobHandler();
    }

    /**
     * 初始化錄製、編碼、傳送,並指定關聯
     */
    private void initJobHandler() {
        recorder = new Recorder();
        encoder = new Encoder();
        sender = new Sender(handler);
        recorder.setNextJobHandler(encoder);
        encoder.setNextJobHandler(sender);
    }
}

即:在介面初始化AudioInput對應的執行緒的時候,就完成這些類的例項化,並指定Recorder的下一個處理者是Encoder,Encoder的下一個處理者是Sender。這樣使得整個處理流程非常靈活,比如,如果暫時沒有開發編解碼的過程,在Encoder的handleRequest()方法中直接指定下一個處理者:

public class Encoder extends JobHandler {

    @Override
    public void handleRequest(byte[] audioData) {
        getNextJobHandler().handleRequest(audioData);
    }
}

同樣的,在初始化AudioOutput對應的執行緒時,完成ReceiverDecoderTracker的例項化,並且指定Receiver的下一個處理者是DecoderDecoder的下一個處理者是Tracker

在Activity中,分別申明輸入、輸出Runable、執行緒池物件、介面更新Handler:

// 介面更新Handler
private AudioHandler audioHandler = new AudioHandler(this);

// 音訊輸入、輸出Runable
private AudioInput audioInput;
private AudioOutput audioOutput;

// 建立緩衝執行緒池用於錄音和接收使用者上線訊息(錄音執行緒可能長時間不用,應該讓其超時回收)
private ExecutorService inputService = Executors.newCachedThreadPool();

// 建立迴圈任務執行緒用於間隔的傳送上線訊息,獲取區域網內其他的使用者
private ScheduledExecutorService discoverService = Executors.newScheduledThreadPool(1);

// 設定音訊播放執行緒為守護執行緒
private ExecutorService outputService = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread thread = Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);
        return thread;
    }
});

可能有的同學會覺得這裡的責任鏈設計模式用法並非真正的責任鏈,真正的責任鏈模式要求一個具體的處理者物件只能在兩個行為中選擇一個:一是承擔責任,而是把責任推給下家。不允許出現某一個具體處理者物件在承擔了一部分責任後又把責任向下傳的情況。 本文中責任鏈設計模式的用法確實不是嚴格的責任鏈模式,但學習的目的不就是活學活用嗎?

Android執行緒池

上述程式碼涉及Android中的執行緒池,與Android執行緒池相關的類包括:ExecutorExecutorsExecutorServiceFutureCallableThreadPoolExecutor等,為了理清它們之間的關係,首先從Executor開始:

  • Executor介面中定義了一個方法 execute(Runnable command),該方法接收一個 Runable例項,它用來執行一個任務,任務即一個實現了Runnable 介面的類。
  • ExecutorService介面繼承自Executor 介面,它提供了更豐富的實現多執行緒的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成Future 的方法。 可以呼叫ExecutorService 的shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致 ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉 ExecutorService。因此我們一般用該介面來實現和管理多執行緒。
  • Executors 提供了一系列工廠方法用於建立執行緒池,返回的執行緒池都實現了 ExecutorService介面。包括:
    • newCachedThreadPool() 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒;
    • newFixedThreadPool(int) 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
    • newScheduledThreadPool(int) 建立一個定長執行緒池,支援定時及週期性任務執行。
    • newSingleThreadExecutor() 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
  • Callable介面與Runnable介面類似,ExecutorService<T> Future<T> submit(Callable<T> task)方法接受Callable作為入參,在 Java 5 之後,任務分兩類:一類是實現了 Runnable介面的類,一類是實現了 Callable 介面的類。兩者都可以被 ExecutorService 執行,但是 Runnable任務沒有返回值,而 Callable任務有返回值。並且Callable 的call()方法只能通過ExecutorService 的 submit(Callable task)方法來執行,並且返回一個 Future,是表示任務等待完成的Future
  • ThreadPoolExecutor繼承自AbstractExecutorServiceAbstractExecutorService實現了ExecutorService介面。ThreadPoolExecutor的構造器由於引數較多,不宜直接暴露給使用者。所以,Executors 中定義 ExecutorService例項的工廠方法,其實是通過定義ThreadPoolExecutor不同入參來實現的。

下面來看下ThreadPoolExecutor的構造器方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中,corePoolSize表示執行緒池中所儲存的核心執行緒數,包括空閒執行緒;maximumPoolSize表示池中允許的最大執行緒數;keepAliveTime表示執行緒池中的空閒執行緒所能持續的最長時間;unit表示時間的單位;workQueue表示任務執行前儲存任務的佇列,僅儲存由execute 方法提交的Runnable任務;threadFactory表示執行緒建立的工廠,指定執行緒的特性,比如前面程式碼中設定音訊播放執行緒為守護執行緒;handler表示佇列容量滿之後的處理方法。

ThreadPoolExecutor對於傳入的任務Runnable有如下處理流程:

  1. 如果執行緒池中的執行緒數量少於corePoolSize,即使執行緒池中有空閒執行緒,也會建立一個新的執行緒來執行新新增的任務;
  2. 如果執行緒池中的執行緒數量大於等於corePoolSize,但緩衝佇列workQueue 未滿,則將新新增的任務放到 workQueue中,按照 FIFO 的原則依次等待執行(執行緒池中有執行緒空閒出來後依次將緩衝佇列中的任務交付給空閒的執行緒執行);
  3. 如果執行緒池中的執行緒數量大於等於 corePoolSize,且緩衝佇列 workQueue 已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的執行緒來處理被新增的任務;
  4. 如果執行緒池中的執行緒數量等於了maximumPoolSize,交由RejectedExecutionHandler handler處理。

ThreadPoolExecutor主要用於某些特定場合,即上述工廠方法無法滿足的時候,自定義執行緒池使用。本文使用了三種特性的執行緒池工廠方法:newCachedThreadPool()newScheduledThreadPool(int)newSingleThreadExecutor

首先,對於錄音執行緒,由於對講機使用者大部分時間可能是在聽,而不是說。錄音執行緒可能長時間不用,應該讓其超時回收,所以錄音執行緒宜使用CachedThreadPool; 其次,對於發現區域網內的其它使用者的功能,該功能需要不斷迴圈執行,相當於迴圈的向區域網內傳送心跳訊號,因此宜使用ScheduledThreadPool; 最後,對於音訊播放執行緒,該執行緒需要一直在後臺執行,且播放需要序列執行,因此使用SingleThreadExecutor,並設定為守護執行緒,在UI執行緒(主執行緒是最後一個使用者執行緒)結束之後結束。

// 設定音訊播放執行緒為守護執行緒
private ExecutorService outputService = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread thread = Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);
        return thread;
    }
});

以上。詳細程式碼請移步github:intercom 。