Source code for SDF.force_sdf.mfp_sdf_to_force_sdf
import logging
from typing import Dict, List, Tuple, Optional
import numpy as np
from SDF.data_model import ArrayData1D, AnonymousParameterSet, ArrayDataset1D, Instrument, Parameter, Workspace
from SDF.force_sdf import ForceSDF, REQUIRED_SEGMENT_DATA_CHANNELS
# Module-level logger, named after this module through __name__.
logger = logging.getLogger(__name__)
# Translation table: MFP channel name -> (ForceSDF channel name, unit).
# A unit of None means that no unit is attached to the translated channel.
MFP_CHANNEL_UNIT_MAPPING: Dict[str, Tuple[str, Optional[str]]] = {
    "Raw": ("height", "m"),
    "Defl": ("vDeflection", "m"),
    "Amp": ("Amp", None),
    "Phase": ("Phase", None),
    "Time": ("Time", "s"),
    "LVDT": ("measuredHeight", "m"),
    # Mail Ingo 2020-04-07: ZSnsr is equivalent to LVDT, should be mutually exclusive
    "ZSnsr": ("measuredHeight", "m"),
}
# Names of MFP parameters that extract_parameters copies into the "parameters"
# instrument (under the name "mfp-<name>") whenever they are present in the input.
# NOTE: every entry needs its own trailing comma. Adjacent string literals are
# concatenated silently by Python, which merges two names into one bogus entry.
MFP_OPTIONAL_PARAMS = [
    "ExtendZ",  # software setting
    "RetractZ",  # software setting
    "StartDist",  # software setting
    "ForceDist",  # software setting
    "ForceDistSign",  # order of extend/retract (1: normal)
    "VelocitySynch",  # 1: same velocity for extend/retract
    "Velocity",
    "DwellSetting",
    "NumPtsPerSec",
    "ForceSpotNumber",
    "KappaFactor",
    "VirtDeflSlope",
    "VirtDeflOffset",
    "UsingVirtDefl",
    "UseVirtDeflOffset",
    "IndentMode",
    "AmpInvOLS",
    "Amp2InvOLS",
    "Direction",
]
def mfp_sdf_to_force_sdf(sdf: Workspace) -> ForceSDF:
    """
    Convert an MFP workspace into a ForceSDF.

    The "original-parameters" instrument of `sdf` supplies the global parameters, the segment boundaries and the
    segment directions. The datasets of the "Data" sub-workspace are renamed according to MFP_CHANNEL_UNIT_MAPPING
    (datasets with other names are dropped) and cut into one "segment <i>" sub-workspace per segment.

    :param sdf: workspace providing the "original-parameters" instrument and the "Data" sub-workspace
    :return: ForceSDF wrapping a new workspace named "ForceSDF <sdf.name>"
    :raises ValueError: if the number of directions does not match the number of segments
    :raises TypeError: if a dataset of the "Data" workspace is not an ArrayDataset1D
    """
    mfp_source = sdf.instruments["original-parameters"]
    global_parameters = extract_parameters(mfp_source["mfp-parameters"])

    # k + 1 boundary indices delimit k segments
    boundaries = extract_segment_indices(mfp_source)
    segment_count = len(boundaries) - 1
    segment_directions = extract_directions(mfp_source)
    if segment_count != len(segment_directions):
        raise ValueError(f"Found {len(segment_directions)} directions but indices for {segment_count} segments")

    # canonical channel name -> (full-length data, unit)
    channels: Dict[str, Tuple[np.ndarray, Optional[str]]] = {}
    for dataset in sdf.workspaces["Data"].datasets:
        if not isinstance(dataset, ArrayDataset1D):
            raise TypeError("Given dataset is no ArrayDataset")
        if dataset.name not in MFP_CHANNEL_UNIT_MAPPING:
            continue  # channel without a ForceSDF counterpart
        canonical_name, channel_unit = MFP_CHANNEL_UNIT_MAPPING[dataset.name]
        channels[canonical_name] = (dataset.data, channel_unit)

    force_workspace = Workspace(f"ForceSDF {sdf.name}", instruments=(global_parameters,))
    segment_specs = zip(boundaries[:-1], boundaries[1:], segment_directions)
    for number, (start, end, direction) in enumerate(segment_specs):
        duration = extract_segment_duration(channels, mfp_source, start, end)
        # a non-finite duration (NaN) is stored without a unit
        duration_unit = "s" if np.isfinite(duration) else None

        segment_info = Instrument("segment-parameters")
        segment_info.add(Parameter("duration", duration, duration_unit))
        segment_info.add(Parameter("direction", direction))

        segment_ws = Workspace(f"segment {number}", instruments=(segment_info,))
        for segment_dataset in extract_segment_datasets(channels, start, end):
            segment_ws.datasets.add(segment_dataset)
        force_workspace.workspaces.add(segment_ws)

    return ForceSDF(force_workspace)
def extract_segment_datasets(data_dict: Dict[str, Tuple[np.ndarray, Optional[str]]], segment_start: int,
                             segment_end: int) -> List[ArrayDataset1D]:
    """
    Cut every channel down to a single segment.

    :param data_dict: channel name -> (full-length data, unit)
    :param segment_start: index of the first sample of the segment
    :param segment_end: index one past the last sample of the segment (slice semantics)
    :return: one ArrayDataset1D per channel, in the iteration order of `data_dict`
    """
    segment = slice(segment_start, segment_end)
    return [
        ArrayDataset1D(
            name=channel,
            data=ArrayData1D(values[segment], try_hex_transformation=True),
            unit=channel_unit,
        )
        for channel, (values, channel_unit) in data_dict.items()
    ]
def extract_segment_duration(data_dict: Dict[str, Tuple[np.ndarray, Optional[str]]],
                             original_parameters: AnonymousParameterSet, segment_start: int,
                             segment_end: int) -> float:
    """
    Determine the duration of one segment.

    Two sources are tried in order:

    1. the "Time" channel of `data_dict`: difference between the time at the segment end and at the segment start,
    2. the "NumPtsPerSec" entry of the "mfp-parameters": number of points in the segment divided by that rate.

    :param data_dict: channel name -> (full-length data, unit)
    :param original_parameters: parameter set containing the "mfp-parameters"
    :param segment_start: index of the first sample of the segment
    :param segment_end: end index of the segment; may lie one past the end of the "Time" array
    :return: the segment duration, or NaN (after logging a warning) if neither source is available
    """
    if "Time" in data_dict:  # Ingo 2020-04-07: Time array is most reliable
        time = data_dict["Time"][0]
        # end can be out of range (indices can be rounded): use the last sample in that case
        end = segment_end if segment_end < len(time) else -1
        return time[end] - time[segment_start]

    mfp_parameters = original_parameters["mfp-parameters"]
    if "NumPtsPerSec" in mfp_parameters:
        num_pts = segment_end - segment_start
        # Parse as float, like extract_segment_indices does for "Indexes": int() raises
        # ValueError on decimal strings such as "2000.0" and would truncate fractional rates.
        num_pts_per_sec = float(mfp_parameters["NumPtsPerSec"].value)
        return num_pts / num_pts_per_sec

    logger.warning("Found no way to compute segment duration")
    return np.nan
def extract_segment_indices(original_parameters: AnonymousParameterSet) -> np.ndarray:
    """
    Parse the segment boundary indices from the comma-separated "Indexes" entry of the "mfp-parameters".

    Empty fields are skipped and every remaining field is converted via float to int.

    :param original_parameters: parameter set containing the "mfp-parameters"
    :return: array of boundary indices; all entries except the first are incremented by one
    """
    raw_indexes = original_parameters["mfp-parameters"]["Indexes"].value
    parsed = [int(float(field)) for field in raw_indexes.split(",") if field]
    boundaries = np.array(parsed)
    # presumably converts inclusive end indices into exclusive slice ends -- TODO confirm
    boundaries[1:] += 1
    return boundaries
def extract_directions(original_parameters: AnonymousParameterSet) -> List[str]:
    """
    Translate the comma-separated "Direction" entry of the "mfp-parameters" into segment type names.

    :param original_parameters: parameter set containing the "mfp-parameters"
    :return: one segment type name per direction code, not counting the discarded first code
    :raises KeyError: if a direction code other than "-1", "0" or "1" is encountered
    """
    # Ingo 2020-03-31: assumption: discard first element, -1,0,1 always mean the same
    segment_type_by_code = {
        "-1": "z-retract-force",
        "0": "constant-height-pause",
        "1": "z-extend-force",
    }
    raw_value = original_parameters["mfp-parameters"]["Direction"].value
    codes = [code for code in raw_value.split(",") if code]  # skip empty fields
    return [segment_type_by_code[code] for code in codes[1:]]
def extract_parameters(mfp_parameters: AnonymousParameterSet) -> Instrument:
    """
    Build the "parameters" instrument from the MFP parameters.

    "InvOLS" and "SpringConstant" are stored as "sensitivity" and "spring_constant". If both hold their fallback
    values (1e-7 and 1), a warning is logged and the two parameters are stored without units; otherwise they get
    the units "m/V" and "N/m". Every name of MFP_OPTIONAL_PARAMS that is present is copied as "mfp-<name>".

    :param mfp_parameters: the "mfp-parameters" parameter set
    :return: the populated "parameters" instrument
    """
    instrument = Instrument("parameters")
    sensitivity = float(mfp_parameters["InvOLS"].value)
    spring_constant = float(mfp_parameters["SpringConstant"].value)
    calibration = (
        ("sensitivity", sensitivity, "m/V"),
        ("spring_constant", spring_constant, "N/m"),
    )

    is_fallback = sensitivity == 1e-7 and spring_constant == 1
    if is_fallback:
        logger.warning("Spring constant and sensitivity are fallback values")
    for name, value, unit in calibration:
        # fallback values are stored without a unit
        instrument.add(Parameter(name, value) if is_fallback else Parameter(name, value, unit))

    for optional_name in MFP_OPTIONAL_PARAMS:
        if optional_name not in mfp_parameters:
            continue
        instrument.add(Parameter(f"mfp-{optional_name}", mfp_parameters[optional_name].value))
    return instrument
def is_mfp_force_curve(sdf: Workspace) -> bool:
    """
    Translate mfp-specific channel names to canonical ForceSDF channel names and check
    whether all required channels are present.

    :param sdf: workspace with a "Data" sub-workspace holding the channel datasets
    :return: True if the translated channel names cover REQUIRED_SEGMENT_DATA_CHANNELS
    :raises ValueError: if two channels translate to the same canonical channel name
    """
    canonical_channels = set()
    for mfp_channel in sdf.workspaces["Data"].datasets.keys():
        if mfp_channel not in MFP_CHANNEL_UNIT_MAPPING:
            continue  # channels without a canonical counterpart are ignored
        canonical, _unit = MFP_CHANNEL_UNIT_MAPPING[mfp_channel]
        if canonical in canonical_channels:
            raise ValueError(f"Found multiple channels mapping to {canonical}")
        canonical_channels.add(canonical)
    return REQUIRED_SEGMENT_DATA_CHANNELS.issubset(canonical_channels)