Extending Apache SeaTunnel for Trino and Kyuubi Integration: A Practical Guide
This article outlines the challenges of scaling data integration platforms, proposes a comprehensive solution using Apache SeaTunnel and Dinky, details the implementation of Trino and Kyuubi JDBC support, and describes the platform's architecture, task publishing workflow, logging, monitoring, resource management, and future enhancements.
Background
As business volume grows, data integration tasks increase dramatically, requiring support for many data sources. The existing system can no longer handle the current scale, exposing shortcomings that prevent rapid integration for some business lines. Data source architectures become more diverse and complex, and data applications become increasingly vertical and scenario‑driven, driving rapid evolution of modern data architectures. Consequently, data integration has evolved from a technical management task into a full‑blown system engineering effort.
Overall Solution
The implementation of the entire system involves the following core steps:
Define the goals and scope of data integration. The company currently has many tasks based on Trino and Kyuubi for accelerated data queries and unified SQL management. Since many tasks are already integrated, the new system must adapt to these existing tasks and perform automatic migration.
Select suitable data integration tools and technologies. To integrate seamlessly with the legacy system and remain extensible, we chose two emerging open‑source tools, Apache SeaTunnel and Dinky. Apache SeaTunnel does not currently support Trino or Kyuubi, so we had to add that support ourselves through secondary development.
SeaTunnel Integration
Most of our tasks rely on the open‑source components Trino and Kyuubi, which Apache SeaTunnel does not currently support, so secondary development was required.
We added JDBC support for Trino and Kyuubi to SeaTunnel; for now it covers only the source side.
The implementation follows the same simple pattern as the existing MySQL JDBC support.
Build support for the Trino dialect.
The dialect must implement the interfaces JdbcDialect and JdbcDialectFactory.
Implementation of TrinoDialect:
<code>@Override
public String dialectName() {
    return "Trino";
}

@Override
public JdbcRowConverter getRowConverter() {
    return new TrinoJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
    return new TrinoTypeMapper();
}</code>
Implementation of PrestoDialectFactory:
<code>@Override
public boolean acceptsURL(@NonNull String url) {
    // Accept both the Presto prefix (jdbc:presto:) and the Trino prefix (jdbc:trino:)
    return url.startsWith("jdbc:presto:") || url.startsWith("jdbc:trino:");
}

@Override
public JdbcDialect create() {
    return new TrinoDialect();
}</code>
Note: because the factory accepts both URL prefixes, the same dialect also serves Presto.
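To make the role of acceptsURL concrete, here is a minimal, self-contained sketch of how a loader might pick a dialect by URL prefix. The DialectFactory interface and the resolve helper are simplified, hypothetical stand-ins written for this example; they are not SeaTunnel's actual classes.

```java
import java.util.List;
import java.util.Optional;

class DialectLookupSketch {
    /** Hypothetical, simplified stand-in for a JDBC dialect factory. */
    interface DialectFactory {
        boolean acceptsURL(String url);
        String dialectName();
    }

    /** Mirrors the prefix check from the article: Presto and Trino share one dialect. */
    static final DialectFactory TRINO = new DialectFactory() {
        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:presto:") || url.startsWith("jdbc:trino:");
        }

        @Override
        public String dialectName() {
            return "Trino";
        }
    };

    /** A second factory, included only to show that non-matching factories are skipped. */
    static final DialectFactory MYSQL = new DialectFactory() {
        @Override
        public boolean acceptsURL(String url) {
            return url.startsWith("jdbc:mysql:");
        }

        @Override
        public String dialectName() {
            return "MySQL";
        }
    };

    /** Returns the name of the first factory that accepts the URL, if any. */
    static Optional<String> resolve(String url, List<DialectFactory> factories) {
        return factories.stream()
                .filter(f -> f.acceptsURL(url))
                .map(DialectFactory::dialectName)
                .findFirst();
    }
}
```

With both factories registered, a jdbc:trino: or jdbc:presto: URL resolves to the Trino dialect, while a URL that no factory accepts yields an empty result that the caller has to handle.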
Build a Trino data type converter.
The converter maps Trino column types to SeaTunnel data types. Sample implementation:
<code>@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
    String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
    // VARCHAR(x) --> VARCHAR
    if (columnType.contains("(")) {
        columnType = columnType.split("\\(")[0];
    }
    int precision = metadata.getPrecision(colIndex);
    int scale = metadata.getScale(colIndex);
    switch (columnType) {
        case PRESTO_BOOLEAN:
            return BasicType.BOOLEAN_TYPE;
        case PRESTO_TINYINT:
            return BasicType.BYTE_TYPE;
        case PRESTO_INTEGER:
            return BasicType.INT_TYPE;
        case PRESTO_SMALLINT:
            return BasicType.SHORT_TYPE;
        case PRESTO_BIGINT:
            return BasicType.LONG_TYPE;
        case PRESTO_DECIMAL:
            return new DecimalType(precision, scale);
        case PRESTO_REAL:
            return BasicType.FLOAT_TYPE;
        case PRESTO_DOUBLE:
            return BasicType.DOUBLE_TYPE;
        case PRESTO_CHAR:
        case PRESTO_VARCHAR:
        case PRESTO_JSON:
        case PRESTO_ARRAY:
            // JSON and ARRAY values are passed through as strings
            return BasicType.STRING_TYPE;
        case PRESTO_DATE:
            return LocalTimeType.LOCAL_DATE_TYPE;
        case PRESTO_TIME:
            return LocalTimeType.LOCAL_TIME_TYPE;
        case PRESTO_TIMESTAMP:
            return LocalTimeType.LOCAL_DATE_TIME_TYPE;
        case PRESTO_VARBINARY:
        case PRESTO_BINARY:
            return PrimitiveByteArrayType.INSTANCE;
        // Not supported yet
        case PRESTO_MAP:
        case PRESTO_ROW:
        default:
            final String jdbcColumnName = metadata.getColumnName(colIndex);
            throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
                    String.format("Doesn't support Trino type '%s' on column '%s' yet.", columnType, jdbcColumnName));
    }
}</code>
Note: parameterized types such as VARCHAR(x) need special handling; the length specification is stripped before the type name is matched.
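The normalization step can be tried in isolation. The sketch below is a standalone variation, not the code used in the connector: instead of keeping only the text before the first parenthesis, it removes the parenthesized part and keeps any trailing qualifier. That matters only if the driver reports names such as "timestamp(3) with time zone", which is an assumption here. The class and method names are hypothetical.

```java
import java.util.Locale;

class TypeNameNormalizerSketch {
    /**
     * Upper-cases a reported column type name and drops its parameter list,
     * e.g. "varchar(255)" becomes "VARCHAR". Text after the last closing
     * parenthesis is kept, so a trailing qualifier survives.
     */
    static String normalize(String rawTypeName) {
        String upper = rawTypeName.trim().toUpperCase(Locale.ROOT);
        int open = upper.indexOf('(');
        if (open < 0) {
            return upper; // no parameters, nothing to strip
        }
        int close = upper.lastIndexOf(')');
        if (close < open) {
            return upper.substring(0, open).trim(); // malformed input: fall back to the prefix
        }
        String suffix = upper.substring(close + 1);
        return (upper.substring(0, open) + suffix).trim();
    }
}
```

Under this sketch, "decimal(10,2)" normalizes to "DECIMAL" and "array(varchar(10))" to "ARRAY", while a plain "bigint" is only upper-cased.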
Support TIMESTAMP WITH TIME ZONE.
By default, the SeaTunnel framework does not handle a SQL query whose result carries a time zone; we need to modify org.apache.seatunnel.api.table.type.LocalTimeType to handle time zones.
<code>public static final LocalTimeType<LocalDateTime> LOCAL_DATE_TIME_TYPE_WITH_ZONE =
        new LocalTimeType<>(LocalDateTime.class, SqlType.TIMESTAMP);
</code>
Corresponding changes are also needed in the Spark or Flink TypeConverterUtils to handle time zones correctly.
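Because the new type is still backed by LocalDateTime, a zoned value has to be reduced to a wall-clock time at some point. The following is a minimal sketch of that conversion using only java.time; the helper name and the choice of target zone are assumptions made for illustration, not the platform's actual rule.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class ZonedTimestampSketch {
    /**
     * Converts a zoned timestamp to the local date-time that the same instant
     * has in targetZone. The instant is preserved; only the wall clock changes.
     */
    static LocalDateTime toLocal(ZonedDateTime value, ZoneId targetZone) {
        return value.withZoneSameInstant(targetZone).toLocalDateTime();
    }
}
```

For example, noon in Asia/Shanghai on a given day becomes 04:00 on the same day when the target zone is UTC. Whichever zone is chosen, it should be applied consistently in the row converter and in the engine-side type conversion.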
The integration approach for Kyuubi is consistent with that for Trino.
Design Practice
Below is the architecture diagram of the data integration platform:
The platform provides a fairly complete set of functions, divided into the following core components:
Data service system
Data asset management
Data aggregation
Data development
Data monitoring, etc.
By integrating these modules and adjusting resources, a complete data pipeline can be built.
The following image shows the detailed task publishing process:
When a client request arrives, the master node retrieves the list of registered Nodes, validates the Gateway and API, and selects one Node as the primary task dispatcher. It then configures the monitoring and control logic and, during execution, creates drivers such as the Log Driver, Monitor Driver, and Resource Driver to manage the task lifecycle. After execution, the system distributes the final task status and cleans up resources.
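The create-run-clean-up ordering described above can be sketched with try-with-resources. Everything in this example is hypothetical (the Driver class, the event strings, the publish method); it only illustrates how drivers tied to one task can be started before execution and released afterwards, even when the task fails.

```java
import java.util.ArrayList;
import java.util.List;

class TaskLifecycleSketch {
    /** Hypothetical driver that records when it starts and stops. */
    static final class Driver implements AutoCloseable {
        private final String name;
        private final List<String> events;

        Driver(String name, List<String> events) {
            this.name = name;
            this.events = events;
            events.add("start " + name);
        }

        @Override
        public void close() {
            events.add("stop " + name);
        }
    }

    /** Runs the task with its three drivers and returns the recorded event order. */
    static List<String> publish(Runnable task) {
        List<String> events = new ArrayList<>();
        try (Driver log = new Driver("log", events);
             Driver monitor = new Driver("monitor", events);
             Driver resource = new Driver("resource", events)) {
            task.run();
            events.add("task finished");
        } catch (RuntimeException e) {
            // Drivers are already closed when this handler runs.
            events.add("task failed");
        }
        return events;
    }
}
```

The drivers are closed in reverse order of creation, so the log driver is the last one released and can still capture output while the others shut down.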
Publish Task
The platform offers a full pipeline for data tasks. A new task can be created with the following DSL, which queries Presto and writes data to ClickHouse, then sends the result to a Kafka hook.
<code>{
  "appKey": "datateam",
  "deployMode": "yarn",
  "username": "demo",
  "taskName": "datateam_demo_first_task",
  "timeout": 900,
  "sources": [
    {
      "source": "Jdbc",
      "configure": [
        {"field": "host", "value": "localhost", "split": false},
        {"field": "port", "value": "8080", "split": false},
        {"field": "type", "value": "presto", "split": false},
        {"field": "user", "value": "default", "split": false},
        {"field": "query", "value": "select 'xxx' as name, 12 as age", "split": false},
        {"field": "result_table_name", "value": "datateam_demo_first_task_presto_source", "split": false},
        {"field": "fields", "value": [
          {"column": "name", "origin": "名称", "type": "string"},
          {"column": "age", "origin": null, "type": "int"}
        ], "split": true}
      ]
    }
  ],
  "sinks": [
    {
      "sink": "Clickhouse",
      "configure": [
        {"field": "host", "value": "127.0.0.1", "split": false},
        {"field": "port", "value": "9000", "split": false},
        {"field": "database", "value": "test", "split": false},
        {"field": "table", "value": "datateam_demo_first_task", "split": false},
        {"field": "columns", "value": ["name", "age"], "split": false}
      ]
    }
  ],
  "hooks": [
    {"hook": "Kafka", "rule": "SUCCESS", "configure": [{"field": "format", "value": "JSON"}]}
  ],
  "platform": "Data"
}</code>
This JSON configuration is all that is needed to query Presto, write the results to ClickHouse, and notify a Kafka hook in JSON format when the task succeeds. The platform generates a task from the DSL and distributes it to the cluster for execution.
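One step in turning this DSL into a runnable job is deriving a JDBC URL from the host, port, and type fields of the source. The sketch below shows one way that could look; the helper and its validation rules are assumptions for illustration, and the resulting prefixes are the ones the dialect factory's acceptsURL check looks for.

```java
import java.util.Locale;
import java.util.Map;

class JdbcUrlSketch {
    /**
     * Builds a JDBC URL such as jdbc:presto://localhost:8080 from flattened
     * DSL fields. Only the two types accepted by the dialect factory are allowed.
     */
    static String buildUrl(Map<String, String> fields) {
        String type = require(fields, "type").toLowerCase(Locale.ROOT);
        if (!type.equals("presto") && !type.equals("trino")) {
            throw new IllegalArgumentException("Unsupported source type: " + type);
        }
        return "jdbc:" + type + "://" + require(fields, "host") + ":" + require(fields, "port");
    }

    /** Fails fast with a readable message when a mandatory field is missing. */
    private static String require(Map<String, String> fields, String key) {
        String value = fields.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing field: " + key);
        }
        return value;
    }
}
```

Applied to the source above (host localhost, port 8080, type presto), the helper produces jdbc:presto://localhost:8080.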
An equivalent SQL‑based task can be defined as follows:
<code>CREATE TASK `datateam_demo_first_task`
WITH INPUT Jdbc (
url=`localhost`,
port=`8080`,
type=`presto`,
username=`default`,
result_table_name=`datateam_demo_first_task_presto_source`,
fields={
format=`name`|`名称`
}
)
WITH OUTPUT Clickhouse(
host=`127.0.0.1`,
port=`8123`,
database=`default`,
columns=[`name`, `age`]
)
WITH HOOK Kafka(
rule=`SUCCESS`,
fields={
format=`JSON`
}
)
WITH QUERY
    select 'xxx' as name, 12 as age</code>
The system parses the SQL node, converts the configuration into executable parameters, and publishes the task.
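As a rough illustration of the "convert the configuration into parameters" step, the sketch below pulls simple key=`value` options out of one clause of the task SQL with a regular expression. It is a toy under stated assumptions: the class name is hypothetical, and nested blocks such as fields={...} or list values such as columns=[...] would need a real parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TaskSqlOptionsSketch {
    // Matches a word key followed by '=' and a backtick-quoted value.
    private static final Pattern OPTION = Pattern.compile("(\\w+)\\s*=\\s*`([^`]*)`");

    /** Extracts key=`value` pairs in the order they appear in the clause. */
    static Map<String, String> parseOptions(String clause) {
        Map<String, String> options = new LinkedHashMap<>();
        Matcher matcher = OPTION.matcher(clause);
        while (matcher.find()) {
            options.put(matcher.group(1), matcher.group(2));
        }
        return options;
    }
}
```

Run against the body of the WITH OUTPUT clause, this yields the host, port, and database options; the columns list is skipped because its value does not start with a backtick directly after the equals sign.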
Log Management: Log Driver
The platform provides comprehensive task log management, including detailed logs from the submission machine and the cluster.
After a task is submitted, a Log Driver is automatically created to collect runtime logs from the cluster until the task finishes, at which point the Log Driver is destroyed. Once the driver is destroyed, the system invokes Log Aggregation to obtain the final execution status and store it.
Timeout and Retry Mechanism: Monitor Driver
When a task is successfully published, a Monitor Driver is started to monitor the task, with the core responsibilities of handling timeout and retry.
Timeout mechanism – Users can set the timeout parameter. The system may automatically adjust the timeout based on recent execution records (resource consumption, duration, etc.) to prevent abnormal exits caused by external dependencies.
Retry mechanism – Retries are not triggered under normal circumstances. Two modes are supported: automatic and manual. In automatic mode, a retry occurs only when the task ends with a FAILURE status and meets the predefined retry rules. In manual mode, users define custom retry rules; manual settings override automatic ones.
The Monitor Driver not only handles timeout and retry but also manages the entire task lifecycle, providing real‑time feedback from construction to completion.
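The interplay of timeout and retry can be sketched with the standard java.util.concurrent API. This is a simplified illustration, not the Monitor Driver itself: the class, the Status values other than FAILURE, and the fixed retry count are assumptions, and the automatic timeout adjustment and rule matching described above are left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MonitorSketch {
    enum Status { SUCCESS, FAILURE, TIMEOUT }

    /** Runs the task once; a run that exceeds timeoutMillis is cancelled and reported as TIMEOUT. */
    static Status runOnce(Callable<Boolean> task, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Boolean> future = pool.submit(task);
        try {
            boolean ok = Boolean.TRUE.equals(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
            return ok ? Status.SUCCESS : Status.FAILURE;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the running attempt
            return Status.TIMEOUT;
        } catch (ExecutionException e) {
            return Status.FAILURE; // the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Status.FAILURE;
        } finally {
            pool.shutdownNow();
        }
    }

    /** Retries only while the last attempt ended in FAILURE, up to maxRetries extra attempts. */
    static Status runWithRetry(Callable<Boolean> task, long timeoutMillis, int maxRetries) {
        Status status = runOnce(task, timeoutMillis);
        for (int i = 0; i < maxRetries && status == Status.FAILURE; i++) {
            status = runOnce(task, timeoutMillis);
        }
        return status;
    }
}
```

In this sketch a timed-out attempt is not retried, which matches the rule that automatic retries apply only to the FAILURE status; a manual rule could widen that condition.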
Resource Control: Resource Driver
Each task has an independent Resource Driver. After task execution, the driver starts to collect resource usage information from the cluster. The following image illustrates memory and CPU consumption for a task.
Future Outlook
Streaming tasks – Currently most tasks are batch‑oriented; streaming support lacks a complete driver framework and will be enhanced.
Optimized integration methods – Most tasks are integrated via the DSL, which is limiting for data analysts and warehouse engineers. An experimental SQL integration already exists; it will be refined and fully incorporated.
Inke Technology
Official account of Inke Technology