Source code for SDF.file_io.oif
import logging
import os
from datetime import datetime
from typing import BinaryIO
from PIL import Image
from SDF.config import CONFIG
from SDF.data_model import Workspace, SourceParameters, Instrument, Parameter, ImageDataset, ImageData, ParameterSet, \
AnonymousParameterSet
from SDF.sdf_rc import DEFAULT_DATE_FORMAT
if CONFIG.use_system_oiffile:
import oiffile
else:
from SDF.extern import oiffile
if CONFIG.use_system_tifffile:
from tifffile import TiffFile
else:
from SDF.extern.tifffile import TiffFile
logger = logging.getLogger(__name__)
[docs]def load_from_oib(filename: str) -> Workspace:
return load_from_oif(filename)
[docs]def load_from_oif(filename: str) -> Workspace:
with oiffile.OifFile(filename) as oif:
ws = Workspace(name=os.path.split(filename)[1])
ws.parameters.add(SourceParameters(converter_name="oif2sdf", source_file_type="oif-file", source_file=filename))
# instrument
instrument = Instrument(name=str(oif.mainfile['Acquisition Parameters Common']['Acquisition Device']))
instrument.add_from_structure(oif.mainfile)
ws.instruments.add(instrument)
# date
date_str = str(oif.mainfile['Acquisition Parameters Common']['ImageCaputreDate']).replace("-", ".")
ws.date = datetime.strptime(date_str, DEFAULT_DATE_FORMAT)
# owner
try:
ws.owner = str(oif.mainfile['File Info']['UserName'])
except KeyError:
pass
# handle images
current_channel = 0
current_channel_ws = None
for image_file in sorted(oif.tiffs.files):
# extract relevant part of filename
# (filenames can be like '...CXXXTXXX.tif', '...CXXXZXXX.tif', '...CXXX.tif', '...CXXX[T/Z]XXX-RXXX.tif')
path, f = os.path.split(image_file) # directory, '...CXXXTXXX.tif'
f, _ = os.path.splitext(f) # '...CXXXTXXX', '.tif'
_, f = f.split("C", 1) # '...', 'XXXTXXX'
# skip preview images
if "-R" in f: # C001T001-R001
continue
# extract channel and index information
if f.isdecimal():
channel = int(f)
ind = 1
elif "T" in f: # C001T001, time point index
channel, ind = map(int, f.split("T"))
elif "Z" in f: # C001Z001, focal plane index
channel, ind = map(int, f.split("Z"))
else:
raise ValueError(f"Unable to determine channel number and image index from filename {image_file}")
# if first image of current channel: generate new workspace
if channel != current_channel or current_channel_ws is None:
current_channel = channel
current_channel_ws = Workspace(name=f"channel {channel}")
ws.workspaces.add(current_channel_ws)
with oif.open_file(image_file) as img_fp:
data = ImageData(Image.fromarray(TiffFile(img_fp).asarray()))
image_ds = ImageDataset(name=f"image {ind}", data=data)
with oif.open_file(image_file.replace(".tif", ".pty")) as pty_fp:
for param in parse_pty_file(pty_fp):
image_ds.parameters.add(param)
current_channel_ws.datasets.add(image_ds)
return ws
[docs]def parse_pty_file(pty_file: BinaryIO) -> AnonymousParameterSet:
"""pty files have section starting with '[title]' and entries like 'name=value'."""
parameters = AnonymousParameterSet()
lines = pty_file.read().decode("utf-16").splitlines(keepends=False)
def is_section_header(_line: str) -> bool:
return _line.startswith("[") and _line.endswith("]")
if not is_section_header(lines[0]):
raise ValueError("First line is not a section header")
current_section = ParameterSet(name=lines[0][1:-1])
parameters.add(current_section)
for line in lines[1:]:
if is_section_header(line):
current_section = ParameterSet(name=line[1:-1])
parameters.add(current_section)
else:
current_section.add(Parameter(*line.split("=")))
return parameters