# 线程

返回:专题

🐉 Java并发工具类CyclicBarrier与CountDownLatch 🐉 线程一些常见操作,如休眠等
线程池 ThreadLocal
进程和线程基础知识全家桶

# 结构化并发

# 同步

# 1.synchronized

同步是一种高开销的操作,因此应该尽量减少同步的内容。
通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。
测试发现存在问题,结果虽然对但是过程中出现过问题,

# 2.volatile(废弃)

修饰需要同步的变量

# 3.ReentrantLock

注:关于Lock对象和synchronized关键字的选择:

  • 最好两个都不用,使用一种***java.util.concurrent***包提供的机制, 能够帮助用户处理所有与锁相关的代码。
  • 如果synchronized关键字能满足用户的需求,就用synchronized,因为它能简化代码。
  • 如果需要更高级的功能,就用ReentrantLock类,此时要注意及时释放锁,否则会出现死锁,通常在finally代码释放锁。

# 各种锁

准确的来说我们使用锁就是为了安全。
Java中加锁准确的来说是为了保证并发安全,同时也是为了解决内存中的一致性,原子性,有序性三种问题

# 悲观锁

Java中悲观锁的实现有synchronized关键字和Lock的实现类都是悲观锁

# 乐观锁

在数据库中我们可以使用version版本号去实现。在Java中我们是使用CSA来实现

private AtomicLong atomicLong = new AtomicLong();//乐观锁

    @Test
    public synchronized void testLock(){//悲观锁

    }
1
2
3
4
5
6
# CAS

CAS(Compare And Swap)算法是一种无锁算法,是Java提供的非阻塞原子性操作。
简单的来说CAS就是一个死循环,在循环中判断预期的值和内存中的值是否相等,如果相等的话就执行修改,如果如果不相等的话就继续循环,直到执行成功后退出。

CAS需要三个操作值:

  • 预期值的 A
  • 内存中的V
  • 将要修改的B

# 公平锁和非公平锁

private ReentrantLock fairLock = new ReentrantLock(true);
    private ReentrantLock unfairLock = new ReentrantLock(false);
    private ReentrantLock defaultLock = new ReentrantLock();

    public static void main(String[] args) {
        final ThreadTest threadTest = new ThreadTest();
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++){
            executorService.execute(() -> {
                System.out.println("线程ID:"+Thread.currentThread().getName()+",我来了");
                threadTest.lockTest();
            });
        }
    }

    public void lockTest(){
        try {
            fairLock.lock();
            Thread.sleep(1000);
            System.out.println("获取锁的进程ID:"+Thread.currentThread().getName()+",拿到了锁");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            fairLock.unlock();
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
线程ID:pool-1-thread-1,我来了
线程ID:pool-1-thread-2,我来了
线程ID:pool-1-thread-4,我来了
线程ID:pool-1-thread-5,我来了
线程ID:pool-1-thread-3,我来了
获取锁的进程ID:pool-1-thread-1,拿到了锁
获取锁的进程ID:pool-1-thread-2,拿到了锁
获取锁的进程ID:pool-1-thread-4,拿到了锁
获取锁的进程ID:pool-1-thread-5,拿到了锁
获取锁的进程ID:pool-1-thread-3,拿到了锁
1
2
3
4
5
6
7
8
9
10

按照顺序来的,可以发现。在需要公平资源的场景下使用公平锁,如果不需要特殊的公平对待的话尽量使用非公平锁,因为公平锁会带来性能的开销。

其他如共享锁、独占锁、自旋锁

# Semaphore的使用和详解(限制并发数量)

back

Semaphore的作用:限制线程并发的数量。类似springBoot[七、应用限流]

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
1
2
3

类Semaphore的构造函数permits 是许可的意思,代表同一时间,最多允许permits执行acquire() 和release() 之间的代码。

例如:

Semaphore semaphore = new Semaphore(1);
1

表示同一时间内最多只允许1个线程执行 acquire()和release()之间的代码

  • 方法acquire(n) 的功能是每调用1次此方法,就消耗掉n个许可。
  • 方法release(n) 的功能是每调用1次此方法,就动态添加n个许可。
  • 方法acquireUnnterruptibly()作用是是等待进入acquire() 方法的线程不允许被中断。
  • 方法availablePermits() 返回Semaphore对象中当前可以用的许可数。
  • 方法drainPermits() 获取并返回所有的许可个数,并且将可用的许可重置为0
  • 方法 getQueueLength() 的作用是取得等待的许可的线程个数
  • 方法 hasQueueThreads() 的作用是判断有没有线程在等待这个许可
  • 公平和非公平信号量:

有些时候获取许可的的顺序与线程启动的顺序有关,这是的信号量就要分为公平和非公平的。所谓的公平信号量是获得锁的顺序与线程启动的顺序有关,但不代表100%获得信号量,仅仅是在概率上能保证,而非公平信号量就是无关的。

例如:

Semaphore semaphore = new Semaphore(1false);
1

False:表示非公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序无关,也就是线程先启动了并不代表先获得 许可。
True:公平信号量,即线程启动的顺序与调用semaphore.acquire() 的顺序有关,也就是先启动的线程优先获得许可 。

  • 方法tryAcquire() 的作用是尝试获取1个许可。如果获取不到则返回false,通常与if语句结合使用,其具有无阻塞的特点。无阻塞的特点可以使不至于在同步处于一直持续等待的状态。
  • 方法tryAcquire(n) 的作用是尝试获取n个许可,如果获取不到则返回false
  • 方法tryAcquire(long timeout,TimeUnit unit)的作用是在指定的时间内尝试获取1个许可,如果获取不到则返回false
  • 方法tryAcquire(int permits,long timeout,TimeUnit unit) 的作用是在指定的时间内尝试获取n 个许可,如果获取不到则返回false
  • 多进路-多处理-多出路:允许多个线程同时处理任务
@Test
    public void test3(){
        try {
            semaphore.acquire();
            log.info("[{}]开始时间:[{}]",Thread.currentThread().getName(), Instant.now().toEpochMilli());
            for (int i = 0; i < 10; i++) {
                log.info("[{}]打印[{}]次",Thread.currentThread().getName(),i+1);
            }
            log.info("[{}]结束时间:[{}]",Thread.currentThread().getName(),Instant.now().toEpochMilli());
            semaphore.release();
        } catch (Exception e){
            log.error(e.getMessage());
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private final Semaphore semaphore = new Semaphore(1);

public Result rtuCommand(JSONObject param) throws Exception {
    ExecuteCommand executeCommand = new ExecuteCommand(param);
    return executeCommand.call();
}

class ExecuteCommand implements Callable<Result> {
    private JSONObject param;

    public ExecuteCommand(JSONObject param) {
        this.param = param;
    }

    /**
        * Computes a result, or throws an exception if unable to do so.
        *
        * @return computed result
        * @throws Exception if unable to compute a result
        */
    @Override
    public Result call() throws Exception {
        semaphore.acquire();
        String type = param.getString("ctrl");
        Result result;
        if (StringUtils.isBlank(type)) {
            result = rtuRead(param);
        } else {
            result = rtuOrder(param);
        }
        semaphore.release();
        return result;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# CompletableFuture

back

CompletableFuture是java8中新增的一个类,算是对Future的一种增强,用起来很方便,也是会经常用到的一个工具类

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口

# CompletionStage接口

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

# 常见的方法

# runAsyncsupplyAsync方法

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
1
2
3
4

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
示例:

//无返回值
public static void runAsync() throws Exception {
 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
 try {
 TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
 }
 System.out.println("run end ...");
 });
 future.get();
}
//有返回值
public static void supplyAsync() throws Exception {
 CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
 try {
 TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
 }
 System.out.println("run end ...");
 return System.currentTimeMillis();
 });
 long time = future.get();
 System.out.println("time = "+time);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 计算结果完成时的回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
1
2
3
4
# whenComplete 和 whenCompleteAsync 的区别
  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

示例:

public static void whenComplete() throws Exception {
 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
 try {
 TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
 }
 if(new Random().nextInt()%2>=0) {
 int i = 12/0;
 }
 System.out.println("run end ...");
 });
 future.whenComplete(new BiConsumer<Void, Throwable>() {
 @Override
 public void accept(Void t, Throwable action) {
 System.out.println("执行完成!");
 }
 });
 future.exceptionally(new Function<Throwable, Void>() {
 @Override
 public Void apply(Throwable t) {
 System.out.println("执行失败!"+t.getMessage());
 return null;
 }
 });
 TimeUnit.SECONDS.sleep(2);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# thenApply

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
1
2
3

T:上一个任务返回结果的类型、U:当前任务的返回值类型
示例:

private static void thenApply() throws Exception {
 CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
 @Override
 public Long get() {
 long result = new Random().nextInt(100);
 System.out.println("result1="+result);
 return result;
 }
 }).thenApply(new Function<Long, Long>() {
 @Override
 public Long apply(Long t) {
 long result = t*5;
 System.out.println("result2="+result);
 return result;
 }
 });
 long result = future.get();
 System.out.println(result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# handle

handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
1
2
3

示例:

public static void handle() throws Exception{
 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
 public Integer get() {
 int i= 10/0;
 return new Random().nextInt(10);
 }
 }).handle(new BiFunction<Integer, Throwable, Integer>() {
 @Override
 public Integer apply(Integer param, Throwable throwable) {
 int result = -1;
 if(throwable==null){
 result = param - 2;
 }else{
 System.out.println(throwable.getMessage());
 }
 return result;
 }
 });
 System.out.println(future.get());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# thenAccept

接收任务的处理结果,并消费处理,无返回结果。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public static void thenAccept() throws Exception{
 CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
 public Integer get() {
 return new Random().nextInt(10);
 }
 }).thenAccept(integer -> {
 System.out.println(integer);
 });
 future.get();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# thenRun

跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

public static void thenRun() throws Exception{
 CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
 public Integer get() {
 return new Random().nextInt(10);
 }
 }).thenRun(() -> {
 System.out.println("thenRun ...");
 });
 future.get();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# thenCombine

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

private static void thenCombine() throws Exception {
 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
 @Override
 public String get() {
 return "hello";
 }
 });
 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
 @Override
 public String get() {
 return "hello";
 }
 });
 CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
 @Override
 public String apply(String t, String u) {
 return t+" "+u;
 }
 });
 System.out.println(result.get());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# thenAcceptBoth

当两个CompletionStage都执行完成后,把结果一块交给thenAcceptBoth来进行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

private static void thenAcceptBoth() throws Exception {
 CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
 public Integer get() {
 int t = new Random().nextInt(3);
 try {
 TimeUnit.SECONDS.sleep(t);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("f1="+t);
 return t;
 }
 });
 CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
 @Override
 public Integer get() {
 int t = new Random().nextInt(3);
 try {
 TimeUnit.SECONDS.sleep(t);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("f2="+t);
 return t;
 }
 });
 f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
 @Override
 public void accept(Integer t, Integer u) {
 System.out.println("f1="+t+";f2="+u+";");
 }
 });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# Future

# FutureTask

futureTask.get()方法要想捕获实际方法抛出的异常,需要catch ExecutionException,否则只会被Exception吞噬

420541210266609.png

327621410259278.png

上面的实例中,即使在future.get()中捕获CommonRuntimeException,也无济于事,最终都只会走到Exception中,

必须捕获ExecutionException,再通过e.getCause()获取真实的异常类型

# java并发编程:一步一步带你手写CountDownLatch

back

  • LockSupport 用于锁定线程
    ConcurrentLinkedQueue 用于同步线程队列

  • countdownlatch没有实现任何的类,其最基本的两个方法 await和conutdown

package com.gunplan.toutiao.pages;
public class MyCountDownLatched {
 public MyCountDownLatched(int value){
 }
 public void await(){
 }
 public void countDown(){
 }
}
1
2
3
4
5
6
7
8
9
  • 需要有一个对于目前数字的记录器,count值,因为这个类肯定会被并发访问,所以我们用atomticInt的cas机制来解决线程安全的问题
    同时使用 volatile 关键字,是volatile写对所有读同步(happens-before)原则
private volatile AtomicInteger count;
public MyCountDownLatched(int value) {
 count = new AtomicInteger(value);
}
1
2
3
4
  • 下面我们来实现countdown,这个其实也很简单
public void countDown() {
 count.decrementAndGet();
}
1
2
3
  • 下面我们写一下await方法
public void await() {
 if (count.get()!=0)
 {
 LockSupport.park();
 }
}
1
2
3
4
5
6
  • 当数量不为0的时候,线程等待,将其记录下来,当conut为0时唤醒
    使用这个队列进行记录。
private ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<>()
1
  • countdown需要具有唤醒功能
public void countDown() {
 if (count.decrementAndGet() == 0) {
 //这里线程安全,因为==0只能有一次
 threads.parallelStream().forEach(LockSupport::unpark);
 threads.clear();
 }
}
1
2
3
4
5
6
7
  • 完整代码
package com.gunplan.toutiao.pages;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
public class MyCountDownLatched {
 private volatile AtomicInteger count;
 private volatile ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<>();
 public MyCountDownLatched(int value) {
 count = new AtomicInteger(value);
 }
 public long getCount() {
 return count.intValue();
 }
 public void await() {
 if (count.get() != 0) {
 threads.add(Thread.currentThread());
 LockSupport.park();
 }
 }
 public void countDown() {
 if (count.decrementAndGet() == 0) {
 //这里线程安全,因为==0只能有一次
 threads.parallelStream().forEach(LockSupport::unpark);
 threads.clear();
 }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# java多线程里面经常提到原子性

back

通过volatile关键字解决可见性问题,一个线程对共享变量的修改,能够及时的被其他线程看到。只要加了volatile关键字,所有对变量的读取立刻进行同步。volatile关键字的用途:禁止缓存;相关的变量不做重排序

# 线程安全

线程安全是多线程编程时的计算机程序代码中的一个概念。当多个线程访问某个方法时,不管你通过怎样的调用方式或者说这些线程如何交替的执行,我们在主程序中不需要去做任何的同步,这个类的结果行为都是我们设想的正确行为,那么我们就可以说这个类时线程安全的。

# 两个线程进行数据交换都不会

Exchanger就是线程之间的数据交换器,只能用于两个线程之间的数据交换

private static void test1() {
    Exchanger exchanger = new Exchanger();

    new Thread(() -> {
        try {
            Object data = "-公众号Java技术栈AAA";
            System.out.println(Thread.currentThread().getName() + data);

            // 开始交换数据
            data = exchanger.exchange(data);
            System.out.println(Thread.currentThread().getName() + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        try {
            Object data = "-公众号Java技术栈BBB";
            System.out.println(Thread.currentThread().getName() + data);

            // 开始交换数据
            data = exchanger.exchange(data);
            System.out.println(Thread.currentThread().getName() + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private static void test4() {
    Exchanger exchanger = new Exchanger();

    for (int i = 1; i <= 10; i++) {
        Integer data = i;
        new Thread(() -> {
            try {
                Object exchange = exchanger.exchange(data);
                System.out.println(Thread.currentThread().getName() + "-" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "Java技术栈" + i).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

发现两两交换数据

Thread-1--3
Thread-2--0
Thread-7--6
Thread-9--8
Thread-6--7
Thread-4--5
Thread-5--4
Thread-8--9
Thread-3--1
Thread-0--2
1
2
3
4
5
6
7
8
9
10