当前位置:网站首页>多线程与高并发(9)——AQS其他同步组件(Semaphore、ReentrantReadWriteLock、Exchanger)

多线程与高并发(9)——AQS其他同步组件(Semaphore、ReentrantReadWriteLock、Exchanger)

2022-07-07 02:49:00 李王家的翠花

jdk除了CountDownLatch、CyclicBarrier、Phaser还提供了几个非常有用的并发工具类:Semphore、Exchanger、ReentrantReadWriteLock,本文将逐一讲解,同时会总结一下LockSupport用法。

一、Semphore(信号灯)

Semphore允许几个线程同时执行,灯亮执行,灯灭不执行。其主要作用是用来限流。其代码使用如下:

public static void main(String[] args) {
    
        // 一次只能允许执行的线程数量20。
        final Semaphore semaphore = new Semaphore(20);
        //指定线程数
        ExecutorService threadPool = Executors.newFixedThreadPool(30);
        for (int i = 0; i < 30; i++) {
    
            int finalI = i;
            threadPool.execute(() ->{
    
                try {
    
                    semaphore.acquire(5);//获取5个许可,所以可运行线程数量为20/5=4
                    Thread.sleep(1000);
                    System.out.println("车辆:" + finalI+"通过");
                    Thread.sleep(1000);
                    semaphore.release(5);//释放5个许可
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }

            });
        }
        threadPool.shutdown();

    }

运行结果是每次只有4个线程通过。
tryAcquire()方法:尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
Semaphore 有两种模式,公平模式和非公平模式。

  public Semaphore(int permits) {
    
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
    
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

Semaphore 与 CountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量。

二、ReentrantReadWriteLock

当读写的时候,线程之间读不需要独占,写需要独占以避免结果出现偏差。这时候需要读写分离。看了之前的多线程的文章,我们也知道独占锁和共享锁的意义。这个时候读锁就是共享锁,写锁就是独占锁也叫互斥锁。
ReadWriteLock内部有两个方法readLock和writeLock,ReetrantReadWriteLock实现了ReadWriteLock接口,添加了可重入性并支持公平与非公平模式。ReadWriteLock支持锁降级就是从写锁变成读锁,不支持锁升级就是从读锁变成写锁。
示例代码如下:

 static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    //写锁
    private static void write(String name) {
    
        readWriteLock.writeLock().lock();
        try {
    
            Thread.sleep(1000);
            System.out.println(name + "开始签名啦");
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        } finally {
    
            readWriteLock.writeLock().unlock();
        }
    }

    //读锁
    private static void read(String name) {
    
        readWriteLock.readLock().lock();
        try {
    
            Thread.sleep(1000);
            System.out.println(name + "开始读书啦");
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        } finally {
    
            readWriteLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
    
        for (int i = 0; i < 5; i++) {
    
            int finalI = i;
            new Thread(() -> {
    
                write("小" + finalI);
            }, "小" + i).start();
        }

        for (int i = 0; i < 5; i++) {
    
            int finalI = i;
            new Thread(() -> {
    
                read("小" + finalI);
            }, "小" + i).start();
        }

    }

最终结果是写锁会一个个去写,顺序执行,而读锁在宏观上同时执行。

三、Exchanger

Exchanger的功能是使2个线程之间交换数据,而且是双向的。一个线程调用了exchange( )方法交换数据,到达了同步点,然后就会一直阻塞等待另一个线程调用exchange( )方法来交换数据。比如游戏中两个人交换装备,必须都交换才行。

public V exchange(V x) throws InterruptedException
等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待另一个线程到达此交换点(除非当前线程被中断,或者超出了指定的等待时间),然后将给定的对象传送给该线程,同时接收该线程的对象。

示例代码如下:

  static Exchanger<Object> exchanger = new Exchanger<>();
    static String a = "aaaaa";
    static String b = "bbbbbbbbbb";

    public static void main(String[] args) {
    
        new Thread(()->{
    
            System.out.println(" 亚索1号 ");
            try {
    
                Object exchange = exchanger.exchange(a);
                System.out.println(" 亚索1号 revicer data : " + exchange.toString());
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{
    
            System.out.println(" 妲己2号 ");
            try {
    
                Object exchange = exchanger.exchange(b);
                System.out.println(" 妲己2号 revicer data : " + exchange.toString());
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        }).start();
    }

运行结果如下:

 亚索1号 
 妲己2号 
 妲己2号 revicer data : aaaaa
 亚索1号 revicer data : bbbbbbbbbb

可以看出亚索和妲己在交换数据之前都在等待,交换数据之后开始往下运行。

四、LockSupport

LockSupport主要作用就是阻塞和唤醒线程。
其主要方法如下:

 public static void park() {
    
        UNSAFE.park(false, 0L);
    }
 //blocker是用来记录线程被阻塞时被谁阻塞的。用于线程监控和分析工具来定位原因的。
 public static void park(Object blocker) {
    
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
 public static void unpark(Thread thread) {
    
        if (thread != null)
            UNSAFE.unpark(thread);
    }

可以看出其调用了Unsafe类,就涉及到CPU原语了,这里不作深究。
既然是阻塞和唤醒线程的,我们先看下wait()、notify()方法,代码如下:

 private static Object obj = new Object();

    public static void main(String[] args) {
    
        new Thread(() -> {
    
            synchronized (obj) {
    
                System.out.println("wait 先开始!");
                try {
    
                    obj.wait();
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                }
                System.out.println("wait 后结束!");
            }
        }).start();

        new Thread(() -> {
    
            synchronized (obj) {
    
                System.out.println("notify 后开始!");
                obj.notify();
                System.out.println("notify 先结束!");
            }
        }).start();
    }

运行结果如下:

wait 先开始!
notify 后开始!
notify 先结束!
wait 后结束!

显而易见,这两个方法必须先获得锁对象,也就是他们一定要在同步代码块中使用;而且notify只能随机选择一个线程唤醒,无法唤醒指定的线程。
但是对于上面的代码,多长执行后,如果notify()先执行了怎么办?wait()就会一直阻塞着。
对于LockSupport的park()和unpark()方法呢,则没有此方面的考虑。其用法如下:

public static void main(String[] args) throws InterruptedException {
    
        Thread thread1 = new Thread(() -> {
    
            System.out.println("开始阻塞了哦!");
            LockSupport.park();
            System.out.println("结束阻塞了哦!");
        });
        thread1.start();
        Thread.sleep(1000);
        System.out.println("开始唤醒了哦!");
        LockSupport.unpark(thread1);
        System.out.println("结束唤醒了哦!");
    }

运行结果如下:

开始阻塞了哦!
开始唤醒了哦!
结束唤醒了哦!
结束阻塞了哦!

可以看出先是阻塞,然后唤醒之后结束阻塞。
LockSupport比Object的wait/notify有两大优势:
LockSupport不需要在同步代码块里,实现了线程间的解耦。
unpark()可以先于park调用,所以不需要担心线程间的执行的先后顺序。代码如下:

 public static void main(String[] args) throws InterruptedException {
    
        Thread thread1 = new MyThread();
        thread1.start();
        System.out.println("开始唤醒了哦!");
        LockSupport.unpark(thread1);
        System.out.println("结束唤醒了哦!");

    }
    static class MyThread extends Thread{
    
        @Override
        public void run() {
    
            System.out.println("开始阻塞了哦!");
            try {
    
                Thread.sleep(1000);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            LockSupport.park();
            System.out.println("结束阻塞了哦!");
        }
    }

Thread.sleep(1000);让主线程先跑,先unpark(),最终结果如下,不会被阻塞:

开始唤醒了哦!
开始阻塞了哦!
结束唤醒了哦!
结束阻塞了哦!

原网站

版权声明
本文为[李王家的翠花]所创,转载请带上原文链接,感谢
https://blog.csdn.net/liwangcuihua/article/details/125633416