Source code for qpformat.file_formats.series_base

import abc
import copy
import functools
import io
import pathlib
import warnings

import numpy as np
import qpimage

from .util import hash_obj


[docs] class SeriesData(object): """Series data file format base class """ __meta__ = abc.ABCMeta is_series = True priority = 0 # decrease to get higher priority def __init__(self, path, meta_data=None, holo_kw=None, qpretrieve_kw=None, as_type="float32"): """ Parameters ---------- path: str or pathlib.Path Path to the experimental data file. meta_data: dict Dictionary containing meta data. see :py:class:`qpimage.META_KEYS`. holo_kw: dict Deprecated, please use `qpretrieve_kw` instead! qpretrieve_kw: dict Keyword arguments passed to :ref:`qpretrieve <qpretrieve:index>` for phase retrieval from interferometric data. as_type: str Defines the data type that the input data is casted to. The default is "float32" which saves memory. If high numerical accuracy is required (does not apply for a simple 2D phase analysis), set this to double precision ("float64"). """ if qpretrieve_kw is None: qpretrieve_kw = {} if holo_kw is not None: warnings.warn( "`holo_kw` is deprecated! Please use `qpretrieve_kw` instead", DeprecationWarning) # map deprecated parameters to `qpretrieve_kw` for key in holo_kw: if key == "sideband": if holo_kw[key] in [-1, 1]: qpretrieve_kw["invert_phase"] = holo_kw[key] == -1 else: qpretrieve_kw["sideband_freq"] = tuple(holo_kw[key]) qpretrieve_kw["invert_phase"] = False elif key == "zero_pad": qpretrieve_kw["padding"] = holo_kw["zero_pad"] else: qpretrieve_kw[key] = holo_kw[key] if meta_data is None: meta_data = {} #: Enforced dtype via keyword arguments self.as_type = as_type if isinstance(path, io.IOBase): # io.IOBase self.path = path else: #: pathlib.Path to data file or io.IOBase self.path = pathlib.Path(path).resolve() # check for valid metadata keys for key in meta_data: if key not in qpimage.meta.META_KEYS: msg = "Invalid metadata key `{}`!".format(key) \ + "Valid keys: {}".format(sorted(qpimage.meta.META_KEYS)) raise ValueError(msg) #: Enforced metadata via keyword arguments self.meta_data = copy.copy(meta_data) #: Keyword arguments for interferometric phase retrieval self.qpretrieve_kw = qpretrieve_kw self._bgdata = [] #: Unique string that identifies the background data that #: was set using `set_bg`. self.background_identifier = None #: the file format name self.format = self.__class__.__name__ def __repr__(self): rep = f"<qpformat {self.format} '{self.path}'" \ + f", {len(self)} image" \ + ("s " if len(self) > 1 else " ") \ + f"at {hex(id(self))}>" meta = [] if "wavelength" in self.meta_data: wl = self.meta_data["wavelength"] if 2000e-9 > wl > 10e-9: # convenience for light microscopy meta.append(f"λ={wl * 1e9:.1f}nm") else: meta.append(f"λ={wl:.2e}m") if "pixel size" in self.meta_data: pxm = self.meta_data["pixel size"] if 1e-3 > pxm > 1e-8: # convenience for light microscopy meta.append(f"1px={pxm * 1e6}µm") else: meta.append(f"1px={pxm}m") rep = ", ".join([rep] + meta) return rep @abc.abstractmethod def __len__(self): """Return number of samples of a data set""" def _compute_bgid(self, bg=None): """Return a unique identifier for the background data""" if bg is None: bg = self._bgdata if isinstance(bg, qpimage.QPImage): # Single QPImage if "identifier" in bg: return bg["identifier"] else: data = [bg.amp, bg.pha] for key in sorted(list(bg.meta.keys())): val = bg.meta[key] data.append("{}={}".format(key, val)) return hash_obj(data) elif (isinstance(bg, list) and isinstance(bg[0], qpimage.QPImage)): # List of QPImage data = [] for bgii in bg: data.append(self._compute_bgid(bgii)) return hash_obj(data) elif (isinstance(bg, SeriesData) and (len(bg) == 1 or len(bg) == len(self))): # DataSet return bg.identifier else: raise ValueError("Unknown background data type: {}".format(bg)) @functools.lru_cache(maxsize=32) def _identifier_data(self): data = [] # data if isinstance(self.path, io.IOBase): self.path.seek(0) data.append(self.path.read(50 * 1024)) else: with self.path.open("rb") as fd: data.append(fd.read(50 * 1024)) data.append(self.path.stat().st_size) data += self._identifier_meta() return hash_obj(data) @functools.lru_cache(maxsize=32) def _identifier_meta(self): data = [] # meta data for key in sorted(list(self.meta_data.keys())): value = self.meta_data[key] data.append("{}={}".format(key, value)) # qpretrieve keywords for key in sorted(list(self.qpretrieve_kw.keys())): data.append(f"{key}={self.qpretrieve_kw[key]}") return hash_obj(data) @property def identifier(self): """Return a unique identifier for the given data set""" if self.background_identifier is None: idsum = self._identifier_data() else: idsum = hash_obj([self._identifier_data(), self.background_identifier]) return idsum @property @functools.lru_cache() def shape(self): """Return dataset shape (lenght, image0, image1). This should be overridden by the subclass, because by default the first qpimage is used for that. """ qpi0 = self.get_qpimage_raw(0) return len(self), qpi0.shape[0], qpi0.shape[1]
[docs] def get_identifier(self, idx): """Return an identifier for the data at index `idx` .. versionchanged:: 0.4.2 indexing starts at 1 instead of 0 """ return "{}:{}".format(self.identifier, idx + 1)
[docs] def get_metadata(self, idx): """Return the metadata of data at index `idx` Subclasses should implement this with speed in mind. """ meta_data = copy.deepcopy(self.meta_data) meta_data["identifier"] = self.get_identifier(idx) return meta_data
[docs] def get_name(self, idx): """Return name of data at index `idx` .. versionchanged:: 0.4.2 indexing starts at 1 instead of 0 """ return "{}:{}".format(self.path, idx + 1)
[docs] def get_qpimage(self, idx): """Return background-corrected QPImage of data at index `idx`""" # raw data qpi = self.get_qpimage_raw(idx) if "identifier" not in qpi: msg = "`get_qpimage_raw` does not set 'identifier' " \ + "in class '{}'!".format(self.__class__) raise KeyError(msg) # bg data if self._bgdata: if len(self._bgdata) == 1: # One background for all bgidx = 0 else: bgidx = idx if isinstance(self._bgdata, SeriesData): # `get_qpimage` does take `idx` bg = self._bgdata.get_qpimage_raw(bgidx) else: # `self._bgdata` is a QPImage bg = self._bgdata[bgidx] qpi.set_bg_data(bg_data=bg) return qpi
[docs] @abc.abstractmethod def get_qpimage_raw(self, idx): """Return QPImage without background correction Note that this method must always return a QPImage instance with the "identifier" metadata key set! """
def get_time(self, idx): warnings.warn("`get_time` is deprecated, use " "`get_metadata().get('time', np.nan)` instead!", DeprecationWarning) return self.get_metadata(idx).get('time', np.nan)
[docs] def saveh5(self, h5file, qpi_slice=None, series_slice=None, time_interval=None, count=None, max_count=None): """Save the data set as an HDF5 file (qpimage.QPSeries format) Parameters ---------- h5file: str, pathlib.Path, or h5py.Group Where to store the series data qpi_slice: tuple of (slice, slice) If not None, only store a slice of each QPImage in `h5file`. A value of None is equivalent to ``(slice(0, -1), slice(0, -1))``. series_slice: slice If None, save the entire series, otherwise only save the images specified by this slice. time_interval: tuple of (float, float) If not None, only stores QPImages that were recorded within the given time interval. count, max_count: multiprocessing.Value Can be used to monitor the progress of the algorithm. Initially, the value of `max_count.value` is incremented by the total number of steps. At each step, the value of `count.value` is incremented. Notes ----- The series "identifier" meta data is only set when all of `qpi_slice`, `series_slice`, and `time_interval` are None. """ # set up slice to export if series_slice is None: sl = range(len(self)) else: sl = range(series_slice.start, series_slice.stop) # set up time interval if time_interval is None: ta = -np.inf tb = np.inf else: ta, tb = time_interval # set max_count according to slice if max_count is not None: max_count.value += len(sl) qpskw = {"h5file": h5file, "h5mode": "w", } if (qpi_slice is None and series_slice is None and time_interval is None): # Only add series identifier if series complete. # (We assume that if any of the above kwargs is set, # the series data is somehow modified) qpskw["identifier"] = self.identifier with qpimage.QPSeries(**qpskw) as qps: increment = 0 for ii in sl: ti = self.get_metadata(ii).get("time", np.nan) if ti < ta or ti > tb: # Not part of the series pass else: increment += 1 if increment == 1 or len(self._bgdata) != 1: # initial image or series data where each image # has a unique background image qpi = self.get_qpimage(ii) if qpi_slice is not None: qpi = qpi[qpi_slice] qps.add_qpimage(qpi) else: # hard-link the background data qpiraw = self.get_qpimage_raw(ii) if qpi_slice is not None: qpiraw = qpiraw[qpi_slice] qps.add_qpimage(qpiraw, bg_from_idx=0) if count is not None: count.value += 1
[docs] def set_bg(self, dataset): """Set background data Parameters ---------- dataset: `DataSet`, `qpimage.QPImage`, or int If the ``len(dataset)`` matches ``len(self)``, then background correction is performed element-wise. Otherwise, ``len(dataset)`` must be one and is used for all data of ``self``. See Also -------- get_qpimage: obtain the background corrected QPImage """ if isinstance(dataset, qpimage.QPImage): # Single QPImage self._bgdata = [dataset] elif (isinstance(dataset, list) and len(dataset) == len(self) and isinstance(dataset[0], qpimage.QPImage)): # List of QPImage self._bgdata = dataset elif (isinstance(dataset, SeriesData) and (len(dataset) == 1 or len(dataset) == len(self))): # DataSet self._bgdata = dataset else: raise ValueError("Bad length or type for bg: {}".format(dataset)) self.background_identifier = self._compute_bgid()
[docs] @staticmethod @abc.abstractmethod def verify(path): """Verify that `path` has this file format Returns `True` if the file format matches. The implementation of this method should be fast and memory efficient, because e.g. the "GroupFolder" file format depends on it. """