Implementation of Workflow Versioning in DolphinScheduler
This article explains how DolphinScheduler (DS) implements workflow versioning by introducing integer‑based versions, describes the core principles of workflow and task relationships, outlines the key database tables and architecture, and provides detailed Java code for creating, updating, and managing workflow definitions, tasks, and their relations.
Background
As part of splitting the large process-definition JSON, DS introduced a version feature, giving workflows a version concept. The released 2.x line uses an integer version starting at 1; each change increments the version, and the UI displays it prefixed with a capital "V" (e.g., V1, V2).
Core Principle
In DS, a workflow consists of three parts: basic workflow information, basic task information, and workflow‑task relationship information. Any change to any part is a workflow change, so the workflow version must increase.
The workflow‑task relationship does not have its own version; it carries the workflow version, the upstream task version, and the current task version. A task can belong to at most one workflow.
Two operation types arise: (1) workflow‑only changes (e.g., modifying the workflow name) – only the workflow name, workflow version, and the workflow‑version field in the relationship are updated; task versions stay unchanged. (2) task‑related changes (e.g., renaming a task) – workflow version, task version, and the corresponding relationship entries are all updated.
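The two operation types can be sketched with a toy in-memory model. This is purely illustrative; the class and method names below are hypothetical stand-ins, not DS's real entities:

```java
// Hypothetical sketch of DS's version rules: a workflow-only change bumps only
// the workflow version (and its copy in the relation row); a task change bumps
// the task version too, and is itself a workflow change.
class VersionSketch {
    int workflowVersion = 1;
    int taskVersion = 1;
    // the relation row has no version of its own; it carries copies of both
    int relationWorkflowVersion = 1;
    int relationTaskVersion = 1;

    // e.g. renaming the workflow: only workflow-side versions move
    void workflowOnlyChange() {
        workflowVersion++;
        relationWorkflowVersion = workflowVersion;
    }

    // e.g. renaming a task: the task version moves, which in turn
    // counts as a workflow change, so the workflow version moves too
    void taskChange() {
        taskVersion++;
        workflowVersion++;
        relationWorkflowVersion = workflowVersion;
        relationTaskVersion = taskVersion;
    }
}
```

After a workflow-only change the task version is untouched; after a task change every version field has advanced, which is exactly the invariant the relation tables must preserve.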
Implementation Method
Key Tables
Key tables include t_ds_process_definition (workflow definition), t_ds_process_definition_log (workflow definition log), t_ds_process_task_relation (workflow‑task relation), t_ds_process_task_relation_log (relation log), t_ds_task_definition (task definition), and t_ds_task_definition_log (task definition log). These tables store definitions and change logs for workflows and tasks.
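To make the relation table's role concrete, here is a simplified sketch of a t_ds_process_task_relation row as a plain Java class. Field names are modeled on the table's columns; this is not the actual DS entity class:

```java
// Sketch of a workflow-task relation row: it carries no version of its own,
// only the workflow version plus the upstream and current task versions.
class ProcessTaskRelationRow {
    long projectCode;
    long processDefinitionCode;
    int processDefinitionVersion; // version of the owning workflow
    long preTaskCode;             // upstream task code (0 for entry tasks)
    int preTaskVersion;           // version of the upstream task
    long postTaskCode;            // current (downstream) task code
    int postTaskVersion;          // version of the current task
}
```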
Implementation Architecture
The architecture centralises all workflow modifications through three service methods in ProcessService: saveProcessDefine, saveTaskDefine, and saveTaskRelation. This convergence simplifies code management and decouples upper-level logic from low-level database operations.
Creating an empty workflow calls processService.saveProcessDefine to create a blank workflow.
The workflow create API invokes ProcessService.saveProcessDefine and saveTaskDefine in parallel, then calls saveTaskRelation once both have completed.
Update and create share the same underlying logic: the presence of an ID determines whether the operation is an update.
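The "presence of an ID decides" convention can be sketched with a minimal in-memory store. This is a hypothetical helper, not the real ProcessService; it only mirrors the branch on processDefinition.getId() seen later in saveProcessDefine:

```java
import java.util.HashMap;
import java.util.Map;

// Toy definition store: id == 0 means the row does not exist yet, so the
// call is a create (insert); a non-zero id means update the existing row.
class DefinitionStore {
    private final Map<Integer, String> rows = new HashMap<>();
    private int nextId = 1;

    // returns the id actually written
    int save(int id, String definitionJson) {
        if (id == 0) {                     // create: assign a fresh id and insert
            int newId = nextId++;
            rows.put(newId, definitionJson);
            return newId;
        }
        rows.put(id, definitionJson);      // update: overwrite in place
        return id;
    }

    String get(int id) {
        return rows.get(id);
    }
}
```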
When creating or deleting workflow‑task relations, the workflow version is first updated via processService.saveProcessDefine , then the relation is updated via processService.saveTaskRelation .
Creating a task that is not linked to any workflow calls processService.saveTaskDefine .
A task update first updates the task, then updates the DAG; a task deletion first deletes the task, then updates the DAG. Updating the DAG in turn calls ProcessService.saveProcessDefine and saveTaskRelation.
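The call ordering for a deletion can be sketched as follows. All names here are illustrative stand-ins recording which underlying service method would run, not the real DS services:

```java
import java.util.ArrayList;
import java.util.List;

// Records the call order for a task deletion: remove the task first, then
// re-save the DAG, which bumps the workflow version (saveProcessDefine)
// before rewriting the relations (saveTaskRelation).
class DagSaver {
    final List<String> calls = new ArrayList<>();

    void deleteTask(String taskCode) {
        calls.add("deleteTaskDefinition:" + taskCode);
        saveDag();
    }

    private void saveDag() {
        calls.add("saveProcessDefine"); // workflow version bumped first
        calls.add("saveTaskRelation");  // relations rewritten with the new version
    }
}
```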
Key Code Reveal
ProcessService.saveProcessDefine
public int saveProcessDefine(User operator, ProcessDefinition processDefinition, Boolean syncDefine, Boolean isFromProcessDefine) {
    ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
    // query max version
    Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
    int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
    processDefinitionLog.setVersion(insertVersion);
    processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE);
    processDefinitionLog.setOperator(operator.getId());
    processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
    int insertLog = processDefineLogMapper.insert(processDefinitionLog);
    int result = 1;
    // if not syncing the definition, only the log is saved
    if (Boolean.TRUE.equals(syncDefine)) {
        if (0 == processDefinition.getId()) {
            result = processDefineMapper.insert(processDefinitionLog);
        } else {
            processDefinitionLog.setId(processDefinition.getId());
            result = processDefineMapper.updateById(processDefinitionLog);
        }
    }
    return (insertLog & result) > 0 ? insertVersion : 0;
}
ProcessService.saveTaskDefine
public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs, Boolean syncDefine) {
    Date now = new Date();
    List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
    List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
    for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
        // determine whether this is an update or an insert
        if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
            TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
                .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
            if (definitionCodeAndVersion != null) {
                if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
                    taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
                    Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
                    taskDefinitionLog.setVersion(version + 1);
                    taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
                    updateTaskDefinitionLogs.add(taskDefinitionLog);
                }
                continue;
            }
        }
        if (taskDefinitionLog.getCode() == 0) {
            try {
                taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
            } catch (CodeGenerateException e) {
                logger.error("Task code get error, ", e);
                return Constants.DEFINITION_FAILURE;
            }
        }
        newTaskDefinitionLogs.add(taskDefinitionLog);
    }
    int insertResult = 0;
    int updateResult = 0;
    // update existing tasks
    for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
        TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
        if (task == null) {
            newTaskDefinitionLogs.add(taskDefinitionToUpdate);
        } else {
            insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
            if (Boolean.TRUE.equals(syncDefine)) {
                taskDefinitionToUpdate.setId(task.getId());
                updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
            } else {
                updateResult++;
            }
        }
    }
    // insert new tasks
    if (!newTaskDefinitionLogs.isEmpty()) {
        insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
        if (Boolean.TRUE.equals(syncDefine)) {
            updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
        } else {
            updateResult += newTaskDefinitionLogs.size();
        }
    }
    return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
}
ProcessService.saveTaskRelation
public int saveTaskRelation(User operator,
                            long projectCode,
                            long processDefinitionCode,
                            int processDefinitionVersion,
                            List<ProcessTaskRelationLog> taskRelationList,
                            List<TaskDefinitionLog> taskDefinitionLogs,
                            Boolean syncDefine) {
    if (taskRelationList.isEmpty()) {
        return Constants.EXIT_CODE_SUCCESS;
    }
    Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
    if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
        taskDefinitionLogMap = taskDefinitionLogs.stream()
            .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
    }
    Date now = new Date();
    for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
        processTaskRelationLog.setProjectCode(projectCode);
        processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
        processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
        if (taskDefinitionLogMap != null) {
            TaskDefinitionLog preTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
            if (preTaskDefinitionLog != null) {
                processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
            }
            TaskDefinitionLog postTaskDefinitionLog = taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
            if (postTaskDefinitionLog != null) {
                processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
            }
        }
        processTaskRelationLog.set...;
    }
    int insert = taskRelationList.size();
    if (Boolean.TRUE.equals(syncDefine)) {
        List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper
            .queryByProcessCode(projectCode, processDefinitionCode);
        if (!processTaskRelationList.isEmpty()) {
            Set<Integer> processTaskRelationSet = processTaskRelationList.stream()
                .map(ProcessTaskRelation::hashCode).collect(toSet());
            Set<Integer> taskRelationSet = taskRelationList.stream()
                .map(ProcessTaskRelationLog::hashCode).collect(toSet());
            boolean result = CollectionUtils.isEqualCollection(processTaskRelationSet, taskRelationSet);
            if (result) {
                return Constants.EXIT_CODE_SUCCESS;
            }
            processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
        }
        insert = processTaskRelationMapper.batchInsert(taskRelationList);
    }
    int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
    return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
Current Limitations
Because changes to workflow basic information, DAG task modifications, and DAG task‑relation modifications all trigger a workflow version increment, the tables t_ds_process_definition_log and t_ds_process_task_relation_log can grow explosively, degrading query performance. A better solution, such as a one‑click cleanup of historical versions, is needed.
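One possible retention rule for such a cleanup, purely as a sketch (this is not a DS API; the class and method names are illustrative): keep only the newest N log versions per definition code and discard the rest.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical pruner for the version numbers held per definition code,
// e.g. as read from t_ds_process_definition_log: sort newest-first and
// retain at most `keep` versions.
class VersionLogPruner {
    static List<Integer> prune(List<Integer> versions, int keep) {
        List<Integer> sorted = new ArrayList<>(versions);
        sorted.sort(Comparator.reverseOrder()); // newest version first
        return new ArrayList<>(sorted.subList(0, Math.min(keep, sorted.size())));
    }
}
```

A real implementation would also have to skip any version still referenced by a running or schedulable workflow instance before deleting its log rows.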
About ZCY Technology
ZCY Technology Team (Zero), based in Hangzhou, is a growth-oriented team passionate about technology and craftsmanship. With around 500 members, we are building comprehensive engineering, project management, and talent development systems. We are committed to innovation and creating a cloud service ecosystem for government and enterprise procurement. We look forward to your joining us.