所有分类
  • 所有分类
  • 未分类

Java–使用阻塞队列实现顺序消费–方法/实例

简介

说明

本文用示例介绍使用阻塞队列来实现顺序消费。

需求

机器要对手机按顺序做如下任务:生产、打包、发货。消费者等待收货。

代码

手机产品

package org.example.a;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class Phone {
    /**
     * 手机的状态:
     * PRODUCED: 已生产
     * PACKED: 已打包
     * DELIVERED: 已发货
     * <p>手机的状态只能由PRODUCED->PACKED->DELIVERED转变
     */
    public enum Status {
        PRODUCED, PACKED, DELIVERED
    }

    // 默认状态为PRODUCED
    private Status status = Status.PRODUCED;

    private final int id;

    public Phone(int id) {
        this.id = id;
    }

    public void pack() {
        status = Status.PACKED;
    }

    public void deliver() {
        status = Status.DELIVERED;
    }

    public Status getStatus() {
        return status;
    }

    public int getId() {
        return id;
    }

    public String toString() {
        return "Phone id: " + id + ", status: " + status;
    }
}

队列

import java.util.concurrent.LinkedBlockingQueue;

public class PhoneQueue extends LinkedBlockingQueue<Phone> {
}

任务

生产手机的任务

/**
 * 生产手机的任务。
 */
public class Producer implements Runnable {
    private PhoneQueue phoneQueue;

    private int count = 0;

    private Random random = new Random(47);

    public Producer(PhoneQueue queue) {
        this.phoneQueue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
                //生产一部手机,这些手机是有序的
                Phone phone = new Phone(count++);
                System.out.println(phone);
                //放到PhoneQueue中
                phoneQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Producer interrupted.");
        }
        System.out.println("Producer off.");
    }
}

打包的任务

/**
 * 打包的任务
 */
public class Packer implements Runnable {
    private PhoneQueue producedQueue;
    private PhoneQueue packedQueue;

    public Packer(PhoneQueue producedQueue, PhoneQueue packedQueue) {
        this.producedQueue = producedQueue;
        this.packedQueue = packedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone phone = producedQueue.take();
                phone.pack();
                System.out.println(phone);
                packedQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Packer interrupted.");
        }
        System.out.println("Packer off.");
    }
}

发货的任务

/**
 * 发货的任务
 */
public class Delivery implements Runnable {
    private PhoneQueue butteredQueue;
    private PhoneQueue finishedQueue;

    public Delivery(PhoneQueue butteredQueue, PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
        this.butteredQueue = butteredQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone phone = butteredQueue.take();
                phone.deliver();
                System.out.println(phone);
                finishedQueue.put(phone);
            }
        } catch (InterruptedException e) {
            System.out.println("Deliverer interrupted.");
        }
        System.out.println("Deliverer off.");
    }
}

消费者(买手机的人)

/**
 * 买手机的人,消费者。
 */
public class Consumer implements Runnable {
    private PhoneQueue finishedQueue;
    private int count = 0;

    public Consumer(PhoneQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                //在取得下一个手机之前会一直阻塞
                Phone phone = finishedQueue.take();
                //验证取得的手机是有序的,而且状态是DELIVERED的
                if (phone.getId() != count++
                        || phone.getStatus() != Phone.Status.DELIVERED) {
                    System.out.println("Error -> " + phone);
                    System.exit(-1);
                } else {
                    //使用手机
                    System.out.println(phone + "->Use");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted.");
        }
        System.out.println("Consumer off.");
    }
}

主类

public class Demo {
    public static void main(String[] args) {
        PhoneQueue producedQueue = new PhoneQueue();
        PhoneQueue packedQueue = new PhoneQueue();
        PhoneQueue deliveredQueue = new PhoneQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(producedQueue));
        exec.execute(new Packer(producedQueue, packedQueue));
        exec.execute(new org.example.a.Delivery(packedQueue, deliveredQueue));
        exec.execute(new Consumer(deliveredQueue));
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {
            e.printStackTrace();
        }
        exec.shutdownNow();
    }
}

执行结果

Phone id: 0, status: PRODUCED
Phone id: 0, status: PACKED
Phone id: 0, status: DELIVERED
Phone id: 0, status: DELIVERED->Use
Phone id: 1, status: PRODUCED
Phone id: 1, status: PACKED
Phone id: 1, status: DELIVERED
Phone id: 1, status: DELIVERED->Use
Phone id: 2, status: PRODUCED
Phone id: 2, status: PACKED
Phone id: 2, status: DELIVERED
Phone id: 2, status: DELIVERED->Use
Phone id: 3, status: PRODUCED
Phone id: 3, status: PACKED
Phone id: 3, status: DELIVERED
Phone id: 3, status: DELIVERED->Use
Phone id: 4, status: PRODUCED
Phone id: 4, status: PACKED
Phone id: 4, status: DELIVERED
Phone id: 4, status: DELIVERED->Use
Phone id: 5, status: PRODUCED
Phone id: 5, status: PACKED
Phone id: 5, status: DELIVERED
Phone id: 5, status: DELIVERED->Use
Phone id: 6, status: PRODUCED
Phone id: 6, status: PACKED
Phone id: 6, status: DELIVERED
Phone id: 6, status: DELIVERED->Use
Phone id: 7, status: PRODUCED
Phone id: 7, status: PACKED
Phone id: 7, status: DELIVERED
Phone id: 7, status: DELIVERED->Use
Consumer interrupted.
Packer interrupted.
Producer interrupted.
Producer off.
Deliverer interrupted.
Packer off.
Consumer off.
Deliverer off.
3

评论0

请先

显示验证码
没有账号?注册  忘记密码?

社交账号快速登录