Java多线程-自定义线程池


为什么要设置线程池?

在高并发的情况下,当一下子有多个任务到达时,如果对每个任务都创建一个线程,比较占用堆内存,甚至可能会出现OOM,而且会增加CPU的负荷,因为要并行处理这么多线程,上下文切换的开销大,所以我们使用线程池,使用一定数量的线程来处理多个任务,充分利用已有线程,多的任务就在阻塞队列里等待处理。


自定义线程池的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package juc.thread;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* @Author qq
* @Date 2022/3/20
*/
public class MyThreadPool{
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10,(queue,task) ->{
//拒绝策略,当线程池中的阻塞队列满了,接下来的任务该如何抉择
//1.死等
//queue.put(task);
//2.超时不等
//queue.offer(task,500,TimeUnit.MILLISECONDS);
//3.放弃执行
//什么也不写
//4.抛出异常
//throw new RuntimeException("任务执行失败");//后面的任务都不会执行
//自己执行任务
task.run();

});
for (int i = 0; i < 5; i++) {

int finalI = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName()+"正在执行任务..."+ finalI);
});
}
}
}
interface rejecctPolicy<T>{
void reject(BlockingQueue<T> rejectpolicy,T task);
}
class ThreadPool{
//任务等待队列,Runnable是对任务的抽象,对业务逻辑的抽象
private BlockingQueue<Runnable> taskQueue;

//集合来存放线程集合,这里面是线程池中真正创建的线程,并且正在处理任务
private HashSet<Worker> threads = new HashSet<>();

//你想要线程池中一共有这么多个线程来处理任务
private int threadNum;

//获取任务的超时时间,超时就让线程结束啦
private long timeout;
private TimeUnit timeUnit;

private rejecctPolicy<Runnable> rejectpolicy;

public ThreadPool(int threadNum,long timeout,TimeUnit timeUnit,int QueueCapacity,rejecctPolicy<Runnable> rejectpolicy){
this.threadNum = threadNum;
this.timeout = timeout;
this.timeUnit = timeUnit;
taskQueue = new BlockingQueue<>(QueueCapacity);
this.rejectpolicy = rejectpolicy;
}


//如果有线程的话,线程池让线程执行任务
//如果没有线程,就创建线程,执行任务
public void execute(Runnable task){
synchronized (threads){
//当正在处理任务的线程数小于期望创建的线程数时,就创建线程,来执行,否则任务就加入等待队列
if(threads.size() < threadNum){
Worker thread = new Worker(task);
System.out.println("新增线程"+thread);
threads.add(thread);
thread.start();
}else{

//如果阻塞队列已经放不下了,新来的任务需要

// 1.死等
// taskQueue.put(task);
//2.超时则不等待
//taskQueue.offer(task,timeout,timeUnit);
//3.让调用者放弃任务执行
//4.让调用者抛出异常
//5.让调用者自己执行任务

taskQueue.tryPut(rejectpolicy,task);
}
}
}

//线程,用Worker类再包装一下
class Worker extends Thread{
private Runnable task;

public Worker(Runnable task){
this.task = task;
}
@Override
public void run(){
//执行task,当任务执行完毕,还需要去队列中获取任务继续执行
while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
try{
System.out.println(Thread.currentThread().getName()+"准备执行任务"+task);
task.run();
}catch (Exception e){

}finally {
task = null;
}
}
synchronized (threads){
System.out.println("线程被移除"+this);
threads.remove(this);
}
}
}
}
class BlockingQueue<T>{
//创建任务队列
private Deque<T> queue = new ArrayDeque<>();

//每次只能有一个线程成功取得队头任务,所以取任务的时候需要加锁
// 没有获得锁的线程只能等待获取下一个任务
private ReentrantLock lock = new ReentrantLock();

//生产者条件遍历(休息室)
private Condition fullWaitSet = lock.newCondition();

//消费者条件变量
private Condition emptyWaitSet = lock.newCondition();

//队列中能容纳的容量
private int capacity;

//外界定义队列的大小
public BlockingQueue(int capacity){
this.capacity = capacity;
}

//线程们阻塞获取任务
public T take(){
lock.lock();
try{
while(queue.isEmpty()){
try {
//队列空就等待,无法获取任务
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
//唤醒生产任务的线程,使之继续生产
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
//带超时的等待
public T poll(long timeout, TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while(queue.isEmpty()){
//将超时时间转换为纳秒
try {
//返回值是剩余等待时间,超时就不等了,不用永久等待了
if(nanos <= 0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
//唤醒生产任务的线程,使之继续生产
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}

//带超时时间的阻塞添加,超时后任务就不等了
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while(queue.size() == capacity){
try {
if(nanos <= 0){
return false;
}
//队列满就等待,无法继续生产任务
System.out.println("等待加入队列....");
nanos = fullWaitSet.awaitNanos(nanos);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("任务对象加入阻塞队列"+task);
queue.addLast(task);
//添加完就应该唤醒获得任务的进程
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}

//阻塞添加任务
public void put(T task){
lock.lock();
try{
while(queue.size() == capacity){
try {
//队列满就等待,无法继续生产任务
System.out.println("等待加入队列....");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("任务对象加入阻塞队列"+task);
queue.addLast(task);
//添加完就应该唤醒获得任务的进程
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
//外界获取队列元素个数
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}

public void tryPut(rejecctPolicy<T> rejectpolicy, T task) {
lock.lock();
try {
if(queue.size() == capacity){
rejectpolicy.reject(this,task);
}else{
System.out.println("任务对象加入阻塞队列"+task);
queue.addLast(task);
//添加完就应该唤醒获得任务的进程
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}


Java多线程-自定义线程池
https://vickkkyz.fun/2022/03/24/Java/JUC/自定义线程池/
作者
Vickkkyz
发布于
2022年3月24日
许可协议