[jira] [Created] (FLINK-6858) Unbounded event time Over Window emits incorrect timestamps

JIRA jira@apache.org
Fabian Hueske created FLINK-6858:
------------------------------------

             Summary: Unbounded event time Over Window emits incorrect timestamps
                 Key: FLINK-6858
                 URL: https://issues.apache.org/jira/browse/FLINK-6858
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.3.0
            Reporter: Fabian Hueske
            Priority: Critical


The unbounded event-time OVER windows emit records with incorrect timestamps.

OVER aggregates "enrich" each input row with aggregates computed over neighboring rows, i.e., they produce one output row for each input row. The (event-time) timestamp of each input row should be preserved and not modified.

All OVER window aggregates are implemented using the {{ProcessFunction}} interface. The interface has two methods, {{processElement()}} and {{onTimer()}}, that can produce output records. Records emitted by {{processElement()}} carry the timestamp of the record that was passed to the method as an argument. Records emitted by {{onTimer()}} carry the timestamp of the timer that triggered the call.
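The timestamp rule described above can be illustrated with a toy model. This is a plain Python sketch, not Flink's API: the class {{ToyOperator}} and its method signatures are hypothetical and only mimic how the output timestamp depends on which method emits the record.

```python
class ToyOperator:
    """Hypothetical stand-in for the runtime around a ProcessFunction."""

    def __init__(self):
        # Collected output as (emitted_timestamp, value) pairs.
        self.output = []

    def process_element(self, element_ts, value):
        # Output produced here inherits the timestamp of the input record.
        self.output.append((element_ts, value))

    def on_timer(self, timer_ts, value):
        # Output produced here inherits the timestamp of the firing timer,
        # regardless of which input row the value was computed from.
        self.output.append((timer_ts, value))


op = ToyOperator()
op.process_element(1000, "from processElement")
op.on_timer(2001, "from onTimer")
```

In this model, anything emitted from the timer callback is stamped with the timer's timestamp, which is why the choice of timer timestamp matters for the operators discussed in this issue.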

The implementation of the unbounded event-time OVER window registers a new timer for {{currentWatermark + 1}} whenever {{processElement()}} is called. When the timer fires, {{onTimer()}} processes all rows that were received between this and the last {{onTimer()}} call and have timestamps smaller than the current watermark. However, this means that all emitted rows carry the timer's timestamp, {{currentWatermark + 1}}, instead of their own timestamps, which is not what we want.
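The effect can be reproduced with a small simulation. This is a minimal sketch under stated assumptions, not the actual operator code: the function name, the event tuples, the initial watermark of 0, and the running sum used as the aggregate are all hypothetical. Timers are assumed to fire once the watermark reaches their timestamp, and the example data avoids rows whose timestamp equals a watermark, so the exact comparison used for "ready" rows does not matter here.

```python
def run_unbounded_watermark_timer(events, initial_watermark=0):
    """Simulate the strategy described in the issue.

    events: ("row", timestamp, value) or ("watermark", timestamp) tuples
    in arrival order. Returns (emitted_ts, row_ts, running_sum) tuples.
    """
    watermark = initial_watermark
    buffered = []      # rows waiting for the next timer
    timers = set()     # registered timer timestamps
    running_sum = 0    # unbounded aggregate over all rows so far
    output = []

    for event in events:
        if event[0] == "row":
            _, row_ts, value = event
            buffered.append((row_ts, value))
            # processElement(): register a timer for currentWatermark + 1.
            timers.add(watermark + 1)
        else:
            watermark = event[1]
            # Fire every timer that the new watermark has reached.
            for timer_ts in sorted(t for t in timers if t <= watermark):
                timers.discard(timer_ts)
                ready = sorted(r for r in buffered if r[0] <= watermark)
                buffered = [r for r in buffered if r[0] > watermark]
                for row_ts, value in ready:
                    running_sum += value
                    # onTimer(): output is stamped with the timer timestamp.
                    output.append((timer_ts, row_ts, running_sum))
    return output


events = [
    ("row", 1000, 1), ("row", 3000, 2), ("row", 2000, 3),
    ("watermark", 2500),
    ("row", 4000, 4),
    ("watermark", 5000),
]
result = run_unbounded_watermark_timer(events)
```

With this input, the rows for 1000 and 2000 are emitted with timestamp 1, and the rows for 3000 and 4000 with timestamp 2501. The aggregates are computed in the right order, but none of the emitted timestamps matches the timestamp of the row it belongs to.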

The bounded event-time OVER window operators follow a different strategy and register a timer for the timestamp of each row that was processed by {{processElement()}} and emit the corresponding rows when {{onTimer()}} is called. Hence, they emit the rows with correct timestamps.
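The per-row timer strategy can be sketched in the same toy setting. Again this is an illustration under assumptions rather than the operator code: the function name, event tuples, and initial watermark are hypothetical, and a running sum stands in for the aggregate so that only the timer handling is shown, not the window bounds.

```python
def run_per_row_timers(events, initial_watermark=0):
    """Simulate registering one timer per row timestamp.

    events: ("row", timestamp, value) or ("watermark", timestamp) tuples
    in arrival order. Returns (emitted_ts, row_ts, running_sum) tuples.
    """
    watermark = initial_watermark
    rows_by_ts = {}    # row timestamp -> values; each key doubles as a timer
    running_sum = 0
    output = []

    for event in events:
        if event[0] == "row":
            _, row_ts, value = event
            # processElement(): buffer the row and register a timer
            # for the row's own timestamp.
            rows_by_ts.setdefault(row_ts, []).append(value)
        else:
            watermark = event[1]
            # Fire timers in timestamp order once the watermark reaches them.
            for timer_ts in sorted(t for t in rows_by_ts if t <= watermark):
                for value in rows_by_ts.pop(timer_ts):
                    running_sum += value
                    # onTimer(): the timer timestamp equals the row timestamp.
                    output.append((timer_ts, timer_ts, running_sum))
    return output


events = [
    ("row", 1000, 1), ("row", 3000, 2), ("row", 2000, 3),
    ("watermark", 2500),
    ("row", 4000, 4),
    ("watermark", 5000),
]
result = run_per_row_timers(events)
```

Because each timer is registered for the timestamp of the row it will emit, the timestamp attached by {{onTimer()}} coincides with the row's original event time.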

I think we should change the implementation of the unbounded event-time OVER aggregates to follow a strategy similar to that of the bounded event-time OVER aggregates.

What do you think [~Yuhong_kyo] [~sunjincheng121]?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)