ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 核心线程数
maximumPoolSize: 最大线程数
keepAliveTime:存活时间
unit: 时间单位
workQueue:任务队列
threadFactory:创建线程的工厂
handler:拒绝策略
callerRunsPolicy: 调用者线程执行
AbortPolicy: 拒绝并抛出异常
DiscardPolicy: 拒绝,不做任何事
DiscardOldestPolicy:放弃下一个即将执行的任务,并立即执行新任务
基于CAS(compare and set)
数组队列是一个循环数组,队列少用一个元素,当头等于尾标示队空,尾加1等于头标示队满。
数组的元素用EMPTY(无数据,标示可以入队)和FULL(有数据,标示可以出队)标记指示,数组一开始全部初始化成 EMPTY标示空队列。
EnQue 操作:如果当前队尾位置为EMPTY,标示线程可以在当前位置入队,通过CAS原子操作把该位置设置为FULL,避免其它线程操作这个位置,操作完后修改队尾位置。各个线程竞争新的队尾位置。如下图所示
public class LockFreeQueue {
private AtomicReferenceArray atomicReferenceArray;
//代表为空,没有元素
private static final Integer EMPTY = null;
//头指针,尾指针
AtomicInteger head,tail;
public LockFreeQueue(int size){
atomicReferenceArray = new AtomicReferenceArray(new Integer[size + 1]);
head = new AtomicInteger(0);
tail = new AtomicInteger(0);
}
/**
* 入队
* @param element
* @return
*/
public boolean add(Integer element){
int index = (tail.get() + 1) % atomicReferenceArray.length();
if( index == head.get() % atomicReferenceArray.length()){
System.out.println("当前队列已满,"+ element+"无法入队!");
return false;
}
while(!atomicReferenceArray.compareAndSet(index,EMPTY,element)){
return add(element);
}
tail.incrementAndGet(); //移动尾指针
System.out.println("入队成功!" + element);
return true;
}
/**
* 出队
* @return
*/
public Integer poll(){
if(head.get() == tail.get()){
System.out.println("当前队列为空");
return null;
}
int index = (head.get() + 1) % atomicReferenceArray.length();
Integer ele = (Integer) atomicReferenceArray.get(index);
if(ele == null){ //有可能其它线程也在出队
return poll();
}
while(!atomicReferenceArray.compareAndSet(index,ele,EMPTY)){
return poll();
}
head.incrementAndGet();
System.out.println("出队成功!" + ele);
return ele;
}
}
全部评论