import numpy as np
from gpaw.arraydict import ArrayDict
def to_parent_comm(partition):
# XXX assume communicator is strided, i.e. regular.
# This actually imposes implicit limitations on things, but is not
# "likely" to cause trouble with the usual communicators, i.e.
# for gd/kd/bd.
parent = partition.comm.parent
if parent is None:
# This should not ordinarily be necessary, but when running with
# AtomPAW, it is. So let's stay out of trouble.
return partition
members = partition.comm.get_members()
parent_rank_a = members[partition.rank_a]
# XXXX we hope and pray that our communicator is "equivalent" to
# that which includes parent's rank0.
assert min(members) == members[0]
parent_rank_a -= members[0] # yuckkk
return AtomPartition(parent, parent_rank_a,
name='parent-%s' % partition.name)
class AtomicMatrixDistributor:
"""Class to distribute atomic dictionaries like dH_asp and D_asp."""
def __init__(self, atom_partition, broadcast_comm,
work_partition=None):
# Assumptions on communicators are as follows.
#
# atom_partition represents standard domain decomposition, and
# broadcast_comm are the corresponding kpt/band communicators
# together encompassing wfs.world.
#
# Initially, dH_asp are distributed over domains according to the
# physical location of each atom, but duplicated across band
# and k-point communicators.
#
# The idea is to transfer dH_asp so they are distributed equally
# among all ranks on wfs.world, and back, when necessary.
self.broadcast_comm = broadcast_comm
self.grid_partition = atom_partition
self.grid_unique_partition = to_parent_comm(self.grid_partition)
# This represents a full distribution across grid, kpt, and band.
if work_partition is None:
work_partition = self.grid_unique_partition.as_even_partition()
self.work_partition = work_partition
def distribute(self, D_asp):
# Right now the D are duplicated across the band/kpt comms.
# Here we pick out a set of unique D. With duplicates out,
# we can redistribute one-to-one to the larger work_partition.
# assert D_asp.partition == self.grid_partition
Ddist_asp = ArrayDict(self.grid_unique_partition, D_asp.shapes_a,
dtype=D_asp.dtype)
if self.broadcast_comm.rank != 0:
assert len(Ddist_asp) == 0
for a in Ddist_asp:
Ddist_asp[a] = D_asp[a]
Ddist_asp.redistribute(self.work_partition)
return Ddist_asp
def collect(self, dHdist_asp):
# We have an array on work_partition. We want first to
# collect it on grid_unique_partition, the broadcast it
# to grid_partition.
# First receive one-to-one from everywhere.
# assert dHdist_asp.partition == self.work_partition
dHdist_asp = dHdist_asp.copy()
dHdist_asp.redistribute(self.grid_unique_partition)
dH_asp = ArrayDict(self.grid_partition, dHdist_asp.shapes_a,
dtype=dHdist_asp.dtype)
if self.broadcast_comm.rank == 0:
buf = dHdist_asp.toarray()
assert not np.isnan(buf).any()
else:
buf = dH_asp.toarray()
buf[:] = np.nan # Let's be careful for now like --debug mode
self.broadcast_comm.broadcast(buf, 0)
assert not np.isnan(buf).any()
dH_asp.fromarray(buf)
return dH_asp
class EvenPartitioning:
"""Represents an even partitioning of N elements over a communicator.
For example N=17 and comm.size=5 will result in this distribution:
* rank 0 has 3 local elements: 0, 1, 2
* rank 1 has 3 local elements: 3, 4, 5
* rank 2 has 3 local elements: 6, 7, 8
* rank 3 has 4 local elements: 9, 10, 11, 12
* rank 4 has 4 local elements: 13, 14, 15, 16
This class uses only the 'rank' and 'size' communicator attributes."""
def __init__(self, comm, N):
# Conventions:
# n, N: local/global size
# i, I: local/global index
self.comm = comm
self.N = N
self.nlong = -(-N // comm.size) # size of a long slice
self.nshort = N // comm.size # size of a short slice
self.longcount = N % comm.size # number of ranks with a long slice
self.shortcount = comm.size - self.longcount # ranks with short slice
def nlocal(self, rank=None):
"""Get the number of locally stored elements."""
if rank is None:
rank = self.comm.rank
if rank < self.shortcount:
return self.nshort
else:
return self.nlong
def minmax(self, rank=None):
"""Get the minimum and maximum index of elements stored locally."""
if rank is None:
rank = self.comm.rank
I1 = self.nshort * rank
if rank < self.shortcount:
I2 = I1 + self.nshort
else:
I1 += rank - self.shortcount
I2 = I1 + self.nlong
return I1, I2
def slice(self, rank=None):
"""Get the list of indices of locally stored elements."""
I1, I2 = self.minmax(rank=rank)
return np.arange(I1, I2)
def global2local(self, I):
"""Get a tuple (rank, local index) from global index I."""
nIshort = self.nshort * self.shortcount
if I < nIshort:
return I // self.nshort, I % self.nshort
else:
Ioffset = I - nIshort
return (self.shortcount + Ioffset // self.nlong,
Ioffset % self.nlong)
def local2global(self, i, rank=None):
"""Get global index I corresponding to local index i on rank."""
if rank is None:
rank = self.comm.rank
return rank * self.nshort + max(rank - self.shortcount, 0) + i
def as_atom_partition(self, strided=False, name='unnamed-even'):
rank_a = [self.global2local(i)[0] for i in range(self.N)]
if strided:
rank_a = np.arange(self.comm.size).repeat(self.nlong)
rank_a = rank_a.reshape(self.comm.size, -1).T.ravel()
rank_a = rank_a[self.shortcount:].copy()
return AtomPartition(self.comm, rank_a, name=name)
def get_description(self):
lines = []
for a in range(self.comm.size):
elements = ', '.join(map(str, self.slice(a)))
line = 'rank %d has %d local elements: %s' % (a, self.nlocal(a),
elements)
lines.append(line)
return '\n'.join(lines)
# Interface for things that can be redistributed with general_redistribute
class Redistributable:
def get_recvbuffer(self, a):
raise NotImplementedError
def get_sendbuffer(self, a):
raise NotImplementedError
def assign(self, a):
raise NotImplementedError
# Let's keep this as an independent function for now in case we change the
# classes 5 times, like we do
def general_redistribute(comm, src_rank_a, dst_rank_a, redistributable):
# To do: it should be possible to specify duplication to several ranks
# But how is this done best?
requests = []
flags = (src_rank_a != dst_rank_a)
my_incoming_atom_indices = np.argwhere(
np.bitwise_and(flags, dst_rank_a == comm.rank)).ravel()
my_outgoing_atom_indices = np.argwhere(
np.bitwise_and(flags, src_rank_a == comm.rank)).ravel()
for a in my_incoming_atom_indices:
# Get matrix from old domain:
buf = redistributable.get_recvbuffer(a)
requests.append(comm.receive(buf, src_rank_a[a], tag=a, block=False))
# These arrays are not supposed to pointers into a larger,
# contiguous buffer, so we should make a copy - except we
# must wait until we have completed the send/receiving
# into them, so we will do it a few lines down.
redistributable.assign(a, buf)
for a in my_outgoing_atom_indices:
# Send matrix to new domain:
buf = redistributable.get_sendbuffer(a)
requests.append(comm.send(buf, dst_rank_a[a], tag=a, block=False))
comm.waitall(requests)
[docs]class AtomPartition:
"""Represents atoms distributed on a standard grid descriptor."""
def __init__(self, comm, rank_a, name='unnamed'):
self.comm = comm
self.rank_a = np.array(rank_a)
self.my_indices = self.get_indices(comm.rank)
self.natoms = len(rank_a)
self.name = name
def __eq__(self, other: object) -> bool:
if not isinstance(other, AtomPartition):
return NotImplemented
return (self.comm.compare(other.comm) in ['ident', 'congruent']
and np.array_equal(self.rank_a, other.rank_a))
def __ne__(self, other):
return not self == other
def as_serial(self):
return AtomPartition(self.comm, np.zeros(self.natoms, int),
name='%s-serial' % self.name)
def get_indices(self, rank):
return np.where(self.rank_a == rank)[0]
def as_even_partition(self):
even_part = EvenPartitioning(self.comm, len(self.rank_a))
return even_part.as_atom_partition()
def redistribute(self, new_partition, atomdict_ax, get_empty):
# XXX we the two communicators to be equal according to
# some proper criterion like MPI_Comm_compare -> MPI_IDENT.
# But that is not implemented, so we don't.
if self.comm.compare(new_partition.comm) not in ['ident',
'congruent']:
msg = ('Incompatible partitions %s --> %s. '
'Communicators must be at least congruent'
% (self, new_partition))
raise ValueError(msg)
# atomdict_ax may be a dictionary or a list of dictionaries
has_many = not hasattr(atomdict_ax, 'items')
if has_many:
class Redist:
def get_recvbuffer(self, a):
return get_empty(a)
def assign(self, a, b_x):
for u, d_ax in enumerate(atomdict_ax):
assert a not in d_ax
atomdict_ax[u][a] = b_x[u]
def get_sendbuffer(self, a):
return np.array([d_ax.data.pop(a) for d_ax in atomdict_ax])
else:
class Redist:
def get_recvbuffer(self, a):
return get_empty(a)
def assign(self, a, b_x):
assert a not in atomdict_ax
atomdict_ax[a] = b_x
def get_sendbuffer(self, a):
return atomdict_ax.data.pop(a)
try:
general_redistribute(self.comm, self.rank_a,
new_partition.rank_a, Redist())
except ValueError as err:
raise ValueError('redistribute %s --> %s: %s'
% (self, new_partition, err))
if isinstance(atomdict_ax, ArrayDict):
atomdict_ax.partition = new_partition # XXX
atomdict_ax.check_consistency()
def __repr__(self):
indextext = ', '.join(map(str, self.my_indices))
return ('%s %s@rank%d/%d (%d/%d): [%s]'
% (self.__class__.__name__, self.name, self.comm.rank,
self.comm.size, len(self.my_indices), self.natoms,
indextext))
def arraydict(self, shapes, dtype=float):
return ArrayDict(self, shapes, dtype)