跳到主要内容

07、Java多线程:线程间通信

6.1 等待/通知机制

概念:A线程在运行时需要某个地址中的值,但是该地址还没有值,所以A等待。当B线程往该地址处写入了值后,B线程通知A线程,于是A线程继续执行。上面这样的一个过程就是等待/通知机制。

实现:

Object类中的wait()方法,可以使执行当前代码的线程等待,暂停执行。直到接到通知或被中断为止。注意:wait()方法只能在同步代码块中由锁对象调用,且调用wait()方法后,当前线程会释放锁。

Object类的notify()方法可以唤醒处于等待的线程,该方法也必须在同步代码块中由锁对象调用。如果有多个等待的线程,notify()只能唤醒其中一个,具体唤醒哪一个是不知道的。被notify()的线程需要重新去竞争锁才能被执行。

没有使用锁对象就调用wait()/notify()方法会产生异常:IllegalMonitorStateException。

wait()代码实例:

package wait;

public class Test01 {

    public static void main(String[] args) throws InterruptedException {
        String test = "abc";
        String another = "def";
        System.out.println("同步代码块前的代码");
        synchronized (test) {
            try {
                System.out.println("wait前的代码");
//                another.wait();  只有被锁住的对象才能调用wait()方法
                test.wait();
                System.out.println("wait后的代码");
            } catch (IllegalMonitorStateException e) {
                e.printStackTrace();
            }
        }
        System.out.println("同步代码块后的代码");
    }
}

wait()/notify()代码示例:

package wait;

/**
 * 需要通过notify唤醒线程
 */
public class Test02 {

    public static void main(String[] args) {
        String str = "wa";

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (str) {
                    System.out.println("线程1开始等待");
                    try {
                        str.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程1被唤醒并执行结束了");
                }
            }
        }, "Thread1");

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (str) {
                    System.out.println("线程2唤醒线程1");
                    str.notify();
                }
            }
        }, "Thread2");

        thread1.start();
        thread2.start();
    }
}

执行了notify()的线程并不会立即释放锁,而是执行完同步代码块的所有代码后才会释放锁

interrupt()方法会中断wait():

当线程调用wait()处于等待状态时,调用线程对象的interrupt()方法会中断线程的等待状态,产生InterruptedException异常。

package wait;

import java.util.concurrent.TimeUnit;

/**
 * interrupt()会中断线程的wait状态
 */
public class Test04 {
    public static void main(String[] args) throws InterruptedException {
        SubThread subThread = new SubThread();
        subThread.start();
        TimeUnit.SECONDS.sleep(1);
        subThread.interrupt();
    }

    private static final Object lock = new Object();

    static class SubThread extends Thread {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println("subThread wait");
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    System.out.println("wait等待被中断了");
                }
                System.out.println("subThread end wait");
            }
        }
    }
}

notify()和notifyAll()的区别:

notify()只能唤醒一个线程,如果有多个线程处于等待状态,那么只能会有一个会被唤醒。notifyAll()可以唤醒所有等待的线程。

wait(long)的使用:

如果在指定时间内没有被唤醒,那么线程会自动唤醒。

package wait;

public class Test06 {
    public static void main(String[] args) {
        SubThread subThread = new SubThread();
        subThread.start();
    }

    static final Object lock = new Object();

    static class SubThread extends Thread{
        @Override
        public void run() {
            synchronized (lock) {
                try {
                    System.out.println("开始等待");
                    lock.wait(5000);
                    System.out.println("等待结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

6.2 生产者-消费者模式

在Java中,负责生产数据的是生产者,负责使用数据的是消费者。没有数据时,消费者等待;数据满时,生产者等待。

package producerdata;

public class ValueOP {

    private String value = "";

    //定义方法修改value字段的值
    public void setValue() throws InterruptedException {
        //如果value不是空串
        synchronized (this) {
            while(!value.equalsIgnoreCase("")) {
                this.wait();
            }
            //是空串
            value = System.currentTimeMillis() + "";
            System.out.println("setValue设置的值是:" + value);
            this.notify();
        }
    }

    //定义方法读取字段值
    public String getValue() throws InterruptedException {
        synchronized (this) {
            while(value.equalsIgnoreCase("")) {
                this.wait();
            }
            //不是空串,读取值
            System.out.println("value的值是:" + value);
            value = "";
            this.notify();
        }
        return value;
    }
}

假设只有一个生产者和消费者:那么生产者和消费者将按顺序执行。

如果有多个生产者和消费者,那么可能出现假死现象:

1、 一个消费者唤醒了一个生产者,但是在这个生产者拿到锁之前,另一个消费者抢先拿到了锁;
2、 三个生产者全部等待,某个消费者唤醒的不是生产者,而是另一个消费者;

解决上述假死现象的方法是:将notify()改成notifyAll(),保证消费者唤醒了生产者,生产者唤醒了消费者。

操作栈:

package producerstack;

import java.util.List;

import java.util.ArrayList;

public class MyStack {
    private List<Integer> list = new ArrayList<>();
    private static final int MAX_SIZE = 2;

    //定义方法模拟入栈
    public synchronized void push(int value) throws InterruptedException {
        //当栈中的数据已满,等待
        while(list.size() >= MAX_SIZE) {
            System.out.println(Thread.currentThread().getName() + " begin wait...");
            this.wait();
        }
        list.add(value);
        this.notifyAll();
        System.out.println(Thread.currentThread().getName() + "添加了数据:" + value);
    }

    //定义方法模拟出栈
    public synchronized void pop() throws InterruptedException {
        //当栈中的数据为空,等待
        while(list.size() == 0) {
            System.out.println(Thread.currentThread().getName() + " begin wait...");
            this.wait();
        }
        this.notifyAll();
        System.out.println(Thread.currentThread().getName() + "拿到了数据:" + list.remove(0));
    }
}

6.3 通过管道实现线程间通信

Java.io包的PipeStream管道流用于在线程之间传递数据,一个线程通过管道输出数据,另一个线程从管道中输入数据。相关类包括PipedInputStream、PipedOutputStream、PipedReader和PipedWriter。

package pipeStream;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * 使用PipedInputStream和PipedOutputStream在线程间传递字节流
 */

public class Test {
    public static void main(String[] args) throws IOException {
        //定义管道字节流
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream();
        //建立管道之间的关系
        in.connect(out);

        //创建两个线程,分别往管道里写数据,和读数据
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    writeData(out);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    readData(in);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread2").start();
    }

    //向管道流中写入数据
    public static void writeData(PipedOutputStream out) throws IOException {
        //分别把0~100的数据写入管道
        try {
            for(int i = 0; i <= 10000; i++) {
                out.write(("" + i).getBytes(StandardCharsets.UTF_8));  //把字节数组写入到输出管道流中
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            out.close();
        }
    }

    //从管道流中读取数据
    public static void readData(PipedInputStream in) throws IOException {
        int count = 0;
        byte[] bytes = new byte[1024];
        int len = 0;
        try {
            while((len = in.read(bytes)) != -1) {
                System.out.println(new String(bytes, 0, len));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            in.close();
        }
    }

}

6.4 ThreadLocal

除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决为每个线程绑定自己的值的问题。

在以下这个代码示例中我们发现,如果多个线程共用一个SimpleDateFormat对象的话,实际运行下来会出现问题。但是通过ThreadLocal,我们为每个线程分别创建了一个SimpleDateFormat,就不会出现什么问题了。

package threadlocal;

import sun.java2d.pipe.SpanShapeRenderer;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 在多线程环境中,将字符串转换为日期对象。发现多个线程使用同一个SimpleDateFormat对象产生了线程安全问题,有的线程报了错误
 */
public class Test2 {

    //定义SimpleDataFoemat对象,可以将字符串转换为日期
    //发现多个线程使用同一个SimpleDateFormat对象,会产生线程安全问题
//    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");

    //使用ThreadLocal,为每个线程创建一个SimpleDateFormat
    static ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<>();
    //定义Runnable接口的实现类
    static class ParseDate implements Runnable {

        private int i = 0;

        public ParseDate(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            String text = "2068年11月22日 08:22:" + i % 60;   //构建日期字符串
            try {
                if(threadLocal.get() == null) {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss");
                    threadLocal.set(sdf);
                }
                SimpleDateFormat sdf = threadLocal.get();
                Date date = sdf.parse(text);
                System.out.println(i + "--" + date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //创建100个线程
        for(int i = 0; i < 100; i++) {
            new Thread(new ParseDate(i)).start();
        }
    }
}

ThreadLocal指定初始值:

定义ThreadLocal的子类,在子类中重写initialValue()方法指定初始值

package threadlocal;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 *
 * ThreadLocal初始值
 */
public class Test3 {

    static class SubThreadLocal extends ThreadLocal<Date> {
        @Override
        protected Date initialValue() {
            return new Date();
        }
    }
   //定义ThreadLocal对象
    static ThreadLocal<Date> threadLocal = new SubThreadLocal();

    //定义线程类
    static class SubThread extends Thread {
        @Override
        public void run() {
            for(int i = 0; i < 10; i++) {
                //第一次调用threadLocal的get方法,会返回null
                System.out.println("----" + Thread.currentThread().getName() + " value = " + threadLocal.get());
                if(threadLocal.get() == null) {
                    System.out.println("-------");
                    threadLocal.set(new Date());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SubThread t1 = new SubThread();
        t1.start();
        TimeUnit.SECONDS.sleep(1);
        SubThread t2 = new SubThread();
        t2.start();
    }
}