__author__ = "Robin De Schepper"
__email__ = "robingilbert.deschepper@unipv.it"
__version__ = "1.1.0"
import mpi4py.MPI as MPI
import sys
import time
import atexit
import warnings
import numpy as np
def sync(comm=None, master=0):
    """
    Create a :class:`.WindowController` that synchronizes read write operations across all
    MPI processes in the communicator.

    :param comm: MPI communicator, defaults to ``MPI.COMM_WORLD``.
    :type comm: :class:`mpi4py.MPI.Comm`
    :param master: Rank of the master of the communicator, will be picked whenever
      something needs to be organized or decided by a single node in the communicator.
    :type master: int
    :return: A controller
    :rtype: :class:`.WindowController`
    """
    return WindowController(comm, master)
class WindowController:
    """
    The ``WindowController`` manages the state of the MPI windows underlying the lock
    functionality. Instances can be created using the :func:`.sync` factory function.

    The controller can create read and write locks during which your MPI processes are
    aware of each other's operations and a write lock will never be granted if other
    read or write operations are ongoing, while read locks may be granted while other read
    operations are ongoing, but not if any write locks are acquired or being requested.
    """

    def __init__(self, comm=None, master=0):
        if comm is None:
            comm = MPI.COMM_WORLD
        self._comm = comm
        self._size = comm.Get_size()
        self._rank = comm.Get_rank()
        self._master = master
        # Per-process counters exposed to the other processes through the windows:
        # the read lock nesting depth and the write lock nesting depth.
        self._read_buffer = np.zeros(1, dtype=np.uint64)
        self._write_buffer = np.zeros(1, dtype=np.uint64)
        self._read_window = self._window(self._read_buffer)
        self._write_window = self._window(self._write_buffer)
        self._closed = False
        # Free the windows at interpreter exit in case the user never closes us;
        # ``close`` is idempotent so an earlier explicit close makes this a no-op.
        atexit.register(self.close)

    @property
    def master(self):
        """
        Return the MPI rank of the master process.
        """
        return self._master

    @property
    def rank(self):
        """
        Return the MPI rank of this process.
        """
        return self._rank

    @property
    def closed(self):
        """
        Is this ``WindowController`` in a closed state? If so, its MPI windows have been
        freed and no further locks should be requested.
        """
        return self._closed

    def close(self):
        """
        Close the ``WindowController`` and free its underlying MPI Windows. Closing an
        already closed controller does nothing.
        """
        if self._closed:
            return
        for window in (self._read_window, self._write_window):
            # Free each window on its own: ``Free`` is collective, so a failure on the
            # first window must not make this process skip freeing the second one.
            # Freeing stays best-effort, as before.
            try:
                window.Free()
            except MPI.Exception:
                pass
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def _window(self, buffer):
        # A single process has nobody to synchronize with: use a no-op stand-in.
        if self._size == 1:
            return _WindowMock(buffer)
        # Displacement unit of 1: remote offsets are expressed in bytes.
        return MPI.Win.Create(buffer, 1, MPI.INFO_NULL, self._comm)

    def _write_lock(self, fence=None, handle=None):
        # Build a write lock bound to this controller's buffers and windows.
        return _WriteLock(
            self._read_buffer,
            self._read_window,
            self._size,
            self._write_buffer,
            self._write_window,
            self._master,
            fence=fence,
            handle=handle,
        )

    def read(self):
        """
        Acquire a read lock. Read locks can be granted while other read locks are held,
        but will not start as long as write locks are held or being requested (write
        operations have priority over read operations).

        The preferred idiom for read locks is as follows:

        .. code-block:: python

            controller = sync()
            with controller.read():
                # Perform reading operation
                pass

        :return: A read lock
        """
        return _ReadLock(
            self._read_buffer, self._write_buffer, self._write_window, self._master
        )

    def write(self):
        """
        Acquire a write lock. Will wait for all active read locks to be released and
        prevent any new read locks from being acquired.

        The preferred idiom for write locks is as follows:

        .. code-block:: python

            controller = sync()
            with controller.write():
                # Perform writing operation
                pass

        Keep in mind that if you run this code on multiple processes at the same time that
        they will write one by one, but they will still all write eventually. If only one
        of the nodes needs to perform the writing operation see
        :meth:`~.WindowController.single_write`

        :return: An unfenced write lock
        """
        return self._write_lock()

    def single_write(self, handle=None, rank=None):
        """
        Perform a collective operation where only 1 node writes to the resource and the
        other processes wait for this operation to complete.

        Python does not support any long jump patterns so the preferred idiom for
        collective write locks is the fencing pattern:

        .. code-block:: python

            controller = sync()
            with controller.single_write() as fence:
                # Kick out any processes that don't have to write
                fence.guard()
                # Perform writing operation on just 1 process
                pass
            # All kicked out processes resume code together outside of the with block.

        :param handle: Optional object that the ``with`` statement yields on the writing
          process. When given, the other processes get ``None`` instead of a fence.
        :param rank: Rank of the process that writes. Defaults to the master rank.
        :type rank: int
        :return: A fenced write lock.
        """
        if rank is None:
            rank = self._master
        is_writer = self._rank == rank
        fence = Fence(rank, is_writer, self._comm)
        if is_writer:
            return self._write_lock(fence=fence, handle=handle)
        elif handle:
            return _NoHandle(self._comm)
        else:
            return fence
class _WindowMock:
    """
    Stand-in for an MPI window, used when the communicator holds a single process.

    It keeps a reference to the buffer and accepts every window call the locks make
    (``Free``, ``Get``, ``Lock``, ``Lock_all``, ``Unlock``, ``Unlock_all``) without
    doing anything.
    """

    def __init__(self, buffer):
        self._buffer = buffer

        def _ignore(*_args, **_kwargs):
            """Accept any arguments and do nothing."""

        method_names = ("Free", "Get", "Lock", "Lock_all", "Unlock", "Unlock_all")
        # Installed per instance, one shared do-nothing callable for every name.
        vars(self).update(dict.fromkeys(method_names, _ignore))
class _ReadLock:
    """
    Context manager implementing a (possibly nested) read lock for one process.

    Only the outermost read lock synchronizes with other processes. Inner read locks,
    and read locks taken while this process holds a write lock, only adjust the local
    read counter.
    """

    def __init__(self, read_buffer, write_buffer, write_window, root):
        self._read_buffer = read_buffer
        self._write_buffer = write_buffer
        self._write_window = write_window
        self._root = root

    def __enter__(self):
        nested = self.locked()
        if nested:
            self._nested_read_lock()
            return None
        self._read_lock()
        return None

    def locked(self):
        # Non-zero counters mean this process is already reading or writing.
        reading = self._read_buffer[0] != 0
        return reading or self._write_buffer[0] != 0

    def _read_lock(self):
        window, root = self._write_window, self._root
        # Taking (and immediately releasing) the lock on the root's write window makes
        # this wait while a writer holds that lock; only then is the read flag raised.
        window.Lock(root)
        self._read_buffer[0] = 1
        window.Unlock(root)

    def _nested_read_lock(self):
        # Already synchronized with the other processes: count one more level.
        self._read_buffer[0] += 1

    def __exit__(self, exc_type, exc_value, traceback):
        # Releasing needs no coordination: drop one level of the read counter.
        self._read_buffer[0] -= 1
class _WriteLock:
    """
    Context manager implementing a (possibly nested) write lock for one process.

    The outermost acquisition locks the write window on rank ``root`` for the whole
    write, the same rank read locks pass through. It then polls the read counters of
    all ``size`` processes until none of them is reading. Nested acquisitions only
    deepen the local write counter.

    ``__enter__`` yields ``handle`` if given, otherwise ``fence`` (which may be None).
    When a ``fence`` is given, leaving this lock enters a barrier on the fence's
    communicator.
    """

    def __init__(
        self,
        read_buffer,
        read_window,
        size,
        write_buffer,
        write_window,
        root,
        fence=None,
        handle=None,
    ):
        self._read_buffer = read_buffer
        self._read_window = read_window
        self._size = size
        self._write_buffer = write_buffer
        self._write_window = write_window
        self._root = root
        self._fence = fence
        self._handle = handle

    def locked(self):
        return self._write_buffer[0] != 0

    def __enter__(self):
        if self.locked():
            return self._nested_write_lock()
        else:
            return self._acquire_lock()

    def _context(self):
        # Object yielded by the ``with`` statement.
        return self._handle if self._handle is not None else self._fence

    def _acquire_lock(self):
        # We unset our read flag as we're waiting for the write lock and won't
        # be reading as we wait. Nested deadlocks otherwise occur.
        reading = self._read_buffer[0]
        self._read_buffer[0] = 0
        # Lock the same rank that read locks use (``root``); locking a hard-coded rank
        # 0 meant readers and writers did not exclude each other when root != 0.
        self._write_window.Lock(self._root)
        try:
            self._wait_for_readers()
        except BaseException:
            # Never leave the root locked behind: every other process would hang.
            self._write_window.Unlock(self._root)
            raise
        finally:
            self._read_buffer[0] = reading
        self._write_buffer[0] = 1
        return self._context()

    def _wait_for_readers(self):
        # Poll every process' read counter until all of them are zero.
        flags = np.zeros(self._size, dtype=np.uint64)
        while True:
            self._read_window.Lock_all()
            try:
                for rank in range(self._size):
                    self._read_window.Get(flags[rank : rank + 1], rank)
            finally:
                # RMA ``Get`` is non-blocking: the fetched values are only guaranteed
                # to be in ``flags`` once the access epoch has been closed.
                self._read_window.Unlock_all()
            if not flags.any():
                return

    def _nested_write_lock(self):
        self._write_buffer[0] += 1
        return self._context()  # pragma: nocover

    def __exit__(self, exc_type, exc_value, traceback):
        self._write_buffer[0] -= 1
        if exc_type is not None:  # pragma: nocover
            warnings.warn(
                "Exception during write lock. Deadlock might occur if you use `.collect`."
            )
        if not self.locked():
            self._write_window.Unlock(self._root)
        if self._fence is not None:
            # The fenced-out processes wait in a barrier when they leave their ``with``
            # block. This lock object owns the fence, so it joins that barrier once
            # on every exit, even when nested inside another write lock.
            self._fence._comm.Barrier()
        sys.stderr.flush()
class Fence:
    """
    Guards a code block so that only permitted MPI processes execute it.

    An object created inside the guarded block can be handed to every process with
    :meth:`.Fence.share` (inside the block) and :meth:`.Fence.collect` (afterwards).
    """

    def __init__(self, master, access, comm):
        """
        Build a fence that protects a ``with`` block (or a ``try`` block catching
        :class:`.FencedSignal`) from the processes that have no access.

        :param master: MPI rank that controls the fence and broadcasts shared objects.
        :type master: int
        :param access: Whether this MPI process may run the fenced off block.
        :type access: bool
        :param comm: MPI communicator used for the barrier and for sharing.
        """
        self._master, self._access, self._comm = master, access, comm
        self._obj = None

    def guard(self):
        """
        Eject every MPI process without access from the fenced off block by raising
        :class:`.FencedSignal`. Only useful inside a ``with`` statement or a ``try``
        statement that catches that signal.
        """
        if self._access:
            return
        raise FencedSignal()

    def share(self, obj):
        """
        Store ``obj`` so it can later be distributed to every MPI process.
        """
        self._obj = obj

    def collect(self):
        """
        Broadcast the shared object from the master rank to all processes.

        :return: Shared object
        :rtype: any
        """
        shared = self._comm.bcast(self._obj, root=self._master)
        return shared

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Every process, fenced out or not, meets here before continuing.
        self._comm.Barrier()
        # Suppress only the signal raised by ``guard``; other errors propagate.
        return True if exc_type is FencedSignal else None
class _NoHandle:
    """
    Context manager for processes that do not write during a ``single_write`` that was
    given a handle. It yields ``None`` and enters a barrier on exit.
    """

    def __init__(self, comm):
        self._comm = comm

    def __enter__(self):
        # Processes that do not write get no handle.
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        # Pairs with the barrier the writing process enters when it leaves its lock.
        self._comm.Barrier()
class FencedSignal(Exception):
    """
    Raised by :meth:`.Fence.guard` on processes that may not enter the fenced off
    block. A ``Fence`` used as a context manager suppresses it when the block exits.
    """
# Public API exported by ``from <module> import *``.
__all__ = [
    "sync",
    "WindowController",
    "Fence",
    "FencedSignal",
]