SeaTunnel Testing
saber599

1. Prerequisites

  • SeaTunnel
  • Hadoop: checkpoints are written to HDFS, but results can still be seen without it
  • Local Hadoop: HADOOP_HOME needs to be set, but results can still be seen without it

2. sftp->hive

2.1 Scenario

Sync a file from an SFTP server into a partitioned Hive table, routing rows to partitions dynamically based on fields in the file content.

2.2 Preparation

  1. Environment

    1. seatunnel-2.3.1
    2. hadoop-3.3.2
    3. hive-3.1.2
    4. SFTP server
  2. SFTP file content (staged on the server as shown in the sketch after this list)

user_id#user_name#create_time#province_id
1#张三#2023-06-24 19:59:23#11
2#李四#2023-06-23 18:50:20#11
3#王五#2023-06-24 19:59:23#12
4#赵六#2023-06-24 20:03:01#11
  3. Hive table DDL
create table if not exists user_info(
user_id string comment '用户id',
user_name string comment '用户名',
create_time timestamp comment '创建时间'
) comment '用户信息'
partitioned by (province_id string, create_day date)
row format delimited
fields terminated by '#'
location '/hive/warehouse/dev.db/user_info';
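Before running the job, the test file has to exist on the SFTP server at the path the source config points to. A minimal sketch, assuming the file is saved locally as user_info.txt and using the host, user, and path from the SftpFile source in 2.3:

# Copy the test file to the SFTP server (host/user/path taken from the config in 2.3)
scp user_info.txt hadoop@192.168.10.222:/opt/upload/user_info.txt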

2.3 SeaTunnel config file

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  execution.checkpoint.data-uri = "hdfs://192.168.10.222:9000/checkpoint"
}

source {
  # https://seatunnel.apache.org/docs/connector-v2/source/FtpFile
  SftpFile {
    path = "/opt/upload/user_info.txt"
    host = "192.168.10.222"
    port = 22
    user = hadoop
    password = hadoop
    file_format_type = "text"
    schema = {
      fields {
        user_id = string
        user_name = string
        create_time = TIMESTAMP
        province_id = string
      }
    }
    delimiter = "#"
    datetime_format = "yyyy-MM-dd HH:mm:ss"
    skip_header_row_number = 1
    result_table_name = tmp
  }
}

transform {
  # https://seatunnel.apache.org/docs/transform-v2/sql
  # https://seatunnel.apache.org/docs/transform-v2/sql-functions
  Sql {
    source_table_name = "tmp"
    result_table_name = "tmp2"
    query = "select user_id, user_name, create_time, TO_CHAR(create_time, 'yyyy-MM-dd') create_day, province_id from tmp"
  }
}

sink {
  # https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
  HdfsFile {
    source_table_name = "tmp2"
    fs.defaultFS = "hdfs://192.168.10.222:9000"
    path = "/hive/warehouse/dev.db/user_info"
    file_format = "text"
    field_delimiter = "\t"
    have_partition = true
    partition_by = ["province_id","create_day"]
    partition_dir_expression = "${k0}=${v0}/${k1}=${v1}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["user_id","user_name","create_time","create_day","province_id"]
    is_enable_transaction = true
  }
}
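With the config saved, the job can be submitted. A minimal sketch, assuming the config above is saved as config/sftp_to_hive.conf (an assumed file name) and the default local Zeta engine is used; Spark/Flink deployments use their own starter scripts:

cd $SEATUNNEL_HOME
# -e local runs the job on the local Zeta engine; the config path is an assumption
./bin/seatunnel.sh --config ./config/sftp_to_hive.conf -e local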

2.4 Ways to load the data into Hive

  1. After the data lands in the partition directories, run a repair so partitions are registered for any directories that do not have one yet
msck repair table user_info;
  2. Or create the partitions manually in advance
alter table user_info add partition(province_id='11',create_day='2023-06-23');
alter table user_info add partition(province_id='11',create_day='2023-06-24');
alter table user_info add partition(province_id='12',create_day='2023-06-24');
  3. Or run a load statement manually to load the data (the HDFS listing sketch after this list shows how to find the generated file name)
load data inpath 'hdfs://192.168.10.222:9000/hive/warehouse/dev.db/user_info/province_id=11/create_day=2023-06-23/T_724967131367604225_a452435c6e_0_1_2023.06.24_0.txt' into table user_info partition(province_id='11',create_day='2023-06-23');
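The file names written by the sink come from ${transactionId}_${now}, so they change on every run. One way to find the exact file name to put into the load statement is to list the partition directory first:

# List one partition directory written by the sink to get the generated file name
hdfs dfs -ls /hive/warehouse/dev.db/user_info/province_id=11/create_day=2023-06-23/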

2.5 Results

  1. SeaTunnel job output
    (screenshot: https://cdn.jsdelivr.net/gh/saber599/image@main/blog_images/seatunnel测试用例/be57584da985150eec5d47fa5dfa0a82.4i8swjae99c0.webp)

  2. HDFS output

(screenshots: HDFS output directories and files)

  3. Hive query results

(screenshots: Hive query results)

3. kafka->hive

3.1 Scenario

Sync messages from a Kafka topic into a partitioned Hive table, routing rows to partitions dynamically based on fields in the message body.

3.2 Preparation

  1. Environment

    1. seatunnel-2.3.1
    2. hadoop-3.3.2
    3. hive-3.1.2
    4. kafka_2.11-0.11.0.1
  2. Kafka messages

{"user_id":"1","user_name":"张三","create_time":"2023-06-24 19:59:23","province_id":"12"}
{"user_id":"2","user_name":"李四","create_time":"2023-06-23 18:50:20","province_id":"11"}
{"user_id":"3","user_name":"王五","create_time":"2023-06-24 19:59:23","province_id":"12"}
{"user_id":"4","user_name":"赵六","create_time":"2023-06-24 20:03:01","province_id":"11"}
  3. Hive table DDL
create table if not exists user_info_kafka(
user_id string comment '用户id',
user_name string comment '用户名',
create_time timestamp comment '创建时间'
) comment '用户信息'
partitioned by (province_id string, create_day date)
row format delimited
fields terminated by '#'
location '/hive/warehouse/dev.db/user_info_kafka';

3.3 SeaTunnel config file

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  # job.mode = "BATCH"
  checkpoint.interval = 5000
  execution.checkpoint.data-uri = "hdfs://192.168.10.222:9000/checkpoint"
}

source {
  # https://seatunnel.apache.org/docs/connector-v2/source/kafka
  Kafka {
    result_table_name = "tmp"
    schema = {
      fields {
        user_id = string
        user_name = string
        create_time = string
        province_id = string
      }
    }
    # consumer.group = "test_group"
    # format = text
    datetime_format = "yyyy-MM-dd HH:mm:ss"
    format = json
    # field_delimiter = "#"
    topic = "user_info_topic"
    bootstrap.servers = "192.168.10.222:9092"
    # kafka.config = {
    #   client.id = client_1
    #   max.poll.records = 500
    #   auto.offset.reset = "earliest"
    #   enable.auto.commit = "false"
    # }
  }
}

transform {
  # https://seatunnel.apache.org/docs/transform-v2/sql
  # https://seatunnel.apache.org/docs/transform-v2/sql-functions
  Sql {
    source_table_name = "tmp"
    result_table_name = "tmp2"
    query = "select user_id, user_name, create_time, SUBSTR(create_time, 0,10) create_day, province_id from tmp"
  }
}

sink {
  # https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
  HdfsFile {
    source_table_name = "tmp2"
    fs.defaultFS = "hdfs://192.168.10.222:9000"
    path = "/hive/warehouse/dev.db/user_info_kafka"
    file_format = "text"
    field_delimiter = "\t"
    have_partition = true
    partition_by = ["province_id","create_day"]
    partition_dir_expression = "${k0}=${v0}/${k1}=${v1}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["user_id","user_name","create_time","create_day","province_id"]
    is_enable_transaction = true
  }
}
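Because job.mode is STREAMING, the job keeps running and picks up new messages as they arrive; it has to be cancelled manually when the test is done. A minimal sketch for submitting it on the local Zeta engine, with config/kafka_to_hive.conf being an assumed file name:

cd $SEATUNNEL_HOME
# Streaming job: runs until cancelled (Ctrl+C in local mode)
./bin/seatunnel.sh --config ./config/kafka_to_hive.conf -e local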

3.4 Sending Kafka messages

  1. Start ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup.out &
  2. Start Kafka
nohup bin/kafka-server-start.sh config/server.properties >> producer.nohup.out &
  3. Create the topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_info_topic
  4. Send the messages (paste the JSON lines from 3.2; a consumer check sketch follows the screenshot below)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user_info_topic

(screenshot: sending messages with the console producer)
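To confirm the messages actually landed on the topic before checking the SeaTunnel output, they can be read back with the console consumer. A minimal sketch (Kafka 0.11 accepts --bootstrap-server on the console consumer):

# Read the topic back from the beginning to verify the four JSON messages were produced
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_info_topic --from-beginning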

3.5 Ways to load the data into Hive

Same as 2.4.

3.6 Results

  1. SeaTunnel job output

(screenshot)

  2. HDFS output

(screenshots: HDFS output directories and files)

  3. Hive query results

(screenshot)

4. rdb->hive

4.1 Scenario

Sync data from a relational database into a partitioned Hive table, routing rows to partitions dynamically based on row fields; this example uses mysql->hive.

4.2 Preparation

  1. Environment
    1. seatunnel-2.3.1
    2. hadoop-3.3.2
    3. hive-3.1.2
    4. mysql-8.0.32
  2. MySQL table DDL
CREATE TABLE `user_info`  (
`user_id` bigint NOT NULL AUTO_INCREMENT COMMENT '用户id',
`user_name` varchar(255) NOT NULL COMMENT '用户名',
`create_time` datetime NOT NULL COMMENT '创建时间',
`province_id` bigint NOT NULL COMMENT '省份id',
PRIMARY KEY (`user_id`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COMMENT = '用户信息';
  3. MySQL table content (a seeding sketch follows this list)
INSERT INTO `user_info` (`user_id`, `user_name`, `create_time`, `province_id`) VALUES (1, '张三', '2023-06-24 19:59:23', 12);
INSERT INTO `user_info` (`user_id`, `user_name`, `create_time`, `province_id`) VALUES (2, '李四', '2023-06-23 18:50:20', 11);
INSERT INTO `user_info` (`user_id`, `user_name`, `create_time`, `province_id`) VALUES (3, '王五', '2023-06-24 19:59:23', 12);
INSERT INTO `user_info` (`user_id`, `user_name`, `create_time`, `province_id`) VALUES (4, '赵六', '2023-06-24 20:03:01', 11);

  4. Hive table DDL
create table if not exists user_info_mysql(
user_id string comment '用户id',
user_name string comment '用户名',
create_time timestamp comment '创建时间'
) comment '用户信息'
partitioned by (province_id string, create_day date)
row format delimited
fields terminated by '#'
location '/hive/warehouse/dev.db/user_info_mysql';
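A minimal sketch for seeding the source table, assuming the seatunnel_test database (the name used in the JDBC URL in 4.3) already exists and the DDL and INSERT statements above are saved as user_info_ddl.sql and user_info_seed.sql (assumed file names):

# Create and seed the MySQL source table; the .sql file names are assumptions
mysql -h 192.168.10.222 -P 3306 -u root -p seatunnel_test < user_info_ddl.sql
mysql -h 192.168.10.222 -P 3306 -u root -p seatunnel_test < user_info_seed.sql
# Sanity-check the source rows
mysql -h 192.168.10.222 -P 3306 -u root -p -e "select count(*) from seatunnel_test.user_info"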

4.3 SeaTunnel config file

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  execution.checkpoint.data-uri = "hdfs://192.168.10.222:9000/checkpoint"
}

source {
  # https://seatunnel.apache.org/docs/connector-v2/source/Mysql
  Jdbc {
    url = "jdbc:mysql://192.168.10.222:3306/seatunnel_test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "root"
    query = "select user_id,user_name,create_time,date_format(create_time, '%Y-%m-%d') create_day,province_id from user_info"
  }
}

sink {
  # https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile
  HdfsFile {
    fs.defaultFS = "hdfs://192.168.10.222:9000"
    path = "/hive/warehouse/dev.db/user_info_mysql"
    file_format = "text"
    field_delimiter = "\t"
    have_partition = true
    partition_by = ["province_id","create_day"]
    partition_dir_expression = "${k0}=${v0}/${k1}=${v1}"
    is_partition_field_write_in_file = true
    custom_filename = true
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyy.MM.dd"
    sink_columns = ["user_id","user_name","create_time","create_day","province_id"]
    is_enable_transaction = true
  }
}
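The Jdbc source needs the MySQL driver jar on SeaTunnel's classpath; where exactly it goes depends on the engine and deployment (check the Jdbc connector docs). The sketch below is an assumption for the local Zeta engine, as is the config file name mysql_to_hive.conf:

# Assumption: local Zeta engine, driver jar copied into $SEATUNNEL_HOME/lib
cp mysql-connector-j-8.0.32.jar $SEATUNNEL_HOME/lib/
cd $SEATUNNEL_HOME
./bin/seatunnel.sh --config ./config/mysql_to_hive.conf -e local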

4.4 Ways to load the data into Hive

Same as 2.4.

4.5 Results

  1. SeaTunnel job output

(screenshot)

  2. HDFS output

(screenshots: HDFS output directories and files)

  3. Hive query results

(screenshot)

5. Conclusions

  1. SFTP, Kafka, and RDB data sources are supported
  2. Dynamic partitioning based on field values is supported
  3. Kafka supports both batch (offline) and streaming (real-time) processing
  4. The SFTP source does not support incremental scanning
  5. The SFTP source does not support character-encoding conversion
  6. The SFTP source does not support file-level checks, decoding, or sending files back