Netty Recycler源码解析
背景
如果我们项目中会大量使用某一种对象,反复的创建这些对象,然后GC又反复的回收这些对象,这无疑增加的GC的负担,影响系统的性能,我们可以使用对象池来实现这类对象的复用,当对象用完之后返还给对象池,如果需要使用这类对象的时候就从对象池中去取得,在netty中会大量的使用一些类型的对象,netty实现线程级别的对象池来管理这些对象。
netty中如何定义可复用类型
- 创建一个对象复用池
- 可复用类必须实现recycle方法
- 可复用对象需要持有Handle的引用
Handle是netty定义的接口默认实现是DefaultHandle,Handle提供了实现对象回收的方法
一个简单的例子
import io.netty.util.Recycler;
import io.netty.util.internal.ObjectPool;
public class Recycler_T {
//productObjectPoolRecycle是Product类型的对象池
static ObjectPool<Product> productObjectPoolRecycle = ObjectPool.newPool(new ObjectPool.ObjectCreator<Product>() {
@Override
public Product newObject(ObjectPool.Handle<Product> handle) {
return new Product(handle);
}
});
static class Product{
ObjectPool.Handle<Product> handle ;
Product(ObjectPool.Handle<Product> handle){
this.handle = handle ;
}
void recycle(){
this.handle.recycle(this);
}
}
public static void main(String[] args) {
Product p1 = productObjectPoolRecycle.get();
p1.recycle();
Product p2 = productObjectPoolRecycle.get();
Product p3 = productObjectPoolRecycle.get();
p3.recycle();
Product p4 = productObjectPoolRecycle.get();
System.out.println(p1==p2);
System.out.println(p3==p4);
}
}
上面的例子说明了要定义一个可复用类型对象需要满足的条件:
1)创建对象池
2)复用类型Product需要持有Handle的引用
3)提供对象回收方法(例如recycle)。
在main方法中我们从对象池中获取产品p1,这个时候由于对象池中还没有这个对象,于是对象池内部会通过调用recycler的newObject方法去创建一个Product,当我们使用完p1之后通过调用recycle方法把p1回收到对象池中,之后我们再次从对象池中获取p2,我们获取到的p2和p1指向的是相同的对象。但是当我们比较p3和p4的时候发现他们是不同的。为什么p1和p2相同但是p3和p4不同呢,继续读下去,你会得到答案
原理之前
netty实现的对象池都是线程级别的,也就是说在netty中针对某一类型的复用对象每个线程都有自己的对象池,线程之间互不共享,这避免的线程之间的同步开销。
原理
先上图为敬
原理.png
先解释上图中的一些主要元素的意义
-
stack
stack是每个线程对象池的具体实现,从图中可以看出每个线程都会有自己的stack
-
stack -> elements
stack中的elements属性指向的是一个默认长度为256的DefaultHandle数组,数组的每个元素代表的是一个被回收的对象
-
DefaultHandle
DefaultHandle是对象实现复用的入口,它具有两个关键的属性stack和value,stack指向的是线程对象池,value指向的是可复用对象,线程对象池存放的其实是DefaultHandle对象
DefaultHandle.png
-
stack -> head
head的类型是WeakOrderQueue,指向一个WeakOrderQueue的链表
-
stack -> cursor
cursor的类型是WeakOrderQueue,我们在下面解析如何从对象池中获取对象的时候会详细解释
-
stack -> pre
pre的类型是WeakOrderQueue,我们在下面解析如何从对象池中获取对象的时候会详细解释
-
WeakOrderQueue
从上面我们知道netty为了避免对象复用的时候需要进行线程同步,所以对象池都是线程化的。每个线程都有自己的对象池,每次需要对象的时候都从本线程的对象池中取得,但是对于对象的回收却并不是必须由本线程去做的,可以由别的线程帮本线程回收对象,当然为了实现回收的时候不需要对stack的elements对象进行加锁同步,当一个线程为另一个线程的对象池回收对象的时候,会为那个对象池的stack创建一个WeakOrderQueue,然后在WeakOrderQueue中实现对另外一个线程复用对象的回收。当有多个外部线程给某一个线程回收对象的时候就会创建多个WeakOrderQueue,这些WeakOrderQueue就组成了一个链表,stack的head永远指向最新加入的WeakOrderQueue
-
WeakOrderQueue -> next
指向链表的下一个WeakOrderQueue节点
-
WeakOrderQueue -> id
每个WeakOrderQueue都有自己的id
-
WeakOrderQueue -> head
指向Head对象,Head的会在下面进行解释
-
WeakOrderQueue -> tail
指向下面描述的Link链表的尾部
-
Head
Head是连接WeakOrderQueue和Link的中间体,它记录了其他线程最大可以为别的线程回收多少个对象,以及它提供了指向真正存放回收对象的Link链表
Head.png
-
Link
本线程回收的对象会存在stack的elements数组中,其他线程帮本线程回收的对象存在那个线程创建的Link对象中,从上图可以看出Link中也有一个elements对象,这个属性指向是的是长度为16的DefaultHandle数组
Link.png
这个Link有两个‘指针’:readIndex和writeIndex,因为Link继承AtomicInteger,所以它自身就当做writeIndex,readIndex代表的是从elements数组readIndex位置获取复用对象,每次获取一个复用对象readIndex都会+1,writeIndex代表的是当回收对象的时候从elements的writeIndex位置回收,每次回收一个对象writeIndex +1,当readIndex = writeIndex表示在这个Link中没有对象可以被复用。这里需要注意一点的是elements中的复用对象不是直接返回给用户的,它先会被转移到stack的elements数组中,然后在从stack的elements数组中找到一个复用对象返回给用户。
对象回收源码解析
先解析可复用对象是如何被回收的,这样在解析如何从对象池中获取复用对象的时候,我们就能明白这些复用对象是如何存进来的,被存在什么位置
reccycle.png
上图就是回收复用对象的方法调用链,那么我看下stack.push发生了什么
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
stack中会有一个threadRef的属性,这个属性指向了本stack所属的Thread(每个线程都有自己的stack)。上面代码的意思是:如果是本线程回收自己的复用对象走pushNow方法,如果是别的线程帮本线程回收复用对象那么走pushLater方法
push.png
下面我们分两种情况来分别讨论:
-
pushNow
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
//设置recycleId和lastRecycledId
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
// size是当前对象池中已经存在的复用对象的数量,maxCapacity代表的
//是对象池中允许的最大复用对象数量,如果size > maxCapacity那么就不会回收这个对象
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
//当回收条件满足,如果size等于对象池现在的长度,那么把对象池容量扩
//到min(2 * size,maxCapacity)
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
//把本次回收的对象加入到elements数组中
elements[size] = item;
//更新stack中回收对象的数量
this.size = size + 1;
}
上面我们注意到有一个dropHandle方法,这个方法用来判断要不要回收这个对象
boolean dropHandle(DefaultHandle<?> handle) {
//hasBeenRecycled表示这个对象是否被回收过
if (!handle.hasBeenRecycled) {
//如果是第一次被回收,那么判断handleRecycleCount是不是小于
//interval,handleRecycleCount和interval默认值是8,所以当本线程
//对象池第一次回收对象的时候这个对象会被回收,之后是每8个
//可回收对象只回收一个:这这样做的目的是防止对象池增长过快
//这就是我们文章开头p3和p4不同的原因
if (handleRecycleCount < interval) {
handleRecycleCount++;
// Drop the object.
return true;
}
handleRecycleCount = 0;
handle.hasBeenRecycled = true;
}
return false;
}
上面本线程回收自己的复用对象的逻辑
-
pushLater
如果是别的线程帮助本线程回收复用对象,那么执行这个方法
private void pushLater(DefaultHandle<?> item, Thread thread) {
//maxDelayedQueues表示的是一个线程可以帮助多少个别的线程回收
//对象,默认是16,如果设置成0表示不允许外部线程帮自己回收对象
if (maxDelayedQueues == 0) {
// We don't support recycling across threads and should just drop the item on the floor.
return;
}
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
// DELAYED_RECYCLED是一个FastThreadLocal,
//每个线程会绑定一个Map<Stack,WeakOrderQueue>容器
//这个容器存放的是本线程帮别的线程回收对象的数据结构
//容器key是stack表示的是别的线程的对象池
//容器value是WeakOrderQueue,表示的是这个线程帮别的线程回收对象的容器
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
//如果WeakOrderQueue还没有创建
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
//如果本线程的Map<Stack,WeakOrderQueue> 的size已经大于了
//maxDelayedQueues那么针对本stack直接设置一个WeakOrderQueue.DUMMY
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
//创建WeakOrderQueue对象
if ((queue = newWeakOrderQueue(thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
//如果queue是DUMMY那么不回收这个对象
return;
}
//回收item
queue.add(item);
}
- newWeakOrderQueue实现
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
//netty在默认的情况会限制一个stack允许别的线程帮自己回收对象的最大数量
//availableSharedCapacity = maxCapacity/2也就是2048,
//Head.reserveSpaceForLink方法的作用就是检查下目前
//availableSharedCapacity剩下的空间是不是够创建一个Link所需要的
//空间,默认创建一个Link需要的空间是16,
//如果可以把availableSharedCapacity减少16,返回true,否则反之
if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
return null;
}
//创建WeakOrderQueue,在这里面会初始化tWeakOrderQueue的tail
//Head属性就像那副原理图中展示的那样,在最初的时候tail和Head的
//link属性指向同一个Link
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
// 因为这里可能并发调用,所以这是个同步方法,把stack的head属性指
//向这个WeakOrderQueue,可看出head永远指向最新为这个stack创建的WeakOrderQueue
stack.setHead(queue);
return queue;
}
reserveSpaceForLink 代码
static boolean reserveSpaceForLink(AtomicInteger availableSharedCapacity) {
for (;;) {
int available = availableSharedCapacity.get();
if (available < LINK_CAPACITY) {//默认是16
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - LINK_CAPACITY)) {
return true;
}
}
}
}
stack.setHead源码
synchronized void setHead(WeakOrderQueue queue) {
queue.setNext(head);
head = queue;
}
经过上面的过程如果这个对象的回收条件满足那么就使用WeakOrderQueue.add方法进行对象回收
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
// While we also enforce the recycling ratio one we transfer objects from the WeakOrderQueue to the Stack
// we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast
// without control if the Stack
//这里和上面的dropHandle功能相同
if (handleRecycleCount < interval) {
handleRecycleCount++;
// Drop the item to prevent recycling to aggressive.
return;
}
handleRecycleCount = 0;
//tail永远都指向用于存放最新回收对象的Link,也就是Link链表中的最后一个节点
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
//如果writeIndex 等于LINK_CAPACITY说明这个Link的空间已经用完
//了,那么新建Link,新建Link的时候由于每个Link需要占用16个空间
//所以head.newLink()会去判断stack.availableSharedCapacity剩余空
//间是否大于16,如果剩余空间不足,不会创建新的Link,这个对象也就不会被回收
Link link = head.newLink();
if (link == null) {
// Drop it.
return;
}
// We allocate a Link so reserve the space
//创建Link成功,更新WeakOrderQueue.tail为当前最新的Link
this.tail = tail = tail.next = link;
writeIndex = tail.get();
}
//把回收对象放入Link的elements数组中
tail.elements[writeIndex] = handle;
handle.stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
//更新Link的writeIndex
tail.lazySet(writeIndex + 1);
}
上面就是WeakOrderQueue回收对象的过程
Head.newLink源码如下
Link newLink() {
return reserveSpaceForLink(availableSharedCapacity) ? new Link() : null;
}
到此就介绍了整个对象回收过程
接下来我们来看下获取复用对象过程
从对象池中获取复用对象的流程如下
obj_get.png
核心方法是Recycler.get
public final T get() {
//maxCapacity是每个线程可以缓存复用对象的最大数量,默认是4096
if (maxCapacityPerThread == 0) {
//maxCapacityPerThread等于0,那么每次在对象池中获取对象都是新建
return newObject((Handle<T>) NOOP_HANDLE);
}
//threadLocal是FastThreadLocal<Stack<T>>类型,表示每个线程拥有自己的Stack
Stack<T> stack = threadLocal.get();
//通过pop方法从本线程的Stack表示的对象池中获取对象
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
//如果从本线程的stack中没有获得到可复用对象,那么通过自定义的newObject去创建目标对象
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
stack.pop
废话不多说,直接上源码解析
DefaultHandle<T> pop() {
//size代表的是stack的elements数组目前存放的可复用对象的数量
int size = this.size;
//如果size等于0表示目前stack的elements没有存任何可复用的DefaultHandle
if (size == 0) {
//scavenge方法是从其他线程帮助本线程回收的对象中去找可复用对象
if (!scavenge()) {
//如果从其他线程中帮组本线程回收对象的存储结构中也没有找到可用对象那么直接返回null
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
//经过上面的努力,如果发现有可复用对象存在,那么更新size,然后取得的这个对象,
size --;
DefaultHandle ret = elements[size];
//在elements删除这个对象
elements[size] = null;
// As we already set the element[size] to null we also need to store the updated size before we do
// any validation. Otherwise we may see a null value when later try to pop again without a new element
// added before.
this.size = size;
//被同一个线程回收的对象它的lastRecycledId是等于recycleId的,这个在下面的会说到
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
//重置对象的回收状态
ret.recycleId = 0;
ret.lastRecycledId = 0;
return ret;
}
从上面我们我们发现如果stack的elements有可用的(size>0)复用对象,那么直接从elements获取就好了,如果stack的elements是空的那就去尝试从执行scavenge方法
scavenge
private boolean scavenge() {
// continue an existing scavenge, if any
//scavengeSome才是真正去WeakOrderQueue找可复用对象
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
//当scavengeSome失败后我们让cursor指向head,pre=null,
//为什么这样做呢?因为上面scavenge失败说明cursor已经在遍历了整个
//WeakOrderQueue链表但是没有发现可用对象,那么这个时候需要回到
//头部,当将来有新的WeakOrderQueue加入的时候,可以从新加入的WeakOrder中去找可复用对象
prev = null;
cursor = head;
return false;
}
scavengeSome
看着这么复杂就知道,从WeakOrderQueue查找可复用对象就是在这个方法中实现,
private boolean scavengeSome() {
WeakOrderQueue prev;
//设置遍历WeakOrderQueue链开始位置
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
//代码走到这里说明cursor是第一次使用,所以赋值为stack的head
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do {
//这里才是真正实现获取WeakOrderQueue中存储的复用对象,这个方
//法会把cursor引用的WeakOrderQueue中存储的复用对象转移到stack的elements中
if (cursor.transfer(this)) {
success = true;
break;
}
//如果上面从cursor指向的WeakOrderQueue转移对象失败,那么继续从WeakOrderQueue链表的下一个节点进行转移
WeakOrderQueue next = cursor.getNext();
//如果cursor指向的WeakOrderQueue对应的线程不存在了(我自作主张的给其取名叫做Dead_WeakOrderQueue),
//那么会尽可能把这个WeakOrderQueue中存的复用对象全部转移到stack的elements中
if (cursor.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
//如果Dead_WeakOrderQueue中还有数据,那么就一直transfer
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
// Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed.
//释放Dead_WeakOrderQueue占用的stack.availableSharedCapacity空间
cursor.reclaimAllSpaceAndUnlink();
//把Dead_WeakOrderQueue从WeakOrderQueue链表中删除,帮助GC清理
prev.setNext(next);
}
} else {
prev = cursor;
}
//修改cursor指向下一个WeakOrderQueue链表的下一个节点
cursor = next;
} while (cursor != null && !success);
// 更新stack的pre和cursor
this.prev = prev;
this.cursor = cursor;
return success;
}
scavengeSome就是从WeakOrderQueue链表中去不断的转移对象到stack的elements中,如果是一个Active_WeakOrderQueue(WeakOrderQueue所属的线程还存在)那么每次去转移一个Link中的数据到stack中,如果是一个Dead_WeakOrderQueue(WeakOrderQueue所属的线程还不存在),那么会把它Link链表中存储的所有复用对象都转移到stack中。理论上Dead_WeakOrderQueue在转移完毕之后应该被踢出WeakOrderQueue链表,这样Dead_WeakOrderQueue就可以被GC掉,但是如果head指向的是Dead_WeakOrderQueue,那么它就不会被踢出WeakOrderQueue链,这个应该是因为对于head操作需要做线程同步,而netty一直在避免线程同步的开销,因此在从对象池中获取对象的时候,head是一直不变的,想象一下如果从head开始一直到一个pre指向的Active_WeakOrderQueue之间的所有的节点都是Dead_WeakOrderQueue,按照目前的代码实现这些Dead_WeakOrderQueue都不会被踢出WeakOrderQueue链表,也就会一直像僵尸一样存在,pre之后的Dead_WeakOrderQueue是可以被踢出WeakOrderQueue链表
Dead_WeakOrderQueue回收.png
上图中蓝色Dead_WeakOrderQueue无法被回收,WeakOrderQueue链表最后一个节点如果为Dead_WeakOrderQueue也是无法被回收
WeakOrderQueue.transfer(stack)
这个方法是真正实现了把WeakOrderQueue的一个Link存储的复用对象转移到stack中,源码如下,好长。。。。。
boolean transfer(Stack<?> dst) {
//设置head在Link链表中的位置,可以看出每次都是从Link链表的头部开始寻找可复用对象去转移
Link head = this.head.link;
//如果Link对象不存在直接返回转移失败
if (head == null) {
return false;
}
//如果head指向的Link的readIndex等于LINK_CAPACITY,表示
//这个Link中存放的复用对象已经被全部转移到stack中了
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
//把head移动到Link链表的下一个节点
head = head.next;
//因为当前的link已经被用完了,那么可以从link链表中把这个link节点
//删除,head.relink会把WeakOrderQueue的head重新指向Link链表的下
//一个节点。然后还会向stack.availableSharedCapacity释放16个空间
this.head.relink(head);
}
//srcStart 表示link 的elements数组的起始可以读取的位置
final int srcStart = head.readIndex;
//srcEnd表示是link elements数组最后一个复用对象的位置
int srcEnd = head.get();
//srcSize 表示这个link本次可以转移的对象的数量
final int srcSize = srcEnd - srcStart;
//没有对象可以转移,返回转移失败
if (srcSize == 0) {
return false;
}
//stack elements数组的长度。对于Dead_WeakOrderQueue来说这个值可能不是0
//因为Dead_WeakOrderQueue的所有link都会被转移
//但是对于Active_WeakOrderQueue来说这个size是0,因为对于
//Active_WeakOrderQueue来说只要有一个link转移成功就返回了
final int dstSize = dst.size;
//这次转移之后stack的elements数组中存放的可复用对象的数量
final int expectedCapacity = dstSize + srcSize;
//如果expectedCapacity大于stack elements数组的长度
//那么每次扩展elements的空间为目前数组长度的2倍直到满足
//elements.length >=expectedCapacity 但是不能大于maxCapacity
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
//有对象需要转移
if (srcStart != srcEnd) {
//Link中的elements数组
final DefaultHandle[] srcElems = head.elements;
//stack 中的elements数组
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
//下面过程就是把Link中的可转移的复用对象转移到stack的elements数组中
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle<?> element = srcElems[i];
if (element.recycleId == 0) {
//设置本转移对象回收id
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
//正常来说recycleId应该等于0,如果不是0说明已经被回收了
throw new IllegalStateException("recycled already");
}
//Link elements数组中对应对象被转移之后,对象的数组元素置空
srcElems[i] = null;
//熟悉的dropHandle,和之前逻辑一致
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
//把本次转移的对象加入到stack的elements数组中
dstElems[newDstSize ++] = element;
}
//如果本Link节点中的数据被全部转移了,而且readIndex到了
//elements的尾部,同时这个Link节点不是
//Link链表中的最后一个节点,那么把它从Link链表删除
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
this.head.relink(head.next);
}
//更新本link的readIndex为最后回收那个对象在elements中的位置
head.readIndex = srcEnd;
//一顿操作猛如虎,最后发现没有转移任何复用对象,返回转移失败
if (dst.size == newDstSize) {
return false;
}
//转移对象成功,更新stack中可复用对象的数量
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
引用
感谢以下文章的作者,他们的文章让我阅读这块源码顺利了很多
https://huzb.me/2019/10/17/netty%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%E2%80%94%E2%80%94%E5%AF%B9%E8%B1%A1%E6%B1%A0/