山西教育学会网站建设,甘肃做网站找谁,网站标题格式,稿定设计app免费版官方1.Netty的两大性能优化工具 (1)FastThreadLocal FastThreadLocal的作用与ThreadLocal相当#xff0c;但比ThreadLocal更快。ThreadLocal的作用是多线程访问同一变量时能够通过线程本地化的方式避免多线程竞争、实现线程隔离。 Netty的FastThreadLocal重新实现了JDK的ThreadLoc…1.Netty的两大性能优化工具 (1)FastThreadLocal FastThreadLocal的作用与ThreadLocal相当但比ThreadLocal更快。ThreadLocal的作用是多线程访问同一变量时能够通过线程本地化的方式避免多线程竞争、实现线程隔离。 Netty的FastThreadLocal重新实现了JDK的ThreadLocal的功能且访问速度更快但前提是使用FastThreadLocalThread线程。 (2)Recycler Recycler实现了一个轻量级的对象池机制。对象池的作用是一方面可以实现快速创建对象另一方面可以避免反复创建对象、减少YGC压力。 Netty使用Recycler的方式来获取ByteBuf对象的原因是ByteBuf对象的创建在Netty里是非常频繁的且又比较占空间。但是如果对象比较小使用对象池也不是那么划算。 2.FastThreadLocal的实现之构造方法 Netty为FastThreadLocal量身打造了FastThreadLocalThread和InternalThreadLocalMap两个重要的类FastThreadLocal的构造方法会设置一个由final修饰的int型变量index该index变量的值会由InternalThreadLocalMap的静态方法通过原子类来进行获取。
//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread.
//Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable.
//Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently.
//To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype.
//By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason.
//Note that the fast path is only possible on threads that extend FastThreadLocalThread,
//because it requires a special field to store the necessary state.
//An access by any other kind of thread falls back to a regular ThreadLocal.
//param V the type of the thread-local variable
public class FastThreadLocalV {//每个FastThreadLocal都有一个唯一的身份标识IDprivate final int index;//类初始化时调用所以默认为variablesToRemoveIndex 0//第n个值存放在数组下标为n的位置下标为0的位置存所有FastThreadLocalVprivate static final int variablesToRemoveIndex InternalThreadLocalMap.nextVariableIndex();public FastThreadLocal() {index InternalThreadLocalMap.nextVariableIndex();}...
}//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
//Note that this class is for internal use only and is subject to change at any time.
//Use FastThreadLocal unless you know what you are doing.
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {public static final Object UNSET new Object();public static int nextVariableIndex() {int index nextIndex.getAndIncrement();if (index 0) {nextIndex.decrementAndGet();throw new IllegalStateException(too many thread-local indexed variables);}return index;}...
}//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
//Note that this class is for internal use only and is subject to change at any time.
//Use FastThreadLocal unless you know what you are doing.
class UnpaddedInternalThreadLocalMap {static final ThreadLocalInternalThreadLocalMap slowThreadLocalMap new ThreadLocalInternalThreadLocalMap();static final AtomicInteger nextIndex new AtomicInteger();//Used by FastThreadLocalObject[] indexedVariables;...
} 3.FastThreadLocal的实现之get()方法 (1)FastThreadLocalThread的关键属性 FastThreadLocal继承了Thread类每个FastThreadLocalThread线程对应一个InternalThreadLocalMap实例。只有FastThreadLocal和FastThreadLocalThread线程组合在一起使用的时候才能发挥出FastThreadLocal的性能优势。
public class FastThreadLocalThread extends Thread {private InternalThreadLocalMap threadLocalMap;public FastThreadLocalThread() { }public FastThreadLocalThread(Runnable target) {super(target);}public FastThreadLocalThread(ThreadGroup group, Runnable target) {super(group, target);}public FastThreadLocalThread(String name) {super(name);}public FastThreadLocalThread(ThreadGroup group, String name) {super(group, name);}public FastThreadLocalThread(Runnable target, String name) {super(target, name);}public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {super(group, target, name);}public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {super(group, target, name, stackSize);}//Returns the internal data structure that keeps the thread-local variables bound to this thread. //Note that this method is for internal use only, and thus is subject to change at any time.public final InternalThreadLocalMap threadLocalMap() {return threadLocalMap;}//Sets the internal data structure that keeps the thread-local variables bound to this thread.//Note that this method is for internal use only, and thus is subject to change at any time.public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {//这个方法会在调用InternalThreadLocalMap.get()方法时被调用//具体就是通过fastGet()方法设置FastThreadLocalThread一个新创建的InternalThreadLocalMap对象this.threadLocalMap threadLocalMap;}
} (2)Thread与ThreadLocalMap 注意每个Thread线程对应一个ThreadLocalMap实例。ThreadLocal.ThreadLocalMap是采用线性探测法来解决哈希冲突的。
public class Thread implements Runnable {...//ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class. ThreadLocal.ThreadLocalMap threadLocals null;...
}public class ThreadLocalT {...//Returns the value in the current threads copy of this thread-local variable. //If the variable has no value for the current thread, //it is first initialized to the value returned by an invocation of the #initialValue method.//return the current threads value of this thread-localpublic T get() {Thread t Thread.currentThread();ThreadLocalMap map getMap(t);if (map ! null) {ThreadLocalMap.Entry e map.getEntry(this);if (e ! null) {SuppressWarnings(unchecked)T result (T)e.value;return result;}}return setInitialValue();}//ThreadLocalMap is a customized hash map suitable only for maintaining thread local values. //No operations are exported outside of the ThreadLocal class. //The class is package private to allow declaration of fields in class Thread. //To help deal with very large and long-lived usages, the hash table entries use WeakReferences for keys. //However, since reference queues are not used, //stale entries are guaranteed to be removed only when the table starts running out of space.static class ThreadLocalMap {//The entries in this hash map extend WeakReference, using its main ref field as the key (which is always a ThreadLocal object). //Note that null keys (i.e. entry.get() null) mean that the key is no longer referenced, so the entry can be expunged from table. //Such entries are referred to as stale entries in the code that follows.static class Entry extends WeakReferenceThreadLocal? {//The value associated with this ThreadLocal.Object value;Entry(ThreadLocal? k, Object v) {super(k);value v;}}//The initial capacity -- MUST be a power of two.private static final int INITIAL_CAPACITY 16;//The table, resized as necessary.//table.length MUST always be a power of two.private Entry[] table;//The number of entries in the table.private int size 0;//The next size value at which to resize.private int threshold; // Default to 0...}...
} (3)FastThreadLocal.get()方法的实现流程 首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap。然后通过唯一标识当前FastThreadLocal对象的索引index从InternalThreadLocalMap中取出数组元素值。如果取出的值是缺省的则对该数组元素进行初始化并将当前FastThreadLocal对象保存到待清理的Set中。
//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread.
//Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable.
//Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently.
//To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype.
//By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason.
//Note that the fast path is only possible on threads that extend FastThreadLocalThread,
//because it requires a special field to store the necessary state.
//An access by any other kind of thread falls back to a regular ThreadLocal.
//param V the type of the thread-local variable
public class FastThreadLocalV {//每个FastThreadLocal都有一个唯一的身份标识ID//每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置private final int index;//类初始化时调用所以默认为variablesToRemoveIndex 0//第n个值存放在数组下标为n的位置下标为0的位置会存储所有FastThreadLocalVprivate static final int variablesToRemoveIndex InternalThreadLocalMap.nextVariableIndex();public FastThreadLocal() {//每new一个FastThreadLocalindex就会自增1所以index是FastThreadLocal的唯一身份IDindex InternalThreadLocalMap.nextVariableIndex();}//Returns the current value for the current threadSuppressWarnings(unchecked)public final V get() {//首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMapInternalThreadLocalMap threadLocalMap InternalThreadLocalMap.get();//从数组中取出index位置的元素Object v threadLocalMap.indexedVariable(index);//如果获取到的元素不是一个UNSET即一个new Object()则返回该元素if (v ! InternalThreadLocalMap.UNSET) {return (V) v;}//如果获取到的数组元素是缺省对象则对threadLocalMap在index位置的元素值执行初始化操作return initialize(threadLocalMap);}private V initialize(InternalThreadLocalMap threadLocalMap) {V v null;try {//通过initialValue()方法对threadLocalMap在index位置的元素值进行初始化//initialValue()方法可以被FastThreadLocalV的子类重写v initialValue();} catch (Exception e) {PlatformDependent.throwException(e);}//设置threadLocalMap数组在下标index处的元素值threadLocalMap.setIndexedVariable(index, v);addToVariablesToRemove(threadLocalMap, this);return v;}//Returns the initial value for this thread-local variable.protected V initialValue() throws Exception {return null;}private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal? variable) {//获取threadLocalMap数组下标为0的元素Object v threadLocalMap.indexedVariable(variablesToRemoveIndex);SetFastThreadLocal? variablesToRemove;//将variable添加到数组下标为0位置的Set集合中以便可以通过remove()方法统一删除if (v InternalThreadLocalMap.UNSET || v null) {//创建FastThreadLocal类型的Set集合variablesToRemove Collections.newSetFromMap(new IdentityHashMapFastThreadLocal?, Boolean());//将variablesToRemove这个Set集合设置到数组下标为0的位置threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);} else {//强转获得Set集合variablesToRemove (SetFastThreadLocal?) v;}variablesToRemove.add(variable);}...
} (4)从InternalThreadLocalMap中获取值 InternalThreadLocalMap.get()方法会分别通过fastGet()和slowGet()来获取一个InternalThreadLocalMap对象。 如果当前线程是普通线程则调用slowGet()方法让每个线程通过JDK的ThreadLocal来拿到一个InternalThreadLocalMap对象所以如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的。 如果当前线程是FastThreadLocalThread线程则调用fastGet()方法由于FastThreadLocalThread线程维护了一个InternalThreadLocalMap对象所以fastGet()方法相当于直接从线程对象里把这个InternalThreadLocalMap拿出来。 注意Reactor线程所创建的线程实体便是FastThreadLocalThread线程。
//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
//Note that this class is for internal use only and is subject to change at any time.
//Use FastThreadLocal unless you know what you are doing.
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {public static final Object UNSET new Object();private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE 32;...private InternalThreadLocalMap() {//设置父类的成员变量indexedVariables的初始值super(newIndexedVariableTable());}private static Object[] newIndexedVariableTable() {//初始化一个32个元素的数组Object[] array new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];//每个元素都是UNSET值Arrays.fill(array, UNSET);return array;}//index是当前访问的FastThreadLocal在JVM里的索引//indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的public Object indexedVariable(int index) {Object[] lookup indexedVariables;//直接通过索引来取出对象return index lookup.length? lookup[index] : UNSET;}public static InternalThreadLocalMap get() {Thread thread Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return fastGet((FastThreadLocalThread) thread);} else {return slowGet();}}private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {InternalThreadLocalMap threadLocalMap thread.threadLocalMap();if (threadLocalMap null) {thread.setThreadLocalMap(threadLocalMap new InternalThreadLocalMap());}return threadLocalMap;}private static InternalThreadLocalMap slowGet() {//如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的//因为此时返回的是一个通过ThreadLocal维护的InternalThreadLocalMap对象ThreadLocalInternalThreadLocalMap slowThreadLocalMap UnpaddedInternalThreadLocalMap.slowThreadLocalMap;InternalThreadLocalMap ret slowThreadLocalMap.get();if (ret null) {ret new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;}...
}class UnpaddedInternalThreadLocalMap {static final ThreadLocalInternalThreadLocalMap slowThreadLocalMap new ThreadLocalInternalThreadLocalMap();static final AtomicInteger nextIndex new AtomicInteger();//Used by FastThreadLocalObject[] indexedVariables;...UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {this.indexedVariables indexedVariables;}...
} InternalThreadLocalMap的indexedVariable()方法中的入参index索引指的是每一个FastThreadLocal对象在JVM里的标识ID通过自增的方式由原子类进行创建。 对于当前线程维护的InternalThreadLocalMap对象里的数组indexedVariables可以通过下标方式indexedVariables[index]获取一个Object。 初始化InternalThreadLocalMap对象时会初始化一个32个元素的indexedVariables数组每一个元素都是UNSET值。 4.FastThreadLocal的实现之set()方法 (1)FastThreadLocal.set()方法的实现流程 首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap。然后通过唯一标识当前FastThreadLocal对象的索引index给InternalThreadLocalMap数组中对应的位置设置值。接着将当前FastThreadLocal对象保存到待清理的Set中如果设置的值是一个缺省的Object对象则通过remove()方法删除InternalThreadLocalMap数组中对应位置的元素。
public class FastThreadLocalV {//每个FastThreadLocal都有一个唯一的身份标识ID//每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置 private final int index;//类初始化时调用所以默认为variablesToRemoveIndex 0//第n个值存放在数组下标为n的位置下标为0的位置会存储所有FastThreadLocalVprivate static final int variablesToRemoveIndex InternalThreadLocalMap.nextVariableIndex();...//Set the value for the current thread.public final void set(V value) {if (value ! InternalThreadLocalMap.UNSET) {InternalThreadLocalMap threadLocalMap InternalThreadLocalMap.get();setKnownNotUnset(threadLocalMap, value);} else {remove();}}private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {//将当前FastThreadLocal对象对应的数据添加到当前线程维护的InternalThreadLocalMap中if (threadLocalMap.setIndexedVariable(index, value)) {//将当前FastThreadLocal对象保存到待清理的Set中addToVariablesToRemove(threadLocalMap, this);}}private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal? variable) {//获取threadLocalMap数组下标为0的元素Object v threadLocalMap.indexedVariable(variablesToRemoveIndex);SetFastThreadLocal? variablesToRemove;//将variable添加到数组下标为0位置的Set集合中以便可以通过remove()方法统一删除if (v InternalThreadLocalMap.UNSET || v null) {//创建FastThreadLocal类型的Set集合variablesToRemove Collections.newSetFromMap(new IdentityHashMapFastThreadLocal?, Boolean());//将variablesToRemove这个Set集合设置到数组下标为0的位置threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);} else {//强转获得Set集合variablesToRemove (SetFastThreadLocal?) v;}variablesToRemove.add(variable);}//Sets the value to uninitialized; //a proceeding call to get() will trigger a call to initialValue().public final void remove() {remove(InternalThreadLocalMap.getIfSet());}//Sets the value to uninitialized for the specified thread local map;//a proceeding call to get() will trigger a call to initialValue().//The specified thread local map must be for the current thread.SuppressWarnings(unchecked)public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap null) {return;}//删除数组下标index位置对应的valueObject v threadLocalMap.removeIndexedVariable(index);//从数组下标0的位置取出Set集合删除当前FastThreadLocal对象removeFromVariablesToRemove(threadLocalMap, this);if (v ! InternalThreadLocalMap.UNSET) {try {//和initValue()方法一样可以被FastThreadLocal的子类重写onRemoval((V) v);} catch (Exception e) {PlatformDependent.throwException(e);}}}//Returns the initial value for this thread-local variable.protected V initialValue() throws Exception {return null;}//Invoked when this thread local variable is removed by #remove().//Be aware that #remove() is not guaranteed to be called when the Thread completes which means //you can not depend on this for cleanup of the resources in the case of Thread completion.protected void onRemoval(SuppressWarnings(UnusedParameters) V value) throws Exception {}...
} FastThreadLocal的addToVariablesToRemove()方法解释了为什么InternalThreadLocalMap的value数据是从下标1的位置开始进行存储的因为下标0的位置存储的是一个Set集合集合里面的元素是FastThreadLocal对象。 (2)向InternalThreadLocalMap设置值
//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals.
//Note that this class is for internal use only and is subject to change at any time.
//Use FastThreadLocal unless you know what you are doing.
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {public static final Object UNSET new Object();...//index是当前访问的FastThreadLocal在JVM里的索引//indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的public Object indexedVariable(int index) {Object[] lookup indexedVariables;//直接通过索引来取出对象return index lookup.length? lookup[index] : UNSET;}//如果设置的是新值则返回true如果设置的是旧值则返回falsepublic boolean setIndexedVariable(int index, Object value) {Object[] lookup indexedVariables;if (index lookup.length) {Object oldValue lookup[index];//直接将数组index位置的元素设置为value时间复杂度为O(1)lookup[index] value;return oldValue UNSET;} else {//扩容数组expandIndexedVariableTableAndSet(index, value);return true;}}//通过无符号右移和位或运算实现2^n * 2这与HashMap的扩容原理是一样的private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray indexedVariables;final int oldCapacity oldArray.length;int newCapacity index;//假设index 16也就是1000newCapacity | newCapacity 1;//变为1100newCapacity | newCapacity 2;//变为1111newCapacity | newCapacity 4;//还是1111newCapacity | newCapacity 8;//还是1111newCapacity | newCapacity 16;//还是1111newCapacity ;//变为10000Object[] newArray Arrays.copyOf(oldArray, newCapacity);Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);newArray[index] value;indexedVariables newArray;}//根据不同的Thread返回InternalThreadLocalMappublic static InternalThreadLocalMap getIfSet() {Thread thread Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return ((FastThreadLocalThread) thread).threadLocalMap();}return slowThreadLocalMap.get();}...
} (3)InternalThreadLocalMap的扩容原理 InternalThreadLocalMap的数组扩容原理和JDK的HashMap扩容原理是一样的。注意InternalThreadLocalMap是以index为基准进行扩容而不是以原数组长度为基准进行扩容。 假设初始化了70个FastThreadLocal但这些FastThreadLocal从来没有调用过set()方法此时InternalThreadLocalMap数组的长度还是默认的32。当index 70的FastThreadLocal调用set()方法时就不能以32为基准进行两倍扩容了。 5.FastThreadLocal的总结 (1)FastThreadLocal不一定比ThreadLocal快 只有FastThreadLocalThread线程使用FastThreadLocal才会更快如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的。 (2)FastThreadLocal并不会浪费很大的空间 虽然FastThreadLocal采用了空间换时间的思路但其设计之初就认为不会存在特别多的FastThreadLocal对象而且在数据中没有使用的元素只是存放了同一个缺省对象的引用(UNSET)所以并不会占用太多内存空间。 (3)FastThreadLocal如何实现高效查找 一.FastThreadLocal
在定位数据时可以直接根据数组下标index进行定位时间复杂度为O(1)所以在数据较多时也不会存在Hash冲突。在进行数组扩容时只需要把数组容量扩容2倍然后再把原数据拷贝到新数组。 二.ThreadLocal
在定位数据时是根据哈希算法进行定位的在数据较多时容易发生Hash冲突。发生Hash冲突时采用线性探测法解决Hash冲突要不断向下寻找效率较低。在进行数组扩容时由于采用了哈希算法所以在数组扩容后需要再做一轮rehash。 (4)FastThreadLocal具有更高的安全性 FastThreadLocal不仅提供了remove()方法可以主动清除对象而且在线程池场景中(也就是SingleThreadEventExecutor和DefaultThreadFactory)Netty还封装了FastThreadLocalRunnable。 FastThreadLocalRunnable最后会执行FastThreadLocal的removeAll()方法将Set集合中所有的FastThreadLocal对象都清理掉。 ThreadLocal使用不当可能会造成内存泄露只能等待线程销毁。或者只能通过主动检测的方式防止内存泄露从而增加了额外的开销。 6.Recycler的设计理念 (1)对象池和内存池都是为了提高并发处理能力 Java中频繁创建和销毁对象的开销是很大的所以通常会将一些对象缓存起来。当需要某个对象时优先从对象池中获取对象实例。通过重用对象不仅避免了频繁创建和销毁对象带来的性能损耗而且对于JVM GC也是友好的这就是对象池的作用。 (2)Recycler是Netty提供实现的轻量级对象池 通过Recycler在创建对象时就不需要每次都通过new的方式去创建了。如果Recycler里面有已经用过的对象则可以直接把这个对象取出来进行二次利用。在不需要该对象时只需要把它放到Recycler里以备下次需要时取出。 7.Recycler的使用 定义一个对象池实例RECYCLER时先实现newObject()方法。如果对象池里没有可用的对象就会调用newObject()方法来新建对象。此外需要创建Recycler.Handle对象与T对象进行绑定这样才可以通过RECYCLER.get()方法从对象池中获取对象。如果对象不再需要使用则通过调用T类实现的recycle()方法将对象回收到对象池。
public class RecycleTest {private static final RecyclerUser RECYCLER new RecyclerUser() {protected User newObject(HandleUser handle) {return new User(handle);}};private static class User {//创建Recycler.HandleUser对象与User对象进行绑定private final Recycler.HandleUser handle;public User(Recycler.HandleUser handle) {this.handle handle;}public void recycle() {handle.recycle(this);}}public static void main(String[] args) {//1.从对象池中获取User对象User user1 RECYCLER.get();//2.回收对象到对象池user1.recycle();//3.从对象池中获取对象User user2 RECYCLER.get();System.out.println(user1 user2);}
} 8.Recycler的四个核心组件 (1)Recycler的初始化和get()方法
public abstract class RecyclerT {...private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD 4 * 1024;//Use 4k instances as default.private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;private static final int MAX_SHARED_CAPACITY_FACTOR;private static final int MAX_DELAYED_QUEUES_PER_THREAD;private static final int LINK_CAPACITY;private static final int RATIO;static {int maxCapacityPerThread SystemPropertyUtil.getInt(io.netty.recycler.maxCapacityPerThread,SystemPropertyUtil.getInt(io.netty.recycler.maxCapacity, DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));if (maxCapacityPerThread 0) {maxCapacityPerThread DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;}DEFAULT_MAX_CAPACITY_PER_THREAD maxCapacityPerThread;MAX_SHARED_CAPACITY_FACTOR max(2, SystemPropertyUtil.getInt(io.netty.recycler.maxSharedCapacityFactor, 2));MAX_DELAYED_QUEUES_PER_THREAD max(0, SystemPropertyUtil.getInt(io.netty.recycler.maxDelayedQueuesPerThread, NettyRuntime.availableProcessors() * 2)); LINK_CAPACITY safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt(io.netty.recycler.linkCapacity, 16), 16));RATIO safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt(io.netty.recycler.ratio, 8));...}private static final FastThreadLocalMapStack?, WeakOrderQueue DELAYED_RECYCLED new FastThreadLocalMapStack?, WeakOrderQueue() {Overrideprotected MapStack?, WeakOrderQueue initialValue() {return new WeakHashMapStack?, WeakOrderQueue();}};private final int maxCapacityPerThread;private final int maxSharedCapacityFactor;private final int interval;private final int maxDelayedQueuesPerThread;private final FastThreadLocalStackT threadLocal new FastThreadLocalStackT() {Overrideprotected StackT initialValue() {return new StackT(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread);}Overrideprotected void onRemoval(StackT value) {//Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overheadif (value.threadRef.get() Thread.currentThread()) {if (DELAYED_RECYCLED.isSet()) {DELAYED_RECYCLED.get().remove(value);}}}};protected Recycler() {this(DEFAULT_MAX_CAPACITY_PER_THREAD);}protected Recycler(int maxCapacityPerThread) {this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);}protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);}protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { interval safeFindNextPositivePowerOfTwo(ratio);if (maxCapacityPerThread 0) {this.maxCapacityPerThread 0;this.maxSharedCapacityFactor 1;this.maxDelayedQueuesPerThread 0;} else {this.maxCapacityPerThread maxCapacityPerThread;this.maxSharedCapacityFactor max(1, maxSharedCapacityFactor);this.maxDelayedQueuesPerThread max(0, maxDelayedQueuesPerThread);}}SuppressWarnings(unchecked)public final T get() {if (maxCapacityPerThread 0) {return newObject((HandleT) NOOP_HANDLE);}StackT stack threadLocal.get();DefaultHandleT handle stack.pop();if (handle null) {//创建一个DefaultHandlehandle stack.newHandle();//创建一个对象并绑定这个DefaultHandlehandle.value newObject(handle);}return (T) handle.value;}...
} (2)第一个核心组件是Stack Stack是整个对象池的顶层数据结构。Stack描述了整个对象池的构造用于存储当前同线程回收的对象这里同线程的意思就是和后续的异线程相对的。在多线程的场景下Netty为了避免锁竞争问题每个线程都会持有各自的Stack。Recycler会通过FastThreadLocal实现每个线程对Stack的私有化。
public abstract class RecyclerT {...private static final class StackT {//所属的Recyclerfinal RecyclerT parent;//所属线程的弱引用final WeakReferenceThread threadRef;//异线程回收对象时其他线程能保存的被回收对象的最大个数final AtomicInteger availableSharedCapacity;//WeakOrderQueue最大个数private final int maxDelayedQueues;//对象池的最大大小默认最大为4Kprivate final int maxCapacity;//存储缓存数据的数组DefaultHandle?[] elements;//缓存的DefaultHandle对象个数int size;//WeakOrderQueue链表的三个重要结点private WeakOrderQueue cursor, prev;private volatile WeakOrderQueue head;Stack(RecyclerT parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) { this.parent parent;threadRef new WeakReferenceThread(thread);this.maxCapacity maxCapacity;availableSharedCapacity new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));elements new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];this.interval interval;handleRecycleCount interval;//Start at interval so the first one will be recycled.this.maxDelayedQueues maxDelayedQueues;}...}...
} Stack的结构如下图示其中Thread A是同线程Thread B、Thread C、Thread D是异线程。 (3)第二个核心组件是WeakOrderQueue WeakOrderQueue的作用是用于存储其他线程(异线程)回收由当前线程所分配的对象并且在合适的时机Stack会从异线程的WeakOrderQueue中收割对象。
public abstract class RecyclerT {private static final int LINK_CAPACITY;static {...LINK_CAPACITY safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt(io.netty.recycler.linkCapacity, 16), 16)); ...}...//a queue that makes only moderate guarantees about visibility: items are seen in the correct order,//but we arent absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintainprivate static final class WeakOrderQueue extends WeakReferenceThread {...static final class Link extends AtomicInteger {final DefaultHandle?[] elements new DefaultHandle[LINK_CAPACITY];int readIndex;Link next;}private static final class Head {private final AtomicInteger availableSharedCapacity;Link link;...}...}...
} (4)第三个核心组件是Link 每个WeakOrderQueue中都包含一个Link链表。回收对象都会被存放在Link链表的结点上每个Link结点默认存储16个对象。当每个Link结点存储满了会创建新的Link结点并放入链表尾部。 (5)第四个核心组件是DefaultHandle 一个对象在发起回收时需要调用DefaultHandle的recycle()方法进行具体的回收处理这个要被回收的对象会保存在DefaultHandle中。 Stack和WeakOrderQueue都使用DefaultHandle来存储被回收的对象。在Stack和Link中都有一个elements数组该数组保存的就是一个个的DefaultHandle实例。
public abstract class RecyclerT {...private static final class DefaultHandleT implements HandleT {int lastRecycledId;int recycleId;boolean hasBeenRecycled;Stack? stack;Object value;DefaultHandle(Stack? stack) {this.stack stack;}Overridepublic void recycle(Object object) {if (object ! value) {throw new IllegalArgumentException(object does not belong to handle);}Stack? stack this.stack;if (lastRecycledId ! recycleId || stack null) {throw new IllegalStateException(recycled already);}stack.push(this);}}...
} 9.Recycler的初始化 创建Recycler的方法是直接new一个Recycler。每个Recycler里都有一个threadLocal变量即FastThreadLocalStackT。所以每个Recycler里对于每个线程都会有一个Stack对象。调用Recycler的get()方法获取T类对象时便会初始化一个Stack对象。
public abstract class RecyclerT {...private final FastThreadLocalStackT threadLocal new FastThreadLocalStackT() {//在Recycler中调用threadLocal.get()时便会触发调用这个initialValue()方法Overrideprotected StackT initialValue() {return new StackT(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread);}Overrideprotected void onRemoval(StackT value) {//Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overheadif (value.threadRef.get() Thread.currentThread()) {if (DELAYED_RECYCLED.isSet()) {DELAYED_RECYCLED.get().remove(value);}}}};private final int interval;private final int maxCapacityPerThread;private final int maxSharedCapacityFactor;private final int maxDelayedQueuesPerThread;protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { //默认是8用于控制对象的回收比率interval safeFindNextPositivePowerOfTwo(ratio);if (maxCapacityPerThread 0) {this.maxCapacityPerThread 0;this.maxSharedCapacityFactor 1;this.maxDelayedQueuesPerThread 0;} else {//对象池的最大大小能存多少元素默认4Kthis.maxCapacityPerThread maxCapacityPerThread;//默认是2this.maxSharedCapacityFactor max(1, maxSharedCapacityFactor);//默认2倍CPU核数this.maxDelayedQueuesPerThread max(0, maxDelayedQueuesPerThread);}}private static final class StackT {...//异线程回收对象时其他线程能保存的被回收对象的最大个数final AtomicInteger availableSharedCapacity;Stack(RecyclerT parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) { ...availableSharedCapacity new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));...}...}...
} 当调用对象的recycle()方法将对象进行回收的时候并非每次都会把对象进行回收而是通过interval参数来控制回收比率。 Stack的availableSharedCapacity表示当前线程创建的对象能够在其他线程里缓存的最大个数它由Recycler的maxSharedCapacityFactor和maxCapacityPerThread计算得出默认为2K。 Recycler的maxDelayedQueuesPerThread表示的是当前线程创建的对象可以在多少个线程里缓存或者说一个Stack中拥有WeakOrderQueue的最大个数默认是2倍CPU核数。 如果在当前的线程里通过Recycler的get()方法获取一个对象那么这个对象其实是由当前线程的Stack进行管理的。如果这个对象是在其他线程中进行释放的那么该对象不会放到当前线程的Stack的数组里而是放到其他线程的WeakOrderQueue的Link链表元素的数组里。 每个线程都有一个Stack变量每个Stack里都有一个DefaultHandle数组这个DefaultHandle数组用于存放由该线程创建的但已被回收的对象。 每个DefaultHandle都包装了一个被回收的对象并且这个DefaultHandle可以被外部对象引用来回收这个对象。 10.Recycler的对象获取 从Recycler中获取对象的过程是 一.获取当前线程缓存的Stack 二.从Stack里弹出对象
Stack.pop()返回的是一个DefaultHandle对象如果对象不为空则直接返回否则进行下一步。 三.创建对象并绑定到DefaultHandle
一个DefaultHandle对象会和一个Stack对象进行绑定多对一。一个DefaultHandle对象也会和一个Object对象进行绑定一对一。 总结起来就是当Stack中的DefaultHandle数组有对象数据时直接从栈顶弹出。当Stack中的DefaultHandle数组没有对象数据时尝试从WeakOrderQueue中回收一个Link包含的对象实例到Stack中然后再从栈顶弹出。当Stack中的DefaultHandle数组没有对象数据以及WeakOrderQueue的Link链表中也没有对象数据时那么就通过Recycler子类实现的newObject()方法来创建一个对象进行返回。
public abstract class RecyclerT {...public final T get() {...//获取当前线程缓存的StackStackT stack threadLocal.get();//从Stack中弹出一个DefaultHandle对象DefaultHandleT handle stack.pop();if (handle null) {//创建一个DefaultHandlehandle stack.newHandle();//创建一个对象并保存到DefaultHandlehandle.value newObject(handle);}return (T) handle.value;}//由Recycler的子类来实现创建对象protected abstract T newObject(HandleT handle);private static final class StackT {...DefaultHandleT pop() {int size this.size;if (size 0) {//尝试从其他线程回收的对象中转移一些到Stack的DefaultHandle数组中if (!scavenge()) {return null;}size this.size;if (size 0) {return null;}}size --;//将对象实例从DefaultHandle数组(elements)的栈顶弹出DefaultHandle ret elements[size];elements[size] null;//As we already set the element[size] to null we also need to store the updated size before we do any validation. //Otherwise we may see a null value when later try to pop again without a new element added before.this.size size;if (ret.lastRecycledId ! ret.recycleId) {throw new IllegalStateException(recycled multiple times);}ret.recycleId 0;ret.lastRecycledId 0;return ret;}DefaultHandleT newHandle() {return new DefaultHandleT(this);}...}...
} 11.Recycler的对象回收 (1)回收对象的入口 回收线程的对象主要是靠Stack和WeakOrderQueue回收对象的入口是DefaultHandle的recycle()方法该方法会调用Stack的push()方法进行具体的回收处理。
public abstract class RecyclerT {...public final T get() {...//获取当前线程缓存的StackStackT stack threadLocal.get();//从Stack中弹出一个DefaultHandle对象DefaultHandleT handle stack.pop();if (handle null) {//创建一个DefaultHandlehandle stack.newHandle();//创建一个对象并保存到DefaultHandlehandle.value newObject(handle);}return (T) handle.value;}//从Recycler的get()方法可知//一个对象在创建时就会和一个新创建的DefaultHandle绑定//而该DefaultHandle又会和创建该对象的线程绑定因为DefaultHandle会和Stack绑定而Stack又会绑定创建该对象的线程 private static final class DefaultHandleT implements HandleT {...Stack? stack;Object value;DefaultHandle(Stack? stack) {this.stack stack;}Overridepublic void recycle(Object object) {...Stack? stack this.stack;stack.push(this);}}private static final class StackT {//一个线程调用Recycler.get()方法创建一个对象时会先获取和该线程绑定的Stack//然后由该Stack创建一个DefaultHandle接着再创建一个对象并将对象保存到这个DefaultHandle中final WeakReferenceThread threadRef;...//当一个对象调用它绑定的DefaultHandle进行回收时可以通过threadRef取到创建它的线程void push(DefaultHandle? item) {Thread currentThread Thread.currentThread();if (threadRef.get() currentThread) {//The current Thread is the thread that belongs to the Stack, we can try to push the object now.pushNow(item);} else {//The current Thread is not the one that belongs to the Stack (or the Thread that belonged to the Stack was collected already), //we need to signal that the push happens later.pushLater(item, currentThread);}}private void pushNow(DefaultHandle? item) {//防止被多次回收if ((item.recycleId | item.lastRecycledId) ! 0) {throw new IllegalStateException(recycled already);}item.recycleId item.lastRecycledId OWN_THREAD_ID;int size this.size;//通过dropHandle(item)来实现回收每8个对象中的一个if (size maxCapacity || dropHandle(item)) {//超出最大容量或被回收速率控制则不回收return;}if (size elements.length) {//扩容elements Arrays.copyOf(elements, min(size 1, maxCapacity));}elements[size] item;this.size size 1;}//回收每8个对象中的一个boolean dropHandle(DefaultHandle? handle) {if (!handle.hasBeenRecycled) {//interval默认是8if (handleRecycleCount interval) {handleRecycleCount;//Drop the object.return true;}handleRecycleCount 0;handle.hasBeenRecycled true;}return false;}private void pushLater(DefaultHandle? item, Thread thread) {...//取出由当前线程帮助其他线程回收对象的缓存MapStack?, WeakOrderQueue delayedRecycled DELAYED_RECYCLED.get();//取出对象绑定的Stack对应的WeakOrderQueueWeakOrderQueue queue delayedRecycled.get(this);if (queue null) {//最多帮助2 * CPU核数的线程回收对象if (delayedRecycled.size() maxDelayedQueues) {//WeakOrderQueue.DUMMY就是new一个WeakOrderQueue表示无法再帮该Stack回收对象delayedRecycled.put(this, WeakOrderQueue.DUMMY);return;}//新建WeakOrderQueueif ((queue newWeakOrderQueue(thread)) null) {//drop objectreturn;}delayedRecycled.put(this, queue);} else if (queue WeakOrderQueue.DUMMY) {//drop objectreturn;}//添加对象到WeakOrderQueue的Link链表中queue.add(item);}//Allocate a new WeakOrderQueue or return null if not possible.private WeakOrderQueue newWeakOrderQueue(Thread thread) {return WeakOrderQueue.newQueue(this, thread);}...}//每个线程都有一个MapStack?, WeakOrderQueue存放了其他线程及其对应WeakOrderQueue的映射private static final FastThreadLocalMapStack?, WeakOrderQueue DELAYED_RECYCLED new FastThreadLocalMapStack?, WeakOrderQueue() {Overrideprotected MapStack?, WeakOrderQueue initialValue() {return new WeakHashMapStack?, WeakOrderQueue();}};...private static final class WeakOrderQueue extends WeakReferenceThread {...static final class Link extends AtomicInteger {//一个大小为16的DefaultHandle数组final DefaultHandle?[] elements new DefaultHandle[LINK_CAPACITY];int readIndex;Link next;}static WeakOrderQueue newQueue(Stack? stack, Thread thread) {//We allocated a Link so reserve the spaceif (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {return null;}final WeakOrderQueue queue new WeakOrderQueue(stack, thread);stack.setHead(queue);return queue;}private WeakOrderQueue(Stack? stack, Thread thread) {super(thread);tail new Link();head new Head(stack.availableSharedCapacity);head.link tail;interval stack.interval;handleRecycleCount interval;}void add(DefaultHandle? handle) {handle.lastRecycledId id;if (handleRecycleCount interval) {handleRecycleCount;return;}handleRecycleCount 0;Link tail this.tail;int writeIndex;if ((writeIndex tail.get()) LINK_CAPACITY) {Link link head.newLink();if (link null) {return;}this.tail tail tail.next link;writeIndex tail.get();}tail.elements[writeIndex] handle;handle.stack null;tail.lazySet(writeIndex 1);}...}
} (2)同线程回收对象 在当前线程创建对象也在当前线程进行回收。 Stack的pushNow()方法的主要逻辑是回收每8个对象中的一个以及将回收的对象存放到Stack.elements数组时可能需要的扩容处理。 (3)异线程回收对象 在当前线程创建对象但在其他线程进行回收。 说明一
Stack.pushLater()方法中的DELAYED_RECYCLED是一个FastThreadLocal类型的对象可以通过这个FastThreadLocal对象获取一个MapStack, WeakOrderQueue。 所以每个线程都会有一个Map这个Map的key是Stack这个Map的value是WeakOrderQueue。每个线程的Map表示的是对这个线程来说不同的Stack对应着其他线程绑定的Stack而由其他线程创建的对象会存放在其他线程绑定的Stack的WeakOrderQueue里。 使用Map结构是为了防止DELAYED_RECYCLED内存膨胀使用Map结构也可以在当前线程中帮忙快速定位其他线程应该对应于当前线程Stack的哪个WeakOrderQueue。从delayedRecycled.size() maxDelayedQueues可以看出每个线程最多帮助2倍CPU核数的线程回收对象。 说明二
在pushLater(item, currentThread)中假设currentThread是线程2里面的this是线程1的Stack。 那么queue delayedRecycled.get(this)会去拿线程1对应的WeakOrderQueue。拿到后若queue null那么就说明线程2从来没有回收过线程1的对象。 queue newWeakOrderQueue(thread)会创建一个WeakOrderQueue也就是去为线程2(thread)去分配一个线程1的Stack对应的WeakOrderQueue然后通过delayedRecycled.put(this, queue)绑定线程1。 说明三
WeakOrderQueue中有一个Link链表。一个Link的大小是16每个元素都是一个DefaultHandle。为什么一个Link对应16个DefaultHandle而不是一个Link对应一个DefaultHandle。 如果一个Link对应一个DefaultHandle那么每次同线程去回收异线程都要创建一个Link对象。比如线程2去回收线程1创建的对象每次都需要创建一个Link对象然后添加到Link链表中并且每次都需要判断当前线程1是否还允许继续分配一个Link。 把多个DefaultHandle放在一个Link中的好处是不必每次都要判断当前线程2能否回收线程1的对象只需要判断当前Link是否满了即可这也体现了Netty进行优化处理的一个思路也就是通过批量的方式减少某些操作的频繁执行。 说明四
创建WeakOrderQueue对象时也就是执行代码new WeakOrderQueue(stack, thread)使用的是头插法往当前线程的Stack添加WeakOrderQueue对象。 说明五
WeakOrderQueue的add()方法添加对象到WeakOrderQueue时首先会将tail结点的Link填满然后再新创建一个Link插入到tail结点。其中LINK_CAPACITY表示Link的DefaultHandle数组的最大长度默认是16。 12.异线程收割对象 说明一
线程的Stack里有3指针head、pre、cursor。往Stack中插入一个WeakOrderQueue都是往头部插入的(头插法)。head指向第一个WeakOrderQueuecursor指向当前回收对象的WeakOrderQueue。 说明二
scavenge()方法会从其他线程回收的对象中尝试转移对象。如果成功则返回否则设置cursor为head以便下次从头开始获取。 说明三
scavengeSome()方法会首先判断cursor是否为null。如果cursor为null则设置cursor为head结点。接着执行do while循环不断尝试从其他线程的WeakOrderQueue中转移一些对象到当前线程的Stack的一个DefaultHandle数组中。 说明四
cursor.transfer()方法会把WeakOrderQueue里的对象传输到当前线程的Stack的elements数组里。如果传输成功就结束do while循环如果传输失败就获取cursor的下一个结点cursor.next继续处理。 说明五
cursor.transfer()方法每次都只取WeakOrderQueue里的一个Link然后传输Link里的elements数组元素到目标Stack的elements数组中。如果cursor null或success true则do while循环结束。所有的WeakOrderQueue默认时最多总共可以有2K / 16 128个Link。
public abstract class RecyclerT {...private static final class StackT {private WeakOrderQueue cursor, prev;private volatile WeakOrderQueue head;...//尝试从其他线程回收的对象中转移一些到elements数组private boolean scavenge() {if (scavengeSome()) {return true;}prev null;cursor head;return false;}private boolean scavengeSome() {WeakOrderQueue prev;WeakOrderQueue cursor this.cursor;//首先判断cursor是否为null如果cursor为null则设置cursor为head结点if (cursor null) {prev null;cursor head;if (cursor null) {return false;}} else {prev this.prev;}boolean success false;do {//从其他线程的WeakOrderQueue也就是cursor中//转移一些对象到当前线程的StackT的一个DefaultHandle数组中if (cursor.transfer(this)) {success true;break;}WeakOrderQueue next cursor.getNext();...cursor next;} while (cursor ! null !success);this.prev prev;this.cursor cursor;return success;}}private static final class WeakOrderQueue extends WeakReferenceThread {...//从当前的WeakOrderQueue中转移一些对象到目标StackT的一个DefaultHandle数组中boolean transfer(Stack? dst) {Link head this.head.link;if (head null) {return false;}if (head.readIndex LINK_CAPACITY) {if (head.next null) {return false;}head head.next;this.head.relink(head);}final int srcStart head.readIndex;int srcEnd head.get();final int srcSize srcEnd - srcStart;if (srcSize 0) {return false;}final int dstSize dst.size;final int expectedCapacity dstSize srcSize;if (expectedCapacity dst.elements.length) {final int actualCapacity dst.increaseCapacity(expectedCapacity);srcEnd min(srcStart actualCapacity - dstSize, srcEnd);}if (srcStart ! srcEnd) {final DefaultHandle[] srcElems head.elements;final DefaultHandle[] dstElems dst.elements;int newDstSize dstSize;for (int i srcStart; i srcEnd; i) {DefaultHandle? element srcElems[i];if (element.recycleId 0) {element.recycleId element.lastRecycledId;} else if (element.recycleId ! element.lastRecycledId) {throw new IllegalStateException(recycled already);}srcElems[i] null;if (dst.dropHandle(element)) {//Drop the object.continue;}element.stack dst;dstElems[newDstSize ] element;}if (srcEnd LINK_CAPACITY head.next ! null) {//Add capacity back as the Link is GCed.this.head.relink(head.next);}head.readIndex srcEnd;if (dst.size newDstSize) {return false;}dst.size newDstSize;return true;} else {//The destination stack is full already.return false;}}...}...
} 13.Recycler的总结 (1)获取对象和回收对象的思路总结 对象池有两个重要的组成部分Stack和WeakOrderQueue。 从Recycler获取对象时优先从Stack中查找可用对象。如果Stack中没有可用对象会尝试从WeakOrderQueue迁移一些对象到Stack中。 Recycler回收对象时分为同线程对象回收和异线程对象回收这两种情况。同线程回收直接向Stack添加对象异线程回收会向WeakOrderQueue中的最后一个Link添加对象。 同线程回收和异线程回收都会控制回收速率默认每8个对象会回收一个其他的全部丢弃。 (2)获取对象的具体步骤总结 如何从一个对象池里获取对象 步骤一首先通过FastThreadLocal方式拿到当前线程的Stack。 步骤二如果这个Stack里的elements数组有对象则直接弹出。如果这个Stack里的elements数组没有对象则从当前Stack关联的其他线程的WeakOrderQueue里的Link结点的elements数组中转移对象到当前Stack里的elements数组里。 步骤三如果转移成功那么当前Stack里的elements数组就有对象了这时就可以直接弹出。如果转移失败那么接下来就直接创建一个对象然后和当前Stack进行关联。 步骤四关联之后后续如果是当前线程自己进行对象回收则将该对象直接存放到当前线程的Stack里。如果是其他线程进行对象回收则将该对象存放到其他线程与当前线程的Stack关联的WeakOrderQueue里。 (3)对象池的设计核心 为什么要分同线程和异线程进行处理并设计一套比较复杂的数据结构因为对象池的使用场景一般是高并发的环境希望通过对象池来减少对象的频繁创建带来的性能损耗。所以在高并发的环境下从对象池中获取对象和回收对象就只能通过以空间来换时间的思路进行处理而ThreadLocal恰好是通过以空间换时间的思路来实现的因此引入了FastThreadLocal来管理对象池里的对象。但是如果仅仅使用FastThreadLocal管理同线程创建和回收的对象那么并不能充分体现对象池的作用。所以通过FastThreadLocal获取的Stack对象应该不仅可以管理同线程的对象也可以管理异线程的对象。为此Recycler便分同线程和异线程进行处理并设计了一套比较复杂的数据结构。 14.Netty设计模式之单例模式 (1)单例模式的特点 一.一个类全局只有一个对象
二.延迟创建
三.避免线程安全问题 (2)单例模式的例子
public class Singleton {private volatile static Singleton singleton;private Singleton() {}public static Singleton getInstance() {if (singleton null) {synchronized(Singleton.class) {if (singleton null) {singleton new Singleton();}}}return singleton;}
} (3)Netty中的单例模式 Netty中的单例模式大都使用饿汉模式比如ReadTimeoutException、MqttEncoder。
public final class ReadTimeoutException extends TimeoutException {public static final ReadTimeoutException INSTANCE PlatformDependent.javaVersion() 7 ? new ReadTimeoutException(true) : new ReadTimeoutException();ReadTimeoutException() {}private ReadTimeoutException(boolean shared) {super(shared);}
}ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoderMqttMessage {public static final MqttEncoder INSTANCE new MqttEncoder();private MqttEncoder() {}...
} 文章转载自东阳马生架构 原文链接Netty源码—9.性能优化和设计模式 - 东阳马生架构 - 博客园 体验地址JNPF快速开发平台