Semaphore 是JDK1.5的 java.util.concurrent 并发包中提供的一个并发工具类。 
所谓 Semaphore 即 信号量 的意思。 这个叫法并不能很好地表示它的作用,更形象的说法应该是 许可证管理器 。
 Semaphore 是一个计数信号量。 从概念上将,Semaphore 包含一组许可证。 
如果有需要的话,每个 acquire() 方法都会阻塞,直到获取一个可用的许可证。 
每个 release() 方法都会释放持有许可证的线程,并且归还 Semaphore 一个可用的许可证。 
然而,实际上并没有真实的许可证对象供线程使用,Semaphore 只是对可用的数量进行管理维护;

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控.

每秒只允许10个线程执行:

public void testthread()throws Exception{

    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

        public void run() {
            semaphore.release(MAX_QPS / 2);//释放曾经被占用过的坑
        }

    }, 1000, 500, TimeUnit.MILLISECONDS);

    //创建一个有一百个线程的线程池
    ExecutorService pool = Executors.newFixedThreadPool(100);
    //提交一百个线程,去执行任务
    for (int i = 100; i > 0; i--) {
        final int x = i;
        pool.submit(new Runnable() {

            public void run() {
                for (int j = 1000; j > 0; j--) {//不断的获取被空出来的坑
                    semaphore.acquireUninterruptibly(1);//每个线程只获取一个坑
                    remoteCall(x, 0);
                }
            }

        });
    }

    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("DONE");

}

private static void remoteCall(int i, int j) {
    System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), i, j));

}


AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
/**

 @Description : 模拟30的并发实际上限制只能10个线程跑
 */
public class MyTest1{

    /**
     * 线程数量
     */
    private static final int THREAD_COUNT = 30;

    /**
     * 线程池
     */
    private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);


    private static Semaphore semaphore = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        // 获取一个"许可证"
                        semaphore.acquire();

                        // 模拟数据保存
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println("save date..."+Thread.currentThread().getName());

                        // 执行完后,归还"许可证"
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executor.shutdown();
    }



AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

    场景:停车场2个车位,6辆车想停,前2辆停完离开后,后2辆才能进入。
    public class SemaPhoreDemo {
        public static  ExecutorService threadPool = Executors.newCachedThreadPool();
        public static final int cars = 6;
        public static final int part = 2;
        public static void main(String[] args) {
            CountDownLatch latch = new CountDownLatch(cars);
            Semaphore semaphore = new Semaphore(part, false);
            for(int i=0; i<cars; i++) {
                final int j = i;
                threadPool.execute(()->{
                    try {
                        semaphore.acquire();
                        System.out.println("car "+j+" 进入停车位 \n");
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();
                        latch.countDown();
                        System.out.println("********* car "+j+" 离开停车位\n");
                    }
                });
            }
            try {
                latch.await();
                System.out.println("################### 所有车辆离开  ##################");
                threadPool.shutdown();
            } catch (InterruptedException e) {
            }
        }
    }

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

 


0 条评论

发表回复

Avatar placeholder

您的邮箱地址不会被公开。 必填项已用 * 标注

此站点使用 Akismet 来减少垃圾评论。了解我们如何处理您的评论数据

蜀ICP备16001794号
© 2014 - 2024 linpxing.cn All right reserved.