Semaphore 是JDK1.5的 java.util.concurrent 并发包中提供的一个并发工具类。 所谓 Semaphore 即 信号量 的意思。 这个叫法并不能很好地表示它的作用,更形象的说法应该是 许可证管理器 。 Semaphore 是一个计数信号量。 从概念上将,Semaphore 包含一组许可证。 如果有需要的话,每个 acquire() 方法都会阻塞,直到获取一个可用的许可证。 每个 release() 方法都会释放持有许可证的线程,并且归还 Semaphore 一个可用的许可证。 然而,实际上并没有真实的许可证对象供线程使用,Semaphore 只是对可用的数量进行管理维护;
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源
Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控.
每秒只允许10个线程执行: public void testthread()throws Exception{ Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() { public void run() { semaphore.release(MAX_QPS / 2);//释放曾经被占用过的坑 } }, 1000, 500, TimeUnit.MILLISECONDS); //创建一个有一百个线程的线程池 ExecutorService pool = Executors.newFixedThreadPool(100); //提交一百个线程,去执行任务 for (int i = 100; i > 0; i--) { final int x = i; pool.submit(new Runnable() { public void run() { for (int j = 1000; j > 0; j--) {//不断的获取被空出来的坑 semaphore.acquireUninterruptibly(1);//每个线程只获取一个坑 remoteCall(x, 0); } } }); } pool.shutdown(); pool.awaitTermination(5, TimeUnit.SECONDS); System.out.println("DONE"); } private static void remoteCall(int i, int j) { System.out.println(String.format("%s - %s: %d %d", new Date(), Thread.currentThread(), i, j)); } AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA /** @Description : 模拟30的并发实际上限制只能10个线程跑 */ public class MyTest1{ /** * 线程数量 */ private static final int THREAD_COUNT = 30; /** * 线程池 */ private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { executor.execute(new Runnable() { public void run() { try { // 获取一个"许可证" semaphore.acquire(); // 模拟数据保存 TimeUnit.SECONDS.sleep(2); System.out.println("save date..."+Thread.currentThread().getName()); // 执行完后,归还"许可证" semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } executor.shutdown(); } AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA 场景:停车场2个车位,6辆车想停,前2辆停完离开后,后2辆才能进入。 public class SemaPhoreDemo { public static ExecutorService threadPool = Executors.newCachedThreadPool(); public static final int cars = 6; public static final int part = 2; public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(cars); Semaphore semaphore = new Semaphore(part, false); for(int i=0; i<cars; i++) { final int j = i; threadPool.execute(()->{ try { semaphore.acquire(); System.out.println("car "+j+" 进入停车位 \n"); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); latch.countDown(); System.out.println("********* car "+j+" 离开停车位\n"); } }); } try { latch.await(); System.out.println("################### 所有车辆离开 ##################"); threadPool.shutdown(); } catch (InterruptedException e) { } } }
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
0 条评论