普通视图

发现新文章,点击刷新页面。
昨天 — 2025年8月18日首页

「Flink」业务搭建方法总结

作者 淡酒交魂
2025年8月18日 16:56

1.  合理设置并行度和TaskManager 的任务槽数

1.1 核心概念:

1.  并行度:指 Flink 作业中特定算子(Operator)或整个作业的执行并行实例(即子任务)的数量。例如,map 算子的并行度为 5,意味着这个 map 操作会被拆分成 5 个完全相同的任务,同时在集群的不同地方处理数据流的不同分区。

2.  JobManager: Flink 集群的管理节点,负责调度作业、协调检查点、故障恢复等

3.  TaskManager: Flink 的工作节点(Worker Node)。每个 TaskManager 是一个独立的 JVM 进程,负责执行实际的任务(即算子子任务),Slot 是 TM 上执行任务的基本资源单元。

4.  任务槽(slot): Flink 集群(如 TaskManager)中的基本资源单元。每个 TaskManager 是一个 JVM 进程,它可以提供一定数量的任务槽。一个任务槽可以执行一个算子并行度实例(即一个子任务)。JM 管理 Slot 的分配,TM 提供 Slot 的实际执行环境

1.2 并行度与资源的关系

1.  并行度决定所需任务槽总数:

● 作业中所有算子并行度实例的总和(即整个作业图的所有子任务)必须小于或等于集群中可用任务槽的总数。

● 总子任务数 = 所有算子并行度实例之和 <= 总可用任务槽数 = TaskManager 数量 * 每个 TaskManager 的任务槽数

● 开启 Slot Sharing 时总槽数 = 所有算子中最大并行度值 (或关键路径所需槽数)(Flink 作业运行时,所有算子子任务会被分配到槽位。通常以作业图中 最宽算子 的并行度作为总槽数需求,因为 Flink 会尝试 Slot Sharing 将多个算子子任务链化到同一个槽位)

● 示例:

○ 作业含 Source(并行度=4) → Map(并行度=8) → Sink(并行度=2)

○ 实际所需槽数 = max(4,8,2) = 8(开启 Slot Sharing 时)

● 更高的并行度需要更多的任务槽

2.  任务槽需要资源(CPU 和内存):

● 任务槽需要运行在一个 TaskManager JVM 进程中

● 每个 TaskManager 配置了CPU 核心数和内存

● 任务槽的资源占用: 每个任务槽会占用其所在 TaskManager 的一部分 CPU 和内存资源。

● 结论:更多的任务槽意味着每个任务槽分得的 CPU 时间片和内存(尤其是用户代码内存)更少

3.  关系总结:

并行度↑ → 所需任务槽总数↑ → 所需 TaskManager 数量↑ 或 每个 TaskManager 的任务槽数↑

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的 CPU 资源↓ (竞争加剧)

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的用户堆内存↓

每个 TaskManager 的任务槽数↑ → 每个任务槽可用的用户堆外内存↓

每个 TaskManager 的任务槽数↑ → 共享内存区域(网络、托管)压力↑

1.3 常见问题与陷阱:

● 并行度过高,任务槽不足: 作业无法启动(NoResourceAvailableException)。

● 每个 TaskManager 任务槽数过多:

○ CPU 不足: 线程竞争激烈,CPU 利用率达到 100%,但吞吐量不增反降,延迟增大。

○ 内存不足: 每个槽分到的内存太少,导致用户代码频繁 GC 或 OutOfMemoryError (Java Heap Space)。

○ 网络内存不足: 导致反压加剧,吞吐量下降。

● 每个 TaskManager 任务槽数过少: 资源利用率低(CPU、内存闲置),成本高

2.  数据输入源

2.1 数据输入源设置uid

dataSource输入数据源默认都要设置uid,方便后续Checkpoint启动系统可以使用同一个uid,避免发送因输入源uid由系统随机产生,而后续更新无法使用Checkpoint启动,导致数据紊乱

2.2 uid和checkpoint的关系

● 状态恢复的桥梁: 当 Flink 从 Checkpoint/Savepoint 恢复作业时,它需要知道如何将 Checkpoint 里保存的状态数据“分配”给新运行的作业拓扑中的哪个算子实例。uid 就是这个分配的匹配依据。

● 匹配过程:

○ 恢复作业时,Flink 会读取 Checkpoint/Savepoint 的元数据,其中记录了每个状态片段对应的算子 uid。

○ 启动新的作业实例(可能是修改后的代码版本)。

○ Flink 将新作业拓扑中具有相同 uid 的算子与Checkpoint 中保存的对应 uid 的状态进行匹配。

○ 匹配成功,则该算子的状态从 Checkpoint 中恢复。

○ 匹配失败(找不到对应 uid 的算子),则根据配置(allowNonRestoredState)决定是失败还是忽略该部分状态继续启动

2.3 数据源处理和流转

多数据源处理时,建议所有数据源优先根据各自数据源的业务逻辑(如:联表查询字段)查出业务所需数据,最后各数据源整合为统一数据格式,进行最终的业务合并计算和处理

3.  流水线

3.1 系统时间(TumblingProcessingTimeWindows):

以Flink系统接收到这批数据的时间为准,通常与业务系统产生这批数据的实际时间有一定的时间差

3.2 事件时间(TumblingEventTimeWindows):

取业务数据中某个时间字段值作为流水线标准,相对于系统时间会更为精准计算业务数据

3.3 流水线设置

将所有数据源整合为统一数据格式后,可以以数据格式中的时间字段设为统一流水线,确保所有数据源合并(union)后使用统一流水线进行输出

WatermarkStrategy<InputModel> watermarkStrategy = WatermarkStrategy
                //表示允许的最大乱序时间为 5 秒
                .<InputModel>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // 取InputModel中time字段数据作为流水线
                .withTimestampAssigner((o, t) -> {
                    return o.getTime();
                })
                //表示如果某分区5秒没有数据,则标记为空闲
                .withIdleness(Duration.ofSeconds(5));
3.4 流水线使用
dataSource
    .union(otherDataSource1, otherDataSource2, otherDataSource3)
     // 设置流水线
    .assignTimestampsAndWatermarks(watermarkStrategy)
    // 根据Key进行分区
    .keyBy(InputModel::getKey)
    // 设置流水线窗口大小  5秒为一个窗口
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // 业务数据计算处理  Integer为Key的数据类型
    .process(new ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>())

4.  数据业务处理

主要数据通过collect输出, 次要数据通过sideOut侧输出流输出

// 定义侧输出流
OutputTag<OutputModel> outputTag = new OutputTag<OutputModel>("public") {};
 
SingleOutputStreamOperator<OutputModel> process = dataSource
    .union(otherDataSource1, otherDataSource2, otherDataSource3)
     // 设置流水线
    .assignTimestampsAndWatermarks(watermarkStrategy)
    // 根据Key进行分区
    .keyBy(InputModel::getKey)
    // 设置流水线窗口大小  5秒为一个窗口
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    // 业务数据计算处理  Integer为Key的数据类型
    .process(.process(new ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>() {
    @Override
    public void process(Integer key, ProcessWindowFunction<InputModel, OutputModel, Integer, TimeWindow>.Context context, Iterable<InputModel> iterable, Collector<OutputModel> collector) throws Exception {
        // 获取窗口结束时间点
        long end = context.window().getEnd();
        LocalDateTime windowEndTime = Instant.ofEpochMilli(end).atZone(ZoneOffset.ofHours(8)).toLocalDateTime();
    
        // 业务逻辑计算
        
        // 侧输出流统计
        contect.output(outputTag, new OutPutModel(xxx, xxx, xxx, xxx));
        
        // 主要统计数据
        collector.collect(new OutPutModel(xxx,xxx,xxx, xxx));
    }
});

5.  数据输出

5.1 数据Sink输出

使用JDBC连接池进行连接提交至数据库,建议继承AbstractSink<OutPutModel>类,进行连接池资源共用,减少资源浪费

示例:

OutputSink

public class OutputSink extends AbstractSink<OutputModel> {
    //SQL
    private static final String OUTPUT_SQL = "insert into table_name(id,price,window_end_time,create_time) values(?,?,?,?)";
 
    @Override
    public void invoke(GMVResultOutput value, SinkFunction.Context context) throws Exception {
        //  获取写入数据库连接资源
        Connection imsConn = connManager.getImsConnection();
        PreparedStatement outputStmt = null;
 
        try {
            outputStmt = imsConn.prepareStatement(OUTPUT_SQL);
            outputStmt.setLong(1, IdUtil.nextId());
            outputStmt.setBigDecimal(2, value.getPrice());
            outputStmt.setTimestamp(3, java.sql.Timestamp.valueOf(value.getWindowEndTime()));
            outputStmt.setTimestamp(4, java.sql.Timestamp.valueOf(LocalDateTime.now()));
            int i = outputStmt.executeUpdate();
 
            System.out.println("写入数据成功:" + i + "条");
 
        } catch (Exception e) {
            e.printStackTrace();
 
        } finally {
            closeResources(outputStmt);
            if (imsConn != null) {
                imsConn.close();
            }
        }
    }
}

AbstractSink<I>

public abstract class AbstractSink<I> extends RichSinkFunction<I> {
 
    protected transient JdbcConnectionManager connManager;
 
    @Override
    public void open(Configuration parameters) throws Exception {
        connManager = new JdbcConnectionManager();
        connManager.open(); // 初始化连接
    }
 
    @Override
    public void close() throws Exception {
        connManager.close(); // 关闭连接
    }
 
    // 辅助方法:关闭资源
    protected void closeResources(AutoCloseable... resources) {
        for (AutoCloseable res : resources) {
            if (res != null) {
                try { res.close(); } catch (Exception e) { /* Ignore */ }
            }
        }
    }
}
5.2 连接池

数据库连接使用连接池进行系统统一管理

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
 
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
 
public class JdbcConnectionManager implements Serializable {
    private transient HikariDataSource omsDataSource;
    private transient HikariDataSource imsDataSource;
 
    public void open() throws SQLException {
        // MySQL连接池配置(OMS)
        HikariConfig omsConfig = new HikariConfig();
        omsConfig.setJdbcUrl(OmsConstant.JDBC_URL);
        omsConfig.setUsername(OmsConstant.MYSQL_USER_NAME);
        omsConfig.setPassword(OmsConstant.MYSQL_PASSWORD);
        omsConfig.setMaximumPoolSize(20);       // 最大连接数(按需调整)
        omsConfig.setMinimumIdle(5);            // 最小空闲连接
        omsConfig.setConnectionTimeout(2000);    // 连接超时2秒
        omsConfig.setIdleTimeout(30000);        // 空闲超时30秒
        omsDataSource = new HikariDataSource(omsConfig);
 
        // MySQL连接池配置(IMS)
        HikariConfig imsConfig = new HikariConfig();
        imsConfig.setJdbcUrl(ImsConstant.IMS_JDBC_URL);
        imsConfig.setUsername(ImsConstant.IMS_USER_NAME);
        imsConfig.setPassword(ImsConstant.IMS_PASSWORD);
        imsConfig.setMaximumPoolSize(20);       // 最大连接数(按需调整)
        imsConfig.setMinimumIdle(5);            // 最小空闲连接
        imsConfig.setConnectionTimeout(2000);    // 连接超时2秒
        imsConfig.setIdleTimeout(30000);        // 空闲超时30秒
        imsDataSource = new HikariDataSource(imsConfig);
    }
 
    // 关闭连接池
    public void close() {
        if (omsDataSource != null) {
            omsDataSource.close();
        }
        if  (imsDataSource != null) {
            imsDataSource.close();
        }
    }
 
    // 从连接池获取连接(非物理关闭)
    public Connection getOmsConnection() throws SQLException {
        return omsDataSource.getConnection();
    }
 
    public Connection getImsConnection() throws SQLException {
        return imsDataSource.getConnection();
    }
}
5.3 数据输出源设置uid

业务数据处理好之后,使用Sink进行输出,输出时与输入源一样,需设置uid,以确保CheckPoint启动时可以正常启动

//  初始化outputSink
OutputSink outputSink = new OutputSink();
 
 
// 明细
process.getSideOutput(outputTag)
    .addSink(outputSink).name("side output").setParallelism(1).uid("side output Sink");
// 总和
process
    .addSink(outputSink).name("total output").setParallelism(1).uid("total output Sink");

6. 总结

以上的一些方法是近期基于业务的开发中遇到的一些坑点后,总结出来的一套相对比较完善的业务开发方法,便于后续Flink实时计算业务数据使用。如果大家有更好的建议和方法,也欢迎共同讨论学习!

❌
❌