JUC基础笔记

Posted by Fioncat on March 8, 2020

juc,即java.util.concurrent包的缩写,掌握了juc,就是拿到了Java并发编程的钥匙。

在《Java并发编程实战》等书中,已经详细介绍juc用法,如果你懒得看书,或者是忘了juc的用法,想快速回忆一下,可以看我这篇教程。

本教程很长,有很多的代码示例供食用~

基础

volatile关键字

volatile关键字不属于juc的内容,但是为了铺垫后面的内容,这里先介绍下。

当多个线程之间共享一个数据时,该数据对彼此之间是不可见的。即使是同一个数据,每个线程还是会将其保存在自己独立的内存下面。

下面的代码显示了这一特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Worker implements Runnable {

    public boolean flag = false;

    @Override
    public void run() {

        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        this.flag = true;
    }
}

public class VolatileDemo {

    public static void main(String[] args) {
        Worker worker = new Worker();

        new Thread(worker).start();

        while (!worker.flag) ;

        System.out.println("程序结束");
    }
}

按理说,worker线程将flag改为true,主线程在flag变为true之后会及时跳出循环,程序退出。

但是,实际运行下来程序并没有退出,这是因为worker线程的flag和主线程的flag是不共享的,workerflag的修改并不会影响到主线程。

要想改变这一点,需要将flag声明为volatile的。这个关键词的作用是,当变量被某个线程改变时,会及时刷新到主存中,读取时也会从主存中读取。可以保证变量是线程之间可见的。

要想让上面的程序及时退出,将上面的flag声明改为:

1
public volatile boolean flag = false;

这样worker对flag的改变对于主线程就是可见的了,程序可以及时退出了。

原子性

如果一个变量需要被多个线程同时访问,对其进行操作就要格外当心。除了可见性问题,可以使用volatile修正,还有原子性问题。

如果一个变量的操作需要多步完成,操作可以细分,则该操作就不具备原子性,例如i++操作就不具备原子性。在并发操作时,就可能因为线程执行非原子操作导致数据读写不一致的情况。

我们可以通过给操作加上synchronized关键字,让操作只能允许一个线程进行,来实现操作的原子性。

另一种实现原子性的方法是使用CAS操作(Compare And Swap)。CAS操作由CPU直接提供,CAS需要下面三个操作数:

  • valueOffset:变量在内存中的位置
  • expect:变量的预估值
  • update:变量的更新值

CAS的操作过程:

  • 从valueOffset取出value,若等于expect,则将valueOffset的值设为update
  • 否则不进行任何操作

那么想要将i++变为原子的,只需要将valueOffset设为iexpect设为读取到的i的值,update设为i+1。这样,只有当数据一致时,才会执行i+1操作。

java.util.concurrent.atomi下,提供了很多原子变量,这些变量都具备:

  • 使用volatile确保变量可见性
  • 使用CAS操作确保操作是原子的

例如,下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.offer.juc;

import java.util.concurrent.atomic.AtomicInteger;

class AtomWorker implements Runnable {

    private AtomicInteger i = new AtomicInteger();

    @Override
    public void run() {
        while (true) {
            System.out.println(i.addAndGet(1));
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Atomicity {

    public static void main(String[] args) {
        AtomWorker worker = new AtomWorker();
        for (int i = 0; i < 10; ++i) {
            new Thread(worker).start();
        }
    }
}

就可以保证各个线程不会读到重复的i

ThreadPool

线程池的概念不再介绍,这里只介绍juc提供的线程池操作。

要想说线程池,就不得不说一下juc的Executor执行框架,在这个框架下,所有的并发执行单位都以“任务”的形式存在,将任务提交给ExecutorService,即可实现任务的并发调度执行。

ExecutorService可以有很多种,它负责接收任务,执行任务,使用Executors可以创建各种ExecutorService,有下面几种常用的:

  • newSingleThreadExecutor:单一线程,任务会顺序执行
  • newCachedThreadPool:大小不受限制的线程池
  • newFixedThreadPool:大小固定的线程池,当线程不够时,任务需要等待
  • newScheduledThreadPool:大小固定线程池,支持定时及周期性任务执行

ExecutorService提供了下面的将Runnable任务提交执行的方法:

1
2
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

关于Future的使用,见Callable和Future,这里不关心。

下面的代码演示了将线程提交给线程池执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.offer.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Task implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getId() + "执行");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getId() + "执行完毕");
    }
}

public class ThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; ++i) {
            service.submit(new Task());
        }
    }
}

Callable和Future

使用传统的Runnable,可以启动一个线程并发执行,但是run方法是没有返回值的,如果我们想要线程能够返回一个值,就可以使用Callable+Future

我们想要一个线程能返回值,这时候让其实现java.util.concurrent.Callable接口,在泛型中指定返回类型,例如,我们让一个worker返回字符串:

1
2
3
4
5
6
7
class CallableWorker implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(2000);
        return "运行完毕";
    }
}

这个call方法和传统的run方法相比,有两个不同:

  • 方法有返回值
  • 方法可以抛出异常

那么,如何执行呢?一般使用ExecutorService来执行,该接口中有如下这个方法:

1
<T> Future<T> submit(Callable<T> task);

Future用于查询执行的Callable(或Runnable)的执行结果、是否完成等信息。该接口的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

这些函数的解释如下:

  • cancel:尝试取消任务的执行。
    • mayInterruptIfRunning:是否允许取消已经启动但是没有执行的任务。
    • 返回:有如下几种情况:
      • 任务已经完成,返回false
      • 任务还没有启动,取消任务,返回取消结果。
      • 任务已经启动,但是mayInterruptIfRunningfalse,返回false
      • 任务已经启动,且mayInterruptIfRunningtrue,取消任务,返回取消结果。
  • isCancelled:返回任务是否在其正常结束之前被取消。
  • isDone:任务是否结束。不论是任务正常结束、抛出异常、被cancel,该函数都会返回true。
  • get():阻塞直到任务结束,随后获取其返回值。
  • get(timeout, unit):在指定的timeout时间内等待任务结束并获取结果,如果超过这个时间没有结束,抛出TimeoutException异常。

另外说明一下get方法可能抛出的其它异常:

  • CancellationException:在等待途中任务被cancel
  • ExecutionException:任务抛出了异常
  • InterruptedException:阻塞过程中被打断

通过Future,我们就可以获取任务的返回值了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CallableDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableWorker worker = new CallableWorker();
        // 单一线程执行器
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(worker);

        // 获取线程的执行结果
        System.out.println("执行结果:" + future.get());
    }

}

你也可以用Future做很多其它事情,就看你自己发挥了。

Lock

Lock接口的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

下面简单介绍一下这些方法:

  • lock:获取锁。如果锁已经被占用,会阻塞。
  • lockInterruptibly:可中断地等待锁,如果等待被中断,抛出InterruptedException
  • tryLock:尝试获取锁,如果获取失败,返回false。
  • tryLock(time, unit):在timeout时间内尝试获取锁,如果在这段时间内获取到锁,返回true,如果没有,返回false。
  • unlock:释放锁
  • newCondition:返回一个绑定到该锁的Condition示例,关于Condition,见Condition一节

我们一般会使用到下面这个Lock的实现类:

  • ReentrantLock:可重入锁,也叫递归锁。指的是,当一个线程获取锁之后,再次获取时,不需要重复等待,可以直接获取锁。
    • 构造时将fair设为true,表示公平锁,公平锁指的是严格按照先来先得的顺序排队等待去获取锁。
    • 构造时将fair设为false,表示非公平锁,非公平锁每次获取锁时,是先直接尝试获取锁,获取不到,再按照先来先得的顺序排队等待。
    • 默认是非公平锁。

锁的操作不难,下面我们重点介绍下读写锁。

ReadWriteLock

读写锁指的是没有线程进行写操作时,多个线程可同时进行读操作,当有线程进行写操作时,其它读写操作只能等待。

即,对于读写锁来说,“读-读能共存,读-写不能共存,写-写不能共存”。

ReadWriteLock接口定义如下:

1
2
3
4
5
6
public interface ReadWriteLock {

    Lock readLock();

    Lock writeLock();
}

其中,readLock用于获取读锁,writeLock用于获取写锁。

我们一般使用实现类ReentrantReadWriteLock,即可重入的读写锁。

下面我们来看一个具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cn.offer.juc;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PublicValue {
    public volatile static int value = 0;
    public static ReadWriteLock lock = new ReentrantReadWriteLock();
}

class LockWorker implements Runnable {

    // 该worker是读者还是写者
    private boolean isReader;

    public LockWorker(boolean isReader) {
        this.isReader = isReader;
    }

    @Override
    public void run() {
        if (this.isReader) {
            // 读者
            while (true) {
                PublicValue.lock.readLock().lock();
                System.out.println(Thread.currentThread().getId() +
                        "读取到:" + PublicValue.value);
                PublicValue.lock.readLock().unlock();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } else {
            // 写者
            while (true) {
                PublicValue.lock.writeLock().lock();
                System.out.println("写者修改value:" + PublicValue.value++);
                PublicValue.lock.writeLock().unlock();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

public class LockDemo {
    public static void main(String[] args) {

        LockWorker writer = new LockWorker(false);
        LockWorker reader = new LockWorker(true);

        // 5个读者和写者
        for (int i = 0; i < 5; ++i) {
            new Thread(reader).start();
            new Thread(writer).start();
        }
    }

}

读的频率比写的要高,使用读写锁,可以加快程序执行的效率。

Condition

在JDK1.5之前,线程的等待唤醒是通过waitnotifynotifyAll实现的。juc提供了Condition,可以更加方便地实现线程的等待唤醒。

Condition是由Lock创建的,它的awaitsignalsignalAll分别对应上面的三个方法。

使用Condition的好处是,使用Lock可以创建不同的Condition,我们可以把这些Condition分配给不同的线程,从而实现唤醒指定不同类别的线程。

等待唤醒的一个经典案例是生产者-消费者模型,下面是这个模型的Condition实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package cn.offer.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Shop {
    // 库存剩余
    private int production = 0;
    // 总库存
    private static final int maxProduction = 5;
    // 锁对象
    private Lock lock = new ReentrantLock();
    // 生产者条件,用于等待和唤醒生产者
    private Condition productionCondition = lock.newCondition();
    // 消费者条件,用于等待和唤醒消费者
    private Condition consumeCondition = lock.newCondition();

    // 生产一个数据
    public void produce() {
        this.lock.lock();
        try {
            // 库存满了,生产者等待
            while (this.production > maxProduction) {
                this.productionCondition.await();
            }

            // 生产数据
            System.out.println(Thread.currentThread().getId()
                    + "生产数据:" + (++this.production));

            // 进货了,可以唤醒其它消费者
            this.consumeCondition.signalAll();

        } catch (InterruptedException ignore) {
            // ignore
        } finally {
            this.lock.unlock();
        }
    }

    // 消费者
    public void consume() {
        this.lock.lock();
        try {
            // 库存空了,消费者等待
            while (this.production <= 0) {
                this.consumeCondition.await();
            }

            // 消费数据
            System.out.println(Thread.currentThread().getId()
                    + "消费数据:" + (--this.production));

            // 数据被消费了,唤醒其它生产者
            this.productionCondition.signalAll();

        } catch (InterruptedException e) {
            // ignore
        } finally {
            this.lock.unlock();
        }

    }
}

class Consumer implements Runnable {

    private Shop shop;
    public Consumer(Shop shop) {
        this.shop = shop;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
            shop.consume();
        }
    }
}

class Producer implements Runnable {

    private Shop shop;
    public Producer(Shop shop) {
        this.shop = shop;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
            shop.produce();
        }
    }
}

public class ConditionDemo {
    public static void main(String[] args) {
        Shop shop = new Shop();
        Consumer consumer = new Consumer(shop);
        Producer producer = new Producer(shop);

        for (int i = 0; i < 5; ++i) {
            new Thread(producer).start();
            new Thread(consumer).start();
        }

    }
}

这样就可以保证资源被安全地访问。

另一个Condition的例子是,让线程之间交替运行,例如,创建3个线程ABC,让3个线程以”ABCABCABC…“的顺序交替执行。

这可以创建3个ConditionConditionA执行完毕后唤醒ConditionBConditionB执行完毕唤醒ConditionCConditionC执行完毕唤醒ConditionA,以此类推:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package cn.offer.juc;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ExecOrder {
    public static Lock lock = new ReentrantLock();
    // 不同线程名称和其condition
    public static Map<String, Condition> conditionMap;
    // 当前正常执行的线程名称
    public static String currentExec = "A";
    static {
        // 为A、B、C创建condition
        conditionMap = new HashMap<>();
        conditionMap.put("A", lock.newCondition());
        conditionMap.put("B", lock.newCondition());
        conditionMap.put("C", lock.newCondition());
    }
}

class Exec implements Runnable {

    // 当前线程名称
    private String name;
    // 下一个执行线程名称
    private String next;
    public Exec(String name, String next) {
        this.name = name;
        this.next = next;
    }

    public void exec() {
        ExecOrder.lock.lock();
        try {
            if (!ExecOrder.currentExec.equals(this.name)) {
                // 当前不是自己执行
                ExecOrder.conditionMap.get(this.name).await();
            }
            // 当前是自己执行
            System.out.println(this.name);

            // 唤醒下一个
            ExecOrder.conditionMap.get(this.next).signal();
            ExecOrder.currentExec = this.next;

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            ExecOrder.lock.unlock();
        }
    }

    @Override
    public void run() {
        while (true) {
            this.exec();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class ConditionDemo2 {
    public static void main(String[] args) {
        new Thread(new Exec("A", "B")).start();
        new Thread(new Exec("B", "C")).start();
        new Thread(new Exec("C", "A")).start();
    }
}

线程同步

线程之间的工作很多时候需要进行协调,例如某个线程需要等待其余线程完成了才能继续工作。juc提供了很多这种勇于协调同步线程之间工作的工具。

CountDownLatch

也叫闭锁,这个工具很简单,就是用于等待一组事件结束再继续往后执行的。

CountDownLatch内部有一个计数器,初始值由我们指定,每次调用countDown方法,计数器会减一。当计数器减到0,await调用处才会终止阻塞,继续往后执行。

例如,使用4个线程对一个公共变量分别加一,主线程在它们完成后输出这个公共变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package cn.offer.juc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class PublicIntValue {
    public static AtomicInteger num = new AtomicInteger(0);
}

class AddValueWorker implements Runnable {

    private CountDownLatch countDownLatch;

    private int sleepSeconds;

    public AddValueWorker(CountDownLatch count, int sleepSeconds) {
        this.countDownLatch = count;
        this.sleepSeconds = sleepSeconds;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(this.sleepSeconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getId() +
                "执行,num=" + PublicIntValue.num.addAndGet(1));
        this.countDownLatch.countDown();
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(5);

        // 启动5个线程,分别需要执行1-5秒
        for (int i = 0; i < 5; ++i) {
            new Thread(new AddValueWorker(countDownLatch, i + 1)).start();
        }

        // 等待致5个线程完成
        countDownLatch.await();

        System.out.println("执行结束,结果=" + PublicIntValue.num.get());
    }
}

如果某个地方需要某些任务全部完成才能继续执行,则可以使用CountDownLatch

CyclicBarrier

栅栏用于等待其它线程,是一个线程“同步”的装置。

它的作用是,到达栅栏处,线程会阻塞,等待其它线程,必须所有线程都到达栅栏处,线程才会继续执行。

栅栏内部也有一个计数器,当线程调用await时,计数器减一,如果此时计数器不为0,会阻塞,直到计数器为0,才会继续执行。

例如,一个简单的“掷骰子”游戏,让5个线程分别生成1-6的随机数,所有线程生成完毕之后,再公开自己的数字(打印出来):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.offer.juc;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Player implements Runnable {

    private static Random rand = new Random();

    private int sleepTime;
    private CyclicBarrier barrier;

    public Player(CyclicBarrier barrier, int sleepTime) {
        this.barrier = barrier;
        this.sleepTime = sleepTime;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(this.sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 生成一个1-6的随机数
        int point = randomInt(1, 6);
        // 等待其它线程完成再公布自己的数
        try {
            this.barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getId() + "的点数:" + point);
    }

    public static int randomInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }
}

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5);
        // 5个玩家
        for (int i = 0; i < 5; ++i) {
            new Thread(new Player(barrier, i + 1)).start();
        }
    }
}

可见,栅栏可以实现让快的线程等待慢的线程。

Semaphore

信号量用于控制访问资源的线程个数。

使用Semaphore时,需要调用acquire来获取资源,如果资源以及被占用满了,将会阻塞直到其它线程释放资源。

在使用资源结束后,一定要调用release来释放资源,将资源让给其它线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.offer.juc;

import java.util.concurrent.Semaphore;

class SourceWorker implements Runnable {

    private Semaphore semaphore;
    public SourceWorker(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {

        try {
            // 尝试获取资源
            semaphore.acquire();

            // 使用资源
            System.out.println(Thread.currentThread().getId() + "正在使用资源...");
            Thread.sleep(1000);


        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放资源
            semaphore.release();
            System.out.println(Thread.currentThread().getId() + "使用完毕,释放资源");
        }

    }
}

public class SemaphoreDemo {
    public static void main(String[] args) {
        // 资源只供3个线程同时访问
        Semaphore semaphore = new Semaphore(3);

        // 创建5个线程去访问资源
        for (int i = 0; i < 5; ++i) {
            new Thread(new SourceWorker(semaphore)).start();
        }
    }
}

这样就只能同时有3个线程访问资源了。这可以控制资源被并发访问的个数,可以用于控制资源访问的压力。

Exchanger

交换器用于两个线程执行到某个时间点,进行数据交换的。在某个时间点调用Exchangerexchange时,如果另一个线程没有执行到exchange,则当前线程会进行阻塞;当另一个线程也执行到exchange,会继续运行,并将对方的数据返回过来,自己的数据也会被返回给对方。

下面是一个简单的例子,两个线程分别计算2和3的平方值,计算完毕后将结果发送给对方,一方将两个结果相加,一方将两个结果相乘,随后各自打印出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.offer.juc;

import java.util.concurrent.Exchanger;

class ExchangeWorker implements Runnable {

    // 操作数
    private int num;
    // 操作类型,1表示计算两个数平方和,2表示两个数平方积
    private int type;
    // 交换器,用于交换双方的数据
    private Exchanger<Integer> exchanger;
    // 模拟操作时间
    private int sleepTime;

    public ExchangeWorker(int num, int type, int sleepTime,
                          Exchanger<Integer> exchanger) {
        this.num = num;
        this.type = type;
        this.sleepTime = sleepTime;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(this.sleepTime * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        int result = this.num * this.num;
        System.out.println(this.num + "平方计算完毕:" + result);
        try {
            int other = this.exchanger.exchange(result);
            if (this.type == 1) {
                System.out.println("两个数的平方和:" + (result + other));
            } else {
                System.out.println("两个数的平方积:" + (result * other));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}

public class ExchangeDemo {
    public static void main(String[] args) {
        Exchanger<Integer> exchanger = new Exchanger<>();
        new Thread(new ExchangeWorker(
                2, 1, 1, exchanger)).start();
        new Thread(new ExchangeWorker(
                3, 2, 3, exchanger)).start();
    }
}

2的平方会被先计算完毕,随后会等待3的平方的计算。计算完毕后,双方会交换互相的结果,随后输出和跟积。

数据结构

juc也提供了一些并发安全的数据结构供我们使用。

阻塞队列

阻塞队列是由固定长度的,入队和出队操作是阻塞的:

  • 入队列时,如果队列满了,将会阻塞,直到队列有位置。
  • 出队列时,如果队列时空的,将会阻塞,直到队列有数据。

主要有下面这些阻塞队列供我们使用:

  • ArrayBlockingQueue:基于数组的阻塞队列
  • LinkedBlockingQueue:基于链表的阻塞队列
  • PriorityBlockingQueue:支持优先级排序的阻塞队列
  • DelayQueue:支持延时获取的无界阻塞队列
  • SynchronousQueue:不储存元素的阻塞队列
  • LinkedTransferQueue:基于链表的无边界阻塞队列
  • LinkedBlockingDeque:基于链表的双向阻塞队列

阻塞队列有以下方法:

  • put:阻塞地向队列插入数据
  • take:阻塞地从队列获取数据
  • drainTo(Collection<? super E> c):移除队列中所有元素,到Collection
  • int drainTo(Collection<? super E> c, int maxElements):移除最多maxElements个元素到Collection
  • add:不阻塞地插入数据,如果队列满,会返回false而不是阻塞
  • poll:不阻塞地获取数据,如果没有数据,返回null
  • poll(timeout, unit):在指定时间内阻塞获取数据,如果这个时间内没有获取到,返回null
  • remainingCapacity:当前队列剩余的可用空间
  • contain:队列是否包含某个数据
  • remove:删除某个元素,返回删除是否成功

下面来依次介绍这些队列:

固定长度的阻塞队列

定长阻塞队列可以用于实现消费者-生产者模型。现在,生产者只需要不断向阻塞队列添加数据,消费者从中获取即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package cn.offer.juc.bqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BlockQueueBasedShop {
    // 长度为5的阻塞队列
    private BlockingQueue<Integer> queue =
           new LinkedBlockingQueue<>(5);

    public void produce(int num) {
        try {
            this.queue.put(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int consume() {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
    }
}

public class BaseBlockQueueDemo {
    // 生产的递增的数字
    private static AtomicInteger grow = new AtomicInteger(0);

    public static void main(String[] args) {
        BlockQueueBasedShop shop = new BlockQueueBasedShop();
        // 5个消费者和5个生产者
        for (int i = 0; i < 5; ++i) {

            // 消费者
            new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int num = shop.consume();
                    if (num != -1) {
                        System.out.println("消费到了:" + num);
                    }
                }
            }).start();

            // 生产者
            new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    int num = grow.getAndAdd(1);
                    shop.produce(num);
                    System.out.println("生产了:" + num);
                }
            }).start();
        }
    }
}

这样的实现比使用等待唤醒会简单一些。

优先队列

我们再来关注一个队列:PriorityBlockingQueue。这种队列支持优先获取某些元素。

要想优先获取元素,队列储存的对象必须实现Comparable接口,例如,想依据商品的价格作为优先级,则需要下面的商品类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Product implements Comparable<Product> {
    private String name;
    private Integer price;

    public Product(String name, Integer price) {
        this.name = name;
        this.price = price;
    }

    @Override
    public int compareTo(Product o) {
        return -this.price.compareTo(o.price);
    }

    @Override
    public String toString() {
        return this.name + ",价格:" + this.price;
    }
}

这样就会按照商品价格的倒序来取元素了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PriorityBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Product> queue = new PriorityBlockingQueue<>();
        queue.put(new Product("笔记本", 12));
        queue.put(new Product("苹果", 20));
        queue.put(new Product("铅笔", 6));
        queue.put(new Product("mac电脑", 10000));

        while (true) {
            System.out.println(queue.take());
        }
    }
}

输出:

1
2
3
4
mac电脑,价格:10000
苹果,价格:20
笔记本,价格:12
铅笔,价格:6

延迟阻塞队列

延迟阻塞队列是基于优先队列的,它允许我们延时获取元素。

要使用延迟阻塞队列,类需要实现Delayed接口,其中包含用于比较的compareTo和用于获取延时时间的getDelay

compareTo需要根据延时时间进行排序,这样可以优先获取低延时的元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ProductDelay implements Delayed {

    private String name;
    private long time;

    public ProductDelay(String name, long time) {
        this.name = name;
        this.time = time;
    }

    @Override
    public String toString() {
        return this.name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time -
                System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
                o.getDelay(TimeUnit.MILLISECONDS));
    }
}

这样使用DelayQueue,就能按照顺序从队列里延时获取元素了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProductDelay> queue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        queue.offer(new ProductDelay("1000", now + 1000));
        queue.offer(new ProductDelay("200", now + 200));
        queue.offer(new ProductDelay("300", now + 300));
        queue.offer(new ProductDelay("500", now + 500));
        queue.offer(new ProductDelay("2000", now + 2000));

        while (true) {
            System.out.println(queue.take());
        }
    }
}

输出:

1
2
3
4
5
200
300
500
1000
2000

同步队列

同步队列不储存元素,每个put都需要等待一个take。同步队列一般用于线程之间传递某个单一的信号。

因为不储存元素,同步队列比较特殊,有如下特征:

  • 调用peekiterator永远返回null
  • isEmpty永远返回true
  • remainingCapacity永远返回0
  • removeremoveAll永远返回false

下面使用同步队列来实现两个线程交替执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.offer.juc.bqueue;

import java.util.concurrent.SynchronousQueue;

class SynchronousWorker implements Runnable {

    private SynchronousQueue<Boolean> synIn;
    private SynchronousQueue<Boolean> synOut;

    private String name;
    public SynchronousWorker(
            String name, SynchronousQueue<Boolean> synIn,
            SynchronousQueue<Boolean> synOut) {
        this.name = name;
        this.synIn = synIn;
        this.synOut = synOut;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 等待获取开始信号
                this.synIn.take();
                // 执行
                System.out.println(name);
                // 发送信号
                this.synOut.put(true);

                Thread.sleep(1000);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 用于同步的两个信号量
        SynchronousQueue<Boolean> synA = new SynchronousQueue<>();
        SynchronousQueue<Boolean> synB = new SynchronousQueue<>();

        new Thread(new SynchronousWorker("A", synA, synB)).start();
        new Thread(new SynchronousWorker("B", synB, synA)).start();

        // 需要给一个开始信号
        synA.put(true);
    }
}

通过两个信号,就可以实现两个线程的交替执行。

ConcurrentHashMap

众所周知,HashMap是非线程安全的,HashTable是线程安全的。但是HashTable是直接使用synchronized来将所有的方法进行互斥的,效率低下。

juc提供了一种更加高效的线程安全map,即ConcurrentHashMap。它采用了采用分离锁技术,将hash表的数组部分分成若干段,每段维护一个锁,这些段可以并发的进行写操作。

分段锁使得ConcurrentHashMap的吞吐量比一般的同步hash表高得多。在并发情况下如果要用到hash表,应该使用这个类,而不是HashTable

这个类的用法和HashMap一样,这里不再演示。

CopyOnWriteArrayList/Set

采用写时复制的容器。所有的线程都共享同一个容器对象,当某个线程修改容器时,会Copy一个新的容器进行修改,所有线程后续将会访问新的容器。

这样对容器进行读的时候不需要加锁,可以高效并发完成,因为读的容器不可能添加新的内容。写的时候无法进行读。

这是以增加写的代价换取读高效率。如果容器的读次数大于写的次数,则可以考虑使用这样的容器。

这种容器的用法和普通的容器一样,不再演示。