Source code for SDF.force_sdf.jpk_sdf_to_force_sdf
import logging
from typing import List, Optional, Tuple

import numpy as np

from SDF.data_model import ArrayData1D, ArrayDataset1D, Instrument, AnonymousParameterSet, Parameter, Workspace
from SDF.force_sdf.force_sdf import ForceSDF, REQUIRED_SEGMENT_DATA_CHANNELS

logger = logging.getLogger(__name__)

JPK_CHANNEL_MAPPING = {
    'height': 'height',
    'vDeflection': 'vDeflection',
    'hDeflection': 'hDeflection',
    'measuredHeight': 'measuredHeight',
    'strainGaugeHeight': 'measuredHeight',
    'headHeight': 'headHeight',
}
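# Note: both 'strainGaugeHeight' and 'measuredHeight' map onto the output channel
# 'measuredHeight'; extract_datasets() below prefers 'measuredHeight' whenever a
# segment provides both.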

def jpk_sdf_to_force_sdf(sdf: Workspace) -> ForceSDF:
    """Convert a JPK force SDF workspace into a ForceSDF object."""
    workspace = Workspace(f"ForceSDF {sdf.name}")
    # global parameters (sensitivity, spring constant) are taken from the first segment
    original_parameters = sdf.workspaces[0].instruments["original-parameters"]
    workspace.instruments.add(Instrument("parameters", extract_global_parameters(original_parameters)))
    for full_segment_ws in sdf.workspaces:
        segment_str, segment_num_str = full_segment_ws.name.split()
        if segment_str != "segment":
            raise ValueError("Workspace in JPK force file is not a segment")
        segment_num = int(segment_num_str)
        datasets = extract_datasets(full_segment_ws)
        segment_parameters = extract_segment_parameters(full_segment_ws.instruments["original-parameters"])
        segment_workspace = Workspace(f"segment {segment_num}", datasets=datasets)
        segment_workspace.instruments.add(Instrument("segment-parameters", segment_parameters))
        workspace.workspaces.add(segment_workspace)
    return ForceSDF(workspace)
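
# Resulting layout (sketch, derived from the code above): the returned ForceSDF wraps
# a workspace whose "parameters" instrument holds sensitivity and spring_constant,
# and whose child workspaces ("segment 0", "segment 1", ...) each carry the scaled
# data channels plus a "segment-parameters" instrument.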

def extract_datasets(segment_workspace: Workspace) -> List[ArrayDataset1D]:
    """Extract, rename and scale the data channels of a single segment workspace."""
    logger.info(f"Extracting datasets of segment '{segment_workspace.name}'")
    datasets = []
    for dataset in segment_workspace.datasets:
        if not isinstance(dataset, ArrayDataset1D):
            raise TypeError(f"Found dataset that is not an ArrayDataset1D: {dataset}")
        if dataset.name not in JPK_CHANNEL_MAPPING:
            continue
        # prefer 'measuredHeight' over its aliases when both channels are present
        if dataset.name == "strainGaugeHeight" and "measuredHeight" in segment_workspace.datasets:
            continue
        if dataset.name == "capacitiveSensorHeight" and "measuredHeight" in segment_workspace.datasets:
            continue
        channel_parameters = segment_workspace.instruments["original-parameters"]["channel"][dataset.name]
        data, unit = scale_dataset(dataset, channel_parameters)
        new_dataset = ArrayDataset1D(JPK_CHANNEL_MAPPING[dataset.name], data=data, unit=unit)
        datasets.append(new_dataset)
        logger.info(f"Added dataset '{new_dataset.name}' with values "
                    f"[{new_dataset.data[0]} ... {new_dataset.data[-1]}], shape: {new_dataset.data.shape}")
    missing_channels = REQUIRED_SEGMENT_DATA_CHANNELS.difference(map(lambda ds: ds.name, datasets))
    if missing_channels:
        raise ValueError(f"Segment '{segment_workspace.name}' does not provide data channel(s) '{missing_channels}'")
    return datasets

def scale_dataset(segment_channel_dataset: ArrayDataset1D, channel_parameters: AnonymousParameterSet
                  ) -> Tuple[ArrayData1D, Optional[str]]:
    """Apply the encoder scaling and the conversion chain of a channel to its raw data."""
    logger.info(f"Scaling dataset '{segment_channel_dataset.name}'")
    logger.debug(f"Initial values: [{segment_channel_dataset.data[0]} ... {segment_channel_dataset.data[-1]}]")
    # determine initial scaling
    if "encoder" in channel_parameters:
        scaling_parameters = channel_parameters["encoder"]["scaling"]
    elif "data" in channel_parameters and "encoder" in channel_parameters["data"]:
        scaling_parameters = channel_parameters["data"]["encoder"]["scaling"]
    else:
        scaling_parameters = None
    if scaling_parameters is not None:
        multiplier = float(scaling_parameters["multiplier"].value)
        offset = float(scaling_parameters["offset"].value)
        unit = scaling_parameters["unit"]["unit"].value
        logger.debug(f"Initial scaling: multiplier={multiplier}, offset={offset}, unit='{unit}'")
    else:
        multiplier, offset, unit = 1, 0, None
        logger.debug("No initial scaling")
    # apply conversions: compose each affine step (m, b) with the accumulated scaling
    conversion_set = channel_parameters["conversion-set"]
    for conversion in determine_conversions(conversion_set):
        conversion_parameters = conversion_set["conversion"][conversion]["scaling"]
        m = float(conversion_parameters["multiplier"].value)
        b = float(conversion_parameters["offset"].value)
        unit = conversion_parameters["unit"]["unit"].value
        logger.debug(f"Conversion '{conversion}': multiplier={m}, offset={b}, unit='{unit}'")
        multiplier *= m
        offset = m * offset + b
    if unit == "V":
        unit = None
    return ArrayData1D(segment_channel_dataset.data * multiplier + offset, try_hex_transformation=True), unit
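
# Worked example (illustrative numbers only, not taken from a real file): if the
# encoder scaling is value = 1e-6 * raw + 0.0 [V] and a single conversion "distance"
# applies value = 5e-8 * x + 0.0 [m], the loop above composes them into
# multiplier = 1e-6 * 5e-8 = 5e-14 and offset = 5e-8 * 0.0 + 0.0 = 0.0, so the raw
# integers are mapped to metres in one step: data * 5e-14 + 0.0.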

def determine_conversions(conversion_set: AnonymousParameterSet) -> List[str]:
    """Determine the chain of conversions leading from the base to the default conversion."""
    logger.info("Determining conversions")
    base_conversion = conversion_set["conversions"]["base"].value
    logger.debug(f"Base conversion is '{base_conversion}'")
    target_conversion = conversion_set["conversions"]["default"].value
    if target_conversion == base_conversion:
        return []
    conversions = [target_conversion]
    logger.debug(f"Target conversion is '{target_conversion}'")
    # walk backwards from the default conversion to the base conversion,
    # then return the chain in application order (base first)
    while True:
        previous_conversion = conversion_set["conversion"][conversions[-1]]["base-calibration-slot"].value
        logger.debug(f"Previous conversion is '{previous_conversion}'")
        if previous_conversion == base_conversion:
            conversion_chain = conversions[::-1]
            logger.debug(f"Full conversion chain is '{conversion_chain}'")
            return conversion_chain
        conversions.append(previous_conversion)
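
# Example (hypothetical conversion-set, for illustration): with base = "volts",
# default = "force", force.base-calibration-slot = "distance" and
# distance.base-calibration-slot = "volts", the walk above collects
# ["force", "distance"] and returns it reversed as ["distance", "force"], i.e. the
# order in which scale_dataset() applies the conversions.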

def extract_segment_parameters(original_parameters: AnonymousParameterSet) -> AnonymousParameterSet:
    """Extract duration, direction and (if available) the xy position of a segment."""
    segment_settings = original_parameters["force-segment-header"]["settings"]["segment-settings"]
    parameters = AnonymousParameterSet()
    parameters.add(Parameter("duration", segment_settings["duration"].value, "s"))
    parameters.add(Parameter("direction", segment_settings["type"].value))
    try:
        position_in_map = original_parameters["force-segment-header"]["environment"]["xy-scanner-position-map"]
        position_index = position_in_map["xy-scanners"]["position-index"]
        tip_start_position = position_in_map["xy-scanner"]["tip-scanner"]["start-position"]
    except KeyError:
        # the position map is optional; fall back to the basic parameters
        return parameters
    parameters.add(Parameter("position_index", position_index.value))
    parameters.add(Parameter("x", tip_start_position["x"].value))
    parameters.add(Parameter("y", tip_start_position["y"].value))
    return parameters

def extract_global_parameters(original_parameters: AnonymousParameterSet) -> AnonymousParameterSet:
    """Extract sensitivity and spring constant from the vDeflection conversion set."""
    logger.info("Extracting global parameters")
    original_conversion_parameters = original_parameters["channel"]["vDeflection"]["conversion-set"]["conversion"]
    parameters = AnonymousParameterSet()
    # distance conversion parameters
    distance_parameters = original_conversion_parameters["distance"]
    if distance_parameters["defined"].value == "false":
        logger.warning("Cannot find sensitivity, conversion to distance is undefined")
        sensitivity, unit = np.nan, None
    else:
        sensitivity = distance_parameters["scaling"]["multiplier"].value
        unit = "m/V"
    parameters.add(Parameter("sensitivity", sensitivity, unit))
    # force conversion parameters
    force_parameters = original_conversion_parameters["force"]
    if force_parameters["defined"].value == "false":
        logger.warning("Cannot find spring_constant, conversion to force is undefined")
        spring_constant, unit = np.nan, None
    else:
        spring_constant = force_parameters["scaling"]["multiplier"].value
        unit = "N/m"
    parameters.add(Parameter("spring_constant", spring_constant, unit))
    return parameters
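
# Usage sketch (illustrative; `read_jpk_force_file` is a hypothetical loader, not part
# of this module; any function that yields an SDF Workspace for a JPK force file would do):
#
#     jpk_workspace = read_jpk_force_file("example.jpk-force")   # hypothetical reader
#     force_sdf = jpk_sdf_to_force_sdf(jpk_workspace)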