用到生产者--消费者模式
一、测试类:
package com.concurrent.chapter08; import java.util.concurrent.TimeUnit; /** * @description: * @author: * @create: **/ public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000); for (int i = 0; i < 200; i++){ threadPool.execute(() ->{ try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is running and done"); } catch (InterruptedException e) { e.printStackTrace(); } }); } for (;;){ System.out.println("getActiveCount: " + threadPool.getActiveCount()); System.out.println("getQueueSize: " + threadPool.getQueueSize()); System.out.println("getCoreSize: " + threadPool.getCoreSize()); System.out.println("getMaxSize: " + threadPool.getMaxSize()); System.out.println("========================================="); TimeUnit.SECONDS.sleep(1); } } }
package com.concurrent.chapter08; import java.util.concurrent.TimeUnit; /** * @description:shutdown线程池测试类 * @author: * @create: **/ public class ThreadPoolTestShutdown { public static void main(String[] args) throws InterruptedException { //定义线程池 //初始化线程数量为2, //核心线程数量为4, //最大线程数量为6, //任务队列最多容纳1000个任务 final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000); //定义20个任务并且提交到线程池 for (int i = 0; i < 20; i++) { threadPool.execute(() -> { try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is runnning and done."); } catch (InterruptedException e) { e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(12); threadPool.shutdown(); //Thread.currentThread().join(); for (;;){ System.out.println("getActiveCount: " + threadPool.getActiveCount()); System.out.println("getQueueSize: " + threadPool.getQueueSize()); System.out.println("getCoreSize: " + threadPool.getCoreSize()); System.out.println("getMaxSize: " + threadPool.getMaxSize()); System.out.println("========================================="); TimeUnit.SECONDS.sleep(1); } } }
二、线程池实现类:
package com.concurrent.chapter08; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 初始化线程池 * @author: * @create: **/ public class BasicThreadPool extends Thread implements ThreadPool { //初始化线程数量 private final int initSize; //线程池最大线程数 private final int maxSize; //线程池核心线程数 private final int coreSize; //当前活跃线程数 private int activeCount; //线程工厂 private final ThreadFactory threadFactory; //线程池是否已经shutdown private volatile boolean isShutdown = false; //任务队列 private final RunnableQueue runnableQueue; //工作线程队列 private final QueuethreadQueue = new ArrayDeque<>(); private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(); private final long keepAliveTime; private final TimeUnit timeUnit; //构造时需要传递的参数:初始线程的数量,最大线程数,核心线程数,任务队列的最大数量 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize){ this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS); } //构造线程池 public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit){ this.initSize = initSize; this.maxSize = maxSize; this.coreSize = coreSize; this.threadFactory = threadFactory; this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy,this); this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; this.init(); } private void init() { start(); for (int i = 0; i < initSize; i++){ newThread(); } } private void newThread(){ //创建任务线程,并启动 InternalTask internalTask = new InternalTask(runnableQueue); Thread thread = this.threadFactory.createThread(internalTask); ThreadTask threadTask = new ThreadTask(thread, internalTask); threadQueue.offer(threadTask); this.activeCount++; thread.start(); } @Override public void execute(Runnable runnable) { if (this.isShutdown){ throw new IllegalArgumentException("The thread pool is destory"); } this.runnableQueue.offer(runnable); } private void removeThread(){ ThreadTask threadTask = threadQueue.remove(); threadTask.internalTask.stop(); this.activeCount--; } @Override public void run(){ //run 方法继承自thread,主要用于维护线程数量,比如扩容、回收等工作 while (!isShutdown && !isInterrupted()){ try { timeUnit.sleep(keepAliveTime); } catch (InterruptedException e) { isShutdown = true; break; } synchronized (this){ if (isShutdown){ break; } if (runnableQueue.size() > 0 && activeCount < coreSize){ for (int i = initSize; i < coreSize; i++){ newThread(); } continue; } if (runnableQueue.size() > 0 && activeCount < maxSize){ for (int i = coreSize; i < maxSize; i++){ newThread(); } } if (runnableQueue.size() == 0 && activeCount > coreSize){ for (int i = coreSize; i < activeCount; i++){ removeThread(); } } } } } @Override public void shutdown() { synchronized (this){ if (isShutdown) return; isShutdown = true; threadQueue.forEach(threadTask -> { threadTask.internalTask.stop(); threadTask.thread.interrupt(); }); this.interrupt(); } } @Override public int getInitSize() { if (isShutdown){ throw new IllegalArgumentException("The thread pool is destory."); } return this.initSize; } @Override public int getMaxSize() { if (isShutdown){ throw new IllegalArgumentException("The thread pool is destory."); } return this.maxSize; } @Override public int getCoreSize() { if (isShutdown){ throw new IllegalArgumentException("The thread pool is destory."); } return this.coreSize; } @Override public int getQueueSize() { if (isShutdown){ throw new IllegalArgumentException("The thread pool is destory."); } return runnableQueue.size(); } @Override public int getActiveCount(){ synchronized (this){ return this.activeCount; } } @Override public boolean isShutdown() { return this.isShutdown; } private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger GROUT_COUNTER = new AtomicInteger(1); private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUT_COUNTER.getAndDecrement()); private static final AtomicInteger COUNTER = new AtomicInteger(0); @Override public Thread createThread(Runnable runnable) { return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement()); } } private class ThreadTask { Thread thread; InternalTask internalTask; public ThreadTask(Thread thread, InternalTask internalTask){ this.thread = thread; this.internalTask = internalTask; } } }
三、线程池接口:
package com.concurrent.chapter08; /** * @description: 线程池的基本方法 * @author: * @create: **/ public interface ThreadPool { //提交任务到线程池 void execute(Runnable runnable); //关闭线程池 void shutdown(); //获取线程池的初始大小 int getInitSize(); //获取线程池的最大线程数 int getMaxSize(); //获取线程池的核心线程数 int getCoreSize(); //获取线程池中用于缓存任务队列大小 int getQueueSize(); //获取线程池中活跃线程数 int getActiveCount(); //查看线程池是否已经被shutdown boolean isShutdown(); }
四、任务队列接口及实现类:
package com.concurrent.chapter08; /** * @description: 任务队列 * @author: * @create: **/ public interface RunnableQueue { //当有新的任务进来时offer到队列中 void offer(Runnable runnable); //工作线程通过take方法获取Runnable Runnable take() throws InterruptedException; //获取任务队列中任务的数量 int size(); }
package com.concurrent.chapter08; import java.util.LinkedList; /** * @description: 任务队列实现类 * @author: * @create: **/ public class LinkedRunnableQueue implements RunnableQueue { //任务队列的最大容量,在构造时传入 private final int limit; //任务队列满时的拒绝策略 private final DenyPolicy denyPolicy; //存放任务的队列 private final LinkedListrunnableList = new LinkedList<>(); private final ThreadPool threadPool; public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool){ this.limit = limit; this.denyPolicy = denyPolicy; this.threadPool = threadPool; } @Override public void offer(Runnable runnable) { synchronized(runnableList){ if (runnableList.size() >= limit){ denyPolicy.reject(runnable, threadPool); }else { runnableList.addLast(runnable); runnableList.notifyAll(); } } } @Override public Runnable take() throws InterruptedException { synchronized(runnableList){ while (runnableList.isEmpty()){ try { runnableList.wait(); } catch (InterruptedException e) { throw e; } } } return runnableList.removeFirst(); } @Override public int size() { synchronized (runnableList){ return runnableList.size(); } } }
五、任务队列满时的拒绝策略接口类:
package com.concurrent.chapter08; /** * @description: 任务队列满时的拒绝策略 * @author: * @create: **/ @FunctionalInterface public interface DenyPolicy { void reject(Runnable runnable, ThreadPool threadPool); //丢弃式拒绝 class DiscardDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable, ThreadPool threadPool) { //do nothing } } //抛出异常式拒绝 class AbortDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable, ThreadPool threadPool) { throw new RunnableDenyException("The runnable " + runnable + "will be abort."); } } //提交者线程中执行任务 class RunnerDenyPolicy implements DenyPolicy { @Override public void reject(Runnable runnable, ThreadPool threadPool) { if (!threadPool.isShutdown()){ runnable.run(); } } } }
六、线程池内部的创建线程工厂类:
package com.concurrent.chapter08; /** * @description: 创建线程的接口 * @author: * @create: **/ @FunctionalInterface public interface ThreadFactory { Thread createThread(Runnable runnable); }
七、线程池内线程执行类:
package com.concurrent.chapter08; /** * @description: 从任务队列中取出某个runnable并执行run方法 * @author: * @create: **/ public class InternalTask implements Runnable{ private final RunnableQueue runnableQueue; private volatile boolean running = true; public InternalTask(RunnableQueue runnableQueue){ this.runnableQueue = runnableQueue;} @Override public void run() { //如果当前任务为running并且没有被中断,则其将不断从queue中获取runnable,然后执行run方法 while (running && !Thread.currentThread().isInterrupted()){ try { Runnable task = runnableQueue.take(); task.run(); } catch (InterruptedException e) { running = false; break; } } } //停止当前任务, 主要会在线程池的shutdown方法中使用 public void stop(){ this.running = false; } }
八、线程池的一个异常类:拒绝策略用到
package com.concurrent.chapter08; /** * @description:通知任务提交者,任务队列无法接收新任务 * @author: * @create: **/ public class RunnableDenyException extends RuntimeException { public RunnableDenyException(String s) { super(s); } }