Playing with EventTime in DataStreams


Nam-Luc Tran
Hello everyone,

I am currently playing with streams whose timestamps are defined by
event time. I currently have the following code:

      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      DataStream<String> input = env.readTextFile("file:///var/log/syslog");
      input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

      input.timeWindowAll(Time.minutes(5)).apply(new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
         @Override
         public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
            for (String t : values) {
               out.collect(t);
            }
         }
      }).print();

(...)

public static final class AssignTimestampFromLogEvent extends AscendingTimestampExtractor<String> {
   @Override
   public long extractAscendingTimestamp(String element, long previousElementTimestamp) {
      String date = element.substring(0, 15);
      SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
      Date ddate = null;
      try {
         ddate = sdf.parse(date);
      } catch (ParseException e) {
         e.printStackTrace();
      }
      return ddate.getTime();
   }
}
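
A side note on the extractor above: the pattern "MMM dd HH:mm:ss" carries no year, so SimpleDateFormat fills in 1970, and a line that fails to parse leaves ddate null, which ends in a NullPointerException at ddate.getTime(). The sketch below shows one possible way to handle both; it is not part of the original job. The class name SyslogTimestampSketch, the sample log line, the fixed UTC time zone, the English locale and the explicit year parameter are all assumptions made for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of Flink or of the original job.
class SyslogTimestampSketch {
    private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); // assumed time zone

    // Parses the first 15 characters with the original, year-less pattern.
    static long parseWithoutYear(String line) {
        return parse("MMM dd HH:mm:ss", line.substring(0, 15));
    }

    // Same prefix, but with an explicitly supplied year prepended before parsing.
    static long parseWithYear(String line, int year) {
        return parse("yyyy MMM dd HH:mm:ss", year + " " + line.substring(0, 15));
    }

    // Fails loudly on unparseable input instead of continuing with a null Date.
    private static long parse(String pattern, String text) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern, Locale.ENGLISH);
        sdf.setTimeZone(UTC);
        try {
            return sdf.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable syslog timestamp: " + text, e);
        }
    }

    // Returns the calendar year (in UTC) of an epoch-millisecond timestamp.
    static int yearOf(long epochMillis) {
        Calendar cal = Calendar.getInstance(UTC, Locale.ENGLISH);
        cal.setTimeInMillis(epochMillis);
        return cal.get(Calendar.YEAR);
    }

    public static void main(String[] args) {
        String line = "Feb 25 18:14:01 host app: made-up sample line";
        System.out.println(yearOf(parseWithoutYear(line)));    // prints 1970
        System.out.println(yearOf(parseWithYear(line, 2016))); // prints 2016
    }
}
```

Where the year should come from (file metadata, the current date, a job parameter) depends on the log source and is left open here.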


What I expect it to do is read the syslog, assign timestamps, and build
5-minute windows *based on the syslog event time*, as I've configured
the stream to do. However, it does not do that; it builds the windows
based on processing time.

What am I missing here?

Best regards,

--

*Nam-Luc TRAN*

R&D Manager

EURA NOVA

(M) +32 498 37 36 23

*euranova.eu <http://euranova.eu>*

Re: Playing with EventTime in DataStreams

Robert Metzger
Hi,

I had a similar issue recently.
Instead of
 input.assignTimestampsAndWatermarks

you have to do:

 input = input.assignTimestampsAndWatermarks
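
The reason is that assignTimestampsAndWatermarks does not modify the stream it is called on; it returns a new stream, and the window has to be built on that returned stream. The sketch below shows the same pattern with a toy class in plain Java. ToyStream and its methods are made up for illustration and are not Flink's API.

```java
import java.util.function.ToLongFunction;

// Toy stand-in for a stream handle; NOT Flink's DataStream.
class ToyStream {
    // null means "no event-time timestamps assigned".
    private final ToLongFunction<String> timestampFn;

    ToyStream(ToLongFunction<String> timestampFn) {
        this.timestampFn = timestampFn;
    }

    // Returns a NEW handle that carries the extractor; 'this' is left unchanged.
    ToyStream assignTimestamps(ToLongFunction<String> fn) {
        return new ToyStream(fn);
    }

    boolean hasEventTimestamps() {
        return timestampFn != null;
    }

    public static void main(String[] args) {
        ToyStream input = new ToyStream(null);

        // Result discarded: 'input' still refers to the handle without timestamps.
        input.assignTimestamps(line -> 0L);
        System.out.println(input.hasEventTimestamps()); // prints false

        // Result kept: later operations are applied to the handle with timestamps.
        input = input.assignTimestamps(line -> 0L);
        System.out.println(input.hasEventTimestamps()); // prints true
    }
}
```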

On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <[hidden email]>
wrote:

>       input.assignTimestampsAndWatermarks(new
> AssignTimestampFromLogEvent());

Re: Playing with EventTime in DataStreams

Nam-Luc Tran
Great, that did it, thanks Robert ;)

While I'm at it:
Sometimes the results are returned correctly, but sometimes the output of the job
(print or writeAsText) is simply empty, as if the job finished too quickly,
before the results were written. One way of "forcing" results to appear is
to insert a "delay" in the source stream, as with a FlatMap:

      @Override
      public void flatMap(String value, Collector<String> out)
            throws Exception {
         Thread.sleep(1);
         out.collect(value.toLowerCase());
      }

Am I missing anything here?

Best regards,


2016-02-25 20:05 GMT+01:00 Robert Metzger <[hidden email]>:

> Hi,
>
> I had a similar issue recently.
> Instead of
>  input.assignTimestampsAndWatermarks
>
> you have to do:
>
>  input = input.assignTimestampsAndWatermarks

Re: Playing with EventTime in DataStreams

Aljoscha Krettek-2
Hi,
I think the problem is that the source finishes before the extractor has a chance to emit even a single watermark. This means that the topology shuts down, and the window operator does not emit in-flight windows upon shutdown.

Cheers,
Aljoscha
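
The effect described above can be reproduced with a toy model in plain Java. ToyWindowOperator below is a made-up stand-in, not Flink's window operator: it buffers elements per 5-minute window and emits a window only once a watermark reaches the window's last timestamp. If the input ends before any watermark arrives, the buffered elements never reach the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of an event-time tumbling window operator; NOT Flink's implementation.
class ToyWindowOperator {
    private final long windowSizeMs;
    // Window start -> elements buffered for that window, ordered by window start.
    private final TreeMap<Long, List<String>> inFlight = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    ToyWindowOperator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Buffers the element in the window that contains its timestamp.
    void onElement(String value, long timestampMs) {
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        inFlight.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
    }

    // Emits every buffered window whose last timestamp is at or before the watermark.
    void onWatermark(long watermarkMs) {
        while (!inFlight.isEmpty()
                && inFlight.firstKey() + windowSizeMs - 1 <= watermarkMs) {
            output.addAll(inFlight.pollFirstEntry().getValue());
        }
    }

    // Shutdown as described in the thread: buffered windows are dropped, not emitted.
    void close() {
        inFlight.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;

        // Case 1: the input ends before any watermark arrives.
        ToyWindowOperator noWatermark = new ToyWindowOperator(fiveMinutes);
        noWatermark.onElement("a", 1_000L);
        noWatermark.onElement("b", 2_000L);
        noWatermark.close();
        System.out.println(noWatermark.output()); // prints []

        // Case 2: a watermark past the window end arrives before shutdown.
        ToyWindowOperator withWatermark = new ToyWindowOperator(fiveMinutes);
        withWatermark.onElement("a", 1_000L);
        withWatermark.onElement("b", 2_000L);
        withWatermark.onWatermark(fiveMinutes);
        withWatermark.close();
        System.out.println(withWatermark.output()); // prints [a, b]
    }
}
```

This would also explain why the Thread.sleep(1) workaround helps: slowing the input down gives a periodic watermark a chance to be emitted before the source is done.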

> On 26 Feb 2016, at 11:40, Nam-Luc Tran <[hidden email]> wrote:
>
> Great, that did it, thanks Robert ;)
>
> While I'm at it:
> Sometimes results are correctly returned, sometimes, the output of the job
> (print or writeAsText)  is plain empty, like the job finished too quickly
> before the results are written. One way of "forcing" results to happen is
> to insert a "delay" in the source stream, as with a FlatMap:
>
>      @Override
>      public void flatMap(String value, Collector<String> out)
>            throws Exception {
>         Thread.sleep(1);
>         out.collect(value.toLowerCase());
>         }
>
> Am I missing anything here?

Re: Playing with EventTime in DataStreams

Stephan Ewen
Nice catch, actually.

I think we should let the timestamp extracting operator emit the current
watermark prior to shutting down.
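
A toy sketch of this idea in plain Java, under the assumption that watermarks are otherwise emitted only on a periodic timer. ToyTimestampExtractor and its emitOnClose flag are hypothetical names, not Flink classes, and using "highest timestamp seen minus one" as the watermark is an assumption modeled on ascending timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Toy periodic watermark emitter; a hypothetical stand-in, NOT Flink's operator.
class ToyTimestampExtractor {
    private final boolean emitOnClose;
    private long currentTimestamp = Long.MIN_VALUE; // highest timestamp seen so far
    private final List<Long> emittedWatermarks = new ArrayList<>();

    ToyTimestampExtractor(boolean emitOnClose) {
        this.emitOnClose = emitOnClose;
    }

    // Tracks the highest element timestamp; emits nothing by itself.
    void onElement(long timestampMs) {
        currentTimestamp = Math.max(currentTimestamp, timestampMs);
    }

    // Would be called by a timer once per watermark interval.
    void onPeriodicTimer() {
        emitCurrentWatermark();
    }

    // The proposed change: flush the current watermark before shutting down.
    void close() {
        if (emitOnClose) {
            emitCurrentWatermark();
        }
    }

    private void emitCurrentWatermark() {
        if (currentTimestamp != Long.MIN_VALUE) {
            emittedWatermarks.add(currentTimestamp - 1);
        }
    }

    List<Long> emittedWatermarks() {
        return emittedWatermarks;
    }

    public static void main(String[] args) {
        // The source finishes before the first timer tick in both cases.
        ToyTimestampExtractor before = new ToyTimestampExtractor(false);
        before.onElement(1_000L);
        before.onElement(2_000L);
        before.close();
        System.out.println(before.emittedWatermarks()); // prints []

        ToyTimestampExtractor after = new ToyTimestampExtractor(true);
        after.onElement(1_000L);
        after.onElement(2_000L);
        after.close();
        System.out.println(after.emittedWatermarks()); // prints [1999]
    }
}
```

Note that in this toy model a final watermark equal to the current one only releases windows that end at or before it; a window that still contains the latest element would stay in flight.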

On Fri, Feb 26, 2016 at 11:49 AM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> I think the problem is that the source finished before the extractor has
> the chance to emit even a single watermark. This means that the topology
> will shut down and the window operator does not emit in-flight windows upon
> shutdown.
>
> Cheers,
> Aljoscha