How Caffeine’s ReadBuffer Works: Deep Dive into getIfPresent and Eviction Mechanics
This article explains the inner workings of Caffeine's getIfPresent method, the design of its MPSC ReadBuffer, the maintenance cycle, eviction strategies, and the TinyLFU algorithm, with Java code examples illustrating how caching decisions are made in high‑concurrency environments.
getIfPresent
After building a basic understanding of the put method, we continue with the getIfPresent method.
<code>// JUnit 5 and Caffeine imports (JUnit 5 assumed)
import org.junit.jupiter.api.Test;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class TestReadSourceCode {
@Test
public void doRead() {
// read constructor
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(10_000)
.build();
// read put
cache.put("key", "value");
// read get
cache.getIfPresent("key");
}
}</code>The corresponding source code with comments is shown below:
<code>abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {
final ConcurrentHashMap<Object, Node<K, V>> data;
final Buffer<Node<K, V>> readBuffer;
@Override
public @Nullable V getIfPresent(Object key, boolean recordStats) {
// Directly obtain element from ConcurrentHashMap
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node == null) {
// Update miss statistics
if (recordStats) {
statsCounter().recordMisses(1);
}
// If drainStatus is REQUIRED, schedule maintenance
if (drainStatusOpaque() == REQUIRED) {
scheduleDrainBuffers();
}
return null;
}
V value = node.getValue();
long now = expirationTicker().read();
// Check expiration or collection
if (hasExpired(node, now) || (collectValues() && (value == null))) {
if (recordStats) {
statsCounter().recordMisses(1);
}
scheduleDrainBuffers();
return null;
}
// Ensure not computing async
if (!isComputingAsync(node)) {
@SuppressWarnings("unchecked")
K castedKey = (K) key;
setAccessTime(node, now);
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
// Post‑read processing
V refreshed = afterRead(node, now, recordStats);
return (refreshed == null) ? value : refreshed;
}
}</code>In the getIfPresent method, scheduleDrainBuffers has already been introduced; the final step to examine is afterRead, which handles post‑read processing.
<code>abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef implements LocalCache<K, V> {
final Buffer<Node<K, V>> readBuffer;
@Nullable V afterRead(Node<K, V> node, long now, boolean recordHit) {
// Update hit statistics
if (recordHit) {
statsCounter().recordHits(1);
}
// If skipReadBuffer is false, offer to readBuffer
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
// Determine if delayed processing is needed
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
// Refresh if needed
return refreshIfNeeded(node, now);
}
boolean skipReadBuffer() {
// Fast‑path check for skip
return fastpath() && frequencySketch().isNotInitialized();
}
boolean shouldDrainBuffers(boolean delayable) {
switch (drainStatusOpaque()) {
case IDLE: return !delayable;
case REQUIRED: return true;
case PROCESSING_TO_IDLE:
case PROCESSING_TO_REQUIRED: return false;
default: throw new IllegalStateException("Invalid drain status: " + drainStatus);
}
}
}</code>The ReadBuffer is a multiple‑producer/single‑consumer (MPSC) buffer that uses a segmented (striped) design to reduce contention. In a bounded cache its concrete type is BoundedBuffer, which extends StripedBuffer.
The Buffer interface's documentation describes it as an MPSC buffer that rejects new elements when full and guarantees neither FIFO nor LIFO ordering:
A multiple‑producer / single‑consumer buffer that rejects new elements if it is full or fails spuriously due to contention. Unlike a queue or stack, a buffer does not guarantee FIFO or LIFO ordering.
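To make that contract concrete, here is a toy single‑stripe MPSC ring buffer with Caffeine‑like return codes. This is an illustrative sketch under invented names, not BoundedBuffer's actual implementation; it only demonstrates the reject‑when‑full, fail‑on‑contention behavior the interface describes.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

// Hypothetical sketch of an MPSC buffer honoring a Buffer-like contract.
final class MpscSketchBuffer<E> {
    static final int SUCCESS = 0; // element accepted
    static final int FAILED = -1; // spurious contention failure; caller may retry
    static final int FULL = 1;    // buffer full; element rejected, not queued

    private final AtomicReferenceArray<E> slots;
    private final AtomicLong writeIndex = new AtomicLong();
    private long readIndex; // touched only by the single consumer

    MpscSketchBuffer(int capacity) {
        slots = new AtomicReferenceArray<>(capacity);
    }

    // Any thread may offer; the result is one of the codes above.
    int offer(E e) {
        long tail = writeIndex.get();
        if (tail - readIndex >= slots.length()) {
            return FULL; // reject instead of blocking or overwriting
        }
        if (!writeIndex.compareAndSet(tail, tail + 1)) {
            return FAILED; // lost the CAS race with another producer
        }
        slots.set((int) (tail % slots.length()), e);
        return SUCCESS;
    }

    // Only the single consumer thread may drain.
    void drainTo(Consumer<E> consumer) {
        long tail = writeIndex.get();
        while (readIndex < tail) {
            int i = (int) (readIndex % slots.length());
            E e = slots.get(i);
            if (e == null) {
                break; // producer claimed the slot but has not published yet
            }
            slots.set(i, null);
            consumer.accept(e);
            readIndex++;
        }
    }
}
```

Note that rejecting on FULL is acceptable here because a lost read event only costs the policy a little accuracy, not correctness.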
The StripedBuffer implementation uses striping and CAS to achieve efficient concurrent writes. Its offer method hashes the current thread to a segment and attempts to insert the element there, expanding the table or retrying on contention.
<code>abstract class StripedBuffer<E> implements Buffer<E> {
volatile Buffer<E> @Nullable[] table;
@Override
public int offer(E e) {
long z = mix64(Thread.currentThread().getId());
int increment = ((int) (z >>> 32)) | 1;
int h = (int) z;
int mask;
int result = Buffer.SUCCESS;
boolean uncontended = true;
Buffer<E> buffer;
Buffer<E>[] buffers = table;
if (buffers == null || (mask = buffers.length - 1) < 0 ||
(buffer = buffers[h & mask]) == null ||
!(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
return expandOrRetry(e, h, increment, uncontended);
}
return result;
}
}</code>The expandOrRetry method handles table initialization, resizing, creation of new stripe buffers, and contention backoff:
<code>final int expandOrRetry(E e, int h, int increment, boolean wasUncontended) {
int result = Buffer.FAILED;
boolean collide = false;
for (int attempt = 0; attempt < ATTEMPTS; attempt++) {
Buffer<E>[] buffers;
Buffer<E> buffer;
int n;
// The stripe table exists: locate this thread's stripe
if ((buffers = table) != null && (n = buffers.length) > 0) {
if ((buffer = buffers[(n - 1) & h]) == null) {
// The stripe is empty: try to create it under the tableBusy spinlock
if (tableBusy == 0 && casTableBusy()) {
boolean created = false;
try {
Buffer<E>[] rs;
int mask, j;
if ((rs = table) != null && (mask = rs.length) > 0 &&
(rs[j = (mask - 1) & h] == null)) {
rs[j] = create(e);
created = true;
}
} finally {
tableBusy = 0;
}
if (created) {
result = Buffer.SUCCESS;
break;
}
continue;
}
collide = false;
} else if (!wasUncontended) {
wasUncontended = true; // CAS failed previously; continue after rehash
} else if ((result = buffer.offer(e)) != Buffer.FAILED) {
break; // the offer succeeded (or the buffer is full); stop probing
} else if (n >= MAXIMUM_TABLE_SIZE || table != buffers) {
collide = false; // at max size or the table was resized concurrently
} else if (!collide) {
collide = true; // record the collision; expand on the next failure
} else if ((tableBusy == 0) && casTableBusy()) {
// Two consecutive collisions: double the number of stripes
try {
if (table == buffers) {
table = Arrays.copyOf(buffers, n << 1);
}
} finally {
tableBusy = 0;
}
collide = false;
continue;
}
h += increment; // rehash to probe a different stripe
} else if ((tableBusy == 0) && (table == buffers) && casTableBusy()) {
// No table yet: initialize it with a single stripe
boolean init = false;
try {
Buffer<E>[] rs = new Buffer[1];
rs[0] = create(e);
table = rs;
init = true;
} finally {
tableBusy = 0;
}
if (init) {
result = Buffer.SUCCESS;
break;
}
}
}
return result;
}</code>In summary, the ReadBuffer is an MPSC buffer that partitions its storage into multiple segments, reducing contention while using CAS for safe concurrent writes. It guarantees neither FIFO nor LIFO ordering.
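As a small illustration of the striping idea, a thread's stripe can be derived by mixing its id and masking with the power‑of‑two table length. The mixing function below is the common Murmur3 64‑bit finalizer, which StripedBuffer's mix64 resembles; the class and method names are invented for this sketch.

```java
// Hypothetical sketch of how a striped buffer maps threads to segments.
final class StripeIndex {
    // Murmur3-style 64-bit finalizer: spreads consecutive thread ids
    // across the table instead of clustering them in adjacent stripes.
    static long mix64(long z) {
        z = (z ^ (z >>> 33)) * 0xff51afd7ed558ccdL;
        z = (z ^ (z >>> 33)) * 0xc4ceb9fe1a85ec53L;
        return z ^ (z >>> 33);
    }

    // tableLength must be a power of two so the mask selects a valid index.
    static int stripeFor(long threadId, int tableLength) {
        int h = (int) mix64(threadId);
        return h & (tableLength - 1);
    }
}
```

Because each thread deterministically lands on one stripe, contention only arises between threads that happen to hash to the same segment, and expanding the table halves that probability.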
The maintenance method orchestrates the processing of read and write buffers, key/value references, expiration, eviction, and the “climb” operation that dynamically adjusts window and protected sizes based on hit rate.
<code>void maintenance(@Nullable Runnable task) {
setDrainStatusRelease(PROCESSING_TO_IDLE);
try {
// Replay buffered reads and writes against the eviction policy
drainReadBuffer();
drainWriteBuffer();
if (task != null) {
task.run();
}
// Clean up entries whose keys or values were garbage collected
drainKeyReferences();
drainValueReferences();
// Expire, evict, and adapt the window via hill climbing
expireEntries();
evictEntries();
climb();
} finally {
if ((drainStatusOpaque() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
setDrainStatusOpaque(REQUIRED);
}
}
}</code>The climb method adapts the eviction policy toward an optimal recency/frequency balance by adjusting the window and protected capacities based on hit‑rate changes.
<code>void climb() {
// Only applies when the cache evicts by size
if (!evicts()) return;
// Decide the window adjustment from the hit-rate delta (hill climbing)
determineAdjustment();
// Demote entries if the protected region exceeds its capacity
demoteFromMainProtected();
long amount = adjustment();
if (amount == 0) return;
// Grow or shrink the admission window accordingly
if (amount > 0) {
increaseWindow();
} else {
decreaseWindow();
}
}</code>Overall, Caffeine’s cache combines a TinyLFU eviction policy with segmented MPSC buffers, dynamic window/protected sizing, and a maintenance cycle that balances recency and frequency for high‑performance caching in concurrent Java applications.
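The admission decision at the heart of TinyLFU can be sketched as follows: a candidate displaces the eviction victim only if its estimated access frequency is higher. This toy version uses an exact frequency map; Caffeine actually uses an approximate CountMin‑style FrequencySketch with periodic aging, and the class and method names here are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of TinyLFU-style admission, not Caffeine's FrequencySketch.
class TinyLfuSketch {
    private final Map<Object, Integer> freq = new HashMap<>();

    // Record one access; a real sketch increments approximate counters.
    void recordAccess(Object key) {
        freq.merge(key, 1, Integer::sum);
    }

    int frequency(Object key) {
        return freq.getOrDefault(key, 0);
    }

    // Admit the candidate only if it is historically more popular
    // than the entry the policy would otherwise evict.
    boolean admit(Object candidate, Object victim) {
        return frequency(candidate) > frequency(victim);
    }
}
```

This gate is what lets one‑hit‑wonder keys pass through the window without polluting the main region: a newcomer with no access history loses to any victim that has been touched before.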
JD Cloud Developers
JD Cloud Developers is JD Technology Group's platform for technical sharing and exchange among AI, cloud computing, IoT, and related developers. It publishes JD product technical information, industry content, and tech event news: embracing technology and partnering with developers to envision the future.