# Source code for aicssegmentation.workflow.workflow_config

import json

from pathlib import Path
from typing import Dict, List
from aicssegmentation.util.directories import Directories
from .segmenter_function import SegmenterFunction, FunctionParameter, WidgetType
from .workflow_definition import WorkflowDefinition, PrebuiltWorkflowDefinition
from .workflow_step import WorkflowStep, WorkflowStepCategory


class ConfigurationException(Exception):
    """Raised when errors are encountered reading from Configuration files."""
class WorkflowConfig:
    """
    Provides access to structure workflow configuration
    """

    def __init__(self):
        # Lazily-populated caches, filled on first access.
        self._all_functions = None
        self._available_workflow_names = None

    def get_available_workflows(self) -> List[str]:
        """
        Get the list of all workflows available through configuration

        Returns:
            Workflow names derived from the ``conf_<name>.json`` files found in
            the structure configuration directory.
        """
        if self._available_workflow_names is None:
            json_list = sorted(Directories.get_structure_config_dir().glob("conf_*.json"))
            # Strip the "conf_" prefix (5 characters) from each file stem.
            self._available_workflow_names = [p.stem[5:] for p in json_list]

        return self._available_workflow_names

    def get_all_functions(self) -> List[SegmenterFunction]:
        """
        Get the list of all available Functions from configuration

        Raises:
            ConfigurationException: if all_functions.json cannot be read or parsed.
        """
        if self._all_functions is None:
            path = Directories.get_structure_config_dir() / "all_functions.json"
            try:
                with open(path) as file:
                    obj = json.load(file)
                    self._all_functions = self._all_functions_decoder(obj)
            except Exception as ex:
                raise ConfigurationException(f"Error reading json configuration from {path}") from ex

        return self._all_functions

    def get_workflow_definition(self, workflow_name: str) -> PrebuiltWorkflowDefinition:
        """
        Get a WorkflowDefinition for the given workflow from the corresponding
        prebuilt json structure config

        Args:
            workflow_name: name of a prebuilt workflow; must be non-empty and
                present in :meth:`get_available_workflows`.

        Raises:
            ValueError: if workflow_name is empty or not a known workflow.
        """
        if workflow_name is None or len(workflow_name.strip()) == 0:
            raise ValueError("workflow_name cannot be empty")

        if workflow_name not in self.get_available_workflows():
            raise ValueError(f"No workflow configuration available for {workflow_name}")

        path = Directories.get_structure_config_dir() / f"conf_{workflow_name}.json"
        return self.get_workflow_definition_from_config_file(path, workflow_name, prebuilt=True)

    def get_workflow_definition_from_config_file(
        self, file_path: Path, workflow_name: str = None, prebuilt: bool = False
    ) -> WorkflowDefinition:
        """
        Get a WorkflowDefinition based off the given json configuration file

        Args:
            file_path: path to the json configuration file (must end in .json)
            workflow_name: optional explicit workflow name; defaults to the
                file's name when not provided
            prebuilt: when True, decode into a PrebuiltWorkflowDefinition

        Raises:
            ValueError: if file_path does not have a .json extension.
            ConfigurationException: if the file cannot be opened or parsed.
        """
        if file_path.suffix.lower() != ".json":
            raise ValueError("Workflow configuration file must be a json file with .json file extension.")

        # Open inside the try so missing/unreadable files surface as a
        # ConfigurationException, consistent with get_all_functions().
        try:
            with open(file_path) as file:
                obj = json.load(file)
                return self._workflow_decoder(obj, workflow_name or file_path.name, prebuilt)
        except Exception as ex:
            raise ConfigurationException(f"Error reading json configuration from {file_path}") from ex

    def save_workflow_definition_as_json(self, workflow_definition: WorkflowDefinition, output_file_path: Path):
        """
        Save a WorkflowDefinition as a json config file

        Args:
            workflow_definition: the definition to serialize
            output_file_path: destination path; must have a .json extension

        Raises:
            ValueError: if output_file_path does not have a .json extension.
        """
        if output_file_path.suffix.lower() != ".json":
            raise ValueError("Workflow configuration file save path must have a .json extension.")

        with open(output_file_path, "w") as file:
            json.dump(self._workflow_encoder(workflow_definition), file, indent=4, sort_keys=True)

    def _all_functions_decoder(self, obj: Dict) -> List[SegmenterFunction]:
        """
        Decode Functions config (all_functions.json)
        """

        def build_function_parameter(name: str, data: Dict):
            # min/max/increment/options are optional in the config; absent keys
            # decode to None.
            return FunctionParameter(
                name=name,
                widget_type=WidgetType.from_str(data["widget_type"]),
                data_type=data["data_type"],
                min_value=data.get("min", None),
                max_value=data.get("max", None),
                increment=data.get("increment", None),
                options=data.get("options", None),
            )

        functions = list()

        for function_k, function_v in obj.items():
            function = SegmenterFunction(
                name=function_k,
                display_name=function_v["name"],
                function=function_v["python::function"],
                module=function_v["python::module"],
            )

            if function_v.get("parameters") is not None and len(function_v["parameters"]) > 0:
                params = dict()

                for param_k, param_v in function_v["parameters"].items():
                    param_name = param_k
                    params[param_name] = list()

                    if isinstance(param_v, dict):
                        # single parameter definition
                        params[param_name].append(build_function_parameter(param_name, param_v))
                    elif isinstance(param_v, list):
                        # multiple parameter definitions sharing one name
                        for item in param_v:
                            params[param_name].append(build_function_parameter(param_name, item))

                function.parameters = params

            functions.append(function)

        return functions

    def _workflow_decoder(self, obj: Dict, workflow_name: str, prebuilt: bool = False) -> WorkflowDefinition:
        """
        Decode Workflow config (conf_{workflow_name}.json)

        Raises:
            ConfigurationException: if a step references an unknown function.
        """
        functions = self.get_all_functions()
        steps: List[WorkflowStep] = list()

        for step_k, step_v in obj.items():
            # Step keys are the (string) step numbers.
            step_number = int(step_k)
            function_id = step_v["function"]
            function = next(filter(lambda f: f.name == function_id, functions), None)

            if function is None:
                raise ConfigurationException(
                    f"Could not find a Segmenter function matching the function identifier <{function_id}>."
                )

            # "parent" may be a scalar or a list; normalize to a list.
            if isinstance(step_v["parent"], list):
                parent = step_v["parent"]
            else:
                parent = [step_v["parent"]]

            step = WorkflowStep(
                category=WorkflowStepCategory.from_str(step_v["category"]),
                function=function,
                step_number=step_number,
                parent=parent,
            )

            if step_v.get("parameter_values") is not None and len(step_v["parameter_values"]) > 0:
                param_defaults = dict()

                for param_k, param_v in step_v["parameter_values"].items():
                    param_name = param_k
                    param_defaults[param_name] = param_v

                step.parameter_values = param_defaults

            steps.append(step)

        # Config dict order is not guaranteed to match step order.
        steps.sort(key=lambda s: s.step_number)

        if prebuilt:
            return PrebuiltWorkflowDefinition(workflow_name, steps)
        else:
            return WorkflowDefinition(workflow_name, steps)

    def _workflow_encoder(self, workflow_definition: WorkflowDefinition) -> Dict:
        """
        Encode a WorkflowDefinition to a json dictionary
        """
        # TODO add header / version ?
        result = dict()

        for step in workflow_definition.steps:
            step_number = str(step.step_number)
            # Collapse a single-parent list back to a scalar (inverse of the
            # normalization done in _workflow_decoder).
            parent = step.parent[0] if len(step.parent) == 1 else step.parent

            step_dict = {
                step_number: {"function": step.function.name, "category": step.category.value, "parent": parent}
            }
            if step.parameter_values is not None:
                step_dict[step_number].update({"parameter_values": step.parameter_values})

            result.update(step_dict)

        return result