
Java Inter-Thread Communication: Pipes

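Java provides many input and output streams for working with data. Piped streams (PipedInputStream and PipedOutputStream) are a special kind of stream used to pass data directly from one thread to another.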

If pis.read is called while the pipe holds no data, the call blocks.
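The blocking behavior can be observed with a small experiment. The following is only a sketch: the class name BlockingReadDemo and the 200 ms pause are assumptions made for illustration. A reader thread calls read() on an empty pipe, and the main thread checks that the reader is still waiting before it writes a byte.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class BlockingReadDemo {
    /** Returns {value read, 1 if the reader was still blocked before the write, else 0}. */
    static int[] run() {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);
            AtomicInteger value = new AtomicInteger(-2); // -2 means "nothing read yet"

            Thread reader = new Thread(() -> {
                try {
                    value.set(pis.read()); // blocks while the pipe is empty
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            reader.start();

            Thread.sleep(200); // arbitrary pause so the reader reaches read()
            boolean stillBlocked = reader.isAlive() && value.get() == -2;

            pos.write(42);
            pos.flush(); // notifies the waiting reader instead of letting its wait time out
            reader.join();
            pos.close();
            pis.close();
            return new int[] { value.get(), stillBlocked ? 1 : 0 };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("read " + r[0] + ", blocked before write: " + (r[1] == 1));
    }
}
```

The flush() call is there because a plain write(int) does not by itself wake a reader that is already waiting; without it the reader would typically notice the byte only when its timed wait expires.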

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeStreamMain {
    public static void main(String[] args) throws IOException {
        PipedInputStream pis=new PipedInputStream();
        PipedOutputStream pos=new PipedOutputStream();

        pos.connect(pis);

        TaskReader tr=new TaskReader(pis);
        TaskWriter tw=new TaskWriter(pos);

        tr.start();
        tw.start();
    }
    static class TaskWriter extends Thread{
        private final PipedOutputStream pos;

        TaskWriter(PipedOutputStream pos){
            this.pos=pos;
        }

        @Override
        public void run() {
            System.out.println("writer starts writing");
            try {
                for (int i = 0; i < 10; i++) {
                    pos.write(i);

                }
                System.out.println("writing finished");
                pos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    static class TaskReader extends Thread{
        private final PipedInputStream pis;

        TaskReader(PipedInputStream pis){
            this.pis=pis;
        }

        @Override
        public void run() {
            System.out.println("reader starts reading");
            byte[] bytes=new byte[5];
            try {
                int len=pis.read(bytes);
                while (len!=-1){
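                    // Print only the len bytes returned by this read;
                    // anything beyond len is left over from an earlier read
                    StringBuilder sb=new StringBuilder();
                    for (int i = 0; i < len; i++) {
                        sb.append(bytes[i]).append(' ');
                    }
                    System.out.println(sb);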
                    len=pis.read(bytes);
                }
                pis.close();

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

The write method of pos calls the receive method of pis (held in the sink field):

public void write(int b)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(b);
    }
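Inside receive, pis also records the calling thread as the write side, which read later uses to detect a dead writer:
writeSide = Thread.currentThread();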
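The null check on sink is easy to trigger. As a minimal sketch (the class name UnconnectedPipeDemo and its helper methods are made up for illustration), writing to an output stream that was never connected fails with the message shown in the source above, while a connected pair passes the byte through:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical demo class, not part of the original example.
class UnconnectedPipeDemo {
    /** Writes to a PipedOutputStream that has no sink and reports what happens. */
    static String writeUnconnected() {
        PipedOutputStream pos = new PipedOutputStream(); // never connected
        try {
            pos.write(1);
            return "no exception";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    /** Writes one byte through a connected pair and reads it back. */
    static String writeConnected() {
        try (PipedInputStream pis = new PipedInputStream();
             PipedOutputStream pos = new PipedOutputStream(pis)) {
            pos.write(1);
            // One byte fits in the pipe buffer, so reading on the same thread is safe here.
            return "read back " + pis.read();
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(writeUnconnected()); // Pipe not connected
        System.out.println(writeConnected());   // read back 1
    }
}
```

Passing pis to the PipedOutputStream constructor has the same effect as calling pos.connect(pis) afterwards.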

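The close method of pos calls the receivedLast method of pis, which marks the pipe as closed by the writer and wakes any waiting reader: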

public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();
        }
    }
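From the reader's point of view, closing pos is what turns "no data yet" into end of stream. The sketch below (CloseSignalsEofDemo is a hypothetical name) lets a writer thread send three bytes and close, then reads byte by byte until read() returns -1:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original example.
class CloseSignalsEofDemo {
    /** Returns every value that read() produced, including the final -1. */
    static List<Integer> run() {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);

            Thread writer = new Thread(() -> {
                try {
                    for (int i = 1; i <= 3; i++) {
                        pos.write(i);
                    }
                    pos.close(); // tells the reader that no more data will arrive
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            writer.join(); // the three bytes are already buffered in the pipe

            List<Integer> seen = new ArrayList<>();
            int b;
            do {
                b = pis.read();
                seen.add(b);
            } while (b != -1);
            pis.close();
            return seen;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3, -1]
    }
}
```

The buffered bytes are still delivered after close; only once the buffer is empty does read() return -1.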

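The read(byte[], int, int) method of pis:
It first tries to read a single byte. If none is available it blocks, rechecking about once per second. Once that byte has been read, it checks whether the buffer holds more data and copies it into the caller's array until either nothing is left to read or len bytes have been copied. It then returns the number of bytes read (not the array itself).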

public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {

            int available;

            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }

            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }
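A consequence of this loop is that one call returns whatever is buffered, up to len, and does not wait to fill the array. As a rough illustration (PartialReadDemo is a hypothetical name), seven bytes are written and then read through a five-byte array:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the original example.
class PartialReadDemo {
    /** Returns the value of each read(byte[]) call, including the final -1. */
    static List<Integer> run() {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);

            Thread writer = new Thread(() -> {
                try {
                    for (int i = 0; i < 7; i++) {
                        pos.write(i);
                    }
                    pos.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            writer.join(); // all 7 bytes are buffered before the first read

            List<Integer> counts = new ArrayList<>();
            byte[] buf = new byte[5];
            int len;
            do {
                len = pis.read(buf); // delegates to read(buf, 0, buf.length)
                counts.add(len);
            } while (len != -1);
            pis.close();
            return counts;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [5, 2, -1]
    }
}
```

This is why reader code should use only the first len elements of the array after each call.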
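The no-argument read() takes one byte from the buffer per call. If in == out afterwards, everything has been read and in is reset to -1.
If in < 0, there is nothing to read, so the method waits for up to 1 second and checks again, repeating until data arrives or the pipe is closed or broken.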

public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }

        return ret;
    }
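The writeSide checks at the top of read() explain why a writer should always close its stream. In the sketch below (DeadWriterDemo is a hypothetical name, and the result assumes a JDK whose read() matches the source quoted above), the writer thread sends one byte and exits without calling close:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of the original example.
class DeadWriterDemo {
    /** Reads twice from a pipe whose writer thread ended without closing it. */
    static String run() {
        try (PipedInputStream pis = new PipedInputStream()) {
            PipedOutputStream pos = new PipedOutputStream(pis);

            Thread writer = new Thread(() -> {
                try {
                    pos.write(7);
                    // The thread ends here without pos.close().
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            writer.join();

            int first = pis.read(); // the buffered byte is still delivered
            try {
                pis.read(); // buffer empty, writer dead, pipe not closed by writer
                return first + ", then no exception";
            } catch (IOException e) {
                return first + ", then IOException: " + e.getMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 7, then IOException: Write end dead
    }
}
```

If the writer had closed pos before exiting, the second read would have returned -1 instead of throwing.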