Streaming reduce discussion

Streaming reduce discussion

Gyula Fóra
Hey All,

I would like to bring up a discussion regarding the different reduce
functionalities in the streaming API, because there are several ways they
can (and cannot) be implemented efficiently, which behave in totally
different ways.

I will go through the different reduce types and their problems:

*Simple reduce:*
dataStream.reduce(ReduceFunction)

The current working mechanism is that it combines the incoming elements
using the ReduceFunction and emits the current reduced value (so after
every incoming value there is one emit with the current reduced value).
Currently, if the reducer has parallelism higher than 1, the reduced value
is only a 'local' reduce on the given vertex (there is no global reduce
step). The problem here is that introducing a global reduce step would be
equivalent to setting the reducer parallelism to 1, since we want to emit
the current reduced value after each incoming data point. So the question
here is whether the current mechanism makes sense without a global
aggregate or whether we should force parallelism 1.
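
To make the current behaviour concrete, here is a plain-Java toy sketch of
what a single parallel instance does (the ReduceFunction interface here is
a simplified stand-in for illustration, not our actual API):

import java.util.Arrays;
import java.util.List;

// Sketch of the current rolling behaviour of one parallel reduce
// instance: the running aggregate is emitted after every element.
public class RollingReduceSketch {
    // simplified stand-in for the streaming ReduceFunction interface
    interface ReduceFunction<T> { T reduce(T a, T b); }

    public static void main(String[] args) {
        ReduceFunction<Integer> sum = (a, b) -> a + b;
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        Integer current = null;
        for (Integer element : input) {
            current = (current == null) ? element : sum.reduce(current, element);
            System.out.println("emit " + current); // emits 1, 3, 6, 10
        }
    }
}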

This is also the case for aggregation operators.

*Simple Batch/Window reduce:*
dataStream.window(1000).reduce(ReduceFunction)

The current working mechanism is that it combines incoming elements with a
ReduceFunction in windows/batches, currently also 'locally' on each
parallel vertex, emitting one reduced output after each window. Here the
same issue of global aggregation can be solved by introducing a global
aggregator vertex with parallelism 1, which wouldn't cause serious
overhead if the windows are not too small.
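
As a rough sketch of this two-stage idea (toy Java with made-up names; the
real operator wiring would of course differ):

import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Every parallel vertex pre-reduces its window locally; a single global
// aggregator vertex merges the per-vertex partial results per window.
public class WindowedGlobalReduceSketch {
    public static void main(String[] args) {
        BinaryOperator<Integer> reduceFunction = Integer::sum;
        // partial results of one window, one per parallel vertex (parallelism 3)
        List<Integer> localWindowResults = Arrays.asList(10, 7, 25);
        // the parallelism-1 vertex merges only one value per vertex per
        // window, so its load does not grow with the element rate
        int windowResult = localWindowResults.stream()
                .reduce(reduceFunction).get();
        System.out.println("window result: " + windowResult); // 42
    }
}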

Another issue here is the assumptions we can make about the user-defined
ReduceFunction. If we assume that the function is associative (we currently
assume this) then the window reduce operators can be implemented to be
almost as fast as simple reduces by storing pre-reduced groups of values.
Do you think it is okay to make this assumption?
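
(For example, sum is associative: (1 + 2) + 3 = 1 + (2 + 3) = 6, so
pre-reduced partial sums over parts of a window can be merged later in any
grouping. Subtraction is not: (1 - 2) - 3 = -4 but 1 - (2 - 3) = 2, so
pre-reducing with it would change the result.)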

*Batch/window groupreduce:*
dataStream.window(1000).reduceGroup(GroupReduceFunction)

The difference between .reduce and .groupReduce is that the user gets the
window/batch as an iterable, which can be quite useful in some cases. The
problem here is the same as with the simple reduce: we couldn't figure out
how to make global aggregations efficient. Unlike with window reduce, where
we can create a global aggregator vertex, here that is impossible because
of the different working mechanics of the GroupReduce function (iterable
input with custom output type).

So even if we make the window reduce global, the window groupReduce
will have to remain local if we don't want to enforce a parallelism=1
bottleneck. This would make the API ambiguous.
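
To illustrate why that contract blocks a global merge step, consider a
GroupReduce that computes the median of a window (toy sketch with a
simplified stand-in interface, not the actual one):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// The UDF sees the whole window as an iterable and may emit an arbitrary
// output type, so two local results cannot be merged in general.
public class GroupReduceSketch {
    interface GroupReduceFunction<IN, OUT> { OUT reduce(Iterable<IN> window); }

    public static void main(String[] args) {
        // medians of sub-windows cannot be combined into the median
        // of the full window
        GroupReduceFunction<Integer, Double> median = window -> {
            List<Integer> sorted = new ArrayList<>();
            window.forEach(sorted::add);
            Collections.sort(sorted);
            int n = sorted.size();
            return n % 2 == 1
                    ? (double) sorted.get(n / 2)
                    : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        };
        System.out.println(median.reduce(Arrays.asList(1, 9, 2, 8, 5))); // 5.0
    }
}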


*Grouped reduces*

dataStream.groupBy(keyPos).reduce(ReduceFunction)
dataStream.groupBy(keyPos).window(1000).reduce/groupReduce

Here we don't have the previous problems, since local aggregations work as
globals.
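
(To spell out why local works as global here: assuming the grouping
hash-partitions by the key, every element of a key ends up on the same
vertex. A toy illustration:)

// All elements with the same key are routed to the same parallel vertex,
// so the per-key local aggregate is already the global one.
// (Assumes hash partitioning by key, just for illustration.)
public class KeyRoutingSketch {
    static int targetVertex(Object key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String key : new String[]{"a", "b", "a", "c", "a"}) {
            System.out.println(key + " -> vertex " + targetVertex(key, parallelism));
        }
        // "a" always maps to the same vertex, so its rolling reduce there
        // sees every "a" element in the stream
    }
}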


So any ideas regarding this global/local reduce issue and reduce function
associativity are appreciated :)

Regards,
Gyula

Re: Streaming reduce discussion

Fabian Hueske
Hi, thanks for starting this discussion!
I added some comments inline.

2014-09-24 11:43 GMT+02:00 Gyula Fóra <[hidden email]>:

> Hey All,
>
> I would like to bring up a discussion regarding the different reduce
> functionalities in the streaming API, because there are several ways they
> can (and cannot) be implemented efficiently, which behave in totally
> different ways.
>
> I will go through the different reduce types and their problems:
>
> *Simple reduce:*
> dataStream.reduce(ReduceFunction)
>
> The current working mechanism is that it combines the incoming elements
> using the ReduceFunction and emits the current reduced value (so after
> every incoming value there is one emit with the current reduced value).
> Currently, if the reducer has parallelism higher than 1, the reduced value
> is only a 'local' reduce on the given vertex (there is no global reduce
> step). The problem here is that introducing a global reduce step would be
> equivalent to setting the reducer parallelism to 1, since we want to emit
> the current reduced value after each incoming data point. So the question
> here is whether the current mechanism makes sense without a global
> aggregate or whether we should force parallelism 1.
>
>
Not sure if there are use cases for multiple local reducers. I think I'd
enforce DOP = 1 in this case.


> This is also the case for aggregation operators.
>
>  *Simple Batch/Window reduce:*
> dataStream.window(1000).reduce(ReduceFunction)
>
> The current working mechanism is that it combines incoming elements with a
> ReduceFunction in windows/batches, currently also 'locally' on each
> parallel vertex, emitting one reduced output after each window. Here the
> same issue of global aggregation can be solved by introducing a global
> aggregator vertex with parallelism 1, which wouldn't cause serious
> overhead if the windows are not too small.
>
> Another issue here is the assumptions we can make about the user-defined
> ReduceFunction. If we assume that the function is associative (we currently
> assume this) then the window reduce operators can be implemented to be
> almost as fast as simple reduces by storing pre-reduced groups of values.
> Do you think it is okay to make this assumption?
>
>
The associativity assumption for reduce functions holds. This is also
leveraged by the batch API, which uses reduce functions for local aggregates
(combiners) as well as for global aggregates (reduce). For windowed
streaming you can do the same thing: do local preaggregations on windows,
emit the partial result once a window is complete, and reinitialize the
partial aggregator (start with no initial state). The global aggregator
eagerly reduces a preaggregate with the last full aggregate (no windowing).
Since the preaggregators are reinitialized, the global aggregator does not
need to hold individual state for the preaggregates and only keeps the
last full aggregate.
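
As a toy sketch of the global side (assuming an associative reduce
function; all names are made up for illustration):

import java.util.function.BinaryOperator;

// The parallelism-1 global aggregator keeps only the last full aggregate
// and eagerly folds in each partial window result from the pre-aggregators.
public class EagerGlobalAggregatorSketch {
    public static void main(String[] args) {
        BinaryOperator<Integer> reduce = Integer::sum; // associative!
        Integer fullAggregate = null; // the only state the global vertex keeps

        // partial results, one per pre-aggregator per completed window
        for (int preAggregate : new int[]{10, 7, 25, 3}) {
            fullAggregate = (fullAggregate == null)
                    ? preAggregate
                    : reduce.apply(fullAggregate, preAggregate);
            System.out.println("emit full aggregate: " + fullAggregate);
        }
    }
}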


> *Batch/window groupreduce:*
> dataStream.window(1000).reduceGroup(GroupReduceFunction)
>
> The difference between .reduce and .groupReduce is that the user gets the
> window/batch as an iterable, which can be quite useful in some cases. The
> problem here is the same as with the simple reduce: we couldn't figure out
> how to make global aggregations efficient. Unlike with window reduce, where
> we can create a global aggregator vertex, here that is impossible because
> of the different working mechanics of the GroupReduce function (iterable
> input with custom output type).
>
> So even if we make the window reduce global, the window groupReduce
> will have to remain local if we don't want to enforce a parallelism=1
> bottleneck. This would make the API ambiguous.
>
>
You can do the same as for window.reduce if the GroupReduce function
implements the combine interface (combine functions must be
associative!). In addition to the local preaggregates, the combine is also
used on the global aggregator to further reduce the state by combining
the preaggregates. The reduce function is then only called with a single
value (the combined preaggregates) to make sure that the result is correct.
If the GroupReduce function does not implement a combine interface, I don't
think that this can be done in a practical way (i.e., without caching the
full stream).
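
A toy sketch of that combinable path (the interfaces are simplified
stand-ins, not the actual Flink ones):

import java.util.Arrays;
import java.util.Collections;

// Local combines per window, a global combine over the partial results,
// then the group reduce is called once with the single combined value.
public class CombinableGroupReduceSketch {
    interface Combiner<T> { T combine(Iterable<T> values); } // must be associative
    interface GroupReducer<T, O> { O reduce(Iterable<T> values); }

    public static void main(String[] args) {
        Combiner<Integer> combine = values -> {
            int sum = 0;
            for (int v : values) sum += v;
            return sum;
        };
        GroupReducer<Integer, String> reduce =
                values -> "window sum = " + values.iterator().next();

        // local combines on two parallel vertices for the same window
        int partial1 = combine.combine(Arrays.asList(1, 2, 3)); // 6
        int partial2 = combine.combine(Arrays.asList(4, 5));    // 9
        // global combine of the preaggregates, then one reduce call
        int combined = combine.combine(Arrays.asList(partial1, partial2)); // 15
        System.out.println(reduce.reduce(Collections.singletonList(combined)));
    }
}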



> *Grouped reduces*
>
> dataStream.groupBy(keyPos).reduce(ReduceFunction)
> dataStream.groupBy(keyPos).window(1000).reduce/groupReduce
>
> Here we don't have the previous problems, since local aggregations work as
> globals.
>
>
I think you can play the preaggregation/combine tricks for windows here as
well. So even in case of high data skew, you could do preaggregations for a
single group with multiple combiners in parallel.


>
> So any ideas regarding this global/local reduce issue and reduce function
> associativity are appreciated :)
>
> Regards,
> Gyula


I hope I got everything right ;-)

Cheers, Fabian

Re: Streaming reduce discussion

Gyula Fóra
Hey Fabian,

I have been thinking a bit about your comments; let me give you my
thoughts on them:

Simple reduce:
>Not sure if there are use cases for multiple local reducers. I think I'd
>enforce DOP = 1 in this case.

When I started the discussion yesterday I thought pretty much the same, but
then I realized that since we have custom partitioners between the task
vertices, the user can control which tuples go to which vertex. So local
reducers actually make a lot of sense. For instance, I can implement a range
partitioner by some field and then perform the reduce locally. And this
argument actually goes for all the local stream reducers, even on windows.
So maybe there should be an option for local/global.
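
For example, something like this (a hypothetical range-partitioning
function, just to illustrate the idea):

// With a user-supplied range partitioner, each local reducer owns a
// disjoint value range, so its 'local' result is meaningful on its own.
public class RangePartitionerSketch {
    // route values in [0, 100) to one of `parallelism` reducers by range
    static int partition(int value, int parallelism) {
        return Math.min(value * parallelism / 100, parallelism - 1);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int v : new int[]{5, 42, 77, 99}) {
            System.out.println(v + " -> reducer " + partition(v, parallelism));
        }
        // each reducer then performs its rolling reduce over its own
        // value range only, e.g. a per-range running maximum
    }
}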

Simple Batch/Window reduce:

>The associativity assumption for reduce functions holds. This is also
>leveraged by the batch API, which uses reduce functions for local aggregates
>(combiners) as well as for global aggregates (reduce). For windowed
>streaming you can do the same thing: do local preaggregations on windows,
>emit the partial result once a window is complete, and reinitialize the
>partial aggregator (start with no initial state). The global aggregator
>eagerly reduces a preaggregate with the last full aggregate (no windowing).
>Since the preaggregators are reinitialized, the global aggregator does not
>need to hold individual state for the preaggregates and only keeps the
>last full aggregate.

Yes, I agree with the preaggregation approach. On second thought, this will
only make sense on windowed data streams where the window is defined by
system time. Anything else, for example batching by the number of incoming
records, seems impossible to align if the data stream is not 100% balanced,
which will not happen if the partitioning is not shuffle. (So this is
another issue.)

Batch/window groupreduce:
> You can do the same as for window.reduce if the GroupReduce function
>implements the combine interface (combine functions must be
>associative!). In addition to the local preaggregates, the combine is also
>used on the global aggregator to further reduce the state by combining
>the preaggregates. The reduce function is then only called with a single
>value (the combined preaggregates) to make sure that the result is correct.
>If the GroupReduce function does not implement a combine interface, I don't
>think that this can be done in a practical way (i.e., without caching the
>full stream).

Good call with the combine interface, I hadn't thought about that! The same
issue with non-time windows applies here as in the previous point.

Group reduces:
>I think you can play the preaggregation/combine tricks for windows here as
>well. So even in case of high data skew, you could do preaggregations for a
>single group with multiple combiners in parallel.

Good point, I hadn't thought about this; we assumed that the grouping would
give a balanced distribution, but you are right.


I suggest that we don't include these global reducers in the upcoming
release; it seems there are a lot of ways the API can be implemented. I
think we should give it more thought and experiment with the different
approaches before we release it. What do you think about this?

Regards,
Gyula


Re: Streaming reduce discussion

Fabian Hueske
2014-09-25 10:01 GMT+02:00 Gyula Fóra <[hidden email]>:

> Hey Fabian,
>
> I have been thinking a bit about your comments; let me give you my
> thoughts on them:
>
> Simple reduce:
> >Not sure if there are use cases for multiple local reducers. I think I'd
> >enforce DOP = 1 in this case.
>
> When I started the discussion yesterday I thought pretty much the same, but
> then I realized that since we have custom partitioners between the task
> vertices, the user can control which tuples go to which vertex. So local
> reducers actually make a lot of sense. For instance, I can implement a range
> partitioner by some field and then perform the reduce locally. And this
> argument actually goes for all the local stream reducers, even on windows.
> So maybe there should be an option for local/global.
>
>
True, if the user controls the partitioning it might be useful. :-)


> Simple Batch/Window reduce:
>
> >The associativity assumption for reduce functions holds. This is also
> >leveraged by the batch API, which uses reduce functions for local aggregates
> >(combiners) as well as for global aggregates (reduce). For windowed
> >streaming you can do the same thing: do local preaggregations on windows,
> >emit the partial result once a window is complete, and reinitialize the
> >partial aggregator (start with no initial state). The global aggregator
> >eagerly reduces a preaggregate with the last full aggregate (no windowing).
> >Since the preaggregators are reinitialized, the global aggregator does not
> >need to hold individual state for the preaggregates and only keeps the
> >last full aggregate.
>
> Yes, I agree with the preaggregation approach. On second thought, this will
> only make sense on windowed data streams where the window is defined by
> system time. Anything else, for example batching by the number of incoming
> records, seems impossible to align if the data stream is not 100% balanced,
> which will not happen if the partitioning is not shuffle. (So this is
> another issue.)
>
>
I guess it depends on the windowing "contract". If we want to offer exact
windows, you are right, this would be hard to implement. If windows are
an upper bound, it might work if n preaggregators each preaggregate
(windowsize/n) elements. I think the question is whether the size of a
window is essential for the semantics of a job or "only" there to control
performance and accuracy. The former case is hard; the latter might be much
easier to solve. But this definitely needs more thought. I guess a bit of
research on how other systems solve this might be useful. We're probably
not the first ones to encounter this challenge ;-)
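
(To make the upper-bound idea concrete with made-up numbers: with a window
size of 1000 and n = 4 preaggregators, each preaggregator would emit a
partial result after 250 elements, so the merged result never covers more
than 1000 elements, even if the four partials are not perfectly aligned.)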


> Batch/window groupreduce:
> > You can do the same as for window.reduce if the GroupReduce function
> >implements the combine interface (combine functions must be
> >associative!). In addition to the local preaggregates, the combine is also
> >used on the global aggregator to further reduce the state by combining
> >the preaggregates. The reduce function is then only called with a single
> >value (the combined preaggregates) to make sure that the result is correct.
> >If the GroupReduce function does not implement a combine interface, I don't
> >think that this can be done in a practical way (i.e., without caching the
> >full stream).
>
> Good call with the combine interface, I hadn't thought about that! The same
> issue with non-time windows applies here as in the previous point.
>
> Group reduces:
> >I think you can play the preaggregation/combine tricks for windows here as
> >well. So even in case of high data skew, you could do preaggregations for a
> >single group with multiple combiners in parallel.
>
> Good point, I hadn't thought about this; we assumed that the grouping would
> give a balanced distribution, but you are right.
>
>
> I suggest that we don't include these global reducers in the upcoming
> release; it seems there are a lot of ways the API can be implemented. I
> think we should give it more thought and experiment with the different
> approaches before we release it. What do you think about this?
>
>
+1 for excluding the global reduce.
We should also carefully think about the semantics of the windows, to
avoid that they change later (which still might happen...).


> Regards,
> Gyula
>
>
Cheers, Fabian
