Flink高可用保障方案探究

本文主要关于Flink高可用保障方案的测试探讨,主要程序运行模式为Flink On Yarn,并且是“Run a single Flink job on YARN”,目前基于Flink1.7.1版本测试,现在也处于摸索总结阶段,可能有些测试不够到位导致结果不是特别正确,请大家多多指教,有问题请发我邮箱leafming@foxmail.com。

最近在学习使用Flink,并且也在研究Flink的高可用保障机制,现把最近的学习成果总结一下,精心原创整理出来,转载请注明。
(心路补充:本文写于2019-06-10,现在都2020年了,这个文章到底拖了这么久才上传。flink1.9已经不同了,目前主要看体现在task故障恢复策略,有机会再看看。)

概述

我们使用Flink多用于流处理,对于流处理程序最重要的就是保证上线运行程序的稳定性。为了保障程序的稳定运行,所以针对制造了各种异常来探究其对Flink的影响以及如何保障。(感谢领导给我这个学习的机会!!)不过因为我对Flink使用还不深入,还没有特殊的稳定保障机制,因此并没有使用成熟的混沌工具测试,直接手动kill的各种进程营造的异常环境,对于“网络延迟”等异常环境并没有测试到,以后有机会使用工具再整体测试一下。本次测试Flink Job提交模式为“Run a single Flink job on YARN”,并且flink版本基于Flink1.7.1版本。
下面先对不同异常对Flink造成的影响做一个总结,如下图所示,之后再分别描述一下。
2020-01-07-Flink故障情形

测试及现象说明

简单整理一下,我从flink的task、tm、jm等以及yarn、kafka、hdfs等方面简单测试了一下出现问题对flink job造成的影响,不是特别全面,但是也覆盖了大部分问题,详见下述表格。表格展示不全,可左右滑动看。

首先此表格为对于flink自身的简单测试,会从task、taskmanager、jobmanager方面来细分一下。

组件 细分测试点 预期结果 测试 实际结果及高可用保障 check list
Flink
1. Task Failed
2.TM Crash
3.Job failed(以上2者均最终引起job failed)
4. JM Crash
Task Failed task failed不会导致job完全中断,支持恢复。 a) 测试理论准备:若task failed,会直接导致相关taskmanager failed,并且flink中遇到任何Exception都会引起task failed。
b) 测试方法:制造Exception来测试task failed情况。
异常可分为2类:
1. Flink内部机制异常-Flink internal code throw exception
2. 用户代码未捕获异常-UserCode throw exception 为避免task failed,尽量做好UserCode Try catch Exception。例如用户代码出现exception,如某几条特殊日志解析出错,不应导致task failed,应做好try catch处理,将异常日志丢掉或统计,不应该影响整个job的运行。
实际结果:若出现exception,会瞬间导致task failed,之后会直接导致taskmanager failed,之后导致flink job failed。
高可用保障: flink中目前未发现针对与task的高可用保障机制,若出现task failed,会直接导致taskmanager failed,之后导致flink job failed,但flink支持针对于整个job的恢复机制-配置合理的job自动重启次数。 详细见下述job failed。 (目前配置的是RestartStrategies.fixedDelayRestart(10, 10000))
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? 尽可能捕获异常,不触发task failed,若出现task failed,走flink job自动恢复机制。 支持的恢复机制是什么样的? 内部不支持单独针对task的高可用,支持job恢复。 多长时间恢复? 是否和Checkpoint会有关系? 会不会丢数据 ?对数据的影响? 见下述job failed 需要哪些告警支持? flink task failed统计;exception 统计。
Flink TM Crash taskmanager crash不会导致job完全中断,支持恢复。 a) 测试理论准备:taskmanager failed,会导致flink job failed。
b) 测试方法:kill掉taskmanager进程。
实际结果:kill taskmanager进程之后,整个job failed,若配置了flink job自动恢复机制,则整个job重启。
高可用保障:目前未发现单独针对taskmanager的高可用保障机制,只是taskmanager crash后,影响整个job重启,启动flink的自动恢复整个job的机制。
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? 若出现taskmanager crash,走flink job自动恢复机制。 支持的恢复机制是什么样的? 内部不支持单独针对taskmanager的高可用,支持job恢复。 多长时间恢复? 是否和Checkpoint会有关系? 会不会丢数据 ?对数据的影响? 见下述job failed 需要哪些告警支持? flink taskmanager告警(需要监控具体哪台taskmanger故障导致job重启)
Flink Job failed(以上2者均最终引起job failed) 支持job自动恢复。 a) 高可用保障测试理论准备:task failed和taskmanager crash,会导致flink job failed,均需要flink内部job恢复机制来保障高可用。flink中支持自动恢复整个Job,前提checkpoint启用,会从latest checkpoint恢复。
b) 测试方法:使job failed。
a)高可用保障:flink内部支持整个job不同的重启策略,分为固定延迟(Fixed delay)策略、失败率(Failure rate)重启策略、无重启,但是前提是checkpoint启用,如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。
配置方式:(1)全局配置:通过Flink的flink-conf.yaml来指定的默认的重启策略,使用配置参数restart-strategy定义了哪种策略会被采用,如配置restart-strategy: fixed-delay。(2)代码配置:在配置文件默认全局配置之外,可以为每一个Job指定它自己的重启策略,在每个job的代码中env.setRestartStrategy(RestartStrategies…)。
策略具体配置区别:
(1)固定延迟策略(默认):会尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
(2)失败率重启策略:在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
最终选择:使用固定延迟策略,设置较大的重启次数,并需监控剩余次数。
b) 测试操作以及现象分析:
1. 恢复时间,flink中自动恢复整个job,能做到多长时间?
恢复时间与checkpoint大小测试:
(1)ck达1G以上: ck 1.71 GB 恢复时间3min
(2)ck量很小:ck 470M 恢复时间200s
结果分析:目前测试恢复与ck大小关系不明显,主要是job cancel时间较长,cancel与subtask个数和处理逻辑有关,并且某些task cancel时间过长。
恢复时间与数据高峰测试,高峰情况下测试看多久能恢复:
(1)业务程序1db:11:59 kill ,checkpoint大小189M,恢复耗时203s,3min,恢复后checkpoint时间变长为1m,过4个checkpoint后恢复正常。
(2)业务程序2zb:20:38 kill source中的taskmanager,checkpoint大小5G,checkpointId 14 ,总共恢复耗时202s。数据正常数量47692597,恢复后写入数量52718404。
结果分析:目前线上重新恢复job均需要3min左右恢复,恢复过程中遇到问题,可能会导致再次重新恢复,浪费job恢复次数。
2. 丢数据情况,看是否会丢数据?
预期:从checkpoint重新恢复job,不会丢数据,可能会造成数据重复。
测试结果:
测试1:只有一次失败,checkpoint id 21,大小1.7G左右,恢复3min,数据正常9981011,恢复写入数据9981123。
测试2:kill时程序运行21m12s,checkpointID是21,大小1.71GB,恢复程序24m11s job重启,恢复总计时间3min。
中途由于启动连接超时,又紧接着一次恢复,导致2个checkpoint未成功,数据正常14519332,恢复后写入数据重复,为27011928。
结果分析:从checkpoint重新恢复job,会造成数据重复。
c) 风险以及未处理问题
通过观察多次测试kill掉tm,引起job自动恢复的结果,取消job和恢复过程中可能会遇到如下报错:“Could not materialize checkpoint — for operator —”
“Could not flush and close the file system output stream to hdfs://hdfs-ha/user/sscheckpoints/——in order to obtain the stream state handle”
“No lease on /user/sscheckpoints/——— (inode 90568031): File does not exist. Holder DFSClient_NONMAPREDUCE_-375052832_111 does not have any open files.”
“2019-04-03 15:24:35,839 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler-Implementation error: Unhandled
exception.org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:No TaskExecutor registered under container_e13_1553018798704_0234_01_000015.”-待排查问题原因
此种问题会导致多次checkpoint无法成功,并且导致task异常,引起taskmanager异常,最终导致job多次重启。
解决方案:应该做好job failed监控,若出现job failed则直接告警通知,则不论是否出现此问题job是否成功恢复,均需人工查看。若人工观察发现job恢复后多次checkpoint不成功或时间过长不正常,则查看日志是否有上述报错,发现则需直接人工重启,否则job会频繁多次重启,浪费job恢复次数,造成数据重复写入过多。
d)监控点
如果出现以上问题,需要快速感知。
细粒度监控,知道是tm、task 等具体的crash;Exception监控。
job重启后,看到flink的task attempt会加1(初始值为1),需增加自动重启剩余次数监控。
每次job failed无论是否正常自动重启,均需告警。
e) 实验版本(持续关注)-更新:目前1.9版本已经有此功能,支持基于 Region 的局部重启故障恢复策略,详见官网https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/task_failure_recovery.html。
JobManagerOptions.class可见配置“jobmanager.execution.failover-strategy”处于实验阶段,官网未显示因此目前版本1.7不建议使用,可持续关注。此配置为针对task failed情况下job的恢复策略,共3个选项:full、individual、region,其中各自描述如下:full: Restarts all tasks.(重启全部task,目前默认);individual: Restarts only the failed task. Should only be used if all tasks are independent components.(仅影响failed task);region: Restarts all tasks that could be affected by the task failure.(仅影响task failed相关pipeline)。目前经测试,使用region模式kill掉一个task(只一个pipeline不是全部),会在10s内恢复job,数据不丢不多,但问题是无有效监控,log过多以及attempt数量增长超过限制,因此暂不使用,需持续关注此配置。
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? flink内部支持job自动恢复机制。 支持的恢复机制是什么样的? flink job failed支持从最新的checkpoint恢复,需配置合理的job自动重启次数。 多长时间恢复? 目前线上程序恢复时间3min左右,主要是job failed之后原job cancel时间较长,cancel与subtask个数和处理逻辑有关,并且某些task cancel时间过长。 是否和Checkpoint会有关系? 有关系,恢复需从最新的checkpoint恢复数据。 会不会丢数据 ?对数据的影响? 不会丢数据,会造成数据重复。 需要哪些告警支持? job failed监控;flink job自动重启剩余次数监控。
Flink JM Crash jobmanager crash不会导致job完全中断,支持恢复。 a) 高可用保障测试理论准备:flink支持jobmanager异常感知,会瞬间感知异常,重新启动jobmanager并重新部署整个job。
b) 测试方法:kill jm。
a) 高可用保障测试理论准备:flink支持jobmanager异常感知,会瞬间感知异常,重新启动jobmanager并重新部署整个job。flink on yarn时,不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。
配置需要同时配置如下2个文件:
(1)配置yarn最大重试次数:yarn-site.xml
<property><name>yarn.resourcemanager.am.max-attempts</name><value>2</value></property>
当前YARN版本的默认值为2(表示允许单个JobManager失败)。
(2)flink配置文件:conf/flink-conf.yaml
yarn.application-attempts: 10
设置10表示在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。
注意:此设置是Flink Job在Jobmanager恢复时,允许重启的最大次数。Flink On Yarn环境中,当Jobmanager失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job。因 此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.
b) 测试操作以及现象分析:
1.恢复时间:测试将jobmanager kill掉之后,yarn能够瞬间感知并重新启动jobmanager,之后重新部署整个job(jobmanager container ID +1),整体耗时为启动job时间。晚高峰线上测试程序kill掉jobmanager之后,yarn瞬间响应重启,启动job时间为28s。
2.会不会丢数据?启动由yarn调度,重启恢复初次启动时的状态。若skip savepoint丢数据,使用savepoint的情况下造成大量的数据重复。——需解决此问题:暂定方案,在启用ck情况下,监控jobmanager健康状态,自己开发调度进行job重启,从最新的checkpoint启动。
(1)skip checkpoint情况下,jobmanager crash之后,job重启,checkpoint从1开始记,会丢数据。
(2)有checkpoint的情况下,checkpoint ID变为启动时候的ckid,不会丢数据,会造成大量的数据重复,有些数据会双倍。
(如job启动时ckID为162,ckID为166时kill jobmanager,由yarn调度重启后ckID为162,从162的offset开始消费,后续的消费均数据重复)
3.若同时配置了taskmanger恢复以及jobmanager恢复,整体恢复次数为多少?经 测试,将taskmanager恢复次数设置为2,jobmanager恢复设置为2,kill掉taskmanager2次之后直接job失败不再重 启,不会由yarn重启jobmanager。因此,taskmanager是flink的job恢复,而jobmanager是由yarn恢复,重启次 数互相独立,互不影响。
c) 风险以及未处理问题
启用ck,并恢复jobmanager重启的job会从原始启动的checkpoint开始消费,造成大量数据重复。解决方案:不使用由yarn调度重启,自己监控jobmanager健康状态,当出现jobmanager异常,不走yarn自动调度重启的机制,由自己开发的自动调度重启job,从最近的checkpoint开始消费,避免大量数据重复。
实际结果整理:flink on yarn,当Jobmanager失败时,yarn会尝试重启JobManager(AM),jobmanager重启后,会重新启动Flink的Job。-问题:启用ck的时候造成大量的数据重复。目前集群没有配置jobmanager自动重启。暂定jobmanager重启方案:不使用由yarn调度重启,自己监控jobmanager健康状态,当出现jobmanager异常,不走yarn自动调度重启的机制,由自己开发的自动调度重启job,从最近的checkpoint开始消费,避免大量数据重复。
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理? jobmanger restart,job恢复 支持的恢复机制是什么样的? jobmanager重启由yarn支持 多长时间恢复? 很快,为启动一个job所用时间 是否和Checkpoint会有关系? 有关系,但从启动job时候的savepoint恢复。 会不会丢数据 ?对数据的影响? 最初job启动时skip checkpoint的程序会丢数据;最初job启动时,若使用了savepoint,会造成大量数据重复。 需要哪些告警支持? jobmanager健康监控及告警

HDFS

此部分为对于HDFS的测试。

组件 细分测试点 预期结果 测试 实际结果及高可用保障 check list
HDFS
1.Name Node Crash
2.Data Node Crash
Name Node Crash-stop hdfs namenode高可用已经配置情况下能够自动切换,对flink job无影响。 测试方法:直接stop namenode,单次stop及频繁stop 实际结果总结:hdfs namenode高可用已经配置,出现问题自动瞬间切换,对flink job无中断影响。
a) 高可用保障:hdfs namenode ha配置,能自动切换active和standby。
b) 测试操作详细:
1.单次切换
stop active namenode之后,瞬间切换standby namenode为active,jobmanager报错但对flink job无影响,ck时间无变化。
2. 5分钟内频繁来回切换namenode
flink高可靠保障-namenode测试
切换3次中,影响job 2次checkpoint时间,程序时间从3s变为42s和54s,程序2从895ms变为16s,但后续ck时间马上恢复原状。
日志报错,但不影响整体job:
2019-04-10 15:20:22,041 INFO org.apache.hadoop.io.retry.RetryInvocationHandler -Exception while invoking mkdirs of class ClientNamenodeProtocolTranslatorPB over bj01-public-datacenter01.bj01/10.0.0.138:8020. Trying to fail over immediately.
java.net.ConnectException: Call From bj01-public-datacenter01.bj01/10.0.0.221 to bj01-public-datacenter01.bj01:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
c) 监控点
HDFS namenode健康检查告警
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?namenode主备切换,namenode restart 支持的恢复机制是什么样的? hdfs namenode高可用 多长时间恢复?namenode主备瞬间切换,对flink job无影响。 是否和Checkpoint会有关系? 无影响 会不会丢数据 ? 不会 需要哪些告警支持? HDFS namenode进程监控;切换告警。
HDFS Name Node Crash-kill 同上 测试方法:kill namenode进程 a) 测试操作:直接kill namenode,看是否影响flink job
b) 现象:kill namenode之后瞬间切换standby namenode为active,不影响flink job,并且过3min左右kill掉的namenode自动重启,状态为standby。
c) 监控点:HDFS namenode进程监控
会不会有这样的在线上场景? 会。 如果出现这样场景需要做什么处理?进程恢复 支持的恢复机制是什么样的? 自动进程恢复 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会
HDFS Data Node Crash 对flink job无影响 测试方法:直接stop datanode 实际结果:对flink job无影响。
a) 测试操作:stop 1个 datanode,连续测试2次,看是否影响flink job
b) 现象:对job没有影响,不会导致中断,不会影响ck,只有部分job日志会报错。
2019-04-11 12:25:24,193 INFO org.apache.hadoop.hdfs.DFSClient-Exception in createBlockOutputStream java.net.ConnectException: Connection refused
c) 监控点:HDFS datanode健康检查告警
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?datanode restart 支持的恢复机制是什么样的? 无,但一个datanode问题不会影响flink job 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会 需要哪些告警支持? HDFS datanode健康监控及告警

YARN

此部分为yarn相关测试。

组件 细分测试点 预期结果 测试 实际结果及高可用保障 check list
Yarn
1.RM crash
2.NM crash
3.集群整体重启
RM Crash-stop 对flink job无影响 测试操作:直接stop resourcemanager 实际结果:yarn ResourceManager高可用已经配置情况下能够自动切换,对flink job无影响。
a) 高可用保障:yarn resourcemanager ha配置,能自动切换active和standby。
b) 测试操作:stop rm,看是否影响flink job
c) 现象:不影响flink job,不会导致job重启,通过截取的如下jobmanager日志可以看出,会切换active RM,并且重新连接已经有的container,不会重新启动container。
2019-04-11 14:48:38,583 INFO org.apache.hadoop.io.retry.RetryInvocationHandler -Exception while invoking allocate of class ApplicationMasterProtocolPBClientImpl over rm2. Trying to fail over immediately.
java.io.EOFException: End of File Exception between local host is: “bj01-public-datacenter01.bj01/10.0.0.174”; destination host is: “bj01-public-datacenter01.bj01”:8030; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException
Caused by: java.io.EOFException

2019-04-11 14:48:38,783 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
2019-04-11 14:48:38,788 WARN org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -ApplicationMaster is out of sync with ResourceManager, hence resyncing.
2019-04-11 14:48:53,806 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl -Replacing token for :bj01-public-datacenter01.bj01:45454

2019-04-11 14:48:53,806 INFO org.apache.flink.yarn.YarnResourceManager -Received new container: container_e15_1554898541690_0009_01_000001 -Remaining pending container requests: 0
2019-04-11 14:48:53,807 INFO org.apache.flink.yarn.YarnResourceManager -Returning excess container container_e15_1554898541690_0009_01_000001.
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?resourcemanager restart 支持的恢复机制是什么样的? 无,但一个datanode问题不会影响flink job,需做好告警 多长时间恢复?无影响 是否和Checkpoint会有关系? 无影响 会不会丢数据 ?不会 需要哪些告警支持? Yarn resourcemanager健康监控;切换告警。
Yarn RM Crash-kill 对flink job无影响 测试操作:kill resourcemanager 实际结果:yarn ResourceManager高可用已经配置情况下能够自动切换,对flink job无影响。
a) 测试操作:kill resourcemanager,查看是否影响flink job.
b) 现象:现象以及log同上述stop,并且过3min左右kill掉的resourcemanager自动重启,状态为standby。
c) 监控点:resourcemanager健康检查告警。
.
Yarn NM CRASH-stop 对flink job无影响(预期与实际结果稍有偏差) 测试操作:直接stop nodemanager 实际结果:nodemanager down在resourcemanager未感知情况下马上恢复,不会影响flink job。
若时间较长(10min左右),resourcemanager感知,其会关闭相关YarnTaskExecutor,flink job开始失败,此后恢复靠flink内部的自动job恢复机制,从最新offset开始恢复。-配置合理的job自动重启次数。
a) 测试操作:stop nodemanager,查看是否影响flink job
b) 现象:stop nodemanager,yarn没有迅速感知(即UI上active nodes数量不减1),10min左右之后才感知,当resourcemanager感知nodemanager down之后,此时flink相关job jobmanager日志显示:
2019-04-11 20:58:34,120 INFO org.apache.flink.yarn.YarnResourceManager -Closing TaskExecutor connection container_e16_1554967311294_0003_01_000014 because: Container released on a lost node
2019-04-11 20:58:34,127 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph -wideReduce-toNarrow (307/480) (baf741c32e712513ae5f1485d2efdbba) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot container_e16_1554967311294_0003_01_000014_9 was removed.
之后task failed,开始走flink内部的job恢复机制,整个job重新启动,和taskmanager crash一样。
c) 监控点:nodemanager健康检查告警。
会不会有这样的在线上场景? 会有。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? nodemanager出现问题,影响整个flink job,走flink job内部恢复机制 多长时间恢复?为job恢复时间 是否和Checkpoint会有关系? 有,走flink job 内部恢复机制从最新的checkpoint开始恢复job 会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,需及时重启(10min)内,才不会导致job失败。【重要】
Yarn NM CRASH-stop2 对flink job无影响 测试操作:stop nodemanager,并迅速启动 实际结果:nodemanager down在resourcemanager未感知情况下马上恢复,不会影响flink job。
a) 测试操作:stop nodemanager,在resourcemanager没有感知的情况下快速重启,看是否会影响job。flink高可靠保障-stopnodemanager测试
flink高可靠保障-stopnodemanager测试2
b) 现象:等待15min,观察现象,不会影响job。
会不会有这样的在线上场景? 会,告警nodemanager中断之后,迅速恢复。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? 在resourcemanager没有感知的情况下快速重启不影响flink job 多长时间恢复?无影响 是否和Checkpoint会有关系?会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,进程消失告警以及恢复通知
Yarn NM CRASH-kill 对flink job无影响 测试操作:kill nodemanager a) 测试操作:kill nodemanager,观察现象。
b) 现象:kill 掉nodemanager之后,过3min左右,进程自动恢复重启,resourcemanager没有感知到,不影响job。
flink高可靠保障-killnodemanager测试
c) 现象分析结论
nodemanager down之后,YarnTaskExecutorRunner不会马上消失,待resourcemanager感知nodemanager down之后(10min左右),会close相关TaskExecutor,flink job开始失败,此后恢复靠flink内部的自动job恢复机制,从最新offset开始恢复。
若nodemanager down,马上重启恢复,则resourcemanager无感知,则不会影响相关YarnTaskExecutorRunner,即不会导致flinkjob失败重启。
会不会有这样的在线上场景? 会,进程突然消失。 如果出现这样场景需要做什么处理?nodemanager restart 支持的恢复机制是什么样的? nodemanager进程自动重启 多长时间恢复?3min 是否和Checkpoint会有关系? 无,不影响flink job 会不会丢数据 ?不会 需要哪些告警支持? Nodemanager健康监控,进程消失告警以及恢复通知
Yarn 集群整体重启 对flink job无影响 测试操作:集群整体重启 对flink job无影响 .

其他-kafka和zk

此部分为其他简单测试。这部分有些仓促,目前按有限的经验来说,不会对flink job有大影响。

组件 细分测试点 预期结果 测试 实际结果及高可用保障 check list
Kafka Broker Crash 对flink job无影响 测试操作:stop kafka broker 实际结果:对flink job 无影响
a) 测试操作:kill kafka broker,看是否影响flink job
b) 现象:kill掉一个broker之后,jobmanager不报错,kafka source相关taskmanager报错,但不影响job运行:
2019-04-15 15:03:51,976 WARN org.apache.kafka.clients.NetworkClient -[Consumer clientId=consumer-13,groupId=cdn-live.10.cdn_live] Connection to node 720 could not be established. Broker may not be available.
checkpoint时间无影响。
c) 现象分析结论:topic有3个备份,停掉一个borker不会影响数据,也不会影响flink job。
c) 监控点:kafka broker健康检查告警。
会不会有这样的在线上场景? 会。 如果出现这样场景需要做什么处理?及时做到 restart 支持的恢复机制是什么样的?多长时间恢复?3min 是否和Checkpoint会有关系? 无,不影响flink job 会不会丢数据 ?不影响flink 需要哪些告警支持? kafka Broker健康监控;失败broker影响topic,剩余副本告警
Kafka 磁盘故障 对flink job无影响 测试操作:kafka磁盘readonly . .
Kafka OffsetOutOfRange . . . .
zookeeper ZK CRASH . . zookeeper目前一共3台机器,只允许1台出现问题。
测试通过stop zookeeper leader。
.

Flink相关告警支持及处理(恢复)预案

高可用保障配置

无单独针对tm高可用配置,tm crash会导致相关task failed,进而导致job failed,后续保障由flink内部job恢复机制负责。

监控配置

  1. Flink TaskManager crash监控
    Flink TaskManager Crash, yarn上appID无变化,Jobmanager的container ID无变化,仅 crash的TaskManager的container ID变化,其他无变化。
    flink 已有采集相关metric,间隔一段时间对比每个job的containerID是否变化来判断哪个taskmanager重启,监控记录重启的taskmanager所在host。
    2020-01-08-恢复预案-tm

    告警配置及处理(恢复)预案

  2. Taskmanager crash告警:taskmanager crash告警,告警哪台机器上的哪个taskmanager crash。
    处理:
    情况1-taskmanager crash但job正常调度起来:
    因taskmanager crash目前会导致job failed,会走flink内部job恢复机制,若job正常调度起来,不用紧急人工处理,则需排查重启原因。
    情况2-某台host上taskmanager频繁crash:
    若yarn资源足够,可暂时将此nodemanager在yarn上排除,使下次container不再重启在此服务器上(如下图可通过ambari操作);之后观察job是否正常重启,待job恢复正常,及时排查此nodemanager问题。
    2020-01-08-恢复预案-tm2

高可用保障配置

经测试发现,Flink job 运行过程中,可能因TM Crash导致task failed或者遇exception导致Task Failed,最终都会Job failed。
目前为保证可靠性及便于监控维护,直接配置使用flink内部的job固定延迟策略恢复机制,在streamingframework中增加配置:

env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(streamingConfig.globalConfig.configs("restart-strategy.fixed-delay.attempts").toInt, streamingConfig.globalConfig.configs("restart-strategy.fixed-delay.delay").toLong))

可通过调度web针对每个job设置不同的尝试次数,restart-strategy.fixed-delay.attempts为固定最大重启次数,restart-strategy.fixed-delay.delay为连续两次重启间的固定等待时间(建议直接配置10000)。

告警配置及处理(恢复)预案

  1. 【预警】job重启次数剩余告警:配置job重启次数阈值为配置的“restart-strategy.fixed-delay.attempts”,当监控fullRestarts达配置的N%,则进行告警通知。
    处理:若收到告警,证明job重启次数即将耗尽,需手动重启job,以重制剩余job重启次数。
  2. job重启告警:fullRestarts值变化,则证明job重启。
    处理:
    情况1-收到1次job重启告警,后续没有收到其他告警:若仅收到1次job重启告警,没有收到其他告警,则不方便情况下可不用紧急人工处理,因job由flink调度成功恢复。
    情况2-连续收到job重启告警:若收到此告警,证明job存在频繁重启或未知异常导致job无法恢复的风险,需及时人工查看排查问题。
    【以下3、4两点,因为讨论之后说其他告警能够覆盖,因此不配置】
  3. 【2job重启告警的升级】job restartingTime告警:配置job重启所用时间N,若restartingTime>N,则告警通知。
    处理:若收到此告警,证明job重启时间过长,可能卡住无法正常启动且job进程不消失,需要观察job是否正常启动,排查问题,若持续卡住可暂时手动重启。
  4. 【2job重启告警的升级】job downtime告警:配置job处于failing/recovering situation的时间M,若downtime>M,则告警通知。
    处理:若收到此告警,证明job处于failing/recovering situation的时间过长,可能卡住无法正常启动且job进程不消失,需要观察job是否正常启动,排查问题,若持续卡住可暂时手动重启。

高可用保障配置

  1. 启动的Job不使用savePoint的情况下,flink on yarn运行的job可使用yarn保障高可用,此模式不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。
    配置需要同时配置如下2个文件:
    (1)配置yarn最大重试次数:yarn-site.xml
    <property>
    <name>yarn.resourcemanager.am.max-attempts</name>
    <value>2</value>
    </property>
    

    当前YARN版本的默认值为2(表示允许单个JobManager失败)。
    (2)flink配置文件:conf/flink-conf.yaml

    yarn.application-attempts: 10
    

    设置10表示在Yarn应用程序失败之前,应用程序可以重新启动9次(9次重试+ 1次初始尝试)。
    Flink Jobmanager Crash由yarn重启后, yarn上appID无变化,Jobmanager的container ID变化+1(如appattempt_1555417075180_0045_000001变为appattempt_1555417075180_0045_000002),taskmanager重新启动其container变化(如container_e19_1555417075180_0045_01_000025变为container_e19_1555417075180_0045_02_000025)。
    注意:此设置是Flink Job在Jobmanager恢复时,允许重启的最大次数。Flink On Yarn环境中,当Jobmanager失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job。因 此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps。

  2. 启动的Job使用savePoint的情况下,若使用yarn调度,会从最近的savepoint来启动程序(目前savepoint仅在重启job的时候才会保存一次,因此不能使用由yarn调度,否则造成大量数据重复);因此我们这里不使用Yarn恢复机制,应开发调度系统支持监控到Flink Job失败,失败后重新调度使其从最新的checkpoint重启来保证Job高可用。

监控配置

  1. application监控存活判断:持续判断application是否存活。因目前选择方案不使用yarn调度,只要jobmanager crash就一定导致job相关application消失,因此直接监控application是否存活即可。

告警配置及处理(恢复)预案

  1. application监控存活告警

告警配置条目例子

组件 告警ID 说明
Flink告警 flink_checkpoint_time 流处理-连续3分钟checkpoint耗时过大
  flink_bandwidth_diff 流处理-带宽掉量自动切源
  flink_kafka_lag 流处理-消费kafka的lag过高
  flink_etl_parse_error 流处理-日志解析出错
  flink_job_fullRestarts 流处理-Job自动重启次数过多
  flink_attempt_change 流处理-flink attempt变化
  flink_es_bulk_errorNum 流处理-入ES失败条数
  flink_es_tookInMillisPerBulk 流处理-最近三分钟入ES平均耗时过长
ES告警 es_nodeThreadpool_bulk_queue 连续5min[ES nodeThreadpool bulk队列满]
  rt_es_nodeThreadpool_bulk_active 连续5min[ES nodeThreadpool bulk.active等于cpu.core]
  es_loss_node elasticsearch丢失节点
Kafka告警 kafka_qps kafka QPS异常
  kafka_loss_node kafka丢失节点
eskeeper    
基础监控告警    

以上为关于flink高可用的测试及简单方案,但是时间有限,测试不充分,也会有些错误,请大家多多指正,谢谢啦!

如有问题,请发送邮件至leafming@foxmail.com联系我,谢谢~
0%