Source code for aicssegmentation.workflow.workflow

import numpy as np
import logging

from typing import Any, Dict, List
from aicssegmentation.exceptions import ArgumentNullError
from .workflow_step import WorkflowStep
from .workflow_definition import WorkflowDefinition
from napari.layers import Image

log = logging.getLogger(__name__)


[docs]class Workflow: """ Represents an executable aics-segmentation workflow This class provides the functionality to run a workflow using an image input according to the steps defined in its WorkflowDefinition. """ def __init__(self, workflow_definition: WorkflowDefinition, input_image: np.ndarray): if workflow_definition is None: raise ArgumentNullError("workflow_definition") if input_image is None: raise ArgumentNullError("input_image") self._definition: WorkflowDefinition = workflow_definition self._starting_image: np.ndarray = input_image self._next_step: int = 0 # Next step to execute self._results: List = list() # Store most recent step results @property def workflow_definition(self) -> WorkflowDefinition: return self._definition
[docs] def reset(self): """ Reset the workflow so it can be run again """ self._next_step = 0 self._results = list()
[docs] def get_next_step(self) -> WorkflowStep: """ Get the next step to be performed Params: none Returns: (WorkflowStep): next WorkflowStep object to perform on image None if all steps have already been executed """ if self._next_step >= len(self._definition.steps): return None return self._definition.steps[self._next_step]
[docs] def execute_next(self, parameters: Dict[str, Any] = None) -> np.ndarray: """ Execute the next workflow step. Params: parameters: Optional dictionary of parameter inputs to use when executing the step If parameters are not provided, the step's default parameters will be used Returns: result (np.ndarray): resultant image from running the next workflow step """ step = self.get_next_step() log.info(f"Executing step #{step.step_number}") # Pick which image to perform the workflow step on image: np.ndarray = None if self._next_step == 0: # First image, so use the starting image for the next workflow step image = [self._starting_image] elif self.is_done(): # No more workflow steps to perform # TODO: what to do if done with workflow # but execute_next is prompted? # printing message for now log.info("No steps left to run") else: image = list() for i in step.parent: res = self.get_result(i - 1) # parents are 1 indexed image.append(res) result: np.ndarray = self.get_next_step().execute(image, parameters or step.parameter_values) self._results.append(result) # Only increment after running step self._next_step += 1 return result
# TODO maybe change this to match the step number instead? # Review when we implement rerunning single workflow steps
[docs] def get_result(self, step_index: int) -> np.ndarray: """ Get the result image for a workflow step. You must call execute() on the workflow step in order to produce a result first before calling this function. Params: step_index (int): index of the WorkflowStep in the workflowengine to get the result image of. Returns: self.image (np.ndarray): Result of performing workflow step on the given image None if step has not been executed yet. """ if step_index < 0: return self._starting_image if step_index >= len(self._results): return None # returns None if the WorkflowStep has not been executed. return self._results[step_index]
[docs] def get_most_recent_result(self) -> np.ndarray: """ Get the result from the last executed WorkflowStep. Params: none Returns: (np.ndarray): Result of the last executed WorkflowStep, returns the starting image if no Workflowsteps have been run. """ if self._next_step == 0: return self._starting_image # TODO does this behavior make sense? Return None instead? else: return self.get_result(self._next_step - 1)
[docs] def execute_all(self) -> np.ndarray: """ Execute all steps in the Workflow Note: default parameters will be used to execute the steps. To execute a step with user-provided parameters, use execute_next() Params: none Returns: (np.ndarray): Result of the final WorkflowStep. """ self.reset() while not self.is_done(): self.execute_next() return self.get_most_recent_result()
[docs] def execute_step(self, i: int, parameters: Dict[str, Any], selected_image: List[Image]) -> np.ndarray: """ Args: i: step number (0 indexed) that you want to run Returns: """ # TODO: discuss using selected layer for all types step_to_run = self._definition.steps[i] image = [i.data for i in selected_image] result: np.ndarray = step_to_run.execute(image, parameters or step_to_run.parameter_values) if len(self._results) <= i: # this is the first time running this step self._results.append(result) else: # this step has been run before so replacing old value self._results[i] = result return result
[docs] def is_done(self) -> bool: """ Check if all WorkflowSteps have been executed. Params: none Returns: (bool): True if all WorkflowSteps have been executed, False if not """ return self._next_step >= len(self._definition.steps)