创建任务阻塞队列
package tech.ityoung.study.demo.juc.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * A bounded FIFO queue whose producers block while it is full and whose
 * consumers wait (with a timeout) while it is empty.
 *
 * <p>All access to the underlying deque happens while holding a single
 * {@link ReentrantLock}; two conditions created from that lock are used to
 * park producers ({@code fullQueueCondition}) and consumers
 * ({@code emptyQueueCondition}).
 *
 * @param <T> element type stored in the queue
 */
@Slf4j
public class BlockingQueue<T> {
    /** Maximum number of elements held at once; always positive. */
    private final int capacity;
    private final Deque<T> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    /** Producers wait here while the queue is full. */
    private final Condition fullQueueCondition = lock.newCondition();
    /** Consumers wait here while the queue is empty. */
    private final Condition emptyQueueCondition = lock.newCondition();

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of queued elements
     * @throws IllegalArgumentException if {@code capacity} is not positive;
     *         a capacity of 0 would block every producer forever and a
     *         negative one would silently disable the bound
     */
    public BlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given
     * timeout for an element to become available.
     *
     * <p>An interrupt received while waiting does not abort the wait: the
     * method keeps waiting for the remainder of the original timeout and
     * re-asserts the thread's interrupt status before returning, so the
     * caller can still observe it.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapsed while
     *         the queue was still empty
     */
    public T poolTask(long timeout, TimeUnit timeUnit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            // Fixed deadline so that an interrupted wait cannot extend the timeout.
            final long deadline = System.nanoTime() + nanos;
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    log.info("get task timeout");
                    return null;
                }
                log.info("queue is empty");
                try {
                    nanos = emptyQueueCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // awaitNanos threw before reporting the remaining time:
                    // remember the interrupt and recompute from the deadline.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            T head = queue.removeFirst();
            // A slot was freed: wake one blocked producer.
            fullQueueCondition.signal();
            return head;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends an element to the tail of the queue, blocking while the queue
     * is full.
     *
     * <p>The wait is uninterruptible: if the calling thread is interrupted
     * while blocked it keeps waiting for space, and its interrupt status is
     * still set when this method returns.
     *
     * @param task element to enqueue
     */
    public void addTask(T task) {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                log.info("queue is full");
                fullQueueCondition.awaitUninterruptibly();
            }
            queue.addLast(task);
            // An element is available: wake one waiting consumer.
            emptyQueueCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}
创建线程池
package tech.ityoung.study.demo.juc.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * A minimal thread pool: up to {@code coreSize} {@link Worker} threads are
 * started on demand, and further tasks are handed to a bounded
 * {@link BlockingQueue} (capacity 5) from which the workers pull.
 *
 * <p>The worker set is shared with the worker threads, which remove
 * themselves from it when they exit, so it is a synchronized set and the
 * "is there room for another worker?" check is performed under its monitor.
 *
 * <p>NOTE(review): a task that is queued at the moment the last worker times
 * out and exits stays in the queue until a later {@link #executeTask} call
 * starts a new worker; nothing here re-checks the queue on worker exit.
 */
@Slf4j
public class MyThreadPool {
    /** Worker count used by the no-arg constructor. */
    private static final int DEFAULT_CORE_SIZE = 1;

    private final int coreSize;
    private final Set<Worker> workers = Collections.synchronizedSet(new HashSet<>());
    private final BlockingQueue<Runnable> taskQueue = new BlockingQueue<>(5);

    /**
     * Creates a pool with a single worker. (Previously this left the core
     * size at 0, so every task was queued and none was ever executed.)
     */
    public MyThreadPool() {
        this(DEFAULT_CORE_SIZE);
    }

    /**
     * Creates a pool that runs at most {@code coreSize} worker threads.
     *
     * @param coreSize maximum number of concurrently live workers
     * @throws IllegalArgumentException if {@code coreSize} is not positive,
     *         since such a pool could never execute a task
     */
    public MyThreadPool(int coreSize) {
        if (coreSize <= 0) {
            throw new IllegalArgumentException("coreSize must be positive: " + coreSize);
        }
        this.coreSize = coreSize;
    }

    /**
     * Submits a task. If fewer than {@code coreSize} workers are registered,
     * a new worker is created with this task as its first job; otherwise the
     * task is enqueued, blocking the caller while the queue is full.
     *
     * @param task the work to run
     */
    public void executeTask(Runnable task) {
        Worker newWorker = null;
        // Check-then-register must be atomic with respect to other submitters
        // and to workers removing themselves from the set.
        synchronized (workers) {
            if (workers.size() < coreSize) {
                newWorker = new Worker(task, taskQueue, workers);
                log.info("worker created: {}", newWorker);
                workers.add(newWorker);
            }
        }
        // Start / enqueue outside the monitor so a full queue never blocks
        // while the worker set is locked.
        if (newWorker != null) {
            newWorker.start();
        } else {
            taskQueue.addTask(task);
        }
    }
}
创建线程执行对象
package tech.ityoung.study.demo.juc.threadpool;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * A pool thread that runs its initial task and then keeps pulling further
 * tasks from the shared queue until a poll times out, at which point it
 * removes itself from the shared worker set and terminates.
 */
public class Worker extends Thread {
    /** How long an idle worker waits for a new task before exiting. */
    private static final long IDLE_TIMEOUT_MILLIS = 5000;

    /** Task currently assigned to this worker; {@code null} when idle. */
    private Runnable task;
    private final BlockingQueue<Runnable> taskQueue;
    /** Set of live workers shared with the pool that created this worker. */
    Set<Worker> workers;

    /**
     * @param task    first task to run (may be {@code null}, in which case the
     *                worker starts by polling the queue)
     * @param queue   queue to pull subsequent tasks from
     * @param workers shared set this worker removes itself from on exit
     */
    public Worker(Runnable task, BlockingQueue<Runnable> queue, Set<Worker> workers) {
        this.task = task;
        this.taskQueue = queue;
        this.workers = workers;
    }

    /**
     * Runs tasks until the queue poll returns {@code null}. Exceptions thrown
     * by a task are printed and do not stop the worker.
     */
    @Override
    public void run() {
        try {
            while (task != null
                    || (task = taskQueue.poolTask(IDLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // A failing task must not take the worker down with it.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
        } finally {
            // Always deregister, even if a task threw an Error or the poll
            // itself failed; otherwise the pool would keep counting a dead
            // thread as a live worker. The set is shared with other threads,
            // so mutate it under its monitor.
            synchronized (workers) {
                workers.remove(this);
            }
        }
    }
}
创建任务并执行
package tech.ityoung.study.demo.juc.threadpool;
import lombok.extern.slf4j.Slf4j;
/**
 * Demo entry point: submits 20 short tasks to a {@link MyThreadPool} with two
 * workers. Each task logs its index and then sleeps for 500 ms.
 */
@Slf4j
public class ThreadPoolDemo {
    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(2);
        for (int i = 0; i < 20; i++) {
            // Lambdas can only capture effectively final locals.
            int j = i;
            myThreadPool.executeTask(() -> {
                log.info("{}", j);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread
                    // instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                    log.warn("task {} interrupted while sleeping", j);
                }
            });
        }
    }
}
最终执行结果
00:24:34.739 [main] INFO tech.ityoung.study.demo.juc.threadpool.MyThreadPool - worker created: Thread[Thread-0,5,main]
00:24:34.743 [main] INFO tech.ityoung.study.demo.juc.threadpool.MyThreadPool - worker created: Thread[Thread-1,5,main]
00:24:34.743 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:34.769 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 1
00:24:34.769 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 0
00:24:35.280 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 2
00:24:35.280 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:35.280 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 3
00:24:35.781 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 5
00:24:35.781 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 4
00:24:35.781 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:36.294 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 6
00:24:36.294 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 7
00:24:36.294 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:36.809 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 9
00:24:36.809 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:36.809 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 8
00:24:37.310 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 10
00:24:37.310 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:37.310 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 11
00:24:37.813 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:37.813 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 12
00:24:37.813 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
00:24:37.813 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 13
00:24:38.316 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 14
00:24:38.316 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 15
00:24:38.822 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 17
00:24:38.822 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 16
00:24:39.328 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 18
00:24:39.328 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 19
00:24:39.832 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is empty
00:24:39.832 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is empty
00:24:44.839 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - get task timeout
00:24:44.839 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - get task timeout
Disconnected from the target VM, address: '127.0.0.1:6398', transport: 'socket'
评论前必须登录!
注册