多线程(八)并发容器

多线程(八)并发容器,第1张

文章目录
    • 一、ThreadLocal
      • 1.1 ThreadLocal的一些核心方法
        • 1.1.1 void set(T value)
        • 1.1.2 T get()
        • 1.1.3 void remove()
      • 1.2 ThreadLocalMap实现分析
        • 1.2.1 set方法
        • 1.2.2 getEntry方法
        • 1.2.3 remove方法
        • 1.2.4 Thread.exit()
      • 1.3 ThreadLocal的相关问题
        • 1.3.1 ThreadLocal的使用场景
        • 1.3.2 ThreadLocal中内存泄漏的原因
        • 1.3.3 为什么ThreadLocal的实现中要使用弱引用
        • 1.3.4 ThreadLocal最佳实践
    • 二、CopyOnWriteArrayList
      • 2.1 COW的设计思想
      • 2.2 CopyOnWriteArrayList的实现原理
        • 2.2.1 get方法
        • 2.2.2 add方法
      • 3.3 COW相关问题
        • 3.3.1 COW和读写锁的区别
        • 3.3.2 COW的缺点
    • 三、ConcurrentHashMap
    • 四、ConcurrentLinkedQueue
      • 4.1 ConcurrentLinkedQueue的结构
      • 4.2 元素入队
      • 4.3 元素出队

本系列文章:
  多线程(一)线程与进程、Thread
  多线程(二)Java内存模型、同步关键字
  多线程(三)线程池
  多线程(四)显式锁、队列同步器
  多线程(五)可重入锁、读写锁
  多线程(六)线程间通信机制
  多线程(七)原子 *** 作、阻塞队列
  多线程(八)并发容器
  多线程(九)并发工具类
  多线程(十)多线程编程示例

一、ThreadLocal

  线程安全问题的核心在于多个线程会对同一个临界区共享资源进行 *** 作。锁的解决思路是将对共享资源同一时刻的 *** 作由并发改为串行,但这样会导致效率下降,并且这只是一种解决并发问题的方式,不是唯一方式。
  另外一种思路是:如果每个线程都使用自己的“共享资源”,各自使用各自的,又互相不影响到彼此即让多个线程间达到隔离的状态,这样也不会出现线程安全的问题。这是一种“空间换时间”的方案,ThreadLocal就是使用这种方式的。
  ThreadLocal表示线程的“本地变量”,即每个线程都拥有该变量副本,各用各的,从而避免共享资源的竞争。

1.1 ThreadLocal的一些核心方法 1.1.1 void set(T value)

  set方法用于设置在当前线程中threadLocal变量的值:

	public void set(T value) {
		//1. 获取当前线程实例对象
	    Thread t = Thread.currentThread();
		//2. 通过当前线程实例获取到ThreadLocalMap对象
	    ThreadLocalMap map = getMap(t);
	    if (map != null)
			//3. 如果Map不为null,则以当前threadLocl实例为key,值为value进行存入
	        map.set(this, value);
	    else
			//4.map为null,则新建ThreadLocalMap并存入value
	        createMap(t, value);
	}

  可以看出:value是存放在ThreadLocalMap里,以当前threadLocal实例为 key。

  • getMap(Thread t)
      该方法方法用于获取ThreadLocalMap实例:
	ThreadLocalMap getMap(Thread t) {
	    return t.threadLocals;
	}

  threadLocals变量定义在Thread类中:

	ThreadLocal.ThreadLocalMap threadLocals = null;

  也就是说ThreadLocalMap的引用是作为Thread的一个成员变量,被Thread进行维护的。

  • createMap(t,value)
      在set方法中,当map为空时,会调用createMap(t,value)方法来创建ThreadLocalMap:
	void createMap(Thread t, T firstValue) {
  	    t.threadLocals = new ThreadLocalMap(this, firstValue);
	}

  对 set 方法进行总结:

  • 1、通过当前线程对象thread,获取该thread所维护的成员变量threadLocalMap;
  • 2、若threadLocalMap不为null,则以threadLocal实例为 key,值为value的键值对存入threadLocalMap;
  • 3、若 threadLocalMap为null的话,就新建threadLocalMap,然后再以threadLocal为键,值为value的键值对存入即可。
1.1.2 T get()

  get 方法是获取当前线程中threadLocal变量的值:

	public T get() {
		//1. 获取当前线程的实例对象
	    Thread t = Thread.currentThread();
		//2. 获取当前线程的threadLocalMap
	    ThreadLocalMap map = getMap(t);
	    if (map != null) {
			//3. 获取map中当前threadLocal实例为key的值的entry
	        ThreadLocalMap.Entry e = map.getEntry(this);
	        if (e != null) {
	            @SuppressWarnings("unchecked")
				//4. 当前entitiy不为null的话,就返回相应的值value
	            T result = (T)e.value;
	            return result;
	        }
	    }
		//5. 若map为null或者entry为null的话通过该方法初始化,并返回该方法返回的value
	    return setInitialValue();
	}

  get方法的逻辑,和set方法是相反的。

  • setInitialValue
	private T setInitialValue() {
	    T value = initialValue();
	    Thread t = Thread.currentThread();
	    ThreadLocalMap map = getMap(t);
	    if (map != null)
	        map.set(this, value);
	    else
	        createMap(t, value);
	    return value;
	}
  • initialValue
	protected T initialValue() {
	    return null;
	}

  此处表明:继承ThreadLocal的子类可以重写该方法,实现赋予初始值的逻辑。
  get方法的逻辑:

  • 1、通过当前线程thread实例获取到它所维护的threadLocalMap,以当前threadLocal实例为key获取该map中的键值对(Entry);
  • 2、若Entry不为null则返回Entry的value;
  • 3、如果获取threadLocalMap为null或者Entry为null的话,就以当前threadLocal为Key,value为null存入map后,并返回null。
1.1.3 void remove()

  删除数据

	public void remove() {
		//1. 获取当前线程的threadLocalMap
		ThreadLocalMap m = getMap(Thread.currentThread());
	 	if (m != null)
			//2. 从map中删除以当前threadLocal实例为key的键值对
			m.remove(this);
	}
1.2 ThreadLocalMap实现分析

  ThreadLocalMap是threadLocal的一个静态内部类,threadLocalMap内部维护了一个Entry类型的table数组:

	private Entry[] table;

  table数组的长度为2的幂次方。下Entry:

        static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

  可以看出:Entry是一个以ThreadLocal为 key,Object为value的键值对
  这里的threadLocal是弱引用,因为Entry继承了 WeakReference,在Entry的构造方法中,调用了super(k)方法就会将threadLocal实例包装成一个 WeakReferenece。
  Thread、ThreadLocal、ThreadLocalMap和Entry之间的关系:

  上图中的实线表示强引用,虚线表示弱引用。

  每个线程实例中可以通过threadLocals获取到threadLocalMap,而threadLocalMap实际上就是一个以threadLocal实例为key,任意对象为value的Entry数组。
  当为threadLocal变量赋值,实际上就是以当前threadLocal实例为key,值为value的Entry往这个threadLocalMap 中存放。
  Entry中的key是弱引用,当threadLocal外部强引用(threadLocalInstance)被置为 null,那么系统GC的时候,根据可达性分析,这个threadLocal实例就没有任何一条链路能够引用到它,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value永远无法回收,造成内存泄漏
  如果当前 thread 运行结束,threadLocal、threadLocalMap、Entry没有引用链可达,在垃圾回收的时候都会被系统进行回收。在实际开发中,会使用线程池去维护线程的创建和复用,比如固定大小的线程池,线程为了复用是不会主动结束的。

1.2.1 set方法

  ThreadLocalMap底层是用散列表(哈希表)进行实现的。
  ThreadLocalMap类中的set方法:

        private void set(ThreadLocal<?> key, Object value) {

            Entry[] tab = table;
            int len = tab.length;
            //根据threadLocal的hashCode确定Entry应该存放的位置
            int i = key.threadLocalHashCode & (len-1);
			//采用开放地址法,hash冲突的时候使用线性探测
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();
				//覆盖旧Entry
                if (k == key) {
                    e.value = value;
                    return;
                }
				//当key为null时,说明threadLocal强引用已经被释放掉,那么就无法再通过
				//这个key获取threadLocalMap中对应的entry,这里就存在内存泄漏的可能性
                if (k == null) {
                	//用当前插入的值替换掉这个key为null的“脏”entry
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
			//新建entry并插入table中i处
            tab[i] = new Entry(key, value);
            int sz = ++size;
            //插入后再次清除一些key为null的“脏”entry,如果大于阈值就需要扩容
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }
  • 1、ThreadLocal的hashcode
	private final int threadLocalHashCode = nextHashCode();
	private static final int HASH_INCREMENT = 0x61c88647;
	private static AtomicInteger nextHashCode =new AtomicInteger();

	private static int nextHashCode() {
	    return nextHashCode.getAndAdd(HASH_INCREMENT);
	}

  0x61c88647这个数是有特殊意义的,它能够保证哈希表的每个散列桶能够均匀的分布。也正是能够均匀分布,所以ThreadLocal选择使用开放地址法来解决hash冲突的问题。

  • 2、新值插入到哈希表中的位置
      源码为:key.threadLocalHashCode & (len-1),因为哈希表大小总是为 2 的幂次方,所以与运算等同于一个取模,这样就可以通过 Key 分配到具体的哈希桶中去。为什么取模要通过位与运算?因为位运算的执行效率远远高于了取模运算。
  • 3、怎样解决hash冲突?
      通过nextIndex(i, len)方法解决hash冲突的问题,该方法为:
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

  也就是不断往后线性探测,当到哈希表末尾的时候再从0开始,成环形。

  • 4、怎样解决“脏”Entry?
      ThreadLocal有可能存在内存泄漏(对象创建出来后,在之后的逻辑一直没有使用该对象,但是垃圾回收器无法回收这个部分的内存),在源码中针对这种key为 null 的Entry称之为“stale entry”,可以理解为“脏 entry”。
      在set方法的for循环中寻找和当前Key相同的可覆盖entry的过程中,通过replaceStaleEntry方法解决脏entry的问题。
  • 5、扩容
      在第一次为ThreadLocal进行赋值时,会创建初始大小为16的ThreadLocalMap,并且通过 setThreshold 方法设置 threshold,其值为当前哈希数组长度乘以(2/3),也就是说加载因子为 2/3(加载因子是衡量哈希表密集程度的一个参数,如果加载因子越大的话,说明哈希表被装载的越多,出现 hash 冲突的可能性越大,反之,则被装载的越少,出现 hash 冲突的可能性越小。同时如果过小,很显然内存使用率不高,该值取值应该考虑到内存使用率和 hash 冲突概率的一个平衡,如 hashMap,concurrentHashMap 的加载因子都为 0.75)。
      ThreadLocalMap 初始大小为 16,加载因子为 2/3,所以哈希表可用大小为:16*2/3=10,即哈希表可用容量为 10。
      从 set 方法中可以看出当 hash 表的 size 大于 threshold 的时候,会通过 resize 方法进行扩容。
        private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            //新数组为原数组的2倍
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                    	//重新确定entry在新数组的位置,然后进行插入
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }
			//设置新哈希表的threshHold和size属性
            setThreshold(newLen);
            size = count;
            table = newTab;
        }

  resize方法的逻辑:新建一个大小为原来数组长度的两倍的数组,然后遍历旧数组中的 entry 并将其插入到新的 hash 数组中,主要注意的是,在扩容的过程中针对脏 entry 的话会令 value 为 null,以便能够被垃圾回收器能够回收,解决隐藏的内存泄漏的问题。

1.2.2 getEntry方法
        private Entry getEntry(ThreadLocal<?> key) {
        	//1. 确定在散列数组中的位置
            int i = key.threadLocalHashCode & (table.length - 1);
            //2. 根据索引i获取entry
            Entry e = table[i];
            //3. 满足条件则返回该entry
            if (e != null && e.get() == key)
                return e;
            else
            	//4. 未查找到满足条件的entry,额外处理
                return getEntryAfterMiss(key, i, e);
        }

  getEntryAfterMiss:

        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                	//找到和查询的key相同的entry则返回
                    return e;
                if (k == null)
                	//解决脏entry的问题
                    expungeStaleEntry(i);
                else
                	//继续向后环形查找
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

  getEntryAfterMiss的逻辑:通过nextIndex往后环形查找,如果找到和查询的key相同的entry的话就直接返回,如果在查找过程中遇到脏entry的话使用expungeStaleEntry方法进行处理。

1.2.3 remove方法
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                	//将entry的key置为null
                    e.clear();
                    //将该entry的value也置为null
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

  remove方法的逻辑:通过往后环形查找到与指定key相同的entry后,先通过clear方法将key置为null后,使其转换为一个脏entry,然后调用expungeStaleEntry方法将其value置为null,以便垃圾回收时能够清理,同时将table[i]置为null。

1.2.4 Thread.exit()

  上面的几个方法都是在ThreadLocal中的,接下来的这个方法在Thread中。当线程退出时会执行exit方法:

	private void exit() {
	    if (group != null) {
	        group.threadTerminated(this);
	        group = null;
	    }
	    target = null;
	    threadLocals = null;
	    inheritableThreadLocals = null;
	    inheritedAccessControlContext = null;
	    blocker = null;
	    uncaughtExceptionHandler = null;
	}

  从源码可以看出当线程结束时,会令threadLocals=null,也就意味着GC的时候就可以将threadLocalMap进行垃圾回收,换句话说threadLocalMap生命周期实际上thread的生命周期相同

1.3 ThreadLocal的相关问题 1.3.1 ThreadLocal的使用场景

  ThreadLocal适用于共享对象会造成线程安全 的业务场景。ThreadLocal经典的使用场景是为每个线程分配一个JDBC连接Connection。这样就可以保证每个线程的都在各自的Connection上进行数据库的 *** 作,不会出现A线程关了B线程正在使用的Connection。
  ThreadLocal使用示例:

public class ThreadLocalTest {
    private static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<>();

    public static void main(String[] args) {
    	ExecutorService executorService = Executors.newFixedThreadPool(2);
    	for (int i = 0; i<10; i++) {
    		executorService.submit(new DateUtil("2021-10-" + (i%10+1)));
    	}
    }

	static class DateUtil implements Runnable {
	    private String date;
	
	    public DateUtil(String date) {
	        this.date = date;
	    }
	
	    @Override
	    public void run() {
	        if (sdf.get() == null) {
	            sdf.set(new SimpleDateFormat("yyyy-MM-dd"));
	        } else {
	            try {
	                Date date = sdf.get().parse(this.date);
	                System.out.println(date);
	            } catch (ParseException e) {
	                e.printStackTrace();
	            } finally {
	            	sdf.remove();
				}
	        }
	    }
	}
}

  结果示例:

Mon Oct 04 00:00:00 CST 2021
Wed Oct 06 00:00:00 CST 2021
Sun Oct 03 00:00:00 CST 2021
Sun Oct 10 00:00:00 CST 2021
Sat Oct 09 00:00:00 CST 2021

  代码逻辑很简单:如果当前线程不持有SimpleDateformat对象实例,那么就新建一个并把它设置到当前线程中;如果已经持有,就直接使用。

1.3.2 ThreadLocal中内存泄漏的原因

  ThreadLocal是为了解决对象不能被多线程共享访问的问题,通过threadLocal.set方法将对象实例保存在每个线程自己所拥有的ThreadLocalMap中,这样每个线程使用自己的对象实例,彼此不会影响达到隔离的作用,从而就解决了对象在被共享访问带来线程安全问题
  ThreadLocal、ThreadLocalMap、Entry之间的关系:

  实线代表强引用,虚线代表的是弱引用,如果threadLocal外部强引用被置为null(threadLocalInstance=null)的话,threadLocal实例就没有一条引用链路可达,很显然在gc(垃圾回收)的时候势必会被回收,因此entry就存在key为null的情况,无法通过一个Key为null去访问到该entry的value。同时,就存在了这样一条引用链:threadRef->currentThread->threadLocalMap->entry->valueRef->valueMemory,导致在垃圾回收的时候进行可达性分析的时候,value可达从而不会被回收掉,但是该value永远不能被访问到,这样就熬成了内存泄漏。

1.3.3 为什么ThreadLocal的实现中要使用弱引用

  通过threadLocal、threadLocalMap、entry的引用关系,看起来threadLocal存在内存泄漏的问题似乎是因为threadLocal是被弱引用修饰的。那为什么要使用弱引用呢?
  假设threadLocal使用的是强引用,在业务代码中执行threadLocalInstance==null *** 作,以清理掉threadLocal实例的目的,但是因为threadLocalMap的Entry强引用threadLocal,因此在gc的时候进行可达性分析,threadLocal依然可达,对threadLocal并不会进行垃圾回收,这样就无法真正达到业务逻辑的目的,出现逻辑错误

  因为堆中的ThreadLocal实例到Entry之间的引用是弱引用,所以当断开ThreadLocalRef到ThreadLocal实例之间的引用(threadLocalInstance = null)时,Entry和ThreadLocal实例之间的引用就是孤零零的弱引用,这样就可以被GC(一旦JVM发现某个弱引用,就会将其回收)。
  假设Entry弱引用threadLocal,尽管会出现内存泄漏的问题,但是在threadLocal的生命周期里(set,getEntry,remove)里,都会针对key为null的脏entry进行处理。

1.3.4 ThreadLocal最佳实践

  每次使用完ThreadLocal,都调用它的remove()方法,清除数据。示例:

        ThreadLocal<M> tl = new ThreadLocal<>();
        tl.set(new M());
        tl.remove();
二、CopyOnWriteArrayList

  CopyOnWriteArrayList可以保证线程安全,保证读读线程之间不会被阻塞,因此CopyOnWriteArrayList被广泛应用于很多读多写少的场景中。

2.1 COW的设计思想

  如果简单的使用读写锁来保证线程安全的话,在写锁被获取之后,读写线程被阻塞,只有当写锁被释放后读线程才有机会获取到锁从而读到最新的数据。站在读线程的角度来看,即读线程任何时候都是获取到最新的数据,满足数据实时性。
  要进行优化,一种思路是牺牲数据实时性满足数据的最终一致性即可,CopyOnWriteArrayList就是实现了这种思路的容器。CopyOnWriteArrayList通过Copy-On-Write(COW),即写时复制的思想来通过延时更新的策略来实现数据的最终一致性,并且能够保证读线程间不阻塞。
  COW通俗的理解是:当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。
  对COW容器进行并发的读的时候,不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,延时更新的策略是通过在写的时候针对的是不同的数据容器来实现的,放弃数据实时性达到数据的最终一致性。

2.2 CopyOnWriteArrayList的实现原理

  CopyOnWriteArrayList内部维护了一个数组:

	private transient volatile Object[] array;
2.2.1 get方法
	public E get(int index) {
	    return get(getArray(), index);
	}

	final Object[] getArray() {
	    return array;
	}
	
	private E get(Object[] a, int index) {
	    return (E) a[index];
	}

  get方法实现非常简单:所有的读线程只是会读取数据容器中的数据,并不会进行修改。

2.2.2 add方法
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
	//1. 使用Lock,保证写线程在同一时刻只有一个
    lock.lock();
    try {
		//2. 获取旧数组引用
        Object[] elements = getArray();
        int len = elements.length;
		//3. 创建新的数组,并将旧数组的数据复制到新数组中
        Object[] newElements = Arrays.copyOf(elements, len + 1);
		//4. 往新数组中添加新的数据
		newElements[len] = e;
		//5. 将旧数组引用指向新的数组
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

  add方法的逻辑:

  1. 采用 ReentrantLock,保证同一时刻只有一个写线程正在进行数组的复制,否则的话内存中会有多份被复制的数据;
  2. 数组引用是volatile修饰的,因此将旧的数组引用指向新的数组,根据volatile的 happens-before 规则,写线程对数组引用的修改对读线程是可见的;
  3. 由于在写数据的时候,是在新的数组中插入数据的,从而保证读写实在两个不同的数据容器中进行 *** 作。
3.3 COW相关问题 3.3.1 COW和读写锁的区别

  COW和ReentrantReadWriteLock两者的相同点:

  1. 两者都是通过读写分离的思想实现;
    2.读线程间是互不阻塞的。

  不同点:使用ReentrantReadWriteLock时,对读线程而言,为了实现数据实时性,在写锁被获取后,读线程会等待或者当读锁被获取后,写线程会等待,从而解决“脏读”等问题。也就是说如果使用读写锁依然会出现读线程阻塞等待的情况。
  COW则完全放开了牺牲数据实时性而保证数据最终一致性,即读线程对数据的更新是延时感知的,因此读线程不会存在等待的情况。

3.3.2 COW的缺点

  CopyOnWrite容器有很多优点,但是同时也存在两个问题:内存占用问题和数据一致性问题。

  • 1、内存占用问题
      因为CopyOnWrite的写时复制机制,所以在进行写 *** 作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对 象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。
      如果这些对象占用的内存比较大,那么这个时候很有可能造成频繁的minor GC和major GC。
  • 2、数据一致性问题
      CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。
三、ConcurrentHashMap

  ConcurrentHashMap相关实现参考;Java集合(五)LinkedHashMap、TreeMap、ConcurrentHashMap。

四、ConcurrentLinkedQueue

  在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。
  ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。
  ConcurrentLinkedQueue用CAS来实现。

4.1 ConcurrentLinkedQueue的结构

  ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

	private transient volatile Node<E> tail = head;
4.2 元素入队

  即把入队节点添加到队列的尾部。
  假设想在一个队列中依次插入4个节点,过程图示:

  过程:

  1. 添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  2. 添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
  3. 添加元素3,设置tail节点的next节点为元素3节点。
  4. 添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

  入队主要做两件事情:第一是将入队节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。
  从单线程入队的角度理解了入队过程,但是多个线程同时进行入队的情况就变得更加复杂了,因为可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队 *** 作,然后重新获取尾节点。

  • offer(E e)
	public boolean offer(E e) {
		if (e == null) throw new NullPointerException();
			//入队前,创建一个入队节点
			Node<E> n = new Node<E>(e);
			retry:
			//死循环,入队不成功反复入队。
			for (;;) {
				//创建一个指向tail节点的引用
				Node<E> t = tail;
				//p用来表示队列的尾节点,默认情况下等于tail节点。
				Node<E> p = t;
				for (int hops = 0; ; hops++) {
				//获得p节点的下一个节点。
				Node<E> next = succ(p);
				//next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点
				if (next != null) {
					//循环了两次及其以上,并且当前节点还是不等于尾节点
					if (hops > HOPS && t != tail)
					continue retry;
					p = next;
				}
				//如果p是尾节点,则设置p节点的next节点为入队节点。
				else if (p.casNext(null, n)) {
					/*如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点,
					更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点*/
					if (hops >= HOPS)
						casTail(t, n); // 更新tail节点,允许失败
					return true;
				}
				//p有next节点,表示p的next节点是尾节点,则重新设置p节点
				else {
					p = succ(p);
				}
			}
		}
	}

  整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

  • 定位尾节点
      tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。获取p节点的next节点代码:
	final Node<E> succ(Node<E> p) {
		Node<E> next = p.getNext();
		return (p == next) head : next;
	}
  • 设置入队节点为尾节点
      p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
4.3 元素出队

  元素出队的就是从队列里返回一个节点元素,并清空该节点对元素的引用。

  并不是每次出队时都更新head节点,当head节点里有元素时,直接d出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队 *** 作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。

  • poll
	public E poll() {
		Node<E> h = head;
		//p表示头节点,需要出队的节点
 		Node<E> p = h;
		for (int hops = 0;; hops++) {
			//获取p节点的元素
			E item = p.getItem();
			//如果p节点的元素不为空,使用CAS设置p节点引用的元素为null,
			//如果成功则返回p节点的元素。
			if (item != null && p.casItem(item, null)) {
				if (hops >= HOPS) {
					//将p节点下一个节点设置成head节点
					Node<E> q = p.getNext();
					updateHead(h, (q != null) q : p);
				}
				return item;
			}
			//如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外
			//一个线程修改了。那么获取p节点的下一个节点
			Node<E> next = succ(p);
			//如果p的下一个节点也为空,说明这个队列已经空了
			if (next == null) {
				//更新头节点。
				updateHead(h, p);
				break;
			}
			//如果下一个元素不为空,则将头节点的下一个节点设置成头节点
			p = next;
		}
		return null;
	}

  首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队 *** 作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队 *** 作更新了head节点,导致元素发生了变化,需要重新获取头节点。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/924053.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存