线程的状态
- NEW(初始化状态)
- RUNNABLE(可运行 / 运行状态)
- BLOCKED(阻塞状态)
- WAITING(无时限等待)
- TIMED_WAITING(有时限等待)
- TERMINATED(终止状态)
线程池
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize: 核心线程数
-
maximumPoolSize: 最大线程数
-
keepAliveTime:存活时间
-
unit: 时间单位
-
workQueue:任务队列
-
threadFactory:创建线程的工厂
-
handler:拒绝策略
并发模型
- 流水线模型
- 任务拆分
- 每个线程干一类事儿
- 每个线程包含一个任务队列和一个步骤的队列引用
- 并行工作者模型
- 生产者消费者模型
- Fork-Join模型
无锁队列
基于CAS(compare and set)
-
数组队列是一个循环数组,队列少用一个元素,当头等于尾标示队空,尾加1等于头标示队满。
-
数组的元素用EMPTY(无数据,标示可以入队)和FULL(有数据,标示可以出队)标记指示,数组一开始全部初始化成 EMPTY标示空队列。
-
EnQue 操作:如果当前队尾位置为EMPTY,标示线程可以在当前位置入队,通过CAS原子操作把该位置设置为FULL,避免其它线程操作这个位置,操作完后修改队尾位置。各个线程竞争新的队尾位置。如下图所示
public class LockFreeQueue {
private AtomicReferenceArray atomicReferenceArray;
//代表为空,没有元素
private static final Integer EMPTY = null;
//头指针,尾指针
AtomicInteger head,tail;
public LockFreeQueue(int size){
atomicReferenceArray = new AtomicReferenceArray(new Integer[size + 1]);
head = new AtomicInteger(0);
tail = new AtomicInteger(0);
}
/**
* 入队
* @param element
* @return
*/
public boolean add(Integer element){
int index = (tail.get() + 1) % atomicReferenceArray.length();
if( index == head.get() % atomicReferenceArray.length()){
System.out.println("当前队列已满,"+ element+"无法入队!");
return false;
}
while(!atomicReferenceArray.compareAndSet(index,EMPTY,element)){
return add(element);
}
tail.incrementAndGet(); //移动尾指针
System.out.println("入队成功!" + element);
return true;
}
/**
* 出队
* @return
*/
public Integer poll(){
if(head.get() == tail.get()){
System.out.println("当前队列为空");
return null;
}
int index = (head.get() + 1) % atomicReferenceArray.length();
Integer ele = (Integer) atomicReferenceArray.get(index);
if(ele == null){ //有可能其它线程也在出队
return poll();
}
while(!atomicReferenceArray.compareAndSet(index,ele,EMPTY)){
return poll();
}
head.incrementAndGet();
System.out.println("出队成功!" + ele);
return ele;
}
}