Source code for gpaw.core.atom_arrays

from __future__ import annotations

import numbers
from typing import Sequence, overload

import numpy as np
from gpaw.core.matrix import Matrix
from gpaw.gpu import cupy as cp, XP
from gpaw.mpi import MPIComm, serial_comm
from gpaw.new import prod, zips
from gpaw.typing import Array1D, ArrayLike1D, Literal
from gpaw.new.c import dH_aii_times_P_ani_gpu


[docs]class AtomArraysLayout(XP): def __init__(self, shapes: Sequence[int | tuple[int, ...]], atomdist: AtomDistribution | MPIComm = serial_comm, dtype=float, xp=None): """Description of layout of atom arrays. Parameters ---------- shapes: Shapse of arrays - one for each atom. atomdist: Distribution of atoms. dtype: Data-type (float or complex). """ self.shape_a = [shape if isinstance(shape, tuple) else (shape,) for shape in shapes] if not isinstance(atomdist, AtomDistribution): atomdist = AtomDistribution(np.zeros(len(shapes), int), atomdist) self.atomdist = atomdist self.dtype = np.dtype(dtype) XP.__init__(self, xp or np) self.size = sum(prod(shape) for shape in self.shape_a) self.myindices = [] self.mysize = 0 I1 = 0 for a in atomdist.indices: I2 = I1 + prod(self.shape_a[a]) self.myindices.append((a, I1, I2)) self.mysize += I2 - I1 I1 = I2 def __repr__(self): return (f'AtomArraysLayout({self.shape_a}, {self.atomdist}, ' f'{self.dtype}, xp={self.xp.__name__})')
[docs] def new(self, atomdist=None, dtype=None, xp=None): """Create new AtomsArrayLayout object with new atomdist.""" return AtomArraysLayout(self.shape_a, atomdist or self.atomdist, dtype or self.dtype, xp or self.xp)
[docs] def empty(self, dims: int | tuple[int, ...] = (), comm: MPIComm = serial_comm) -> AtomArrays: """Create new AtomArrays object. parameters ---------- dims: Extra dimensions. comm: Distribute dimensions along this communicator. """ return AtomArrays(self, dims, comm)
[docs] def zeros(self, dims: int | tuple[int, ...] = (), comm: MPIComm = serial_comm) -> AtomArrays: aa = self.empty(dims, comm) aa.data[:] = 0.0 return aa
[docs] def sizes(self) -> tuple[list[dict[int, int]], Array1D]: """Compute array sizes for all ranks. >>> AtomArraysLayout([3, 4]).sizes() ([{0: 3, 1: 4}], array([7])) """ comm = self.atomdist.comm size_ra: list[dict[int, int]] = [{} for _ in range(comm.size)] size_r = np.zeros(comm.size, int) for a, (rank, shape) in enumerate(zips(self.atomdist.rank_a, self.shape_a)): size = prod(shape) size_ra[rank][a] = size size_r[rank] += size return size_ra, size_r
[docs]class AtomDistribution: def __init__(self, ranks: ArrayLike1D, comm: MPIComm = serial_comm): """Atom-distribution. Parameters ---------- ranks: List of ranks, one rank per atom. comm: MPI-communicator. """ self.comm = comm self.rank_a = np.array(ranks) self.indices = np.where(self.rank_a == comm.rank)[0]
[docs] @classmethod def from_number_of_atoms(cls, natoms: int, comm: MPIComm = serial_comm) -> AtomDistribution: """Distribute atoms evenly. >>> AtomDistribution.from_number_of_atoms(3).rank_a array([0, 0, 0]) """ blocksize = (natoms + comm.size - 1) // comm.size rank_a = np.empty(natoms, int) a1 = 0 for rank in range(comm.size): a2 = a1 + blocksize rank_a[a1:a2] = rank if a2 >= natoms: break a1 = a2 return cls(rank_a, comm)
[docs] @classmethod def from_atom_indices(cls, atom_indices: Sequence[int], comm: MPIComm = serial_comm, *, natoms: int | None = None) -> AtomDistribution: """Create distribution from atom indices. >>> AtomDistribution.from_atom_indices([0, 1, 2]).rank_a array([0, 0, 0]) """ if natoms is None: natoms = comm.max_scalar(max(atom_indices)) + 1 rank_a = np.zeros(natoms, int) # type: ignore rank_a[atom_indices] = comm.rank comm.sum(rank_a) return cls(rank_a, comm)
def __repr__(self): return (f'AtomDistribution(ranks={self.rank_a}, ' f'comm={self.comm.rank}/{self.comm.size})')
[docs] def gather(self): return AtomDistribution(np.zeros(len(self.rank_a), int))
[docs]class AtomArrays: def __init__(self, layout: AtomArraysLayout, dims: int | Sequence[int] = (), comm: MPIComm = serial_comm, data: np.ndarray | None = None): """AtomArrays object. parameters ---------- layout: Layout-description. dims: Extra dimensions. comm: Distribute dimensions along this communicator. data: Data array for storage. """ myshape = (layout.mysize,) domain_comm = layout.atomdist.comm dtype = layout.dtype self.myshape = myshape self.comm = comm self.domain_comm = domain_comm # convert int to tuple: self.dims = tuple(dims) if not isinstance(dims, int) else (dims,) if self.dims: d1, d2 = self.my_slice() mydims0 = d2 - d1 self.mydims = (mydims0,) + self.dims[1:] else: self.mydims = () fullshape = self.mydims + self.myshape if data is not None: if data.shape != fullshape: raise ValueError( f'Bad shape for data: {data.shape} != {fullshape}') if data.dtype != dtype: raise ValueError( f'Bad dtype for data: {data.dtype} != {dtype}') else: data = layout.xp.empty(fullshape, dtype) self.data = data self._matrix: Matrix | None = None # matrix view self.layout = layout self._arrays = {} for a, I1, I2 in layout.myindices: self._arrays[a] = self.data[..., I1:I2].reshape( self.mydims + layout.shape_a[a]) self.natoms: int = len(layout.shape_a)
[docs] def my_slice(self) -> tuple[int, int]: mydims0 = (self.dims[0] + self.comm.size - 1) // self.comm.size d1 = min(self.comm.rank * mydims0, self.dims[0]) d2 = min((self.comm.rank + 1) * mydims0, self.dims[0]) return d1, d2
def __repr__(self): txt = f'AtomArrays({self.layout}, dims={self.dims}' if self.comm.size > 1: txt += f', comm={self.comm.rank}/{self.comm.size}' return txt + ')' @property def matrix(self) -> Matrix: if self._matrix is not None: return self._matrix shape = (self.dims[0], prod(self.dims[1:]) * prod(self.myshape)) myshape = (self.mydims[0], prod(self.mydims[1:]) * prod(self.myshape)) dist = (self.comm, -1, 1) data = self.data.reshape(myshape) self._matrix = Matrix(*shape, data=data, dist=dist) return self._matrix
[docs] def new(self, *, layout=None, data=None, xp=None): """Create new AtomArrays object of same kind. Parameters ---------- layout: Layout-description. data: Array to use for storage. """ if xp is np: assert layout is None assert data is None assert self.layout.xp is cp layout = self.layout.new(xp=np) return AtomArrays(layout or self.layout, self.dims, self.comm, data=data)
[docs] def to_cpu(self): if self.layout.xp is np: return self return self.new(layout=self.layout.new(xp=np), data=cp.asnumpy(self.data))
[docs] def to_xp(self, xp): if self.layout.xp is xp: assert xp is np, 'cp -> cp should not be needed!' return self if xp is np: return self.new(layout=self.layout.new(xp=np), data=cp.asnumpy(self.data)) return self.new(layout=self.layout.new(xp=cp), data=cp.asarray(self.data))
def __getitem__(self, a): if isinstance(a, numbers.Integral): return self._arrays[a] if len(self.dims) == 1: a0, a1 = a assert a0 == slice(None) a_ai = AtomArrays(self.layout, data=self.data[a1].copy()) return a_ai 1 / 0
[docs] def get(self, a): return self._arrays.get(a)
def __setitem__(self, a, value): self._arrays[a][:] = value def __contains__(self, a): return a in self._arrays
[docs] def items(self): return self._arrays.items()
[docs] def keys(self): return self._arrays.keys()
[docs] def values(self): return self._arrays.values()
@overload def gather(self, broadcast: Literal[False] = False, copy: bool = False ) -> AtomArrays | None: ... @overload def gather(self, broadcast: Literal[True], copy: bool = False ) -> AtomArrays: ...
[docs] def gather(self, broadcast=False, copy=False): """Gather all atoms on master.""" comm = self.layout.atomdist.comm if comm.size == 1: if copy: aa = self.new() aa.data[:] = self.data return aa return self if comm.rank == 0 or broadcast: aa = self.new(layout=self.layout.new(atomdist=serial_comm)) else: aa = None if comm.rank == 0: size_ra, size_r = self.layout.sizes() n = prod(self.mydims) m = size_r.max() buffer = self.layout.xp.empty(n * m, self.layout.dtype) for rank in range(1, comm.size): buf = buffer[:n * size_r[rank]].reshape((n, size_r[rank])) comm.receive(buf, rank) b1 = 0 for a, size in size_ra[rank].items(): b2 = b1 + size A = aa[a] A[:] = buf[:, b1:b2].reshape(A.shape) b1 = b2 for a, array in self._arrays.items(): aa[a] = array else: comm.send(self.data, 0) if broadcast: comm.broadcast(aa.data, 0) return aa
[docs] def scatter_from(self, data: np.ndarray | AtomArrays | None = None) -> None: """Scatter atoms.""" if isinstance(data, AtomArrays): data = data.data comm = self.layout.atomdist.comm xp = self.layout.xp if comm.size == 1: self.data[:] = data return if comm.rank != 0: comm.receive(self.data, 0, 42) return size_ra, size_r = self.layout.sizes() aa = self.new(layout=self.layout.new(atomdist=serial_comm), data=data) requests = [] for rank, (totsize, size_a) in enumerate(zips(size_r, size_ra)): if rank != 0: buf = xp.empty(self.mydims + (totsize,), self.layout.dtype) b1 = 0 for a, size in size_a.items(): b2 = b1 + size buf[..., b1:b2] = aa[a].reshape(self.mydims + (size,)) b1 = b2 request = comm.send(buf, rank, 42, False) # Remember to store a reference to the # send buffer (buf) so that is isn't # deallocated requests.append((request, buf)) else: for a in size_a: self[a] = aa[a] for request, _ in requests: comm.wait(request)
[docs] def to_lower_triangle(self): """Convert `N*N` matrices to `N*(N+1)/2` vectors. >>> a = AtomArraysLayout([(3, 3)]).empty() >>> a[0][:] = [[11, 12, 13], ... [12, 22, 23], ... [13, 23, 33]] >>> a.to_lower_triangle()[0] array([11., 12., 22., 13., 23., 33.]) """ shape_a = [] for i1, i2 in self.layout.shape_a: assert i1 == i2 shape_a.append((i1 * (i1 + 1) // 2,)) xp = self.layout.xp layout = AtomArraysLayout(shape_a, self.layout.atomdist, dtype=self.layout.dtype, xp=xp) a_axp = layout.empty(self.dims) for a_xii, a_xp in zips(self.values(), a_axp.values()): i = a_xii.shape[-1] L = xp.tril_indices(i) for a_p, a_ii in zips(a_xp.reshape((-1, i * (i + 1) // 2)), a_xii.reshape((-1, i, i))): a_p[:] = a_ii[L] return a_axp
[docs] def to_full(self): r"""Convert `N(N+1)/2` vectors to `N\times N` matrices. >>> a = AtomArraysLayout([6]).empty() >>> a[0][:] = [1, 2, 3, 4, 5, 6] >>> a.to_full()[0] array([[1., 2., 4.], [2., 3., 5.], [4., 5., 6.]]) """ shape_a = [] for (p,) in self.layout.shape_a: i = int((2 * p + 0.25)**0.5) shape_a.append((i, i)) layout = AtomArraysLayout(shape_a, self.layout.atomdist, self.layout.dtype) a_axii = layout.empty(self.dims) for a_xp, a_xii in zips(self.values(), a_axii.values()): i = a_xii.shape[-1] a_xii[(...,) + np.tril_indices(i)] = a_xp u = (...,) + np.triu_indices(i, 1) a_xii[u] = np.swapaxes(a_xii, -1, -2)[u].conj() return a_axii
[docs] def moved(self, atomdist): if (self.layout.atomdist.rank_a == atomdist.rank_a).all(): return self assert self.comm.size == 1 layout = self.layout.new(atomdist=atomdist) new = layout.empty(self.dims) comm = atomdist.comm requests = [] for a, I1, I2 in self.layout.myindices: r = layout.atomdist.rank_a[a] if r == comm.rank: new[a][:] = self[a] else: requests.append(comm.send(np.ascontiguousarray(self[a]), r, block=False)) for a, I1, I2 in layout.myindices: r = self.layout.atomdist.rank_a[a] if r != comm.rank: target = new[a] buf = np.empty_like(target) comm.receive(buf, r) target[:] = buf comm.waitall(requests) return new
[docs] def redist(self, atomdist: AtomDistribution, comm1: MPIComm, comm2: MPIComm) -> AtomArrays: layout = self.layout.new(atomdist=atomdist) result = layout.empty(dims=self.dims) if comm1.rank == 0: a = self.gather() else: a = None if comm2.rank == 0: result.scatter_from(a) comm2.broadcast(result.data, 0) return result
[docs] def block_diag_multiply(self, block_diag_matrix_axii: AtomArrays, out_ani: AtomArrays, index: int | None = None) -> None: """Multiply by block diagonal matrix. with A, B and C refering to ``self``, ``block_diag_matrix_axii`` and ``out_ani``::: -- a a a > A B -> C -- ni ij nj i If index is not None, ``block_diag_matrix_axii`` must have an extra dimension: :math:`B_{ij}^{ax}` and x=index is used. """ xp = self.layout.xp if xp is np: if index is not None: block_diag_matrix_axii = block_diag_matrix_axii[:, index] for P_ni, dX_ii, out_ni in zips(self.values(), block_diag_matrix_axii.values(), out_ani.values()): out_ni[:] = P_ni @ dX_ii return ni_a = xp.array( [I2 - I1 for a, I1, I2 in self.layout.myindices], dtype=np.int32) data = block_diag_matrix_axii.data if index is not None: data = data[index] if self.data.size > 0: dH_aii_times_P_ani_gpu(data, ni_a, self.data, out_ani.data)