当我们对 Flink 有了初步的了解,往往就更加在意对资源消耗的把握;Flink 真正执行任务的组件是 TaskManage,很显然 TaskManage 的计算资源是有限的,并不是所有的任务都放在一个 TaskManage 上并行执行,对于一个 TaskManage 到底能并行多少个任务需要对资源做出明确的划分,这就是 task slots(任务槽)。每个 TaskManage 会有若干个 task slots,这些 slot 会平分 TaskManage 上的资源(主要是内存),简单地说就是一个 TaskManage 上有多少了 task slots 就可以并行多少个任务。
很多小伙伴在初学 Flink 的时候(standalone模式)会经常遇到因资源不够导致任务提交失败,得到的回复基本都是"你的slots不够了",那就会产生一个问题,如何计算一个任务需要多少个 slot 呢?有经验的人会给你总结出一个规律:任务需要的 task slots 数等于你的并行度。这句话到底是怎么来的,下面来推给你看。
一、基本概念
1.1 数据流图
所有的 Flink 程序都可以归纳为三个部分组成:source、transform、sink
- source:源算子,负责数据的读取
- transform:转换算子,负责对数据进行各种的处理加工
- sink:下沉算子,负责数据输出
在运行时,Flink 程序会被映射成所有算子按照逻辑顺连接在一起的一张图,在 Flink 自带的 web 页面会看到对应的数据流图(dataflow),如:
package tech.kpretty.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.处理参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// 3.定义数据源
DataStreamSource<String> socketTextStream = env.socketTextStream(parameterTool.get("host"), parameterTool.getInt("port"));
// 4.数据扁平化
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socketTextStream.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
// 5.分组
KeyedStream<Tuple2<String, Integer>, String> keyBy = flatMap.keyBy(data -> data.f0);
// 6.求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
// 7.打印
sum.print();
// 8.启动程序
env.execute();
}
}
这是一个 wordcount 案例,提交到 Flink 集群并设置并行度为1会得到如下的一张 dataflow
当前任务提交会发现消耗了 1 个 task slots
同时 Running Job List 列表展示出当前任务的 Tasks 为 2|2 这是什么意思,继续看
1.2 并行度和并行子任务
在大数据场景下,我们依靠分布式架构做并行计算,从而提高数据的吞吐量。例如 MR、Spark 以及现在的 Flink,对于 Spark 来说是将程序生成的 DAG 划分成一个个 stage,前一个阶段处理完发送给下一个阶段其架构思想是"移动计算",但对于 flink 来说这种思想显然不适合流式数据,因为对于源源不断的流式数据不可能等到前一个阶段处理完再处理下一个阶段,往往是 sink 在输出数据 3、transform 在加工数据 2、source 在读取数据 1,因此 Flink 的架构思想是"移动数据"。
那么对于 Flink 来说提高并行度就不再是将一个任务分发给多个 TaskManage 那么简单了,Flink 是将一个算子操作复制多份到不同节点来实现任务的并行,一个算子被拆分成多个并行的算子我么称为并行子任务(subtask),在程序运行过程中这些子任务在不同的线程、不同的物理机或不同的容器中完全独立执行。而上面的 Tasks 就等于所有算子的 subtask 之和,下面我们来抽象一下计算 Tasks 的方法。
对于上述抽象出来的数据流图 Tasks 等于 5,因为 source 的 subtask 为 2、transform 的 subtask 为 2、sink 的 subtask 为 1,因此 Tasks = 2 + 2 + 1 = 5。此时程序的并行度为 2
至于 Flink 的并行度设置方式就不在赘述,大致分为:配置文件指定、任务提交时 -p 指定、程序 env 设置全局并行度、算子单独设置并行度;这几种方式优先级依次递增,后者会覆盖前者。
根据上面总结的方法尝试计算一下 1.1 提交的任务的 Tasks 吧!因为提交时包括代码都没有指定并行度,因此读取配置文件默认并行度为 1,然后程序涉及的算子操作由 source、flatMap、sum、sink,因此计算出来的 Tasks 应该是 4 ≠ 2(注:keyBy 不是算子操作,最简单判断算子操作就是看返回值类型[包括继承的父类]是否是 xxxOperator),难道计算方式错了吗?继续往下看
1.3 算子链
结合 1.1 的 dataflow 和 1.2 抽象出来的图发现在 wordcount 程序中 dataflow 和代码的算子并不是一一对应的,很明显 dataflow 将若干个算子合并成了一个"大任务"。在了解这一过程需要先了解 Flink 算子见的数据传输方式:
一对一(one-to-one、forwarding):这种模式下,数据流维护着分区以及元素的顺序,不需要对数据进行重分区,类比 spark 的窄依赖
重分区(redistributing):这种模式下,数据流的分区会发生改变,改变的方式随算子不同而不同如:轮训、hash、广播等,类比 spark 的宽依赖
Flink 对算子操作采用了称为任务链的优化技术,在特定条件下减少本地通信的开销,对满足条件的若干个算子连接起来形成算子链(Operator Chains)形成一个 task,原先的算子称为里面的 subtask,对于形成算子链的条件如下:
- 并行度相同
- one-to-one 操作
上述两个条件必须全部满足才能形成算子链
因此对于 1.2 抽象的图 source 和 transform 可以组成一个算子链形成一个 task,因此优化后只需要 3 个 Tasks,运用这个方式重新计算 wordcount 所需要的 Tasks 就是 source、flatmap 形成一个算子链并行度为 1,keyBy和sum 会被翻译成 keyed aggregate 操作和 sink 形成一个算子链并行度为 1,flatmap 和 sum 中间因为经历了 hash 因此此程序最终需要的 Tasks 数为 2
二、对算子链的魔幻操作
根据算子链结合的条件并综合各算子特性我们可以做出一些不一样的操作,针对 1.1 相同的 wordcount 任务,我们修改它的并行度为 8,看看它的 dataflow 会有什么变化
因为 socketTextStream 底层实现的并行度永远是 1,因此 flatmap 被独立出来了;在任务提交之前我们尝试计算一下这种方式需要几个 Tasks 数和 task slots 数吧!Tasks = 1 + 8 + 8 = 17,task slots = 8
截图为证
同时 Flink 还提供了在满足条件下自定义算子链的方法,比如在并行度为 1 的情况下,sink 和 flatmap 结合成一个算子链,那么在一些特定场合下我们需要将 flatmap 独立出来。见下面代码
package tech.kpretty.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.处理参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// 3.定义数据源
DataStreamSource<String> socketTextStream = env.socketTextStream(parameterTool.get("host"), parameterTool.getInt("port"));
// 4.数据扁平化
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socketTextStream.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT)).disableChaining()
.map((MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> value).returns(Types.TUPLE(Types.STRING, Types.INT))
.map((MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> value).returns(Types.TUPLE(Types.STRING, Types.INT));
// 5.分组
KeyedStream<Tuple2<String, Integer>, String> keyBy = flatMap.keyBy(data -> data.f0);
// 6.求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1).startNewChain();
// 7.打印
sum.print();
// 8.启动程序
env.execute();
}
}
注:disableChaining() 关闭此算子的链接,即与之前和之后算子断开可能发生的连接,dataflow 图如下
Flink 还提供新起一个算子链的方式,例如 flatmap 需要和 source 断开但需要和 map 组成新链,因为 disableChaining 会断开前后
见下面代码
package tech.kpretty.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.处理参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// 3.定义数据源
DataStreamSource<String> socketTextStream = env.socketTextStream(parameterTool.get("host"), parameterTool.getInt("port"));
// 4.数据扁平化
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socketTextStream.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT)).startNewChain()
.map((MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> value).returns(Types.TUPLE(Types.STRING, Types.INT))
.map((MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>) value -> value).returns(Types.TUPLE(Types.STRING, Types.INT));
// 5.分组
KeyedStream<Tuple2<String, Integer>, String> keyBy = flatMap.keyBy(data -> data.f0);
// 6.求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1).startNewChain();
// 7.打印
sum.print();
// 8.启动程序
env.execute();
}
}
dataflow 如下:
三、计算 Task Slots
结合上面所有的知识点,我们就可以计算一个任务需要的 task slots 数了,当我们知道一个任务需要多少个 Tasks 后,最简单的方式就是一个 task 放到一个 slot 进行运算,那么这样的方式就是 task slots = Tasks ,但实际情况并不是这样显然 Flink 对此还做了优化。
首先需要明白一个 task 一个 slot 有什么缺点,缺点就是资源浪费。例如对数据的读取和简单的转换往往在极短的时间就完成了,而 window 操作则需要较长的时间,这就导致 slot 忙的忙死闲的闲死,同时 source 还会受限下游对的处理能力(背压机制)。因此这样的方式不是一个很好的分配方案。
Flink 处理上述问题的方案就是允许 slot 共享,具体分配方案就是 task 和 slot 一字排开,task 和 task 之间允许共享一个 slot,随便拿一个 dataflow 为例来解释
上述有 9 个 task,为了方便画图给他们一个名字:source[1]、transform[1-4]、sink[1-4],他们的 slot 分配方案如下
这样还有一个好处就是,一定存在一个 slot 中运行这一个任务的全流程,间接的也提高了程序的健壮性。
同时 Flink 还提供了自定义共享组的概念,默认情况下所有的算子都属于一个 slot 共享组:default,也就是说默认情况下是允许 slot 共享的,如果在一些特殊的场合需要将某一个算子或者 task 独占一个 slot 需要怎么设置呢?
看下面代码
package tech.kpretty.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2.处理参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// 3.定义数据源
DataStreamSource<String> socketTextStream = env.socketTextStream(parameterTool.get("host"), parameterTool.getInt("port"));
// 4.数据扁平化
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = socketTextStream.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("slot-group-1");
// 5.分组
KeyedStream<Tuple2<String, Integer>, String> keyBy = flatMap.keyBy(data -> data.f0);
// 6.求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
// 7.打印
sum.print();
// 8.启动程序
env.execute();
}
}
注:slotSharingGroup 设置算子的共享组,默认是 default,只有相同的共享组才可能实现 slot 共享,同时只需要设置一遍,下面的算子自动继承,除非是下面的算子也想独占才需要再次指定
dataflow如下:
默认情况下这个任务需要 1 个 task slots,提交当前任务
实际上消耗了 2 个 task slots,因为 source 在 default 组,后面的算子在 slot-group-1 组,也就是 source 独占 slot 需要 1 个,后面算子可以共享消耗了 1。到这里是不是就明白了 Flink 任务资源的消耗问题了!!!
课后作业:考虑一下如何让一个算子独占一个slot,例如:wordcount 案例 source -> flatmap -> keyBy -> sum -> sink,在并行度为 1 的情况下消耗 4 个 slot。
评论区