
Mastering Java Thread Communication: volatile, synchronized, wait/notify, and Piped Streams

This article explains how Java threads interact using volatile for visibility, synchronized blocks for mutual exclusion, wait/notify (and Condition) for coordination, piped streams for one‑way byte communication, as well as Thread.join and ThreadLocal utilities, providing concrete code examples, best‑practice guidelines, and performance considerations.

The Dominant Programmer

Thread Basics

Each thread has its own call stack and executes its code step by step until it terminates. Threads become far more useful when they cooperate, which requires them to share state and coordinate timing safely.

volatile Keyword

Core Features

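Visibility guarantee: a write to a volatile variable happens-before every later read of that variable, so readers always observe the most recent write (commonly described as writing through to, and reading from, main memory).

Reordering prevention: the compiler and CPU may not move ordinary reads and writes that precede a volatile write to after it, nor move accesses that follow a volatile read to before it.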

Non‑atomicity: compound operations (e.g., i++) still require synchronized or atomic classes.
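
The non-atomicity point can be made concrete with a small sketch. The class and method names below (VolatileCounterDemo, run) are illustrative and not part of the original article; the code increments a volatile int and an AtomicInteger from several threads so the two totals can be compared.

```java
import java.util.concurrent.atomic.AtomicInteger;

class VolatileCounterDemo {
    // volatile makes each write visible, but ++ is still read-modify-write
    private static volatile int volatileCount = 0;
    private static final AtomicInteger atomicCount = new AtomicInteger();

    /** Runs `threads` workers; each increments both counters `perThread` times. */
    static int[] run(int threads, int perThread) {
        volatileCount = 0;
        atomicCount.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    volatileCount++;               // updates can be lost under contention
                    atomicCount.incrementAndGet(); // atomic compare-and-swap loop
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { volatileCount, atomicCount.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 100_000);
        System.out.println("volatile counter: " + result[0]);
        System.out.println("atomic counter:   " + result[1]);
    }
}
```

The atomic counter always reaches threads × perThread, while the volatile counter can end up lower because two threads may read the same value before either writes back.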

Example 1 – State Flag Control

public class ServerStatus {
    private volatile boolean isRunning = true; // key point

    public void stopServer() {
        isRunning = false; // write becomes visible to other threads
    }

    public void doWork() {
        while (isRunning) { // read accesses main memory directly
            // server logic
        }
        System.out.println("Server stopped");
    }

    public static void main(String[] args) throws InterruptedException {
        ServerStatus server = new ServerStatus();
        new Thread(server::doWork).start();
        Thread.sleep(1000);
        server.stopServer(); // safe stop
    }
}

Example 2 – Real‑time Config Hot‑Update

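Scenario: an e-commerce platform needs to toggle a promotion without restarting services. Using volatile ensures that every thread in the JVM sees the new configuration on its next read, without heavyweight locks. Note that volatile works only within a single JVM; each server node must still receive the update through its own channel.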

public class PromotionConfig {
    private volatile boolean isPromotionActive = false;
    private volatile double discountRate = 1.0;

    public void updateConfig(boolean isActive, double rate) {
        this.isPromotionActive = isActive; // volatile write
        this.discountRate = rate; // combine with synchronized if atomicity needed
    }

    public double applyDiscount(double originalPrice) {
        if (isPromotionActive) { // volatile read
            return originalPrice * discountRate;
        }
        return originalPrice;
    }
}

public class OrderService {
    private final PromotionConfig config = new PromotionConfig();

    public void processOrder(Order order) {
        double finalPrice = config.applyDiscount(order.getPrice());
        order.setFinalPrice(finalPrice);
        // further processing
    }
}

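Technical analysis: once updateConfig() returns, every thread reads the new values on its next access. The two fields are written separately, however, so a reader running concurrently with updateConfig() may briefly see the new flag paired with the old rate. volatile reads are lock-free, which suits read-heavy workloads with infrequent updates.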
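
When the flag and the rate must always change together, one option is to publish both through a single volatile reference to an immutable object. The following is a minimal sketch under that assumption; PromotionSnapshotConfig and Snapshot are hypothetical names, not classes from the article.

```java
class PromotionSnapshotConfig {
    /** Immutable pair: both values are published in one step. */
    static final class Snapshot {
        final boolean active;
        final double discountRate;

        Snapshot(boolean active, double discountRate) {
            this.active = active;
            this.discountRate = discountRate;
        }
    }

    // One volatile reference: a reader sees either the old pair or the new pair.
    private volatile Snapshot current = new Snapshot(false, 1.0);

    void updateConfig(boolean active, double rate) {
        current = new Snapshot(active, rate); // single volatile write
    }

    double applyDiscount(double originalPrice) {
        Snapshot s = current; // single volatile read, then use the local copy
        return s.active ? originalPrice * s.discountRate : originalPrice;
    }
}
```

Reading the reference once into a local variable matters: reading current twice could pick up two different snapshots.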

synchronized Block

Visibility Mechanism

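When a thread acquires a monitor, it sees every write made by the thread that previously released the same monitor (often described as reloading variables from main memory on entry and flushing them on exit). Because only one thread can hold the monitor at a time, synchronized provides both visibility and mutual exclusion, provided that all accesses to the shared state use the same lock object.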

Example – Safe Counter

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class SafeCounter {
    private int count = 0;
    private final Object lock = new Object();

    public void increment(){
        synchronized (lock){ // enter sync block
            count++; // compound operation becomes atomic
        } // exit forces flush to main memory
    }

    public int getCount(){
        synchronized (lock){ // read with visibility guarantee
            return count;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SafeCounter counter = new SafeCounter();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        IntStream.range(0,1000).forEach(i -> executor.submit(counter::increment));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(counter.getCount()); // prints 1000
    }
}

Double‑Checked Locking Singleton

class Singleton {
    private static volatile Singleton instance; // must be volatile

    public static Singleton getInstance() {
        if (instance == null) { // first check (no lock)
            synchronized (Singleton.class) {
                if (instance == null) { // second check (with lock)
                    instance = new Singleton(); // volatile write prevents reordering
                }
            }
        }
        return instance;
    }
}
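
For comparison, lazy initialization can also be achieved without volatile or explicit locking by relying on the JVM's thread-safe class initialization. This is a sketch of the initialization-on-demand holder idiom, a different technique from the double-checked locking shown above; HolderSingleton is a hypothetical name.

```java
class HolderSingleton {
    private HolderSingleton() { }

    // The nested class is initialized only when getInstance() first touches it.
    // The JVM guarantees that class initialization runs once and is thread-safe.
    private static final class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }
}
```

Double-checked locking remains the usual choice when the lazily created object is an instance field rather than a static one.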

wait/notify Mechanism

Monitor Model

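Every Java object has an associated monitor lock and a wait set.

wait(): releases the lock and puts the thread into the WAITING state (TIMED_WAITING for the timed overloads); the thread reacquires the lock before wait() returns.

notify(): wakes a single waiting thread.

notifyAll(): wakes all waiting threads.

A waiting thread can also wake up spuriously, so always call wait() inside a loop that rechecks the condition.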

Preconditions (Common Pitfall)

synchronized (lock) {
    lock.wait(); // must hold the lock, otherwise IllegalMonitorStateException
}
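
The pitfall is easy to reproduce. The sketch below (MonitorPreconditionDemo is an illustrative name) calls notify() once without holding the monitor and once while holding it, and reports whether IllegalMonitorStateException was thrown.

```java
class MonitorPreconditionDemo {
    /** Returns true if notify() without owning the monitor is rejected. */
    static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify(); // not inside synchronized (lock)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true if notify() is accepted while the monitor is held. */
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        try {
            synchronized (lock) {
                lock.notify(); // legal: the current thread owns the monitor
            }
            return true;
        } catch (IllegalMonitorStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without lock rejected: " + notifyWithoutLockFails());
        System.out.println("with lock accepted:    " + notifyWithLockSucceeds());
    }
}
```

The same rule applies to wait() and notifyAll(): the calling thread must own the monitor of the exact object it calls them on.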

java.util.concurrent Alternatives

Since Java 5, the java.util.concurrent.locks.Condition interface has offered more flexible waiting and notification, and LockSupport.park()/unpark() provide low-level thread blocking and unblocking.
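
As a rough illustration of park()/unpark(), the sketch below blocks a worker until another thread sets a flag and hands it a permit. ParkDemo and its flag names are hypothetical; the loop is there because park() is allowed to return without a matching unpark().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkDemo {
    /** Returns true if the worker resumed after the flag was set. */
    static boolean run() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean observed = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            // park() may return spuriously, so recheck the condition in a loop
            while (!ready.get()) {
                LockSupport.park();
            }
            observed.set(true);
        });
        worker.start();

        ready.set(true);            // publish the condition first
        LockSupport.unpark(worker); // then give the worker a permit

        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("worker resumed: " + run());
    }
}
```

Unlike notify(), unpark() is not lost if it arrives before the worker parks: the permit is remembered and the next park() returns immediately.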

Classic Producer‑Consumer (synchronized version)

import java.util.LinkedList;
import java.util.Queue;
import java.util.stream.IntStream;

public class MessageQueue {
    private final Queue<String> queue = new LinkedList<>();
    private final int maxSize = 5;
    private final Object lock = new Object();

    public void produce(String message) throws InterruptedException {
        synchronized (lock) {
            while (queue.size() == maxSize) {
                System.out.println("Queue full, producer waiting");
                lock.wait(); // release lock and wait
            }
            queue.add(message);
            System.out.println("Produced:" + message);
            lock.notifyAll(); // wake all waiting threads
        }
    }

    public String consume() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                System.out.println("Queue empty, consumer waiting");
                lock.wait();
            }
            String msg = queue.poll();
            System.out.println("Consumed:" + msg);
            lock.notifyAll(); // wake producer
            return msg;
        }
    }

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();
        new Thread(() -> {
            IntStream.range(0,10).forEach(i -> {
                try { queue.produce("msg-"+i); Thread.sleep(200); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }).start();
        new Thread(() -> {
            IntStream.range(0,10).forEach(i -> {
                try { queue.consume(); Thread.sleep(500); }
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }).start();
    }
}

Advanced Version (Condition)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

public class AdvancedMessageQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new LinkedList<>();
    private final int maxSize = 5;

    public void produce(String message) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == maxSize) {
                System.out.println("Queue full, producer waiting");
                notFull.await();
            }
            queue.add(message);
            System.out.println("Produced:" + message);
            notEmpty.signal(); // wake consumer only
        } finally { lock.unlock(); }
    }

    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("Queue empty, consumer waiting");
                notEmpty.await();
            }
            String msg = queue.poll();
            System.out.println("Consumed:" + msg);
            notFull.signal(); // wake producer only
            return msg;
        } finally { lock.unlock(); }
    }
}

Advantages: targeted wake‑up avoids the cost of notifyAll(), and multiple condition queues allow distinct waiting criteria (full vs. empty).

Piped Input/Output Streams

Core Characteristics

Directed communication: one‑way byte stream between two threads (one writes, the other reads).

Implemented via a circular buffer (default 1024 bytes).

Key Classes

PipedInputStream: consumer side.

PipedOutputStream: producer side.

PipedReader / PipedWriter: character-stream equivalents.
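
The character-stream pair works the same way as the byte-stream pair. Below is a minimal sketch, assuming a helper named roundTrip (not from the article) that writes lines on one thread and reads them back on the calling thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class PipedCharDemo {
    /** Sends the lines through a character pipe and returns what the reader received. */
    static List<String> roundTrip(List<String> lines) {
        List<String> received = new ArrayList<>();
        try {
            PipedWriter writer = new PipedWriter();
            PipedReader reader = new PipedReader(writer); // the constructor connects both ends

            Thread producer = new Thread(() -> {
                try (PipedWriter w = writer) { // closing signals end of stream to the reader
                    for (String line : lines) {
                        w.write(line + "\n");
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            producer.start();

            // readLine() returns null once the writer side has been closed
            try (BufferedReader in = new BufferedReader(reader)) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            producer.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

Passing the writer to the PipedReader constructor is equivalent to calling connect() explicitly.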

Complete Byte‑Stream Example

import java.io.*;
import java.nio.charset.StandardCharsets;

public class PipeStreamDemo {
    public static void main(String[] args) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        try { pos.connect(pis); } catch (IOException e) { throw new RuntimeException("Pipe connection failed", e); }

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String data = "DataBlock-" + i + "\n";
                    pos.write(data.getBytes(StandardCharsets.UTF_8));
                    System.out.println("Producer sent:" + data.trim());
                    Thread.sleep(500);
                }
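            } catch (IOException e) {
                e.printStackTrace(); // report the I/O failure instead of interrupting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt status
            }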
            finally { try { pos.close(); } catch (IOException ignored) {} }
        });

        Thread consumer = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(pis, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println("Consumer received:" + line);
                }
            } catch (IOException e) {
                if (!"Pipe closed".equals(e.getMessage())) e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

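Key points: the two ends must be connected before use, either with connect() or by passing one end to the other's constructor; reading from or writing to an unconnected pipe throws an IOException. Use each end from a separate thread, because reading and writing on the same thread can deadlock once the buffer is full or empty. Typical use cases include log collection, command-line output redirection, and testing I/O simulations. For large data or bidirectional communication, consider Socket, BlockingQueue, NIO FileChannel, or message queues like Kafka.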
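
When the threads exchange objects rather than raw bytes, a BlockingQueue is usually the simpler tool. The sketch below is one possible shape, not the article's code: BlockingQueueDemo, transfer, and the end-of-stream marker are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical end-of-stream marker; real messages must never equal it.
    private static final String END = "__END__";

    /** Passes the messages from a producer thread to the caller through a bounded queue. */
    static List<String> transfer(List<String> messages) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) {
                    queue.put(m); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            String m;
            while (!(m = queue.take()).equals(END)) { // blocks while the queue is empty
                received.add(m);
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

put() and take() handle the full and empty cases internally, so no explicit wait/notify or Condition code is needed.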

Thread.join()

Core Principle

Calling t.join() blocks the current thread until thread t finishes. Internally it waits on the Thread object's monitor, and the JVM calls notifyAll() on that object when the thread terminates. The join(long millis) overload adds timeout control.

Method Signatures

void join(): wait indefinitely.

void join(long millis): wait up to the specified number of milliseconds.

void join(long millis, int nanos): adds a nanosecond component (actual precision depends on the OS).
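
Besides blocking, a completed join() also gives a visibility guarantee: everything the finished thread wrote is visible to the thread that joined it. A small sketch, with the hypothetical names JoinResultDemo and parallelSum, that collects results from two workers through a plain array:

```java
class JoinResultDemo {
    /** Sums 1..n using two worker threads and returns the combined result. */
    static long parallelSum(int n) {
        long[] partial = new long[2]; // plain array: no volatile or lock needed
        int mid = n / 2;

        Thread low = new Thread(() -> {
            for (int i = 1; i <= mid; i++) partial[0] += i;
        });
        Thread high = new Thread(() -> {
            for (int i = mid + 1; i <= n; i++) partial[1] += i;
        });
        low.start();
        high.start();

        try {
            low.join();  // after this returns, partial[0] is fully visible here
            high.join(); // likewise for partial[1]
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return partial[0] + partial[1];
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(100)); // prints 5050
    }
}
```

Each worker writes only its own array slot, so the workers never race with each other.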

Basic Usage Example

public class JoinDemo {
    public static void main(String[] args) {
        Thread dataLoader = new Thread(() -> {
            System.out.println("Loading data...");
            try { Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            System.out.println("Data load complete");
        });

        Thread uiRenderer = new Thread(() -> {
            try {
                System.out.println("Waiting for data load...");
                dataLoader.join(); // block until dataLoader finishes
                System.out.println("Start rendering UI");
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        dataLoader.start();
        uiRenderer.start();
    }
}

Timeout Control

public class JoinDemo2 {
    public static void main(String[] args) throws InterruptedException {
        Thread downloadThread = new Thread(() -> {
            try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        downloadThread.start();
        System.out.println("Waiting for download...");
        downloadThread.join(3000); // wait at most 3 seconds
        if (downloadThread.isAlive()) {
            System.out.println("Download timed out, requesting cancellation");
            downloadThread.interrupt(); // a request only: the thread stops if it responds to interruption
        } else {
            System.out.println("Download completed normally");
        }
    }
}

In production, join() can coordinate task dependencies or ensure ordered resource initialization. For more efficient parallelism, higher-level tools such as CompletableFuture and ForkJoinPool usually replace manual thread joins.
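
As a rough counterpart to the JoinDemo above, the same "load, then render" dependency can be expressed with CompletableFuture. This is a sketch only: FutureJoinDemo and the placeholder strings stand in for real loading and rendering work.

```java
import java.util.concurrent.CompletableFuture;

class FutureJoinDemo {
    /** Runs a load step asynchronously, then a dependent render step, and returns the result. */
    static String loadThenRender() {
        // Placeholder for a real load; runs on the common ForkJoinPool by default
        CompletableFuture<String> data = CompletableFuture.supplyAsync(() -> "data");

        // Runs only after the load step has completed
        CompletableFuture<String> ui = data.thenApply(d -> "rendered:" + d);

        return ui.join(); // blocks like Thread.join(), but also returns the value
    }

    public static void main(String[] args) {
        System.out.println(loadThenRender()); // prints rendered:data
    }
}
```

The dependency is declared with thenApply() instead of being enforced by one thread blocking on another.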

ThreadLocal

Design Idea

Thread confinement: each thread holds its own copy of a variable, eliminating shared‑resource contention. Internally it uses a ThreadLocalMap stored inside each Thread object.
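
The per-thread copy can be observed directly. In the sketch below (ThreadLocalIsolationDemo is an illustrative name), one thread sets a value and a second thread, which never calls set(), still sees the initial value.

```java
class ThreadLocalIsolationDemo {
    private static final ThreadLocal<String> NAME = ThreadLocal.withInitial(() -> "unset");

    /** Returns what thread A and thread B each read from the same ThreadLocal. */
    static String[] run() {
        String[] seen = new String[2];

        Thread a = new Thread(() -> {
            NAME.set("A");        // affects thread A's copy only
            seen[0] = NAME.get();
            NAME.remove();
        });
        Thread b = new Thread(() -> {
            seen[1] = NAME.get(); // never set here, so the initial value is returned
            NAME.remove();
        });

        try {
            a.start();
            a.join(); // A has finished before B even starts
            b.start();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[0] + " / " + seen[1]); // prints A / unset
    }
}
```

Thread B runs strictly after thread A, yet A's value does not carry over, because the value lives in A's own ThreadLocalMap.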

Key Methods

T get(): retrieve the current thread's value.

void set(T value): assign a value for the current thread.

void remove(): delete the current thread's entry (important to avoid memory leaks).

protected T initialValue(): can be overridden to provide a default.

Basic Usage – Thread‑Safe SimpleDateFormat

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.IntStream;

public class DateFormatHolder {
    private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    public static String format(Date date) { return DATE_FORMAT.get().format(date); }
    public static void cleanup() { DATE_FORMAT.remove(); }

    public static void main(String[] args) {
        IntStream.range(0,5).forEach(i -> new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": " + format(new Date()));
            } finally { cleanup(); }
        }).start());
    }
}

Context Propagation Example (User Session Management)

class UserContext {
    private static final ThreadLocal<User> currentUser = new ThreadLocal<>();
    public static void setCurrentUser(User user) { currentUser.set(user); }
    public static User getCurrentUser() { return currentUser.get(); }
    public static void clear() { currentUser.remove(); }
}

public class AuthInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        UserContext.setCurrentUser((User) request.getSession().getAttribute("user"));
        return true;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
        UserContext.clear(); // must clean up
    }
}

Memory‑Leak Prevention

Always call remove() in a finally block. Pooled threads are reused rather than terminated, so a value left in place stays reachable and can leak into the next task that runs on the same thread.

try {
    threadLocal.set(data);
    // business logic
} finally {
    threadLocal.remove();
}
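
What goes wrong without remove() is easiest to see on a thread pool. The sketch below uses a single-thread executor so that two tasks are guaranteed to share a thread; StaleThreadLocalDemo and valueSeenByNextTask are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StaleThreadLocalDemo {
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    /** Runs two tasks on one pooled thread and returns what the second task reads. */
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable first = () -> {
            try {
                USER.set("alice");
                // business logic would run here
            } finally {
                if (cleanUp) USER.remove();
            }
        };
        Callable<String> second = USER::get; // a later, unrelated task on the same thread
        try {
            pool.submit(first).get();
            return pool.submit(second).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + valueSeenByNextTask(false)); // prints alice
        System.out.println("with remove():    " + valueSeenByNextTask(true));  // prints null
    }
}
```

Without cleanup the second task inherits the first task's user, which is both a memory leak and a potential data leak between requests.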

Java 8 Advanced Uses

Combining with lambdas for per-thread random generators (for this particular case, ThreadLocalRandom.current() is the built-in alternative):

ThreadLocal<Random> randomHolder = ThreadLocal.withInitial(() -> new Random(System.nanoTime()));
List<Double> randoms = IntStream.range(0,1000)
    .parallel()
    .mapToObj(i -> randomHolder.get().nextDouble())
    .collect(Collectors.toList());

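Monitoring by subclassing ThreadLocal, here counting how many times a per-thread value is initialized (note that super.initialValue() returns null unless a further subclass overrides it):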

class MonitoredThreadLocal<T> extends ThreadLocal<T> {
    private final AtomicInteger counter = new AtomicInteger();
    @Override
    protected T initialValue() {
        counter.incrementAndGet();
        return super.initialValue();
    }
    public int getInstanceCount() { return counter.get(); }
}

Best Practices

Declare as static final to avoid repeated creation.

Use a descriptive suffix like Holder or Context for readability.

Always clean up with remove() to prevent memory leaks.

Written by

The Dominant Programmer

Resources and tutorials for programmers' advanced learning journey. Advanced tracks in Java, Python, and C#. Blog: https://blog.csdn.net/badao_liumang_qizhi
