Source code for aicssegmentation.workflow.batch_workflow

import numpy as np

from datetime import datetime
from typing import List, Union
from aicsimageio import AICSImage
from aicsimageio.writers import OmeTiffWriter
from pathlib import Path
from aicssegmentation.util.filesystem import FileSystemUtilities
from aicssegmentation.exceptions import ArgumentNullError
from .workflow import Workflow
from .workflow_definition import WorkflowDefinition

SUPPORTED_FILE_EXTENSIONS = ["tiff", "tif", "czi"]


class BatchWorkflow:
    """
    Represents a batch of workflows to process.

    This class provides the functionality to run batches of workflows using multiple image inputs
    from an input directory according to the steps defined in its WorkflowDefinition.
    """

    def __init__(
        self,
        workflow_definition: WorkflowDefinition,
        input_dir: Union[str, Path],
        output_dir: Union[str, Path],
        channel_index: int = 0,
    ):
        if workflow_definition is None:
            raise ArgumentNullError("workflow_definition")
        if input_dir is None:
            raise ArgumentNullError("input_dir")
        if output_dir is None:
            raise ArgumentNullError("output_dir")

        self._workflow_definition = workflow_definition
        self._input_dir = Path(input_dir)

        if not self._input_dir.exists():
            raise ValueError("The input directory does not exist")

        self._output_dir = Path(output_dir)
        self._channel_index = channel_index
        self._processed_files: int = 0
        self._failed_files: int = 0
        self._log_path: Path = self._output_dir / f"log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"

        # Create the output directory at output_dir if it does not exist already
        if not self._output_dir.exists():
            FileSystemUtilities.create_directory(self._output_dir)

        self._input_files = self._get_input_files(self._input_dir, SUPPORTED_FILE_EXTENSIONS)
        self._execute_generator = self._execute_generator_func()

    @property
    def total_files(self) -> int:
        return len(self._input_files)

    @property
    def processed_files(self) -> int:
        return self._processed_files

    @property
    def failed_files(self) -> int:
        return self._failed_files

    @property
    def input_dir(self) -> Path:
        return self._input_dir

    @property
    def output_dir(self) -> Path:
        return self._output_dir
    def is_done(self) -> bool:
        """
        Indicates whether all files / steps have been executed.

        Use this to know when the batch workflow is complete if manually executing
        the workflow with execute_next().

        Returns:
            (bool): True if all files/steps have been executed, False if not
        """
        return self._processed_files == self.total_files
    def execute_all(self):
        """
        Execute the workflow on every input file in the batch, then write the summary log file.
        """
        if self.is_done():
            print("No files left to process")
            return

        print("Starting batch workflow...")
        print(f"Found {self.total_files} files to process.")

        while not self.is_done():
            self.execute_next()

        self.write_log_file_summary()
        print(f"Batch workflow complete. Check {self._log_path} for output log and summary.")
    def execute_next(self):
        """
        Execute the next workflow step for the current input file, advancing to the
        next file once the current file's workflow is finished.
        """
        if self.is_done():
            print("No files left to process")
            return

        next(self._execute_generator)
    def _execute_generator_func(self):
        for f in self._input_files:
            try:
                print(f"Start file {f.name}")

                # read and format image in the way we expect
                read_image = AICSImage(f)
                image_from_path = self._format_image_to_3d(read_image)

                # Run workflow on image
                workflow = Workflow(self._workflow_definition, image_from_path)
                while not workflow.is_done():
                    workflow.execute_next()
                    yield

                # Save output
                output_path = self._output_dir / f"{f.stem}.segmentation.tiff"
                result = workflow.get_most_recent_result()
                OmeTiffWriter.save(data=self._format_output(result), uri=output_path, dim_order="ZYX")

                msg = f"SUCCESS: {f}. Output saved at {output_path}"
                print(msg)
                self._write_to_log_file(msg)
            except Exception as ex:
                self._failed_files += 1
                msg = f"FAILED: {f}, ERROR: {ex}"
                print(msg)
                self._write_to_log_file(msg)
            finally:
                self._processed_files += 1
                yield
    def write_log_file_summary(self):
        """
        Write a summary report to the log file in the output directory.
        """
        if self._processed_files == 0:
            report = (
                f"There were no files to process in the input directory \n "
                f"Using the Workflow: {self._workflow_definition.name}"
            )
        else:
            files_processed = self._processed_files - self._failed_files
            report = (
                f"{files_processed}/{self._processed_files} files were successfully processed \n "
                f"Using the Workflow: {self._workflow_definition.name}"
            )
        self._write_to_log_file(report)
    def _format_image_to_3d(self, image: AICSImage) -> np.ndarray:
        """
        Format images in the way that aics-segmentation expects for most workflows (3D, ZYX).

        Params:
            image (AICSImage): image to format

        Returns:
            np.ndarray: segmentable image for aics-segmentation
        """
        if len(image.scenes) > 1:
            raise ValueError("Multi-Scene images are unsupported")

        if image.dims.T > 1:
            raise ValueError("Timelapse images are unsupported.")

        if image.dims.C > 1:
            return image.get_image_data("ZYX", C=self._channel_index)

        return image.get_image_data("ZYX")

    def _format_output(self, image: np.ndarray):
        """
        Format segmented images as uint8 for saving via OmeTiffWriter.

        Params:
            image (np.ndarray): segmented image

        Returns:
            np.ndarray: image converted to uint8 for saving
        """
        image = image.astype(np.uint8)
        image[image > 0] = 255
        return image

    def _write_to_log_file(self, text: str):
        with open(self._log_path, "a") as writer:
            writer.write(f"{text}\n")

    def _get_input_files(self, input_dir: Path, extensions: List[str]) -> List[Path]:
        input_files = list()
        for ext in extensions:
            input_files.extend(input_dir.glob(f"*.{ext}"))
        return input_files
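
Example usage (an illustrative sketch, not part of this module): the snippet below assumes that "definition" is a WorkflowDefinition already loaded elsewhere (for example through the package's WorkflowEngine), and the directory paths are placeholders. It only uses the constructor and public methods shown in the source above.

from aicssegmentation.workflow.batch_workflow import BatchWorkflow

# "definition" is an assumed, pre-loaded WorkflowDefinition (placeholder).
batch = BatchWorkflow(
    workflow_definition=definition,
    input_dir="/path/to/input",    # must exist; *.tiff, *.tif and *.czi files are picked up
    output_dir="/path/to/output",  # created automatically if it does not exist
    channel_index=0,               # channel to segment for multi-channel images
)

# Run every file and write the summary log in one call...
batch.execute_all()

# ...or, alternatively, drive the batch step by step (e.g. to report progress):
# while not batch.is_done():
#     batch.execute_next()
# batch.write_log_file_summary()

print(f"{batch.processed_files - batch.failed_files}/{batch.total_files} files succeeded")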