博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java多线程(7)实现一个线程池
阅读量:7112 次
发布时间:2019-06-28

本文共 13332 字,大约阅读时间需要 44 分钟。

用到生产者--消费者模式

 

一、测试类:

package com.concurrent.chapter08; import java.util.concurrent.TimeUnit; /**  * @description:  * @author:  * @create:  **/ public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000); for (int i = 0; i < 200; i++){
threadPool.execute(() ->{
try {
TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is running and done"); } catch (InterruptedException e) {
e.printStackTrace(); } }); } for (;;){
System.out.println("getActiveCount: " + threadPool.getActiveCount()); System.out.println("getQueueSize: " + threadPool.getQueueSize()); System.out.println("getCoreSize: " + threadPool.getCoreSize()); System.out.println("getMaxSize: " + threadPool.getMaxSize()); System.out.println("========================================="); TimeUnit.SECONDS.sleep(1); } } }
package com.concurrent.chapter08; import java.util.concurrent.TimeUnit; /**  * @description:shutdown线程池测试类  * @author:  * @create:  **/ public class ThreadPoolTestShutdown {
public static void main(String[] args) throws InterruptedException {
//定义线程池 //初始化线程数量为2, //核心线程数量为4, //最大线程数量为6, //任务队列最多容纳1000个任务 final ThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000); //定义20个任务并且提交到线程池 for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName() + " is runnning and done."); } catch (InterruptedException e) {
e.printStackTrace(); } }); } TimeUnit.SECONDS.sleep(12); threadPool.shutdown(); //Thread.currentThread().join(); for (;;){
System.out.println("getActiveCount: " + threadPool.getActiveCount()); System.out.println("getQueueSize: " + threadPool.getQueueSize()); System.out.println("getCoreSize: " + threadPool.getCoreSize()); System.out.println("getMaxSize: " + threadPool.getMaxSize()); System.out.println("========================================="); TimeUnit.SECONDS.sleep(1); } } }
二、线程池实现类:
package com.concurrent.chapter08; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /**  * @description: 初始化线程池  * @author:  * @create:  **/ public class BasicThreadPool extends Thread implements ThreadPool {
//初始化线程数量 private final int initSize; //线程池最大线程数 private final int maxSize; //线程池核心线程数 private final int coreSize; //当前活跃线程数 private int activeCount; //线程工厂 private final ThreadFactory threadFactory; //线程池是否已经shutdown private volatile boolean isShutdown = false; //任务队列 private final RunnableQueue runnableQueue; //工作线程队列 private final Queue
threadQueue = new ArrayDeque<>(); private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory(); private final long keepAliveTime; private final TimeUnit timeUnit; //构造时需要传递的参数:初始线程的数量,最大线程数,核心线程数,任务队列的最大数量 public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize){
this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS); } //构造线程池 public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit){
this.initSize = initSize; this.maxSize = maxSize; this.coreSize = coreSize; this.threadFactory = threadFactory; this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy,this); this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; this.init(); } private void init() {
start(); for (int i = 0; i < initSize; i++){
newThread(); } } private void newThread(){
//创建任务线程,并启动 InternalTask internalTask = new InternalTask(runnableQueue); Thread thread = this.threadFactory.createThread(internalTask); ThreadTask threadTask = new ThreadTask(thread, internalTask); threadQueue.offer(threadTask); this.activeCount++; thread.start(); } @Override public void execute(Runnable runnable) {
if (this.isShutdown){
throw new IllegalArgumentException("The thread pool is destory"); } this.runnableQueue.offer(runnable); } private void removeThread(){
ThreadTask threadTask = threadQueue.remove(); threadTask.internalTask.stop(); this.activeCount--; } @Override public void run(){
//run 方法继承自thread,主要用于维护线程数量,比如扩容、回收等工作 while (!isShutdown && !isInterrupted()){
try {
timeUnit.sleep(keepAliveTime); } catch (InterruptedException e) {
isShutdown = true; break; } synchronized (this){
if (isShutdown){
break; } if (runnableQueue.size() > 0 && activeCount < coreSize){
for (int i = initSize; i < coreSize; i++){
newThread(); } continue; } if (runnableQueue.size() > 0 && activeCount < maxSize){
for (int i = coreSize; i < maxSize; i++){
newThread(); } } if (runnableQueue.size() == 0 && activeCount > coreSize){
for (int i = coreSize; i < activeCount; i++){
removeThread(); } } } } } @Override public void shutdown() {
synchronized (this){
if (isShutdown) return; isShutdown = true; threadQueue.forEach(threadTask -> {
threadTask.internalTask.stop(); threadTask.thread.interrupt(); }); this.interrupt(); } } @Override public int getInitSize() {
if (isShutdown){
throw new IllegalArgumentException("The thread pool is destory."); } return this.initSize; } @Override public int getMaxSize() {
if (isShutdown){
throw new IllegalArgumentException("The thread pool is destory."); } return this.maxSize; } @Override public int getCoreSize() {
if (isShutdown){
throw new IllegalArgumentException("The thread pool is destory."); } return this.coreSize; } @Override public int getQueueSize() {
if (isShutdown){
throw new IllegalArgumentException("The thread pool is destory."); } return runnableQueue.size(); } @Override public int getActiveCount(){
synchronized (this){
return this.activeCount; } } @Override public boolean isShutdown() {
return this.isShutdown; } private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger GROUT_COUNTER = new AtomicInteger(1); private static final ThreadGroup group = new ThreadGroup("MyThreadPool-" + GROUT_COUNTER.getAndDecrement()); private static final AtomicInteger COUNTER = new AtomicInteger(0); @Override public Thread createThread(Runnable runnable) {
return new Thread(group, runnable, "thread-pool-" + COUNTER.getAndDecrement()); } } private class ThreadTask {
Thread thread; InternalTask internalTask; public ThreadTask(Thread thread, InternalTask internalTask){
this.thread = thread; this.internalTask = internalTask; } } }
三、线程池接口:
package com.concurrent.chapter08; /**  * @description: 线程池的基本方法  * @author:  * @create:  **/ public interface ThreadPool {
//提交任务到线程池 void execute(Runnable runnable); //关闭线程池 void shutdown(); //获取线程池的初始大小 int getInitSize(); //获取线程池的最大线程数 int getMaxSize(); //获取线程池的核心线程数 int getCoreSize(); //获取线程池中用于缓存任务队列大小 int getQueueSize(); //获取线程池中活跃线程数 int getActiveCount(); //查看线程池是否已经被shutdown boolean isShutdown(); }
四、任务队列接口及实现类:
package com.concurrent.chapter08; /**  * @description: 任务队列  * @author:  * @create:  **/ public interface RunnableQueue {
//当有新的任务进来时offer到队列中 void offer(Runnable runnable); //工作线程通过take方法获取Runnable Runnable take() throws InterruptedException; //获取任务队列中任务的数量 int size(); }
package com.concurrent.chapter08; import java.util.LinkedList; /**  * @description: 任务队列实现类  * @author:  * @create:  **/ public class LinkedRunnableQueue implements RunnableQueue {
//任务队列的最大容量,在构造时传入 private final int limit; //任务队列满时的拒绝策略 private final DenyPolicy denyPolicy; //存放任务的队列 private final LinkedList
runnableList = new LinkedList<>(); private final ThreadPool threadPool; public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool){
this.limit = limit; this.denyPolicy = denyPolicy; this.threadPool = threadPool; } @Override public void offer(Runnable runnable) {
synchronized(runnableList){
if (runnableList.size() >= limit){
denyPolicy.reject(runnable, threadPool); }else {
runnableList.addLast(runnable); runnableList.notifyAll(); } } } @Override public Runnable take() throws InterruptedException {
synchronized(runnableList){
while (runnableList.isEmpty()){
try {
runnableList.wait(); } catch (InterruptedException e) {
throw e; } } } return runnableList.removeFirst(); } @Override public int size() {
synchronized (runnableList){
return runnableList.size(); } } }
五、任务队列满时的拒绝策略接口类:
package com.concurrent.chapter08; /**  * @description: 任务队列满时的拒绝策略  * @author:  * @create:  **/ @FunctionalInterface public interface DenyPolicy {
void reject(Runnable runnable, ThreadPool threadPool); //丢弃式拒绝 class DiscardDenyPolicy implements DenyPolicy {
@Override public void reject(Runnable runnable, ThreadPool threadPool) {
//do nothing } } //抛出异常式拒绝 class AbortDenyPolicy implements DenyPolicy {
@Override public void reject(Runnable runnable, ThreadPool threadPool) {
throw new RunnableDenyException("The runnable " + runnable + "will be abort."); } } //提交者线程中执行任务 class RunnerDenyPolicy implements DenyPolicy {
@Override public void reject(Runnable runnable, ThreadPool threadPool) {
if (!threadPool.isShutdown()){
runnable.run(); } } } }
六、线程池内部的创建线程工厂类:
package com.concurrent.chapter08; /**  * @description: 创建线程的接口  * @author:  * @create:  **/ @FunctionalInterface public interface ThreadFactory {
Thread createThread(Runnable runnable); }
七、线程池内线程执行类:
package com.concurrent.chapter08; /**  * @description: 从任务队列中取出某个runnable并执行run方法  * @author:  * @create:  **/ public class InternalTask implements Runnable{
private final RunnableQueue runnableQueue; private volatile boolean running = true; public InternalTask(RunnableQueue runnableQueue){ this.runnableQueue = runnableQueue;} @Override public void run() {
//如果当前任务为running并且没有被中断,则其将不断从queue中获取runnable,然后执行run方法 while (running && !Thread.currentThread().isInterrupted()){
try {
Runnable task = runnableQueue.take(); task.run(); } catch (InterruptedException e) {
running = false; break; } } } //停止当前任务, 主要会在线程池的shutdown方法中使用 public void stop(){
this.running = false; } }
八、线程池的一个异常类:拒绝策略用到
package com.concurrent.chapter08; /**  * @description:通知任务提交者,任务队列无法接收新任务  * @author:  * @create:  **/ public class RunnableDenyException extends RuntimeException {
public RunnableDenyException(String s) {
super(s); } }
 

转载于:https://www.cnblogs.com/herosoft/p/10754815.html

你可能感兴趣的文章
DB2 IBM InfoSphere Data Replication Center - SQL Replication[翻译]
查看>>
CentOS时间同步
查看>>
PHP防XSS 防SQL注入的代码
查看>>
MongoDB权威指南——管理
查看>>
ActiveMQ持久化方式
查看>>
手动(批量)添加nginx虚拟主机和rewrite规则
查看>>
安装JBOSS EAP 6 Standalone模式
查看>>
手动修改IP和MAC地址
查看>>
(20)Powershell中的特殊运算符
查看>>
IIS 7.0 六大新特性
查看>>
32. mac上传下载文件到远程服务器scp
查看>>
阿里云数据库2-3月刊:阿里云峰会云数据库四大发布
查看>>
像Google一样构建机器学习系统 - 利用MPIJob运行ResNet101
查看>>
Django book 笔记---Form表单
查看>>
为什么我不同意建房子
查看>>
使用webpack实现jquery按需加载
查看>>
phalcon queueing使用心得
查看>>
The difference between hard and soft links
查看>>
Python学习日记---字符串
查看>>
脚本入门之算术运算
查看>>