本文主要关于Flink高可用保障方案的测试探讨,主要程序运行模式为Flink On Yarn,并且是“Run a single Flink job on YARN”,目前基于Flink1.7.1版本测试,现在也处于摸索总结阶段,可能有些测试不够到位导致结果不是特别正确,请大家多多指教,有问题请发我邮箱leafming@foxmail.com。
最近在学习使用Flink,并且也在研究Flink的高可用保障机制,现把最近的学习成果总结一下,精心原创整理出来,转载请注明。
(心路补充:本文写于2019-06-10,现在都2020年了,这个文章到底拖了这么久才上传。flink1.9已经不同了,目前主要看体现在task故障恢复策略,有机会再看看。)
概述
我们使用Flink多用于流处理,对于流处理程序最重要的就是保证上线运行程序的稳定性。为了保障程序的稳定运行,所以针对制造了各种异常来探究其对Flink的影响以及如何保障。(感谢领导给我这个学习的机会!!)不过因为我对Flink使用还不深入,还没有特殊的稳定保障机制,因此并没有使用成熟的混沌工具测试,直接手动kill的各种进程营造的异常环境,对于“网络延迟”等异常环境并没有测试到,以后有机会使用工具再整体测试一下。本次测试Flink Job提交模式为“Run a single Flink job on YARN”,并且flink版本基于Flink1.7.1版本。
下面先对不同异常对Flink造成的影响做一个总结,如下图所示,之后再分别描述一下。
测试及现象说明
简单整理一下,我从flink的task、tm、jm等以及yarn、kafka、hdfs等方面简单测试了一下出现问题对flink job造成的影响,不是特别全面,但是也覆盖了大部分问题,详见下述表格。表格展示不全,可左右滑动看。
flink
首先此表格为对于flink自身的简单测试,会从task、taskmanager、jobmanager方面来细分一下。
组件 | 细分测试点 | 预期结果 | 测试 | 实际结果及高可用保障 | check list |
---|---|---|---|---|---|
Flink 1. Task Failed 2.TM Crash 3.Job failed(以上2者均最终引起job failed) 4. JM Crash |
Task Failed | task failed不会导致job完全中断,支持恢复。 | a) 测试理论准备:若task failed,会直接导致相关taskmanager failed,并且flink中遇到任何Exception都会引起task failed。 b) 测试方法:制造Exception来测试task failed情况。 异常可分为2类: 1. Flink内部机制异常-Flink internal code throw exception 2. 用户代码未捕获异常-UserCode throw exception 为避免task failed,尽量做好UserCode Try catch Exception。例如用户代码出现exception,如某几条特殊日志解析出错,不应导致task failed,应做好try catch处理,将异常日志丢掉或统计,不应该影响整个job的运行。 |
实际结果:若出现exception,会瞬间导致task failed,之后会直接导致taskmanager failed,之后导致flink job failed。 高可用保障: flink中目前未发现针对与task的高可用保障机制,若出现task failed,会直接导致taskmanager failed,之后导致flink job failed,但flink支持针对于整个job的恢复机制-配置合理的job自动重启次数。 详细见下述job failed。 (目前配置的是RestartStrategies.fixedDelayRestart(10, 10000)) |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? 尽可能捕获异常,不触发task failed,若出现task failed,走flink job自动恢复机制。 支持的恢复机制是什么样的? 内部不支持单独针对task的高可用,支持job恢复。 多长时间恢复? 是否和Checkpoint会有关系? 会不会丢数据 ?对数据的影响? 见下述job failed 需要哪些告警支持? flink task failed统计;exception 统计。 |
Flink | TM Crash | taskmanager crash不会导致job完全中断,支持恢复。 | a) 测试理论准备:taskmanager failed,会导致flink job failed。 b) 测试方法:kill掉taskmanager进程。 |
实际结果:kill taskmanager进程之后,整个job failed,若配置了flink job自动恢复机制,则整个job重启。 高可用保障:目前未发现单独针对taskmanager的高可用保障机制,只是taskmanager crash后,影响整个job重启,启动flink的自动恢复整个job的机制。 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? 若出现taskmanager crash,走flink job自动恢复机制。 支持的恢复机制是什么样的? 内部不支持单独针对taskmanager的高可用,支持job恢复。 多长时间恢复? 是否和Checkpoint会有关系? 会不会丢数据 ?对数据的影响? 见下述job failed 需要哪些告警支持? flink taskmanager告警(需要监控具体哪台taskmanger故障导致job重启) |
Flink | Job failed(以上2者均最终引起job failed) | 支持job自动恢复。 | a) 高可用保障测试理论准备:task failed和taskmanager crash,会导致flink job failed,均需要flink内部job恢复机制来保障高可用。flink中支持自动恢复整个Job,前提checkpoint启用,会从latest checkpoint恢复。 b) 测试方法:使job failed。 |
a)高可用保障:flink内部支持整个job不同的重启策略,分为固定延迟(Fixed delay)策略、失败率(Failure rate)重启策略、无重启,但是前提是checkpoint启用,如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。 配置方式:(1)全局配置:通过Flink的flink-conf.yaml来指定的默认的重启策略,使用配置参数restart-strategy定义了哪种策略会被采用,如配置restart-strategy: fixed-delay。(2)代码配置:在配置文件默认全局配置之外,可以为每一个Job指定它自己的重启策略,在每个job的代码中env.setRestartStrategy(RestartStrategies…)。 策略具体配置区别: (1)固定延迟策略(默认):会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。 (2)失败率重启策略:在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。 最终选择:使用固定延迟策略,设置较大的重启次数,并需监控剩余次数。 b) 测试操作以及现象分析: 1. 恢复时间,flink中自动恢复整个job,能做到多长时间? 恢复时间与checkpoint大小测试: (1)ck达1G以上: ck 1.71 GB 恢复时间3min (2)ck量很小:ck 470M 恢复时间200s 结果分析:目前测试恢复与ck大小关系不明显,主要是job cancel时间较长,cancel与subtask个数和处理逻辑有关,并且某些task cancel时间过长。 恢复时间与数据高峰测试,高峰情况下测试看多久能恢复: (1)业务程序1db:11:59 kill ,checkpoint大小189M,恢复耗时203s,3min,恢复后checkpoint时间变长为1m,过4个checkpoint后恢复正常。 (2)业务程序2zb:20:38 kill source中的taskmanager,checkpoint大小5G,checkpointId 14 ,总共恢复耗时202s。数据正常数量47692597,恢复后写入数量52718404。 结果分析:目前线上重新恢复job均需要3min左右恢复,恢复过程中遇到问题,可能会导致再次重新恢复,浪费job恢复次数。 2. 丢数据情况,看是否会丢数据? 预期:从checkpoint重新恢复job,不会丢数据,可能会造成数据重复。 测试结果: 测试1:只有一次失败,checkpoint id 21,大小1.7G左右,恢复3min,数据正常9981011,恢复写入数据9981123。 测试2:kill时程序运行21m12s,checkpointID是21,大小1.71GB,恢复程序24m11s job重启,恢复总计时间3min。 中途由于启动连接超时,又紧接着一次恢复,导致2个checkpoint未成功,数据正常14519332,恢复后写入数据重复,为27011928。 结果分析:从checkpoint重新恢复job,会造成数据重复。 c) 风险以及未处理问题 通过观察多次测试kill掉tm,引起job自动恢复的结果,取消job和恢复过程中可能会遇到如下报错:“Could not materialize checkpoint — for operator —” “Could not flush and close the file system output stream to hdfs://hdfs-ha/user/sscheckpoints/——in order to obtain the stream state handle” “No lease on /user/sscheckpoints/——— (inode 90568031): File does not exist. Holder DFSClient_NONMAPREDUCE_-375052832_111 does not have any open files.” “2019-04-03 15:24:35,839 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler-Implementation error: Unhandled exception.org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:No TaskExecutor registered under container_e13_1553018798704_0234_01_000015.”-待排查问题原因 此种问题会导致多次checkpoint无法成功,并且导致task异常,引起taskmanager异常,最终导致job多次重启。 解决方案:应该做好job failed监控,若出现job failed则直接告警通知,则不论是否出现此问题job是否成功恢复,均需人工查看。若人工观察发现job恢复后多次checkpoint不成功或时间过长不正常,则查看日志是否有上述报错,发现则需直接人工重启,否则job会频繁多次重启,浪费job恢复次数,造成数据重复写入过多。 d)监控点 如果出现以上问题,需要快速感知。 细粒度监控,知道是tm、task 等具体的crash;Exception监控。 job重启后,看到flink的task attempt会加1(初始值为1),需增加自动重启剩余次数监控。 每次job failed无论是否正常自动重启,均需告警。 e) 实验版本(持续关注)-更新:目前1.9版本已经有此功能,支持基于 Region 的局部重启故障恢复策略,详见官网https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/task_failure_recovery.html。 JobManagerOptions.class可见配置“jobmanager.execution.failover-strategy”处于实验阶段,官网未显示因此目前版本1.7不建议使用,可持续关注。此配置为针对task failed情况下job的恢复策略,共3个选项:full、individual、region,其中各自描述如下:full: Restarts all tasks.(重启全部task,目前默认);individual: Restarts only the failed task. Should only be used if all tasks are independent components.(仅影响failed task);region: Restarts all tasks that could be affected by the task failure.(仅影响task failed相关pipeline)。目前经测试,使用region模式kill掉一个task(只一个pipeline不是全部),会在10s内恢复job,数据不丢不多,但问题是无有效监控,log过多以及attempt数量增长超过限制,因此暂不使用,需持续关注此配置。 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? flink内部支持job自动恢复机制。 支持的恢复机制是什么样的? flink job failed支持从最新的checkpoint恢复,需配置合理的job自动重启次数。 多长时间恢复? 目前线上程序恢复时间3min左右,主要是job failed之后原job cancel时间较长,cancel与subtask个数和处理逻辑有关,并且某些task cancel时间过长。 是否和Checkpoint会有关系? 有关系,恢复需从最新的checkpoint恢复数据。 会不会丢数据 ?对数据的影响? 不会丢数据,会造成数据重复。 需要哪些告警支持? job failed监控;flink job自动重启剩余次数监控。 |
Flink | JM Crash | jobmanager crash不会导致job完全中断,支持恢复。 | a) 高可用保障测试理论准备:flink支持jobmanager异常感知,会瞬间感知异常,重新启动jobmanager并重新部署整个job。 b) 测试方法:kill jm。 |
a) 高可用保障测试理论准备:flink支持jobmanager异常感知,会瞬间感知异常,重新启动jobmanager并重新部署整个job。flink on yarn时,不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。 配置需要同时配置如下2个文件: (1)配置yarn最大重试次数:yarn-site.xml <property><name>yarn.resourcemanager.am.max-attempts</name><value>2</value></property> 当前YARN版本的默认值为2(表示允许单个JobManager失败)。 (2)flink配置文件:conf/flink-conf.yaml yarn.application-attempts: 10 设置10表示在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。 注意:此设置是Flink Job在Jobmanager恢复时,允许重启的最大次数。Flink On Yarn环境中,当Jobmanager失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job。因 此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps. b) 测试操作以及现象分析: 1.恢复时间:测试将jobmanager kill掉之后,yarn能够瞬间感知并重新启动jobmanager,之后重新部署整个job(jobmanager container ID +1),整体耗时为启动job时间。晚高峰线上测试程序kill掉jobmanager之后,yarn瞬间响应重启,启动job时间为28s。 2.会不会丢数据?启动由yarn调度,重启恢复初次启动时的状态。若skip savepoint丢数据,使用savepoint的情况下造成大量的数据重复。——需解决此问题:暂定方案,在启用ck情况下,监控jobmanager健康状态,自己开发调度进行job重启,从最新的checkpoint启动。 (1)skip checkpoint情况下,jobmanager crash之后,job重启,checkpoint从1开始记,会丢数据。 (2)有checkpoint的情况下,checkpoint ID变为启动时候的ckid,不会丢数据,会造成大量的数据重复,有些数据会双倍。 (如job启动时ckID为162,ckID为166时kill jobmanager,由yarn调度重启后ckID为162,从162的offset开始消费,后续的消费均数据重复) 3.若同时配置了taskmanger恢复以及jobmanager恢复,整体恢复次数为多少?经 测试,将taskmanager恢复次数设置为2,jobmanager恢复设置为2,kill掉taskmanager2次之后直接job失败不再重 启,不会由yarn重启jobmanager。因此,taskmanager是flink的job恢复,而jobmanager是由yarn恢复,重启次 数互相独立,互不影响。 c) 风险以及未处理问题 启用ck,并恢复jobmanager重启的job会从原始启动的checkpoint开始消费,造成大量数据重复。解决方案:不使用由yarn调度重启,自己监控jobmanager健康状态,当出现jobmanager异常,不走yarn自动调度重启的机制,由自己开发的自动调度重启job,从最近的checkpoint开始消费,避免大量数据重复。 实际结果整理:flink on yarn,当Jobmanager失败时,yarn会尝试重启JobManager(AM),jobmanager重启后,会重新启动Flink的Job。-问题:启用ck的时候造成大量的数据重复。目前集群没有配置jobmanager自动重启。暂定jobmanager重启方案:不使用由yarn调度重启,自己监控jobmanager健康状态,当出现jobmanager异常,不走yarn自动调度重启的机制,由自己开发的自动调度重启job,从最近的checkpoint开始消费,避免大量数据重复。 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? jobmanger restart,job恢复 支持的恢复机制是什么样的? jobmanager重启由yarn支持 多长时间恢复? 很快,为启动一个job所用时间 是否和Checkpoint会有关系? 有关系,但从启动job时候的savepoint恢复。 会不会丢数据 ?对数据的影响? 最初job启动时skip checkpoint的程序会丢数据;最初job启动时,若使用了savepoint,会造成大量数据重复。 需要哪些告警支持? jobmanager健康监控及告警 |
HDFS
此部分为对于HDFS的测试。
组件 | 细分测试点 | 预期结果 | 测试 | 实际结果及高可用保障 | check list |
---|---|---|---|---|---|
HDFS 1.Name Node Crash 2.Data Node Crash |
Name Node Crash-stop | hdfs namenode高可用已经配置情况下能够自动切换,对flink job无影响。 | 测试方法:直接stop namenode,单次stop及频繁stop | 实际结果总结:hdfs namenode高可用已经配置,出现问题自动瞬间切换,对flink job无中断影响。 a) 高可用保障:hdfs namenode ha配置,能自动切换active和standby。 b) 测试操作详细: 1.单次切换 stop active namenode之后,瞬间切换standby namenode为active,jobmanager报错但对flink job无影响,ck时间无变化。 2. 5分钟内频繁来回切换namenode 切换3次中,影响job 2次checkpoint时间,程序时间从3s变为42s和54s,程序2从895ms变为16s,但后续ck时间马上恢复原状。 日志报错,但不影响整体job: 2019-04-10 15:20:22,041 INFO org.apache.hadoop.io.retry.RetryInvocationHandler -Exception while invoking mkdirs of class ClientNamenodeProtocolTranslatorPB over bj01-public-datacenter01.bj01/10.0.0.138:8020. Trying to fail over immediately. java.net.ConnectException: Call From bj01-public-datacenter01.bj01/10.0.0.221 to bj01-public-datacenter01.bj01:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused c) 监控点 HDFS namenode健康检查告警 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?namenode主备切换,namenode restart 支持的恢复机制是什么样的? hdfs namenode高可用 多长时间恢复?namenode主备瞬间切换,对flink job无影响。 是否和Checkpoint会有关系? 无影响 会不会丢数据 ? 不会 需要哪些告警支持? HDFS namenode进程监控;切换告警。 |
HDFS | Name Node Crash-kill | 同上 | 测试方法:kill namenode进程 | a) 测试操作:直接kill namenode,看是否影响flink job b) 现象:kill namenode之后瞬间切换standby namenode为active,不影响flink job,并且过3min左右kill掉的namenode自动重启,状态为standby。 c) 监控点:HDFS namenode进程监控 |
会不会有这样的在线上场景? 会。 如果出现这样场景需要做什么处理?进程恢复 支持的恢复机制是什么样的? 自动进程恢复 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会 |
HDFS | Data Node Crash | 对flink job无影响 | 测试方法:直接stop datanode | 实际结果:对flink job无影响。 a) 测试操作:stop 1个 datanode,连续测试2次,看是否影响flink job b) 现象:对job没有影响,不会导致中断,不会影响ck,只有部分job日志会报错。 2019-04-11 12:25:24,193 INFO org.apache.hadoop.hdfs.DFSClient-Exception in createBlockOutputStream java.net.ConnectException: Connection refused c) 监控点:HDFS datanode健康检查告警 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?datanode restart 支持的恢复机制是什么样的? 无,但一个datanode问题不会影响flink job 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会 需要哪些告警支持? HDFS datanode健康监控及告警 |
YARN
此部分为yarn相关测试。
组件 | 细分测试点 | 预期结果 | 测试 | 实际结果及高可用保障 | check list |
---|---|---|---|---|---|
Yarn 1.RM crash 2.NM crash 3.集群整体重启 |
RM Crash-stop | 对flink job无影响 | 测试操作:直接stop resourcemanager | 实际结果:yarn ResourceManager高可用已经配置情况下能够自动切换,对flink job无影响。 a) 高可用保障:yarn resourcemanager ha配置,能自动切换active和standby。 b) 测试操作:stop rm,看是否影响flink job c) 现象:不影响flink job,不会导致job重启,通过截取的如下jobmanager日志可以看出,会切换active RM,并且重新连接已经有的container,不会重新启动container。 2019-04-11 14:48:38,583 INFO org.apache.hadoop.io.retry.RetryInvocationHandler -Exception while invoking allocate of class ApplicationMasterProtocolPBClientImpl over rm2. Trying to fail over immediately. java.io.EOFException: End of File Exception between local host is: “bj01-public-datacenter01.bj01/10.0.0.174”; destination host is: “bj01-public-datacenter01.bj01”:8030; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException Caused by: java.io.EOFException … 2019-04-11 14:48:38,783 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1 2019-04-11 14:48:38,788 WARN org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -ApplicationMaster is out of sync with ResourceManager, hence resyncing. 2019-04-11 14:48:53,806 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -Replacing token for :bj01-public-datacenter01.bj01:45454 … 2019-04-11 14:48:53,806 INFO org.apache.flink.yarn.YarnResourceManager -Received new container: container_e15_1554898541690_0009_01_000001 -Remaining pending container requests: 0 2019-04-11 14:48:53,807 INFO org.apache.flink.yarn.YarnResourceManager -Returning excess container container_e15_1554898541690_0009_01_000001. |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?resourcemanager restart 支持的恢复机制是什么样的? 无,但一个datanode问题不会影响flink job,需做好告警 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会 需要哪些告警支持? Yarn resourcemanager健康监控;切换告警。 |
Yarn | RM Crash-kill | 对flink job无影响 | 测试操作:kill resourcemanager | 实际结果:yarn ResourceManager高可用已经配置情况下能够自动切换,对flink job无影响。 a) 测试操作:kill resourcemanager,查看是否影响flink job. b) 现象:现象以及log同上述stop,并且过3min左右kill掉的resourcemanager自动重启,状态为standby。 c) 监控点:resourcemanager健康检查告警。 |
. |
Yarn | NM CRASH-stop | 对flink job无影响(预期与实际结果稍有偏差) | 测试操作:直接stop nodemanager | 实际结果:nodemanager down在resourcemanager未感知情况下马上恢复,不会影响flink job。 若时间较长(10min左右),resourcemanager感知,其会关闭相关YarnTaskExecutor,flink job开始失败,此后恢复靠flink内部的自动job恢复机制,从最新offset开始恢复。-配置合理的job自动重启次数。 a) 测试操作:stop nodemanager,查看是否影响flink job b) 现象:stop nodemanager,yarn没有迅速感知(即UI上active nodes数量不减1),10min左右之后才感知,当resourcemanager感知nodemanager down之后,此时flink相关job jobmanager日志显示: 2019-04-11 20:58:34,120 INFO org.apache.flink.yarn.YarnResourceManager -Closing TaskExecutor connection container_e16_1554967311294_0003_01_000014 because: Container released on a lost node 2019-04-11 20:58:34,127 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -wideReduce-toNarrow (307/480) (baf741c32e712513ae5f1485d2efdbba) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: The assigned slot container_e16_1554967311294_0003_01_000014_9 was removed. 之后task failed,开始走flink内部的job恢复机制,整个job重新启动,和taskmanager crash一样。 c) 监控点:nodemanager健康检查告警。 |
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? nodemanager出现问题,影响整个flink job,走flink job内部恢复机制 多长时间恢复?为job恢复时间 是否和Checkpoint会有关系? 有,走flink job 内部恢复机制从最新的checkpoint开始恢复job 会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,需及时重启(10min)内,才不会导致job失败。【重要】 |
Yarn | NM CRASH-stop2 | 对flink job无影响 | 测试操作:stop nodemanager,并迅速启动。 | 实际结果:nodemanager down在resourcemanager未感知情况下马上恢复,不会影响flink job。 a) 测试操作:stop nodemanager,在resourcemanager没有感知的情况下快速重启,看是否会影响job。 b) 现象:等待15min,观察现象,不会影响job。 |
会不会有这样的在线上场景? 会,告警nodemanager中断之后,迅速恢复。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? 在resourcemanager没有感知的情况下快速重启不影响flink job 多长时间恢复?无影响 是否和Checkpoint会有关系?无 会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,进程消失告警以及恢复通知 |
Yarn | NM CRASH-kill | 对flink job无影响 | 测试操作:kill nodemanager | a) 测试操作:kill nodemanager,观察现象。 b) 现象:kill 掉nodemanager之后,过3min左右,进程自动恢复重启,resourcemanager没有感知到,不影响job。 c) 现象分析结论 nodemanager down之后,YarnTaskExecutorRunner不会马上消失,待resourcemanager感知nodemanager down之后(10min左右),会close相关TaskExecutor,flink job开始失败,此后恢复靠flink内部的自动job恢复机制,从最新offset开始恢复。 若nodemanager down,马上重启恢复,则resourcemanager无感知,则不会影响相关YarnTaskExecutorRunner,即不会导致flinkjob失败重启。 |
会不会有这样的在线上场景? 会,进程突然消失。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? nodemanager进程自动重启 多长时间恢复?3min 是否和Checkpoint会有关系? 无,不影响flink job 会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,进程消失告警以及恢复通知 |
Yarn | 集群整体重启 | 对flink job无影响 | 测试操作:集群整体重启 | 对flink job无影响 | . |
其他-kafka和zk
此部分为其他简单测试。这部分有些仓促,目前按有限的经验来说,不会对flink job有大影响。
组件 | 细分测试点 | 预期结果 | 测试 | 实际结果及高可用保障 | check list |
---|---|---|---|---|---|
Kafka | Broker Crash | 对flink job无影响 | 测试操作:stop kafka broker | 实际结果:对flink job 无影响 a) 测试操作:kill kafka broker,看是否影响flink job b) 现象:kill掉一个broker之后,jobmanager不报错,kafka source相关taskmanager报错,但不影响job运行: 2019-04-15 15:03:51,976 WARN org.apache.kafka.clients.NetworkClient -[Consumer clientId=consumer-13,groupId=cdn-live.10.cdn_live] Connection to node 720 could not be established. Broker may not be available. checkpoint时间无影响。 c) 现象分析结论:topic有3个备份,停掉一个borker不会影响数据,也不会影响flink job。 c) 监控点:kafka broker健康检查告警。 |
会不会有这样的在线上场景? 会。 如果出现这样场景需要做什么处理?及时做到 restart 支持的恢复机制是什么样的? 无 多长时间恢复?3min 是否和Checkpoint会有关系? 无,不影响flink job 会不会丢数据 ?不影响flink 需要哪些告警支持? kafka Broker健康监控;失败broker影响topic,剩余副本告警 |
Kafka | 磁盘故障 | 对flink job无影响 | 测试操作:kafka磁盘readonly | . | . |
Kafka | OffsetOutOfRange | . | . | . | . |
zookeeper | ZK CRASH | . | . | zookeeper目前一共3台机器,只允许1台出现问题。 测试通过stop zookeeper leader。 |
. |
Flink相关告警支持及处理(恢复)预案
Flink TM Crash 处理恢复预案
高可用保障配置
无单独针对tm高可用配置,tm crash会导致相关task failed,进而导致job failed,后续保障由flink内部job恢复机制负责。
监控配置
- Flink TaskManager crash监控
Flink TaskManager Crash, yarn上appID无变化,Jobmanager的container ID无变化,仅 crash的TaskManager的container ID变化,其他无变化。
flink 已有采集相关metric,间隔一段时间对比每个job的containerID是否变化来判断哪个taskmanager重启,监控记录重启的taskmanager所在host。
告警配置及处理(恢复)预案
- Taskmanager crash告警:taskmanager crash告警,告警哪台机器上的哪个taskmanager crash。
处理:
情况1-taskmanager crash但job正常调度起来:
因taskmanager crash目前会导致job failed,会走flink内部job恢复机制,若job正常调度起来,不用紧急人工处理,则需排查重启原因。
情况2-某台host上taskmanager频繁crash:
若yarn资源足够,可暂时将此nodemanager在yarn上排除,使下次container不再重启在此服务器上(如下图可通过ambari操作);之后观察job是否正常重启,待job恢复正常,及时排查此nodemanager问题。
Flink Job Failed 处理(恢复)预案
高可用保障配置
经测试发现,Flink job 运行过程中,可能因TM Crash导致task failed或者遇exception导致Task Failed,最终都会Job failed。
目前为保证可靠性及便于监控维护,直接配置使用flink内部的job固定延迟策略恢复机制,在streamingframework中增加配置:
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(streamingConfig.globalConfig.configs("restart-strategy.fixed-delay.attempts").toInt, streamingConfig.globalConfig.configs("restart-strategy.fixed-delay.delay").toLong))
可通过调度web针对每个job设置不同的尝试次数,restart-strategy.fixed-delay.attempts为固定最大重启次数,restart-strategy.fixed-delay.delay为连续两次重启间的固定等待时间(建议直接配置10000)。
告警配置及处理(恢复)预案
- 【预警】job重启次数剩余告警:配置job重启次数阈值为配置的“restart-strategy.fixed-delay.attempts”,当监控fullRestarts达配置的N%,则进行告警通知。
处理:若收到告警,证明job重启次数即将耗尽,需手动重启job,以重制剩余job重启次数。 - job重启告警:fullRestarts值变化,则证明job重启。
处理:
情况1-收到1次job重启告警,后续没有收到其他告警:若仅收到1次job重启告警,没有收到其他告警,则不方便情况下可不用紧急人工处理,因job由flink调度成功恢复。
情况2-连续收到job重启告警:若收到此告警,证明job存在频繁重启或未知异常导致job无法恢复的风险,需及时人工查看排查问题。
【以下3、4两点,因为讨论之后说其他告警能够覆盖,因此不配置】 - 【2job重启告警的升级】job restartingTime告警:配置job重启所用时间N,若restartingTime>N,则告警通知。
处理:若收到此告警,证明job重启时间过长,可能卡住无法正常启动且job进程不消失,需要观察job是否正常启动,排查问题,若持续卡住可暂时手动重启。 - 【2job重启告警的升级】job downtime告警:配置job处于failing/recovering situation的时间M,若downtime>M,则告警通知。
处理:若收到此告警,证明job处于failing/recovering situation的时间过长,可能卡住无法正常启动且job进程不消失,需要观察job是否正常启动,排查问题,若持续卡住可暂时手动重启。
Job彻底中断告警 /Flink JobManager Crash处理(恢复)预案
高可用保障配置
- 启动的Job不使用savePoint的情况下,flink on yarn运行的job可使用yarn保障高可用,此模式不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。
配置需要同时配置如下2个文件:
(1)配置yarn最大重试次数:yarn-site.xml<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>2</value> </property>
当前YARN版本的默认值为2(表示允许单个JobManager失败)。
(2)flink配置文件:conf/flink-conf.yamlyarn.application-attempts: 10
设置10表示在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。
Flink Jobmanager Crash由yarn重启后, yarn上appID无变化,Jobmanager的container ID变化+1(如appattempt_1555417075180_0045_000001变为appattempt_1555417075180_0045_000002),taskmanager重新启动其container变化(如container_e19_1555417075180_0045_01_000025变为container_e19_1555417075180_0045_02_000025)。
注意:此设置是Flink Job在Jobmanager恢复时,允许重启的最大次数。Flink On Yarn环境中,当Jobmanager失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job。因 此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps。 - 启动的Job使用savePoint的情况下,若使用yarn调度,会从最近的savepoint来启动程序(目前savepoint仅在重启job的时候才会保存一次,因此不能使用由yarn调度,否则造成大量数据重复);因此我们这里不使用Yarn恢复机制,应开发调度系统支持监控到Flink Job失败,失败后重新调度使其从最新的checkpoint重启来保证Job高可用。
监控配置
- application监控存活判断:持续判断application是否存活。因目前选择方案不使用yarn调度,只要jobmanager crash就一定导致job相关application消失,因此直接监控application是否存活即可。
告警配置及处理(恢复)预案
- application监控存活告警
告警配置条目例子
组件 | 告警ID | 说明 |
---|---|---|
Flink告警 | flink_checkpoint_time | 流处理-连续3分钟checkpoint耗时过大 |
flink_bandwidth_diff | 流处理-带宽掉量自动切源 | |
flink_kafka_lag | 流处理-消费kafka的lag过高 | |
flink_etl_parse_error | 流处理-日志解析出错 | |
flink_job_fullRestarts | 流处理-Job自动重启次数过多 | |
flink_attempt_change | 流处理-flink attempt变化 | |
flink_es_bulk_errorNum | 流处理-入ES失败条数 | |
flink_es_tookInMillisPerBulk | 流处理-最近三分钟入ES平均耗时过长 | |
ES告警 | es_nodeThreadpool_bulk_queue | 连续5min[ES nodeThreadpool bulk队列满] |
rt_es_nodeThreadpool_bulk_active | 连续5min[ES nodeThreadpool bulk.active等于cpu.core] | |
es_loss_node | elasticsearch丢失节点 | |
Kafka告警 | kafka_qps | kafka QPS异常 |
kafka_loss_node | kafka丢失节点 | |
eskeeper | ||
基础监控告警 |
以上为关于flink高可用的测试及简单方案,但是时间有限,测试不充分,也会有些错误,请大家多多指正,谢谢啦!