Flink On Local
环境准备
准备一台2核4G以上配置的Linux机器节点(CentOs、macOS等)
Java SE Development Kit 8
及以上版本安装确保本机节点hostname对应的ip正确
因为进程启动时候会去拿本机的ip地址,所以要保证
/etc/hosts
文件中hostname 对应ip地址正确hostname
# 得到上面显示的结果,例如:baisui-host-1
ping baisui-host-1
# 确认ip节点是否为本机ip无误,如不正确,请在/etc/hosts将正确的hostname与ip映射关系加上确保lsof安装成功,
sudo yum install -y lsof
安装
下载
wget http://mirror.qlangtech.com/4.0.1/tis/flink-tis-1.18.1-bin.tar.gz && rm -rf flink-tis-1.18.1 && mkdir flink-tis-1.18.1 && tar xvf flink-tis-1.18.1-bin.tar.gz -C ./flink-tis-1.18.1
启动Flink Cluster
cd ./flink-tis-1.18.1
./bin/start-cluster.sh确认端口是否正常
telnet localhost 8081
总结
恭喜您已经把Flink Standalone模式的集群安装成功了
TIS中使用Flink Standalon模式是经过TIS团队适配过的,修改了Flink JobManager中ClassLoader执行逻辑,所以此处安装不能使用Flink官方发布的安装包
FAQ
1. Could not acquire the minimum required resources
在Flink中已经添加一个任务之后,再向Flink提交新的任务,Flink JM节点日志报告: Could not acquire the minimum required resources 没有足够的资源进行启动
原因:
Flink运行没有足够的slot资源分配给新任务
方法:
修改Flink配置文件:
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
默认值是1,需要在单个Flink节点上运行多个Flink任务,课修改成大于1的值就行(一般情况slot代表了服务节点的资源并行处理能力,一般配置于节点CPU核数相一致即可)
2. 启动时候由于服务器目录/opt/data/tis
没有权限,导致tis-flink启动失败(后台没有对应的进程在运行?)
原因:
tis系统日志默认输出到/opt/data/tis
目录,如果当前启动用户没有权限创建此目录时,启动flink-tis时会启动失败,失败日志如下:
Caused by: java.io.IOException: Unable to create directory /opt/data/tis
at org.apache.commons.io.FileUtils.forceMkdir(FileUtils.java:1391) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at com.qlangtech.plugins.incr.flink.TISFlinkClassLoaderFactory.makeDataDirUseable(TISFlinkClassLoaderFactory.java:132) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at com.qlangtech.plugins.incr.flink.TISFlinkClassLoaderFactory.buildServerLoaderFactory(TISFlinkClassLoaderFactory.java:154) ~[tis-flink-extends-dist-3.6.0-alpha.jar:?]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.defaultClassLoaderFactory(BlobLibraryCacheManager.java:196) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.jobmaster.JobManagerSharedServices.fromConfiguration(JobManagerSharedServices.java:124) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.Dispatcher.<init>(Dispatcher.java:171) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.StandaloneDispatcher.<init>(StandaloneDispatcher.java:40) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.SessionDispatcherFactory.createDispatcher(SessionDispatcherFactory.java:44) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.SessionDispatcherFactory.createDispatcher(SessionDispatcherFactory.java:27) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory.create(DefaultDispatcherGatewayServiceFactory.java:62) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.createDispatcher(SessionDispatcherLeaderProcess.java:102) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherIfRunning$0(SessionDispatcherLeaderProcess.java:96) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.runIfState(AbstractDispatcherLeaderProcess.java:222) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.runIfStateIs(AbstractDispatcherLeaderProcess.java:212) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.createDispatcherIfRunning(SessionDispatcherLeaderProcess.java:96) ~[flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_151]
... 6 more
2022-11-14 17:23:58,659 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-11-14 17:23:58,661 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-11-14 17:23:58,780 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
修改方式是在一下配置文件加env.java.opts
配置项
#具体目录根据自己情况修改
env.java.opts: "-Ddata.dir=/opt/data/tis"
3. 执行增量实例Cancel操作出现超时异常
象限描述:
本Case在创建增量实例时,使用了Chunjun Sink实现了整库表的同步功能,在手动取消(Cancel)实例过程中抛出以下异常,且一旦出现超时异常,Flink会将TM中剩下的全部slot耗尽,导致不能在重新部署新的Flink Job
2023-05-13 16:39:39,532 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
2023-05-13 16:39:39,536 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
原因分析:
由于Chujun Sink 在执行关闭时,需要为每张表对应的StreamLoadManager
执行close
操作,而该close
执行又是比较耗时的,而且从日志执行过程来看每个表对应的close是线性排队执行的,所以当通道本身选择的表
数据量比较多的情况下,会出现超时异常也就不足为奇了。
解决办法:
在$FLINK_HOME/conf/flink-conf.yaml
配置文件中设置 配置参数 task.cancellation.timeout
,详细配置请查阅:https://study.sf.163.com/documents/read/service_support/dsc-t-d-0501