Re: Stream sql example

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Stream sql example

jincheng sun
Hi Dawid,

About your exception:

  1. When you remove the window start and end selections, using
UserSongsStatistics can work well. the sql as follows:

     SELECT OUNT(1) as cnt, song_name as songName, userId FROM songs WHERE
type = 'PLAY' GROUP BY song_name, userId, TUMBLE(t, INTERVAL '3' SECOND)");

  2. When you add window properties in the projections. It's can not work.

So, First of all,  for your case I suggest you keep using:

  【tEnv.toAppendStream(table,TypeInformation.of(Row.class)).print();】

Because Row can include all fields which you want added in
UserSongsStatistics. And for express my point clearly, I had write a
example according your test case. Please see the example code as follows
link:

1. SongEventTableSource.java
<https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/SongEventTableSource.java>
using
`emitWatermark ` `collectWithTimestamp` and the objects you mentioned.
2. StreamSql.java
<https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/StreamSql.java>
like
yours.

Secondly, I think you can open a JIRA. relation the error.

I hope it can help you and best wish to you.
welcome any feedback. :)

Thanks,
SunJincheng

2017-06-10 12:02 GMT+08:00 jincheng sun <[hidden email]>:

> Hi Dawid,
>
>  For your case I think you can keep using:
>
>   【tEnv.toAppendStream(table,TypeInformation.of(Row.class)).print();】
>
> Because Row can include all fields which you want added in
> UserSongsStatistics. And for express my point clearly, I had write a
> example according your test case. Please see the example code as follows
> link:
>
> 1. SongEventTableSource.java
> <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/SongEventTableSource.java>
> using `emitWatermark ` `collectWithTimestamp` and the objects you mentioned.
> 2. StreamSql.java
> <https://github.com/sunjincheng121/flink/blob/Bayes/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/StreamSql.java>
> like yours.
>
> I hope it can help you and best wish to you.
> welcome any feedback. :)
>
> Thanks,
> SunJincheng
>
> 2017-06-10 5:02 GMT+08:00 Dawid Wysakowicz <[hidden email]>:
>
>> Thanks a lot Timo, after I added the ResultTypeQueryable interface to my
>> mapper it worked. As for the SongEvent the reason I tried remapping it to
>> Row is that it has an enum field on which I want to filter, so my first
>> approach was to remap it in TableSource to String. What do you think should
>> be the way to go in such case?
>>
>> After successfully producing DataStream[Row] I tried sth like:
>>
>>> tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSo
>>> ngsStatistics])).print();
>>>
>>
>> The class UserSongsStatistics is a pojo with fields named the same as
>> expressions in SELECT clause. Is such a construct intended to work? Right
>> now I get an error:
>>
>> org.apache.flink.table.api.TableException: The field types of physical
>>> and logical row types do not match.This is a bug and should not happen.
>>> Please file an issue.
>>
>>
>> Is it really a bug?
>>
>> Anyway thanks for help. I will file a JIRA for the previous issue
>> tomorrow.
>>
>> Z pozdrowieniami! / Cheers!
>>
>> Dawid Wysakowicz
>>
>> *Data/Software Engineer*
>>
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>
>> <http://getindata.com/>
>>
>> 2017-06-09 22:25 GMT+02:00 Timo Walther <[hidden email]>:
>>
>>> Hi David,
>>>
>>> I think the problem is that the type of the DataStream produced by the
>>> TableSource, does not match the type that is declared in the `
>>> getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type
>>> (because Row cannot be analyzed). A solution would be that the mapper
>>> implements `ResultTypeQueryable`. I agree that the error should be thrown
>>> earlier, not in the CodeGenerator. Can you create an issue for this?
>>>
>>> Btw the Table API supports nested types, it should work that the
>>> TableSource returns ` SongEvent`.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz:
>>>
>>> Sorry forgot to add the link:
>>>
>>> https://gist.github.com/dawidwys/537d12a6f2355cba728bf93f1af87b45
>>>
>>> Z pozdrowieniami! / Cheers!
>>>
>>> Dawid Wysakowicz
>>>
>>> *Data/Software Engineer*
>>>
>>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>>
>>> <http://getindata.com/>
>>>
>>> 2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <[hidden email]>
>>> :
>>>
>>>> Hi,
>>>> I tried writing a simple sql query with custom StreamTableSource and it
>>>> fails with error:
>>>>
>>>> org.apache.flink.table.codegen.CodeGenException: Arity of result type
>>>>>> does not match number of expressions.
>>>>>
>>>>> at org.apache.flink.table.codegen.CodeGenerator.generateResultE
>>>>>> xpression(CodeGenerator.scala:940)
>>>>>
>>>>> at org.apache.flink.table.codegen.CodeGenerator.generateConvert
>>>>>> erResultExpression(CodeGenerator.scala:883)
>>>>>
>>>>> at org.apache.flink.table.plan.nodes.CommonScan$class.generated
>>>>>> ConversionFunction(CommonScan.scala:57)
>>>>>
>>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>>>> ceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
>>>>>
>>>>> at org.apache.flink.table.plan.nodes.datastream.StreamScan$clas
>>>>>> s.convertToInternalRow(StreamScan.scala:48)
>>>>>
>>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>>>> ceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
>>>>>
>>>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>>>> ceScan.translateToPlan(StreamTableSourceScan.scala:107)
>>>>>
>>>>>
>>>> You can check the source code here:
>>>>
>>>>
>>>> Z pozdrowieniami! / Cheers!
>>>>
>>>> Dawid Wysakowicz
>>>>
>>>> *Data/Software Engineer*
>>>>
>>>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>>>
>>>> <http://getindata.com/>
>>>>
>>>
>>>
>>>
>>
>
Loading...