Queue

May 16, 2019 · View on GitHub

queue-常见实现类

queueuml
SynchronousQueueSynchronousQueue
LinkedBlockingQueueLinkedBlockingQueue
ArrayBlockingQueueArrayBlockingQueue

BlockingQueue Method list

下面的方法,有些会阻塞,有些会抛出异常,在使用的时候,需要理解每个方法产生的影响,避免坑。

ActionThrows exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

ArrayBlockingQueue

  • FIFO (first-in-first-out)先进先出
  • 底层实现是数组
  • 线程安全,只使用一个可重入锁来来控制线程访问
  • 添加元素总是在队列末部
  • 删除元素总是在队列头部
  • 基于数组,大小在初始化时固定不变
  • 如果 queue 满了,put方法继续添加元素的时候,就会阻塞
  • 如果 queue 是空的,take方法会阻塞一直到有数据插入

put

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

enqueue

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

dequeue

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

LinkedBlockingQueue

  • FIFO (first-in-first-out)
  • 底层使用链表而非数组存储元素
  • 添加元素总是在队列末部
  • 删除元素总是在队列头部
  • 使用两个锁来控制线程访问,这样队列可以同时进行 puttake 的操作,因此吞吐量相对 ArrayBlockingQueue
  • 可以不指定队列大小,此时默认大小为 Integer.MAX_VALUE (无边际的队列,会导致内存泄漏)

init

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // last head 都指向同一个node
        // 因此enqueue,dequeue操作的都是同一个对象new Node<E>
        // last,head可以理解为初始化时候new Node<E>的两个别名
        last = head = new Node<E>(null);
    }

LinkedBlockingQueue enqueue

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        //  last和last.next 都指向node
        last = last.next = node;
    }

LinkedBlockingQueue dequeue

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        // 从队列的头部取元素
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

demo

LinkedBlockingQueue插入的图解源文件(可导入draw.io进行编辑)

下面的图分为3部分:

  1. init
  2. 第一次put
  3. 第二次put

linked-blocking-queue-put

下面是demo,里面的方法参考LinkedBlockingQueue中实现

   public static void main(String[] args) {

        Node<String> node = new Node<>(null);
        Node<String> last = null;
        Node<String> head = null;
        last = head = node;
        System.out.println("last = " + last);
        System.out.println("head = " + head);

        Node<String> node1 = new Node<>("1");

        last = last.next = node1;//入队第一次)
        System.out.println("last = " + last);
        System.out.println("head = " + head);


        Node<String> node2 = new Node<>("2");

        last = last.next = node2;//入队(第二次)
        System.out.println("last = " + last);
        System.out.println("head = " + head);


        {   // 模拟出队(第一次)
            Node<String> h = head;
            Node<String> first = h.next;
            h.next = h; // help GC
            head = first;
            String x = first.item;
            first.item = null;

            System.out.println("last = " + last);
            System.out.println("head = " + head);
        }

        {   // 模拟出队(第二次)
            Node<String> h = head;
            Node<String> first = h.next;
            h.next = h; // help GC
            head = first;
            String x = first.item;
            first.item = null;

            System.out.println("last = " + last);
            System.out.println("head = " + head);

        }
    }


    static class Node<E> {
        E item;
        Node next;

        public Node(E item) {
            this.item = item;
        }

        @Override
        public String toString() {
            return "Node{" + "item=" + item + ", next=" + next + '}';
        }
    }

执行结果

last = Node{item=null, next=null}
head = Node{item=null, next=null}
last = Node{item=1, next=null}
head = Node{item=null, next=Node{item=1, next=null}}
last = Node{item=2, next=null}
head = Node{item=null, next=Node{item=1, next=Node{item=2, next=null}}}
last = Node{item=2, next=null}
head = Node{item=null, next=Node{item=2, next=null}}
last = Node{item=null, next=null}
head = Node{item=null, next=null}

ArrayBlockingQueue vs LinkedBlockingQueue

  1. ArrayBlockingQueue 初始化必须声明大小, LinkedBlockingQueue 则不用,默认容量是 Integer.MAX_VALUE
  2. ArrayBlockingQueue 基于数组, LinkedBlockingQueue 的数据结构是链表
  3. ArrayBlockingQueue 中使用一个可重入锁进行并发控制, LinkedBlockingQueue 中使用二个可以重入锁,实现put,take的并发控制
  4. LinkedBlockingQueue 中使用last,head 来维护链接,put 操作只改变 last,take 操作只改变 head,因此二种操作,不存在操作共享数据,可以用二个锁进行并发控制

SynchronousQueue

参考文档