[DISCUSS] FLIP-19: Improved BLOB storage architecture

[DISCUSS] FLIP-19: Improved BLOB storage architecture

Nico Kruber
Hi all,
I'd like to initiate a discussion on some architectural changes for the BLOB
server which finally add a proper cleanup story, remove some dead code, and
extend the BLOB store's use for off-loaded (large) RPC messages.

Please have a look at FLIP-19 that details the proposed changes:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture


Regards
Nico

PS: While doing the re-write, I'd also like to fix some concurrency issues in
the current code.


Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Chesnay Schepler
One thing I have been wondering about for a long time is why the distributed
cache is not implemented via the BLOB store.

The DC is essentially just a copy routine with local caching and automatic
cleanup, which is basically what the BLOB store is supposed to do (I guess).

On 13.06.2017 16:31, Nico Kruber wrote:




Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Nico Kruber
Actually, I don't see the automatic cleanup you referred to. From what I can
see around the DistributedCache class and its use, it is simply a registry
for user files whose life cycle is completely managed by the user (upload,
download, delete). Files may even reside outside of Flink's control and survive
job termination.

Therefore, it is a different use case which can be (mis-)used as some sort of
blob storage. It...
(1) will not work if there is no distributed/commonly-accessible file system,
(2) does not provide life cycle management for distributed files,
(3) does not delete local files (this gets especially complicated if files are
shared, e.g. multiple tasks at one TaskManager running the same job with the
same libraries),
...


Nico

On Wednesday, 14 June 2017 12:29:55 CEST Chesnay Schepler wrote:




Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Chesnay Schepler
The DC does delete local files. The user requests the file through the
RuntimeContext, but the download and local caching are completely handled by
Flink.

The problems you mention are exactly why I'm suggesting rebuilding the DC
on top of the BLOB service.

If a user currently wants to use the DC for a local file, he has to do the
following:

 1. upload the file to a DFS
 2. register the file with Flink
 3. delete the distributed file

But in step 2, couldn't we load the file into the BLOB storage and
remove it once the job finishes?

On 14.06.2017 15:23, Nico Kruber wrote:




Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Biao Liu
I share Chesnay Schepler's concern. AFAIK Flink does not support the DC as
well as MapReduce and Spark do: we only support the DC in the DataSet API, and
the DC in Flink does not support local files. Is this a good chance to refactor
the DC too?

I have another concern: currently the BLOB server conflicts with the FLIP-6
architecture. In FLIP-6 we start the JM while submitting the job instead of
starting it beforehand. If the BLOB server is embedded in the JM, we cannot
upload jars and files before the JM has started, but the fact is that we need
the jars uploaded before starting the JM. Correct me if I am wrong.
To solve this problem we could split job submission into different stages,
or we could make the BLOB server an independent component parallel to the RM.

Maybe we can think more about these points in FLIP-19, what do you think? @Nico

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Till Rohrmann
Hi Biao,

you're right that the BlobServer won't live in the JM in FLIP-6. Instead, it
will be part of either the RM or the dispatcher component, depending on the
actual implementation. The requirements for the BlobServer should, however,
be the same.

Concerning the question about Flink's distributed cache, I think this is an
orthogonal topic. I think that we should be able to piggy-back on the
BlobServer once we have refactored it. This should simplify the DC's
implementation a bit.

Cheers,
Till

On Thu, Jun 15, 2017 at 9:16 AM, Biao Liu <[hidden email]> wrote:


Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Biao Liu
Hi Till,

I agree with you about Flink's DC; it is indeed another topic. I just
thought that we should think more about it before refactoring the BLOB service,
to make sure it's easy to implement the DC on top of the refactored architecture.

I have another question about the BLOB service: can we abstract it behind
some high-level interfaces? Maybe just some put/get methods. Being easy to
extend would be useful in several scenarios.
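A minimal version of such an interface might look like the sketch below, together with a trivial in-memory implementation standing in for the network-based default. The method names and the design are assumptions for illustration, not an existing Flink API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical high-level BLOB service interface. Implementations could be
// backed by the network-based BlobServer (default), by a shared DFS, or by
// YARN's resource localization, without callers noticing the difference.
interface BlobService {
    /** Store the given data and return a key for later retrieval. */
    String put(InputStream data) throws IOException;

    /** Retrieve the data stored under the given key. */
    InputStream get(String key) throws IOException;

    /** Remove the data stored under the given key, if present. */
    void delete(String key) throws IOException;
}

// Trivial in-memory implementation, useful only to show the contract.
class InMemoryBlobService implements BlobService {
    private final Map<String, byte[]> blobs = new HashMap<>();

    @Override
    public String put(InputStream data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = data.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        String key = UUID.randomUUID().toString();
        blobs.put(key, out.toByteArray());
        return key;
    }

    @Override
    public InputStream get(String key) throws IOException {
        byte[] data = blobs.get(key);
        if (data == null) {
            throw new IOException("unknown blob key: " + key);
        }
        return new ByteArrayInputStream(data);
    }

    @Override
    public void delete(String key) {
        blobs.remove(key);
    }
}
```

A DFS-backed implementation would then map put/get/delete onto file operations and could skip uploads for data that is already on the DFS.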

For example, in YARN mode there are some cool features that interest us:
1. YARN can localize files only once per slave machine, and all TMs of the
same job can share these files. That may save a lot of bandwidth for
large-scale jobs or jobs with large BLOBs.
2. We can skip uploading files if they are already on the DFS. That's a common
scenario for a distributed cache.
3. Even more, we actually don't need a BlobServer component in YARN mode at
all; we can rely on the DFS to distribute files, since there is always a DFS
available in a YARN cluster.

If we do so, the network-based BLOB service can be the default
implementation that works in any situation. It would also be clear that the
service does not depend on Hadoop explicitly, and we could optimize for
different kinds of clusters without any hacking.

Those are just some rough ideas, but I think well-abstracted interfaces will
be very helpful.

Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture

Till Rohrmann
Hi Biao,

you're right. What you've described is a totally valid use case and we
should design the interfaces such that you can have specialized
implementations for the cases where you can exploit things like a common
DFS. I think Nico's design should include this.

Cheers,
Till

On Fri, Jun 16, 2017 at 4:10 PM, Biao Liu <[hidden email]> wrote:
