Collective Communication Operations#
Collective communication operations are accessible via the ccl attribute on the Iris instance (e.g. ctx.ccl.all_reduce(...)).
all_to_all#
- CCL.all_to_all(output_tensor, input_tensor, group=None, async_op=False, config=None)#
All-to-all collective operation.
Each rank sends a tensor chunk to each other rank and receives a tensor chunk from each other rank. Input/output tensors should have shape (M, N * world_size) where each chunk of N columns corresponds to one rank.
- Parameters:
output_tensor – Output tensor of shape (M, N * world_size)
input_tensor – Input tensor of shape (M, N * world_size)
group – ProcessGroup or None. If None, uses all ranks in shmem context. Default: None.
async_op – If False, performs a barrier at the end. If True, returns immediately. Default: False.
config – Config instance with kernel parameters (default: None). If None, uses default Config values.
Example
>>> ctx = iris.iris()
>>> ctx.ccl.all_to_all(output_tensor, input_tensor)

>>> # Custom configuration
>>> from iris.ccl import Config
>>> config = Config(block_size_m=128, block_size_n=32)
>>> ctx.ccl.all_to_all(output_tensor, input_tensor, config=config)

>>> # Async operation (no barrier)
>>> ctx.ccl.all_to_all(output_tensor, input_tensor, async_op=True)
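To make the chunk-exchange semantics concrete, here is a minimal plain-Python reference model (not the GPU kernel): each rank's input is split into world_size column chunks, and output chunk j on rank i is chunk i taken from rank j's input. Chunks are modeled as labeled strings rather than tensors.

```python
# Reference model of the all_to_all chunk exchange.
world_size = 3

# inputs[rank] is a list of world_size chunks; chunk j is destined for rank j.
inputs = [[f"r{rank}c{j}" for j in range(world_size)] for rank in range(world_size)]

def all_to_all(inputs):
    # outputs[rank][j] receives inputs[j][rank]: rank i collects chunk i
    # from every other rank's input.
    return [[inputs[src][dst] for src in range(world_size)]
            for dst in range(world_size)]

outputs = all_to_all(inputs)
# Rank 1 ends up holding chunk 1 from every rank: r0c1, r1c1, r2c1.
```

In the real operation the same exchange happens in-place on (M, N * world_size) tensors, with each labeled chunk standing in for an (M, N) column block.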
all_gather#
- CCL.all_gather(output_tensor, input_tensor, group=None, async_op=False, config=None)#
All-gather collective operation.
Each rank sends its input tensor to all ranks, and all ranks receive and concatenate all input tensors along dimension 0 (rows), matching torch.distributed.all_gather_into_tensor behavior.
- Parameters:
output_tensor – Output tensor of shape (world_size * M, N) - will contain concatenated inputs
input_tensor – Input tensor of shape (M, N) - local rank’s data to send
group – ProcessGroup or None. If None, uses all ranks in shmem context. Default: None.
async_op – If False, performs a barrier at the end. If True, returns immediately. Default: False.
config – Config instance with kernel parameters (default: None). If None, uses default Config values.
Example
>>> ctx = iris.iris()
>>> # Input: (M, N), Output: (world_size * M, N)
>>> ctx.ccl.all_gather(output_tensor, input_tensor)

>>> # Custom configuration
>>> from iris.ccl import Config
>>> config = Config(block_size_m=128, block_size_n=32)
>>> ctx.ccl.all_gather(output_tensor, input_tensor, config=config)

>>> # Async operation (no barrier)
>>> ctx.ccl.all_gather(output_tensor, input_tensor, async_op=True)
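The row-concatenation behavior can be sketched with a plain-Python reference model, where each rank's (M, N) input is a list of rows and the gathered output stacks rank 0's rows first, then rank 1's, and so on:

```python
# Reference model of all_gather (concatenation along dim 0).
world_size = 3
M, N = 2, 4

# Each rank contributes an (M, N) block; here every entry is the rank id.
inputs = [[[rank] * N for _ in range(M)] for rank in range(world_size)]

def all_gather(inputs):
    # Output shape is (world_size * M, N): rows M*r .. M*r + M - 1 come
    # from rank r. Every rank receives the same output.
    out = []
    for block in inputs:
        out.extend(block)
    return out

output = all_gather(inputs)
```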
all_reduce_preamble#
- CCL.all_reduce_preamble(output_tensor, input_tensor, config=None, workspace=None)#
Prepare reusable workspace for all-reduce.
- Parameters:
output_tensor – Output tensor that will receive the reduced data.
input_tensor – Input tensor providing the local contribution.
config – Optional Config describing variant parameters.
workspace – Optional existing workspace to update/reuse.
- Returns:
Workspace object that can be passed to all_reduce.
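The intended calling pattern is to build the workspace once and reuse it across iterations. A minimal sketch (not runnable standalone: it requires a multi-rank Iris runtime, and the tensor names and iteration count are illustrative):

```python
# Sketch: amortize workspace setup across repeated all-reduce calls.
ctx = iris.iris()
workspace = ctx.ccl.all_reduce_preamble(output_tensor, input_tensor)
for _ in range(num_iters):
    ctx.ccl.all_reduce(output_tensor, input_tensor, workspace=workspace)
```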
all_reduce#
- CCL.all_reduce(output_tensor, input_tensor, op=None, group=None, async_op=False, config=None, workspace=None)#
All-reduce collective operation.
Each rank has a local input tensor, and all ranks compute the sum of all input tensors. The result is written to output_tensor on all ranks.
- Parameters:
output_tensor – Output tensor of shape (M, N) - will contain sum of all inputs
input_tensor – Input tensor of shape (M, N) - local rank’s partial data
op – Reduction operation to apply. Currently only ReduceOp.SUM is supported. Default: ReduceOp.SUM.
group – ProcessGroup or None. If None, uses all ranks in shmem context. Default: None.
async_op – If False, performs a barrier at the end. If True, returns immediately. Default: False.
config – Config instance with kernel parameters (default: None). If None, uses default Config values. Set config.all_reduce_variant to choose the variant: "atomic", "ring", or "two_shot".
workspace – Optional workspace prepared by all_reduce_preamble to reuse internal buffers across invocations.
Example
>>> ctx = iris.iris()
>>> ctx.ccl.all_reduce(output_tensor, input_tensor)

>>> # Custom configuration with ring variant
>>> from iris.ccl import Config
>>> config = Config(all_reduce_variant="ring")
>>> ctx.ccl.all_reduce(output_tensor, input_tensor, config=config)

>>> # Two-shot variant with block distribution
>>> config = Config(all_reduce_variant="two_shot", all_reduce_distribution=1)
>>> ctx.ccl.all_reduce(output_tensor, input_tensor, config=config)

>>> # Async operation (no barrier)
>>> ctx.ccl.all_reduce(output_tensor, input_tensor, async_op=True)
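The result of the operation, independent of the chosen variant, is that every rank holds the element-wise sum of all ranks' inputs. A plain-Python reference model of the ReduceOp.SUM semantics:

```python
# Reference model of all_reduce with ReduceOp.SUM.
world_size = 3

# Each rank's input, modeled as a flat list; rank r holds [r + 1, ...].
inputs = [[rank + 1] * 4 for rank in range(world_size)]

def all_reduce_sum(inputs):
    # Element-wise sum across ranks; every rank receives the same result.
    total = [sum(vals) for vals in zip(*inputs)]
    return [list(total) for _ in inputs]

outputs = all_reduce_sum(inputs)
# Each rank now holds [1 + 2 + 3, ...] = [6, 6, 6, 6].
```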
reduce_scatter#
- CCL.reduce_scatter(output_tensor, input_tensor, op=None, group=None, async_op=False, config=None)#
Reduce-scatter collective operation.
Each rank reduces its assigned tiles from all ranks’ inputs and stores the result only to its own output tensor. This is similar to all-reduce but without broadcasting the result to all ranks.
- Parameters:
output_tensor – Output tensor of shape (M, N) - will contain reduced tiles for this rank
input_tensor – Input tensor of shape (M, N) - local rank’s partial data
op – Reduction operation to apply. Currently only ReduceOp.SUM is supported. Default: ReduceOp.SUM.
group – ProcessGroup or None. If None, uses all ranks in shmem context. Default: None.
async_op – If False, performs a barrier at the end. If True, returns immediately. Default: False.
config – Config instance with kernel parameters (default: None). If None, uses default Config values. Only supports reduce_scatter_variant="two_shot".
Example
>>> ctx = iris.iris()
>>> ctx.ccl.reduce_scatter(output_tensor, input_tensor)

>>> # Custom configuration
>>> from iris.ccl import Config
>>> config = Config(reduce_scatter_variant="two_shot", all_reduce_distribution=1)
>>> ctx.ccl.reduce_scatter(output_tensor, input_tensor, config=config)
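The tile-based semantics can be modeled in plain Python: the tensor is split into world_size tiles, each rank sums its assigned tile across all ranks' inputs, and writes only that tile to its own output. Representing the untouched region as zeros is a modeling choice for this sketch, not a guarantee about the real output tensor's other elements.

```python
# Reference model of the tile-based reduce_scatter with SUM.
world_size = 2
tile = 2  # elements per tile; the full tensor has world_size * tile elements

# Rank r's flat input: [(r + 1) * 10 + i for each element index i].
inputs = [[(rank + 1) * 10 + i for i in range(world_size * tile)]
          for rank in range(world_size)]

def reduce_scatter_sum(inputs):
    outputs = [[0] * (world_size * tile) for _ in range(world_size)]
    for r in range(world_size):
        # Rank r reduces only its own tile (indices r*tile .. (r+1)*tile - 1).
        for i in range(r * tile, (r + 1) * tile):
            outputs[r][i] = sum(inp[i] for inp in inputs)
    return outputs

outputs = reduce_scatter_sum(inputs)
# Element-wise sums are [30, 32, 34, 36]; rank 0 keeps the first tile,
# rank 1 keeps the second.
```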