Mastering Java Thread Communication: volatile, synchronized, wait/notify, and Piped Streams
This article explains how Java threads interact using volatile for visibility, synchronized blocks for mutual exclusion, wait/notify (and Condition) for coordination, piped streams for one‑way byte communication, as well as Thread.join and ThreadLocal utilities, providing concrete code examples, best‑practice guidelines, and performance considerations.
Thread Basics
Each thread has its own call stack and executes its code step by step, like an independent script, until it terminates. The real value comes when multiple threads cooperate, which requires them to communicate safely.
volatile Keyword
Core Features
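Visibility guarantee: a write to a volatile variable happens-before every later read of that variable, so readers always see the latest value (commonly pictured as writing through to, and reading from, main memory).
Reordering prevention: the compiler and CPU may not move other memory accesses across a volatile access in a way that breaks this ordering, so everything written before a volatile write is visible after the matching volatile read.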
Non‑atomicity: compound operations (e.g., i++) still require synchronized or atomic classes.
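To make the non-atomicity point concrete, here is a minimal sketch. The class VolatileCounterDemo and its method names are illustrative and not part of the original article. Several threads increment both a volatile int and an AtomicInteger; only the atomic counter is guaranteed to reach the expected total, while the volatile one may lose updates.

```java
import java.util.concurrent.atomic.AtomicInteger;

class VolatileCounterDemo {
    private volatile int unsafeCount = 0;                       // visible, but ++ is not atomic
    private final AtomicInteger safeCount = new AtomicInteger(); // atomic read-modify-write

    void unsafeIncrement() { unsafeCount++; }          // read, add, write: steps can interleave
    void safeIncrement() { safeCount.incrementAndGet(); }

    int unsafeCount() { return unsafeCount; }
    int safeCount() { return safeCount.get(); }

    // Starts `threads` workers; each calls both increments `perThread` times.
    static VolatileCounterDemo run(int threads, int perThread) {
        VolatileCounterDemo demo = new VolatileCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.unsafeIncrement();
                    demo.safeIncrement();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo;
    }

    public static void main(String[] args) {
        VolatileCounterDemo demo = run(4, 100_000);
        System.out.println("atomic:   " + demo.safeCount());   // always threads * perThread
        System.out.println("volatile: " + demo.unsafeCount()); // may be lower on a given run
    }
}
```

The volatile total can never exceed the expected value, but how many updates it loses (if any) varies from run to run and from machine to machine.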
Example 1 – State Flag Control
public class ServerStatus {
private volatile boolean isRunning = true; // key point
public void stopServer() {
isRunning = false; // write becomes visible to other threads
}
public void doWork() {
while (isRunning) { // read accesses main memory directly
// server logic
}
System.out.println("Server stopped");
}
public static void main(String[] args) throws InterruptedException {
ServerStatus server = new ServerStatus();
new Thread(server::doWork).start();
Thread.sleep(1000);
server.stopServer(); // safe stop
}
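}
Example 2 – Real‑time Config Hot‑Update
Scenario: an e‑commerce platform needs to toggle a promotion without restarting services. Using volatile ensures that every thread in the same JVM sees the new configuration on its next read, without heavyweight locks. Note that volatile works only within one JVM; in a multi‑node deployment the update must still be delivered to each node, for example through a configuration service.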
public class PromotionConfig {
private volatile boolean isPromotionActive = false;
private volatile double discountRate = 1.0;
public void updateConfig(boolean isActive, double rate) {
this.isPromotionActive = isActive; // volatile write
this.discountRate = rate; // combine with synchronized if atomicity needed
}
public double applyDiscount(double originalPrice) {
if (isPromotionActive) { // volatile read
return originalPrice * discountRate;
}
return originalPrice;
}
}
public class OrderService {
private final PromotionConfig config = new PromotionConfig();
public void processOrder(Order order) {
double finalPrice = config.applyDiscount(order.getPrice());
order.setFinalPrice(finalPrice);
// further processing
}
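}
Technical analysis: once updateConfig() returns, every later read in any thread sees the new values. Because the two fields are written one after the other, a reader can briefly observe the new isPromotionActive together with the old discountRate; guard both with a lock, or publish them as a single immutable object, if that matters. Volatile reads are lock‑free, which suits settings that are read often and updated rarely.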
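The two volatile fields in PromotionConfig are written separately, so a reader could see one new value and one old value. One way to avoid that, sketched here under assumptions, is to keep both values in an immutable snapshot and publish it through a single volatile reference. The names SnapshotPromotionConfig and Snapshot are hypothetical and not from the original article.

```java
class SnapshotPromotionConfig {
    // Immutable pair: both values become visible together or not at all.
    static final class Snapshot {
        final boolean active;
        final double discountRate;

        Snapshot(boolean active, double discountRate) {
            this.active = active;
            this.discountRate = discountRate;
        }
    }

    private volatile Snapshot current = new Snapshot(false, 1.0);

    void updateConfig(boolean active, double rate) {
        current = new Snapshot(active, rate); // a single volatile write publishes both fields
    }

    double applyDiscount(double originalPrice) {
        Snapshot s = current; // a single volatile read; s cannot change underneath us
        return s.active ? originalPrice * s.discountRate : originalPrice;
    }
}
```

Reads stay lock-free, and each call to applyDiscount() works on one consistent pair of values. The cost is one small allocation per update, which is negligible for rarely changed settings.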
synchronized Block
Visibility Mechanism
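Under the Java Memory Model, releasing a monitor happens-before every later acquisition of the same monitor, so everything a thread wrote before leaving a synchronized block is visible to the next thread that enters a block on the same lock. This is often pictured as reloading variables from main memory on entry and flushing writes back on exit. The keyword also ensures that only one thread at a time executes code guarded by a given lock, providing both visibility and mutual exclusion.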
Example – Safe Counter
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class SafeCounter {
private int count = 0;
private final Object lock = new Object();
public void increment(){
synchronized (lock){ // enter sync block
count++; // compound operation becomes atomic
} // exit forces flush to main memory
}
public int getCount(){
synchronized (lock){ // read with visibility guarantee
return count;
}
}
public static void main(String[] args) throws InterruptedException {
SafeCounter counter = new SafeCounter();
ExecutorService executor = Executors.newFixedThreadPool(10);
IntStream.range(0,1000).forEach(i -> executor.submit(counter::increment));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
System.out.println(counter.getCount()); // prints 1000
}
}
Double‑Checked Locking Singleton
class Singleton {
private static volatile Singleton instance; // must be volatile
public static Singleton getInstance() {
if (instance == null) { // first check (no lock)
synchronized (Singleton.class) {
if (instance == null) { // second check (with lock)
instance = new Singleton(); // volatile write prevents reordering
}
}
}
return instance;
}
}
wait/notify Mechanism
Monitor Model
Every Java object has an associated monitor lock and a waiting queue.
wait(): releases the lock and puts the thread into WAITING state.
notify(): wakes a single waiting thread.
notifyAll(): wakes all waiting threads.
Preconditions (Common Pitfall)
synchronized (lock) {
lock.wait(); // must hold the lock, otherwise IllegalMonitorStateException
}
java.util.concurrent Alternatives
The java.util.concurrent.locks.Condition interface offers more flexible waiting/notification, and LockSupport.park()/unpark() provides low‑level thread blocking/unblocking. Both have been available since Java 5, so they are not specific to Java 8.
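As a rough illustration of the low-level primitive, the sketch below parks one thread until another sets a flag and calls unpark(). The class ParkUnparkDemo and the flag names are made up for this example. Because park() is documented to be able to return without a matching unpark(), the waiter re-checks its condition in a loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkUnparkDemo {
    // Returns true once the waiter thread has observed the flag and finished.
    static boolean runDemo() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park() may return spuriously, so always re-check the condition
            while (!ready.get()) {
                LockSupport.park();
            }
            observed.set(true);
        });
        waiter.start();

        ready.set(true);            // publish the condition first
        LockSupport.unpark(waiter); // then hand out the permit; works even if waiter has not parked yet

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter finished: " + runDemo());
    }
}
```

Unlike wait/notify, no lock has to be held, and an unpark() issued before the park() is not lost because it leaves a permit behind. In application code, Condition or the higher-level java.util.concurrent utilities are usually the more convenient choice.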
Classic Producer‑Consumer (synchronized version)
import java.util.LinkedList;
import java.util.Queue;
import java.util.stream.IntStream;
public class MessageQueue {
private final Queue<String> queue = new LinkedList<>();
private final int maxSize = 5;
private final Object lock = new Object();
public void produce(String message) throws InterruptedException {
synchronized (lock) {
while (queue.size() == maxSize) {
System.out.println("Queue full, producer waiting");
lock.wait(); // release lock and wait
}
queue.add(message);
System.out.println("Produced:" + message);
lock.notifyAll(); // wake all waiting threads
}
}
public String consume() throws InterruptedException {
synchronized (lock) {
while (queue.isEmpty()) {
System.out.println("Queue empty, consumer waiting");
lock.wait();
}
String msg = queue.poll();
System.out.println("Consumed:" + msg);
lock.notifyAll(); // wake producer
return msg;
}
}
public static void main(String[] args) {
MessageQueue queue = new MessageQueue();
new Thread(() -> {
IntStream.range(0,10).forEach(i -> {
try { queue.produce("msg-"+i); Thread.sleep(200); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
}).start();
new Thread(() -> {
IntStream.range(0,10).forEach(i -> {
try { queue.consume(); Thread.sleep(500); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
}).start();
}
}
Advanced Version (Condition)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
public class AdvancedMessageQueue {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
private int maxSize = 5;
public void produce(String message) throws InterruptedException {
lock.lock();
try {
while (queue.size() == maxSize) {
System.out.println("Queue full, producer waiting");
notFull.await();
}
queue.add(message);
System.out.println("Produced:" + message);
notEmpty.signal(); // wake consumer only
} finally { lock.unlock(); }
}
public String consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue empty, consumer waiting");
notEmpty.await();
}
String msg = queue.poll();
System.out.println("Consumed:" + msg);
notFull.signal(); // wake producer only
return msg;
} finally { lock.unlock(); }
}
}
Advantages: targeted wake‑up avoids the cost of notifyAll(), and multiple condition queues allow distinct waiting criteria (full vs. empty).
Piped Input/Output Streams
Core Characteristics
Directed communication: one‑way byte stream between two threads (one writes, the other reads).
Implemented via a circular buffer (default 1024 bytes).
Key Java 8 Classes
PipedInputStream – consumer side.
PipedOutputStream – producer side.
PipedReader / PipedWriter – character‑stream equivalents.
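For text data, the character-stream pair avoids manual byte encoding. The following is a small sketch rather than a reference implementation; the class PipedTextDemo and its transfer() helper are hypothetical names. A producer thread writes lines into a PipedWriter while the calling thread reads them from the connected PipedReader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class PipedTextDemo {
    // Sends the given lines through a pipe and returns what the reading side received.
    static List<String> transfer(List<String> lines) {
        List<String> received = new ArrayList<>();
        try {
            PipedWriter writer = new PipedWriter();
            PipedReader reader = new PipedReader(writer); // this constructor connects both ends

            Thread producer = new Thread(() -> {
                // Closing the writer signals end-of-stream to the reader.
                try (PrintWriter out = new PrintWriter(writer)) {
                    lines.forEach(out::println);
                }
            });
            producer.start();

            // Read on a different thread than the one that writes.
            try (BufferedReader in = new BufferedReader(reader)) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            producer.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        transfer(List.of("alpha", "beta")).forEach(System.out::println);
    }
}
```

The important detail is that the writer is closed when the producer is done; otherwise the reader has no way to see end-of-stream.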
Complete Byte‑Stream Example
import java.io.*;
import java.nio.charset.StandardCharsets;
public class PipeStreamDemo {
public static void main(String[] args) throws IOException {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
try { pos.connect(pis); } catch (IOException e) { throw new RuntimeException("Pipe connection failed", e); }
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String data = "DataBlock-" + i + "\n"; // newline lets the consumer's readLine() return
pos.write(data.getBytes(StandardCharsets.UTF_8));
System.out.println("Producer sent:" + data.trim());
Thread.sleep(500);
}
} catch (IOException | InterruptedException e) { Thread.currentThread().interrupt(); }
finally { try { pos.close(); } catch (IOException ignored) {} }
});
Thread consumer = new Thread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(pis, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println("Consumer received:" + line);
}
} catch (IOException e) {
if (!"Pipe closed".equals(e.getMessage())) e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
Key points: the two ends must be connected (via connect() or the constructor) before use; reading from or writing to an unconnected pipe throws IOException, and using both ends from the same thread can deadlock. Typical use cases include log collection, command‑line output redirection, and testing I/O simulations. For large data or bidirectional communication, consider Socket, BlockingQueue, NIO FileChannel, or message queues like Kafka.
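When the payload is a sequence of objects rather than raw bytes, a BlockingQueue is often simpler than a pipe. The sketch below is illustrative only: the class BlockingQueueHandoff, the capacity of 5, and the end-of-stream marker are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueHandoff {
    // Hypothetical end-of-stream marker; real messages must never equal it.
    private static final String POISON = "__END__";

    // Passes messages from a producer thread to the calling thread through a bounded queue.
    static List<String> transfer(List<String> messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String message : messages) {
                    queue.put(message); // blocks while the queue is full
                }
                queue.put(POISON);      // tell the consumer there is nothing more
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String message;
            while (!(message = queue.take()).equals(POISON)) { // take() blocks while empty
                received.add(message);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of("msg-0", "msg-1", "msg-2")));
    }
}
```

The queue handles the waiting and waking internally, so no explicit wait/notify or Condition code is needed.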
Thread.join()
Core Principle
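Calling t.join() blocks the current thread until thread t finishes. In the classic JDK implementation it loops on wait() while t.isAlive(), and the terminating thread invokes notifyAll() to wake the waiters. The timed overloads such as join(long millis) add timeout control; they have been part of the Thread API since the earliest Java releases and are not a Java 8 addition.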
Method Signatures
void join() – wait indefinitely.
void join(long millis) – wait up to the specified milliseconds; a value of 0 means wait indefinitely.
void join(long millis, int nanos) – adds nanosecond precision (actual precision depends on the OS).
Basic Usage Example
public class JoinDemo {
public static void main(String[] args) {
Thread dataLoader = new Thread(() -> {
System.out.println("Loading data...");
try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
System.out.println("Data load complete");
});
Thread uiRenderer = new Thread(() -> {
try {
System.out.println("Waiting for data load...");
dataLoader.join(); // block until dataLoader finishes
System.out.println("Start rendering UI");
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
dataLoader.start();
uiRenderer.start();
}
}
Timeout Control
public class JoinDemo2 {
public static void main(String[] args) throws InterruptedException {
Thread downloadThread = new Thread(() -> {
try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
downloadThread.start();
System.out.println("Waiting for download...");
downloadThread.join(3000); // wait at most 3 seconds
if (downloadThread.isAlive()) {
System.out.println("Download timed out, terminating");
downloadThread.interrupt();
} else {
System.out.println("Download completed normally");
}
}
}
In production, join() can coordinate task dependencies, ensure ordered resource initialization, or be combined with CompletableFuture and ForkJoinPool for more efficient parallelism.
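The load-then-render dependency from the join() example can also be expressed with CompletableFuture, which avoids managing threads by hand. This is a sketch under assumptions: loadData() and render() are hypothetical stand-ins for the real work, and the 3-second limit mirrors the timeout used earlier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureDependencyDemo {
    // Hypothetical stand-ins for the slow loading step and the dependent rendering step.
    static String loadData() { return "data"; }
    static String render(String data) { return "rendered:" + data; }

    static String run() {
        // supplyAsync runs on the common ForkJoinPool; thenApply runs after loading completes.
        CompletableFuture<String> rendered = CompletableFuture
                .supplyAsync(FutureDependencyDemo::loadData)
                .thenApply(FutureDependencyDemo::render);
        try {
            return rendered.get(3, TimeUnit.SECONDS); // bounded wait, similar to join(3000)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike join(), the timed get() reports a timeout as an exception and also propagates failures from the task, so the caller does not need a separate isAlive() check.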
ThreadLocal
Design Idea
Thread confinement: each thread holds its own copy of a variable, eliminating shared‑resource contention. Internally it uses a ThreadLocalMap stored inside each Thread object.
Key Methods
T get() – retrieve the current thread's value.
void set(T value) – assign a value for the current thread.
void remove() – delete the current thread's entry (important to avoid memory leaks).
protected T initialValue() – can be overridden to provide a default.
Basic Usage – Thread‑Safe SimpleDateFormat
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.IntStream;
public class DateFormatHolder {
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static String format(Date date) { return DATE_FORMAT.get().format(date); }
public static void cleanup() { DATE_FORMAT.remove(); }
public static void main(String[] args) {
IntStream.range(0,5).forEach(i -> new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ": " + format(new Date()));
} finally { cleanup(); }
}).start());
}
}
Context Propagation Example (User Session Management)
class UserContext {
private static final ThreadLocal<User> currentUser = new ThreadLocal<>();
public static void setCurrentUser(User user) { currentUser.set(user); }
public static User getCurrentUser() { return currentUser.get(); }
public static void clear() { currentUser.remove(); }
}
public class AuthInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
UserContext.setCurrentUser((User) request.getSession().getAttribute("user"));
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
UserContext.clear(); // must clean up
}
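}
Memory‑Leak Prevention
Always call remove() in a finally block. Pooled threads are reused rather than terminated, so a value left behind stays reachable through the thread's ThreadLocalMap, where it can leak memory or surface in an unrelated task.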
try {
threadLocal.set(data);
// business logic
} finally {
threadLocal.remove();
}
Java 8 Advanced Uses
Combining with lambdas for per‑thread random generators:
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
ThreadLocal<Random> randomHolder = ThreadLocal.withInitial(() -> new Random(System.nanoTime()));
List<Double> randoms = IntStream.range(0,1000)
.parallel()
.mapToObj(i -> randomHolder.get().nextDouble())
.collect(Collectors.toList());
Performance monitoring by subclassing ThreadLocal:
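import java.util.concurrent.atomic.AtomicInteger;
class MonitoredThreadLocal<T> extends ThreadLocal<T> {
private final AtomicInteger counter = new AtomicInteger();
@Override
protected T initialValue() {
counter.incrementAndGet(); // invoked when a thread calls get() with no value set
return super.initialValue(); // the default initial value is null
}
public int getInstanceCount() { return counter.get(); }
}
Best Practices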
Declare as static final to avoid repeated creation.
Use a descriptive suffix like Holder or Context for readability.
Always clean up with remove() to prevent memory leaks.
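The cleanup rule matters most with thread pools. The sketch below, using the hypothetical class PooledThreadLocalDemo, runs two tasks back to back on a single pooled thread and reports what the second task finds in the ThreadLocal, with and without a remove() in the first task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PooledThreadLocalDemo {
    private static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Returns the value the second task sees on the same pooled thread.
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, reused
        try {
            Runnable firstTask = () -> {
                try {
                    CURRENT_USER.set("alice");
                    // business logic would run here
                } finally {
                    if (cleanUp) {
                        CURRENT_USER.remove();
                    }
                }
            };
            Callable<String> secondTask = CURRENT_USER::get;

            pool.submit(firstTask).get();        // wait for the first task to finish
            return pool.submit(secondTask).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + secondTaskSees(false)); // stale value "alice"
        System.out.println("with remove():    " + secondTaskSees(true));  // null
    }
}
```

Without the cleanup, the second task inherits the first task's user, which is exactly the kind of cross-request leak that UserContext.clear() in the interceptor example is meant to prevent.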
The Dominant Programmer
Resources and tutorials for programmers' advanced learning journey. Advanced tracks in Java, Python, and C#. Blog: https://blog.csdn.net/badao_liumang_qizhi
