博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池和队列学习,队列在线程池中的使用,什么是队列阻塞,什么是有界队列
阅读量:2488 次
发布时间:2019-05-11

本文共 8881 字,大约阅读时间需要 29 分钟。

一,线程池

首先了解线程池整个框架

java线程池框架

1,这里记住最核心的类是ThreadPoolExecutor 

2,在ExecuorService中提供了newSingleThreadExecutor,newFixedThreadPool,newCacheThreadPool,newScheduledThreadPool四个方法,这四个方法返回的类型是ThreadPoolExecutor。

3,这里Executor是接口,ExecutorService也是接口并继承了Executor.Executors是Executor的工具类,通过Executors.newSingleThreadExecutor可以调用上面四个方法。大家可以通过查看源码了解上图组成的各个关系。

4,线程池的核心参数

 corePoolSize : 池中核心的线程数

  maximumPoolSize : 池中允许的最大线程数。

  keepAliveTime : 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

  unit : keepAliveTime 参数的时间单位。

  workQueue : 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。

  threadFactory : 执行程序创建新线程时使用的工厂。

  handler : 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

  ThreadPoolExecutor :Executors类的底层实现。

5,4中这里要详细介绍的是workQueue,理解为任务队列

大家可以理解线程池中使用到了队列,队列也是线程池的组成部分之一。

任务队列分类:

       SynchonousQueue: 同步队列,队列直接提交给线程执行而不保持它们,此时线程池通常是无界的  LinkedBlockingQueue: 无界对列,当线程池线程数达到最大数量时,新任务就会在队列中等待执行,可能会造成队列无限膨胀  ArrayBlockingQueue : 有界队列,有助于防止资源耗尽,一旦达到上限,可能会造成新任务丢失
  newSingleThreadExecutor、newFixedThreadPool使用的是LinkedBlockingQueue  newCachedThreadPool 使用的是 SynchonousQueue  newScheduledThreadPool使用的是 DelayedWorkQueue

根据上面说明线程池常用的四个方法都使用到了任务队列。因此有必要对任务队列了解了解:

二,队列

Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构,注意是对尾插入,对头取出。

Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。

1,按阻塞队列和非阻塞队列划分为两类

1、没有实现的阻塞接口的LinkedList: 实现了java.util.Queue接口和java.util.AbstractQueue接口

  内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue
  PriorityQueue 和 ConcurrentLinkedQueue 类在 Collection Framework 中加入两个具体集合实现。 
  PriorityQueue 类实质上维护了一个有序列表。加入到 Queue 中的元素根据它们的天然排序(通过其 java.util.Comparable 实现)或者根据传递给构造函数的 java.util.Comparator 实现来定位。
  ConcurrentLinkedQueue 是基于链接节点的、线程安全的队列。并发访问不需要同步。因为它在队列的尾部添加元素并从头部删除它们,所以只要不需要知道队列的大 小,          ConcurrentLinkedQueue 对公共集合的共享访问就可以工作得很好。收集关于队列大小的信息会很慢,需要遍历队列。

非阻塞对列常用的几个方法特性说明:

 add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;  remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;  offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;  poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;  peek():获取队首元素,若成功,则返回队首元素;否则返回null

2)实现阻塞接口的:
  java.util.concurrent 中加入了 BlockingQueue 接口和五个阻塞队列类。它实质上就是一种带有一点扭曲的 FIFO 数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者元素可用。

这里怎么理解阻塞这里两个字呢?

使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,简单理解就是非阻塞队列时,一个线程去拿队列里的东西,发现这个队列是空的,那紧接着这个线程就执行完了,可当有任务进来的时候还有重新启动一个线程去队列中拿(也就是唤醒策略),这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒),也就是始终有个线程在等着这个队列,如果队列中有东西了,被阻塞的线程会主动去拿。这样提供了极大的方便性。理解阻塞两个字对下面线程池中理解核心线程数和最大线程数的关系很重要。

 

五个队列所提供的各有不同:
  * ArrayBlockingQueue :一个由数组支持的有界队列。
  * LinkedBlockingQueue :一个由链接节点支持的可选有界队列。
  * PriorityBlockingQueue :一个由优先级堆支持的无界优先级队列。
  * DelayQueue :一个由优先级堆支持的、基于时间的调度队列。
  * SynchronousQueue :一个利用 BlockingQueue 接口的简单聚集(rendezvous)机制。

阻塞对列常用的几个方法特性说明:

put(E e)  take()  offer(E e,long timeout, TimeUnit unit)  poll(long timeout, TimeUnit unit)    put方法用来向队尾存入元素,如果队列满,则等待;  take方法用来从队首取元素,如果队列为空,则等待;  offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;  poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

可以看出put与take对应,offer与poll对应。

 

这里加入一个队列结合线程池使用的案例:

package com.yao;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class BlockingQueueTest { /** 定义装苹果的篮子  */ public static class Basket{  // 篮子,能够容纳3个苹果  BlockingQueue
basket = new ArrayBlockingQueue
(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException{ // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException{ // get方法取出一个苹果,若basket为空,等到basket有苹果为止 String apple = basket.take(); return apple; } public int getAppleNumber(){ return basket.size(); } } // 测试方法 public static void testBasket() { // 建立一个装苹果的篮子 final Basket basket = new Basket(); // 定义苹果生产者 class Producer implements Runnable { public void run() { try { while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + System.currentTimeMillis()); basket.produce(); System.out.println("生产者生产苹果完毕:" + System.currentTimeMillis()); System.out.println("生产完后有苹果:"+basket.getAppleNumber()+"个"); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { } } } // 定义苹果消费者 class Consumer implements Runnable { public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + System.currentTimeMillis()); basket.consume(); System.out.println("消费者消费苹果完毕:" + System.currentTimeMillis()); System.out.println("消费完后有苹果:"+basket.getAppleNumber()+"个"); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { } } } ExecutorService service = Executors.newCachedThreadPool(); Producer producer = new Producer(); Consumer consumer = new Consumer(); service.submit(producer); service.submit(consumer); // 程序运行10s后,所有任务停止 try { Thread.sleep(10000); } catch (InterruptedException e) { } service.shutdownNow(); } public static void main(String[] args) { BlockingQueueTest.testBasket(); }}

上面案例,找到main方法,调用测试类,在测试类中先创建生产者内部类,再创建消费者内部类,然后创建线程池,把生产者提交到线程池中执行run()方法,生产苹果;再把消费者放到线程池中消费苹果。

 

三,介绍ThreadPoolExecutor的主要的三种类型各队列的对于关系(其实可以直接通过new  ThreadPoolExecutor(String param1,String param2...)的方式去创建想要的核心线程数,最大线程数,哪种工作队列),只不过已有的三种类型是常用的,默认为我们选择了使用的任务队列。

1,(new)FixedThreadPool

FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现。public static ExecutorService newFixedThreadPool(int nThreads) {     return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue());}

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

FixedThreadPool的execute()方法的运行示意图如图所示。

FixedThreadPool的execute()的运行示意图

上图的说明如下。

    1)如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。

    2)在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入 LinkedBlockingQueue。

    3)线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响。

    1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过       corePoolSize。

    2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。 

    3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。

    4)由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或 shutdownNow())不会拒绝任务   (不会调用RejectedExecutionHandler.rejectedExecution方法)。

 

SingleThreadExecutor详解

SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现。

public static ExecutorService newSingleThreadExecutor() {     return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS    ,new LinkedBlockingQueue()));}

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同,这里就不赘述了。

SingleThreadExecutor的运行示意图如图所示。

SingleThreadExecutor的execute()的运行示意

对上图的说明如下。

    1)如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。

    2)在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入Linked- BlockingQueue。

    3)线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。

 

CachedThreadPool详解

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThreadPool的源代码。

public static ExecutorService newCachedThreadPool() {     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue());}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

FixedThreadPool和SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadPool的execute()方法的执行示意图如图所示。

对上图的说明如下。

    1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行       SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线       程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤

    2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执       行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1)将失败。       此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。

    3)在步骤2)中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,       TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60       秒钟内主线程提交了一个新任务(主线程执行步骤1)),那么这个空闲线程将执行主线程提交的新任务;否       则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不        会使用任何资源。

前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图如图所示。

CachedThreadPool的任务传递示意图 

 

注:本文参考文章来源:

 

谢谢!!

转载地址:http://lgdrb.baihongyu.com/

你可能感兴趣的文章
vm+ubuntu联网
查看>>
netflix 推荐算法学习1(转)
查看>>
python从socket做个websocket的聊天室server
查看>>
java标号
查看>>
[Computation]集合、关系、语言
查看>>
20130328java基础学习笔记-循环结构for以及for,while循环区别
查看>>
caffe网络模型各层详解(一)
查看>>
第三章总结
查看>>
【转载】什么是C++虚函数、虚函数的作用和使用方法
查看>>
POJ 1745 Divisibility DP
查看>>
SPSS学习中涉及的统计知识
查看>>
Visual Studio 2013 添加一般应用程序(.ashx)文件到SharePoint项目
查看>>
正则表达式格式化日期
查看>>
Tools: geos 使用指南
查看>>
蟠桃记
查看>>
让不带www的域名跳转到带www的域名
查看>>
Celery 之异步任务、定时任务、周期任务
查看>>
jsonp 跨域原理详解
查看>>
像素包装:在内存中并不以紧密形式排列
查看>>
BZOJ3144: [Hnoi2013]切糕
查看>>