1.PriorityBlockingQueue
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
所有添加进入PriorityBlockingQueue的元素都必须实现Comparable接口(或者在构造队列时提供一个Comparator)。当插入元素时,PriorityBlockingQueue会使用compareTo()方法来决定元素插入的位置,元素越大越靠后。PriorityBlockingQueue是一个阻塞式的数据结构。当它的方法被调用且不能立即执行时,调用这个方法的线程将被阻塞直到方法执行完成。
/**
 * An immutable event tagged with the id of the thread that produced it and
 * a numeric priority.
 *
 * <p>The natural ordering is by <em>descending</em> priority: an event with a
 * larger priority value compares as "less than" one with a smaller value, so
 * a priority queue (whose head is the least element) hands out the
 * highest-priority events first.
 */
public class Event implements Comparable<Event> {

    /** Id of the producer that created this event. */
    private final int thread;

    /** Priority of this event; larger values are ordered first. */
    private final int priority;

    /**
     * Creates an event.
     *
     * @param thread   id of the producer that created the event
     * @param priority priority of the event; larger values are ordered first
     */
    public Event(int thread, int priority) {
        this.thread = thread;
        this.priority = priority;
    }

    /**
     * @return the id of the producer that created this event
     */
    public int getThread() {
        return thread;
    }

    /**
     * @return the priority of this event
     */
    public int getPriority() {
        return priority;
    }

    /**
     * Orders events by descending priority.
     *
     * @param o the event to compare against
     * @return a negative value if this event has the higher priority, a
     *         positive value if it has the lower priority, and zero if both
     *         priorities are equal
     * @throws NullPointerException if {@code o} is {@code null}
     */
    @Override
    public int compareTo(Event o) {
        // Arguments are deliberately swapped to obtain descending order.
        return Integer.compare(o.getPriority(), this.priority);
    }
}
/**
 * A producer that fills a shared {@link PriorityBlockingQueue} with events.
 *
 * <p>Each run adds {@value #EVENTS_PER_TASK} events to the queue; every event
 * carries this task's id and a priority equal to its position in the
 * sequence (0 up to {@code EVENTS_PER_TASK - 1}).
 */
public class Task implements Runnable {

    /** Number of events a single task adds to the queue. */
    private static final int EVENTS_PER_TASK = 100;

    /** Identifier stamped on every event this task produces. */
    private final int id;

    /** Queue, shared with other tasks, that receives the events. */
    private final PriorityBlockingQueue<Event> queue;

    /**
     * Creates a producer task.
     *
     * @param id    identifier stamped on every event this task produces
     * @param queue queue that receives the events
     */
    public Task(int id, PriorityBlockingQueue<Event> queue) {
        this.id = id;
        this.queue = queue;
    }

    /**
     * Adds {@value #EVENTS_PER_TASK} events to the queue, with priorities
     * increasing from 0.
     */
    @Override
    public void run() {
        for (int priority = 0; priority < EVENTS_PER_TASK; priority++) {
            queue.add(new Event(id, priority));
        }
    }
}
/**
 * Demo driver for {@link PriorityBlockingQueue}: several producer threads
 * fill one shared queue concurrently, then the main thread drains it and
 * prints the events in the order the queue returns them.
 */
public class PriorityBlockingQueueMain {

    /**
     * Starts five {@link Task} producers on a shared queue, waits for them to
     * finish, prints the queue size, drains and prints every event, then
     * prints the (now empty) queue size again.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<Event>();

        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new Task(i, queue));
        }
        for (Thread thread : threads) {
            thread.start();
        }

        // Wait for all producers so the queue is complete before draining.
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag for any caller, and stop waiting:
                // with the flag set, further join() calls would fail at once.
                Thread.currentThread().interrupt();
                break;
            }
        }

        System.out.println("Main:Queue Size:" + queue.size());

        // Drain until the queue reports empty instead of assuming a fixed
        // element count; poll() returns null on an empty queue, so this can
        // never dereference a missing element.
        Event event;
        while ((event = queue.poll()) != null) {
            System.out.println("Thread " + event.getThread() + " :Priority " + event.getPriority());
        }

        System.out.println("Main:Queue Size:" + queue.size());
        System.out.println("Main: End");
    }
}