Flink学习
# 学习
# 1. WordCount示例
flink run ./examples/batch/WordCount.jar --input hdfs://mycluster/datas/wordcounttest.txt --output hdfs://mycluster/datas/wordcountresult.txt
# 2. 验证Flink高可用
- 手动将JobManager / TaskManager实例添加到群集(单点启动)
jobmanager.sh start hadoop100
jobmanager.sh start cluster hadoop100
- 在hadoop101上 kill -9 10155(StandaloneSessionClusterEntrypoint), 查看另一个节点是否可用。
# 3. 高可用
独立集群的JobManager的机制是,一个leader JobManager和多个standby JobManager,当leader JobManager崩溃后,多个standby JobManager选举后产生新的leader JobManager。
leader JobManager和standby JobManager之间没有区别,任何JobManager都可以承担leader或standby角色。
借助ZK的临时节点机制,Flink实现了Job Manager独立集群的高可用性。但是由于ZK是CP,并不保证每次可用性,实际使用中应当予以考虑。
# 4. flink on yarn
修改etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
stop-cluster.sh 后再启动 flink on yarn (1. namenode死掉) yarn-session.sh -n 3 -jm 1024 -tm 1024 -s 3 -nm FlinkOnYarnSession -d -st
# 5. Flink 的 Watermark(水位线 水印)(迟到数据的处理)
每隔3秒统计前3秒的元素个数,那么flink系统会事先在系统中划分好20(60/3)个window
制定watermark的策略: 周期性提取watermark,默认时间为200ms,我们可以认为在1号数据被分配到window之后的200ms,flink系统就开始计算水位线了
假设允许数据乱序的最大时间为10秒 数据开始流入flink系统
1号数据 01:01:22---hello
2号数据 01:01:35---flink
在此之后 又来一条数据(迟到) 3号数据 01:01:23---later
第一条数据流入 01:01:22---hello
这条数据的event time是01:01:22, 那么它将会被放置到[00:00:21-00:00:24)窗口内
(100ms后)此时水位线(Watermark)为 12 (22-10)
Watermark(12) < WindowEndTime(24) 所以 不会触发该window的计算
第二条数据流入 01:01:35---flink
这条数据的event time是01:01:35, 那么它会被放置在[00:00:33-00:00:36)窗口内
(100ms后)此时水位线(Watermark)为 25 (35-10)
由于Watermark(25) >= WindowEndTime(24), 所以 会触发水位线(25)之前window的计算, ([21, 24))
计算后窗口直接销毁
第三条数据流入 01:01:23---later
正常情况下数据应放到[00:00:21-00:00:24)窗口, 由于此窗口被销毁,所以数据被丢弃
为保证数据完整性,修改 AllowedLateness 为2s 也就是窗口触发计算后2s再销毁
(35-37流入)这条数据的event time是01:01:23,
那么它将会被放置到[00:00:21-00:00:24)窗口(此窗口在37时会被销毁)内
(100ms后)此时计算水位线: 23-10=13 < Watermark=25, 所以水位线不变仍为25
由于 Watermark(25) < WindowEndTime(24) + AllowedLateness(2),
所以 [00:00:21-00:00:24)窗口会再次(多次)触发
此时, 窗口销毁时机 Watermark >= WindowEndTime + AllowedLateness
# 代码学习
word count
package hrbu
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamingJob {
def main(args: Array[String]) {
//参数--host localhost --port 7777
val params = ParameterTool.fromArgs(args)
val host:String = params.get("host")
val port:Int = params.getInt("port")
// set up the streaming execution environment
//创建流处理的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//接收socket数据流
val textDataSet = env.socketTextStream(host, port)
//逐一读取数据,搭散之后进行WordCount
val wordCountData = textDataSet
.flatMap(_.split("\\s"))
.filter(_.nonEmpty)
.map( (_,1) )
.keyBy(0)
//.timeWindow(Time.seconds(10))
.sum(1)
//打印输出
wordCountData.print()
.setParallelism(2) //设置并行度 1
//wordCountData.writeAsText("datas\\flink_Stream_result").setParallelism(2)
//执行任务
env.execute("stream word count job")
}
}
消费kafka
package hrbu.StreamingJob
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object StreamingJob_Kafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// kafka读取数据
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.1.100:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer[String]("", new SimpleStringSchema(), properties))
stream3.print("stream3").setParallelism(1)
env.execute("source test")
}
}
# demo案例
需求:在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的例如一美元或者更小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费,来购买一些他们希望得到的,或者可以倒卖的财物 为了方便编码,我们直接下载flink官网提供的项目模板,因为内置的walkthrough包只有最新的flink1.10版本才有,所以我们下载完项目后就不要再改动相关版本号了
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-scala \
-DarchetypeVersion=1.10.0 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
下载完成后,将项目导入到IDEA FraudDetectionJob
package spendreport
import org.apache.flink.streaming.api.scala._
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource
/**
* Skeleton code for the DataStream code walkthrough
*/
object FraudDetectionJob {
@throws[Exception]
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* TransactionSource就是walkthrough包的交易数据生成类
* Transaction类结构非常简单,类似于: Transaction {accountId=2, timestamp=154627760000. amount=412.91]
*/
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
.name("transactions")
/**
* 此处为了方便检测结果的输出,我们在最终结果DataStream中存放的是wa1kthrough的Alert类
* 该类很简单,主要为了输出欺诈账户的id
*/
/**
* DataStrean的process方法支持传入一个继承KeyedProcessFunction的自定义类
* KeyedProcessFunction主要是解决同一个key对应集合数据的处理
* 内部实现的方法为processElement, 该方法能够保证在处理元素数据时,属于同一个key的元素被放置到同一个context (上下文)中
*/
val alerts: DataStream[Alert] = transactions
.keyBy(transaction => transaction.getAccountId)
.process(new FraudDetector)
.name("fraud-detector")
alerts
.addSink(new AlertSink)
.name("send-alerts")
//上面的所有过程都设置了nne属性,没有实际业务意义,仅仅是方便我们查看日志,定位问题
env.execute("Fraud Detection")
}
}
FraudDetector
package spendreport
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.entity.Transaction
/**
* Skeleton code for implementing a fraud detector.
*/
//该类的派生对象,我们可以理解为java中的静态代码块
object FraudDetector {
//以下常量在我们正式实现欺诈检测时会用到
//定义常量小型交易金额
val SMALL_AMOUNT: Double = 1.00
//定义常量大型交易金额
val LARGE_AMOUNT: Double = 500.00
//定义常量一分钟是多少毫秒
val ONE_MINUTE: Long = 60 * 1000L
}
/**
* KeyedProcessFunction需要指定3个对象类型,K I 0
* 第一个参数key指的是被处理的key是什么类型,这里是账户id是一个长整型
* 第二个参数是传入的被处理对象,这里是Transaction交易数据类
* 第三个参数指的是输出数据类型,这里我们使用Alert类
*/
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
/**
* @ transient注解表示该属性不用被序列化,此处生命一个私有变量ValueState,并赋值默认值
* 此处顺便说明下ValueState
* ValueState是一个包装类, 类似于Java标准库里边的AtomicReference 和Atomiclongo
* 它提供了三个用于交互的方法。update 用于更新状态,value 用于获取状态值,还有clear 用于清空状态。
* 如果一个key 还没有状态,例如当程序刚启动或者调用过Va lueState#clear方法时,ValueState#value 将会返回nullo
* 如果需要更新状态,需要调用Va lueState#update方法,直接更改ValueState#value 的返回值可能不会被系统识别。
* 容错处理将在Flink 后台自动管理,你可以像与常规变量那样与状态变量进行交互
*/
@transient private var flagState:ValueState[java.lang.Boolean] = _
//新增一个时间状态
@transient private var timerState:ValueState[java.lang.Long] = _
/**
* 正如我们ppt中所讲,Va lueState在使用之前需要先调用open方法注册
* 我们先生成Va lueStateDescriptor实例
* 然后使用上下文对象获取ValueState对象
* 此处顺便说明下
*/
@throws[Exception]
override def open(parameters: Configuration): Unit = {
val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
flagState = getRuntimeContext.getState(flagDescriptor)
//注册时,一起将时间状态注册了
val timeDescriptor = new ValueStateDescriptor("time-state", Types.LONG)
timerState = getRuntimeContext.getState(timeDescriptor)
}
/**
* 处理对象为Transaction交易数据对象
* 上下文为KeyedProcessFunction的Context
* 显示报警的数据我们使用Collector收集
*/
@throws[Exception]
def processElement(transaction: Transaction, context: KeyedProcessFunction[Long, Transaction, Alert]#Context, collector: Collector[Alert]): Unit = {
//先获取flagState的value值,第一次肯定是空
val lastTransactionWasSmall = flagState.value
//我们顺便打印下transaction数据, 方便观察
println("transaction=============" + transaction)
/**
* 首次处理element,肯定不进入此if,往后可能进入
* 基于第111行代码,如果不为nu1l其实lastTransactionWasSmall的值就是true
* 代表了上次的交易金额为小型交易
*/
if(lastTransactionWasSmall != null) {
//如果上次是小型交易,并且此次是大型交易,就报警
if(transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
val alert = new Alert
alert.setId(transaction.getAccountId)
collector.collect(alert)
}
/**
* 在检查之后,不论是什么状态,都需要被清空。
* 不管是当前交易触发了欺诈报警而造成模式的结束,还是当前交易没有触发报警而造成模式的中断,都需要重新开始新的模式检测
* 因为我们的需求规定小额紧跟大额才算欺诈,如果现在有3笔交易: 1:$0.5; 2:$505; 3:$605,那么第3笔就不应该被认定为欺诈
*/
//flagState.clear()
clearUp(context)
}
/**
* 无论是否进入上面if,只要当前的交易金额小于我们预定的小型金额,那么flagState就应该被更新为true
* 否则什么都不做
*/
if(transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
flagState.update(true)
/**
* KeyedProcessFunction#processElement需要使用提供了定时器服务的Context 来调用。
* 定时器服务可以用于查询当前时间、注册定时器和删除定时器。
* 我们首先定义了一个1分钟后触发的定时器,时间到达后会调用onIimer方法
* 同时我们更新了timerState状态
*/
val timer = context.timerService().currentProcessingTime() + FraudDetector.ONE_MINUTE
context.timerService().registerProcessingTimeTimer(timer)
timerState.update(timer)
}
}
/**
* 定时器回调: 1分钟之内如果没有小额+大额交易我们就重置已有小额交易的状态,防止检测到正常交易
*/
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext, out: Collector[Alert]): Unit = {
timerState.clear()
flagState.clear()
}
/**
* 在成功捕获欺诈交易后我们同样需要清空所有状态与定时器
* 我们可以把这些逻辑封装到一个私有函数中,而不是直接调用flagState. clear()
*/
private def clearUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
val timer = timerState.value()
//删除定时器
ctx.timerService().deleteProcessingTimeTimer(timer)
//清空所有状态
timerState.clear()
flagState.clear()
}
}
编辑 (opens new window)
上次更新: 2022/04/13, 21:51:31