文章將采用“總-分-總”的結構對配置固定大小元素驅逐策略的 Caffeine 緩存進行介紹,首先會講解它的實現原理,在大家對它有一個概念之后再深入具體源碼的細節(jié)之中,理解它的設計理念,從中能學習到用于統(tǒng)計元素訪問頻率的 Count-Min Sketch 數據結構、理解內存屏障和如何避免緩存?zhèn)喂蚕韱栴}、MPSC 多線程設計模式、高性能緩存的設計思想和多線程間的協(xié)調方案等等,文章最后會對全文內容進行總結,希望大家能有所收獲的同時在未來對本地緩存選型時提供完整的理論依據。
Caffeine 緩存原理圖如下:

它使用 ConcurrentHashMap 保存數據,并在該數據結構的基礎上創(chuàng)建了窗口區(qū)、試用區(qū)和保護區(qū),用于管理元素的生命周期,各個區(qū)的數據結構是使用了 LRU 算法的雙端隊列,隨著緩存的命中率變化,窗口區(qū)和保護區(qū)大小會自動調節(jié)以適應當前訪問模式。在對元素進行驅逐時,使用了 TinyLFU 算法,會優(yōu)先將頻率低的元素驅逐,訪問頻率使用 Count-Min Sketch 數據結構記錄,它能在保證較高準確率(93.75%)的情況下占用較少內存空間。讀、寫操作分別會向 ReadBuffer 和 WriteBuffer 中添加“讀/寫后任務”,這兩個緩沖區(qū)的設計均采用了 MPSC 多生產者單消費者的多線程設計模式。緩沖區(qū)中任務的消費由維護方法 maintenance 中 drainReadBuffer 和 drainWriteBuffer 實現,維護方法通過添加同步鎖,保證任務只由單線程執(zhí)行,這種設計參考了 WAL(Write-Ahead Logging)思想,即:先寫日志,再執(zhí)行操作,先把操作記錄在緩沖區(qū),然后在合適的時機異步、批量地執(zhí)行緩沖區(qū)中的任務。維護方法除了這些作用外,還負責元素在各個分區(qū)的移動、頻率的更新、元素的驅逐等操作。
接下來的源碼分析以如下測試用例為例:先分析構造方法,了解緩存初始化過程中創(chuàng)建的重要數據結構和關鍵字段,然后再深入添加元素的方法(put),該方法相對復雜,也是 Caffeine 緩存的核心,理解了這部分內容,文章剩余的內容理解起來會非常容易,接著分析獲取元素的方法(getIfPresent),最后再回到核心的維護方法 maintenance 中,這樣便基本理解了 Caffeine 緩存的運行原理,需要注意的是,因為我們并未指定緩存元素的過期時間,所以與此相關的內容如時間過期策略和時間輪等內容不會專門介紹。
public class TestReadSourceCode {
@Test
public void doRead() {
// read constructor
Cache cache = Caffeine.newBuilder()
.maximumSize(10_000)
.build();
// read put
cache.put("key", "value");
// read get
cache.getIfPresent("key");
}
}
constructor
Caffeine 的實現類區(qū)分了 BoundedLocalManualCache 和 UnboundedLocalManualCache,見名知意它們分別為“有邊界”的和“無邊界”的緩存。Caffeine#isBounded 方法詮釋了“邊界”的含義:
public final class Caffeine {
static final int UNSET_INT = -1;
public Cache build() {
// 校驗參數
requireWeightWithWeigher();
requireNonLoadingCache();
@SuppressWarnings("unchecked")
Caffeine self = (Caffeine) this;
return isBounded()
? new BoundedLocalCache.BoundedLocalManualCache?>(self)
: new UnboundedLocalCache.UnboundedLocalManualCache?>(self);
}
boolean isBounded() {
// 指定了最大大??;指定了最大權重
return (maximumSize != UNSET_INT) || (maximumWeight != UNSET_INT)
// 指定了訪問后過期策略;指定了寫后過期策略
|| (expireAfterAccessNanos != UNSET_INT) || (expireAfterWriteNanos != UNSET_INT)
// 指定了自定義過期策略;指定了 key 或 value 的引用級別
|| (expiry != null) || (keyStrength != null) || (valueStrength != null);
}
}
也就是說,當為緩存指定了上述的驅逐或過期策略會定義為有邊界的 BoundedLocalManualCache 緩存,它會限制緩存的大小,防止內存溢出,否則為無邊界的 UnboundedLocalManualCache 類型,它沒有大小限制,直到內存耗盡。我們以創(chuàng)建配置了固定大小的緩存為例,它對應的類型便是 BoundedLocalManualCache,在執(zhí)行構造方法時,有以下邏輯:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
implements LocalCache {
// ...
static class BoundedLocalManualCache implements LocalManualCache, Serializable {
private static final long serialVersionUID = 1;
final BoundedLocalCache cache;
BoundedLocalManualCache(Caffeine builder) {
this(builder, null);
}
BoundedLocalManualCache(Caffeine builder, @Nullable CacheLoader? super K, V?> loader) {
cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);
}
}
}
BoundedLocalCache 為抽象類,緩存對象的實際類型都是它的子類。它在創(chuàng)建時使用了反射并遵循簡單工廠的編碼風格:
interface LocalCacheFactory {
static BoundedLocalCache newBoundedLocalCache(Caffeine builder,
@Nullable AsyncCacheLoader? super K, V?> cacheLoader, boolean async) {
var className = getClassName(builder);
var factory = loadFactory(className);
try {
return factory.newInstance(builder, cacheLoader, async);
} catch (RuntimeException | Error e) {
throw e;
} catch (Throwable t) {
throw new IllegalStateException(className, t);
}
}
}
getClassName 方法非常有意思,它會根據緩存配置的屬性動態(tài)拼接出實際緩存類名:
interface LocalCacheFactory {
static String getClassName(Caffeine?, ??> builder) {
var className = new StringBuilder();
// key 是強引用或弱引用
if (builder.isStrongKeys()) {
className.append('S');
} else {
className.append('W');
}
// value 是強引用或弱引用
if (builder.isStrongValues()) {
className.append('S');
} else {
className.append('I');
}
// 配置了移除監(jiān)聽器
if (builder.removalListener != null) {
className.append('L');
}
// 配置了統(tǒng)計功能
if (builder.isRecordingStats()) {
className.append('S');
}
// 不同的驅逐策略
if (builder.evicts()) {
// 基于最大值限制,可能是最大權重W,也可能是最大容量S
className.append('M');
// 基于權重或非權重
if (builder.isWeighted()) {
className.append('W');
} else {
className.append('S');
}
}
// 配置了訪問過期或可變過期策略
if (builder.expiresAfterAccess() || builder.expiresVariable()) {
className.append('A');
}
// 配置了寫入過期策略
if (builder.expiresAfterWrite()) {
className.append('W');
}
// 配置了刷新策略
if (builder.refreshAfterWrite()) {
className.append('R');
}
return className.toString();
}
}
這也就是為什么能在 com.github.benmanes.caffeine.cache 包路徑下能發(fā)現很多類似 SSMS 只有簡稱命名的類的原因(下圖只截取部分,實際上有很多):

根據代碼邏輯,它的命名遵循如下格式 S|W S|I [L] [S] [MW|MS] [A] [W] [R] 其中 [] 表示選填,| 表示某配置不同選擇的分隔符,結合注釋能清楚的了解各個位置字母簡稱表達的含義。如此定義實現類使用了 多級繼承,盡可能多地復用代碼。
以我們測試用例中創(chuàng)建的緩存類型為例,它對應的實現類為 SSMS,表示 key 和 value 均為強引用,并配置了非權重的最大緩存大小限制,類圖關系如下:

雖然在一些軟件設計相關的書籍中強調“多用組合,少用繼承”,但是這里使用多級繼承我覺得并沒有增加開發(fā)者的理解難度,反而了解了它的命名規(guī)則后,能更清晰的理解各個緩存所表示的含義,更好地實現代碼復用。
執(zhí)行 SSMS 的構造方法會有以下邏輯:
// 1
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef
implements LocalCache {
static final int WRITE_BUFFER_MIN = 4;
static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);
static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;
static final double PERCENT_MAIN = 0.99d;
static final double PERCENT_MAIN_PROTECTED = 0.80d;
static final double HILL_CLIMBER_STEP_PERCENT = 0.0625d;
final @Nullable RemovalListener evictionListener;
final @Nullable AsyncCacheLoader cacheLoader;
final MpscGrowableArrayQueue writeBuffer;
final ConcurrentHashMap> data;
final PerformCleanupTask drainBuffersTask;
final Consumer> accessPolicy;
final Buffer> readBuffer;
final NodeFactory nodeFactory;
final ReentrantLock evictionLock;
final Weigher weigher;
final Executor executor;
final boolean isAsync;
final boolean isWeighted;
protected BoundedLocalCache(Caffeine builder,
@Nullable AsyncCacheLoader cacheLoader, boolean isAsync) {
// 標記同步或異步
this.isAsync = isAsync;
// 指定 cacheLoader
this.cacheLoader = cacheLoader;
// 指定用于執(zhí)行驅逐元素、刷新緩存等任務的線程池,不指定默認為 ForkJoinPool.commonPool()
executor = builder.getExecutor();
// 標記是否定義了節(jié)點計算權重的 Weigher 對象
isWeighted = builder.isWeighted();
// 同步鎖,在接下來的內容中會看到很多標記了 @GuardedBy("evictionLock") 注解的方法,表示這行這些方法時都會獲取這把同步鎖
// 根據該鎖的命名,eviction 表示驅逐的意思,也就是說關注驅逐策略執(zhí)行的方法都要獲取該鎖,這一點需要在后文中注意
evictionLock = new ReentrantLock();
// 計算元素權重的對象,不指定為 SingletonWeigher.INSTANCE
weigher = builder.getWeigher(isAsync);
// 執(zhí)行緩存 maintenance 方法的任務,在后文中具體介紹
drainBuffersTask = new PerformCleanupTask(this);
// 創(chuàng)建節(jié)點的工廠
nodeFactory = NodeFactory.newFactory(builder, isAsync);
// 驅逐監(jiān)聽器,有元素被驅逐時會回調
evictionListener = builder.getEvictionListener(isAsync);
// 用于保存所有數據的 ConcurrentHashMap
data = new ConcurrentHashMap?>(builder.getInitialCapacity());
// 如果指定驅逐策略 或 key為弱引用 或 value為弱引用或軟引用 或 訪問后過期則創(chuàng)建 readBuffer,否則它為不可用狀態(tài)
// readBuffer 用于記錄某些被訪問過的節(jié)點
readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
? new BoundedBuffer?>() : Buffer.disabled();
// 如果指定了驅逐策略 或 訪問后過期策略則會定義訪問策略,執(zhí)行 onAccess 方法,后文詳細介紹
accessPolicy = (evicts() || expiresAfterAccess()) ? this::onAccess : e -> {};
// 初始化最大值和最小值的雙端隊列作為 writeBuffer,用于記錄一些寫后操作任務
writeBuffer = new MpscGrowableArrayQueue?>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);
// 執(zhí)行了驅逐策略則更新最大容量限制
if (evicts()) {
setMaximumSize(builder.getMaximum());
}
}
@GuardedBy("evictionLock")
void setMaximumSize(long maximum) {
requireArgument(maximum >= 0, "maximum must not be negative");
if (maximum == maximum()) {
return;
}
// 不能超過最大容量
long max = Math.min(maximum, MAXIMUM_CAPACITY);
// 計算窗口區(qū)大小
long window = max - (long) (PERCENT_MAIN * max);
// 計算保護區(qū)大小
long mainProtected = (long) (PERCENT_MAIN_PROTECTED * (max - window));
// 記錄這些值
setMaximum(max);
setWindowMaximum(window);
setMainProtectedMaximum(mainProtected);
// 標記命中量、非命中量并初始化步長值,這三個值用于后續(xù)動態(tài)調整保護區(qū)和窗口區(qū)大小
setHitsInSample(0);
setMissesInSample(0);
setStepSize(-HILL_CLIMBER_STEP_PERCENT * max);
// 直到當前緩存的權重(大小)接近最大值一半時才初始化頻率草圖
if ((frequencySketch() != null) && !isWeighted() && (weightedSize() >= (max >>> 1))) {
frequencySketch().ensureCapacity(max);
}
}
}
// 2
class SS extends BoundedLocalCache {
static final LocalCacheFactory FACTORY = SS::new;
// key value 強引用無需特殊操作
SS(Caffeine var1, @Nullable AsyncCacheLoader? super K, V?> var2, boolean var3) {
super(var1, var2, var3);
}
}
// 3
class SSMS extends SS {
// 頻率草圖,后文具體介紹
final FrequencySketch sketch = new FrequencySketch();
final AccessOrderDeque> accessOrderWindowDeque;
final AccessOrderDeque> accessOrderProbationDeque;
final AccessOrderDeque> accessOrderProtectedDeque;
SSMS(Caffeine var1, @Nullable AsyncCacheLoader? super K, V?> var2, boolean var3) {
super(var1, var2, var3);
// 如果 Caffeine 初始化了容量則確定頻率草圖的容量
if (var1.hasInitialCapacity()) {
long var4 = Math.min(var1.getMaximum(), (long) var1.getInitialCapacity());
this.sketch.ensureCapacity(var4);
}
// 初始化窗口區(qū)、試用區(qū)和保護區(qū),它們都是雙端隊列(鏈表實現)
this.accessOrderWindowDeque = !var1.evicts() && !var1.expiresAfterAccess() ? null : new AccessOrderDeque();
this.accessOrderProbationDeque = new AccessOrderDeque();
this.accessOrderProtectedDeque = new AccessOrderDeque();
}
}
在步驟 1 中定義了三個區(qū)的初始化大小為 1%|19%|80%,這樣配置的性能相對較好。此外,我們還需要解釋一下 weightedSize() 方法,它用于訪問 long weightedSize 變量。根據其命名有“權重大小”的含義,在默認不指定權重計算對象 Weigher 的情況下,Weigher 默認為 SingletonWeigher.INSTANCE 表示每個元素的權重大小為 1,如下:
enum SingletonWeigher implements Weigher {
INSTANCE;
@Override
public int weigh(Object key, Object value) {
return 1;
}
}
這樣 weightedSize 表示的便是當前緩存中元素數量。如果自定義了 Weigher 那么 weightedSize 表示的便是緩存中總權重大小,每個元素的權重則可能會不同。因為在示例中我們并沒有指定 Weigher,所以在此處可以將 weightedSize 理解為當前緩存大小。
上文中我們提到緩存的定義遵循大寫字母縮寫的命名規(guī)則,實際上節(jié)點類的定義也采用了這種方式,在創(chuàng)建節(jié)點工廠 NodeFactory.newFactory(builder, isAsync)
的邏輯中,它會執(zhí)行如下邏輯,根據緩存的類型來確定它的節(jié)點類型,命名遵循 P|F S|W|D A|AW|W| [R] [MW|MS] 的規(guī)則,同樣使用了反射機制和簡單工廠的編碼風格,如下:
interface NodeFactory {
// ...
static NodeFactory newFactory(Caffeine builder, boolean isAsync) {
if (builder.interner) {
return (NodeFactory) Interned.FACTORY;
}
var className = getClassName(builder, isAsync);
return loadFactory(className);
}
static String getClassName(Caffeine?, ??> builder, boolean isAsync) {
var className = new StringBuilder();
// key 強引用或弱引用
if (builder.isStrongKeys()) {
className.append('P');
} else {
className.append('F');
}
// value 強引用或弱引用或軟引用
if (builder.isStrongValues()) {
className.append('S');
} else if (builder.isWeakValues()) {
className.append('W');
} else {
className.append('D');
}
// 過期策略
if (builder.expiresVariable()) {
if (builder.refreshAfterWrite()) {
// 訪問后過期
className.append('A');
if (builder.evicts()) {
// 寫入后過期
className.append('W');
}
} else {
className.append('W');
}
} else {
// 訪問后過期
if (builder.expiresAfterAccess()) {
className.append('A');
}
// 寫入后過期
if (builder.expiresAfterWrite()) {
className.append('W');
}
}
// 寫入后刷新
if (builder.refreshAfterWrite()) {
className.append('R');
}
// 驅逐策略
if (builder.evicts()) {
// 默認最大大小限制
className.append('M');
// 加權
if (isAsync || (builder.isWeighted() && (builder.weigher != Weigher.singletonWeigher()))) {
className.append('W');
} else {
// 非加權
className.append('S');
}
}
return className.toString();
}
}
SSMS 類型緩存對應的節(jié)點類型為 PSMS。
FrequencySketch
接下來,我們需要具體介紹下 FrequencySketch,它在上述構造方法的步驟 3 中被創(chuàng)建。這個類的實現采用了 Count-Min Sketch 數據結構,它維護了一個 long[] table 一維數組,每個元素有 64 位,每 4 位作為一個計數器(這也就限定了最大頻率為 15),那么數組中每個槽位便是 16 個計數器。通過哈希函數取 4 個獨立的計數值,將其中的最小值作為元素的訪問頻率。table 的初始大小為緩存最大容量最接近的 2 的 n 次冪,并在計算哈希值時使用 blockMask 掩碼來使哈希結果均勻分布,保證了獲取元素訪問頻率的正確率為 93.75%,達到空間與時間的平衡。它的實現原理和布隆過濾器類似,犧牲了部分準確性,但減少了占用內存的大小。如下圖所示為計算元素 e 的訪問頻率:

以下為 FrequencySketch 的源碼,關注注釋即可,并不復雜:
final class FrequencySketch {
static final long RESET_MASK = 0x7777777777777777L;
static final long ONE_MASK = 0x1111111111111111L;
// 采樣大小,用于控制 reset
int sampleSize;
// 掩碼,用于均勻分散哈希結果
int blockMask;
long[] table;
int size;
public FrequencySketch() {
}
public void ensureCapacity(@NonNegative long maximumSize) {
requireArgument(maximumSize >= 0);
// 取緩存最大容量和 Integer.MAX_VALUE >>> 1 中的小值
int maximum = (int) Math.min(maximumSize, Integer.MAX_VALUE >>> 1);
// 如果已經被初始化過并且 table 長度大于等于最大容量,那么不進行操作
if ((table != null) && (table.length >= maximum)) {
return;
}
// 初始化 table,長度為最接近 maximum 的 2的n次冪 和 8 中的大值
table = new long[Math.max(Caffeine.ceilingPowerOfTwo(maximum), 8)];
// 計算采樣大小
sampleSize = (maximumSize == 0) ? 10 : (10 * maximum);
// 計算掩碼
blockMask = (table.length >>> 3) - 1;
// 特殊判斷
if (sampleSize <= 0) {
sampleSize = Integer.MAX_VALUE;
}
// 計數器總數
size = 0;
}
@NonNegative
public int frequency(E e) {
// 如果緩存沒有被初始化則返回頻率為 0
if (isNotInitialized()) {
return 0;
}
// 創(chuàng)建 4 個元素的數組 count 用于保存 4 次 hash 計算出的頻率值
int[] count = new int[4];
// hash 擾動,使結果均勻分布
int blockHash = spread(e.hashCode());
// 重 hash,進一步分散結果
int counterHash = rehash(blockHash);
// 根據掩碼計算對應的塊索引
int block = (blockHash & blockMask) < 3;
// 循環(huán) 4 次計算 4 個計數器的結果
for (int i = 0; i < 4; i++) {
// 位運算變更 hash 值
int h = counterHash >>> (i < 3);
int index = (h >>> 1) & 15;
// 計算計數器的偏移量
int offset = h & 1;
// 定位到 table 中某個槽位后右移并進行位與運算得到最低的 4 位的值(0xfL 為二進制的 1111)
count[i] = (int) ((table[block + offset + (i < 1)] >>> (index < 2)) & 0xfL);
}
// 取其中的較小值
return Math.min(Math.min(count[0], count[1]), Math.min(count[2], count[3]));
}
public void increment(E e) {
if (isNotInitialized()) {
return;
}
// 長度為 8 的數組記錄該元素對應的位置,每個計數器需要兩個值來定位
int[] index = new int[8];
int blockHash = spread(e.hashCode());
int counterHash = rehash(blockHash);
int block = (blockHash & blockMask) < 3;
for (int i = 0; i < 4; i++) {
int h = counterHash >>> (i < 3);
// i 記錄定位到 table 中某元素的位偏移量
index[i] = (h >>> 1) & 15;
int offset = h & 1;
// i + 4 記錄元素所在 table 中的索引
index[i + 4] = block + offset + (i < 1);
}
// 四個對應的計數器都需要累加
boolean added =
incrementAt(index[4], index[0])
| incrementAt(index[5], index[1])
| incrementAt(index[6], index[2])
| incrementAt(index[7], index[3]);
// 累加成功且達到采樣大小需要進行重置
if (added && (++size == sampleSize)) {
reset();
}
}
boolean incrementAt(int i, int j) {
int offset = j < 2;
long mask = (0xfL < offset);
if ((table[i] & mask) != mask) {
table[i] += (1L < offset);
return true;
}
return false;
}
// 重置機制防止計數器溢出
void reset() {
int count = 0;
for (int i = 0; i < table.length; i++) {
// 累加 table 中每個元素的 2 進制表示的 1 的個數,結果為計數器個數的 4 倍
count += Long.bitCount(table[i] & ONE_MASK);
// 右移一位將計數值減半并將高位清零
table[i] = (table[i] >>> 1) & RESET_MASK;
}
// count >>> 2 表示計數器個數,計算重置后的 size
size = (size - (count >>> 2)) >>> 1;
}
static int spread(int x) {
x ^= x >>> 17;
x *= 0xed5ad4bb;
x ^= x >>> 11;
x *= 0xac4c1b51;
x ^= x >>> 15;
return x;
}
static int rehash(int x) {
x *= 0x31848bab;
x ^= x >>> 14;
return x;
}
}
到這里,Caffeine 緩存的基本數據結構全貌已經展現出來了,如下所示,在后文中我們再具體關注它們之間是如何協(xié)同的。

put
接下來繼續(xù)了解向緩存中添加元素的流程,本節(jié)內容比較多,理解起來也相對復雜,結合文章內容的同時,也需要多去深入查看 Caffeine 源碼才能有更好的理解,以下為 put 方法的源碼:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
// 默認入參 onlyIfAbsent 為 false,表示向緩存中添加相同的 key 會對 value 進行替換
@Override
public @Nullable V put(K key, V value) {
return put(key, value, expiry(), /* onlyIfAbsent */ false);
}
}
它會執(zhí)行到如下具體邏輯中,關注注釋信息:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
static final int WRITE_BUFFER_RETRIES = 100;
final MpscGrowableArrayQueue writeBuffer;
final ConcurrentHashMap> data;
final ReentrantLock evictionLock;
final NodeFactory nodeFactory;
@Nullable
V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
// 不允許添加 null
requireNonNull(key);
requireNonNull(value);
Node node = null;
// 獲取當前時間戳
long now = expirationTicker().read();
// 計算緩存權重,如果沒有指定 weigher 的話,默認權重為 1
int newWeight = weigher.weigh(key, value);
// 創(chuàng)建用于查找的鍵對象
Object lookupKey = nodeFactory.newLookupKey(key);
for (int attempts = 1; ; attempts++) {
// 嘗試獲取節(jié)點;prior 譯為先前的;較早的
Node prior = data.get(lookupKey);
// 處理不存在的節(jié)點
if (prior == null) {
// 如果 node 在循環(huán)執(zhí)行中還未被創(chuàng)建
if (node == null) {
// NodeFactory 創(chuàng)建對應類型節(jié)點
node = nodeFactory.newNode(key, keyReferenceQueue(), value, valueReferenceQueue(), newWeight, now);
// 設置節(jié)點的過期時間
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
}
// 嘗試添加新節(jié)點到緩存中,如果鍵已存在則返回現有節(jié)點
prior = data.putIfAbsent(node.getKeyReference(), node);
// 返回 null 表示插入成功
if (prior == null) {
// 寫后操作:添加 AddTask 并調度執(zhí)行任務
afterWrite(new AddTask(node, newWeight));
return null;
}
// onlyIfAbsent 形參在默認的 put 方法中為 false,以下邏輯簡單介紹
// 如果此時有其他線程添加了相同 key 的元素
else if (onlyIfAbsent) {
// 獲取到當前值,嘗試判斷讀后失效策略,更新訪問時間,并執(zhí)行讀后操作 afterRead 方法
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
// 讀后操作,該方法在 getIfPresent 中進行講解
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
} else if (onlyIfAbsent) {
// 同樣的邏輯
V currentValue = prior.getValue();
if ((currentValue != null) && !hasExpired(prior, now)) {
if (!isComputingAsync(prior)) {
tryExpireAfterRead(prior, key, currentValue, expiry(), now);
setAccessTime(prior, now);
}
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
}
}
// ...
}
}
注意添加節(jié)點成功的邏輯,它會執(zhí)行 afterWrite 寫后操作方法,添加 AddTask 任務到 writeBuffer 中:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
// 寫重試最多 100 次
static final int WRITE_BUFFER_RETRIES = 100;
static final int WRITE_BUFFER_MIN = 4;
static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);
final MpscGrowableArrayQueue writeBuffer = new MpscGrowableArrayQueue?>(WRITE_BUFFER_MIN, WRITE_BUFFER_MAX);
// 添加寫后 Task 到 writeBuffer 中并在合適的時機調度執(zhí)行任務
void afterWrite(Runnable task) {
// 最多重試添加 100 次
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
// 寫后調度
scheduleAfterWrite();
return;
}
// 向 writeBuffer 中添加任務失敗會調度任務執(zhí)行
scheduleDrainBuffers();
// 自旋等待,讓出 CPU 控制權
Thread.onSpinWait();
}
// ...
}
}
writeBuffer 的類型為 MpscGrowableArrayQueue,在這里我們詳細的介紹下它。
WriteBuffer
根據它的命名 GrowableArrayQueue 可知它是一個容量可以增長的雙端隊列,前綴 MPSC 表達的含義是“多生產者,單消費者”,也就是說可以有多個線程向其中添加元素,但只有一個線程能從其中獲取元素。那么它是如何實現 MPSC 的呢?接下來我們就根據源碼詳細了解一下。首先先來看一下它的類繼承關系圖及簡要說明:

圖中灰色的表示抽象類,藍色為實現類,java.util.AbstractQueue 就不再多解釋了。我們先看看其中標記紅框的類,討論到底什么是“避免內存?zhèn)喂蚕韱栴}”?
以 BaseMpscLinkedArrayQueuePad1 為例:
abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue {
byte p000, p001, p002, p003, p004, p005, p006, p007;
byte p008, p009, p010, p011, p012, p013, p014, p015;
byte p016, p017, p018, p019, p020, p021, p022, p023;
byte p024, p025, p026, p027, p028, p029, p030, p031;
byte p032, p033, p034, p035, p036, p037, p038, p039;
byte p040, p041, p042, p043, p044, p045, p046, p047;
byte p048, p049, p050, p051, p052, p053, p054, p055;
byte p056, p057, p058, p059, p060, p061, p062, p063;
byte p064, p065, p066, p067, p068, p069, p070, p071;
byte p072, p073, p074, p075, p076, p077, p078, p079;
byte p080, p081, p082, p083, p084, p085, p086, p087;
byte p088, p089, p090, p091, p092, p093, p094, p095;
byte p096, p097, p098, p099, p100, p101, p102, p103;
byte p104, p105, p106, p107, p108, p109, p110, p111;
byte p112, p113, p114, p115, p116, p117, p118, p119;
}
這個類除了定義了 120 字節(jié)的字段外,看上去沒有做其他任何事情,實際上它為 性能提升 默默做出了貢獻,避免了內存?zhèn)喂蚕?/strong>。CPU 中緩存行(Cache Line)的大小通常是 64 字節(jié),在類中定義 120 字節(jié)來占位,這樣便能將上下繼承關系間的字段間隔開,保證被多個線程訪問的關鍵字段距離至少跨越一個緩存行,分布在不同的緩存行中。這樣在不同的線程訪問 BaseMpscLinkedArrayQueueProducerFields 和 BaseMpscLinkedArrayQueueConsumerFields 中字段時互不影響,詳細了解原理可參考博客園 - CPU Cache與緩存行。
接下來我們看看其他抽象類的作用。BaseMpscLinkedArrayQueueProducerFields 定義生產者相關字段:
abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedArrayQueuePad1 {
// 生產者操作索引(并不對應緩沖區(qū) producerBuffer 中索引位置)
protected long producerIndex;
}
BaseMpscLinkedArrayQueueConsumerFields 負責定義消費者相關字段:
abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedArrayQueuePad2 {
// 掩碼值,用于計算消費者實際的索引位置
protected long consumerMask;
// 消費者訪問這個緩沖區(qū)來獲取元素消費
protected E[] consumerBuffer;
// 消費者操作索引(并不對應緩沖區(qū) consumerBuffer 中索引位置)
protected long consumerIndex;
}
BaseMpscLinkedArrayQueueColdProducerFields 中定義字段如下,該類的命名包含 Cold,表示其中字段被修改的次數會比較少:
abstract class BaseMpscLinkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueuePad3 {
// 生產者可以操作的最大索引上限
protected volatile long producerLimit;
// 掩碼值,用于計算生產者在數組中實際的索引
protected long producerMask;
// 存儲生產者生產的元素
protected E[] producerBuffer;
}
現在關鍵字段我們已經介紹完了,接下來看一下創(chuàng)建 MpscGrowableArrayQueue 的邏輯,執(zhí)行它的構造方法時會為我們剛剛提到的字段進行賦值:
class MpscGrowableArrayQueue extends MpscChunkedArrayQueue {
MpscGrowableArrayQueue(int initialCapacity, int maxCapacity) {
// 調用父類的構造方法
super(initialCapacity, maxCapacity);
}
}
abstract class MpscChunkedArrayQueue extends MpscChunkedArrayQueueColdProducerFields {
// 省略字節(jié)占位字段...
byte p119;
MpscChunkedArrayQueue(int initialCapacity, int maxCapacity) {
// 調用父類的構造方法
super(initialCapacity, maxCapacity);
}
}
abstract class MpscChunkedArrayQueueColdProducerFields extends BaseMpscLinkedArrayQueue {
protected final long maxQueueCapacity;
MpscChunkedArrayQueueColdProducerFields(int initialCapacity, int maxCapacity) {
// 調用父類的構造方法
super(initialCapacity);
if (maxCapacity < 4) {
throw new IllegalArgumentException("Max capacity must be 4 or more");
}
// 保證了最大值最少比初始值大 2 倍
if (ceilingPowerOfTwo(initialCapacity) >= ceilingPowerOfTwo(maxCapacity)) {
throw new IllegalArgumentException(
"Initial capacity cannot exceed maximum capacity(both rounded up to a power of 2)");
}
// 最大容量也為 2的n次冪
maxQueueCapacity = ((long) ceilingPowerOfTwo(maxCapacity)) < 1;
}
}
abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
BaseMpscLinkedArrayQueue(final int initialCapacity) {
if (initialCapacity < 2) {
throw new IllegalArgumentException("Initial capacity must be 2 or more");
}
// 初始化緩沖區(qū)大小為數值最接近的 2 的 n 次冪
int p2capacity = ceilingPowerOfTwo(initialCapacity);
// 掩碼值,-1L 使其低位均為 1,左移 1 位則最低位為 0,eg: 00000110,注意該值會被生產者和消費者掩碼值共同賦值
long mask = (p2capacity - 1L) < 1;
// 創(chuàng)建一個大小為 2的n次冪 +1 大小的緩沖區(qū),注意這個 buffer 分別被 producerBuffer 和 consumerBuffer 共同引用
E[] buffer = allocate(p2capacity + 1);
// BaseMpscLinkedArrayQueueColdProducerFields 類中相關字段賦值
producerBuffer = buffer;
producerMask = mask;
// 將 producerLimit 值賦為 掩碼值
soProducerLimit(this, mask);
// BaseMpscLinkedArrayQueueConsumerFields 類中相關字段賦值
consumerBuffer = buffer;
consumerMask = mask;
}
}
現在 MpscGrowableArrayQueue 的構建已經看完了,了解了其中關鍵字段的賦值,現在我們就需要看它是如何實現 MPSC 的。“多生產者”也就意味著會有多個線程向其中添加元素,既然是多線程就需要重點關注它是如何在多線程間完成協(xié)同的。添加操作對應了 BaseMpscLinkedArrayQueue#offer 方法,它的實現如下:
abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
private static final Object JUMP = new Object();
@Override
@SuppressWarnings("MissingDefault")
public boolean offer(final E e) {
if (e == null) {
throw new NullPointerException();
}
long mask;
E[] buffer;
long pIndex;
while (true) {
// 生產者最大索引(生產者掩碼值),獲取 BaseMpscLinkedArrayQueueColdProducerFields 中定義的該字段
long producerLimit = lvProducerLimit();
// 生產者當前索引,初始值為 0,BaseMpscLinkedArrayQueueProducerFields 中字段
pIndex = lvProducerIndex(this);
// producerIndex 最低位用來表示擴容(索引生產者索引 producerIndex 并不對應緩沖區(qū)中實際的索引)
// 低位為 1 表示正在擴容,自旋等待直到擴容完成(表示只有一個線程操作擴容)
if ((pIndex & 1) == 1) {
continue;
}
// 掩碼值和buffer可能在擴容中被改變,每次循環(huán)使用最新值
mask = this.producerMask;
buffer = this.producerBuffer;
// 檢查是否需要擴容
if (producerLimit <= pIndex) {
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result) {
case 0:
break;
case 1:
continue;
case 2:
return false;
case 3:
resize(mask, buffer, pIndex, e);
return true;
}
}
// CAS 操作更新生產者索引,注意這里是 +2,更新成功結束循環(huán)
if (casProducerIndex(this, pIndex, pIndex + 2)) {
break;
}
}
// 計算該元素在 buffer 中的實際偏移量,并將其添加到緩沖區(qū)中
final long offset = modifiedCalcElementOffset(pIndex, mask);
soElement(buffer, offset, e);
return true;
}
// 沒有將 resize 邏輯封裝在該方法中,而是由該方法判斷是否需要擴容
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
int result;
// 獲取消費者索引 BaseMpscLinkedArrayQueueConsumerFields 類中
final long cIndex = lvConsumerIndex(this);
// 通過掩碼值計算當前緩沖區(qū)容量
long bufferCapacity = getCurrentBufferCapacity(mask);
result = 0;
// 如果隊列還有空間
if (cIndex + bufferCapacity > pIndex) {
// 嘗試更新生產者最大限制,更新失敗則返回 1 重試
if (!casProducerLimit(this, producerLimit, cIndex + bufferCapacity)) {
result = 1;
}
}
// 如果隊列已滿且無法擴展
else if (availableInQueue(pIndex, cIndex) <= 0) {
result = 2;
}
// 更新 producerIndex 最低位為 1,成功則進行擴容,否則重試
else if (casProducerIndex(this, pIndex, pIndex + 1)) {
result = 3;
} else {
result = 1;
}
return result;
}
private void resize(long oldMask, E[] oldBuffer, long pIndex, final E e) {
// 計算新緩沖區(qū)大小并創(chuàng)建,2 * (buffer.length - 1) + 1
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer = allocate(newBufferLength);
// 更新緩沖區(qū)引用為新的緩沖區(qū)
producerBuffer = newBuffer;
// 更新新的掩碼
final int newMask = (newBufferLength - 2) < 1;
producerMask = newMask;
// 計算元素在新舊緩沖區(qū)中的偏移量
final long offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
final long offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
// 將元素放到新緩沖區(qū)中
soElement(newBuffer, offsetInNew, e);
// 將新緩沖區(qū)連接到舊緩沖區(qū)中
soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
// 校驗可用空間
final long cIndex = lvConsumerIndex(this);
final long availableInQueue = availableInQueue(pIndex, cIndex);
if (availableInQueue <= 0) {
throw new IllegalStateException();
}
// 更新生產者限制大小和生產者索引
soProducerLimit(this, pIndex + Math.min(newMask, availableInQueue));
soProducerIndex(this, pIndex + 2);
// 將舊緩沖區(qū)中該位置的元素更新為 JUMP 標志位,這樣在被消費時就知道去新的緩沖區(qū)獲取了
soElement(oldBuffer, offsetInOld, JUMP);
}
private long nextArrayOffset(final long mask) {
return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
}
// 因為最低位用來表示是否在擴容,所以 producerIndex 和 consumerIndex 并不表示實際的索引
// 注意生產者(消費者)操作索引值會隨著元素的增加不斷變大,因為有它們和掩碼值的位與運算才保證了索引值一直在索引值的有效范圍內
static long modifiedCalcElementOffset(long index, long mask) {
return (index & mask) >> 1;
}
}
可見,在這個過程中它并沒有限制操作線程數量,也沒有使用加鎖的同步機制。它通過保證 可見性,并使用 自旋鎖結合 CAS 操作 更新生產者索引值,因為該操作是原子的,同時只有一個線程能更新獲取索引值成功,更新失敗的線程會自旋重試,這樣便允許多線程同時添加元素,可見性保證和CAS操作源碼如下:
abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
static final VarHandle P_INDEX = pIndexLookup.findVarHandle(
BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex", long.class);
// volatile 可見性保證
static long lvProducerIndex(BaseMpscLinkedArrayQueue??> self) {
return (long) P_INDEX.getVolatile(self);
}
// CAS 操作
static boolean casProducerIndex(BaseMpscLinkedArrayQueue??> self, long expect, long newValue) {
return P_INDEX.compareAndSet(self, expect, newValue);
}
}
保證可見性(內存操作對其他線程可見)的原理是 內存屏障,除了保證可見性以外,內存屏障還能夠 防止重排序(確保在內存屏障前后的內存操作不會被重排序,從而保證程序的正確性)。到這里,生產者添加元素的邏輯我們已經分析完了,接下來我們需要繼續(xù)看一下消費者獲取元素的邏輯,它對應了 BaseMpscLinkedArrayQueue#poll 方法,同樣地,在這過程中需要關注“在這個方法中有沒有限制單一線程執(zhí)行”,以此實現單消費者呢:
abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQueueColdProducerFields {
private static final Object JUMP = new Object();
public E poll() {
// 讀取消費者相關字段 BaseMpscLinkedArrayQueueConsumerFields 類
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long mask = consumerMask;
// 根據消費索引,計算出元素在消費者緩沖區(qū)中實際的位置
final long offset = modifiedCalcElementOffset(index, mask);
// 讀取該元素(volatile 可見性讀?。? Object e = lvElement(buffer, offset);
// 如果為空
if (e == null) {
// 比較生產者索引,如果兩個索引不相等,那么證明兩索引間存在距離表示還有元素能夠被消費
if (index != lvProducerIndex(this)) {
// 自旋讀取元素,直到讀到元素
do {
e = lvElement(buffer, offset);
} while (e == null);
} else {
// 索引相等證明確實是空隊列
return null;
}
}
if (e == JUMP) {
// 獲取到新緩沖區(qū)
final E[] nextBuffer = getNextBuffer(buffer, mask);
// 在新緩沖區(qū)中獲取到對應元素
return newBufferPoll(nextBuffer, index);
}
// 清除當前索引的元素,表示該元素已經被消費
soElement(buffer, offset, null);
// 更新消費者索引,這里也是 +2,它并不表示實際的在緩沖區(qū)的索引
soConsumerIndex(this, index + 2);
return (E) e;
}
private E[] getNextBuffer(final E[] buffer, final long mask) {
// 如果已經發(fā)生擴容,此時 consumerMask 仍然對應的是擴容前的 mask
// 此處與生產者操作擴容時拼接新舊緩沖區(qū)調用的是一樣的方法,這樣便能夠獲取到新緩沖區(qū)的偏移量
final long nextArrayOffset = nextArrayOffset(mask);
// 獲取到新緩沖區(qū),因為在擴容操作時已經將新緩沖區(qū)鏈接到舊緩沖區(qū)上了
final E[] nextBuffer = (E[]) lvElement(buffer, nextArrayOffset);
// 將舊緩沖區(qū)中新緩沖區(qū)位置設置為 null 表示舊緩沖區(qū)中已經沒有任何元素需要被消費了,也不再需要被引用了(能被垃圾回收了)
soElement(buffer, nextArrayOffset, null);
return nextBuffer;
}
private long nextArrayOffset(final long mask) {
return modifiedCalcElementOffset(mask + 2, Long.MAX_VALUE);
}
private E newBufferPoll(E[] nextBuffer, final long index) {
// 計算出消費者操作索引在新緩沖區(qū)中對應的實際位置
final long offsetInNew = newBufferAndOffset(nextBuffer, index);
// 在新緩沖區(qū)中獲取到對應元素
final E n = lvElement(nextBuffer, offsetInNew);
if (n == null) {
throw new IllegalStateException("new buffer must have at least one element");
}
// 清除當前索引的元素,表示該元素已經被消費
soElement(nextBuffer, offsetInNew, null);
// 更新消費者索引
soConsumerIndex(this, index + 2);
return n;
}
private long newBufferAndOffset(E[] nextBuffer, final long index) {
// 將消費者緩沖區(qū)引用和掩碼值更新
consumerBuffer = nextBuffer;
consumerMask = (nextBuffer.length - 2L) < 1;
return modifiedCalcElementOffset(index, consumerMask);
}
static long modifiedCalcElementOffset(long index, long mask) {
return (index & mask) >> 1;
}
static E lvElement(E[] buffer, long offset) {
return (E) REF_ARRAY.getVolatile(buffer, (int) offset);
}
}
可以發(fā)現在該方法中并沒有限制單一線程執(zhí)行,所以理論上這個方法可能被多個線程調用,那么它又為什么被稱為 MPSC 呢?在這個方法中的一段注釋值得細心體會:
This implementation is correct for single consumer thread use only.
此實現僅適用于單消費者線程使用
所以調用該方法時開發(fā)者本身需要保證單線程調用而并不是在實現中控制。
到這里 MpscGrowableArrayQueue 中核心的邏輯已經講解完了,現在我們回過頭來再看一下隊列擴容前后生產者和消費者是如何協(xié)同的?在擴容前,consumerBuffer 和 producerBuffer 引用的是同一個緩沖區(qū)對象。如果發(fā)生擴容,那么生產者會創(chuàng)建一個新的緩沖區(qū),并將 producerBuffer 引用指向它,此時它做了一個 非常巧妙 的操作,將 新緩沖區(qū)依然鏈接到舊緩沖區(qū) 上,并將觸發(fā)擴容的元素對應的舊緩沖區(qū)的索引處標記為 JUMP,表示這及之后的元素已經都在新緩沖區(qū)中。此時,消費者依然會在舊緩沖區(qū)中慢慢地消費,直到遇到 JUMP 標志位,消費者就知道需要到新緩沖區(qū)中取獲取元素了。因為之前生產者在擴容時對新舊緩沖區(qū)進行鏈接,所以消費者能夠通過舊緩沖區(qū)獲取到新緩沖區(qū)的引用,并變更 consumerBuffer 的引用和 consumerMask 掩碼值,接下來的消費過程便和擴容前沒有差別了。
scheduleAfterWrite
現在我們再回到 put 方法的邏輯中,如果向 WriterBuffer 中添加元素成功,則會調用 scheduleAfterWrite 方法,調度任務的執(zhí)行:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
final ReentrantLock evictionLock = new ReentrantLock();
// 默認為 ForkJoinPool.commonPool()
final Executor executor;
// 該任務在創(chuàng)建緩存時已經完成初始化
final PerformCleanupTask drainBuffersTask;
// 根據狀態(tài)的變化來調度執(zhí)行任務
void scheduleAfterWrite() {
// 獲取當前 drainStatus,drain 譯為排空,耗盡
int drainStatus = drainStatusOpaque();
for (; ; ) {
// 這里的狀態(tài)機變更需要關注下
switch (drainStatus) {
// IDLE 表示當前無任務可做
case IDLE:
// CAS 更新狀態(tài)為 REQUIRED
casDrainStatus(IDLE, REQUIRED);
// 調度任務執(zhí)行
scheduleDrainBuffers();
return;
// REQUIRED 表示當前有任務需要執(zhí)行
case REQUIRED:
// 調度任務執(zhí)行
scheduleDrainBuffers();
return;
// PROCESSING_TO_IDLE 表示當前任務處理完成后會變成 IDLE 狀態(tài)
case PROCESSING_TO_IDLE:
// 又來了新的任務,則 CAS 操作將它更新為 PROCESSING_TO_REQUIRED 狀態(tài)
if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
return;
}
drainStatus = drainStatusAcquire();
continue;
// PROCESSING_TO_REQUIRED 表示正在處理任務,處理完任務后還有任務需要處理
case PROCESSING_TO_REQUIRED:
return;
default:
throw new IllegalStateException("Invalid drain status: " + drainStatus);
}
}
}
// 調度執(zhí)行緩沖區(qū)中的任務
void scheduleDrainBuffers() {
// 如果狀態(tài)表示正在有任務處理則返回
if (drainStatusOpaque() >= PROCESSING_TO_IDLE) {
return;
}
// 注意這里要獲取同步鎖 evictionLock
if (evictionLock.tryLock()) {
try {
// 獲取鎖后再次校驗當前處理狀態(tài)
int drainStatus = drainStatusOpaque();
if (drainStatus >= PROCESSING_TO_IDLE) {
return;
}
// 更新狀態(tài)為 PROCESSING_TO_IDLE
setDrainStatusRelease(PROCESSING_TO_IDLE);
// 同步機制保證任何時刻只能有一個線程能夠提交任務
executor.execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
maintenance(/* ignored */ null);
} finally {
evictionLock.unlock();
}
}
}
}
寫后調度處理任務(scheduleAfterWrite)會根據狀態(tài)選擇性執(zhí)行 scheduleDrainBuffers 方法,執(zhí)行該方法時通過同步鎖 evictionLock 保證同時只有一個線程能提交 PerformCleanupTask 任務。這個任務在創(chuàng)建緩存時已經被初始化完成了,每次提交任務都會被復用,接下來我們看一下這個任務的具體實現:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
// 可重用的任務,用于執(zhí)行 maintenance 方法,避免了使用 ForkJoinPool 來包裝
static final class PerformCleanupTask extends ForkJoinTask implements Runnable {
private static final long serialVersionUID = 1L;
final WeakReference> reference;
PerformCleanupTask(BoundedLocalCache?, ??> cache) {
reference = new WeakReference>(cache);
}
@Override
public boolean exec() {
try {
run();
} catch (Throwable t) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", t);
}
// Indicates that the task has not completed to allow subsequent submissions to execute
return false;
}
@Override
public void run() {
// 獲取到緩存對象
BoundedLocalCache?, ??> cache = reference.get();
if (cache != null) {
cache.performCleanUp(null);
}
}
// ...
}
}
它的實現非常簡單,其中 reference 字段在調用構造方法時被賦值,引用的是緩存對象本身。當任務被執(zhí)行時,調用的是 BoundedLocalCache#performCleanUp 方法:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
final ReentrantLock evictionLock = new ReentrantLock();
// 執(zhí)行該任務時,也要獲取同步鎖,表示任務只能由一個線程來執(zhí)行
void performCleanUp(@Nullable Runnable task) {
evictionLock.lock();
try {
// 執(zhí)行維護任務
maintenance(task);
} finally {
evictionLock.unlock();
}
rescheduleCleanUpIfIncomplete();
}
@GuardedBy("evictionLock")
void maintenance(@Nullable Runnable task) {
// 更新狀態(tài)為執(zhí)行中
setDrainStatusRelease(PROCESSING_TO_IDLE);
try {
// 處理讀緩沖區(qū)中的任務
drainReadBuffer();
// 處理寫緩沖區(qū)中的任務
drainWriteBuffer();
if (task != null) {
task.run();
}
// 處理 key 和 value 的引用
drainKeyReferences();
drainValueReferences();
// 過期和驅逐策略
expireEntries();
evictEntries();
// “增值” 操作,后續(xù)重點講
climb();
} finally {
// 狀態(tài)不是 PROCESSING_TO_IDLE 或者無法 CAS 更新為 IDLE 狀態(tài)的話,需要更新狀態(tài)為 REQUIRED,該狀態(tài)會再次執(zhí)行維護任務
if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}
}
注意在執(zhí)行 performCleanUp 方法時,也需要獲取到同步鎖 evictionLock,那么任務的提交和任務的執(zhí)行也是互斥的。這個執(zhí)行的核心邏輯在 maintenance “維護”方法中,注意這個方法被標記了注解 @GuardedBy("evictionLock"),源碼中還有多個方法也標記了該注解,執(zhí)行這些方法時都要獲取同步鎖,這也是在提醒我們這些方法同時只有由一條線程被執(zhí)行。因為目前關注的是 put 方法,所以重點先看維護方法中 drainWriteBuffer 方法處理寫緩沖區(qū)中任務的邏輯:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
static final int NCPU = Runtime.getRuntime().availableProcessors();
static final int WRITE_BUFFER_MAX = 128 * ceilingPowerOfTwo(NCPU);
final MpscGrowableArrayQueue writeBuffer;
@GuardedBy("evictionLock")
void drainWriteBuffer() {
// 最大循環(huán)次數為 writeBuffer 最大容量,直至彈出元素為 null
for (int i = 0; i <= WRITE_BUFFER_MAX; i++) {
Runnable task = writeBuffer.poll();
if (task == null) {
return;
}
task.run();
}
// 更新狀態(tài)為 PROCESSING_TO_REQUIRED
setDrainStatusOpaque(PROCESSING_TO_REQUIRED);
}
}
執(zhí)行邏輯非常簡單,在獲取到同步鎖之后,在 WriteBuffer 中獲取要被執(zhí)行的任務并執(zhí)行。在這里我們能發(fā)現“SC 單消費者”的實現使用 同步鎖的機制保證同時只能有一個消費者消費緩沖區(qū)中的任務。在上文中我們已經知道,調用 put 方法時向緩沖區(qū) WriteBuffer 中添加的任務為 AddTask,下面我們看一下該任務的實現:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
static final long MAXIMUM_CAPACITY = Long.MAX_VALUE - Integer.MAX_VALUE;
final class AddTask implements Runnable {
final Node node;
// 節(jié)點權重
final int weight;
AddTask(Node node, int weight) {
this.weight = weight;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
@SuppressWarnings("FutureReturnValueIgnored")
public void run() {
// 是否指定了驅逐策略
if (evicts()) {
// 更新緩存權重和窗口區(qū)權重
setWeightedSize(weightedSize() + weight);
setWindowWeightedSize(windowWeightedSize() + weight);
// 更新節(jié)點的 policyWeight,該字段只有在自定了權重計算規(guī)則時才有效
// 否則像只定義了固定容量的驅逐策略,使用默認元素權重為 1 是不需要關注該字段的
node.setPolicyWeight(node.getPolicyWeight() + weight);
// 檢測當前總權重是否超過一半的最大容量
long maximum = maximum();
if (weightedSize() >= (maximum >>> 1)) {
// 如果超過最大容量
if (weightedSize() > MAXIMUM_CAPACITY) {
// 執(zhí)行驅逐操作
evictEntries();
} else {
// 延遲加載頻率草圖 frequencySketch 數據結構,用于統(tǒng)計元素訪問頻率
long capacity = isWeighted() ? data.mappingCount() : maximum;
frequencySketch().ensureCapacity(capacity);
}
}
// 更新頻率統(tǒng)計信息
K key = node.getKey();
if (key != null) {
// 因為頻率草圖數據結構具有延遲加載機制(權重超過半數)
// 所以實際上在元素權重還未過半未完成初始化時,調用 increment 是沒有作用的
frequencySketch().increment(key);
}
// 增加未命中樣本數
setMissesInSample(missesInSample() + 1);
}
// 同步檢測節(jié)點是否還有效
boolean isAlive;
synchronized (node) {
isAlive = node.isAlive();
}
if (isAlive) {
// 寫后過期策略
if (expiresAfterWrite()) {
writeOrderDeque().offerLast(node);
}
// 過期策略
if (expiresVariable()) {
timerWheel().schedule(node);
}
// 驅逐策略
if (evicts()) {
// 如果權重比配置的最大權重大
if (weight > maximum()) {
// 執(zhí)行固定權重(RemovalCause.SIZE)的驅逐策略
evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
}
// 如果權重超過窗口區(qū)最大權重,則將其放在窗口區(qū)頭節(jié)點
else if (weight > windowMaximum()) {
accessOrderWindowDeque().offerFirst(node);
}
// 否則放在窗口區(qū)尾節(jié)點
else {
accessOrderWindowDeque().offerLast(node);
}
}
// 訪問后過期策略
else if (expiresAfterAccess()) {
accessOrderWindowDeque().offerLast(node);
}
}
// 處理異步計算
if (isComputingAsync(node)) {
synchronized (node) {
if (!Async.isReady((CompletableFuture??>) node.getValue())) {
long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
setVariableTime(node, expirationTime);
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
}
}
}
}
}
}
根據注釋很容易理解該方法的作用,因為我們目前對緩存只定義了固定容量的驅逐策略,所以我們需要在看一下 evictEntry 方法:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
final ConcurrentHashMap> data;
@GuardedBy("evictionLock")
@SuppressWarnings({"GuardedByChecker", "NullAway", "PMD.CollapsibleIfStatements"})
boolean evictEntry(Node node, RemovalCause cause, long now) {
K key = node.getKey();
@SuppressWarnings("unchecked")
V[] value = (V[]) new Object[1];
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
Object keyReference = node.getKeyReference();
RemovalCause[] actualCause = new RemovalCause[1];
data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
return n;
}
synchronized (n) {
value[0] = n.getValue();
// key 或 value 為 null,這種情況下可能使用了 Caffeine.weakKeys, Caffeine.weakValues, or Caffeine.softValues
// 導致被垃圾回收了
if ((key == null) || (value[0] == null)) {
// 標記實際失效原因為垃圾回收
actualCause[0] = RemovalCause.COLLECTED;
}
// 如果原因為垃圾回收,記錄 resurrect 復活標記為 true
else if (cause == RemovalCause.COLLECTED) {
resurrect[0] = true;
return n;
}
// 否則記錄入參中的原因
else {
actualCause[0] = cause;
}
// 過期驅逐策略判斷
if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
expired |= ((now - n.getAccessTime()) >= expiresAfterAccessNanos());
}
if (expiresAfterWrite()) {
expired |= ((now - n.getWriteTime()) >= expiresAfterWriteNanos());
}
if (expiresVariable()) {
expired |= (n.getVariableTime() <= now);
}
if (!expired) {
resurrect[0] = true;
return n;
}
}
// 固定容量驅逐策略
else if (actualCause[0] == RemovalCause.SIZE) {
int weight = node.getWeight();
if (weight == 0) {
resurrect[0] = true;
return n;
}
}
// 通知驅逐策略監(jiān)聽器,調用它的方法
notifyEviction(key, value[0], actualCause[0]);
// 將該 key 對應的刷新策略失效
discardRefresh(keyReference);
// 標記該節(jié)點被驅逐
removed[0] = true;
// 退休準備被垃圾回收
node.retire();
}
return null;
});
// 如果復活標記為 true 那么不被移除
if (resurrect[0]) {
return false;
}
// 節(jié)點已經要被驅逐
// 如果在窗口區(qū),那么直接從窗口區(qū)移除
if (node.inWindow() && (evicts() || expiresAfterAccess())) {
accessOrderWindowDeque().remove(node);
}
// 如果沒在窗口區(qū)
else if (evicts()) {
// 在試用區(qū)直接在試用區(qū)移除
if (node.inMainProbation()) {
accessOrderProbationDeque().remove(node);
}
// 在保護區(qū)則直接從保護區(qū)移除
else {
accessOrderProtectedDeque().remove(node);
}
}
// 將寫后失效和時間輪中關于該節(jié)點的元素移除
if (expiresAfterWrite()) {
writeOrderDeque().remove(node);
} else if (expiresVariable()) {
timerWheel().deschedule(node);
}
// 同步機制將 node 置為 dead
synchronized (node) {
logIfAlive(node);
makeDead(node);
}
if (removed[0]) {
// 節(jié)點被移除監(jiān)控計數和節(jié)點移除通知回調
statsCounter().recordEviction(node.getWeight(), actualCause[0]);
notifyRemoval(key, value[0], actualCause[0]);
}
return true;
}
}
該方法比較簡單,是將節(jié)點進行驅逐的邏輯,在后文中它會被多次復用,需要留一個印象?;氐?AddTask 任務的邏輯中,當被添加的元素權重超過最大權重限制時會被直接移除。這種特殊情況試用于指定了權重計算策略的緩存,如果只指定了固定容量,元素權重默認為 1,所以不會直接超過最大緩存數量限制。
現在我們已經將 put 方法中向緩存中添加元素的邏輯介紹完了,接下來需要關注 put 方法中對已存在的相同 key 值元素的處理邏輯:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
static final int MAX_PUT_SPIN_WAIT_ATTEMPTS = 1024 - 1;
static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);
final ConcurrentHashMap> data;
@Nullable
V put(K key, V value, Expiry expiry, boolean onlyIfAbsent) {
requireNonNull(key);
requireNonNull(value);
Node node = null;
long now = expirationTicker().read();
int newWeight = weigher.weigh(key, value);
Object lookupKey = nodeFactory.newLookupKey(key);
for (int attempts = 1; ; attempts++) {
Node prior = data.get(lookupKey);
if (prior == null) {
// ...
}
// 元素被讀到之后可能已經被驅逐了
if (!prior.isAlive()) {
// 自旋嘗試重新從 ConcurrentHashMap 中獲取,再獲取時如果為 null 則執(zhí)行新增邏輯
if ((attempts & MAX_PUT_SPIN_WAIT_ATTEMPTS) != 0) {
Thread.onSpinWait();
continue;
}
// 如果自旋嘗試后元素仍未被刪除,校驗元素是否處于存活狀態(tài)
// 如果處于非存活狀態(tài),那么可能這個元素已經被破壞,無法被移除,拋出異常
data.computeIfPresent(lookupKey, (k, n) -> {
requireIsAlive(key, n);
return n;
});
continue;
}
V oldValue;
// 新的過期時間
long varTime;
int oldWeight;
boolean expired = false;
boolean mayUpdate = true;
boolean exceedsTolerance = false;
// 為元素加同步鎖
synchronized (prior) {
// 如果此時元素已經失效了,那么需要重新循環(huán)
if (!prior.isAlive()) {
continue;
}
oldValue = prior.getValue();
oldWeight = prior.getWeight();
// oldValue 為 null 證明它被垃圾回收器回收了
if (oldValue == null) {
// 記錄元素創(chuàng)建后的過期時間
varTime = expireAfterCreate(key, value, expiry, now);
// 驅逐監(jiān)聽器回調
notifyEviction(key, null, RemovalCause.COLLECTED);
}
// 如果元素已經過期了
else if (hasExpired(prior, now)) {
// 標記過期標志為 true
expired = true;
// 記錄元素創(chuàng)建后的過期時間并回調驅逐監(jiān)聽器
varTime = expireAftexpireAfterCreateerCreate(key, value, expiry, now);
notifyEviction(key, oldValue, RemovalCause.EXPIRED);
}
// onlyInAbsent 為 true 時不會對已存在 key 的值進行修改
else if (onlyIfAbsent) {
mayUpdate = false;
// 記錄元素讀后過期時間
varTime = expireAfterRead(prior, key, value, expiry, now);
} else {
// 記錄元素修改后過期時間
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}
// 需要修改原有 key 的 value 值
if (mayUpdate) {
exceedsTolerance =
// 配置了寫后過期策略且已經超過寫后時間的容忍范圍
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
// 或者配置了可變時間過期策略同樣判斷是否超過時間的容忍范圍
|| (expiresVariable() && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);
// 更新值,更新權重,更新寫時間
prior.setValue(value, valueReferenceQueue());
prior.setWeight(newWeight);
setWriteTime(prior, now);
// 寫后刷新策略失效
discardRefresh(prior.getKeyReference());
}
// 更新過期時間
setVariableTime(prior, varTime);
// 更新訪問時間
setAccessTime(prior, now);
}
// 根據不同的情況回調不同的監(jiān)聽器
if (expired) {
notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
} else if (oldValue == null) {
notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
} else if (mayUpdate) {
notifyOnReplace(key, oldValue, value);
}
// 計算寫后權重變化
int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
// 如果 oldValue 已經被回收 或 權重修改前后發(fā)生變更 或 已經過期,添加更新任務
if ((oldValue == null) || (weightedDifference != 0) || expired) {
afterWrite(new UpdateTask(prior, weightedDifference));
}
// 如果超過了時間容忍范圍,添加更新任務
else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
// 沒有超過時間容忍范圍,更新寫時間
if (mayUpdate) {
setWriteTime(prior, now);
}
// 處理讀后操作
afterRead(prior, now, /* recordHit */ false);
}
return expired ? null : oldValue;
}
}
}
對于已有元素的變更,會對節(jié)點添加同步鎖,更新它的權重等一系列變量,如果超過 1s 的時間容忍范圍,則會添加 UpdateTask 更新任務,至于處理讀后操作 afterRead 在讀方法中再去介紹。接下來我們需要重新再看一下 afterWrite 方法,其中有部分我們在上文中沒有介紹的邏輯:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
final ReentrantLock evictionLock;
void afterWrite(Runnable task) {
// 這段邏輯我們在看 AddTask 的邏輯時已經看過了,所以略過
for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
if (writeBuffer.offer(task)) {
scheduleAfterWrite();
return;
}
scheduleDrainBuffers();
Thread.onSpinWait();
}
// 以下邏輯用于解決在重試了 100 次后仍然寫入失敗的問題,它會嘗試獲取 evictionLock 同步鎖
// 直接同步執(zhí)行“維護”方法并執(zhí)行當前任務,但是它并無法解決某個寫入操作執(zhí)行時間很長的問題
// 發(fā)生這種情況的原因可能是由于執(zhí)行器的所有線程都很忙(可能是寫入此緩存),寫入速率大大超過了消耗速率,優(yōu)先級反轉,或者執(zhí)行器默默地丟棄了維護任務
lock();
try {
maintenance(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
} finally {
evictionLock.unlock();
}
// 重新調度異步維護任務,確保維護操作能及時執(zhí)行
rescheduleCleanUpIfIncomplete();
}
void lock() {
long remainingNanos = WARN_AFTER_LOCK_WAIT_NANOS;
long end = System.nanoTime() + remainingNanos;
boolean interrupted = false;
try {
for (;;) {
try {
if (evictionLock.tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
return;
}
logger.log(Level.WARNING, "The cache is experiencing excessive wait times for acquiring "
+ "the eviction lock. This may indicate that a long-running computation has halted "
+ "eviction when trying to remove the victim entry. Consider using AsyncCache to "
+ "decouple the computation from the map operation.", new TimeoutException());
evictionLock.lock();
return;
} catch (InterruptedException e) {
remainingNanos = end - System.nanoTime();
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
// 調用同步的維護方法時,可能發(fā)生獲取鎖超時,那么再重新開啟一個異步維護調度
void rescheduleCleanUpIfIncomplete() {
// 校驗是否有任務需要被執(zhí)行
if (drainStatusOpaque() != REQUIRED) {
return;
}
// 默認線程池調度任務執(zhí)行,這個方法我們在上文中已經詳細介紹過
if (executor == ForkJoinPool.commonPool()) {
scheduleDrainBuffers();
return;
}
// 如果自定義了線程池,那么會使用自定義的線程池進行處理
var pacer = pacer();
if ((pacer != null) && !pacer.isScheduled() && evictionLock.tryLock()) {
try {
if ((drainStatusOpaque() == REQUIRED) && !pacer.isScheduled()) {
pacer.schedule(executor, drainBuffersTask, expirationTicker().read(), Pacer.TOLERANCE);
}
} finally {
evictionLock.unlock();
}
}
}
}
寫后操作除了在添加任務到緩沖區(qū)成功后會執(zhí)行維護方法,添加失?。ㄗC明寫入操作非常頻繁)依然會嘗試同步執(zhí)行維護方法和發(fā)起異步維護,用于保證緩存中的任務能夠被及時執(zhí)行,使緩存中元素都處于“預期”狀態(tài)中。接下來我們在看一下 UpdateTask 更新任務的邏輯:
abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef implements LocalCache {
final class UpdateTask implements Runnable {
final int weightDifference;
final Node node;
public UpdateTask(Node node, int weightDifference) {
this.weightDifference = weightDifference;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
// 寫后過期和自定義過期邏輯
if (expiresAfterWrite()) {
reorder(writeOrderDeque(), node);
} else if (expiresVariable()) {
timerWheel().reschedule(node);
}
// 指定了驅逐策略
if (evicts()) {
// 變更節(jié)點權重
int oldWeightedSize = node.getPolicyWeight();
node.setPolicyWeight(oldWeightedSize + weightDifference);
// 如果是窗口區(qū)節(jié)點
if (node.inWindow()) {
// 更新窗口區(qū)權重
setWindowWeightedSize(windowWeightedSize() + weightDifference);
// 節(jié)點權重超過最大權重限制,直接驅逐
if (node.getPolicyWeight() > maximum()) {
evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
}
// 節(jié)點權重比窗口區(qū)最大值小
else if (node.getPolicyWeight() <= windowMaximum()) {
onAccess(node);
}
// 窗口區(qū)包含該節(jié)點且該節(jié)點的權重大于窗口最大權重,則放到頭節(jié)點
else if (accessOrderWindowDeque().contains(node)) {
accessOrderWindowDeque().moveToFront(node);
}
}
// 如果是試用區(qū)節(jié)點
else if (node.inMainProbation()) {
// 節(jié)點權重比最大權重限制小
if (node.getPolicyWeight() <= maximum()) {
onAccess(node);
}
// 否則將該節(jié)點驅逐
else {
evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
}
}
// 如果是保護區(qū)節(jié)點
else if (node.inMainProtected()) {
// 更新保護區(qū)權重
setMainProtectedWeightedSize(mainProtectedWeightedSize() + weightDifference);
// 同樣的邏輯
if (node.getPolicyWeight() <= maximum()) {
onAccess(node);
} else {
evictEntry(node, RemovalCause.SIZE, expirationTicker().read());
}
}
// 更新緩存權重大小
setWeightedSize(weightedSize() + weightDifference);
// 更新完成后超過最大權重限制執(zhí)行驅逐操作
if (weightedSize() > MAXIMUM_CAPACITY) {
evictEntries();
}
}
// 配置了訪問后過期
else if (expiresAfterAccess()) {
onAccess(node);
}
}
}
@GuardedBy("evictionLock")
void onAccess(Node node) {
if (evicts()) {
K key = node.getKey();
if (key == null) {
return;
}
// 更新訪問頻率
frequencySketch().increment(key);
// 如果節(jié)點在窗口區(qū),則將其移動到尾節(jié)點
if (node.inWindow()) {
reorder(accessOrderWindowDeque(), node);
}
// 在試用區(qū)的節(jié)點執(zhí)行 reorderProbation 方法,可能會將該節(jié)點從試用區(qū)晉升到保護區(qū)
else if (node.inMainProbation()) {
reorderProbation(node);
}
// 否則移動到保護區(qū)的尾結點
else {
reorder(accessOrderProtectedDeque(), node);
}
// 更新命中量
setHitsInSample(hitsInSample() + 1);
}
// 配置了訪問過期策略
else if (expiresAfterAccess()) {
reorder(accessOrderWindowDeque(), node);
}
// 配置了自定義時間過期策略
if (expiresVariable()) {
timerWheel().reschedule(node);
}
}
static void reorder(LinkedDeque> deque, Node node) {
// 如果節(jié)點存在,將其移動到尾結點
if (deque.contains(node)) {
deque.moveToBack(node);
}
}
@GuardedBy("evictionLock")
void reorderProbation(Node node) {
// 檢查試用區(qū)是否包含該節(jié)點,不包含則證明已經被移除,則不處理
if (!accessOrderProbationDeque().contains(node)) {
return;
}
// 檢查節(jié)點的權重是否超過保護區(qū)最大值
else if (node.getPolicyWeight() > mainProtectedMaximum()) {
// 如果超過,將該節(jié)點移動到 試用區(qū) 尾巴節(jié)點,保證超重的節(jié)點不會被移動到保護區(qū)
reorder(accessOrderProbationDeque(), node);
return;
}
// 更新保護區(qū)權重大小
setMainProtectedWeightedSize(mainProtectedWeightedSize() + node.getPolicyWeight());
// 在試用區(qū)中移除該節(jié)點
accessOrderProbationDeque().remove(node);
// 在保護區(qū)尾節(jié)點中添加
accessOrderProtectedDeque().offerLast(node);
// 將該節(jié)點標記為保護區(qū)節(jié)點
node.makeMainProtected();
}
}
UpdateTask 修改任務負責變更權重值,并更新節(jié)點所在隊列的順序和訪問頻率,這里我們也能發(fā)現,這三個區(qū)域的隊列采用了 LRU 算法,一般情況下,最新被訪問的元素會被移動到尾節(jié)點。到現在,向有固定容量限制的緩存中調用 put 方法添加元素的邏輯基本已經介紹完了,目前對 Caffeine 緩存的了解程度如下所示:

put 添加元素時會先直接添加到 ConcurrentHashMap 中,并在 WriteBuffer 中添加 AddTask/UpdateTask 任務,WriteBuffer 是一個 MPSC 的緩沖區(qū),添加成功后會有加鎖的同步機制在默認的 ForkJoinPool.commonPool() 線程池中提交 PerformCleanupTask 任務,PerformCleanupTask 任務的主要作用是執(zhí)行 maintenance 維護方法,該方法執(zhí)行前需要先獲取同步鎖,單線程消費 WriteBuffer 中的任務。執(zhí)行 AddTask 任務時會將元素先添加到窗口區(qū),如果是 UpdateTask,它會修改三個不同區(qū)域的雙端隊列,這些隊列采用LRU算法,最新被訪問的元素會被放在尾節(jié)點處,并且試用區(qū)的元素被訪問后會被晉升到保護區(qū)尾節(jié)點,元素對應的訪問頻率也會在頻率草圖中更新,如果被添加的節(jié)點權重超過緩存最大權重會直接被驅逐。(目前維護方法中除了 drainWriteBuffer 方法外,其他步驟還未詳細解釋,之后會在后文中不斷完善)
審核編輯 黃宇
-
算法
+關注
關注
23文章
4784瀏覽量
98027 -
源碼
+關注
關注
8文章
685瀏覽量
31312
發(fā)布評論請先 登錄
C語言的緩沖區(qū)(緩存)詳解
緩存之美:從根上理解 ConcurrentHashMap
本地緩存 Caffeine 中的時間輪(TimeWheel)是什么?
harmony-utils之LRUCacheUtil,LRUCache緩存工具類
高性能緩存設計:如何解決緩存偽共享問題
由 Mybatis 源碼暢談軟件設計(八):從根上理解 Mybatis 二級緩存
【必看】開關電源中每一個元器件的計算+51頁圖文詳解
MCU緩存設計
高速SSD存儲系統(tǒng)中數據緩存控制器整體頂層設計
緩存之美:萬文詳解 Caffeine 實現原理(上)
評論