Source code for SDF.file_io.oif

import logging
import os
from datetime import datetime
from typing import BinaryIO

from PIL import Image

from SDF.config import CONFIG
from SDF.data_model import Workspace, SourceParameters, Instrument, Parameter, ImageDataset, ImageData, ParameterSet, \
    AnonymousParameterSet
from SDF.sdf_rc import DEFAULT_DATE_FORMAT

if CONFIG.use_system_oiffile:
    import oiffile
else:
    from SDF.extern import oiffile

if CONFIG.use_system_tifffile:
    from tifffile import TiffFile
else:
    from SDF.extern.tifffile import TiffFile

logger = logging.getLogger(__name__)


[docs]def load_from_oib(filename: str) -> Workspace: return load_from_oif(filename)
[docs]def load_from_oif(filename: str) -> Workspace: with oiffile.OifFile(filename) as oif: ws = Workspace(name=os.path.split(filename)[1]) ws.parameters.add(SourceParameters(converter_name="oif2sdf", source_file_type="oif-file", source_file=filename)) # instrument instrument = Instrument(name=str(oif.mainfile['Acquisition Parameters Common']['Acquisition Device'])) instrument.add_from_structure(oif.mainfile) ws.instruments.add(instrument) # date date_str = str(oif.mainfile['Acquisition Parameters Common']['ImageCaputreDate']).replace("-", ".") ws.date = datetime.strptime(date_str, DEFAULT_DATE_FORMAT) # owner try: ws.owner = str(oif.mainfile['File Info']['UserName']) except KeyError: pass # handle images current_channel = 0 current_channel_ws = None for image_file in sorted(oif.tiffs.files): # extract relevant part of filename # (filenames can be like '...CXXXTXXX.tif', '...CXXXZXXX.tif', '...CXXX.tif', '...CXXX[T/Z]XXX-RXXX.tif') path, f = os.path.split(image_file) # directory, '...CXXXTXXX.tif' f, _ = os.path.splitext(f) # '...CXXXTXXX', '.tif' _, f = f.split("C", 1) # '...', 'XXXTXXX' # skip preview images if "-R" in f: # C001T001-R001 continue # extract channel and index information if f.isdecimal(): channel = int(f) ind = 1 elif "T" in f: # C001T001, time point index channel, ind = map(int, f.split("T")) elif "Z" in f: # C001Z001, focal plane index channel, ind = map(int, f.split("Z")) else: raise ValueError(f"Unable to determine channel number and image index from filename {image_file}") # if first image of current channel: generate new workspace if channel != current_channel or current_channel_ws is None: current_channel = channel current_channel_ws = Workspace(name=f"channel {channel}") ws.workspaces.add(current_channel_ws) with oif.open_file(image_file) as img_fp: data = ImageData(Image.fromarray(TiffFile(img_fp).asarray())) image_ds = ImageDataset(name=f"image {ind}", data=data) with oif.open_file(image_file.replace(".tif", ".pty")) as pty_fp: for param in parse_pty_file(pty_fp): image_ds.parameters.add(param) current_channel_ws.datasets.add(image_ds) return ws
[docs]def parse_pty_file(pty_file: BinaryIO) -> AnonymousParameterSet: """pty files have section starting with '[title]' and entries like 'name=value'.""" parameters = AnonymousParameterSet() lines = pty_file.read().decode("utf-16").splitlines(keepends=False) def is_section_header(_line: str) -> bool: return _line.startswith("[") and _line.endswith("]") if not is_section_header(lines[0]): raise ValueError("First line is not a section header") current_section = ParameterSet(name=lines[0][1:-1]) parameters.add(current_section) for line in lines[1:]: if is_section_header(line): current_section = ParameterSet(name=line[1:-1]) parameters.add(current_section) else: current_section.add(Parameter(*line.split("="))) return parameters