Source code for SDF.force_sdf.jpk_sdf_to_force_sdf

import logging
from typing import List, Optional, Tuple

import numpy as np

from SDF.data_model import ArrayData1D, ArrayDataset1D, Instrument, AnonymousParameterSet, Parameter, Workspace
from SDF.force_sdf.force_sdf import ForceSDF, REQUIRED_SEGMENT_DATA_CHANNELS

logger = logging.getLogger(__name__)

# Translation table from JPK channel names to the dataset names used in the
# converted output. Most channels keep their name; 'strainGaugeHeight' is
# stored under 'measuredHeight'.
JPK_CHANNEL_MAPPING = {
    # channels that keep their JPK name
    "height": "height",
    "vDeflection": "vDeflection",
    "hDeflection": "hDeflection",
    "measuredHeight": "measuredHeight",
    # alias: exported under the name 'measuredHeight'
    "strainGaugeHeight": "measuredHeight",
    "headHeight": "headHeight",
}


def _parse_segment_number(workspace_name: str) -> int:
    """Return the integer ``n`` from a workspace name of the form ``"segment <n>"``.

    :raises ValueError: if the name does not consist of the word ``segment``
        followed by exactly one integer token.
    """
    name_parts = workspace_name.split()
    # Check the token count explicitly: unpacking into two names would fail with an
    # unrelated "values to unpack" message before the segment check is reached.
    if len(name_parts) != 2 or name_parts[0] != "segment":
        raise ValueError(f"Workspace in jpk force file is not a segment: '{workspace_name}'")
    try:
        return int(name_parts[1])
    except ValueError as err:
        raise ValueError(
            f"Workspace in jpk force file has an invalid segment number: '{workspace_name}'"
        ) from err


def jpk_sdf_to_force_sdf(sdf: Workspace) -> ForceSDF:
    """Build a :class:`ForceSDF` from a workspace holding the segments of a JPK force file.

    The global ``parameters`` instrument is derived from the ``original-parameters``
    instrument of the first sub-workspace. Every sub-workspace of ``sdf`` must be named
    ``"segment <n>"``; for each one a new workspace ``"segment <n>"`` is created that
    holds the extracted datasets and a ``segment-parameters`` instrument.

    :param sdf: workspace whose sub-workspaces are the segments of a JPK force file
    :return: ``ForceSDF`` wrapping the newly built workspace ``"ForceSDF <sdf.name>"``
    :raises ValueError: if a sub-workspace name is not of the form ``"segment <n>"``
    """
    workspace = Workspace(f"ForceSDF {sdf.name}")

    # global parameters are taken from the first segment only
    original_parameters = sdf.workspaces[0].instruments["original-parameters"]
    workspace.instruments.add(Instrument("parameters", extract_global_parameters(original_parameters)))

    for full_segment_ws in sdf.workspaces:
        segment_num = _parse_segment_number(full_segment_ws.name)

        datasets = extract_datasets(full_segment_ws)
        segment_parameters = extract_segment_parameters(full_segment_ws.instruments["original-parameters"])

        segment_workspace = Workspace(f"segment {segment_num}", datasets=datasets)
        segment_workspace.instruments.add(Instrument("segment-parameters", segment_parameters))
        workspace.workspaces.add(segment_workspace)

    return ForceSDF(workspace)
def extract_datasets(segment_workspace: Workspace) -> List[ArrayDataset1D]:
    """Collect the scaled data channels of one segment workspace.

    Only datasets whose name is a key of ``JPK_CHANNEL_MAPPING`` are used; each one is
    scaled with :func:`scale_dataset` and stored under its mapped name. A dataset whose
    mapped name differs from its own name (an alias such as ``strainGaugeHeight``) is
    skipped when the workspace already contains a dataset with the mapped name.

    :param segment_workspace: workspace of a single segment, with an
        ``original-parameters`` instrument providing the per-channel parameters
    :return: list of the new, scaled datasets
    :raises TypeError: if the workspace contains a dataset that is not an ``ArrayDataset1D``
    :raises ValueError: if a channel of ``REQUIRED_SEGMENT_DATA_CHANNELS`` is missing
    """
    logger.info(f"Extracting datasets of segment '{segment_workspace.name}'")
    datasets = []
    for dataset in segment_workspace.datasets:
        if not isinstance(dataset, ArrayDataset1D):
            raise TypeError(f"Found dataset that is no ArrayDataset: {dataset}")

        target_name = JPK_CHANNEL_MAPPING.get(dataset.name)
        if target_name is None:
            # channel is not part of the mapping and therefore not exported
            continue
        if target_name != dataset.name and target_name in segment_workspace.datasets:
            # alias channel: the workspace provides the target channel itself, prefer that one
            continue

        channel_parameters = segment_workspace.instruments["original-parameters"]["channel"][dataset.name]
        data, unit = scale_dataset(dataset, channel_parameters)
        new_dataset = ArrayDataset1D(target_name, data=data, unit=unit)
        datasets.append(new_dataset)
        logger.info(f"Added dataset '{new_dataset.name}' with values "
                    f"[{new_dataset.data[0]} ... {new_dataset.data[-1]}], shape: {new_dataset.data.shape}")

    missing_channels = REQUIRED_SEGMENT_DATA_CHANNELS.difference(ds.name for ds in datasets)
    if missing_channels:
        raise ValueError(f"Segment '{segment_workspace.name}' does not provide data channel(s) '{missing_channels}'")
    return datasets
def _read_linear_scaling(scaling_parameters):
    """Return ``(multiplier, offset, unit)`` read from one scaling parameter set."""
    return (
        float(scaling_parameters["multiplier"].value),
        float(scaling_parameters["offset"].value),
        scaling_parameters["unit"]["unit"].value,
    )


def scale_dataset(segment_channel_dataset: ArrayDataset1D, channel_parameters: AnonymousParameterSet
                  ) -> Tuple[ArrayData1D, Optional[str]]:
    """Scale the raw data of a channel with its encoder scaling and conversion chain.

    The encoder scaling is looked up in ``channel_parameters["encoder"]`` or, failing
    that, in ``channel_parameters["data"]["encoder"]``; without either, the data start
    unscaled. Afterwards every conversion returned by :func:`determine_conversions`
    is folded into one overall ``multiplier`` and ``offset``.

    :param segment_channel_dataset: dataset holding the raw channel values
    :param channel_parameters: parameters of that channel, including ``conversion-set``
    :return: tuple of the scaled data and the unit of the last applied scaling;
        the unit is ``None`` if no scaling was applied or if it is ``"V"``
    """
    logger.info(f"Scaling dataset '{segment_channel_dataset.name}'")
    raw_values = segment_channel_dataset.data
    logger.debug(f"Initial values: [{raw_values[0]} ... {raw_values[-1]}]")

    # find the parameter set that owns the encoder, if there is one
    encoder_owner = None
    if "encoder" in channel_parameters:
        encoder_owner = channel_parameters
    elif "data" in channel_parameters and "encoder" in channel_parameters["data"]:
        encoder_owner = channel_parameters["data"]

    if encoder_owner is None:
        slope, intercept, unit = 1, 0, None
        logger.debug("No initial scaling")
    else:
        slope, intercept, unit = _read_linear_scaling(encoder_owner["encoder"]["scaling"])
        logger.debug(f"Initial scaling: multiplier={slope}, offset={intercept}, unit='{unit}'")

    # compose the conversions: m * (slope * x + intercept) + b
    conversion_set = channel_parameters["conversion-set"]
    for step in determine_conversions(conversion_set):
        step_slope, step_intercept, unit = _read_linear_scaling(conversion_set["conversion"][step]["scaling"])
        logger.debug(f"Conversion '{step}': multiplier={step_slope}, offset={step_intercept}, unit='{unit}'")
        slope *= step_slope
        intercept = step_slope * intercept + step_intercept

    if unit == "V":
        unit = None

    scaled = ArrayData1D(raw_values * slope + intercept, try_hex_transformation=True)
    return scaled, unit
def determine_conversions(conversion_set: AnonymousParameterSet) -> List[str]:
    """Return the chain of conversions leading from the base to the default conversion.

    Starting at the default conversion, the ``base-calibration-slot`` entries are
    followed backwards until the base conversion is reached. The result lists the
    conversions in the order they have to be applied; the base conversion itself is
    not part of it.

    :param conversion_set: the ``conversion-set`` parameters of a channel
    :return: names of the conversions to apply, empty if the default conversion is
        the base conversion
    :raises ValueError: if following ``base-calibration-slot`` revisits a conversion
        without reaching the base conversion
    """
    logger.info("Determining conversions")
    base_conversion = conversion_set["conversions"]["base"].value
    logger.debug(f"Base conversion is '{base_conversion}'")
    target_conversion = conversion_set["conversions"]["default"].value
    if target_conversion == base_conversion:
        return []

    conversions = [target_conversion]
    logger.debug(f"Target conversion is '{target_conversion}'")
    while True:
        previous_conversion = conversion_set["conversion"][conversions[-1]]["base-calibration-slot"].value
        logger.debug(f"Previous conversion is {previous_conversion}")
        if previous_conversion == base_conversion:
            conversion_chain = conversions[::-1]
            logger.debug(f"Full conversion chain is '{conversion_chain}'")
            return conversion_chain
        if previous_conversion in conversions:
            # a cyclic chain can never reach the base conversion; without this
            # check the loop would run forever while the list keeps growing
            raise ValueError(
                f"Conversion chain of '{target_conversion}' revisits '{previous_conversion}' "
                f"without reaching base conversion '{base_conversion}'"
            )
        conversions.append(previous_conversion)
def extract_segment_parameters(original_parameters: AnonymousParameterSet) -> AnonymousParameterSet:
    """Assemble the parameters describing a single segment.

    ``duration`` (unit ``s``) and ``direction`` are always taken from the segment
    settings. ``position_index``, ``x`` and ``y`` are added only if the
    ``xy-scanner-position-map`` entries are present in the segment header.

    :param original_parameters: the ``original-parameters`` of a segment
    :return: new parameter set with the segment parameters
    """
    header = original_parameters["force-segment-header"]
    settings = header["settings"]["segment-settings"]

    result = AnonymousParameterSet()
    result.add(Parameter("duration", settings["duration"].value, "s"))
    result.add(Parameter("direction", settings["type"].value))

    try:
        position_map = header["environment"]["xy-scanner-position-map"]
        index_parameter = position_map["xy-scanners"]["position-index"]
        start_position = position_map["xy-scanner"]["tip-scanner"]["start-position"]
    except KeyError:
        # no position information available: return only duration and direction
        pass
    else:
        result.add(Parameter("position_index", index_parameter.value))
        result.add(Parameter("x", start_position["x"].value))
        result.add(Parameter("y", start_position["y"].value))
    return result
def extract_global_parameters(original_parameters: AnonymousParameterSet) -> AnonymousParameterSet:
    """Extract ``sensitivity`` and ``spring_constant`` from the vDeflection conversions.

    ``sensitivity`` (unit ``m/V``) is the multiplier of the ``distance`` conversion and
    ``spring_constant`` (unit ``N/m``) the multiplier of the ``force`` conversion. If a
    conversion's ``defined`` value is the string ``"false"``, a warning is logged and
    the parameter is stored as NaN without unit.

    :param original_parameters: the ``original-parameters`` of a segment
    :return: new parameter set holding both parameters
    """
    logger.info("Extracting global parameters")
    conversions = original_parameters["channel"]["vDeflection"]["conversion-set"]["conversion"]
    result = AnonymousParameterSet()

    # (parameter name, conversion slot, unit of the multiplier)
    specifications = (
        ("sensitivity", "distance", "m/V"),
        ("spring_constant", "force", "N/m"),
    )
    for name, slot, unit in specifications:
        slot_parameters = conversions[slot]
        if slot_parameters["defined"].value == "false":
            logger.warning(f"Cannot find {name}, conversion to {slot} is undefined")
            result.add(Parameter(name, np.nan, None))
        else:
            result.add(Parameter(name, slot_parameters["scaling"]["multiplier"].value, unit))
    return result