IT教程 ·

[源码剖析] 从源码入手看 Flink Watermark 之流传历程

Solr搜索解析及查询解析器用法概述

[源码剖析] 从源码入手看 Flink Watermark 之流传历程

0x00 择要

本文将经过过程源码剖析,率领人人熟习Flink Watermark 之流传历程,趁便也可以对Flink团体逻辑有一个大抵把握。

0x01 总述

从静态角度讲,watermarks是完成流式盘算的中心观点;从动态角度说,watermarks贯串全部流处置惩罚程序。所以为相识说watermarks的流传,须要对flink的许多模块/观点举行相识,触及险些各个阶段。我起首会解说相干观点,然后会依据一个实例代码从以下几部份来诠释:程序逻辑/盘算图模子/程序实行。末了是细致Flink源码剖析(略冗杂,可以挑选性浏览)。

0x02 相干观点

流盘算被笼统成四个问题,what,where,when,how。

window处理的是where,也就是将无界数据划分红有界数据。

window的数据什么时刻被盘算是when?处理这个问题用的体式格局是watermark和trigger,watermark用来标记窗口的完全性。trigger用来设想窗口数据触发前提。

1. 乱序处置惩罚

乱序问题平常是和event time关联的, 关于一个流式处置惩罚体系的process time来说,是不存在乱序问题的。所以下面引见的watermark/allowedLateness也只是在event time作为主时刻才见效。

Flink中处置惩罚乱序依靠的 watermark+window+trigger,属于全局性的处置惩罚;Flink同时关于window而言,还供应了allowedLateness要领,使得更大限制的许可乱序,属于局部性的处置惩罚;

即watermark是全局的,不止针对window盘算,而allowedLateness让某一个特定window函数能本身掌握处置惩罚耽误数据的战略,allowedLateness是窗口函数的属性。

2. Watermark(水位线)

watermark是流式体系中主要用于处理流式体系中数据乱序问题的机制,要领是用于标记当前处置惩罚到什么水位的数据了,这意味着再早于这个水位的数据过来会被直接抛弃。这使得引擎可以自动跟踪数据中的当前事宜时刻,并尝试响应地消灭旧状况。

Watermarking示意多长时刻之前的数据将不再更新,您可以经过过程指定事宜时刻列来定义查询的Watermarking,并依据事宜时刻展望数据的耽误时刻。也就是说每次窗口滑动之前会举行Watermarking的盘算。当一组数据或新吸收的数据事宜时刻小于Watermarking时,则该数据不会更新,在内存中就不会保护该组数据的状况。

换一种说法,阈值内的滞后数据将被聚合,然则晚于阈值到来的数据(其现实时刻比watermark小)将被抛弃。

watermark和数据本身一样作为一般的音讯在流中活动

3. Trigger

Trigger 指明在哪些前提下触发window盘算,基于处置惩罚数据时的时刻以及事宜的特定属性。平常trigger的完成是当watermark处于某种时刻前提下或许窗口数据抵达肯定前提,窗口的数据入手下手盘算。

每一个窗口分派器都邑有一个默许的Trigger。假如默许的Trigger不能满足你的需求,你可以指定一个自定义的trigger()。Flink Trigger接口有以下要领许可trigger对差别的事宜做出回响反映:

* onElement():进入窗口的每一个元素都邑挪用该要领。
* onEventTime():事宜时刻timer触发的时刻被挪用。
* onProcessingTime():处置惩罚时刻timer触发的时刻会被挪用。
* onMerge():有状况的触发器相干,并在它们响应的窗口兼并时兼并两个触发器的状况,比方运用会话窗口。
* clear():该要领重如果实行窗口的删除操纵。

每次trigger,都是要对新增的数据,相干的window举行从新盘算,并输出。输出有complete, append,update三种输出情势:

  • Complete mode:Result Table 全量输出,也就是从新盘算过的window效果都输出。意味着这类情势下,每次读了新增的input数据,output的时刻会把内存中resulttable中一切window的效果都输出一遍。
  • Append mode (default):只需 Result Table 中新增的行才会被输出,所谓新增是指自上一次 trigger 的时刻。因为只是输出新增的行,所以假如老数据有修改就不适宜运用这类情势。 更新的window并不输出,不然外存里的key就重了。
  • Update mode:只需更新的 Row 都邑被输出,相称于 Append mode 的加强版。而且是对外存中的雷同key举行update,而不是append,须要外存是能kv操纵的!只会输出新增和更新过的window的效果。

从上面能看出来,流式框架关于window的效果数据是存在一个 result table里的!

4. allowedLateness

Flink中借助watermark以及window和trigger来处置惩罚基于event time的乱序问题,那末怎样处置惩罚“late element”呢?

或许另有人会问,out-of-order element与late element有什么区分?不都是一回事么?答案是一回事,都是为了处置惩罚乱序问题而发生的观点。要说区分,可以总结以下:

  • 经过过程watermark机制来处置惩罚out-of-order的问题,属于第一层防护,属于全局性的防护,一般说的乱序问题的处理办法,就是指这类;
  • 经过过程窗口上的allowedLateness机制来处置惩罚out-of-order的问题,属于第二层防护,属于特定window operator的防护,late element的问题就是指这类。

默许状况下,当watermark经过过程end-of-window今后,再有之前的数据抵达时,这些数据会被删除。为了防止有些晚到的数据被删除,因而发生了allowedLateness的观点。

简朴来说,allowedLateness就是针对event time而言,关于watermark凌驾end-of-window今后,还许可有一段时刻(也是以event time来权衡)来守候之前的数据抵达,以便再次处置惩罚这些数据。

5. 处置惩罚音讯历程

  1. windowoperator接到音讯今后,起首存到state,寄存的花样为k,v,key的花样是key + window,value是key和window对应的数据。
  2. 注册一个timer,timer的数据组织为 [key,window,window边境 - 1],将timer放到鸠合中去。
  3. 当windowoperator收到watermark今后,掏出鸠合中小于watermark的timer,触发其window。触发的历程当中将state内里对应key及window的数据掏出来,这里要经过序列化的历程,发送给windowfunction盘算。
  4. 数据发送给windowfunction,完成windowfunction的window数据盘算逻辑。

比方某窗口有三个数据:[key A, window A, 0], [key A, window A, 4999], [key A, window A, 5000]

关于牢固窗口,当第一个watermark (Watermark 5000)抵达时刻,[key A, window A, 0], [key A, window A, 4999] 会被盘算,当第二个watermark (Watermark 9999)抵达时刻,[key A, window A, 5000]会被盘算。

6. 累加(再次)盘算

watermark是全局性的参数,用于治理音讯的乱序,watermark凌驾window的endtime今后,就会触发窗口盘算。平常状况下,触发窗口盘算今后,窗口就烧毁掉了,背面再来的数据也不会再盘算。

因为到场了allowedLateness,所以盘算会和之前差别了。window这个allowedLateness属性,默许为0,假如allowedLateness > 0,那末在某一个特定watermark到来之前,这个触发过盘算的窗口还会继续保留,这个保留重如果窗口里的音讯。

这个特定的watermark是什么呢? watermark-allowedLateness>=窗口endtime。这个特定watermark来了今后,窗口就要消逝了,背面再来属于这个窗口的音讯,就丢掉了。在 "watermark(=窗口endtime)" ~ “watermark(=endtime+allowedLateness)" 这段时刻之间,对应窗口大概会屡次盘算。那末要window的endtime+allowedLateness <= watermark的时刻,window才会被清掉。

比方window的endtime是5000,allowedLateness=0,那末假如watermark 5000到来今后,这个window就应该被消灭。然则假如allowedLateness = 1000,则须要等water 6000(endtime + allowedLateness)到来今后,这个window才会被清掉。

Flink的allowedLateness可用于TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,这大概使得窗口再次被触发,相称于对前一次窗口的窗口的修改(累加盘算或许累加撤回盘算);

注重:关于trigger是默许的EventTimeTrigger的状况下,allowedLateness会再次触发窗口的盘算,而之前触发的数据,会buffer起来,直到watermark凌驾end-of-window + allowedLateness的时刻,窗口的数据及元数据信息才会被删除。再次盘算就是DataFlow模子中的Accumulating的状况。

同时,关于sessionWindow的状况,当late element在allowedLateness局限以内抵达时,大概会引起窗口的merge,如许之前窗口的数据会在新窗口中累加盘算,这就是DataFlow模子中的AccumulatingAndRetracting的状况。

7. Watermark流传

生产使命的pipeline中一般有多个stage,在泉源发生的watermark会在pipeline的多个stage间通报。相识watermark怎样在一个pipeline的多个stage间举行通报,可以更好的相识watermark对全部pipeline的影响,以及对pipeline效果延时的影响。我们在pipeline的各stage的边境上对watermark做以下定义:

  • 输入watermark(An input watermark):捕获上游各阶段数据处置惩罚进度。对泉源算子,input watermark是个特别的function,对进入的数据发生watermark。对非泉源算子,input watermark是上游stage中,一切shard/partition/instance发生的最小的watermark
  • 输出watermark(An output watermark):捕获本stage的数据进度,实质上指本stage中,一切input watermark的最小值,和本stage中一切非late event的数据的event time。比方,该stage中,被缓存起来守候做聚合的数据等。

每一个stage内的操纵并非线性递增的。观点上,每一个stage的操纵都可以被分为几个组件(components),每一个组件都邑影响pipeline的输出watermark。每一个组件的特征与详细的完成体式格局和包含的算子相干。理论上,这类算子会缓存数据,直到触发某个盘算。比方缓存一部份数据并将其存入状况(state)中,直到触发聚合盘算,并将盘算效果写入下流stage。

watermark可以是以下项的最小值:

  • 每一个source的watermark(Per-source watermark) - 每一个发送数据的stage.
  • 每一个外部数据源的watermark(Per-external input watermark) - pipeline之外的数据源
  • 每一个状况组件的watermark(Per-state component watermark) - 每种须要写入的state范例
  • 每一个输出buffer的watermark(Per-output buffer watermark) - 每一个吸收stage

这类精度的watermark可以更好的形貌体系内部状况。可以更简朴的跟踪数据在体系各个buffer中的流转状况,有助于排查数据梗塞问题。

0x03. Flink 程序组织 & 中心观点

1. 程序组织

Flink程序像通例的程序一样对数据鸠合举行转换操纵,每一个程序由下面几部份组成:

  1. 猎取一个实行环境
  2. 加载/竖立初始化数据
  3. 指定关于数据的transformations操纵
  4. 指定盘算的输出效果(打印或许输出到文件)
  5. 触发程序实行

flink流式盘算的中心观点,就是将数据从输入流一个个通报给Operator举行链式处置惩罚,末了交给输出流的历程。对数据的每一次处置惩罚在逻辑上成为一个operator,而且为了当地化处置惩罚的效力起见,operator之间也可以串成一个chain一同处置惩罚。

下面这张图表清楚明了flink是怎样对待用户的处置惩罚流程的:用户操纵被笼统化为一系列operator。以source入手下手,以sink末端,中心的operator做的操纵叫做transform,而且可以把几个操纵串在一同实行。

Source ---> Transformation ----> Transformation ----> Sink

以下是一个样例代码,后续的剖析会基于此代码

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) 
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)
  
counts.print()  
System.out.println(env.getExecutionPlan());

2. 中心类/接口

在用户设想程序时刻,对应以下中心类/接口

  • DataStream:形貌的是一个具有雷同数据范例的数据流,底层是经过过程详细的Transformation来完成,其担任供应种种对流上的数据举行操纵转换的API接口。
  • Transformation:形貌了构建一个DataStream的操纵,以及该操纵的并行度、输出数据范例等信息,并有一个属性,用来持有StreamOperator的一个详细实例;

上述代码逻辑中,对数据流做了以下操纵:filter, map, keyBy, assignTimestampsAndWatermarks, timeWindow, sum。每次转换都生成了一个新的DataStream

比方实例代码中的timeWindow末了生成了windowedStream。windowedStream之上实行的apply要领会生成了WindowOperator,初始化时包含了trigger以及allowedLateness的值。然后经过transform转换,现实上是实行了DataStream中的transform要领,末了生成了SingleOutputStreamOperator。SingleOutputStreamOperator这个类名字有点误导,现实上它是DataStream的子类

    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        KeySelector<T, K> keySel = input.getKeySelector(); //依据keyedStream猎取key
        WindowOperator<K, T, Iterable<T>, R, W> operator;
        operator =  new WindowOperator<>(windowAssigner, ... ,
                            new InternalIterableWindowFunction<>(function),
                          trigger,
                          allowedLateness,
                          legacyWindowOpType);
        return input.transform(opName, resultType, operator);//依据operator name,窗口函数的范例,以及window operator,实行keyedStream.transaform操纵
    }

0x04. Flink 实行图模子

Flink 中的实行图可以分红四层:StreamGraph ---> JobGraph ---> ExecutionGraph -> 物理实行图

  • StreamGraph:是对用户逻辑的映照,代表程序的拓扑组织,是依据用户经过过程 Stream API 编写的代码生成的最初的图。
  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据组织。主要的优化为,将多个相符前提的节点 chain 在一同作为一个节点,如许可以削减数据在节点之间活动所须要的序列化/反序列化/传输斲丧。
  • ExecutionGraph:JobManager 依据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调理层最中心的数据组织。
  • 物理实行图:JobManager 依据 ExecutionGraph 对 Job 举行调理后,在各个TaskManager 上布置 Task 后组成的“图”,并非一个详细的数据组织。

我们这里重点看StreamGraph,其相干重点数据组织是:

  • StreamNode 是用来形貌 operator 的逻辑节点,并具有一切相干的属性,如并发度、入边和出边等。
  • StreamEdge 是用来形貌两个 StreamNode(operator) 逻辑的链接边。

我们可以直接打印 Execution Plan

System.out.println(env.getExecutionPlan());

其内部挪用 StreamExecutionEnvironment.getExecutionPlan 取得 StreamGraph。

public String getExecutionPlan() {
        return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}

StreamGraph的转换流是:

* Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

下面是我把 示例代码 打印StreamGraph效果整理出来一个静态架构。可以看出代码中的转换被翻译成了以下实行Unit(在下面图中,实在行序列是由上而下)。

*        +-----> Data Source(ID = 1) [ Source Socket Stream ]  
*        |      // env.socketTextStream(hostname, port) 要领中生成了一个 Data Source
*        |      
*        +-----> Operator(ID = 2) [ Filter ]
*        | 
*        |      
*        +-----> Operator(ID = 3) [ Map ]
*        | 
*        |      
*        +-----> Operator(ID = 4) [ Timestamps/Watermarks ]
*        | 
*        |      
*        +-----> Operator(ID = 6) [ Window(SumAggregator) ]
*        |       // 多个Operator被构建成 Operator Chain
*        | 
*        |      
*        +-----> Data Sink(ID = 7) [ Sink : Print to Std. Out ] 
*                // counts.print() 是在数据流末了增加了个 Data Sink,用于承接统计效果   

示例代码中,Flink生成StreamGraph的大抵处置惩罚流程是:

  • 起首处置惩罚的Source,生成了SourceStreamNode
  • 处置惩罚Filter,生成了FilterStreamNode,并生成StreamEdge衔接上游SourceFilter
  • 处置惩罚Map,生成了MapStreamNode,并生成StreamEdge衔接上游FilterMap
  • 处置惩罚assignTimestampsAndWatermarks,生成了Timestamps/WatermarksStreamNode,并生成StreamEdge衔接上游MapTimestamps/Watermarks
  • 处置惩罚keyBy/timeWindow/sum,生成了WindowStreamNode 以及 Operator Chain,并生成StreamEdge衔接上游Timestamps/WatermarksWindow
  • 末了处置惩罚Sink,竖立SinkStreamNode,并生成StreamEdge与上游Window相连。

0x05. 实行模块生命周期

这里主要中心类是:

  • Function:用户经过过程继续该接口的差别子类来完成用户本身的数据处置惩罚逻辑。如子类SocketTextStreamFunction完成从指定hostname和port来吸收数据,并转发字符串的逻辑;
  • Task: 是Flink中实行的基本单位,代表一个 TaskManager 中所起的并行子使命,实行封装的 flink 算子并运转,供应以下效劳:花费输入data、生产 IntermediateResultPartition [ flink关于中心效果的笼统 ]、与 JobManager 交互。
  • StreamTask : 是当地实行的基本单位,由TaskManagers布置实行。包含了多个StreamOperator,封装了算子的处置惩罚逻辑。
  • StreamOperator:DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运转时的详细完成,会决议UDF(User-Defined Funtion)的挪用体式格局。
  • StreamSource 是StreamOperator接口的一个详细完成类,其组织函数入参就是SourceFunction的子类,这里就是SocketTextStreamFunction的实例。

Task 是直接收 TaskManager 治理和调理的,而 Task 又会挪用 StreamTask(重如果其种种子类),StreamTask 中封装了算子(StreamOperator)的处置惩罚逻辑。StreamSource是用来开启全部流的算子。我们接下来就说说动态逻辑。

我们的示例代码中,一切程序逻辑都是运转在StreamTask(重如果其种种子类)中,filter/map对应了StreamOperator;assignTimestampsAndWatermarks用来生成Watermarks,通报给下流的.keyBy.timeWindow(WindowOperator)。而keyBy/timeWindow/sum又被构建成OperatorChain。所以我们下面就一一解说这些观点。

1. Task

Task,它是在线程中实行的Runable对象,每一个Task都是由一组Operators Chaining在一同的事变鸠合,Flink Job的实行历程可看做一张DAG图,Task是DAG图上的极点(Vertex),极点之间经过过程数据通报体式格局互相链接组成全部Job的Execution Graph。

Task 是直接收 TaskManager 治理和调理的,Flink末了经过过程RPC要领提交task,现实会挪用到TaskExecutor.submitTask要领中。这个要领会竖立真正的Task,然后挪用task.startTaskThread();入手下手task的实行。而startTaskThread要领,则会实行executingThread.start,从而挪用Task.run要领。
它的最中心的代码以下:

 * public class Task implements Runnable...
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and runs it
 *
 *  -- doRun()
 *        |
 *        +----> 从 NetworkEnvironment 中要求 BufferPool
 *        |      包含 InputGate 的吸收 pool 以及 task 的每一个 ResultPartition 的输出 pool
 *        +----> invokable = loadAndInstantiateInvokable(userCodeClassLoader, 
 *        |                  nameOfInvokableClass) 经过过程反射竖立
 *        |      load and instantiate the task's invokable code
 *        |      invokable即为operator对象实例,比方OneInputStreamTask,SourceStreamTask等
 *        |      OneInputStreamTask继续了StreamTask,这里现实挪用的invoke()要领是StreamTask里的
 *        +----> invokable.invoke()
 *        |      run the invokable, 
 *        |      
 *        |        
 * OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>    

这个nameOfInvokableClass是那里生成的呢?实在早在生成StreamGraph的时刻,这就已肯定了,见StreamGraph.addOperator要领

        if (operatorObject instanceof StoppableStreamSource) {
            addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
        } else if (operatorObject instanceof StreamSource) {
            addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
        } else {
            addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
        }

这里的OneInputStreamTask.class即为生成的StreamNode的vertexClass。这个值会一向通报

StreamGraph --> JobVertex.invokableClass --> ExecutionJobVertex.TaskInformation.invokableClassName --> Task

2. StreamTask

是当地实行的基本单位,由TaskManagers布置实行,Task会挪用 StreamTask。StreamTask包含了headOperator 和 operatorChain,封装了算子的处置惩罚逻辑。可以明白为,StreamTask是实行流程框架,OperatorChain(StreamOperator)是担任详细算子逻辑,嵌入到StreamTask的实行流程框架中

直接从StreamTask的解释中,能看到StreamTask的生命周期。

个中,每一个operator的open()要领都被StreamTaskopenAllOperators()要领挪用。该要领(指openAllOperators)实行一切的operational的初始化,比方运用定时器效劳注册定时器。单个task大概正在实行多个operator,斲丧其先驱的输出,在这类状况下,该open()要领在末了一个operator中挪用,这个operator的输出也是task本身的输出。如许做使妥当第一个operator入手下手处置惩罚使命的输入时,它的一切下流operator都预备好吸收其输出。

OperatorChain是在StreamTask的invoke要领中被竖立的,在实行的时刻,假如一个operator没法被chain起来,那它就只需headOperator,chain里就没有其他operator了。

注重: task中的一连operator是从末了到第一个顺次open。

以OneInputStreamTask为例,Task的中心实行代码即为OneInputStreamTask.invoke要领,它会挪用StreamTask.invoke要领。

 * The life cycle of the task(StreamTask) is set up as follows:
 * {@code
 *  -- setInitialState -> provides state of all operators in the chain
 *        |   
 *        +----> 从新初始化task的state,而且在以下两种状况下尤为主要:
 *        |      1. 当使命从毛病中恢复并从末了一个胜利的checkpoint点从新启动时
 *        |      2. 从一个保留点恢复时。
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup() //竖立 operatorChain 并设置为 headOperator 的 Output
 *        --------> openAllOperators()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators() //实行 operatorChain 中一切 operator 的 open 要领
 *        +----> run() //runMailboxLoop()要领将一向运转,直到没有更多的输入数据
 *        --------> mailboxProcessor.runMailboxLoop();
 *        --------> StreamTask.processInput()
 *        --------> StreamTask.inputProcessor.processInput()   
 *        --------> 间接挪用 operator的processElement()和processWatermark()要领
 *        +----> close-operators() //实行 operatorChain 中一切 operator 的 close 要领
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }

3. OneInputStreamTask

OneInputStreamTask是 StreamTask 的完成类之一,具有代表性。我们示例代码中基本都是由OneInputStreamTask来做详细实行。

看看OneInputStreamTask 是怎样生成的?

 * 生成StreamNode时刻
 *
 *  -- StreamGraph.addOperator()
 *        |   
 *        +----> addNode(... OneInputStreamTask.class, operatorObject, operatorName);
 *        |      将 OneInputStreamTask 等 StreamTask 设置到 StreamNode 的节点属性中
 *     
 *  
 * 在 JobVertex 的节点组织时也会做一次初始化
 *        |      
 *        +----> jobVertex.setInvokableClass(streamNode.getJobVertexClass());   

后续在 TaskDeploymentDescriptor 实例化的时刻会猎取 jobVertex 中的属性。

再看看OneInputStreamTask 的 init() 和run() 离别都做了什么

 * OneInputStreamTask
 * class OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>  * {@code
 *  -- init要领
 *        |
 *        +----> 猎取算子对应的输入序列化器 TypeSerializer
 *        +----> CheckpointedInputGate inputGate = createCheckpointedInputGate();
 *               猎取输入数据 InputGate[],InputGate 是 flink 网络传输的中心笼统之一
 *               其在内部封装了音讯的吸收和内存的治理,从 InputGate 可以拿到上游传送过来的数据
 *        +----> inputProcessor = new StreamOneInputProcessor<>(input,output,operatorChain) 
 *        |      1. StreamInputProcessor,是 StreamTask 内部用来处置惩罚 Record 的组件,  
 *        |      内里封装了外部 IO 逻辑【内存不够时将 buffer 吐到磁盘上】以及 时刻对齐逻辑【Watermark】
 *        |      2. output 是 StreamTaskNetworkOutput, input是StreamTaskNetworkInput
 *        |      如许就把input, output 他俩聚合进StreamOneInputProcessor
 *        +----> headOperator.getMetricGroup().gauge 
 *        +----> getEnvironment().getMetricGroup().gauge 
 *               设置一些 metrics 及 累加器
 * 
 * 
 *  -- run要领(就是基类StreamTask.run)
 *        +----> StreamTask.runMailboxLoop
 *        |      从 StreamTask.runMailboxLoop 入手下手,下面是一层层的挪用关联
 *        -----> StreamTask.processInput()
 *        -----> StreamTask.inputProcessor.processInput()
 *        -----> StreamOneInputProcessor.processInput
 *        -----> input.emitNext(output) 
 *        -----> StreamTaskNetworkInput.emitNext()
 *        |      while(true) {从输入source读取一个record, output是 StreamTaskNetworkOutput}
 *        -----> StreamTaskNetworkInput.processElement()  //详细处置惩罚record
 *        |      依据StreamElement的差别范例做差别处置惩罚
 *        |      if (recordOrMark.isRecord()) output.emitRecord()
 *        ------------> StreamTaskNetworkOutput.emitRecord()  
 *        ----------------> operator.processElement(record)   
 *        |      if (recordOrMark.isWatermark()) statusWatermarkValve.inputWatermark()
 *        |      if (recordOrMark.isLatencyMarker()) output.emitLatencyMarker()
 *        |      if (recordOrMark.isStreamStatus()) statusWatermarkValve.inputStreamStatus()   

4. OperatorChain

flink 中的一个 operator 代表一个最顶级的 api 接口,拿 streaming 来说就是,在 DataStream 上做诸如 map/reduce/keyBy 等操纵均会生成一个算子。

Operator Chain是指在生成JobGraph阶段,将Job中的Operators根据肯定战略(比方:single output operator可以chain在一同)链接起来并安排在一个Task线程中实行。削减了数据通报/线程切换等环节,下降体系开支的同时增加了资本应用率和Job机能。

chained operators现实上是从下流往上游去反向一个个竖立和setup的。假定chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而现实初始化次序则相反:StreamSink - StreamFilter - StreamGroupedReduce

 * OperatorChain(
 *          StreamTask<OUT, OP> containingTask,
 *          RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
 * {@code
 *  -- collect
 *        |
 *        +----> pushToOperator(StreamRecord<X> record)
 *        +---------> operator.processElement(castRecord); 
 *        //这里的operator是chainedOperator,即除了headOperator之外,盈余的operators的chain。
 *        //这个operator.processElement,会轮回挪用operator chain一切operator,直到chain end。
 *        //比方 Operator A 对应的 ChainingOutput collect 挪用了对应的算子 A 的 processElement 要领,这里又会挪用 B 的 ChainingOutput 的 collect 要领,以此类推。如许便完成了可 chain 算子的当地处置惩罚,终究经过网络输出 RecordWriterOutput 发送到下流节点。   

5. StreamOperator

StreamTask会挪用Operator,所以我们须要看看Operator的生命周期。

逻辑算子Transformation末了会对应到物理算子Operator,这个观点对应的就是StreamOperator

StreamOperator是根接口。关于 Streaming 来说一切的算子都继续自 StreamOperator。继续了StreamOperator的扩大接口则有OneInputStreamOperator,TwoInputStreamOperator。完成了StreamOperator的笼统类有AbstractStreamOperator以及它的子类AbstractStreamUdfOperator。

个中operator处置惩罚输入的数据(elements)可以是以下之一:input element,watermark和checkpoint barriers。他们中的每一个都有一个特别的单位来处置惩罚。element由processElement()要领处置惩罚,watermark由processWatermark()处置惩罚,checkpoint barriers由异步挪用的snapshotState()要领处置惩罚,此要领会触发一次checkpoint 。

processElement()要领也是UDF的逻辑被挪用的处所,比方MapFunction里的map()要领。

 * AbstractUdfStreamOperator, which is the basic class for all operators that execute UDFs.
 * 
 *         // initialization phase
 *         //初始化operator-specific要领,如RuntimeContext和metric collection
 *         OPERATOR::setup 
 *             UDF::setRuntimeContext
 *         //setup的挪用链是invoke(StreamTask) -> constructor(OperatorChain) -> setup
 *         //挪用setup时,StreamTask已在各个TaskManager节点上 
 *         //给出一个用来初始state的operator   
 *  
 *         OPERATOR::initializeState
 *         //实行一切operator-specific的初始化  
 *         OPERATOR::open
 *            UDF::open
 *         
 *         // processing phase (called on every element/watermark)
 *         OPERATOR::processElement
 *             UDF::run //给定一个operator可以有一个用户定义的函数(UDF)
 *         OPERATOR::processWatermark
 *         
 *         // checkpointing phase (called asynchronously on every checkpoint)
 *         OPERATOR::snapshotState
 *                 
 *         // termination phase
 *         OPERATOR::close
 *             UDF::close
 *         OPERATOR::dispose

OneInputStreamOperator与TwoInputStreamOperator接口。这两个接口异常相似,本质上就是处置惩罚流上存在的三种元素StreamRecord,Watermark和LatencyMarker。一个用作单流输入,一个用作双流输入。

6. StreamSource

StreamSource是用来开启全部流的算子(继续AbstractUdfStreamOperator)。StreamSource因为没有输入,所以没有完成InputStreamOperator的接口。比较特别的是ChainingStrategy初始化为HEAD。

在StreamSource这个类中,在运转时由SourceStreamTask挪用SourceFunction的run要领来启动source。

 * class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 *      extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> 
 * 
 *
 *  -- run()
 *        |   
 *        +----> latencyEmitter = new LatencyMarksEmitter
 *        |      用来发生耽误监控的LatencyMarker
 *        +----> this.ctx = StreamSourceContexts.getSourceContext
 *        |      据时刻情势(EventTime/IngestionTime/ProcessingTime)生成响应SourceConext  
 *        |      包含了发生element关联的timestamp的要领和生成watermark的要领
 *        +----> userFunction.run(ctx);
 *        |      挪用SourceFunction的run要领来启动source,举行数据的转发
 *        
public {
            //读到数据后,把数据交给collect要领,collect要领担任把数据交到适宜的位置(如宣布为br变量,或许交给下个operator,或许经过过程网络发出去)
    private transient SourceFunction.SourceContext<OUT> ctx;
    private transient volatile boolean canceledOrStopped = false;
    private transient volatile boolean hasSentMaxWatermark = false;
  
    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain) throws Exception {
            userFunction.run(ctx);    
  }
}

7. StreamMap

StreamFilter,StreamMap与StreamFlatMap算子在完成的processElement离别挪用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下流。这里用StreamMap举例:

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

  public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

8. WindowOperator

Flink经过过程水位线分派器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事宜流中注入水位线。

我们示例代码中,timeWindow()终究对应了WindowStream,窗口算子WindowOperator是窗口机制的底层完成。assignTimestampsAndWatermarks 则对应了TimestampsAndPeriodicWatermarksOperator算子,它把发生的Watermark通报给了WindowOperator。

元素在streaming dataflow引擎中活动到WindowOperator时,会被分为两拨,离别是一般事宜和水位线。

  • 假如是一般的事宜,则会挪用processElement要领举行处置惩罚,在processElement要领中,起首会应用窗口分派器为当前吸收到的元素分派窗口,接着会挪用触发器的onElement要领举行逐元素触发。关于时刻相干的触发器,一般会注册事宜时刻或许处置惩罚时刻定时器,这些定时器会被存储在WindowOperator的处置惩罚时刻定时器行列和水位线定时器行列中,假如触发的效果是FIRE,则对窗口举行盘算。
  • 假如是水位线(事宜时刻场景),则要领processWatermark将会被挪用,它将会处置惩罚水位线定时器行列中的定时器。假如时刻戳满足前提,则应用触发器的onEventTime要领举行处置惩罚。

而关于处置惩罚时刻的场景,WindowOperator将本身完成为一个基于处置惩罚时刻的触发器,以触发trigger要领来花费处置惩罚时刻定时器行列中的定时器满足前提则会挪用窗口触发器的onProcessingTime,依据触发效果推断是不是对窗口举行盘算。

 * public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 *  extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
 *  implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> 
 *
 *  -- processElement()
 *        |   
 *        +----> windowAssigner.assignWindows
 *        |      //经过过程WindowAssigner为element分派一系列windows
 *        +----> windowState.add(element.getValue())
 *        |      //把当前的element到场buffer state 
 *        +----> TriggerResult triggerResult = triggerContext.onElement(element)
 *        |      //触发onElment,取得triggerResult
 *        +----> Trigger.OnMergeContext.onElement()
 *        +----> trigger.onElement(element.getValue(), element.getTimestamp(), window,...)
 *        +----> EventTimeTriggers.onElement()
 *        |      //假如当前window.maxTimestamp已小于CurrentWatermark,直接触发  
 *        |      //不然将window.maxTimestamp注册到TimeService中,守候触发   
 *        +----> contents = windowState.get(); emitWindowContents(actualWindow, contents)
 *        |      //对triggerResult做种种处置惩罚,假如fire,真正去盘算窗口中的elements
   
 *  -- processWatermark()   
 *        -----> 终究进入基类AbstractStreamOperator.processWatermark
 *        -----> AbstractStreamOperator.processWatermark(watermark) 
 *        -----> timeServiceManager.advanceWatermark(mark); 第一步处置惩罚watermark
 *        -----> output.emitWatermark(mark) 第二步将watermark发送到下流
 *        -----> InternalTimeServiceManager.advanceWatermark      

0x06. 处置惩罚 Watermark 的扼要流程

末了是处置惩罚 Watermark 的扼要流程(OneInputStreamTask为例)

 *  -- OneInputStreamTask.invoke()
 *        |   
 *        +----> StreamTask.init 
 *        |      把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
 *        +----> StreamTask.runMailboxLoop
 *        |      从 StreamTask.runMailboxLoop 入手下手,下面是一层层的挪用关联
 *        -----> StreamTask.processInput()
 *        -----> StreamTask.inputProcessor.processInput()
 *        -----> StreamOneInputProcessor.processInput
 *        -----> input.emitNext(output)
 *        -----> StreamTaskNetworkInput.emitNext()
 *        -----> StreamTaskNetworkInput.processElement()
   
   
 *  下面是处置惩罚一般 Record  
 *  -- StreamTaskNetworkInput.processElement()  
 *        |   
 *        | 下面都是一层层的挪用关联
 *        -----> output.emitRecord(recordOrMark.asRecord())
 *        -----> StreamTaskNetworkOutput.emitRecord()
 *        -----> operator.processElement(record)
 *               进入详细算子 processElement 的处置惩罚,比方StreamFlatMap.processElement
 *        -----> StreamFlatMap.processElement(record)
 *        -----> userFunction.flatMap()
    
   
 *  -- 下面是处置惩罚 Watermark
 *  -- StreamTaskNetworkInput.processElement()  
 *        |   
 *        | 下面都是一层层的挪用关联
 *        -----> StatusWatermarkValve.inputWatermark()
 *        -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
 *        -----> output.emitWatermark()
 *        -----> StreamTaskNetworkOutput.emitWatermark()
 *        -----> operator.processWatermark(watermark) 
 *        -----> KeyedProcessOperator.processWatermark(watermark) 
 *               详细算子processWatermark处置惩罚,如WindowOperator/KeyedProcessOperator.processWatermark 
 *               终究进入基类AbstractStreamOperator.processWatermark
 *        -----> AbstractStreamOperator.processWatermark(watermark) 
 *        -----> timeServiceManager.advanceWatermark(mark); 第一步处置惩罚watermark
 *               output.emitWatermark(mark) 第二步将watermark发送到下流
 *        -----> InternalTimeServiceManager.advanceWatermark   
 *        -----> 下面看看第一步处置惩罚watermark  
 *        -----> InternalTimerServiceImpl.advanceWatermark   
 *               逻辑timer时刻小于watermark的都应该被触发还调。从eventTimeTimersQueue从小到大取timer,假如小于传入的water mark,那末申明这个window须要触发。注重watermarker是没有key的,所以当一个watermark来的时刻是会触发一切timer,而timer的key是不肯定的,所以这里肯定要设置keyContext,不然就乱了
 *        -----> triggerTarget.onEventTime(timer);
 *               triggerTarget是详细operator对象,open时经过过程InternalTimeServiceManager.getInternalTimerService通报到HeapInternalTimerService  
 *        -----> KeyedProcessOperator.onEeventTime()
 *               挪用用户完成的keyedProcessFunction.onTimer去做详细事变。关于window来说也是挪用onEventTime或许onProcessTime来从key和window对应的状况中的数据发送到windowFunction中去盘算并发送到下流节点  
 *        -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
 *        -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
 
   
 *  -- DataStream 设置定时发送Watermark,是加了个chain的TimestampsAndPeriodicWatermarksOperator
 *  -- StreamTaskNetworkInput.processElement()        
 *        -----> TimestampsAndPeriodicWatermarksOperator.processElement
 *               会挪用AssignerWithPeriodicWatermarks.extractTimestamp提取event time
 *               然后更新StreamRecord的时刻
 *        -----> WindowOperator.processElement
 *               在windowAssigner.assignWindows时以element的timestamp作为assign时刻

0x07 处置惩罚 Watermark 的细致流程(源码剖析)

下面代码剖析略冗杂。

我们再看看样例代码

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) 
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)
  
counts.print()  
System.out.println(env.getExecutionPlan());

1. 程序逻辑 DataStream & Transformation

起首看看逻辑API。

DataStream是数据流观点。A DataStream represents a stream of elements of the same type。

Transformation是一个逻辑API观点。Transformation代表了流的转换,将一个或多个DataStream转换为新的DataStream。A Transformation is applied on one or more data streams or data sets and results in one or more output data streams or data sets。

我们以为Transformation就是逻辑算子,而 Transformation 对应的物理观点是Operators。

DataStream类在内部组合了一个 Transformation类,现实的转换操纵均经过过程该类完成,形貌了这个DataStream是怎样来的。

针对示例代码,"assignTimestampsAndWatermarks","Filter","Map"这几种,都被转换为 SingleOutputStreamOperator,继续由用户举行逻辑处置惩罚。SingleOutputStreamOperator这个类名字有点误导,现实上它是DataStream的子类

@Public
public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;  
    
  //assignTimestampsAndWatermarks这个操纵现实上也生成了一个SingleOutputStreamOperator算子
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

  //Map是一个OneInputStreamOperator算子。
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperatorFactory<T, R> operatorFactory) {
        return doTransform(operatorName, outTypeInfo, operatorFactory);
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());

    // SingleOutputStreamOperator 现实上是 DataStream 的子类,名字内里有Operator轻易误导人人。
        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //就是把Transformation加到运转环境上去。
        getExecutionEnvironment().addOperator(resultTransform); 
        return returnStream;
    }     
}

针对示例代码,绝大多数逻辑算子都转换为OneInputTransformation,每一个Transformation内里间接纪录了对应的物理Operator。注册到Env上。

// OneInputTransformation对应了单输入的算子
@Internal
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {
    private final Transformation<IN> input;
    private final StreamOperatorFactory<OUT> operatorFactory; // 这里间接纪录了本Transformation对应的物理Operator。比方StreamMap。
    private KeySelector<IN, ?> stateKeySelector;
    private TypeInformation<?> stateKeyType;
  
    public OneInputTransformation(
            Transformation<IN> input,
            String name,
            OneInputStreamOperator<IN, OUT> operator, // 比方StreamMap
            TypeInformation<OUT> outputType,
            int parallelism) {
        this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);
    }  
}   

回到样例代码,DataStream.keyBy会返回一个KeyedStream。KeyedStream. timeWindow会返回一个WindowedStream。同时内部把种种 Transformation 注册到了 Env 中。

WindowedStream内部对应WindowedOperator。WindowedStream却不是Stream的子类! 而是把 KeyedStream 包含在内作为一个成员变量。

// 这个竟然不是Stream的子类! 而是把 KeyedStream 包含在内作为一个成员变量。
@Public
public class WindowedStream<T, K, W extends Window> {
    private final KeyedStream<T, K> input; // 这里包含了DataStream。
    private final WindowAssigner<? super T, W> windowAssigner;
    private Trigger<? super T, ? super W> trigger;
    private Evictor<? super T, ? super W> evictor;
    private long allowedLateness = 0L;

  // reduce, fold等函数也是相似操纵。
  private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();
        WindowOperator<K, T, Iterable<T>, R, W> operator;

        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

      // 这里直接生成了 WindowOperator
            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
}

在生成了程序逻辑今后,Env内里就有了 一系列 transformation(每一个transformation内里纪录了本身对应的物理 operator,比方StreamMap,WindowOperator),这个是背面生成盘算图的基本。

当挪用env.execute时,经过过程StreamGraphGenerator.generate遍历个中的transformation鸠合组织出StreamGraph。

2. 生成盘算图

我们这里重点引见StreamGraph以及怎样生成,JobGraph,ExecutionGraph只是简介。

StreamGraph代表程序的拓扑组织,是从用户代码直接生成的图。StreamOperator是详细的物理算子。

一个很主要的点是,把 SourceStreamTask / OneInputStreamTask 增加到StreamNode上,作为 jobVertexClass,这个是实在盘算的部份。

StreamOperator是一个接口。StreamOperator 是 数据流操纵符的基本接口,该接口的详细完成子类中,会有保留用户自定义数据处置惩罚逻辑的函数的属性,担任对userFunction的挪用,以及挪用时传入所需参数,比方在StreamSource这个类中,在挪用SourceFunction的run要领时,会构建一个SourceContext的详细实例,作为入参,用于run要领中,举行数据的转发;

StreamOperator

PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
}

AbstractStreamOperator

AbstractStreamOperator笼统类完成了StreamOperator。在AbstractStreamOperator中有一些主要的成员变量,整体来说可以分为几类,一类是运转时相干的,一类是状况相干的,一类是设置相干的,一类是时刻相干的,另有一类是监控相干的。

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
    private transient StreamTask<?, ?> container;
    protected transient StreamConfig config;
    protected transient Output<StreamRecord<OUT>> output;
    private transient StreamingRuntimeContext runtimeContext;
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark); //第一步处置惩罚watermark
        }
        output.emitWatermark(mark);//第二步,将watermark发送到下流
    }  
}

AbstractUdfStreamOperator

AbstractUdfStreamOperator笼统类继续了AbstractStreamOperator,对其部份要领做了加强,多了一个成员变量UserFunction。供应了一些通用功用,比方把context赋给算子,保留快照等等。另外还完成了OutputTypeConfigurable接口的setOutputType要领对输出数据的范例做了设置。

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    protected final F userFunction;/** The user function. */
}

KeyedProcessOperator & WindowOperator。

KeyedStream,WindowedStream离别对应KeyedProcessOperator,WindowOperator。

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    protected final WindowAssigner<? super IN, W> windowAssigner;
    private final KeySelector<IN, K> keySelector;
    private final Trigger<? super IN, ? super W> trigger;
    private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<W> windowSerializer;  
}

@Internal
public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;
  
    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
            TimeDomain timeDomain,
            InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }  
}

OneInputStreamOperator & TwoInputStreamOperator

承接输入数据并举行处置惩罚的算子就是OneInputStreamOperator、TwoInputStreamOperator等。 这两个接口异常相似,本质上就是处置惩罚流上存在的三种元素StreamRecord,Watermark和LatencyMarker。一个用作单流输入,一个用作双流输入。除了StreamSource之外的一切Stream算子都必须完成而且只能完成个中一个接口。

@PublicEvolving
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
    void processElement(StreamRecord<IN> element) throws Exception;
    void processWatermark(Watermark mark) throws Exception;
    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

StreamMap & StreamFlatMap

map,filter等经常使用操纵都是OneInputStreamOperator。下面给出StreamMap,StreamFlatMap作为详细例子。

// 用StreamMap里做个现实算子的例子@
Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

// 用StreamFlatMap里做个现实算子的例子
@Internal
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}

生成StreamGraph

程序实行即env.execute("Java WordCount from SocketTextStream Example")这行代码的时刻,就会生成StreamGraph。代表程序的拓扑组织,是从用户代码直接生成的图。

StreamGraph生成函数剖析

现实生成StreamGraph的进口是StreamGraphGenerator.generate(env, transformations) 。个中的transformations是一个list,内里纪录的就是我们在transform要领中放进来的算子。终究会挪用 transformXXX 来对详细的Transformation举行转换。

@Internal
public class StreamGraphGenerator {
    private final List<Transformation<?>> transformations;
    private StreamGraph streamGraph;

    public StreamGraph generate() {
        //注重,StreamGraph的生成是从sink入手下手的
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);

        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;
        return builtStreamGraph;
    }   

    private Collection<Integer> transform(Transformation<?> transform) {
        //这个要领的中心逻辑就是推断传入的steamOperator是哪一种范例,并实行响应的操纵,概况见下面那一大堆if-else
        //这里对操纵符的范例举行推断,并以此挪用响应的处置惩罚逻辑.简而言之,处置惩罚的中心无非是递归的将该节点和节点的上游节点到场图
        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    }
        .......
  }

  //因为map,filter等经常使用操纵都是OneInputStreamOperator,我们就来看看StreamGraphGenerator.transformOneInputTransform((OneInputTransformation<?, ?>) transform)要领。
  //该函数起首会对该transform的上游transform举行递归转换,确保上游的都已完成了转化。然后经过过程transform组织出StreamNode,末了与上游的transform举行衔接,组织出StreamNode。

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        //就是递归处置惩罚节点,为当前节点和它的依靠节点竖立边,处置惩罚边之类的,把节点加到图里。
        Collection<Integer> inputIds = transform(transform.getInput());
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    // 这里增加Operator到streamGraph上。
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }
        return Collections.singleton(transform.getId());
    }
}
streamGraph.addOperator

在之前的生成图代码中,有streamGraph.addOperator,我们详细看看完成。

里主要的是把 SourceStreamTask / OneInputStreamTask 增加到StreamNode上,作为 jobVertexClass。

@Internal
public class StreamGraph implements Pipeline {
  
  public <IN, OUT> void addOperator(
        Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

      // 这里增加了 OneInputStreamTask/SourceStreamTask,这个是往后实在运转的处所。
      if (operatorFactory.isStreamSource()) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
      } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
      }
  }

    protected StreamNode addNode(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass, // 这里是OneInputStreamTask...
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

        StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

        streamNodes.put(vertexID, vertex);
        return vertex;
    }
}

症结类StreamNode

@Internal
public class StreamNode implements Serializable {
    private transient StreamOperatorFactory<?> operatorFactory;
    private List<OutputSelector<?>> outputSelectors;
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends AbstractInvokable> jobVertexClass; // OneInputStreamTask
  
    @VisibleForTesting
    public StreamNode(
            Integer id,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperator<?> operator,
            String operatorName,
            List<OutputSelector<?>> outputSelector,
            Class<? extends AbstractInvokable> jobVertexClass) {
        this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator),
                operatorName, outputSelector, jobVertexClass);
    }  
  
    public Class<? extends AbstractInvokable> getJobVertexClass() {
        return jobVertexClass;
    }  
}

3. Task之间数据交流机制

Flink中的数据交流构建在以下两条设想准绳之上:

  • 数据交流的掌握流(比方,为实例化交流而举行的音讯传输)是吸收端初始化的,这异常像最初的MapReduce。
  • 数据交流的数据流(比方,在网络上终究传输的数据)被笼统成一个叫做IntermediateResult的观点,它是可插拔的。这意味着体系基于雷同的完成逻辑可以既支撑流数据,又支撑批处置惩罚数据的传输。

数据在task之间传输团体历程

  • 第一步必定是预备一个ResultPartition;
  • 关照JobMaster;
  • JobMaster关照下流节点;假以下流节点还没有布置,则布置之;
  • 下流节点向上游要求数据
  • 入手下手传输数据

数据在task之间详细传输

形貌了数据从生产者传输到花费者的完全生命周期。

数据在task之间通报有以下几步:

  • 数据在本operator处置惩罚完后,经过过程Collector网络,这些纪录被传给RecordWriter对象。每条纪录都要挑选一个下流节点,所以要经过ChannelSelector。一个ChannelSelector挑选一个或许多个序列化器来处置惩罚纪录。假如纪录在broadcast中,它们将被通报给每一个序列化器。假如纪录是基于hash分区的,ChannelSelector将会盘算纪录的hash值,然后挑选适宜的序列化器。
  • 每一个channel都有一个serializer,序列化器将record数据纪录序列化成二进制的示意情势。然后将它们放到大小适宜的buffer中(纪录也可以被切割到多个buffer中)。
  • 接下来数据被写入ResultPartition下的各个subPartition (ResultSubpartition - RS,用于为特定的花费者网络buffer数据)里,此时该数据已存入DirectBuffer(MemorySegment)。既然首个buffer进来了,RS就对花费者变成可接见的状况了(注重,这个行动完成了一个streaming shuffle),然后它关照JobManager。
  • JobManager查找RS的花费者,然后关照TaskManager一个数据块已可以接见了。关照TM2的音讯会被发送到InputChannel,该inputchannel被以为是吸收这个buffer的,接着关照RS2可以初始化一个网络传输了。然后,RS2经过过程TM1的网络栈要求该buffer,然后两边基于netty预备举行数据传输。网络衔接是在TaskManager(而非特定的task)之间长时刻存在的。
  • 零丁的线程掌握数据的flush速率,一旦触发flush,则经过过程Netty的nio通道向对端写入。
  • 对端的netty client吸收到数据,decode出来,把数据拷贝到buffer里,然后关照InputChannel
  • 一旦buffer被TM2吸收,它会穿过一个相似的对象栈,起始于InputChannel(吸收端 等价于IRPQ),进入InputGate(它包含多个IC),终究进入一个RecordDeserializer,它用于从buffer中还原成范例化的纪录,然后将其通报给吸收task。
  • 有可用的数据时,下流算子从壅塞醒来。从InputChannel掏出buffer,再解序列化成record,交给算子实行用户代码。

4. 数据源的逻辑——StreamSource与时刻模子

SourceFunction是一切stream source的根接口。

StreamSource笼统了一个数据源,而且指定了一些怎样处置惩罚数据的情势。StreamSource是用来开启全部流的算子。SourceFunction定义了两个接口要领:

run : 启动一个source,即对接一个外部数据源然后emit元素组成stream(大部份状况下会经过过程在该要领里运转一个while轮回的情势来发生stream)。
cancel : 作废一个source,也行将run中的轮回emit元素的行动停止。

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
    @Public // Interface might be extended in the future with additional methods.
  //SourceContex则是用来举行数据发送的接口。
    interface SourceContext<T> {
      void collect(T element);
      @PublicEvolving
      void collectWithTimestamp(T element, long timestamp);
      @PublicEvolving
      void emitWatermark(Watermark mark);
      @PublicEvolving
      void markAsTemporarilyIdle();
      Object getCheckpointLock();
      void close();
    }  
}

public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
            //读到数据后,把数据交给collect要领,collect要领担任把数据交到适宜的位置(如宣布为br变量,或许交给下个operator,或许经过过程网络发出去)
    private transient SourceFunction.SourceContext<OUT> ctx;
    private transient volatile boolean canceledOrStopped = false;
    private transient volatile boolean hasSentMaxWatermark = false;
  
    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain) throws Exception {
            userFunction.run(ctx);    
  }
}

SocketTextStreamFunction

回到实例代码,env.socketTextStream(hostname, port)就是生成了SocketTextStreamFunction。

run要领的逻辑如上,逻辑很清楚,就是从指定的hostname和port延续不停的读取数据,按行分隔符划分红一个个字符串,然后转发到下流。

cancel要领的完成以下,就是将运转状况的标识isRunning属性设置为false,并依据须要封闭当前socket。

@PublicEvolving
public class SocketTextStreamFunction implements SourceFunction<String> {
    private final String hostname;
    private final int port;
    private final String delimiter;
    private final long maxNumRetries;
    private final long delayBetweenRetries;
    private transient Socket currentSocket;
    private volatile boolean isRunning = true;

  public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) {
        this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
    }

    public void run(SourceContext<String> ctx) throws Exception {
   final StringBuilder buffer = new StringBuilder();
   long attempt = 0;
   /** 这里是第一层轮回,只需当前处于运转状况,该轮回就不会退出,会一向轮回 */
   while (isRunning) {
      try (Socket socket = new Socket()) {
         /** 对指定的hostname和port,竖立Socket衔接,并构建一个BufferedReader,用来从Socket中读取数据 */
         currentSocket = socket;
         LOG.info("Connecting to server socket " + hostname + ':' + port);
         socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         char[] cbuf = new char[8192];
         int bytesRead;
         /** 这里是第二层轮回,对运转状况举行了两重校验,同时对从Socket中读取的字节数举行推断 */
         while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
            buffer.append(cbuf, 0, bytesRead);
            int delimPos;
            /** 这里是第三层轮回,就是对从Socket中读取到的数据,按行分隔符举行支解,并将每行数据作为一个团体字符串向下流转发 */
            while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
               String record = buffer.substring(0, delimPos);
               if (delimiter.equals("n") && record.endsWith("r")) {
                  record = record.substring(0, record.length() - 1);
               }
               /** 用入参ctx,举行数据的转发 */
               ctx.collect(record);
               buffer.delete(0, delimPos + delimiter.length());
            }
         }
      }
      /** 假如因为碰到EOF字符,致使从轮回中退出,则依据运转状况,以及设置的最大重试尝试次数,决议是不是举行 sleep and retry,或许直接退出轮回 */
      if (isRunning) {
         attempt++;
         if (maxNumRetries == -1 || attempt < maxNumRetries) {
            LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
            Thread.sleep(delayBetweenRetries);
         }
         else {
            break;
         }
      }
   }
   /** 在最外层的轮回都退出后,末了搜检下缓存中是不是另有数据,假如有,则向下流转发 */
   if (buffer.length() > 0) {
      ctx.collect(buffer.toString());
   }
    }
  
    public void cancel() {
   isRunning = false;
   Socket theSocket = this.currentSocket;
   /** 假如当前socket不为null,则举行封闭操纵 */
   if (theSocket != null) {
      IOUtils.closeSocket(theSocket);
   }
    }  
}

5. StreamTask

回到实例代码,filter,map是在StreamTask中实行,可以看看StreamTask等详细定义。

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    private final StreamTaskActionExecutor actionExecutor;

  /**
     * The input processor. Initialized in {@link #init()} method.
     */
    @Nullable
    protected StreamInputProcessor inputProcessor; // 这个是处置惩罚症结。

    /** the head operator that consumes the input streams of this task. */
    protected OP headOperator;

    /** The chain of operators executed by this task. */
    protected OperatorChain<OUT, OP> operatorChain;

    /** The configuration of this streaming task. */
    protected final StreamConfig configuration;

    /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
    protected StateBackend stateBackend;

    /** The external storage where checkpoint data is persisted. */
    private CheckpointStorageWorkerView checkpointStorage;

    /**
     * The internal {@link TimerService} used to define the current
     * processing time (default = {@code System.currentTimeMillis()}) and
     * register timers for tasks to be executed in the future.
     */
    protected TimerService timerService;

    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;

    /** The map of user-defined accumulators of this task. */
    private final Map<String, Accumulator<?, ?>> accumulatorMap;

    /** The currently active background materialization threads. */
    private final CloseableRegistry cancelables = new CloseableRegistry();

    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;

    /**
     * Flag to mark the task "in operation", in which case check needs to be initialized to true,
     * so that early cancel() before invoke() behaves correctly.
     */
    private volatile boolean isRunning;

    /** Flag to mark this task as canceled. */
    private volatile boolean canceled;

    private boolean disposedOperators;

    /** Thread pool for async snapshot workers. */
    private ExecutorService asyncOperationsThreadPool;

    private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;

    protected final MailboxProcessor mailboxProcessor;

    private Long syncSavepointId = null;  
  
    @Override
    public final void invoke() throws Exception {
        try {
            beforeInvoke();

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            runMailboxLoop(); //MailboxProcessor.runMailboxLoop会挪用StreamTask.processInput

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            afterInvoke();
        }
        finally {
            cleanUpInvoke();
        }
    } 
  
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        InputStatus status = inputProcessor.processInput(); // 这里会详细从source读取数据。
        if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
            return;
        }
        if (status == InputStatus.END_OF_INPUT) {
            controller.allActionsCompleted();
            return;
        }
    //详细实行操纵。
        CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
        MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
        jointFuture.thenRun(suspendedDefaultAction::resume);
    }  
}

前面提到,Task对象在实行历程当中,把实行的使命交给了StreamTask这个类去实行。在我们的wordcount例子中,现实初始化的是OneInputStreamTask的对象。那末这个对象是怎样实行用户的代码的呢?

它做的以下:

起首,初始化 initialize-operator-states()。

然后 open-operators() 要领。

末了挪用 StreamTask#runMailboxLoop,便入手下手处置惩罚Source端花费的数据,并流入下流算子处置惩罚。

详细来说,就是把使命直接交给了InputProcessor去实行processInput要领。这是一个StreamInputProcessor的实例,该processor的使命就是处置惩罚输入的数据,包含用户数据、watermark和checkpoint数据等。

详细到OneInputStreamTask,OneInputStreamTask.inputProcessor 是 StreamOneInputProcessor 范例,它把input, output聚合在一同。input是StreamTaskNetworkInput范例。output是StreamTaskNetworkOutput范例。

详细代码以下

@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
  //这是OneInputStreamTask的init要领,从configs内里猎取StreamOperator信息,生成本身的inputProcessor。
    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        int numberOfInputs = configuration.getNumberOfInputs();
        if (numberOfInputs > 0) {
            CheckpointedInputGate inputGate = createCheckpointedInputGate();
            DataOutput<IN> output = createDataOutput(); //  这里生成了 StreamTaskNetworkOutput
            StreamTaskInput<IN> input = createTaskInput(inputGate, output);
            inputProcessor = new StreamOneInputProcessor<>( // 这里把input, output经过过程Processor设置到了一同。
                input,
                output,
                operatorChain);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    }
  
    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate, DataOutput<IN> output) {
        int numberOfInputChannels = inputGate.getNumberOfInputChannels();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels, output);

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        return new StreamTaskNetworkInput<>(
            inputGate,
            inSerializer,
            getEnvironment().getIOManager(),
            statusWatermarkValve,
            0);
    }  
  
    /**
     * The network data output implementation used for processing stream elements
     * from {@link StreamTaskNetworkInput} in one input processor.
     */
    private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {

        private final OneInputStreamOperator<IN, ?> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        private StreamTaskNetworkOutput(
                OneInputStreamOperator<IN, ?> operator, // 这个就是注册的Operator
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge watermarkGauge,
                Counter numRecordsIn) {
            super(streamStatusMaintainer);

            this.operator = checkNotNull(operator);
            this.watermarkGauge = checkNotNull(watermarkGauge);
            this.numRecordsIn = checkNotNull(numRecordsIn);
        }

        @Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
            numRecordsIn.inc();
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        }

        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            operator.processWatermark(watermark); // 这里就进入了processWatermark详细处置惩罚,比方WindowOperator的
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            operator.processLatencyMarker(latencyMarker);
        }
    }  
}

@Internal
public interface StreamInputProcessor extends AvailabilityProvider, Closeable {
    InputStatus processInput() throws Exception;
}

@Internal
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
    @Override
    public InputStatus processInput() throws Exception {
        InputStatus status = input.emitNext(output);  // 这里是入手下手从输入source读取一个record。input, output离别是 StreamTaskNetworkInput,StreamTaskNetworkOutput。
        if (status == InputStatus.END_OF_INPUT) {
            operatorChain.endHeadOperatorInput(1);
        }
        return status;
    }
}

@Internal
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {
  
    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {

        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    processElement(deserializationDelegate.getInstance(), output); //详细处置惩罚record
                    return InputStatus.MORE_AVAILABLE;
                }
            }

            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                processBufferOrEvent(bufferOrEvent.get());
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                    if (!checkpointedInputGate.isEmpty()) {
                        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                    }
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }

  // 依据record范例,来处置惩罚record照样watermark
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()){
            output.emitRecord(recordOrMark.asRecord()); // 挪用 StreamTaskNetworkOutput,终究挪用到operator.processElement(record);
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
}

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
    protected transient InternalTimeServiceManager<?> timeServiceManager;
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }  
}

@Internal
public class InternalTimeServiceManager<K> {
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;  
    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }
}  

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
    private final ProcessingTimeService processingTimeService;
    private final KeyContext keyContext;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
    private final KeyGroupRange localKeyGroupRange;
    private final int localKeyGroupRangeStartIdx;  
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
        InternalTimer<K, N> timer;
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }  
}

上面的代码中,StreamTaskNetworkOutput.emitRecord中的operator.processElement(record);才是真正处置惩罚用户逻辑的代码。

StatusWatermarkValve就是用来处置惩罚watermark的。

@Internal
public class StatusWatermarkValve {
    private final DataOutput output;
    
    public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
        // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
        if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
            long watermarkMillis = watermark.getTimestamp();

            // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
            if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                channelStatuses[channelIndex].watermark = watermarkMillis;

                // previously unaligned input channels are now aligned if its watermark has caught up
                if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                    channelStatuses[channelIndex].isWatermarkAligned = true;
                }

                // now, attempt to find a new min watermark across all aligned channels
                findAndOutputNewMinWatermarkAcrossAlignedChannels();
            }
        }
    }   
    
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;

        // determine new overall watermark by considering only watermark-aligned channels across all channels
        for (InputChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }

        // we acknowledge and output the new overall watermark if it really is aggregated
        // from some remaining aligned channel, and is also larger than the last output watermark
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            output.emitWatermark(new Watermark(lastOutputWatermark)); // 这里会终究emit watermark
        }
    }   
}

6. Watermarks的生成

而Watermark的发生是在Apache Flink的Source节点 或 Watermark生成器盘算发生(如Apache Flink内置的 Periodic Watermark完成)

There are two ways to assign timestamps and generate Watermarks:

  1. Directly in the data stream source 自定义数据源设置 Timestamp/Watermark
  2. Via a TimestampAssigner / WatermarkGenerator 在数据流中设置 Timestamp/Watermark。

自定义数据源设置 Timestamp/Watermark

自定义的数据源类须要继续并完成 SourceFunction[T] 接口,个中 run 要领是定义数据生产的处所:

//自定义的数据源为自定义范例MyType
class MySource extends SourceFunction[MyType]{

    //重写run要领,定义数据生产的逻辑
    override def run(ctx: SourceContext[MyType]): Unit = {
        while (/* condition */) {
            val next: MyType = getNext()
            //设置timestamp从MyType的哪一个字段猎取(eventTimestamp)
            ctx.collectWithTimestamp(next, next.eventTimestamp)
    
            if (next.hasWatermarkTime) {
                //设置watermark从MyType的谁人要领猎取(getWatermarkTime)
                ctx.emitWatermark(new Watermark(next.getWatermarkTime))
            }
        }
    }
}

在数据流中设置 Timestamp/Watermark

在数据流中,可以设置 stream 的 Timestamp Assigner ,该 Assigner 将会吸收一个 stream,并生产一个带 Timestamp和Watermark 的新 stream

Flink经过过程水位线分派器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事宜流中注入水位线。元素在streaming dataflow引擎中活动到WindowOperator时,会被分为两拨,离别是一般事宜和水位线

回到实例代码,assignTimestampsAndWatermarks 就是生成一个TimestampsAndPeriodicWatermarksOperator。

TimestampsAndPeriodicWatermarksOperator的详细处置惩罚 Watermark代码以下。个中processWatermark详细是阻断上游水位线,如许下流就只能用本身发生的水位线了

public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
    private transient long watermarkInterval;
    private transient long currentWatermark;        

  //可以看到在processElement会挪用AssignerWithPeriodicWatermarks.extractTimestamp提取event time, 然后更新StreamRecord的时刻。
    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark(); //定时挪用用户自定义的getCurrentWatermark
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }

        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }  
}   

7. WindowOperator的完成

末了的 .keyBy(0) .timeWindow(Time.seconds(10)) 是由 WindowOperator处置惩罚。

Flink经过过程水位线分派器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事宜流中注入水位线。元素在streaming dataflow引擎中活动到WindowOperator时,会被分为两拨,离别是一般事宜和水位线

假如是一般的事宜,则会挪用processElement要领举行处置惩罚,在processElement要领中,起首会应用窗口分派器为当前吸收到的元素分派窗口,接着会挪用触发器的onElement要领举行逐元素触发。关于时刻相干的触发器,一般会注册事宜时刻或许处置惩罚时刻定时器,这些定时器会被存储在WindowOperator的处置惩罚时刻定时器行列和水位线定时器行列中,假如触发的效果是FIRE,则对窗口举行盘算。

假如是水位线(事宜时刻场景),则要领processWatermark将会被挪用,它将会处置惩罚水位线定时器行列中的定时器。假如时刻戳满足前提,则应用触发器的onEventTime要领举行处置惩罚。processWatermark 用来处置惩罚上游发送过来的watermark,可以以为不做任何处置惩罚,下流的watermark只与其上游近来的生成体式格局相干。

WindowOperator内部有触发器上下文对象接口的完成——Context,它主要供应了三种范例的要领:

  • 供应状况存储与接见;
  • 定时器的注册与删除;
  • 窗口触发器process系列要领的包装;

在注册定时器时,会新建定时器对象并将其到场到定时器行列中。比及时刻相干的处置惩罚要领(processWatermark和trigger)被触发挪用,则会从定时器行列中花费定时器对象并挪用窗口触发器,然后依据触发效果来推断是不是震动窗口的盘算。

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

  protected final WindowAssigner<? super IN, W> windowAssigner;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient Context triggerContext = new Context(null, null); //触发器上下文对象
    protected transient WindowContext processContext;
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;

无论是windowOperator照样KeyedProcessOperator都持有InternalTimerService详细完成的对象,经过过程这个对象用户可以注册EventTime及ProcessTime的timer,当watermark 超出这些timer的时刻,挪用回调函数实行肯定的操纵。

window operator经过过程WindowAssigner和Trigger来完成它的逻辑。当一个element抵达时,经过过程KeySelector先assign一个key,而且经过过程WindowAssigner assign若干个windows(指定element分派到哪一个window去),如许这个element会被放入若干个pane。一个pane会寄存一切雷同key和雷同window的elements。

比方 SlidingEventTimeWindows 的完成。

* public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>
  
    Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
        //可以看到这里会assign多个TimeWindow,因为是slide
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
    }

再比方 TumblingProcessingTimeWindows

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) {
      final long now = context.getCurrentProcessingTime();
      long start = now - (now % size);
      //很简朴,分派一个TimeWindow
      return Collections.singletonList(new TimeWindow(start, start + size)); 
}

processWatermark

起首看看处置惩罚Watermark

public void processWatermark(Watermark mark) throws Exception {
    //定义一个标识,示意是不是仍有定时器满足触发前提   
    boolean fire;   
    do {
        //从水位线定时器行列中查找队首的一个定时器,注重此处并非出队(注重跟remove要领的区分)      
        Timer<k, w=""> timer = watermarkTimersQueue.peek();      
        //假如定时器存在,且其时刻戳戳不大于水位线的时刻戳
        //(注重明白前提是:不大于,水位线用于示意小于该时刻戳的元素都已抵达,所以一切不大于水位线的触发时刻戳都该被触发)
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            //置标识为真,示意找到满足触发前提的定时器         
            fire = true;         
            //将该元素从队首出队
            watermarkTimers.remove(timer);         
            watermarkTimersQueue.remove();
            //构建新的上下文         
            context.key = timer.key;         
            context.window = timer.window;         
            setKeyContext(timer.key);         
            //窗口所运用的状况存储范例为可追加的状况存储
            AppendingState<in, acc=""> windowState;         
            MergingWindowSet<w> mergingWindows = null;         
            //假如分派器是兼并分派器(比方会话窗口)
            if (windowAssigner instanceof MergingWindowAssigner) {
                //取得兼并窗口协助类MergingWindowSet的实例            
                mergingWindows = getMergingWindowSet();            
                //获妥当前窗口对应的状况窗口(状况窗口对应着状况后端存储的定名空间)
                W stateWindow = mergingWindows.getStateWindow(context.window);            
                //假如没有对应的状况窗口,则跳过本次轮回
                if (stateWindow == null) {                              
                    continue;            
                }
                //获妥当前窗口对应的状况示意            
                windowState = getPartitionedState(stateWindow, 
                    windowSerializer, windowStateDescriptor);         
            } else {
                //假如不是兼并分派器,则直接猎取窗口对应的状况示意            
                windowState = getPartitionedState(context.window, 
                    windowSerializer, windowStateDescriptor);         
            }
            //从窗口状况示意中取得窗口中一切的元素         
            ACC contents = windowState.get();         
            if (contents == null) {            
                // if we have no state, there is nothing to do            
                continue;         
            }
            //经过过程上下文对象挪用窗口触发器的事宜时刻处置惩罚要领并取得触发效果对象
            TriggerResult triggerResult = context.onEventTime(timer.timestamp);         
            //假如触发的效果是FIRE(震动窗口盘算),则挪用fire要领举行窗口盘算
            if (triggerResult.isFire()) {            
                fire(context.window, contents);         
            }
            //而假如震动的效果是清算窗口,或许事宜时刻即是窗口的清算时刻(一般为窗口的maxTimestamp属性)         
            if (triggerResult.isPurge() || 
                (windowAssigner.isEventTime() 
                    && isCleanupTime(context.window, timer.timestamp))) {
                //清算窗口及元素            
                cleanup(context.window, windowState, mergingWindows);         
            }      
        } else {
            //行列中没有相符前提的定时器,置标识为否,停止轮回         
            fire = false;      
        }   
    } while (fire);   
    //向下流发射水位线,把waterMark通报下去
    output.emitWatermark(mark);   
    //更新currentWaterMark, 将当前算子的水位线属性用新水位线的时刻戳掩盖
    this.currentWatermark = mark.getTimestamp();
}

以上要领虽然冗杂但流程还算清楚,个中的fire要领用于对窗口举行盘算,它会挪用内部窗口函数(即InternalWindowFunction,它包装了WindowFunction)的apply要领。

processElement

处置惩罚element抵达的逻辑,将当前的element的value加到对应的window中,触发onElement

public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows(  //经过过程WindowAssigner为element分派一系列windows
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { //假如是MergingWindow
        //.......
    } else { //假如是一般window
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { //late data的处置惩罚,默许是抛弃  
                continue;
            }

            AppendingState<IN, ACC> windowState = getPartitionedState( //从backend中掏出该window的状况,就是buffer的element
                window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); //把当前的element到场buffer state

            context.key = key;
            context.window = window; //context的设想相称tricky和艰涩

            TriggerResult triggerResult = context.onElement(element); //触发onElment,取得triggerResult

            if (triggerResult.isFire()) { //对triggerResult做种种处置惩罚
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); //假如fire,真正去盘算窗口中的elements
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); //purge,即去cleanup elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}

推断是不是是late data的逻辑

protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}

而isCleanupTime和cleanup这对要领主要触及到窗口的清算。假如当前窗口是时刻窗口,且窗口的时刻抵达了清算时刻,则会举行清算窗口清算。那末清算时刻怎样推断呢?Flink是经过过程窗口的最大时刻戳属性连系许可耽误的时刻团结盘算的

private long cleanupTime(W window) {
    //清算时刻被预置为窗口的最大时刻戳加上许可的耽误事宜   
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    //假如窗口为非时刻窗口(其maxTimestamp属性值为Long.MAX_VALUE),则其加上许可耽误的时刻,
    //会形成Long溢出,从而会变成负数,致使cleanupTime < window.maxTimestamp 前提建立,
    //则直接将清算时刻设置为Long.MAX_VALUE   
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}

trigger

这个是用来触发onProcessingTime,这个须要依靠体系时刻的定时器来触发,逻辑和processWatermark基本同等,只是触发前提不一样

@Override
public void trigger(long time) throws Exception {
    boolean fire;

    //Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire);
}

0x08 参考

仅仅知道如何终止XHR请求,或许对你来说是不够的!

参与评论