天下脸皮共十分
我占八分

手写线程池

创建任务阻塞队列
 package tech.ityoung.study.demo.juc.threadpool;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 ​
 @Slf4j
 public class BlockingQueue<T> {
     private int capacity;
 ​
     private Deque<T> queue = new ArrayDeque<>();
 ​
     private ReentrantLock lock = new ReentrantLock();
 ​
     private Condition fullQueueCondition = lock.newCondition();
 ​
     private Condition emptyQueueCondition = lock.newCondition();
 ​
     public BlockingQueue(int capacity) {
         this.capacity = capacity;
    }
 ​
     private int size() {
         lock.lock();
         try {
             return queue.size();
        } finally {
             lock.unlock();
        }
    }
 ​
     public T poolTask(long timeout, TimeUnit timeUnit) {
         lock.lock();
         try {
             long nanos = timeUnit.toNanos(timeout);
             while (this.size() <= 0) {
                 try {
                     if (nanos <= 0) {
                         log.info("get task timeout");
                         return null;
                    }
                     log.info("queue is empty");
                     nanos = emptyQueueCondition.awaitNanos(nanos);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
             T t = queue.removeFirst();
             fullQueueCondition.signal();
             return t;
        } finally {
             lock.unlock();
        }
    }
 ​
     public void addTask(T task) {
         lock.lock();
         try {
             while (this.size() == capacity) {
                 try {
                     log.info("queue is full");
                     fullQueueCondition.await();
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            }
             queue.addLast(task);
             emptyQueueCondition.signal();
        } finally {
             lock.unlock();
        }
    }
 }
 ​
创建线程池
 package tech.ityoung.study.demo.juc.threadpool;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 ​
 @Slf4j
 public class MyThreadPool {
     private int coreSize;
 ​
     private Set<Worker> workers = new HashSet<>();
 ​
     private BlockingQueue<Runnable> taskQueue = new BlockingQueue<>(5);
 ​
     public MyThreadPool() {
    }
 ​
     public MyThreadPool(int coreSize) {
         this.coreSize = coreSize;
    }
 ​
     public void executeTask(Runnable task) {
         if (workers.size() < coreSize) {
             Worker worker = new Worker(task, taskQueue, workers);
             log.info("worker created: {}", worker);
             workers.add(worker);
             worker.start();
        } else {
             taskQueue.addTask(task);
        }
    }
 }
创建线程执行对象
 package tech.ityoung.study.demo.juc.threadpool;
 ​
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 ​
 public class Worker extends Thread {
     private Runnable task;
 ​
     private BlockingQueue<Runnable> taskQueue;
 ​
     Set<Worker> workers;
 ​
     public Worker(Runnable task, BlockingQueue<Runnable> queue, Set<Worker> workers) {
         this.task = task;
         this.taskQueue = queue;
         this.workers = workers;
    }
 ​
     @Override
     public void run() {
         while (task != null || (task = taskQueue.poolTask(5000, TimeUnit.MILLISECONDS)) != null) {
             try {
                 task.run();
            } catch (Exception e) {
                 e.printStackTrace();
            } finally {
                 task = null;
            }
        }
         workers.remove(this);
    }
 }
创建任务并执行
 package tech.ityoung.study.demo.juc.threadpool;
 ​
 import lombok.extern.slf4j.Slf4j;
 ​
 @Slf4j
 public class ThreadPoolDemo {
     public static void main(String[] args) {
         MyThreadPool myThreadPool = new MyThreadPool(2);
         for (int i = 0; i < 20; i++) {
             int j = i;
             myThreadPool.executeTask(() -> {
                 log.info(j + "");
                 try {
                     Thread.sleep(500);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
            });
        }
    }
 }
最终执行结果
 00:24:34.739 [main] INFO tech.ityoung.study.demo.juc.threadpool.MyThreadPool - worker created: Thread[Thread-0,5,main]
 00:24:34.743 [main] INFO tech.ityoung.study.demo.juc.threadpool.MyThreadPool - worker created: Thread[Thread-1,5,main]
 00:24:34.743 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:34.769 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 1
 00:24:34.769 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 0
 00:24:35.280 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 2
 00:24:35.280 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:35.280 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 3
 00:24:35.781 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 5
 00:24:35.781 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 4
 00:24:35.781 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:36.294 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 6
 00:24:36.294 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 7
 00:24:36.294 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:36.809 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 9
 00:24:36.809 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:36.809 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 8
 00:24:37.310 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 10
 00:24:37.310 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:37.310 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 11
 00:24:37.813 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:37.813 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 12
 00:24:37.813 [main] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is full
 00:24:37.813 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 13
 00:24:38.316 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 14
 00:24:38.316 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 15
 00:24:38.822 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 17
 00:24:38.822 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 16
 00:24:39.328 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 18
 00:24:39.328 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.ThreadPoolDemo - 19
 00:24:39.832 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is empty
 00:24:39.832 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - queue is empty
 00:24:44.839 [Thread-0] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - get task timeout
 00:24:44.839 [Thread-1] INFO tech.ityoung.study.demo.juc.threadpool.BlockingQueue - get task timeout
 Disconnected from the target VM, address: '127.0.0.1:6398', transport: 'socket'
赞(3) 打赏
未经允许不得转载:Stephen Young » 手写线程池
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏