Flink环境搭建
# 集群规划
主机 | Zookeeper | jobmanager | taskmanager |
---|---|---|---|
hadoop100 | 是 | ||
hadoop101 | 是 | 是 | 是 |
hadoop102 | 是 | 是 | |
hadoop103 | 是 | 是 |
# 一. standalone cluster HA
# 1. 安装
- 下载安装包
flink-1.9.2-bin-scala_2.12.tgz
- 上传解压
tar -zxvf flink-1.9.2-bin-scala_2.12.tgz -C /soft/module/
重命名mv flink-1.9.2/ flink
- 配置环境变量
sudo vim /etc/profile
# flink环境变量
export FLINK_HOME=/soft/module/flink
export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
# 2.修改配置文件
- 修改zoo.cfg文件
vim conf/zoo.cfg
和Zookeeper配置相同
# The port at which the clients will connect
clientPort=2181
#######################cluster##########################
server.1=hadoop101:2888:3888
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
- 修改flink-conf.yaml文件(需要注意的是 配置:后要有空格)
vim conf/flink-conf.yaml
# jobmanager服务主机名,这条配置只在单机模式下有用 用于taskmanager寻找jobmanager 及后续通信
# 集群模式下这条会被忽略,集群模式下flink会寻找conf/master中的主机配置
jobmanager.rpc.address: hadoop100
#jobmanager rpc 端口号
jobmanager.rpc.port: 6123
#jobmanager内存
jobmanager.heap.size: 1024m
#jobmanager内存
taskmanager.heap.size: 1024m
# taskmanager服务中插槽个数,其实就是子任务task的个数
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
# 程序默认并行度
parallelism.default: 1
# 开启集群模式
high-availability: zookeeper
# 开启hdfs存储目录
high-availability.storageDir: hdfs:///flink/ha/
# 指定Zookeeper集群地址
high-availability.zookeeper.quorum: hadoop101:2181,hadoop102:2181,hadoop103:2181
# <class-name-of-factory>.
# 启用检查点,存储于文件系统
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
# 检查点的文件存储位置,这里我们指定的事hdfs集群地址
# 这里的检查点是指程序运行过程中的检查点,以便快速恢复程序运行或恢复到某个正确运行的时刻
state.checkpoints.dir: hdfs://mycluster:9000/flink-checkpoints
# Default target directory for savepoints, optional.
# 对应外部应用程序的检查点文件存储位置 记录数据的处理进度
# 例如flink处理到30% 重启了 启动之后在30%的位置继续处理
state.savepoints.dir: hdfs://mycluster:9000/flink-checkpoints
# full模式是一个任务失败了,结束所有任务,然后重启所有任务 简单粗暴但是龙一直造成内存溢出
# region模式是查出具体出错的任务,然后单独重启这一任务
jobmanager.execution.failover-strategy: region
- 修改masters和slaves文件
vim conf/masters
hadoop100:8081
hadoop101:8081
vim conf/slaves
hadoop101
hadoop102
hadoop103
- 最后,当前的flink版本是一个纯净的版本,如果需要依赖其他系统(例如咱们当前flink的相关数据都存储到了hdfs),则需要添加相应的jar包
官网有提供:https://flink.apache.org/downloads.html#additional-components
mv /soft/software/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /soft/module/flink/lib/
- 分发文件并修改环境变量
# 3. 启动服务
启动之前我们要确保我们的zookeeper集群已启动,并且hdfs集群是启动状态
- 直接使用
start-cluster.sh
脚本启动 - jps查看进程
- 访问flink的web UI hadoop100:8081
当前flink虽然使用了hdfs系统,但是其运行模式还是Stand alone,就是独立集群 独立集群在执行任务时所有的资源分配管理都是flink自己安排的 我们尝试使用独立集群运行一个测试计算 Flink可以读取系统文件,也可以读取hdfs文件 我们以本地系统文件为例 我们使用现有的WordCount.jar去统计README.txt文件的内容并输出 这个输出结果被输出到taskmanager机器,是随机的 所以我们最好还是使用hdfs系统
- 测试
flink run ./examples/batch/WordCount.jar --input hdfs://mycluster/datas/wordcounttest.txt --output hdfs://mycluster/datas/wordcountresult.txt
# 二. flink on yarn(Yarn cluster HA )
在上面的基础上 参考链接: https://www.jianshu.com/p/8f1e650ebcad
# 0. yarn高可用?
# 1. 修改配置文件
- 配置yarn-site.xml
- 修改环境变量
vim /etc/profile
加上(用于flink寻找yarn的配置信息 ):export HADOOP_CONF_DIR=/soft/module/hadoop-2.9.2/etc/hadoop/
source /etc/profile
- 修改文件:
vim yarn-site.xml
<property>
<!--配置yarn最大重试次数-->
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
- 配置flink-conf.yaml
# 配置Yarn重试次数
yarn.application-attempts: 10
## 配置Zookeeper
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: 10.108.4.203:2181,10.108.4.204:2181,10.108.4.205:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_yarn
- 同步配置文件
# 2. 启动
- 启动Flink Yarn Session有2种模式:分离模式、客户端模式
- 分离模式
通过-d指定分离模式,即客户端在启动Flink Yarn Session后,就不再属于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通过yarn application -kill 命令来停止
yarn-session.sh -n 3 -jm 1024 -tm 1024 -s 3 -nm FlinkOnYarnSession -d -st
# 错误
2020-04-15 16:19:28,433 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - Error while running the Flink Yarn session.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:387)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:616)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$3(FlinkYarnSessionCli.java:844)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:844)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
在yarn-site.xml加上
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
编辑 (opens new window)
上次更新: 2022/04/13, 21:51:31