
Understanding Hystrix: Circuit Breaking, Isolation, Timeout, and Fallback Mechanisms

This article explains how Hystrix protects distributed Java applications from cascading failures by using circuit breakers, thread‑pool or semaphore isolation, timeout detection, fallback strategies, and health statistics, and includes key source code excerpts illustrating each mechanism.

Sohu Tech Products

In a complex distributed application, the failure of a single dependency can block caller threads, exhaust shared resources, and set off a cascade of failures known as the avalanche effect. Hystrix isolates calls to each dependency so that such failures stay contained and the rest of the system remains available.

The overall mechanism consists of several stages:

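Entry: A HystrixCommand or HystrixObservableCommand object is the execution entry point. In Spring applications it is usually created for you: the @HystrixCommand annotation from hystrix-javanica, together with its AOP aspect, wraps the annotated method in a command.

Cache: If request caching is enabled (the command returns a non-null getCacheKey()) and the request hits the cache, the cached result is returned immediately without executing the command.

Circuit Breaker: When the rolling window contains at least the minimum request volume and the error percentage reaches the configured threshold, the breaker opens; subsequent calls are short-circuited straight to the fallback until the sleep window elapses.

Isolation: Each call runs either in a thread pool dedicated to the dependency or under a semaphore, so a slow or failing dependency cannot exhaust the resources that other dependencies rely on.

Execution: The actual business call (run() or construct()) is performed; any exception triggers the fallback logic.

Timeout: A timer monitors execution time; if the call exceeds the configured timeout, it is recorded as a TIMEOUT and the fallback is triggered. With thread isolation, the worker thread is also interrupted when executionIsolationThreadInterruptOnTimeout is enabled (the default).

Fallback: When execution fails, times out, is rejected, or is short-circuited, Hystrix invokes the user-defined getFallback(); if none is provided, the caller receives a HystrixRuntimeException.

Statistics: Success, failure, timeout, and rejection events are recorded in a sliding-window health stream that drives the circuit-breaker state.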
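To see how these stages fit together, here is a minimal sketch in plain Java that uses java.util.concurrent in place of Hystrix and RxJava. The class MiniPipeline and its fields are hypothetical, the cache and the trip logic are left out, and three AtomicLong counters stand in for the health stream; it only illustrates the order in which the stages are applied.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical outline of the stages above; MiniPipeline is not a Hystrix class.
class MiniPipeline<R> {
    private final ExecutorService pool;   // Isolation: a pool dedicated to one dependency
    private final long timeoutMs;         // Timeout
    private final Callable<R> run;        // Execution: the business call
    private final Supplier<R> fallback;   // Fallback
    volatile boolean circuitOpen;         // Circuit Breaker (trip logic omitted)
    final AtomicLong successes = new AtomicLong();      // Statistics
    final AtomicLong errors = new AtomicLong();
    final AtomicLong shortCircuits = new AtomicLong();

    MiniPipeline(ExecutorService pool, long timeoutMs, Callable<R> run, Supplier<R> fallback) {
        this.pool = pool;
        this.timeoutMs = timeoutMs;
        this.run = run;
        this.fallback = fallback;
    }

    R execute() {
        if (circuitOpen) {                  // short-circuit: run is never invoked
            shortCircuits.incrementAndGet();
            return fallback.get();
        }
        Future<R> future;
        try {
            future = pool.submit(run);      // hand the call to the isolated pool
        } catch (RejectedExecutionException e) {
            errors.incrementAndGet();       // pool saturated: a rejection counts as an error
            return fallback.get();
        }
        try {
            R result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            successes.incrementAndGet();
            return result;
        } catch (TimeoutException e) {
            future.cancel(true);            // stop waiting and interrupt the worker
            errors.incrementAndGet();
            return fallback.get();
        } catch (ExecutionException e) {    // the business call threw
            errors.incrementAndGet();
            return fallback.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            errors.incrementAndGet();
            return fallback.get();
        }
    }
}
```

Unlike Hystrix, the caller here blocks on Future.get(); Hystrix instead races a shared timer against an asynchronous Observable.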

The following excerpts from the Hystrix 1.5.x source show how these stages are implemented (some branches are omitted):

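// HystrixCircuitBreaker.HystrixCircuitBreakerImpl
public boolean attemptExecution() {
    // Is the breaker forced open by configuration?
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // Is the breaker forced closed by configuration?
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // Is the breaker closed? (-1 means it is not currently open)
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        // Has the sleep window elapsed since the breaker opened?
        if (isAfterSleepWindow()) {
            // Move OPEN -> HALF_OPEN and let this one trial request through
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                // another request has already claimed the trial
                return false;
            }
        } else {
            // Still inside the sleep window: reject the request
            return false;
        }
    }
}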
// Trial request succeeded: HALF_OPEN -> CLOSED, with a fresh statistics stream
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        circuitOpened.set(-1L);
    }
}
// Trial request failed: HALF_OPEN -> OPEN, restarting the sleep window
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        circuitOpened.set(System.currentTimeMillis());
    }
}
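The transitions in attemptExecution(), markSuccess(), and markNonSuccess() can be modeled without RxJava. The sketch below is a hypothetical, stdlib-only reduction: MiniCircuitBreaker, its trip() method, and the injected LongSupplier clock are assumptions added for illustration, and the force-open/force-closed overrides are omitted. As in Hystrix's isAfterSleepWindow(), the window has elapsed only when the current time is strictly greater than the opening time plus the sleep window, and a compare-and-set ensures that only one request becomes the half-open trial.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

// Hypothetical stdlib model of the breaker transitions; not the Hystrix class.
class MiniCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1L);  // -1 means "not open"
    private final long sleepWindowMs;
    private final LongSupplier clock;                               // injected for testability

    MiniCircuitBreaker(long sleepWindowMs, LongSupplier clock) {
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    Status status() {
        return status.get();
    }

    /** Hypothetical hook for the statistics side: CLOSED -> OPEN. */
    void trip() {
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());
        }
    }

    boolean attemptExecution() {
        long openedAt = circuitOpened.get();
        if (openedAt == -1L) {
            return true;                                            // closed
        }
        if (clock.getAsLong() > openedAt + sleepWindowMs) {
            // only one caller wins OPEN -> HALF_OPEN and becomes the trial request
            return status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
        }
        return false;                                               // still in the sleep window
    }

    void markSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            circuitOpened.set(-1L);   // Hystrix also resets its metrics stream here
        }
    }

    void markNonSuccess() {
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            circuitOpened.set(clock.getAsLong());                   // restart the sleep window
        }
    }
}
```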
// Trip logic: every new health snapshot is checked against the two thresholds
private Subscription subscribeToStream() {
    return metrics.getHealthCountsStream()
            .observe()
            .subscribe(new Subscriber<HealthCounts>() {
                @Override public void onCompleted() {}
                @Override public void onError(Throwable e) {}
                @Override public void onNext(HealthCounts hc) {
                    if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                        // not enough requests in the window to make a decision
                    } else {
                        if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                            // error rate below the threshold: healthy
                        } else {
                            // threshold reached: CLOSED -> OPEN and record when it opened
                            if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                circuitOpened.set(System.currentTimeMillis());
                            }
                        }
                    }
                }
            });
}
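The onNext() handler above applies two conditions in order: a minimum request volume, then an error percentage. A hypothetical helper makes the boundary behavior explicit; TripRule and its parameter names are assumptions, chosen to match the circuitBreakerRequestVolumeThreshold and circuitBreakerErrorThresholdPercentage properties (documented defaults: 20 requests and 50 percent).

```java
// Hypothetical helper mirroring the onNext() decision; not part of Hystrix.
final class TripRule {
    private TripRule() {}

    /** True when the window has enough traffic AND the error percentage reaches the threshold. */
    static boolean shouldTrip(long totalRequests, int errorPercentage,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests < requestVolumeThreshold) {
            return false;                                     // too little traffic to judge
        }
        return errorPercentage >= errorThresholdPercentage;  // "< threshold" means healthy
    }
}
```

With those defaults, 19 failures out of 19 requests do not open the breaker, while 10 failures out of 20 do, because the percentage check is inclusive.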
// AbstractCommand: breaker check first, then the execution semaphore
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // release the permit exactly once, whether the call terminates or is unsubscribed
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        // (markExceptionThrown, defined in the full source, records an EXCEPTION_THROWN event)
        if (executionSemaphore.tryAcquire()) {
            try {
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // no permit available: semaphore rejection, go to fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // breaker open: short-circuit, go to fallback
        return handleShortCircuitViaFallback();
    }
}
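applyHystrixSemantics() releases the execution permit from both doOnTerminate and doOnUnsubscribe, and the AtomicBoolean guard keeps that from happening twice. Here is a hypothetical stdlib sketch of the same idea with java.util.concurrent.Semaphore; PermitGuard is an illustrative name, and returning null on rejection stands in for handleSemaphoreRejectionViaFallback().

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the release-once permit pattern; not a Hystrix class.
class PermitGuard {
    private final Semaphore semaphore;

    PermitGuard(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }

    /** Returns a release action that is safe to run more than once, or null when rejected. */
    Runnable tryAcquire() {
        if (!semaphore.tryAcquire()) {
            return null;                        // caller would take the rejection fallback
        }
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> {
            if (released.compareAndSet(false, true)) {
                semaphore.release();            // only the first invocation releases
            }
        };
    }
}
```

Without the compareAndSet guard, running the release action from both termination and unsubscription would leave two permits on a semaphore created with one, silently raising the concurrency limit.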
// AbstractCommand: one semaphore per command key, created lazily
protected TryableSemaphore getExecutionSemaphore() {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
        if (executionSemaphoreOverride == null) {
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // putIfAbsent makes concurrent first callers agree on a single instance
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return executionSemaphoreOverride;
        }
    } else {
        // THREAD isolation: a no-op semaphore that always grants; the thread pool bounds concurrency instead
        return TryableSemaphoreNoOp.DEFAULT;
    }
}
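getExecutionSemaphore() lazily creates one semaphore per command key with a putIfAbsent-then-get sequence. On Java 8 and later, ConcurrentHashMap.computeIfAbsent expresses the same thing in one call. SemaphoreRegistry is a hypothetical name; note that Hystrix's TryableSemaphoreActual reads its limit from a dynamic property (documented default for execution.isolation.semaphore.maxConcurrentRequests: 10), whereas the plain Semaphore in this sketch has a fixed number of permits.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

// Hypothetical registry: one semaphore per command key, created on first use.
class SemaphoreRegistry {
    private final ConcurrentMap<String, Semaphore> perKey = new ConcurrentHashMap<>();
    private final int maxConcurrentRequests;

    SemaphoreRegistry(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    Semaphore forKey(String commandKey) {
        // atomic equivalent of putIfAbsent(...) followed by get(...)
        return perKey.computeIfAbsent(commandKey, k -> new Semaphore(maxConcurrentRequests));
    }
}
```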
// AbstractCommand: run user code on the command's thread pool (THREAD) or on the caller's thread (SEMAPHORE)
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                // the timer may already have fired while the task waited for a thread
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    // already unsubscribed: nothing to run
                    return Observable.empty();
                }
            }
        }).doOnTerminate(new Action0() {
            @Override public void call() { /* end logic omitted */ }
        }).doOnUnsubscribe(new Action0() {
            @Override public void call() { /* cancel logic omitted */ }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            // interrupt the worker on cancellation only if interrupt-on-timeout is enabled and the command timed out
            @Override public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // SEMAPHORE isolation: no subscribeOn, so user code runs on the calling thread
        // (state checks and execution hooks omitted)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override public Observable<R> call() {
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                return getUserExecutionObservable(_cmd);
            }
        });
    }
}
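With THREAD isolation, subscribeOn(threadPool.getScheduler(...)) moves user code onto a pool owned by the command's thread-pool key, which is what keeps one slow dependency from consuming every thread. Below is a hypothetical bulkhead sketch with a plain ThreadPoolExecutor; Bulkheads and poolFor() are illustrative names. The SynchronousQueue mirrors Hystrix's behavior when maxQueueSize is left at its default of -1: a task is rejected as soon as all core threads are busy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical bulkhead: one small, bounded pool per thread-pool key.
class Bulkheads {
    private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int coreSize;

    Bulkheads(int coreSize) {
        this.coreSize = coreSize;
    }

    ThreadPoolExecutor poolFor(String threadPoolKey) {
        return pools.computeIfAbsent(threadPoolKey, k -> new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES,
                new SynchronousQueue<>(),                 // no queue: excess work is rejected
                new ThreadPoolExecutor.AbortPolicy()));   // rejection surfaces as RejectedExecutionException
    }

    void shutdownAll() {
        pools.values().forEach(ThreadPoolExecutor::shutdownNow);
    }
}
```

A rejection here corresponds to Hystrix's THREAD_POOL_REJECTED event, which counts toward the error percentage and triggers the fallback.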
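// HystrixContextScheduler.ThreadPoolWorker: hands the subscription work to the command's executor
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // throws RejectedExecutionException when the pool is saturated (thread-pool rejection)
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // on unsubscribe, cancel the future, interrupting the thread only if shouldInterruptThread says so
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}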
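schedule() attaches a FutureCompleterWithConfigurableInterrupt, so unsubscribing cancels the FutureTask, but whether the worker thread is interrupted depends on the shouldInterruptThread function passed in from executeCommandWithSpecifiedIsolation(). The sketch below is a hypothetical reduction of that decision to a single Future.cancel(boolean) call; ConfigurableCancel is an illustrative name, not a Hystrix class.

```java
import java.util.concurrent.Future;
import java.util.function.BooleanSupplier;

// Hypothetical reduction of the configurable-interrupt completer; not a Hystrix class.
class ConfigurableCancel {
    private final Future<?> future;
    private final BooleanSupplier shouldInterruptThread;

    ConfigurableCancel(Future<?> future, BooleanSupplier shouldInterruptThread) {
        this.future = future;
        this.shouldInterruptThread = shouldInterruptThread;
    }

    /** Always cancels; interrupts a running task only when the supplier says so. */
    void cancel() {
        future.cancel(shouldInterruptThread.getAsBoolean());
    }
}
```

cancel(false) still marks the future as cancelled, so the caller stops waiting, but a task that is already running keeps its thread until it finishes.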
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
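executeCommandAndObserve() composes three steps: isolated execution, an optional timeout operator, and onErrorResumeNext(handleFallback). The same shape can be sketched with CompletableFuture on Java 9 or later. This is a swapped-in technique, not how Hystrix is built: ObserveSketch is hypothetical, and unlike Hystrix with interrupt-on-timeout enabled, orTimeout() only completes the future exceptionally and does not interrupt the task that is still running.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical Java 9+ analogue of executeCommandAndObserve(); not the Hystrix API.
final class ObserveSketch {
    private ObserveSketch() {}

    static <R> CompletableFuture<R> executeAndObserve(Supplier<R> run, Executor isolatedPool,
                                                     long timeoutMs, Function<Throwable, R> fallback) {
        return CompletableFuture.supplyAsync(run, isolatedPool)   // ~ executeCommandWithSpecifiedIsolation
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)      // ~ lift(HystrixObservableTimeoutOperator)
                .exceptionally(fallback);                          // ~ onErrorResumeNext(handleFallback)
    }
}
```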
// HystrixObservableTimeoutOperator (inner class of AbstractCommand): races a timer against the result
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    child.add(s);
    final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
    TimerListener listener = new TimerListener() {
        @Override public void tick() {
            // only the first transition NOT_EXECUTED -> TIMED_OUT wins; later ticks are no-ops
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                s.unsubscribe();   // cancel the underlying execution
                HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
                    @Override public void run() { child.onError(new HystrixTimeoutException()); }
                });
                timeoutRunnable.run();
            }
        }
        @Override public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    originalCommand.timeoutTimer.set(tl);
    // results are forwarded only while the command has not timed out
    Subscriber<R> parent = new Subscriber<R>() {
        @Override public void onCompleted() { if (isNotTimedOut()) { tl.clear(); child.onCompleted(); } }
        @Override public void onError(Throwable e) { if (isNotTimedOut()) { tl.clear(); child.onError(e); } }
        @Override public void onNext(R v) { if (isNotTimedOut()) { child.onNext(v); } }
        private boolean isNotTimedOut() { return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); }
    };
    s.add(parent);
    return parent;
}
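The timeout operator and the result path race on a single atomic status: the timer moves NOT_EXECUTED to TIMED_OUT, the result path moves NOT_EXECUTED to COMPLETED, and whichever compare-and-set succeeds first decides what the caller sees. This is a hypothetical, single-threaded model of that race; TimeoutRace and its method names are illustrative, and the enum values copy the excerpt's TimedOutStatus.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the timer-versus-result race; not a Hystrix class.
class TimeoutRace {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    final AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    /** Timer side: reports a timeout only if the command has not already completed. */
    boolean onTimerTick() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    /** Result side: the same check as isNotTimedOut() in the excerpt. */
    boolean isNotTimedOut() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }
}
```

Because later ticks fail the compare-and-set, a timer that keeps firing at a fixed rate reports the timeout at most once.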
// HystrixTimer: one shared scheduled pool ticks every registered listener at a fixed rate
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    startThreadIfNeeded();
    Runnable r = new Runnable() {
        @Override public void run() {
            try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); }
        }
    };
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
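HystrixTimer does not start a thread per command; each command registers a TimerListener on one shared scheduled pool, and clearing the returned reference stops further ticks. The try/catch around tick() matters because a ScheduledExecutorService suppresses all later runs of a periodic task once one run throws. Here is a hypothetical stdlib sketch; SharedTimer is an illustrative name, and the returned ScheduledFuture plays the role of the Reference handle.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical shared timer: listeners share one scheduler thread instead of owning threads.
class SharedTimer {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /** Runs tick every intervalMs until the returned handle is cancelled. */
    ScheduledFuture<?> addListener(Runnable tick, long intervalMs) {
        Runnable guarded = () -> {
            try {
                tick.run();
            } catch (RuntimeException e) {
                // without this catch, the first exception would suppress every later run
                System.err.println("Failed while ticking listener: " + e);
            }
        };
        return scheduler.scheduleAtFixedRate(guarded, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```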
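// AbstractCommand: decide between fallback and error once execution has failed
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    executionResult = executionResult.addEvent((int) latency, eventType);
    if (isUnrecoverable(originalException)) {
        // cause is e.g. a StackOverflowError or VirtualMachineError: no fallback is attempted
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        if (isRecoverableError(originalException)) { logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException); }
        if (properties.fallbackEnabled().get()) {
            // Simplified: the full source also acquires a fallback semaphore, records FALLBACK_* events,
            // and reports "no fallback available" when getFallback() is not overridden.
            Observable<R> fallbackExecutionChain;
            try {
                fallbackExecutionChain = getFallbackObservable();   // HystrixCommand wraps getFallback() here
            } catch (Throwable ex) {
                fallbackExecutionChain = Observable.error(ex);
            }
            return fallbackExecutionChain.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
                @Override public Observable<R> call(Throwable fe) {
                    // only when the fallback itself fails does the caller see an error
                    return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", originalException, fe));
                }
            });
        } else {
            // fallback disabled by configuration: propagate the original error (simplified)
            return Observable.error(originalException);
        }
    }
}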
// HystrixCommand: default fallback; override it to return a fast, local result
protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
}
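The default getFallback() throws UnsupportedOperationException, so a command that does not override it has nothing to fall back to, and the caller receives a HystrixRuntimeException. Here is a hypothetical template-method sketch of that contract in plain Java: MiniCommand stands in for HystrixCommand, IllegalStateException stands in for HystrixRuntimeException, and the unrecoverable-error check is reduced to letting every java.lang.Error propagate (Hystrix itself serves a fallback for some Errors, as the isRecoverableError branch in the excerpt shows).

```java
// Hypothetical template-method sketch; MiniCommand is not the Hystrix API.
abstract class MiniCommand<R> {
    /** The business call, like HystrixCommand.run(). */
    protected abstract R run() throws Exception;

    /** Same default as the excerpt: no fallback unless a subclass overrides this. */
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    /** java.lang.Error is not caught, so it propagates without a fallback (simplified). */
    public final R execute() {
        try {
            return run();
        } catch (Exception primary) {
            try {
                return getFallback();
            } catch (RuntimeException fallbackFailure) {
                // keep the original failure as the cause and attach the fallback failure
                IllegalStateException wrapped =
                        new IllegalStateException("run() failed and fallback failed", primary);
                wrapped.addSuppressed(fallbackFailure);
                throw wrapped;
            }
        }
    }
}
```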
// AbstractCommand: clean-up once the command terminates or is unsubscribed
private void handleCommandEnd(boolean commandExecutionStarted) {
    Reference<TimerListener> tl = timeoutTimer.get();
    if (tl != null) { tl.clear(); }   // stop the timeout timer
    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    // publish the final result (or the snapshot taken at cancellation) to the metrics streams
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }
    if (endCurrentThreadExecutingCommand != null) { endCurrentThreadExecutingCommand.call(); }
}
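// BucketedRollingCounterStream: turns per-bucket summaries into rolling-window summaries
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                       final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                       final Func2<Output, Bucket, Output> reduceBucket) {
    super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
    Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
        @Override public Observable<Output> call(Observable<Bucket> window) {
            // scan emits the seed plus one running total per bucket; skip(numBuckets) keeps only the final total
            return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        }
    };
    this.sourceStream = bucketedStream
            .window(numBuckets, 1)            // overlapping windows of numBuckets buckets, advancing one bucket at a time
            .flatMap(reduceWindowToSummary)   // fold each window into one summary
            .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } })
            .doOnUnsubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(false); } })
            .share()                          // every subscriber sees the same summaries
            .onBackpressureDrop();            // slow subscribers drop summaries instead of buffering them
}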
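BucketedRollingCounterStream builds each rolling summary from the last numBuckets bucket summaries: window(numBuckets, 1) produces overlapping windows, and scan(...).skip(numBuckets) keeps only each window's final total. Outside RxJava the same structure is usually a ring of buckets. This single-threaded RollingCounter is a hypothetical illustration with an injected clock; Hystrix's stream emits a bucket only when its time slice ends, whereas this sketch also counts the bucket in progress.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

// Hypothetical, single-threaded rolling counter: numBuckets buckets of bucketSizeMs each.
class RollingCounter {
    private final int numBuckets;
    private final long bucketSizeMs;
    private final long[][] counts;       // counts[slot][eventType]
    private final long[] bucketStart;    // start time of the bucket held in each slot, -1 if unused
    private final LongSupplier clock;

    RollingCounter(int numBuckets, long bucketSizeMs, int eventTypes, LongSupplier clock) {
        this.numBuckets = numBuckets;
        this.bucketSizeMs = bucketSizeMs;
        this.counts = new long[numBuckets][eventTypes];
        this.bucketStart = new long[numBuckets];
        Arrays.fill(bucketStart, -1L);
        this.clock = clock;
    }

    private long currentBucketStart() {
        return clock.getAsLong() / bucketSizeMs * bucketSizeMs;   // align to a bucket boundary
    }

    void add(int eventType) {
        long start = currentBucketStart();
        int slot = (int) ((start / bucketSizeMs) % numBuckets);
        if (bucketStart[slot] != start) {                        // slot still holds an expired bucket
            Arrays.fill(counts[slot], 0L);
            bucketStart[slot] = start;
        }
        counts[slot][eventType]++;
    }

    /** Per-event-type totals over the buckets that fall inside the current window. */
    long[] windowTotals() {
        long windowStart = currentBucketStart() - (numBuckets - 1) * bucketSizeMs;
        long[] totals = new long[counts[0].length];
        for (int slot = 0; slot < numBuckets; slot++) {
            if (bucketStart[slot] != -1L && bucketStart[slot] >= windowStart) {
                for (int e = 0; e < totals.length; e++) {
                    totals[e] += counts[slot][e];
                }
            }
        }
        return totals;
    }
}
```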
// HystrixCommandMetrics.HealthCounts: fold one bucket of event counts into the running totals
public HealthCounts plus(long[] eventTypeCounts) {
    long updatedTotalCount = totalCount;
    long updatedErrorCount = errorCount;
    long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
    long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
    long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
    long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
    long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
    // timeouts and rejections count as errors; SHORT_CIRCUITED events are not counted at all
    updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
    return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
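HealthCounts.plus() adds SUCCESS plus the four error types to the total, and only the error types to the error count; the breaker then compares the resulting percentage with its threshold. Below is a hypothetical immutable sketch with a worked example. The percentage formula, (int) ((double) errorCount / totalCount * 100) with 0 for an empty window, follows the HealthCounts constructor in the Hystrix source; MiniHealthCounts is an illustrative name.

```java
// Hypothetical immutable accumulator mirroring HealthCounts.plus(); not the Hystrix class.
final class MiniHealthCounts {
    static final MiniHealthCounts EMPTY = new MiniHealthCounts(0, 0);

    final long totalCount;
    final long errorCount;

    MiniHealthCounts(long totalCount, long errorCount) {
        this.totalCount = totalCount;
        this.errorCount = errorCount;
    }

    /** Same bookkeeping as plus(): short-circuited calls are not part of either count. */
    MiniHealthCounts plus(long success, long failure, long timeout,
                          long threadPoolRejected, long semaphoreRejected) {
        long errors = failure + timeout + threadPoolRejected + semaphoreRejected;
        return new MiniHealthCounts(totalCount + success + errors, errorCount + errors);
    }

    /** Truncated integer percentage; 0 when the window has no traffic. */
    int errorPercentage() {
        return totalCount == 0 ? 0 : (int) ((double) errorCount / totalCount * 100);
    }
}
```

With 12 successes, 3 failures, 2 timeouts, 2 thread-pool rejections, and 1 semaphore rejection, the window holds 20 requests and 8 errors, which is 40 percent and below the default 50 percent threshold. Four more failures give 12 errors out of 24 requests, exactly 50 percent; with 24 requests the default volume threshold of 20 is also met, so the breaker would open.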

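Key usage recommendations: implement getFallback() as a fast, local computation that makes no network calls; prefer thread isolation; size each thread pool to the dependency's expected load; and be cautious with semaphore isolation, because the call then runs on the caller's thread (for example a servlet container thread), so a slow dependency ties up those threads and a timeout cannot free them.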
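For thread-pool sizing, Hystrix's configuration documentation gives a rule of thumb: peak requests per second when healthy, multiplied by the 99th-percentile latency in seconds, plus some breathing room. The hypothetical helper below evaluates it in integer milliseconds to avoid floating-point rounding; PoolSizing is an illustrative name, and the headroom is a parameter because the documentation leaves it to judgment.

```java
// Hypothetical helper for the sizing rule of thumb: peak RPS x p99 latency (s) + headroom.
final class PoolSizing {
    private PoolSizing() {}

    static int coreSize(int peakRequestsPerSecond, int p99LatencyMillis, int headroomThreads) {
        long busyThreadMillis = (long) peakRequestsPerSecond * p99LatencyMillis;  // thread-ms needed per second
        int busyThreads = (int) ((busyThreadMillis + 999) / 1000);                // ceiling division
        return busyThreads + headroomThreads;
    }
}
```

For example, 30 requests per second at a 200 ms 99th percentile keeps about 6 threads busy; adding 4 threads of headroom gives a core size of 10.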

Hystrix is built on RxJava 1.x; familiarity with Observable operators such as defer, lift, subscribeOn, and onErrorResumeNext makes its internal flow much easier to follow.
