11、Flink 笔记 - window窗口(二)
一、触发器(Trigger)
1.1、案例一
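利用 global window + trigger,实现某个单词每出现三次就统计一次(有点像 CountWindow;注意 CountTrigger 只触发计算而不清除窗口数据,因此输出的是累计值,而 countWindow 内部使用的是 PurgingTrigger.of(CountTrigger.of(n)))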
某台虚拟机或者mac 终端输入:nc -lk 9999
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;
/**
 * Word count over a socket text stream, using a global window evaluated by
 * {@code CountTrigger.of(3)}.
 *
 * <p>Pipeline: socket source (localhost:9999) -> split lines into words -> key by word
 * -> global window -> count trigger -> sum of the per-word counters -> print.
 */
public class WindowFunction_Global_Trigger {

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
                .flatMap(new CustomFlatMap())
                // Key by tuple field 0 (the word).
                .keyBy(0)
                // Without a trigger the global window would only keep collecting data and
                // never start a computation.
                .window(GlobalWindows.create())
                // NOTE(review): CountTrigger presumably fires without purging the window, so the
                // printed sum keeps accumulating (3, 6, 9, ...) - confirm; wrap it in a purging
                // trigger if per-batch counts are wanted.
                .trigger(CountTrigger.of(3))
                // Sum tuple field 1 (the counter).
                .sum(1);

        resultDataStream.print();
        env.execute();
    }

    /** Splits each input line on single spaces and emits a {@code (word, 1)} pair per word. */
    public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * @param input     one line of text read from the socket
         * @param collector receives one {@code (word, 1)} tuple per non-empty token
         * @throws Exception declared by the interface; not thrown by this implementation itself
         */
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : input.split(" ")) {
                // Blank lines and leading/repeated spaces make split(" ") return empty tokens;
                // those are not words and must not be counted under the "" key.
                if (word.isEmpty()) {
                    continue;
                }
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
终端输入:
hello flink
hello spark
hello hive
控制台打印:(hello,3)
1.2、案例二
利用global window 自定义一个CountWindow,也是单词出现3次统计一次
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
public class WindowFunction_CustomTrigger {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream.flatMap(new CustomFlatMap())
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new CustomTrigger(3))
.sum(1);
resultDataStream.print();
env.execute();
}
public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(" ");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
public static class CustomTrigger extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
private long maxCount;
public CustomTrigger(long count) {
this.maxCount = count;
}
// 定义一个状态保存 每个 key 对应的 count 值 (涉及到状态编程 后面会具体介绍)
private ReducingStateDescriptor<Long> stateDescriptor = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
@Override
public Long reduce(Long input1, Long input2) throws Exception {
return input1 + input2;
}
}, Long.class);
/**
* 每来一条数据都会执行
*
* @param input 输入类型
* @param timestamp 处理时间戳
* @param globalWindow 全窗口类型(所属窗口)
* @param triggerContext trigger 上下文
* @return TriggerResult
* 1. TriggerResult.CONTINUE :表示对 window 不做任何处理
* 2. TriggerResult.FIRE :表示触发 window 的计算
* 3. TriggerResult.PURGE :表示清除 window 中的所有数据
* 4. TriggerResult.FIRE_AND_PURGE :表示先触发 window 计算,然后删除 window 中的数据
* @throws Exception
*/
@Override
public TriggerResult onElement(Tuple2<String, Integer> input, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 获取 key 对应之前 count 状态值
ReducingState<Long> count = triggerContext.getPartitionedState(stateDescriptor);
// 每来一条数据 累加 1
count.add(1L);
if (maxCount == count.get()) {
// 如果已经达到预期的count
// 1 清除 count 状态
count.clear();
// 2 先触发计算 再清空窗口的数据
return TriggerResult.FIRE_AND_PURGE;
}
// 3 否则不做任务处理
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 processingTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 基于 EventTime 定时器处理逻辑
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
// 清理状态
triggerContext.getPartitionedState(stateDescriptor).clear();
}
}
}
终端输入:
world spark
world flink
world hive
控制台打印:(world,3)
二、移除器(Evictor)
2.1 案例
利用 global window + trigger + evictor 实现每隔2个单词统计最近3个单词
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;
import java.util.Iterator;
/**
 * Word count over a socket text stream that evaluates a global window every 2 elements
 * of a key while keeping only the 3 most recent elements of that key.
 *
 * <p>Pipeline: socket source (localhost:9999) -> split lines into words -> key by word
 * -> global window -> {@link CustomTrigger} -> {@link CustomEvictor} -> sum -> print.
 */
public class WindowFuncton_CustomEvictor {

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> inputDataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
                .flatMap(new CustomFlatMap())
                // Key by tuple field 0 (the word).
                .keyBy(0)
                .window(GlobalWindows.create())
                // Every 2 elements of a key start the downstream computation.
                .trigger(new CustomTrigger(2))
                // Keep at most the 3 most recent elements before computing.
                .evictor(new CustomEvictor(3))
                // Sum tuple field 1 (the counter).
                .sum(1);

        resultDataStream.print();
        env.execute();
    }

    /** Splits each input line on single spaces and emits a {@code (word, 1)} pair per word. */
    public static class CustomFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * @param input     one line of text read from the socket
         * @param collector receives one {@code (word, 1)} tuple per non-empty token
         * @throws Exception declared by the interface; not thrown by this implementation itself
         */
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : input.split(" ")) {
                // Blank lines and leading/repeated spaces make split(" ") return empty tokens;
                // those are not words and must not be counted under the "" key.
                if (word.isEmpty()) {
                    continue;
                }
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }

    /**
     * Trigger that fires (without purging) once {@code maxCount} elements have been seen
     * since the last firing; removing old elements is left to the evictor.
     */
    public static class CustomTrigger extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        /** Number of elements after which the window is evaluated. */
        private final long maxCount;

        /**
         * Descriptor of the state that holds the element count for each key
         * (stateful programming is covered in a later chapter of these notes).
         */
        private final ReducingStateDescriptor<Long> stateDescriptor =
                new ReducingStateDescriptor<Long>("count", new Sum(), Long.class);

        /**
         * @param count number of elements that must arrive before the window is evaluated
         */
        public CustomTrigger(long count) {
            this.maxCount = count;
        }

        /**
         * Invoked for every element that arrives.
         *
         * @param input          the incoming element
         * @param timestamp      timestamp handed in together with the element (unused here)
         * @param globalWindow   the window the element belongs to
         * @param triggerContext trigger context, used to reach the counter state
         * @return one of:
         *         <ol>
         *         <li>{@code TriggerResult.CONTINUE}: do nothing with the window</li>
         *         <li>{@code TriggerResult.FIRE}: evaluate the window</li>
         *         <li>{@code TriggerResult.PURGE}: drop all data in the window</li>
         *         <li>{@code TriggerResult.FIRE_AND_PURGE}: evaluate the window, then drop its data</li>
         *         </ol>
         * @throws Exception if accessing the counter state fails
         */
        @Override
        public TriggerResult onElement(Tuple2<String, Integer> input, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
            // Fetch the counter previously stored for the current key.
            ReducingState<Long> count = triggerContext.getPartitionedState(stateDescriptor);
            // Every element increases the counter by one.
            count.add(1L);
            // ">=" instead of "==": identical for thresholds >= 1, but a threshold below 1
            // can no longer leave the counter growing forever without ever firing.
            if (count.get() >= maxCount) {
                // Threshold reached: reset the counter and evaluate the window, keeping its data.
                count.clear();
                return TriggerResult.FIRE;
            }
            // Threshold not reached yet: leave the window untouched.
            return TriggerResult.CONTINUE;
        }

        /** Processing-time timer callback; this trigger takes no action on it. */
        @Override
        public TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** Event-time timer callback; this trigger takes no action on it. */
        @Override
        public TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** Clears the counter state held by this trigger. */
        @Override
        public void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
            triggerContext.getPartitionedState(stateDescriptor).clear();
        }

        /**
         * Adds two partial counts. Declared as a static nested class so that the state
         * descriptor does not hold a reference to the enclosing trigger instance.
         */
        private static class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long input1, Long input2) throws Exception {
                return input1 + input2;
            }
        }
    }

    /**
     * Evictor that, before the window function runs, drops elements from the front of the
     * window until at most {@code windowCount} elements remain.
     */
    public static class CustomEvictor implements Evictor<Tuple2<String, Integer>, GlobalWindow> {

        /** Maximum number of elements the window function should see. */
        private final long windowCount;

        /**
         * @param windowCount number of elements to keep in the window
         */
        public CustomEvictor(long windowCount) {
            this.windowCount = windowCount;
        }

        /**
         * Removes the leading surplus elements so that only the last {@code windowCount} remain.
         *
         * <p>Example: with 5 elements in the window and {@code windowCount = 3}, the first
         * two elements are removed and the third and later ones are kept.
         *
         * @param iterable       all elements currently in the window (assumed to be iterated
         *                       in arrival order, like a queue)
         * @param size           number of elements currently in the window
         * @param globalWindow   the window being evaluated
         * @param evictorContext evictor context (unused)
         */
        @Override
        public void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> iterable, int size, GlobalWindow globalWindow, EvictorContext evictorContext) {
            // How many elements exceed the configured capacity.
            long surplus = size - windowCount;
            if (surplus <= 0) {
                // The window holds no more than windowCount elements: nothing to evict.
                return;
            }
            Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator = iterable.iterator();
            for (long evicted = 0; evicted < surplus && iterator.hasNext(); evicted++) {
                iterator.next();
                iterator.remove();
            }
        }

        /** Nothing is evicted after the window function has run. */
        @Override
        public void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {
        }
    }
}
终端输入:
flink A
flink B
(此时控制台已经打印(flink,2) )
flink C
flink D
(此时控制台已经打印 (flink,3))
flink E
flink F
(此时控制台已经打印 (flink,3))
以此类推
三、迟到数据和侧输出流
迟到数据:这些数据根据 EventTime已经分配好了所在窗口执行,但是所在窗口已经执行计算了,并且这些数据不会被当前窗口所收集和计算。
侧输出流:把迟到的数据放入到侧输出流中,单独进行计算。
由于该案例涉及到 EventTime(日志产生时间)、IngestionTime(数据进入程序时间)、ProcessingTime(数据处理时间)三者的关系以及 watermark(水位线)的概念,该案例放到后面具体介绍。