多线程编程

340人浏览 / 0人评论

线程的状态

  • NEW(初始化状态)
  • RUNNABLE(可运行 / 运行状态)
  • BLOCKED(阻塞状态)
  • WAITING(无时限等待)
  • TIMED_WAITING(有时限等待)
  • TERMINATED(终止状态)

线程池

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • corePoolSize: 核心线程数

  • maximumPoolSize: 最大线程数

  • keepAliveTime:存活时间

  • unit: 时间单位

  • workQueue:任务队列

  • threadFactory:创建线程的工厂

  • handler:拒绝策略

    • callerRunsPolicy: 调用者线程执行

    • AbortPolicy: 拒绝并抛出异常

    • DiscardPolicy: 拒绝,不做任何事

    • DiscardOldestPolicy:放弃下一个即将执行的任务,并立即执行新任务

并发模型

  • 流水线模型
    • 任务拆分
    • 每个线程干一类事儿
    • 每个线程包含一个任务队列和一个步骤的队列引用
  • 并行工作者模型
    • 每个线程干相同的事儿
  • 生产者消费者模型
    • 基于消息队列实现
  • Fork-Join模型
    • map-reduce
    • 拆分任务

无锁队列

基于CAS(compare and set)

  • 数组队列是一个循环数组,队列少用一个元素,当头等于尾标示队空,尾加1等于头标示队满。

  • 数组的元素用EMPTY(无数据,标示可以入队)和FULL(有数据,标示可以出队)标记指示,数组一开始全部初始化成 EMPTY标示空队列。

  • EnQue 操作:如果当前队尾位置为EMPTY,标示线程可以在当前位置入队,通过CAS原子操作把该位置设置为FULL,避免其它线程操作这个位置,操作完后修改队尾位置。各个线程竞争新的队尾位置。如下图所示

public class LockFreeQueue {
 
    private AtomicReferenceArray atomicReferenceArray;
    //代表为空,没有元素
    private static final  Integer EMPTY = null;
    //头指针,尾指针
    AtomicInteger head,tail;
 
 
    public LockFreeQueue(int size){
        atomicReferenceArray = new AtomicReferenceArray(new Integer[size + 1]);
        head = new AtomicInteger(0);
        tail = new AtomicInteger(0);
    }
 
    /**
     * 入队
     * @param element
     * @return
     */
    public boolean add(Integer element){
        int index = (tail.get() + 1) % atomicReferenceArray.length();
        if( index == head.get() % atomicReferenceArray.length()){
            System.out.println("当前队列已满,"+ element+"无法入队!");
            return false;
        }
        while(!atomicReferenceArray.compareAndSet(index,EMPTY,element)){
            return add(element);
        }
        tail.incrementAndGet(); //移动尾指针
        System.out.println("入队成功!" + element);
        return true;
    }
 
    /**
     * 出队
     * @return
     */
    public Integer poll(){
        if(head.get() == tail.get()){
            System.out.println("当前队列为空");
            return null;
        }
        int index = (head.get() + 1) % atomicReferenceArray.length();
        Integer ele = (Integer) atomicReferenceArray.get(index);
        if(ele == null){ //有可能其它线程也在出队
            return poll();
        }
        while(!atomicReferenceArray.compareAndSet(index,ele,EMPTY)){
            return poll();
        }
        head.incrementAndGet();
        System.out.println("出队成功!" + ele);
        return ele;
    }
}

全部评论