所有分类
  • 所有分类
  • 未分类

Java线程池–核心参数/大小设置/使用示例

简介

本文介绍Java线程池的用法。包括:主要的参数、线程池大小的设置、使用步骤、使用实例。

核心参数

ThreadPoolExecutor原型:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  1. corePoolSize
    1. 线程池的核心线程数
    2. 即便是线程池里没有任何任务,也会有corePoolSize个线程在候着等任务。
  2. maximumPoolSize
    1. 最大线程数。
    2. 超过此数量,会触发拒绝策略。
  3. keepAliveTime
    1. 线程的存活时间。
    2. 当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。
  4. unit
    1. 指定keepAliveTime的单位
    2. 比如:秒:TimeUnit.SECONDS。
  5. workQueue
    1. 一个阻塞队列,提交的任务将会被放到这个队列里。
  6. threadFactory
    1. 线程工厂,用来创建线程
    2. 主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。
  7. handler
    1. 拒绝策略
    2. 当线程池里线程被耗尽,且队列也满了的时候会调用。
    3. 默认拒绝策略为AbortPolicy。即:不执行此任务,而且抛出一个运行时异常

java doc的建议

java doc不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池,例如:Executors.newCachedThreadPool();  Executors.newSingleThreadExecutor(); 等。

大公司的建议

很多大公司会明确要求使用创建ThreadPoolExecutor对象的方法来使用线程池,因为这样能使人明确各个参数的具体含义。Executors类中提供的静态方法都是调用了创建ThreadPoolExecutor对象的方法,例如:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

个数(大小)设置

任务分类 

根据任务所需要的cpu和io资源的量可以分为:

  • CPU密集型任务:
    • 主要是执行计算任务。响应时间很快,cpu一直在运行,这种任务cpu的利用率很高。
    • 线程池大小太大对程序性能而言,反而是不利的,但最少也不应低于处理器的核心数。因为当有多个任务处于就绪状态时,处理器核心需要在线程间频繁进行上下文切换,而这种切换对程序性能损耗较大。
  • IO密集型任务
    • 主要是进行IO操作,执行IO操作的时间较长,这时cpu处于空闲状态,导致cpu的利用率不高。
    • 当一个任务执行IO操作时,其线程将被阻塞,于是处理器可以立即进行上下文切换以便处理其他就绪线程。如果我们只有处理器可用核心数那么多个线程的话,即使有待执行的任务也无法处理,因为我们已经拿不出更多的线程供处理器调度了。 

CPU密集型任务与IO密集型任务区分方法

如果任务被阻塞的时间少于执行时间,即这些任务是计算密集型的,则程序所需线程数将随之减少,但最少也不应低于处理器的核心数。

如果任务被阻塞的时间大于执行时间,即该任务是IO密集型的,我们就需要创建比处理器核心数大几倍数量的线程。例如,如果任务有50%的时间处于阻塞状态,则程序所需线程数为处理器可用核心数的两倍。

常用线程池大小设置

  • CPU密集型:核心线程数 = CPU核数 + 1
  • IO密集型:核心线程数 = CPU核数 * 2 + 1

CPU核数可以用此法获得:Runtime.getRuntime().availableProcessors()

对于计算密集型的任务,一个有N个处理器的系统通常使用一个N+1个线程的线程池来获得最优的利用率。+1的原因:如果计算密集型的线程恰好在某时因为发生一个页错误或者因其它原因而暂停,刚好有一个”额外”的线程,可以确保在这种情况下CPU周期不会中断工作。

计算公式

N = CPU的数量

U = 期望的CPU的使用率,介于0-1之间

f:阻塞系数(阻塞时间占总时间的比例。总时间:阻塞时间 + 执行时间)

线程池大小 = N * U / (1 – f)     //一个完全阻塞的任务是注定要挂掉的,无须担心阻塞系数会达到1。

举例:CPU核心数是4,期望cpu的使用率是100%,等待时间是4秒,计算时间是1秒。那么最优的池大小就是:

4 * 100% / (1 – 4/5) = 20

线程池使用步骤

步骤

1、创建一个线程池对象,控制要创建几个线程对象。

2、实现线程:

  法1:新建一个类实现Runnable或者Callable接口。
  法2:直接用lambda表达式传值。比如:

public class Demo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        executor.execute(Demo::myRun);
    }

    public static void myRun() {
        System.out.println("hello world");
    }
}

3、提交线程调用如下任一方法即可

void execute(Runnable command);
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)

4、结束线程池

submit 与 execute

submitexecute
说明是ExecutorService中的方法。 用来提交一个任务。是Executor接口的方法。 在未来某个时间执行给定命令。该命令可能在新的线程、已入池的线程或正调用的线程中执行,由 Executor 的实现决定。
方法原型 <T> Future<T> submit(Callable<T> task)        //提交一个返回值的任务用于执行        //返回一个表示任务的未决结果的 Future。
Future<?> submit(Runnable task)         //提交一个 Runnable 任务用于执行         //返回一个表示该任务的 Future。
<T> Future<T> submit(Runnable task, T result)         //提交一个 Runnable 任务用于执行         //返回一个表示该任务的 Future。
void execute(Runnable command)
返回值返回值是future对象  可以获取执行结果没有返回值

Future

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。

如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

也就是说Future提供了三种功能:

  1. 判断任务是否完成;
  2. 中断任务;
  3. 获取任务执行结果

future的方法

  • boolean cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。
  • V get() 如有必要,等待计算完成,然后获取其结果。
  • V get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
  • boolean isCancelled() 如果在任务正常完成前将其取消,则返回 true。
  • boolean isDone() 如果任务已完成,则返回 true。

关闭线程池

  • void shutdown() 启动一次顺序关闭,等待执行以前提交的任务完成,但不接受新任务。
  • List<Runnable> shutdownNow() 试图立即停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

示例(ThreadPoolExecutor)

简介

ThreadPoolExecutor 结构图

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

示例1:睡眠任务

代码

package com.example;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyTask implements Runnable {
    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("正在执行task " + taskNum);
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("-----------------------task " + taskNum + "执行完毕");
    }
}

public class Demo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
                200, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(4));

        for (int i = 0; i < 14; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行完别的任务数目:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

运行结果

Connected to the target VM, address: '127.0.0.1:58452', transport: 'socket'
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 9
正在执行task 10
线程池中线程数目:8,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 11
正在执行task 12
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 13
-----------------------task 3执行完毕
-----------------------task 2执行完毕
-----------------------task 1执行完毕
正在执行task 5
-----------------------task 4执行完毕
正在执行task 7
正在执行task 8
-----------------------task 0执行完毕
-----------------------task 11执行完毕
-----------------------task 10执行完毕
-----------------------task 9执行完毕
正在执行task 6
-----------------------task 13执行完毕
-----------------------task 12执行完毕
-----------------------task 5执行完毕
-----------------------task 8执行完毕
-----------------------task 6执行完毕
-----------------------task 7执行完毕
Disconnected from the target VM, address: '127.0.0.1:58452', transport: 'socket'

Process finished with exit code 0

从执行结果可以看出:

  1. 当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。
  2. 线程都执行完毕后,程序可以正常退出。

如果上面程序中,将for循环改成15,就会抛出任务拒绝异常了,日志如下:

Connected to the target VM, address: '127.0.0.1:58469', transport: 'socket'
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 9
线程池中线程数目:8,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 10
线程池中线程数目:9,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 11
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 12
正在执行task 13
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.MyTask@27ddd392 rejected from java.util.concurrent.ThreadPoolExecutor@19e1023e[Running, pool size = 10, active threads = 10, queued tasks = 4, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.example.Demo.main(Demo.java:34)
-----------------------task 4执行完毕
-----------------------task 1执行完毕
-----------------------task 9执行完毕
-----------------------task 0执行完毕
-----------------------task 13执行完毕
-----------------------task 3执行完毕
-----------------------task 2执行完毕
-----------------------task 12执行完毕
-----------------------task 11执行完毕
-----------------------task 10执行完毕
正在执行task 8
正在执行task 6
正在执行task 5
正在执行task 7
-----------------------task 7执行完毕
-----------------------task 6执行完毕
-----------------------task 5执行完毕
-----------------------task 8执行完毕

结果:

  1. 在执行第15个线程时,直接抛出了异常,但没有影响其他进程的运行
  2. 程序无法正常退出

示例2:死循环任务

本处只是修改了上边的睡眠任务为死循环,其他不变

package com.example;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyTask implements Runnable {
    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("正在执行task " + taskNum);
        while (true) ;
    }
}

public class Demo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,
                200, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(4));

        for (int i = 0; i < 14; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行完别的任务数目:" + executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}

执行结果:(程序无法退出)

Connected to the target VM, address: '127.0.0.1:60128', transport: 'socket'
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:6,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:7,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行完别的任务数目:0
正在执行task 9
正在执行task 10
正在执行task 12
正在执行task 11
正在执行task 13
3

评论2

请先

  1. 线程池是核心线程使用完之后i,先添加阻塞队列,满了之后在新建线程的吗
    flyingshy 2024-04-01 0
显示验证码
没有账号?注册  忘记密码?

社交账号快速登录