Flink分布式运行时知识梳理脑图

最近在重新整理学习flink官网内容,并在学习过程中将内容总结成脑图,是叶子自己最近一点一点整理绘制哒,转载请贴此文链接谢谢啦~图片以 svg形式插入文章,可拖动显示完整,后续如有更改会持续更新。

此脑图绘制依据官网链接:Distributed Runtime Environment

2、flink分布式运行时任务和算子链(Tasks和Operator Chains)tasks如示例图,包括5个subtasks,也就是说有5个并行线程每个任务(task)在一个线程中执行task形成在实际的分布式计算环境中,Flink会将多个运算子任务( operator subtasks)链接一起(chains)形成分布式计算任务(tasks)task和subtask关系理解:task可能是一个subtask,也可能是多个operator subtasks chains在一起形成的operator chains优点降低了线程间的切换减少了数据在与缓冲区的开销减少消息的序列化/反序列化框中的虚线是operator chain内部的数据流,这个流内的数据不会经过序列化/反序列化、网络传输,而是直接将消息对象传递给下游的 ChainOperator 处理。上面的OperaotrChain就可以看做是一个入度为1,出度为2的operator在降低延时的同时减少了系统的总体吞吐量操作默认全局开启全局关闭:StreamExecutionEnvironment.disableOperatorChaining()单独创建和结束在底层,这两个方法都是通过调整operator的 chain 策略(HEAD、NEVER)来实现的创建:someStream.filter(...).map(...).startNewChain().map(...)startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)结束chain:someStream.map(...).disableChaining()disableChaining()来指示指示该operator不参与chaining(不会与前后的operator chain一起)形成operator chains的条件operator chains过程例子Source并行度为1,FlatMap、KeyAggregation、Sink并行度均为2,最终以5个并行的线程来执行的优化过程。上图中将KeyAggregation和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构并不是任意两个operator就能chain一起的,需要一定的条件 没有禁用Chain-默认开启上下游算子并行度一致下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)上下游算子都在同一个slot group中上下游算子之间没有shuffle(两个算子间数据分区方式是forward)下游的chain策略为ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)上游的chain策略为ALWAYS或HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)源码及原理思路:Flink内部是通过OperatorChain这个类来将多个operator链在一起形成一个新的operator;OpeeratorChain是黑盒,对外可见的只有HeadOperator,以及与外部连通的实线输出,这些输出对应了JobGraph中的JobEdge,在底层通过RecordWriterOutput来实现Job Managers, Task Managers, ClientsJob Managers作业管理器(也称为master)独立的JVM 进程功能用于协调程序的分布式执行。它的主要功能是调度job,协调任务(tasks),协调checkpoint,故障恢复等从Client处接收到Job和JAR包等资源后,会生成优化后的执行计划,并以Task的单元调度到各个TaskManager去执行数量每个Flink环境中至少一个JobManager;高可用会包含多个JobManagers,其中一个是leader,其他standbyTask Managers任务管理器(也称为worker)独立的JVM 进程功能用于执行dataflow的tasks(更准确地说,是subtasks),并对数据流进行缓冲、交换在启动的时候就设置好了槽位数(Slot),每个slot能启动一个Task,Task为线程。从JobManager处接收需要部署的Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。启动之后,TaskManagers会连接到JobManagers来宣布自己可用报告自身的状态,便于JobManagers来分配工作数量每个Flink环境中至少包含一个TaskManagers集群启动启动方式standalone cluster容器中resource frameworks:YARN或Mesos等资源框架来管理client(不是运行时的一部分)独立的JVM 进程Client其实并不是运行时及程序执行时的一个组成部分,而是被用来准备和发送的数据流(dataflow)给JobManager的提交Job的客户端,可以是运行在任何机器上(与JobManager环境连通即可)在发送完dataflow之后(提交Job后),可以选择断开与JobManager的连接,或继续保持连接以接收程序运行的进度报告关系架构图 Flink运行时角色间的通信使用akka(Client,JobManager,TaskManager之间通信),数据的传输使用netty过程总结当Flink集群启动后,首先会启动一个(或HA)JobManger和一个或多个的TaskManager1由Client提交任务给JobManager2JobManager再调度任务到各个TaskManager去执行3TaskManager将心跳和统计信息汇报给JobManagerTaskManager 之间以流的形式进行数据的传输Flink的任务调度是多线程模型,并且不同Job/Task混合在一个TaskManager进程中。虽然这种方式可以有效提高CPU利用率,资源隔离如何实现???Storm的进程模型,一个JVM中只跑该Job的Tasks实际应用中更为合理?????解决:task solt、groupTask Slots and ResourcesTaskManager和Slot的关系-资源隔离,控制task数量引入原因每个worker都是一个独立的JVM进程,运行一个或多个subtask在其不同的线程中;task slots为了控制worker接收任务(tasks)的数量taskmanager中最高并发task数TaskManager最多能同时并发执行的任务,不能超过slot的数量资源隔离每个task slots代表TaskManager中一个特定的资源池子集,槽把TaskManager的资源进行平分将资源池solt化可以让subtask获取指定容量的内存资源,而避免同其他job中的subtask竞争注意:仅内存!这里没有对CPU进行隔离;目前task solt仅仅用于隔离tasks的内存数量设置每个worker(TaskManager)中至少包含一个task slot调整数量只有一个槽每个task group都运行在一个独立的JVM中多个槽TaskManager 有多个槽就意味着会有更多的子任务subtask共享同一个JVM优势在同一个JVM中的任务会共享 TCP连接(通过多路复用(multiplexing)的方式)和共享心跳信息,可以减少数据的网络传输也会共享数据集和数据结构,一定程度上可以降低每个task的开销wordcount分布示例经验上讲Slot的数量与CPU-core的数量一致为好。但考虑到超线程,可以让slotNumber=2*cpuCoreslot共享机制solt共享的概念默认情况下,Flink允许subtasks共享slot,即Flink会允许同一个作业(job)中来自不同的task的多个subtasks共享一个槽,即前提是他们来自同一个job,哪怕不同task也可以。这种情况下,可以使得同一个slot运行一个完整的job的流水线(pipleline)注意前提:同一个job使用solt共享的优势Flink集群需要与job程序中使用的最高并行度(highest parallelism )一样多的solt,因此只需计算Job中最高并行度(parallelism),只要这个满足,其他的job也都能满足,因此不需要去计算一个程序中一共会起多少个task如果有比较空闲的slot可以将更多的任务分配给它,可以提高资源利用率。非密集型(non-intensive)如source/map()子任务密集型(intensive)如window子任务若没有任务槽共享,有时候负载不高的Source/Map等subtask将会占据许多资源,而负载较高的窗口subtask则会缺乏资源如果有任务槽共享机制,通过提高程序的基础并发量,就可以让密集型子任务(heavy subtasks)公平的完全分散到任务管理器(TaskManager)中,从而可以显著提高槽的资源利用率充分利用资源例子:上述wordcount开启solt共享并且提高并行度后操作:将WordCount的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组)优势体现首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作keyAggregation/sink被平均地分配到各个TaskManager实现共享slot以及分配策略SlotSharingGroup概念默认slot sharing group是"default"Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。Flink将把具有相同slot sharing group的 算子操作(operations)放入同一个插槽,同时保持其他插槽中没用此slot sharing group的算子操作手动强制指定slot sharing group:someStream.filter(...).slotSharingGroup("name"),就强制指定了filter的slot共享组为name。怎么确定一个算子的SlotSharingGroup默认情况下,所有的operator(算子)都属于默认的共享组default,也就是说默认情况下所有的operator都是可以共享一个slot的根据input的group和自身是否设置group共同确定当所有input operators具有相同的slot共享组时,该operator会继承这个共享组判断是否写代码强制指定:**.slotSharingGroup(“group1”)好处:适当的设置可以减少每个slot运行的线程数,从而整体上减少机器的负载CoLocationGroup用来强制将subtasks放到同一个slot中。CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上一个应用需要多少个slot?不设置SlotSharingGroup的情况下:应用的最大并行度设置了SlotSharingGroup:所有SlotSharingGroup中最大并行度之和如下图:source时为default,然后在map算子处设置gourp为test,那么这个需要的槽数是10+20=30solt和并行度的关系-任务调度-pipelineFlink通过任务槽(Task Slot)定义执行资源,每个TaskManager都有一或多个任务槽,每个任务槽都可以运行一个并行任务流(one pipeline of parallel tasks),一个流(pipeline)包括多个连续的任务solt共享可调整Flink通常会并行的执行连续的任务对于Streaming程序来说,任何情况都如此执行对于batch 程序,多数情况也如此执行Scheduling示例在具有2个TaskManager,每个TaskManager都有3个Task Slot的集群上运行由一个data source、一个MapFunction和一个ReduceFunction组成的程序data source和MapFunction的并发度都为4,而ReduceFunction的并发度为3一个数据流由Source-Map-Reduce的顺序组成,一个MapFunction的第n个并行实例与一个ReduceFunction的第n个并行实例的连续任务可以组成一个pipelineState Backends概念当检查点(checkpoint)机制启动时,状态将在检查点中持久化来应对数据丢失以及恢复。而状态在内部是如何表示的、状态是如何持久化到检查点中以及持久化到哪里都取决于选定的State BackendFlink提供了不同的state backend,用于指定state的存储方式和位置存储键/值索引的确切数据结构取决于所选的State Backend有的后端存储将数据保存在内存中的哈希表中,而有的存储会使用RocksDB来保存键值对。除了定义保存状态的数据结构之外,后端存储还实现了获取键值对的特定时间点快照的功能,该功能可以将快照保存为检查点的一部分种类(后续有时间详细扩展描述)MemoryStateBackend(默认)-较小描述内部状态backend ,用于维护Java堆上的状态 Key/value 状态和window算子包含存储值和计时器等的的哈希表在检查点时,此state backend将对state进行SNAPSHOT,并将其作为检查点确认消息的一部分发送到JobManager,JobManager也将其存储在其堆上默认情况下,MemoryStateBackend配置为支持异步快照。 异步快照可避免可能导致流应用程序背压的潜在阻塞管道。(调试时可以关闭new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);)局限性默认情况下,每个状态的大小限制为5 MB。可以在MemoryStateBackend的构造函数中增加此值。无论配置的最大状态大小如何,状态都不能大于akka帧大小聚合状态必须适合JobManager内存应用场景本地开发和调试,因为它的状态有限适合很少状态的作业,如仅包含一次记录功能(Map,FlatMap或Filter)的作业或使用KafkaconsumerFsStateBackend-大描述基于文件系统将正在运行的数据保存在TaskManager的内存中在检查点时,它将状态SNAPSHOT写入配置的文件系统和目录中。最小元数据( Minimal metadata)存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中(Zookeeper))。FsStateBackend 默认使用异步SNAPSHOT,以避免在编写状态检查点时阻塞处理管道。(禁用的话可以使用new FsStateBackend(path, false);)使用场景Jobs with large state, long windows, large key/value states.所有高可用性设置。All high-availability setups.RocksDBStateBackend-超大描述基于文件系统将RocksDB中正在运行的数据保存在TaskManager数据目录中。在检查点时,整个RocksDB数据库将被checkpointed到配置的文件系统和目录中。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中(Zookeeper))。RocksDBStateBackend始终执行异步SNAPSHOT。局限性由于RocksDB的JNI桥接API基于byte [],因此每个Keys和每个值的最大支持大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并 算子操作的状态(例如ListState)可以静默地累积> 2 ^ 31字节的值大小,然后在下次检索时失败。这是目前RocksDB JNI的一个限制。使用场景Jobs with very large state, long windows, large key/value states.所有高可用性设置。All high-availability setups.特点使用RocksDB时,状态大小仅受可用磁盘空间量的限制,这使RocksDBStateBackend成为管理超大状态的绝佳选择。 使用RocksDB时的权衡是所有状态访问和检索都需要序列化(或反序列化)才能跨越JNI边界。即所有读/写都必须通过去/序列化来检索/存储状态对象, 与上面提到的 on-heap(内存) backends相比,这可能会影响应用程序的吞吐量。( All reads/writes from/to this backend have to go through de-/serialization to retrieve/store the state objects)RocksDBStateBackend是目前唯一提供增量检查点的backend配置默认全局配置默认情况下,配置文件flink-conf.yaml确定所有Flink作业的状态后台state.backend、state.checkpoints.dir对每个job单独配置针对此job的StreamExecutionEnvironment来配置val env = StreamExecutionEnvironment.getExecutionEnvironment()env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))Savepoints保存点概念使用Data Stream API的程序可以从指定的保存点恢复。保存点可以更新程序和Flink集群,并且不丢失任何状态的功能savepoint是手动触发的checkpoint,它会获取程序的SNAPSHOT并将其写入state backend。他们依靠常规的checkpoint机制与checkpoint的比较相对于checkpoint来比较,checkpoint是针对于内部的,savepoint是针对于外部的checkpoint自动,savepoint手动失效问题程序执行时会定期在worker节点生成快照和checkpoint。由于Flink的恢复机制只需要使用最新一个有效的checkpoint,在新的checkpoint生成后就可以安全移除其余旧的checkpoint了保存点是由用户触发的,并且在新的检查点生成后不会自动过期失效举例,与spark比较SparkStreaming中,如果使用了checkpoint,流处理程序有更新的话,进行程序替换需要清理调checkpoint才能生效,而在flink中,可以在流处理程序更新后,手动添加savepoint,那么新程序能从savepoint的地方开始读取数据,能与老程序并行存在或者直接替换

至此,本篇内容完成。

如有问题,请发送邮件至leafming@foxmail.com联系我,谢谢~
0%