Source code for SDF.force_sdf.mfp_sdf_to_force_sdf

import logging
from typing import Dict, List, Tuple, Optional

import numpy as np

from SDF.data_model import ArrayData1D, AnonymousParameterSet, ArrayDataset1D, Instrument, Parameter, Workspace
from SDF.force_sdf import ForceSDF, REQUIRED_SEGMENT_DATA_CHANNELS

logger = logging.getLogger(__name__)

MFP_CHANNEL_UNIT_MAPPING: Dict[str, Tuple[str, Optional[str]]] = {
    'Raw': ('height', "m"),
    'Defl': ('vDeflection', "m"),
    'Amp': ('Amp', None),
    'Phase': ('Phase', None),
    'Time': ('Time', "s"),
    'LVDT': ('measuredHeight', "m"),
    'ZSnsr': ("measuredHeight", "m")  # Mail Ingo 2020-04-07: ZSnsr is equivalent to LVDT, should be mutually exclusive
}

MFP_OPTIONAL_PARAMS = [
    "ExtendZ",  # software setting
    "RetractZ",  # software setting
    "StartDist",  # software setting
    "ForceDist",  # software setting
    "ForceDistSign",  # order of extend/retract (1: normal)
    "VelocitySynch",  # 1: same velocity for extend/retract
    "Velocity",
    "DwellSetting",
    "NumPtsPerSec",
    "ForceSpotNumber",
    "KappaFactor",
    "VirtDeflSlope",
    "VirtDeflOffset",
    "UsingVirtDefl",
    "UseVirtDeflOffset"
    "IndentMode",
    "AmpInvOLS",
    "Amp2InvOLS",
    "Direction",
]


[docs]def mfp_sdf_to_force_sdf(sdf: Workspace) -> ForceSDF: original_parameters = sdf.instruments["original-parameters"] parameters = extract_parameters(original_parameters["mfp-parameters"]) segment_indices = extract_segment_indices(original_parameters) n_segments = len(segment_indices) - 1 directions = extract_directions(original_parameters) if len(directions) != n_segments: raise ValueError(f"Found {len(directions)} directions but indices for {n_segments} segments") data_dict: Dict[str, Tuple[np.ndarray, Optional[str]]] = dict() for dataset in sdf.workspaces["Data"].datasets: if not isinstance(dataset, ArrayDataset1D): raise TypeError("Given dataset is no ArrayDataset") if dataset.name in MFP_CHANNEL_UNIT_MAPPING.keys(): name, unit = MFP_CHANNEL_UNIT_MAPPING[dataset.name] data_dict[name] = dataset.data, unit workspace = Workspace(f"ForceSDF {sdf.name}", instruments=(parameters,)) for i in range(n_segments): segment_start = segment_indices[i] segment_end = segment_indices[i+1] segment_direction = directions[i] segment_duration = extract_segment_duration(data_dict, original_parameters, segment_start, segment_end) segment_parameters = Instrument("segment-parameters") segment_parameters.add(Parameter("duration", segment_duration, "s" if np.isfinite(segment_duration) else None)) segment_parameters.add(Parameter("direction", segment_direction)) segment_workspace = Workspace(f"segment {i}", instruments=(segment_parameters,)) for dataset in extract_segment_datasets(data_dict, segment_start, segment_end): segment_workspace.datasets.add(dataset) workspace.workspaces.add(segment_workspace) return ForceSDF(workspace)
[docs]def extract_segment_datasets(data_dict: Dict[str, Tuple[np.ndarray, Optional[str]]], segment_start: int, segment_end: int) -> List[ArrayDataset1D]: datasets = [] for channel_name, (data, unit) in data_dict.items(): data = ArrayData1D(data[segment_start:segment_end], try_hex_transformation=True) datasets.append(ArrayDataset1D(name=channel_name, data=data, unit=unit)) return datasets
[docs]def extract_segment_duration(data_dict: Dict[str, Tuple[np.ndarray, Optional[str]]], original_parameters: AnonymousParameterSet, segment_start: int, segment_end: int) -> float: if "Time" in data_dict: # Ingo 2020-04-07: Time array is most reliable time = data_dict["Time"][0] segment_end = segment_end if segment_end < len(time) else -1 # end can be out of range (indices can be rounded) return time[segment_end] - time[segment_start] if "NumPtsPerSec" in original_parameters["mfp-parameters"]: num_pts = segment_end - segment_start num_pts_per_sec = int(original_parameters["mfp-parameters"]["NumPtsPerSec"].value) return num_pts / num_pts_per_sec logger.warning("Found no way to compute segment duration") return np.nan
[docs]def extract_segment_indices(original_parameters: AnonymousParameterSet) -> np.ndarray: indices = np.array([int(float(i)) for i in original_parameters["mfp-parameters"]["Indexes"].value.split(",") if i]) indices[1:] += 1 return indices
[docs]def extract_directions(original_parameters: AnonymousParameterSet) -> List[str]: # Ingo 2020-03-31: assumption: discard first element, -1,0,1 always mean the same direction_mapping = { "-1": "z-retract-force", "0": "constant-height-pause", "1": "z-extend-force", } direction_str = original_parameters["mfp-parameters"]["Direction"].value raw_directions = [direction for direction in direction_str.split(",") if direction] raw_directions = raw_directions[1:] return [direction_mapping[direction] for direction in raw_directions]
[docs]def extract_parameters(mfp_parameters: AnonymousParameterSet) -> Instrument: parameters = Instrument("parameters") sensitivity = float(mfp_parameters["InvOLS"].value) spring_constant = float(mfp_parameters["SpringConstant"].value) if sensitivity == 1e-7 and spring_constant == 1: logger.warning("Spring constant and sensitivity are fallback values") parameters.add(Parameter("sensitivity", sensitivity)) parameters.add(Parameter("spring_constant", spring_constant)) else: parameters.add(Parameter("sensitivity", sensitivity, "m/V")) parameters.add(Parameter("spring_constant", spring_constant, "N/m")) for par_name in MFP_OPTIONAL_PARAMS: if par_name in mfp_parameters: parameters.add(Parameter(f"mfp-{par_name}", mfp_parameters[par_name].value)) return parameters
[docs]def is_mfp_force_curve(sdf: Workspace) -> bool: """ Translate mfp-specific channel names to canonical ForceSDF channel named and check if all required channels are present """ data_workspace = sdf.workspaces["Data"] out_channels = set() for in_channel in data_workspace.datasets.keys(): try: out_channel, out_unit = MFP_CHANNEL_UNIT_MAPPING[in_channel] if out_channel in out_channels: raise ValueError(f"Found multiple channels mapping to {out_channel}") out_channels.add(out_channel) except KeyError: pass return REQUIRED_SEGMENT_DATA_CHANNELS.issubset(out_channels)