原创

归纳篇:Java的多线程与并发问题

2022-04-10 22:21:20 星期日


一、线程的核心概念

    在此我将对 Java 中的多线程知识做一个全面的总结:
    首先,一个 CPU 在一个时间节点只能执行一个代码,因为代码切换得很快,所以可以给我们造成多线程的错觉。实际上,在只有一个 CPU 的情况下,多线程只是一种模拟出来的概念。只有在多个 CPU,即多核系统中,才存在真正的多线程。
    线程分为用户线程和守护线程,每个线程都可以被设置为守护线程,每个线程都有优先级,守护线程的优先级往往比较低,每个线程都可以通过 Thread.setDaemon() 方法设置为守护线程,当进程中只存在守护线程的时候,JVM 就会退出。
    一个进程中通常包括多个线程,在程序运行时,即使我们没有自己创建线程,后台也会存在多个线程,如 GC 线程,主线程。这些线程在进程中是相互独立的,就像公路上用分隔带分开的一条条车道,互不相交。
    main() 称之为主线程,为系统的入口点,用于执行整个程序。
    在一个进程中,如果开辟了多个线程,线程的执行由调度器安排调度,调度器与操作系统紧密相关,先后顺序不能人为干预。这些线程的执行没有先后顺序。
    对同一份资源操作时,会存在资源抢夺的问题,这时候往往需要加入并发控制。
    线程会带来额外的开销,如 CPU 调度时间,并发控制开销。
    每个线程只与自己的工作内存交互,工作内存再与主存交互,因此加载和存储主内存控制不当会造成数据不一致(当多个线程同时操作一个主存时,就可能产生主存与工作内存数据不一致的问题)。

二、线程创建

    在 Java 中,创建线程有三种方式:继承 Thread 类(重写 Run() )、实现 Runnable 接口(重写 Run() )、实现 Callable 接口(重写 Call() )。因为 Java 存在单继承的局限性,所以一般不采用继承 Thread 类的方法(如果你继承了 Thread 类,后期你发现不得不继承其他类的时候怎么办?把前面的代码全部重构?而且类可能只要求可执行就行,继承整个 Thread 类开销太大,采用实现接口的方式也便于共享资源),而 Callable 接口存在于 JUC 并发包下,使用的并不多,因此创建线程最常见的方法就是实现 Runnable 接口。
    实现 Runnable 和 Callable 接⼝的类只能当做⼀个可以在线程中运⾏的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调⽤。可以理解为任务是通过线程驱动从⽽执⾏的。
    启动线程需要调用 start() 方法,而不是 Run() 方法。当调⽤ start() ⽅法启动⼀个线程时,虚拟机会将该线程放⼊就绪队列中等待被调度,当⼀个线程被调度时会执⾏该线程的 run() ⽅法,如果直接调用 run() ⽅法,启动的只是进程。现在我们来具体探讨这三种方法:

    1、继承 Thread 类

    Thread 类存在于 lang 包下,继承了 Object 类,实现了 Runnable 接口。

package com.hyc.thread;

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}

    通过start() 方法启动线程。

public static void main(String[] args) {
        MyThread mt = new MyThread();
        mt.start();
    }

    实例1:多线程抢票。

package com.hyc.thread;

public class GetTicket implements Runnable {
    //票数
    private int ticketNumbers = 99;
    @Override
    public void run() {
        while (true){
            if (ticketNumbers < 0){
                break;
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"--->"+ticketNumbers);
            ticketNumbers --;
        }
    }

    public static void main(String[] args) {
        //一份资源
        GetTicket gt = new GetTicket();
        //多个代理
        new Thread(gt,"Agency1").start();
        new Thread(gt,"Agency2").start();
        new Thread(gt,"Agency3").start();
    }
}

    通过运行上述实例,我们会发现很明显的并发问题,即在抢票系统中出现有不同代理抢得同一张票的情况(尤其是在模拟了系统时延的时候,这个问题尤为突出)。本实例作为一个引子,此类有关多线程的并发问题我将在后文中进行详细的探讨。

    2、实现 Runnable 接口

    实现接⼝中的 run() ⽅法。

package com.hyc.thread;

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // ...
    }
}

    使⽤ Runnable 对象再创建⼀个 Thread 对象,然后调⽤ Thread 对象的 start() ⽅法来启动线程,这个 Thread 对象也被称为代理对象。

public static void main(String[] args) {
        MyRunnable instance = new MyRunnable();
        Thread thread = new Thread(instance);
        thread.start();
    }

    实例2:百米赛跑。

package com.hyc.thread;

public class Competition implements Runnable{
    private String winner;//胜利者
    @Override
    public void run() {
        for (int steps = 1; steps <= 100;steps++){
            //速度慢的一方
            if (Thread.currentThread().getName().equals("player2") && steps % 10 == 0){
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName()+"--->"+steps);
            //比赛是否结束
            boolean flag = gameOver(steps);
            if (flag){
                break;
            }
        }
    }
    private boolean gameOver(int steps){
        if (winner != null){
            return true;
        }else {
            if (steps == 100){
                winner = Thread.currentThread().getName();
                System.out.println("winner--->"+winner);
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Competition c = new Competition();
        new Thread(c,"player1").start();
        new Thread(c,"player2").start();
    }
}

    3、实现 Callable 接口

    实现接口中的 Call() 方法。

package com.hyc.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 123;
    }
}

    与 Runnable 相⽐,Callable 可以有返回值,返回值通过 FutureTask 进⾏封装。

public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        MyCallable mc = new MyCallable();
        FutureTask<Integer> ft = new FutureTask<>(mc);
        Thread thread = new Thread(ft);
        thread.start();
        System.out.println(ft.get());
    }

三、线程状态

    ⼀个线程只能处于⼀种状态,这⾥的线程状态特指 Java 虚拟机的线程状态,不能反映线程在特定操作系统下的状态。

    1、新建(NEW)。

    创建后尚未启动。

    2、可运行(RUNABLE)。

    正在 Java 虚拟机中运⾏。但是在操作系统层⾯,它可能处于运⾏状态,也可能等待资源调度(例如处理器资源),资源调度完成就进⼊运⾏状态。所以该状态的可运⾏是指可以被运⾏,具体有没有运⾏要看底层操作系统的资源调度。

    3、阻塞(BLOCKED)。

    请求获取 monitor lock 从⽽进⼊ synchronized 函数或者代码块,但是其它线程已经占⽤了该 monitorlock,所以出于阻塞状态。要结束该状态进⼊从⽽ RUNABLE 需要其他线程释放 monitor lock。

    4、无限期等待(WAITING)。

    等待其它线程显式地唤醒。
    阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 monitor lock 。而等待是主动的,通过调用Object.wait()等方法进入。

进入方法退出方法
没有设置 Timeout 参数的Object.wait()方法Object.notify() / Object.notifyAll()
没有设置 Timeout 参数的Thread.join()方法被调用的线程执行完毕
LockSupport.park()方法LockSupport.unpark(Thread)
    5、限期等待(TIMED_WAITING)。

    无需等待其他线程显式的唤醒,在一定时间之后会被系统自动唤醒。

进入方法退出方法
Thread.sleep()方法时间结束
设置了 Timeout 参数的Object.wait()方法时间结束/Object.notify() / Object.notifyAll()
设置了 Timeout 参数的Thread.join()方法时间结束 / 被调用的线程执行完毕
LockSupport.parkNanos()方法LockSupport.unpark(Thread)
LockSupport.parkUntil()方法LockSupport.unpark(Thread)

    调用Thread.sleep()方法使线程进入限期等待状态时,常常用“使一个线程睡眠”进行描述。调用Object.wait()方法使线程进入限期等待或者无限期等待时,常常用“挂起一个线程”进行描述。睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态。

    6、死亡(TERMINATED)。

    可以是线程结束任务之后自己结束,或者产生了异常而结束。

四、基础线程机制

Executor

    Executor 管理多个异步任务的执⾏,⽽⽆需程序员显式地管理线程的⽣命周期。这⾥的异步是指多个任 务的执⾏互不⼲扰,不需要进⾏同步操作。
    主要有三种 Executor:
    CachedThreadPool:⼀个任务创建⼀个线程;
    FixedThreadPool:所有任务只能使⽤固定⼤⼩的线程;
    SingleThreadExecutor:相当于⼤⼩为 1 的 FixedThreadPool。

public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new MyRunnable());
        }
        executorService.shutdown();
    }

Daemon

    守护线程是程序运⾏时在后台提供服务的线程,不属于程序中不可或缺的部分。当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。main()属于非守护线程。在线程启动之前使用setDaemon()方法可以将一个线程设置为守护线程。

public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.setDaemon(true);
    }

sleep()

    Thread.sleep(millisec)方法会休眠当前正在执行的线程,millisec 单位为毫秒。
sleep()可能会抛出 InterruptedException,因为异常不能跨线程传播回main()中,因此必须在本地进行处理。线程中抛出的其它异常也同样需要在本地进行处理。

public void run() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

yield()

    对静态⽅法Thread.yield()的调⽤声明了当前线程已经完成了⽣命周期中最重要的部分,可以切换给其 它线程来执⾏。该⽅法只是对线程调度器的⼀个建议,⽽且也只是建议具有相同优先级的其它线程可以运⾏。

public void run() {
        Thread.yield();
    }

五、中断

    一个线程执行完毕之后会自动结束,如果在运行中发生异常也会提前结束。

InterruptedException()

    通过调用一个线程的interrupt()来中断该线程,如果该线程处于阻塞、限期等待或者无限期等待状态,那么就会抛出 InterruptedException,从而提前结束该线程。但是不能中断 I / O 阻塞和 synchronized 锁阻塞。对于以下代码,在main()中启动一个线程之后再中断它,由于线程中调用了Thread.sleep()方法,因此会抛出一个 InterruptedException,从而提前结束线程,不执行之后的语句。

public class InterruptExample {
        private static class MyThread1 extends Thread {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("Thread run");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new MyThread1();
        thread1.start();
        thread1.interrupt();
        System.out.println("Main run");
    }
Main run
    java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at InterruptExample.lambda$main$0(InterruptExample.java:5)
    at InterruptExample$$Lambda$1/713338599.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

Interrupted()

    如果一个线程的run()方法执行一个无限循环,并且没有执行sleep()等会抛出 InterruptedException 的操作,那么调用线程的interrupt()方法就无法使线程提前结束。
    但是调用interrupt()方法会设置线程的中断标记,此时调用interrupted()方法会返回true。因此可以在循环体中使用interrupted()方法来判断线程是否处于中断状态,从而提前结束线程。

public class InterruptExample {
        private static class MyThread2 extends Thread {
            @Override
            public void run() {
                while (!interrupted()) {
                    // ..
                }
                System.out.println("Thread end");
            }
        }
    }
public static void main(String[] args) throws InterruptedException {
        Thread thread2 = new MyThread2();
        thread2.start();
        thread2.interrupt();
    }
Thread end

Executor 的中断操作

    调用Executor的shutdown()方法会等待线程都执行完毕之后再关闭,但是如果调用的是shutdownNow()方法,则相当于调用每个线程的interrupt()方法。
    以下使用 Lambada 创建线程,相当于创建了一个匿名内部线程。

public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Thread run");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.shutdownNow();
        System.out.println("Main run");
    }
Main run
    java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at
ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
    at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
            142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
            617)
    at java.lang.Thread.run(Thread.java:745)

    如果只想中断 Executor 中的一个线程,可以通过使用submit()方法来提交一个线程,它会返回一个Future<?>对象,通过调用该对象的cancel(true)方法就可以中断线程。

Future<?> future = executorService.submit(() -> {
        // ..
    });
future.cancel(true);

六、互斥同步

    Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,另一个是 JDK 实现的 ReentrantLock。

synchronized

    1、同步一个代码块

public void func() {
        synchronized (this) {
            // ...
        }
    }

    它只作用于一个对象,如果调用两个对象上的同步代码块,就不会进行同步。
    对于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程j进入同步语句块时,另一个线程就必须等待。

public class SynchronizedExample {
        public void func1() {
            synchronized (this) {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            }
        }
    }
public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func1());
        executorService.execute(() -> e1.func1());
    }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

    对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉运行。

public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        SynchronizedExample e2 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func1());
        executorService.execute(() -> e2.func1());
    }
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

    2、同步一个方法

public synchronized void func () {
        // ...
    }

    它和同步代码块一样,作用于同一个对象。
    3、同步一个类

public void func() {
        synchronized (SynchronizedExample.class) {
            // ...
        }
    }

    作用于整个类,也就是说两个线程调用同一个类上的不同对象上的这种同步语句,也会进行同步。

public class SynchronizedExample {
        public void func2() {
            synchronized (SynchronizedExample.class) {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            }
        }
    }
public static void main(String[] args) {
        SynchronizedExample e1 = new SynchronizedExample();
        SynchronizedExample e2 = new SynchronizedExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> e1.func2());
        executorService.execute(() -> e2.func2());
    }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

    4、同步一个静态方法

public synchronized static void fun() {
        // ...
    }

    作用于整个类。

ReentrantLock

    ReentrantLock 是 java.util.concurrent (J.U.C) 包中的锁。

public class LockExample {
        private Lock lock = new ReentrantLock();
        public void func() {
            lock.lock();
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.print(i + " ");
                }
            } finally {
                lock.unlock(); // 确保释放锁,从⽽避免发⽣死锁。
            }
        }
    }
public static void main(String[] args) {
        LockExample lockExample = new LockExample();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> lockExample.func());
        executorService.execute(() -> lockExample.func());
    }
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

比较 synchronized 与 ReentrantLock

    1、锁的实现
    synchronized是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。
    2、性能
    新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。
    3、等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。
ReentrantLock 可中断,而 synchronized 不行。
    4、公平锁
    公平锁是指多个线程在等待同一个锁时,必须按照申请锁的世界顺序来依次获得锁。
synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。
    5、锁绑定多个条件
    一个 ReentrantLock 可以同时绑定多个Condition对象。

使用选择:

    除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

七、线程之间的协作

    当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。

join()

    在线程中调用另一个线程的join()方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。
    对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join()方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

public class JoinExample {
        private class A extends Thread {
            @Override
            public void run() {
                System.out.println("A");
            }
        }

        private class B extends Thread {
            private A a;

            B(A a) {
                this.a = a;
            }

            @Override
            public void run() {
                try {
                    a.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("B");
            }
        }

        public void test() {
            A a = new A();
            B b = new B(a);
            b.start();
            a.start();
        }
    }
public static void main(String[] args) {
        JoinExample example = new JoinExample();
        example.test();
    }
    A
    B

wait() notify() notifyAll()

    调用wait()使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用notify()或者notifyAll()来唤醒挂起的线程。
    它们都属于 Object 的一部分,而不属于 Thread。
    只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。
    使用wait()挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行notify()或者notifyAll()来唤醒挂起的线程,造成死锁。

public class WaitNotifyExample {
        public synchronized void before() {
            System.out.println("before");
            notifyAll();
        }
        public synchronized void after() {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after");
        }
    }
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        WaitNotifyExample example = new WaitNotifyExample();
        executorService.execute(() -> example.after());
        executorService.execute(() -> example.before());
    }
before
after

wait()sleep()的区别

    wait()是 Object 的方法,而sleep()是 Thread 的静态方法;
    wait()会释放锁,sleep()不会。

await() signal() signalAll()

    java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用await()方法使线程等待,其它线程调用signal()signalAll()方法唤醒等待的线程。
    相比于wait()这种等待方式,await()可以指定等待的条件,因此更加灵活。
    使用 Lock 来获取一个 Condition 对象。

package com.hyc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitSignalExample {
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    public void before() {
        lock.lock();
        try {
            System.out.println("before");
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    public void after() {
        lock.lock();
        try {
            condition.await();
            System.out.println("after");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        AwaitSignalExample example = new AwaitSignalExample();
        executorService.execute(() -> example.after());
        executorService.execute(() -> example.before());
    }
before
after
Java
  • 作者:年轻的空指针(联系作者)
  • 发表时间:2022-04-20 21:36
  • 版权声明:严禁商用,转载请注明出处
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论