Skip to content

Commit

Permalink
[BACKPORT] Ignore broadcaster's locality when assigning subtasks (#2943) (
Browse files Browse the repository at this point in the history
…#2994)

Co-authored-by: He Kaisheng <[email protected]>
  • Loading branch information
wjsi and hekaisheng authored May 5, 2022
1 parent 7601ea6 commit 85331e8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 3 deletions.
3 changes: 2 additions & 1 deletion mars/core/entity/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ...serialization.serializables import FieldTypes, TupleField
from ...serialization.serializables import BoolField, FieldTypes, TupleField
from ...utils import tokenize
from .core import EntityData, Entity


class ChunkData(EntityData):
__slots__ = ()

is_broadcaster = BoolField("is_broadcaster", default=False)
# optional fields
_index = TupleField("index", FieldTypes.uint32)

Expand Down
5 changes: 4 additions & 1 deletion mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def execute_map(cls, ctx, op):
@classmethod
def execute_reduce(cls, ctx, op: "DataFrameMergeAlign"):
chunk = op.outputs[0]
input_idx_to_df = dict(op.iter_mapper_data_with_index(ctx))
input_idx_to_df = dict(op.iter_mapper_data_with_index(ctx, skip_none=True))
row_idxes = sorted({idx[0] for idx in input_idx_to_df})

res = []
Expand Down Expand Up @@ -321,6 +321,7 @@ def _tile_one_chunk(cls, op, left, right):
elif len(left.chunks) == 1:
out_chunks = []
left_chunk = left.chunks[0]
left_chunk.is_broadcaster = True
for c in right.chunks:
merge_op = op.copy().reset_key()
out_chunk = merge_op.new_chunk(
Expand All @@ -338,6 +339,8 @@ def _tile_one_chunk(cls, op, left, right):
else:
out_chunks = []
right_chunk = right.chunks[0]
# set `is_broadcaster` as True
right_chunk.is_broadcaster = True
for c in left.chunks:
merge_op = op.copy().reset_key()
out_chunk = merge_op.new_chunk(
Expand Down
7 changes: 7 additions & 0 deletions mars/services/scheduling/supervisor/assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def _get_random_band(self, is_gpu: bool):

async def assign_subtasks(self, subtasks: List[Subtask]):
inp_keys = set()
broadcaster_keys = set()
selected_bands = dict()

if not self._bands:
Expand Down Expand Up @@ -120,6 +121,8 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
continue
for indep_chunk in subtask.chunk_graph.iter_indep():
if isinstance(indep_chunk.op, Fetch):
if indep_chunk.is_broadcaster:
broadcaster_keys.add(indep_chunk.key)
inp_keys.add(indep_chunk.key)
elif isinstance(indep_chunk.op, FetchShuffle):
selected_bands[subtask.subtask_id] = [self._get_random_band(is_gpu)]
Expand All @@ -132,6 +135,10 @@ async def assign_subtasks(self, subtasks: List[Subtask]):
)

inp_metas = dict(zip(inp_keys, metas))
if broadcaster_keys:
# set broadcaster's size as 0 to avoid assigning all successors to same band.
for key in broadcaster_keys:
inp_metas[key]["store_size"] = 0
assigns = []
for subtask in subtasks:
is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
Expand Down
34 changes: 34 additions & 0 deletions mars/services/scheduling/supervisor/tests/test_assigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,40 @@ async def test_assign_cpu_tasks(actor_pool):
assert "gpu" in str(err.value)


@pytest.mark.asyncio
@pytest.mark.parametrize("actor_pool", [False], indirect=True)
async def test_assign_broadcaster(actor_pool):
    """A subtask fed by a broadcaster chunk and a regular chunk should be
    assigned to the band holding the regular input, ignoring the
    broadcaster's locality."""
    pool, session_id, assigner_ref, cluster_api, meta_api = actor_pool

    # One broadcast input and one ordinary input feeding a single result chunk.
    bcast = TensorFetch(key="x", source_key="x", dtype=np.dtype(int)).new_chunk(
        [], is_broadcaster=True
    )
    regular = TensorFetch(key="a", source_key="a", dtype=np.dtype(int)).new_chunk([])
    result = TensorTreeAdd(args=[bcast, regular]).new_chunk([bcast, regular])

    graph = ChunkGraph([result])
    for node in (bcast, regular, result):
        graph.add_node(node)
    for src in (bcast, regular):
        graph.add_edge(src, result)

    # The broadcaster sits on address0 (and is bigger in memory); the regular
    # input sits on address1.
    await meta_api.set_chunk_meta(
        bcast, memory_size=1000, store_size=200, bands=[("address0", "numa-0")]
    )
    await meta_api.set_chunk_meta(
        regular, memory_size=200, store_size=200, bands=[("address1", "numa-0")]
    )

    # Assignment must follow the regular input's locality, not the broadcaster's.
    [band] = await assigner_ref.assign_subtasks(
        [Subtask("test_task", session_id, chunk_graph=graph)]
    )
    assert band == ("address1", "numa-0")


@pytest.mark.asyncio
@pytest.mark.parametrize("actor_pool", [True], indirect=True)
async def test_assign_gpu_tasks(actor_pool):
Expand Down
9 changes: 8 additions & 1 deletion mars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,14 @@ def build_fetch_chunk(
# for non-shuffle nodes, we build Fetch chunks
# to replace original chunk
op = chunk_op.get_fetch_op_cls(chunk)(sparse=chunk.op.sparse, gpu=chunk.op.gpu)
return op.new_chunk(None, kws=[params], _key=chunk.key, _id=chunk.id, **kwargs)
return op.new_chunk(
None,
is_broadcaster=chunk.is_broadcaster,
kws=[params],
_key=chunk.key,
_id=chunk.id,
**kwargs,
)


def build_fetch_tileable(tileable: TileableType) -> TileableType:
Expand Down

0 comments on commit 85331e8

Please sign in to comment.