Inside StarRocks Pipeline Engine: How BE Splits and Schedules Queries
This article explains the core concepts, architecture, and source‑code details of StarRocks’ Pipeline execution framework. It covers BE initialization, query lifecycle management, operator splitting, PipelineBuilder processing, and the scheduling logic of PipelineDriver, with concrete code examples and diagrams to illustrate each step.
BE Initialization of the Pipeline Engine
When a BE process starts, ExecEnv creates two global executors:
GlobalDriverExecutor : a thread pool that runs PipelineDriver instances. The default thread count equals the number of hardware cores and can be overridden by config::pipeline_exec_thread_pool_thread_num.
Pipeline Scan IO Thread Pool : used by ScanOperator for asynchronous I/O. Its size is controlled by config::pipeline_scan_thread_pool_thread_num (defaulting to hardware concurrency) and config::pipeline_scan_thread_pool_queue_size.
Initialization code (simplified):
// be/src/runtime/exec_env.cpp
std::unique_ptr<ThreadPool> driver_executor_thread_pool;
int max_thread_num = std::thread::hardware_concurrency();
if (config::pipeline_exec_thread_pool_thread_num > 0) {
max_thread_num = config::pipeline_exec_thread_pool_thread_num;
}
ThreadPoolBuilder("pip_executor")
.set_min_threads(0)
.set_max_threads(max_thread_num)
.set_max_queue_size(1000)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&driver_executor_thread_pool);
_driver_executor = new pipeline::GlobalDriverExecutor(std::move(driver_executor_thread_pool), false);
_driver_executor->initialize(max_thread_num);
int io_threads = config::pipeline_scan_thread_pool_thread_num <= 0
? std::thread::hardware_concurrency()
: config::pipeline_scan_thread_pool_thread_num;
_pipeline_scan_io_thread_pool = new PriorityThreadPool(
"pip_scan_io", io_threads, config::pipeline_scan_thread_pool_queue_size);
Query Lifecycle Management on BE
Each query on a BE node is represented by a hierarchy of objects:
QueryContext : registered in QueryContextManager, holds a FragmentContextManager that manages all fragment instances of the query.
FragmentContext : created per fragment instance; contains a set of Pipelines, PipelineDrivers, and MorselQueues.
Pipelines : derived from the physical execution tree of the fragment.
PipelineDriver : runtime instance of a pipeline, created according to the pipeline’s degree of parallelism (DOP).
MorselQueue : maps a ScanOperator to a collection of Morsels, each representing a data slice to be processed.
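The ownership relations above can be sketched in a few lines. This is a hypothetical simplification for illustration only: the class names mirror the real ones, but every member and the instantiate() helper are invented and do not match the actual StarRocks source.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct Morsel { int64_t tablet_id; };            // one data slice for a scan

struct MorselQueue {                             // morsel pool for one scan node
    std::vector<Morsel> morsels;
};

struct PipelineDriver { int driver_id; };        // one runtime instance of a pipeline

struct Pipeline {
    int dop = 1;                                 // degree of parallelism
    // One PipelineDriver is created per unit of parallelism.
    std::vector<PipelineDriver> instantiate() const {
        std::vector<PipelineDriver> drivers;
        for (int i = 0; i < dop; ++i) drivers.push_back(PipelineDriver{i});
        return drivers;
    }
};

struct FragmentContext {                         // one fragment instance
    std::vector<Pipeline> pipelines;
    std::map<int, MorselQueue> morsel_queues;    // keyed by scan node id
};

struct QueryContext {                            // one query on this BE node
    std::map<std::string, FragmentContext> fragments;  // keyed by instance id
};
```

The key point of the hierarchy is that parallelism is decided per pipeline: a pipeline with DOP n yields n drivers, each of which is scheduled independently.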
FragmentExecutor: Prepare and Execute
The FE sends an exec_plan_fragment RPC. BE creates a starrocks::pipeline::FragmentExecutor to handle the fragment instance.
// be/src/service/internal_service.cpp (excerpt)
template <typename T>
void PInternalServiceImpl<T>::exec_plan_fragment(
google::protobuf::RpcController* cntl_base,
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
auto st = _exec_plan_fragment(cntl);
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st.get_error_msg();
}
st.to_protobuf(response->mutable_status());
}
Status PInternalServiceImpl<T>::_exec_plan_fragment(brpc::Controller* cntl) {
// deserialize request ...
bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
if (is_pipeline) {
auto fragment_executor = std::make_unique<starrocks::pipeline::FragmentExecutor>();
auto status = fragment_executor->prepare(_exec_env, t_request);
if (status.ok()) {
return fragment_executor->execute(_exec_env);
}
return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
}
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
FragmentExecutor::prepare performs the following steps (see be/src/exec/pipeline/fragment_executor.cpp):
Detect duplicate fragment submissions.
Register or obtain a QueryContext and set its fragment count and expiration time.
Create and initialize a FragmentContext, registering it with the QueryContext after all asynchronous initializations are ready.
Build a non‑pipeline execution tree via ExecNode::create_tree and then invoke PipelineBuilder to split it into pipelines.
Convert ScanRange parameters into Morsels.
Transform the non‑pipeline DataSink into a pipeline SinkOperator.
According to the DOP, create PipelineDrivers and associate them with the corresponding MorselQueue and ScanOperator.
Finish other initializations and register the FragmentContext.
FragmentExecutor::execute prepares each driver and submits it to the appropriate executor (resource‑group aware or regular):
Status FragmentExecutor::execute(ExecEnv* exec_env) {
for (const auto& driver : _fragment_ctx->drivers()) {
RETURN_IF_ERROR(driver->prepare(_fragment_ctx->runtime_state()));
}
if (_fragment_ctx->enable_resource_group()) {
for (const auto& driver : _fragment_ctx->drivers()) {
exec_env->wg_driver_executor()->submit(driver.get());
}
} else {
for (const auto& driver : _fragment_ctx->drivers()) {
exec_env->driver_executor()->submit(driver.get());
}
}
return Status::OK();
}
PipelineBuilder and Operator Splitting
PipelineBuilder walks the physical execution tree and recursively calls decompose_to_pipeline on each ExecNode. The result is a list of OperatorFactory objects that form a pipeline.
// be/src/exec/pipeline/pipeline_builder.cpp
Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {
pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
_context.add_pipeline(operators);
_context.get_pipelines().back()->set_root();
return _context.get_pipelines();
}
Example: ProjectNode::decompose_to_pipeline first decomposes its child, then appends a ProjectOperatorFactory and optionally a LimitOperatorFactory:
pipeline::OpFactories ProjectNode::decompose_to_pipeline(PipelineBuilderContext* context) {
OpFactories operators = _children[0]->decompose_to_pipeline(context);
operators.emplace_back(std::make_shared<ProjectOperatorFactory>(
context->next_operator_id(), id(), _slot_ids, _expr_ctxs, _type_is_nullable,
_common_sub_slot_ids, _common_sub_expr_ctxs));
if (limit() != -1) {
operators.emplace_back(std::make_shared<LimitOperatorFactory>(
context->next_operator_id(), id(), limit()));
}
return operators;
}
Each pipeline operator implements a standard interface (see be/src/exec/pipeline/operator.h):
pull_chunk : fetch a chunk from the upstream operator.
push_chunk : push a chunk to the downstream operator.
has_output, need_input, is_finished : state queries.
prepare, open, close : lifecycle hooks.
set_finishing, set_finished, set_canceled : termination semantics.
pending_finish : indicates that asynchronous work is still pending.
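The interface contract can be condensed into a short sketch. Note the simplifications: the real methods in be/src/exec/pipeline/operator.h take a RuntimeState* and return Status/StatusOr, and the PassthroughOperator below is invented here purely to exercise the contract.

```cpp
#include <cassert>
#include <memory>
#include <utility>

class Chunk {};
using ChunkPtr = std::shared_ptr<Chunk>;

class Operator {
public:
    virtual ~Operator() = default;
    virtual void prepare() {}                    // lifecycle hooks
    virtual void close() {}
    virtual bool has_output() const = 0;         // state queries polled by the driver
    virtual bool need_input() const = 0;
    virtual bool is_finished() const = 0;
    virtual ChunkPtr pull_chunk() = 0;           // data movement between operators
    virtual void push_chunk(ChunkPtr chunk) = 0;
    virtual void set_finishing() {}              // termination semantics
};

// A trivial single-slot pass-through operator: holds at most one chunk,
// reports need_input only while its slot is empty, and becomes finished
// once it is finishing and drained.
class PassthroughOperator : public Operator {
public:
    bool has_output() const override { return _chunk != nullptr; }
    bool need_input() const override { return _chunk == nullptr && !_finishing; }
    bool is_finished() const override { return _finishing && _chunk == nullptr; }
    ChunkPtr pull_chunk() override { return std::exchange(_chunk, nullptr); }
    void push_chunk(ChunkPtr chunk) override { _chunk = std::move(chunk); }
    void set_finishing() override { _finishing = true; }
private:
    ChunkPtr _chunk;
    bool _finishing = false;
};
```

The driver never calls pull_chunk or push_chunk blindly; it consults has_output/need_input first, which is what makes the whole loop non‑blocking.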
Scheduling Logic of PipelineDriver
The core scheduling loop runs in GlobalDriverExecutor::worker_thread. It repeatedly fetches ready drivers from a multi‑level feedback queue, runs PipelineDriver::process, and then decides whether the driver is finished, blocked, or should be yielded back to the ready queue.
If a driver finishes (normally or with error), PipelineDriver::finalize_driver cleans up resources.
If the driver is still RUNNING, it is re‑queued as READY.
If the driver is BLOCKED, it is moved to the blocked queue, where PipelineDriverPoller::run_internal periodically checks for unblocking conditions and moves drivers back to the ready queue.
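The three transitions above can be modeled with a toy, single‑threaded scheduler. This is a hypothetical simplification: the real executor is multi‑threaded, uses a multi‑level feedback queue, and runs the poller on a dedicated thread; every name below other than the state names is invented.

```cpp
#include <cassert>
#include <deque>

enum class DriverState { READY, RUNNING, BLOCKED, FINISHED };

struct Driver {
    int remaining_work = 0;               // pretend workload, in process() calls
    bool blocked_on_dependency = false;   // e.g. waiting on a hash-join build side
    DriverState process() {               // stand-in for PipelineDriver::process
        if (blocked_on_dependency) return DriverState::BLOCKED;
        if (--remaining_work <= 0) return DriverState::FINISHED;
        return DriverState::READY;        // yielded back, still runnable
    }
};

struct Scheduler {
    std::deque<Driver*> ready_queue;
    std::deque<Driver*> blocked_queue;
    int finished = 0;

    void run_once() {                     // one iteration of the worker loop
        if (ready_queue.empty()) return;
        Driver* d = ready_queue.front();
        ready_queue.pop_front();
        switch (d->process()) {
        case DriverState::FINISHED: ++finished; break;               // finalize_driver
        case DriverState::BLOCKED:  blocked_queue.push_back(d); break;
        default:                    ready_queue.push_back(d); break;  // re-queue READY
        }
    }

    void poll_blocked() {                 // stand-in for PipelineDriverPoller
        for (auto it = blocked_queue.begin(); it != blocked_queue.end();) {
            if (!(*it)->blocked_on_dependency) {
                ready_queue.push_back(*it);
                it = blocked_queue.erase(it);
            } else {
                ++it;
            }
        }
    }
};
```

The important property this models is that a blocked driver never occupies a worker thread: it sits in the blocked queue until the poller observes its dependency is satisfied and re‑queues it.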
Processing moves chunks between adjacent operators:
// be/src/exec/pipeline/pipeline_driver.cpp
Status PipelineDriver::process() {
for (size_t i = 0; i + 1 < operators_.size(); ++i) {
Operator* src = operators_[i];
Operator* dst = operators_[i + 1];
if (!src->is_finished() && src->has_output() &&
!dst->is_finished() && dst->need_input()) {
ChunkPtr chunk;
RETURN_IF_ERROR(src->pull_chunk(&chunk));
RETURN_IF_ERROR(dst->push_chunk(std::move(chunk)));
}
}
// Yield if processed >100 chunks or >100ms, or if no progress => BLOCKED.
return Status::OK();
}The poller scans the blocked driver list and wakes drivers whose dependencies (e.g., hash‑join build side, runtime filter production) have become satisfied:
// be/src/exec/pipeline/pipeline_driver_poller.cpp
void PipelineDriverPoller::run_internal() {
for (auto* driver : blocked_drivers_) {
if (driver->can_be_unblocked()) {
driver_executor->submit(driver);
}
}
}
Key Source Files and References
be/src/runtime/exec_env.cpp – initialization of global executors.
be/src/service/internal_service.cpp – RPC entry point and fragment executor creation.
be/src/exec/pipeline/fragment_executor.cpp – preparation and execution of a fragment.
be/src/exec/pipeline/pipeline_builder.cpp – decomposition of the physical plan into pipelines.
be/src/exec/pipeline/operator.h – operator interface definition.
be/src/exec/pipeline/pipeline_driver.cpp – driver processing logic.
be/src/exec/pipeline/pipeline_driver_poller.cpp – blocked driver handling.
be/src/exec/pipeline/pipeline_driver_executor.cpp – global driver executor implementation (MLFQ scheduling).
Understanding these components is essential for extending the StarRocks execution engine or diagnosing performance issues.
StarRocks
StarRocks is an open‑source project under the Linux Foundation, focused on building a high‑performance, scalable analytical database that enables enterprises to create an efficient, unified lake‑house paradigm. It is widely used across many industries worldwide, helping numerous companies enhance their data analytics capabilities.