Source code for aicssegmentation.workflow.workflow_engine

import numpy as np

from typing import List, Union
from aicssegmentation.exceptions import ArgumentNullError
from .workflow import Workflow
from .batch_workflow import BatchWorkflow
from .workflow_definition import WorkflowDefinition
from .workflow_config import WorkflowConfig
from pathlib import Path


[docs]class WorkflowEngine: """ aicssegmentation workflow engine Use this class to access and execute aicssegmentation structure workflows """ def __init__(self, workflow_config: WorkflowConfig = None): self._workflow_config = workflow_config or WorkflowConfig() self._workflow_definitions = self._load_workflow_definitions() @property def workflow_definitions(self) -> List[WorkflowDefinition]: """ List of all workflow definitions """ return self._workflow_definitions
[docs] def get_executable_workflow(self, workflow_name: str, input_image: np.ndarray) -> Workflow: """ Get an executable workflow object inputs: workflow_name (str): Name of the workflow to load input_image (ndarray): input image for the workflow to execute on """ if workflow_name is None: raise ArgumentNullError("workflow_name") if input_image is None: raise ArgumentNullError("input_image") definition = self._get_workflow_definition(workflow_name) return Workflow(definition, input_image)
[docs] def get_executable_batch_workflow( self, workflow_name: str, input_dir: str, output_dir: str, channel_index: int = 0 ): """ Get an executable BatchWorkflow object inputs: workflow_name (str): Name of the workflow to load input_dir (str|Path): Directory containing input files for the batch processing output_dir (str|Path): Output directory for the batch processing channel_index (int): Index of the channel to process in each image (usually a structure channel) """ if workflow_name is None: raise ArgumentNullError("workflow_name") if input_dir is None: raise ArgumentNullError("input_dir") if output_dir is None: raise ArgumentNullError("output_dir") definition = self._get_workflow_definition(workflow_name) return BatchWorkflow(definition, input_dir, output_dir, channel_index)
[docs] def get_executable_workflow_from_config_file( self, file_path: Union[str, Path], input_image: np.ndarray ) -> Workflow: """ Get an executable workflow object from a configuration file inputs: file_path (str|Path): Path to the workflow configuration file input_image (ndarray): input image for the workflow to execute on """ if input_image is None: raise ArgumentNullError("input_image") if file_path is None: raise ArgumentNullError("file_path") definition = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path)) return Workflow(definition, input_image)
[docs] def get_executable_batch_workflow_from_config_file( self, file_path: Union[str, Path], input_dir: Union[str, Path], output_dir: Union[str, Path], channel_index: int = 0, ): """ Get an executable batch workflow object from a configuration file inputs: file_path (str|Path): Path to the workflow configuration file input_dir (str|Path): Directory containing input files for the batch processing output_dir (str|Path): Output directory for the batch processing channel_index (int): Index of the channel to process in each image (usually a structure channel) """ if file_path is None: raise ArgumentNullError("file_path") if input_dir is None: raise ArgumentNullError("input_dir") if output_dir is None: raise ArgumentNullError("output_dir") definition = self._workflow_config.get_workflow_definition_from_config_file(Path(file_path)) return BatchWorkflow(definition, input_dir, output_dir, channel_index)
[docs] def save_workflow_definition(self, workflow_definition: WorkflowDefinition, output_file_path: Union[str, Path]): if workflow_definition is None: raise ArgumentNullError("workflow_definition") if output_file_path is None: raise ArgumentNullError("file_path") self._workflow_config.save_workflow_definition_as_json(workflow_definition, output_file_path)
def _load_workflow_definitions(self) -> List[WorkflowDefinition]: definitions = list() available_workflows = self._workflow_config.get_available_workflows() for name in available_workflows: definitions.append(self._workflow_config.get_workflow_definition(name)) return definitions def _get_workflow_definition(self, workflow_name: str) -> WorkflowDefinition: definition = next(filter(lambda d: d.name == workflow_name, self._workflow_definitions), None) if definition is None: raise ValueError( f"No available workflow definition found for {workflow_name}. Specify a valid workflow name." ) return definition