Source code for SDF.file_io.mfp

import logging
import os
from typing import List

import numpy as np

from SDF.config import CONFIG
from SDF.data_model import Workspace, SourceParameters, Instrument, ArrayDataset1D, Parameter, ParameterSet, \
    ArrayDataset2D, ArrayData2D, ArrayData1D

if CONFIG.use_system_mfpfile:
    import mfpfile
else:
    from SDF.extern import mfpfile


logger = logging.getLogger(__name__)


[docs]def load_from_mfp(filename: str) -> Workspace: mfp = mfpfile.MFPFile(filename) workspace = Workspace(name=f"ibw file {os.path.split(filename)[1]}") source_par = SourceParameters(converter_name="mfp2sdf", source_file=filename, source_file_type="mfp-file") workspace.parameters.add(source_par) workspace.parameters.add(Parameter("version-in-ibw-file", mfp.ibw["version"])) original_parameters, labels = parse_parameters(mfp) workspace.instruments.add(original_parameters) data_workspace = extract_data(mfp, labels) workspace.workspaces.add(data_workspace) return workspace
[docs]def parse_parameters(mfp: mfpfile.MFPFile): wave = mfp.ibw["wave"] data_array = wave["wData"] expected_num_of_labels = 0 if len(data_array.shape) == 1 else data_array.shape[-1] original_parameters = Instrument("original-parameters") # labels labels = parse_labels(mfp) if len(labels) != expected_num_of_labels: raise ValueError(f"Expected {expected_num_of_labels} labels, found {len(labels)}") label_par = ParameterSet("labels") for i, label in enumerate(labels): label_par.add(Parameter(f"label{i}", label)) original_parameters.add(label_par) # other parameters original_parameters.add_from_structure(wave["wave_header"]) original_parameters.add_from_structure({"mfp-parameters": mfp.parameters}) original_parameters.add_from_structure({"bin_header": wave["bin_header"]}) for par_name in ["formula", "sIndices", "data_units", "dimension_units"]: original_parameters.add(Parameter(par_name, wave[par_name])) return original_parameters, labels
[docs]def parse_labels(mfp: mfpfile.MFPFile) -> List[str]: labels = [] for label_list in mfp.ibw["wave"]["labels"]: if len(label_list) == 0: continue for value in label_list: if not value: continue labels.append(value.decode("ascii")) return labels
[docs]def extract_data(mfp: mfpfile.MFPFile, labels: List[str]) -> Workspace: data_array: np.ndarray = mfp.ibw["wave"]["wData"] ndim = data_array.ndim data_workspace = Workspace(name="Data") if ndim == 0: raise ValueError("data array is empty") elif ndim == 1: # unlabeled single-column array data_workspace.datasets.add(ArrayDataset1D(name="data", data=ArrayData1D(data_array))) elif ndim == 2: # multiple single-column arrays for i in range(data_array.shape[1]): data_workspace.datasets.add(ArrayDataset1D(labels[i], data=ArrayData1D(data_array[:, i]))) elif ndim == 3: # multiple single-column arrays or heightmaps for i in range(data_array.shape[2]): data_workspace.datasets.add(ArrayDataset2D(labels[i], data=ArrayData2D(data_array[:, :, i]))) else: raise ValueError(f"Cannot handle {ndim} dimensional array") return data_workspace