feat: decouple partition location from executor metadata#1853
feat: decouple partition location from executor metadata#1853sandugood wants to merge 5 commits into
Conversation
milenkovicm
left a comment
There was a problem hiding this comment.
thanks for contribution @sandugood
i think this is good first step, but we need to go a bit further and deduplicate executor connectivity information, is there a need to have same peace of information in thousents of places.
when we serialise PartitionLocation can we serialise it in two strucutres, vector of partition locations and a hash map of executor id -> executor metadata. partition location could reference executor with executor id (which we can store as bytes)
For struct PartitionLocation, can we make share executor_meta behind Arc (pub executor_meta: Arc<ExecutorMetadata>)
#[derive(Debug, Clone)]
pub struct PartitionLocation {
/// The source partition ID from the map stage.
pub map_partition_id: usize,
/// The partition identifier.
pub partition_id: PartitionId,
/// Metadata about the executor hosting this partition.
pub executor_meta: Arc<ExecutorMetadata>,
/// Statistics about the partition data.
pub partition_stats: PartitionStats,
/// shuffle file id
pub file_id: Option<u64>,
/// whether this partition uses sort shuffle
pub is_sort_shuffle: bool,
}so basically when we do ser/de we can deduplicate executor meta.
Also, I'm not sure if flattening ExecutorSpecification and the other structure makes more sense than making it optional
This pr will need some effort, but it will help a lot cases where there is many partitions
thanks a lot
|
Thank you for your review and ideas @milenkovicm |
e5ac776 to
7fc0cea
Compare
|
Refactored the code:
|
Which issue does this PR close?
Closes #1851.
Rationale for this change
In the current implementation there is a problem - in the
PartitionLocationthat is used in each shuffle operation (for each partition in the previous stage) there wasexecutor_metadatafield (which is ofExecutorMetadatatype) filled with unnecessary info, because it did not add up any information that could be used byShuffleReaderExecto extract partition (or i.e resume the execution from a failed stage)What changes are included in this PR?
ExecutorMetadatawas decoupled fromPartitionLocationand now it is used as a separate struct for:PartitionLocationis now exposed only to theexecutor_id,hostandport, which is sufficient for fetching needed partitions by theShuffleReaderExecThis way we can save up a lot of space and data-transfer during each shuffle operation (potentially removing possibility of
Scheduler's OOM errors and improving speed of queries with lots of partitions)Are there any user-facing changes?
Yes.
In the REST API interface
ExecutorResponsedoesnt contain nested struct withExecutor's hardware and OS info. It was flattened.Potential follow-up: