1. 程式人生 > >Rxjava 子執行緒 主執行緒 切換 簡單實現

Rxjava 子執行緒 主執行緒 切換 簡單實現

package com.zgt.demo01.rxjava;

import com.zgt.demo01.os2.Handler;
import com.zgt.demo01.os2.Message;

public abstract  class MObservable {
    public static MObservable create(MObservableOnSubscribe source) {
        return new MObservableCreate(source);
    }
    public final MObservable subscribe(MObserver observer) {
        observer.onSubscribe();
        subscribeActual(observer);
        return
this; } protected abstract void subscribeActual(MObserver observer); //子執行緒 public MObservable subscribeOn(){ MObservableSubscribeOn on = new MObservableSubscribeOn(new MObservableOnSubscribe() { @Override public void subscribe(MObserver observer) throws
Exception { new Thread(new Runnable() { @Override public void run() { MObservable.this.subscribe(new MObserver() { @Override public void onSubscribe() { } @Override
public void onNext(String str) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("subscribeOn :"+str + ">>" + Thread.currentThread().getId()); observer.onNext("MObservableSubscribeOn >> "+str); } @Override public void onError() { } @Override public void onComplete() { } }); } }).start(); } }); return on; } private Handler handler = new Handler(){ @Override public void handleMessage(Message message) { MObserver observer = (MObserver) message.obj; observer.onNext((String) message.data); } }; //主執行緒 public MObservable observeOn(){ MObservableObserveOn on = new MObservableObserveOn(new MObservableOnSubscribe() { @Override public void subscribe(MObserver observer) throws Exception { MObservable.this.subscribe(new MObserver() { @Override public void onSubscribe() { } @Override public void onNext(String str) { System.out.println("zgt>>"+str +">>" + Thread.currentThread().getId()); //observer.onNext("observeOn:MObservableObserveOn >>" + str ); Message message = new Message(); message.obj = observer; message.data = "observeOn:MObservableObserveOn >>" + str; handler.sendMessage(message); } @Override public void onError() { } @Override public void onComplete() { } }); } }); return on; } }
package com.zgt.demo01.rxjava;

public class MObservableCreate extends MObservable {

    private MObservableOnSubscribe source;
    public MObservableCreate(MObservableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(MObserver observer) {
        try {
            this.source.subscribe(observer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.zgt.demo01.rxjava;

/**
 * @author ex-zhangguangtao001
 * @date 2018/9/6 下午3:33
 */
public class MObservableObserveOn extends MObservable {

    private MObservableOnSubscribe source;
    public MObservableObserveOn(MObservableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(MObserver observer) {
        try {
            this.source.subscribe(observer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.zgt.demo01.rxjava;

public interface MObservableOnSubscribe {
    void subscribe(MObserver observer) throws Exception;
}
package com.zgt.demo01.rxjava;

/**
 * @author ex-zhangguangtao001
 * @date 2018/9/6 下午3:29
 */
public class MObservableSubscribeOn extends MObservable{

    private MObservableOnSubscribe source;
    public MObservableSubscribeOn(MObservableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(MObserver observer) {
        try {
            this.source.subscribe(observer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.zgt.demo01.rxjava;

public interface MObserver {
    void onSubscribe();
    void onNext(String str);
    void onError();
    void onComplete();
}
package com.zgt.demo01.rxjava;

import com.zgt.demo01.os2.Looper;

/**
 * @author ex-zhangguangtao001
 * @date 2018/9/6 下午3:17
 */
public class TestRxjava {


    public static void main(String[] args) {
        //啟動 loop 主迴圈
        Looper.prepare();

        MObservable observable = MObservable.create(new MObservableOnSubscribe() {
            @Override
            public void subscribe(MObserver observer) throws Exception {

                System.out.println(observer.getClass().getName());
                observer.onNext("MObservable.create 子執行緒 網路中獲取資料");
            }
        });

        observable
            .subscribeOn()
            .observeOn()
            .subscribe(new MObserver() {
            @Override
            public void onSubscribe() {

            }
            @Override
            public void onNext(String str) {
                System.out.println(str + ">>" + Thread.currentThread().getId());
            }
            @Override
            public void onError() {

            }
            @Override
            public void onComplete() {

            }
        });


       Looper.loop();
    }





}

Looper Handler 簡單實現

package com.zgt.demo01.os2;

import java.util.ArrayList;

/**
 * @author ex-zhangguangtao001
 * @date 2018/7/5 上午9:06
 */
public class ArrayQueue<E>  {

    private ArrayList<E> array;

    public ArrayQueue(){
        array = new ArrayList<E>();
    }

    public boolean isEmpty(){
        return array.size()<1?true:false;
    }

    public int length(){
        return array.size();
    }

    public void enqueue(E e){
        array.add(e);
    }

    public E dequeue(){
        if (isEmpty()){
            return null;
        }else {
            return array.remove(0);
        }
    }

}
package com.zgt.demo01.os2;

public class Handler {
    private Looper mLooper;
    private MessageQueue mQueue;

    public Handler() {
        mLooper = Looper.myLooper();
        mQueue = mLooper.mQueue;
    }

    public void sendMessage(Message message) {
        message.target = this;
        mQueue.enqueueMessage(message);
    }

    /**
     * 子類處理訊息
     *
     * @param message
     */
    public void handleMessage(Message message) {

    }

    /**
     * 分發訊息
     *
     * @param message
     */
    public void dispatchMessage(Message message) {
        handleMessage(message);
    }
}
package com.zgt.demo01.os2;

/**
 * @author ex-zhangguangtao001
 * @date 2018/7/2 下午2:18
 */
public class Looper {

    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal();
    public MessageQueue mQueue;

    public Looper() {
        mQueue = new MessageQueue();
    }

    /**
     * 例項化一個屬於當前執行緒的looper物件
     */
    public static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
    }

    public static Looper myLooper() {
        return sThreadLocal.get();
    }

    /**
     * 輪詢訊息佇列
     */
    public static void loop() {
        Looper me = myLooper();
        MessageQueue queue = me.mQueue;
        //輪詢
        Message msg;
        for (; ; ) {
            //會阻塞
            msg = queue.next();
            //獲取到傳送訊息的 msg.target (handler)本身,然後分發訊息
            if (msg == null || msg.target == null) {
                continue;
            }

            msg.target.dispatchMessage(msg);

        }
    }

}
package com.zgt.demo01.os2;

public class Message {

    public int what;

    public int arg1;


    public int arg2;

    public Object obj;

    public Object data;

    public Handler target;

//    @Override
//    public String toString() {
//    //這類只模擬String型別資料,為了方便日誌輸出
//        //return obj.toString();
//
//    }
}
package com.zgt.demo01.os2;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue {
    private static final String TAG = MessageQueue.class.getName();
    private ArrayQueue<Message> queue;
    /**
     * 鎖
     */

    Lock mLock;
    /**
     * 條件變數  //可取
     */
    Condition mNotEmpty;
    /**
     * 可新增
     */
    //Condition mNotFull;

    public MessageQueue() {
        queue = new ArrayQueue<Message>();
        mLock = new ReentrantLock();
        mNotEmpty = mLock.newCondition();
        //mNotFull = mLock.newCondition();
    }

    /**
     * 訊息佇列取訊息 出隊
     *
     * @return
     */
    Message next() {
        Message msg = null;
        try {
            mLock.lock();
            //檢查佇列是否空了
            while (queue.isEmpty()) {
                //阻塞
                mNotEmpty.await();
                System.out.println("佇列空了,讀鎖阻塞");
            }
            //可能空
            msg = queue.dequeue();

            //通知生產者生產
            //mNotFull.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            mLock.unlock();
        }

        return msg;
    }

    /**
     * 新增訊息進佇列
     *
     * @param message
     */

    public void enqueueMessage(Message message) {

        try {
            mLock.lock();
            //檢查佇列是否滿了
//            while (mCount >= mItems.length) {
//                //阻塞
//                mNotFull.await();
//                System.out.println("佇列滿了,寫鎖阻塞");
//            }
            queue.enqueue(message);
            //通知消費者消費
            mNotEmpty.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mLock.unlock();
        }


    }
}

相關推薦

Rxjava 執行 執行 切換 簡單實現

package com.zgt.demo01.rxjava; import com.zgt.demo01.os2.Handler; import com.zgt.demo01.os2.Message; public abstract class MObse

C# Delegate 如何從執行修改UI執行 執行介面

一、為什麼Control類提供了Invoke和BeginInvoke機制? 關於這個問題的最主要的原因已經是dotnet程式設計師眾所周知的,我在此費點筆墨再次記錄到自己的日誌,以便日後提醒一下自己。 1、windows程式訊息機制 Windows GUI程式是

效能優化-多執行-執行等待執行完成場景

專案 出行專案 需求 今日訂單查詢優化 場景描述 根據時間查詢出今日訂單,根據訂單去mongo查詢出規矩點,根據軌跡點去呼叫高德的地理/逆地理編碼介面(https://lbs.amap.com/api/webservice/guide/api/georegeo

Linux多執行──執行執行分別迴圈一定次數

條件變數 條件變數是執行緒可用的另一種同步機制。條件變數給多個執行緒提供了一個回合的場所。條件變數與互斥量一起使用時,允許執行緒以無競爭的方式等待鐵定的條件發生。 名稱: pthread_cond_wait/pthread_cond_timedwait 目標:

關於“執行執行的ContentProvider為什麼不會影響執行”的記錄

我們四大元件都是執行在UI執行緒上的,之前據我自己所看到的是主執行緒上有耗時的操作可能會造成ANR,今天做了一個實驗,建立一個工程,主Activity有一個可以觸發顯示一個Toast的按鈕,另外還有一個SQLiteOpenHelper的子類,另外一個繼承ContentProvider,提供往資料庫插入

Handler.post(Runable),Runable是執行執行中的。

 在Android中可以通過handler方法完成資料的執行緒間的傳遞,但一定要將handler得到的資料通過loop傳遞到主執行緒再更新UI嗎?其實也可以直接使用handler設計的post方法進行

Java多執行之——生產者、消費者簡單實現

   生產者與消費者模式是本科課程《作業系統》中較為重要的內容之一。當時只是囫圇吞棗的理解個大概。其實生產者消費者問題是研究多執行緒程式時繞不開的經典問題之一,實質上,很多後臺服務程式併發控制的基本原理都可以歸納為生產者/消費者模式。 1、問題描述:         生產

使用Fragment相容手機平板-之設定介面的模仿簡單實現

轉自:  記得我之前參與開發過一個華為的專案,要求程式可以支援好幾種終端裝置,其中就包括Android手機和Android Pad。然後為了節省人力,公司無節操地讓Android手機和Android Pad都由我們團隊開發。當時專案組定的方案是,製作兩個版本的App,

Android執行執行中傳送資訊

主要用到了Handler類,Looper類和Message類 先介紹下這幾個類 Looper類,是用來為一個執行緒開啟一個訊息佇列,預設情況下Android下新開啟的執行緒沒有開啟訊息佇列的,除了主執行緒外,主執行緒系統會預設為其開啟一個訊息佇列;looper是通過MessageQueu

【小家java】Java中執行(父執行)與執行的通訊和聯絡

相關閱讀 【小家java】java5新特性(簡述十大新特性) 重要一躍 【小家java】java6新特性(簡述十大新特性) 雞肋升級 【小家java】java7新特性(簡述八大新特性) 不溫不火 【小家java】java8新特性(簡述十大新特性) 飽受讚譽 【小家java】java9

toast彈框、imageview、進度條、執行訪問執行執行中的通訊handler)

1、imageview ?xml version="1.0" encoding="utf-8"?> <ImageView android:layout_width=“200dp” android:layout_marginLeft=“100dp” an

Java多執行--讓執行等待執行執行完畢

參考連結:https://www.cnblogs.com/eoss/p/5902939.html 使用Java多執行緒程式設計時經常遇到主執行緒需要等待子執行緒執行完成以後才能繼續執行,那麼接下來介紹一種簡單的方式使主執行緒等待。 java.util.concurrent.CountDown

正確實現執行任務全部完成後執行關閉的四種方法

方法一 Thread.sleep 方法二 ExecutorService 方法三 thread.join 方法四 Thread.yield and Thread.

Qt中通過訊號和槽在執行執行中進行資料傳遞

QT中兩個執行緒之間進行自定義型別資料傳遞 兩個執行緒中進行資料傳遞時,傳遞的資料放到佇列中(queue),所以在這個過程中,需要在傳遞前將資料拷貝、儲存到佇列中;為了儲存這些引數,Qt需要construct、destruct、copy這些物件,為了讓Qt知道

進度條與執行訪問執行

1.進度條(ProgressBar) 進度條的屬性: style=”?android:attr/progressBarStyleHorizontal” 預設為圓形 android:progress=”33” 進度條進行到的當前位置(去activity

Unity執行執行互動(委託方式)

using System; using System.Collections; using System.Collections.Generic; using System.Threading; using UnityEngine; /// <summary> /// 子執行緒與主執行

Java實現執行等待執行join,CountDownLatch

本文介紹兩種主執行緒等待子執行緒的實現方式,以5個子執行緒來說明: 1、使用Thread的join()方法,join()方法會阻塞主執行緒繼續向下執行。 2、使用Java.util.concurrent中的CountDownLatch,是一個倒數計數器。初始化時先設定

java執行等待所有執行執行完畢在執行(常見面試題)

java主執行緒等待所有子執行緒執行完畢在執行,這個需求其實我們在工作中經常會用到,比如使用者下單一個產品,後臺會做一系列的處理,為了提高效率,每個處理都可以用一個執行緒來執行,所有處理完成了之後才

ThreadPool執行池使用及解決執行執行執行順序問題

 執行緒池建立五個執行緒,每個執行緒往list中新增100個元素。synchronized只鎖執行緒共享變數list物件,程式碼段內僅新增元素及列印資訊。設定10ms睡眠時間給其餘執行緒機會。 ExecutorService fixedThreadPool = Execut

java在執行執行傳遞資料(回撥函式)

預習知識點: 什麼是回撥函式? 下面是知乎大神的回答,簡直不能再精闢 程式碼: package kun.thread; public class THread { static C c=new C(); //flag用來標誌子執行緒執行結束 stati