本文所用flink版本为1.6,官方文档链接Apache Flink 1.6,本文所参考的文档均在正文中每个部分直接详细指出了,就不再汇总了。
Flink运行时环境
Tasks和Operator Chains
- 基础说明
在实际的分布式计算环境中,Flink会将多个运算子任务( operator subtasks)链接一起(chains)形成分布式计算任务(tasks)。每个任务(task)在一个线程中执行。
优点:将运算符(operators)链接成计算任务(tasks)中,形成Operator Chains对于系统性能的提升有很大的帮助:- 降低了线程间的切换
- 减少了数据在与缓冲区的开销
- 减少消息的序列化/反序列化
- 在降低延时的同时减少了系统的总体吞吐量
我们可以对这种chain操作进行配置,具体内容请参考chaining docs。
如下图是官网所示的数据流图包含五个子任务,也就是说其中有五个并行线程:
- 形成Operator Chains的条件:
上述描述了Tasks和Operator Chains的概念,下面以经典的WordCount为例,看下Operator Chains的形成。此部分结合Flink 原理与实现:理解 Flink 中的计算资源整理。
下面这幅图,展示了Source并行度为1,FlatMap、KeyAggregation、Sink并行度均为2,最终以5个并行的线程来执行的优化过程。
上图中将KeyAggregation和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构。但是,并不是任意两个 operator 就能 chain 一起的。还是需要一定的条件:- 没有禁用Chain
- 上下游算子并行度一致
- 下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)
- 上下游算子都在同一个slot group中(文章后续会解释slot group)
- 上下游算子之间没有shuffle(两个算子间数据分区方式是forward)
- 下游的chain策略为ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
- 上游的chain策略为ALWAYS或HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
Operator chain的行为可以通过编程API中进行指定。可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。或者调用disableChaining()来指示该operator不参与chaining(不会与前后的operator chain一起)。在底层,这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的。另外,也可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。参考官网APITask chaining and resource groups。
- 原理与实现(转)
转载自Flink 原理与实现:理解 Flink 中的计算资源,此文应该是基于flink1.3,可以参考。
那么 Flink 是如何将多个 operators chain在一起的呢?chain在一起的operators是如何作为一个整体被执行的呢?它们之间的数据流又是如何避免了序列化/反序列化以及网络传输的呢?下图展示了operators chain的内部实现:
如上图所示,Flink内部是通过OperatorChain这个类来将多个operator链在一起形成一个新的operator。OperatorChain形成的框框就像一个黑盒,Flink 无需知道黑盒中有多少个ChainOperator、数据在chain内部是怎么流动的,只需要将input数据交给 HeadOperator 就可以了,这就使得OperatorChain在行为上与普通的operator无差别,上面的OperaotrChain就可以看做是一个入度为1,出度为2的operator。所以在实现中,对外可见的只有HeadOperator,以及与外部连通的实线输出,这些输出对应了JobGraph中的JobEdge,在底层通过RecordWriterOutput来实现。另外,框中的虚线是operator chain内部的数据流,这个流内的数据不会经过序列化/反序列化、网络传输,而是直接将消息对象传递给下游的 ChainOperator 处理,这是性能提升的关键点,在底层是通过 ChainingOutput 实现的,源码如下方所示:
注:HeadOperator和ChainOperator并不是具体的数据结构,前者指代chain中的第一个operator,后者指代chain中其余的operator,它们实际上都是StreamOperator。private static class ChainingOutput<T> implements Output<StreamRecord<T>> { // 注册的下游operator protected final OneInputStreamOperator<T, ?> operator; public ChainingOutput(OneInputStreamOperator<T, ?> operator) { this.operator = operator; } @Override // 发送消息方法的实现,直接将消息对象传递给operator处理,不经过序列化/反序列化、网络传输 public void collect(StreamRecord<T> record) { try { operator.setKeyContextElement1(record); // 下游operator直接处理消息对象 operator.processElement(record); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } } ... }
作业管理器,任务管理器与客户端-Job Managers, Task Managers, Clients
- Flink运行时环境由两种类型进程组成:
- JobManagers-作业管理器(也称为master)
用于协调程序的分布式执行。它的主要功能是调度任务(tasks),协调checkpoint,故障恢复等。
每个Flink环境中只有一个JobManagers。高可用设计中会包含多个JobManagers,其中一个是leader,其他standby。 - TaskManagers-任务管理器(也称为worker)
用于执行数据流图(dataflow)的任务(tasks)(更准确地说,是计算子任务(subtasks)),并对数据流进行缓冲、交换。
每个Flink环境中至少包含一个TaskManagers。
- JobManagers-作业管理器(也称为master)
- JobManagers和TaskManagers可以以多种方式启动:
- 直接在机器上作为独立群集(standalone cluster)
- 容器中启动
- YARN或Mesos等资源框架来管理
启动之后,TaskManagers会连接到JobManagers来宣布自己可用报告自身的状态,便于JobManagers来分配工作。
-
关于客户端(Client)
Client其实并不是运行时及程序执行时的一个组成部分,而是被用来准备和发送的数据流(dataflow)给JobManager的。在发送完数据流图之后,客户端可以选择断开与JobManager的连接,或继续保持连接以接收程序运行的进度报告。Client程序可以以 Java/Scala 程序的形式执行,也可以以命令行的形式(./bin/flink run …)执行。官网运行时架构图如下:
- 小结(转)
本部分内容转载自Flink 原理与实现:架构和拓扑概览,此文章应该是基于flink1.3,可以参考。
当Flink集群启动后,首先会启动一个JobManger和一个或多个的TaskManager。由Client提交任务给JobManager,JobManager再调度任务到各个TaskManager去执行,然后TaskManager将心跳和统计信息汇报给JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的JVM 进程。- Client为提交Job的客户端,可以是运行在任何机器上(与JobManager环境连通即可)。提交Job后,Client可以结束进程(Streaming的任务),也可以不结束并等待结果返回。
- JobManager主要负责调度Job并协调Task做checkpoint,职责上很像Storm的Nimbus。从Client处接收到Job和JAR包等资源后,会生成优化后的执行计划,并以Task的单元调度到各个TaskManager去执行。
- TaskManager在启动的时候就设置好了槽位数(Slot),每个slot能启动一个Task,Task为线程。从JobManager处接收需要部署的Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。
可以看到Flink的任务调度是多线程模型,并且不同Job/Task混合在一个TaskManager进程中。虽然这种方式可以有效提高CPU利用率,但是个人不太喜欢这种设计,因为不仅缺乏资源隔离机制,同时也不方便调试。类似Storm的进程模型,一个JVM中只跑该Job的Tasks实际应用中更为合理。
- 其它
Flink运行时角色间的通信使用akka,数据的传输使用netty。对比spark,从Spark1.3.1版本开始,为了解决大数据块(如shuffle)的传输问题,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。
参考Akka在Flink中的使用剖析。
任务槽与资源-Task Slots and Resources
-
TaskManager和Slot的关系
每个worker(即TaskManager)都是一个独立的JVM进程,可以运行一个或多个子任务(subtask)在其不同的线程中。为了控制worker(TaskManager)接收任务(tasks)的数量,在worker 中引入了任务槽(task slots)的概念(每个worker中至少包含一个任务槽)。
每个任务槽(task slots)代表任务管理器(TaskManager)中一个特定的资源池子集,槽把TaskManager的资源进行平分。例如,如果任务管理器有3个槽,它会为每个槽分配1/3的内存。将资源池槽化可以让子任务(subtask)获取指定容量的内存资源,而避免同其他作业(job)中的子任务(subtask)竞争。注意,这里没有对CPU进行隔离;目前任务槽仅仅用于隔离任务(tasks)的内存。
通过调整任务槽(task slots)的数量,用户可以设定子任务(subtasks)如何相互隔离。如果任务管理器(TaskManager)中只有一个槽,那么每个任务组(task group)都运行在一个独立的JVM中(which can be started in a separate container, for example)。若任务管理器(TaskManager )有多个槽就意味着会有更多的子任务共享同一个JVM。在同一个JVM中的任务会共享 TCP连接(通过多路复用(multiplexing)的方式)和心跳信息,可以减少数据的网络传输,同时他们也会共享数据集和数据结构,一定程度上可以降低每个task的开销。
如上文所述的 WordCount 例子,5个Task可能会在TaskManager的slots中如下图分布,2个TaskManager,每个有3个slot:
-
槽和并行度的关系-任务调度
官网Scheduling。
Flink通过任务槽(Task Slot)定义执行资源,每个TaskManager都有一或多个任务槽,每个任务槽都可以运行一个并行任务流(one pipeline of parallel tasks),一个流(pipeline)包括多个连续的任务,例如一个MapFunction的第n个并行实例与一个ReduceFunction的第n个并行实例的连续任务可以组成一个pipeline。注意,Flink通常会并行的执行连续的任务,对于Streaming程序来说,任何情况都如此执行;而对于batch 程序,多数情况也如此执行。
下图举例说明。由一个data source、一个MapFunction和一个ReduceFunction组成的程序,data source和MapFunction的并发度都为4,而ReduceFunction的并发度为3。一个数据流由Source-Map-Reduce的顺序组成,在具有2个TaskManager,每个TaskManager都有3个Task Slot的集群上运行,则程序执行情况如图所述。
-
slot共享机制-SlotSharingGroup与CoLocationGroup
参考官网和Flink 原理与实现:理解 Flink 中的计算资源。
默认情况下,Flink允许subtasks共享slot,即Flink会允许同一个作业(job)中来自不同的task的多个子任务(subtasks)共享一个槽,即前提是他们来自同一个job,哪怕不同task也可以。这种情况下,有可能会出现某个槽中包含一个完整的作业流水的场景(原文:The result is that one slot may hold an entire pipeline of the job.)。开启这样的slot共享机制主要有两点好处:- Flink集群需要确保job中任务槽的数量和程序最高并发量完全一致,因此不需要去计算一个程序中一共会起多少个task。
- 可以提高资源利用率。如果没有任务槽共享机制,非密集型(non-intensive)的source/map()子任务就会和(intensive)密集型的window子任务一样阻塞大量资源。如果有任务槽共享机制,会提高程序的基础并发量,比如说从2提高到6,就可以让密集型子任务(heavy subtasks)公平的完全分散到任务管理器(TaskManager)中,从而可以显著提高槽的资源利用率充分利用资源。
如下图,我们将WordCount的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),就可以得到下图所示的slot分布图。首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作keyAggregation/sink被平均地分配到各个TaskManager。
a. 关于是如何实现共享slot呢?槽分配的策略?
- SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。
- CoLocationGroup类用来强制将subtasks放到同一个slot中。CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。这里我们不会详细讨论CoLocationGroup的实现细节。
b. 怎么判断operator(算子)属于哪个slot共享组呢?(怎么确定一个算子的SlotSharingGroup?)
默认情况下,所有的operator(算子)都属于默认的共享组default,也就是说默认情况下所有的operator都是可以共享一个slot的。而当所有input operators具有相同的slot共享组时,该operator会继承这个共享组。最后,为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(…).slotSharingGroup(“group1”);就强制指定了filter的slot共享组为group1。(总结:根据input的group和自身是否设置group共同确定)适当的设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。c.原理与实现(转)
那么多个tasks(或者说operators)是如何共享slot的呢?本部分转自Flink 原理与实现:理解 Flink 中的计算资源。
来看一下用来定义计算资源的slot的类。抽象类Slot定义了该槽位属于哪个TaskManager(instance)的第几个槽位(slotNumber),属于哪个Job(jobID)等信息。最简单的情况下,一个slot只持有一个task,也就是SimpleSlot的实现。复杂点的情况,一个slot能共享给多个task使用,也就是SharedSlot的实现。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一个SharedSlot能定义出一棵slots树。
接下来我们来看看 Flink 为subtask分配slot的过程。关于Flink调度,有两个非常重要的原则我们必须知道:A.同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一个SharedSlot中的。B.Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。假设现在有2个TaskManager,每个只有1个slot(为简化问题),那么分配slot的过程如图所示:
注:图中 SharedSlot 与 SimpleSlot 后带的括号中的数字代表槽位号(slotNumber)。- 为Source分配slot。首先,我们从TaskManager1中分配出一个SharedSlot。并从SharedSlot中为Source分配出一个SimpleSlot。如上图中的①和②。
- 为FlatMap[1]分配slot。目前已经有一个SharedSlot,则从该SharedSlot中分配出一个SimpleSlot用来部署FlatMap[1]。如上图中的③。
- 为FlatMap[2]分配slot。由于TaskManager1的SharedSlot中已经有同operator的FlatMap[1]了,我们只能分配到其他SharedSlot中去。从TaskManager2中分配出一个SharedSlot,并从该SharedSlot中为FlatMap[2]分配出一个SimpleSlot。如上图的④和⑤。
- 为Key->Sink[1]分配slot。目前两个SharedSlot都符合条件,从TaskManager1的SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[1]。如上图中的⑥。
- 为Key->Sink[2]分配slot。TaskManager1的SharedSlot中已经有同operator的Key->Sink[1]了,则只能选择另一个SharedSlot中分配出一个SimpleSlot用来部署Key->Sink[2]。如上图中的⑦。
最后Source、FlatMap[1]、Key->Sink[1]这些subtask都会部署到TaskManager1的唯一一个slot中,并启动对应的线程。FlatMap[2]、Key->Sink[2]这些subtask都会被部署到TaskManager2的唯一一个slot中,并启动对应的线程。从而实现了slot共享。
Flink API中包含一个资源组机制,可以避免不合理的任务槽共享。
依照以往的经验来说,默认的任务槽数量应设置为CPU core的数量。如果使用超线程技术,每个槽中甚至可以调度处理超过2个硬件线程。
总结:一个应用需要多少个slot?
- 不设置SlotSharingGroup的情况下:应用的最大并行度。
- 设置了SlotSharingGroup:所有SlotSharingGroup中最大并行度之和。如下图:source时为default,然后在map算子处设置gourp为test,那么这个需要的槽数是10+20=30。
State Backends
通过键值对索引的数据结构保存在指定的后端存储(State Backends)中。有的后端存储将数据保存在内存中的哈希表中,而有的存储会使用RocksDB来保存键值对。除了定义保存状态的数据结构之外,后端存储还实现了获取键值对的特定时间点快照的功能,该功能可以将快照保存为检查点的一部分。
保存点-Savepoints
使用Data Stream API的程序可以从指定的保存点恢复。保存点可以更新程序和Flink集群,并且不丢失任何状态的功能。也就是相对于checkpoint来比较,checkpoint是针对于内部的,savepoint是针对于外部的。SparkStreaming中,如果使用了checkpoint,流处理程序有更新的话,进行程序替换需要清理调checkpoint才能生效,而在flink中,可以在流处理程序更新后,手动添加savepoint,那么新程序能从savepoint的地方开始读取数据,能与老程序并行存在或者直接替换。
保存点可以看作是一种手动触发的检查点,该检查点可以获取程序的快照并将其写入后端存储(State Backend)中。所以说保存点的功能依赖于一般的检查点机制。程序执行时会定期在worker节点生成快照和检查点(checkpoint)。由于Flink的恢复机制只需要使用最新一个有效的检查点(checkpoint),在新的检查点(checkpoint)生成后就可以安全移除其余旧的检查点(checkpoint)了。
保存点(savepoint)和定期检查点(checkpoint)在大部分情况下都很相似,区别只在于保存点是由用户触发的,并且在新的检查点生成后不会自动过期失效。保存点可以通过命令行生成,也可以在调用REST API取消作业时产生。
关于Savepoints的详细说明,可见官网Savepoints。
总结
Flink中计算资源的相关,最核心的是 Task Slot,每个slot能运行一个或多个task。为了更高效地运行,Flink提出了Chaining,尽可能地将operators chain在一起作为一个task来处理。为了资源更充分的利用,Flink又提出了SlotSharingGroup,尽可能地让多个task共享一个slot。
至此,本篇内容完成。