Source code for SDF.data_model.array_data_1d

import logging
import numbers
from typing import Tuple
from xml.etree.ElementTree import Element

import numpy as np

from SDF.data_model._helper_functions import pop_element_attribute, element_is_empty, pop_element_text
from SDF.data_model.abstract import Data

logger = logging.getLogger(__name__)


[docs]class ArrayData1D(Data): """Wraps a 1-dimensional Numpy ndarray, represents SDF <data> tags of dataset type 'sc'""" def __init__(self, data: np.ndarray, try_hex_transformation: bool = False): if not isinstance(data, np.ndarray): raise ValueError(f"Expected a numpy ndarray, got {type(data)}") if data.ndim != 1: raise ValueError(f"Expected 1D array, got {data.ndim} dimensions (shape: {data.shape})") self.__array = data self.__offset = None self.__multiplier = None if try_hex_transformation: self.__try_linear_transformation() @property def data(self) -> np.ndarray: """The wrapped array""" if self._is_linearly_transformed: return self.__array.astype(np.float64) * self.__multiplier + self.__offset else: return self.__array @property def type_for_xml(self) -> str: return "sc" def __try_linear_transformation(self) -> None: try: int_array, multiplier, offset = _find_linear_transformation(self.__array) self.__array = int_array self.__multiplier = multiplier self.__offset = offset except ValueError: pass @property def _is_linearly_transformed(self) -> bool: return self.__offset is not None and self.__multiplier is not None
[docs] def to_xml_element(self) -> Element: element = Element("data") # shape element.attrib["rows"] = str(len(self.data)) element.attrib["cols"] = "1" # data to 1D string iterable if self._is_linearly_transformed: element.attrib["type"] = "hex" element.attrib["multiplier"] = str(self.__multiplier) element.attrib["offset"] = str(self.__offset) text_iter = map(str.upper, (f"{i:x}" for i in self.__array.flatten())) elif np.issubdtype(self.data.dtype, np.integer): element.attrib["type"] = "int" text_iter = map(str, self.data.flatten()) elif np.issubdtype(self.data.dtype, np.floating): element.attrib["type"] = "float" text_iter = map(str, self.data.flatten()) else: raise ValueError(f"Cannot handle data of dtype {self.data.dtype}") # format data to XML text element.text = " ".join(text_iter) return element
[docs] @classmethod def from_xml_element(cls, element: Element) -> "ArrayData1D": if not element.tag == "data": raise ValueError(f"Expected a <data> element, got {element.tag}") # shape cols = int(pop_element_attribute(element, "cols")) rows = int(pop_element_attribute(element, "rows")) if 1 not in (cols, rows): raise ValueError("Can only read single-column values") rows = max(cols, rows) # data xml_dtype = pop_element_attribute(element, "type") text = pop_element_text(element) if xml_dtype == "hex": multiplier = float(pop_element_attribute(element, "multiplier")) offset = float(pop_element_attribute(element, "offset")) data = np.fromiter((int(s, base=16) for s in text.split()), dtype=int) ret = cls.from_linearly_transformed_array(data, multiplier, offset) elif xml_dtype == "int": data = np.fromiter(map(int, text.split()), dtype=int) ret = cls(data) elif xml_dtype == "float": data = np.fromiter(map(float, text.split()), dtype=float) ret = cls(data) else: raise ValueError(f"Expected attribute 'type' to be 'int', 'float' or 'hex', got '{xml_dtype}'") # handle multiplier and offset for non-hex data (quirk in old sdf implementation) if "multiplier" in element.attrib and "offset" in element.attrib: multiplier = float(pop_element_attribute(element, "multiplier")) offset = float(pop_element_attribute(element, "offset")) if np.allclose(ret.data, np.round(ret.data).astype(int)): # check for integers (also string like 123.0) data = np.round(ret.data).astype(int) # convert to integers # transform to non-negative integers with proper offset data_min = np.min(data) data -= data_min offset += data_min*multiplier ret = cls.from_linearly_transformed_array(data, multiplier, offset) else: raise ValueError("Found float array with multiplier and offset") if not element_is_empty(element): raise ValueError("Element is not empty") if len(ret.data) != rows: raise ValueError(f"Expected {rows} values, got {len(ret.data)}") return ret
[docs] @staticmethod def from_linearly_transformed_array(array: np.ndarray, multiplier: float, offset: float) -> "ArrayData1D": """Instantiate from a non-negative integer array with multiplier and offset""" if not isinstance(array, np.ndarray): raise ValueError(f"Expected a np.ndarray, got {type(array)}") if not np.issubdtype(array.dtype, np.integer): raise ValueError(f"Expected integer values, got {array.dtype}") if np.min(array) < 0: raise ValueError("Expected an integer array with only non-negative values") if not isinstance(multiplier, numbers.Real): raise ValueError(f"Expected multiplier to be a number, got {type(multiplier)}") if not isinstance(offset, numbers.Real): raise ValueError(f"Expected offset to be a number, got {type(multiplier)}") data = ArrayData1D(array) data.__multiplier = multiplier data.__offset = offset return data
def __repr__(self): return f"{self.__class__.__name__}({self.data!r})" def __eq__(self, other): if isinstance(other, ArrayData1D): return (self.data.shape == other.data.shape and np.allclose(self.data, other.data, equal_nan=True)) return False
def _find_linear_transformation(x: np.ndarray, rtol: float = 1e-5, max_tries: int = 1000 ) -> Tuple[np.ndarray, float, float]: """ Try to convert 1D array to a linear transformation of positive integers. This requires finding a multiplier that transforms all values in x to integers. The first tested multiplier is the smallest difference between two numbers in x, then this number is divided by increasing integers (2 to max_tries), until a solution is found. :param x: Numeric 1D array :param rtol: Relative tolerance for the approximation :param max_tries: Max. divisions of min. distance in x that is checked as potential multiplier :returns: y, mult, offs, i.e. y * mult + offs is close to x with a tolerance < rtol """ def check_results(int_array: np.ndarray, multiplier: float, offset: float) -> bool: if not np.issubdtype(int_array.dtype, np.integer): raise ValueError(f"Transformation led to dtype {int_array.dtype}") if not np.allclose(x, int_array * multiplier + offset, rtol=rtol): raise ValueError("Transformation did not deliver good results") return True logger.info(f"Trying to find linear transformation for array with shape {x.shape} and dtype {x.dtype}") if x.ndim != 1: raise ValueError(f"Can only handle 1D arrays, got {x.ndim} dimensions") if len(x) == 0: raise ValueError("Array is empty (0 elements)") if not np.isfinite(x).all(): raise ValueError("Array contains non-finite values, cannot convert to integers") if np.issubdtype(x.dtype, np.integer): x_min = np.min(x) arr, m, o = x-x_min, 1, x_min if check_results(arr, m, o): return arr, m, o # remove duplicates (speedup, as duplicates don't help finding a transformation) x_unique = np.unique(x) if len(x_unique) == 1: arr, m, o = np.zeros(len(x), dtype=int), 1, x[0] if check_results(arr, m, o): return arr, m, o # find step dx = np.unique(np.diff(np.sort(x_unique))) dx_min = np.min(dx) for d in range(1, max_tries+1): step = dx_min / d dx_rel = dx / step if np.allclose(dx_rel, dx_rel, rtol=rtol): break else: raise ValueError("Failed to find a linear transformation based on integer values") # check if flipping the sign leads to less required hex digits pos_mult, pos_offs = step, np.min(x) pos_ints = np.round((x-pos_offs) / pos_mult).astype(int) neg_mult, neg_offs = -step, np.max(x) neg_ints = np.round((x-neg_offs) / neg_mult).astype(int) pos_hex_digits = np.ceil(np.log(pos_ints + 1) / np.log(16)).sum() neg_hex_digits = np.ceil(np.log(neg_ints + 1) / np.log(16)).sum() if pos_hex_digits > neg_hex_digits: ints, mult, offs = neg_ints, neg_mult, neg_offs else: ints, mult, offs = pos_ints, pos_mult, pos_offs if not check_results(ints, mult, offs): raise ValueError("Conversion failed") return ints, mult, offs