#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import os
import sys
import logging
import argparse
import traceback
import importlib
import pathlib
from glob import glob
import aicsimageio
###############################################################################
# Global Objects
# Names of the sub-commands that select where the input images come from:
# a single file, every matching file in a directory, or a column of a CSV.
PER_IMAGE, PER_DIR, PER_CSV = "per_img", "per_dir", "per_csv"

# The script logs through the root logger; INFO is the level unless the
# debug flag later lowers it to DEBUG.
log = logging.getLogger()
logging.basicConfig(
    format="[%(levelname)4s:%(lineno)4s %(asctime)s] %(message)s",
    level=logging.INFO,
)
#
# Set the default log level for other modules used by this script
# logging.getLogger("labkey").setLevel(logging.ERROR)
# logging.getLogger("requests").setLevel(logging.WARNING)
# logging.getLogger("urllib3").setLevel(logging.WARNING)
###############################################################################
###############################################################################
class Args(object):
    """
    Command line arguments of the batch segmentation script.

    Use this to define command line arguments and use them later.
    For each argument do the following

    1. Create a member in __init__ before the self.__parse call.
    2. Provide a default value here.
    3. Then in p.add_argument, set the dest parameter to that variable name.

    See the debug parameter as an example.

    The parser is run with ``namespace=self``. argparse only fills in a
    ``default=`` for attributes that are not already present on the
    namespace, so a value assigned in ``__init__`` always wins over the
    ``default=`` passed to ``add_argument`` for the same dest. To keep the
    help text truthful, the parser defaults below are derived from the
    values assigned in ``__init__``.
    """

    def __init__(self, log_cmdline=True):
        """
        Parse ``sys.argv`` and store every option as an attribute of self.

        Parameters
        ----------
        log_cmdline: bool
            Currently unused; kept so existing callers keep working.
        """
        self.debug = False
        self.output_dir = "./"
        self.struct_ch = 0
        self.xy = 0.108
        self.dask = False
        #
        self.__parse()
        #
        if self.debug:
            log.setLevel(logging.DEBUG)
            log.debug("-" * 80)
            self.show_info()
            log.debug("-" * 80)

    @staticmethod
    def __no_args_print_help(parser):
        """
        This is used to print out the help if no arguments are provided.

        Note:
        - You need to remove its usage if your script truly doesn't want arguments.
        - It exits with 1 because it's an error if this is used in a script with
          no args. That's a non-interactive use scenario - typically you don't want
          help there.
        """
        if len(sys.argv) == 1:
            parser.print_help()
            sys.exit(1)

    def __parse(self):
        """Build the argument parser and parse ``sys.argv`` into self."""
        p = argparse.ArgumentParser()
        # Add arguments
        p.add_argument(
            "-d",
            "--debug",
            action="store_true",
            dest="debug",
            help="If set debug log output is enabled",
        )
        p.add_argument(
            "--struct_name",
            dest="struct_name",
            default="skip",
            help="[Legacy option for backward compatibility] use workflow_name instead",
        )
        p.add_argument(
            "--workflow_name",
            dest="workflow_name",
            default="template",
            help="the name of your workflow",
        )
        p.add_argument(
            "--struct_ch",
            required=False,
            type=int,
            dest="struct_ch",
            # The value preset in __init__ is the one that is actually used
            # when the option is omitted, so advertise that one.
            default=self.struct_ch,
            help=(
                "the index of the structure channel of the image file, "
                f"default is {self.struct_ch}"
            ),
        )
        p.add_argument(
            "--xy",
            default=self.xy,
            type=float,
            dest="xy",
            help=f"the xy resolution of the image, default is {self.xy}",
        )
        p.add_argument(
            "--rescale",
            default=-1,
            type=float,
            dest="rescale",
            help="the rescale ratio for x/y dimensions, will overwrite --xy",
        )
        p.add_argument("--output_dir", dest="output_dir", help="output directory")
        p.add_argument(
            "--wrapper_dir",
            dest="wrapper_dir",
            default="_internal_",
            help="wrapper directory",
        )
        p.add_argument(
            "--use",
            dest="output_type",
            default="default",
            help="how to output the results, mostly used options are default or array",
        )
        p.add_argument("--mitotic_stage", dest="mitotic_stage", default=None, help="mitotic_stage")
        p.add_argument(
            "--dask",
            action="store_true",
            help="if included, use dask. Omit to not use dask for parallelization",
        )

        # One sub-command per input mode; exactly one of them must be given.
        subparsers = p.add_subparsers(dest="mode")
        subparsers.required = True

        # The inputs below are marked as required: without them the script
        # can only fail later on a None value, with a far less helpful error.
        parser_img = subparsers.add_parser(PER_IMAGE)
        parser_img.add_argument("--input", dest="input_fname", required=True, help="input filename")

        parser_dir = subparsers.add_parser(PER_DIR)
        parser_dir.add_argument("--input_dir", dest="input_dir", required=True, help="input directory")
        parser_dir.add_argument(
            "--data_type",
            default=".czi",
            dest="data_type",
            help="the image type to be processed, e.g., .czi (default) or .tiff",
        )

        parser_csv = subparsers.add_parser(PER_CSV)
        parser_csv.add_argument("--csv", dest="csv_dir", required=True, help="csv file")
        parser_csv.add_argument(
            "--column",
            required=True,
            help="the column to load file path",
        )

        self.__no_args_print_help(p)
        p.parse_args(namespace=self)

    def show_info(self):
        """Log the working directory, the command line and all parsed args at DEBUG level."""
        log.debug("Working Dir:")
        log.debug("\t{}".format(os.getcwd()))
        log.debug("Command Line:")
        log.debug("\t{}".format(" ".join(sys.argv)))
        log.debug("Args:")
        for k, v in vars(self).items():
            log.debug("\t{}: {}".format(k, v))
###############################################################################
class Executor(object):
    """
    Load a segmentation workflow and run it on the requested images.

    ``execute`` resolves the workflow class (stored as ``self.SegModule``)
    and then runs it on a single file, on every matching file of a
    directory, or on every unique path listed in a CSV column.
    """

    def __init__(self, args):
        """
        Derive the x/y rescale ratio from the parsed arguments.

        An explicit positive ``args.rescale`` wins. Otherwise the ratio is
        ``args.xy / 0.108`` when ``args.xy`` differs from 0.108, and -1 when
        it does not. The value is handed to the workflow unchanged.
        """
        standard_xy = 0.108
        if args.rescale > 0:
            self.rescale_ratio = args.rescale
        elif args.xy != standard_xy:
            self.rescale_ratio = args.xy / standard_xy
        else:
            self.rescale_ratio = -1

    @staticmethod
    def _select_struct_channel(img, struct_ch, fix_axis_order=False):
        """Return channel ``struct_ch`` of a 5D/6D image array as a float32 volume."""
        # NOTE(review): the indexing assumes the channel axis comes right
        # before the last three axes - confirm against the reader in use.
        if len(img.shape) == 6:
            # when z and c is not in order
            if fix_axis_order and img.shape[-3] < img.shape[-4]:
                img = np.transpose(img, (0, 1, 3, 2, 4, 5))
            return img[0, 0, struct_ch, :, :, :].astype(np.float32)
        # when z and c is not in order
        if fix_axis_order and img.shape[-3] < img.shape[-4]:
            img = np.transpose(img, (0, 2, 1, 3, 4))
        return img[0, struct_ch, :, :, :].astype(np.float32)

    def _run_workflow(self, struct_img, args, output_path, name):
        """Call the loaded workflow, adding the mitotic stage argument when one is set."""
        # Check if the segmentation is mitotic stage specific
        if args.mitotic_stage is None:
            return self.SegModule(
                struct_img,
                self.rescale_ratio,
                args.output_type,
                output_path,
                name,
            )
        return self.SegModule(
            struct_img,
            args.mitotic_stage,
            self.rescale_ratio,
            args.output_type,
            output_path,
            name,
        )

    @staticmethod
    def _load_workflow_class(wrapper_dir, workflow_name):
        """Import ``seg_<workflow_name>`` and return its ``Workflow_<workflow_name>`` attribute."""
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly before using it.
        import importlib.util

        try:
            if wrapper_dir == "_internal_":
                module_name = "aicssegmentation.structure_wrapper.seg_" + workflow_name
                seg_module = importlib.import_module(module_name)
            else:
                spec = importlib.util.spec_from_file_location(
                    "seg_" + workflow_name,
                    wrapper_dir + "/seg_" + workflow_name + ".py",
                )
                seg_module = importlib.util.module_from_spec(spec)
                try:
                    spec.loader.exec_module(seg_module)
                except Exception as e:
                    # Report the script error but carry on: the attribute
                    # lookup below decides whether the workflow is usable.
                    print("check errors in wrapper script")
                    print(str(e))
            return getattr(seg_module, "Workflow_" + workflow_name)
        except Exception as e:
            print(e)
            print("{} structure not found".format(workflow_name))
            sys.exit(1)

    def segment(self, fn, args, output_path):
        """
        Segment one image file in batch mode.

        Parameters
        ----------
        fn: str
            Path of the image to read.
        args: Args
            Parsed arguments; ``struct_ch``, ``mitotic_stage`` and
            ``output_type`` are read here.
        output_path: pathlib.Path
            Directory that is checked for an existing result.

        Returns
        -------
        Whatever the workflow returns, or None when
        ``<name>_struct_segmentation.tiff`` already exists in ``output_path``
        and the file is skipped.
        """
        name = os.path.splitext(os.path.basename(fn))[0]
        existing = pathlib.Path(output_path) / (name + "_struct_segmentation.tiff")
        if existing.exists():
            print(f"skipping {fn} ....")
            return None

        img = aicsimageio.AICSImage(fn).data
        struct_img = self._select_struct_channel(img, args.struct_ch, fix_axis_order=True)
        # Both workflow variants receive the extension-less base name, the
        # same value that is used for the skip check above.
        return self._run_workflow(struct_img, args, output_path, name)

    def execute(self, args):
        """
        Load the requested workflow and run it according to ``args.mode``.

        Exits the process with status 1 when both the legacy ``struct_name``
        and ``workflow_name`` are given, or when the workflow cannot be loaded.
        """
        if args.struct_name != "skip":
            if args.workflow_name != "template":
                print("only use either workflow_name or struct_name, should not use both.")
                sys.exit(1)
            args.workflow_name = args.struct_name

        self.SegModule = self._load_workflow_class(args.wrapper_dir, args.workflow_name)

        output_path = pathlib.Path(args.output_dir)
        # Also creates missing parents and tolerates an existing directory.
        output_path.mkdir(parents=True, exist_ok=True)

        ##########################################################################
        if args.mode == PER_IMAGE:
            fname = os.path.basename(os.path.splitext(args.input_fname)[0])
            img = aicsimageio.AICSImage(args.input_fname).data
            # Unlike batch mode, no axis re-ordering is attempted here.
            struct_img = self._select_struct_channel(img, args.struct_ch)
            self._run_workflow(struct_img, args, output_path, fname)
            return

        if args.mode == PER_DIR:
            filenames = sorted(glob(args.input_dir + "/*" + args.data_type))
        elif args.mode == PER_CSV:
            import pandas as pd

            df = pd.read_csv(args.csv_dir)
            filenames = df[args.column].unique()
        else:
            return

        if args.dask:
            # using dask offers ~6x speedup on the template segmentation.
            import dask

            lazy_results = [dask.delayed(self.segment)(fn, args, output_path) for fn in filenames]
            futures = dask.persist(*lazy_results)
            dask.compute(*futures)
        else:
            for fn in filenames:
                self.segment(fn, args, output_path)
###############################################################################
def main():
    """
    Command line entry point.

    Parses the arguments, builds an Executor and runs it. Any exception is
    logged (with the traceback only when debug output was requested) and the
    process exits with status 1.
    """
    separator = "============================================="
    debug_enabled = False
    try:
        args = Args()
        debug_enabled = args.debug
        # Do your work here - preferably in a class or function,
        # passing in your args. E.g.
        Executor(args).execute(args)
    except Exception as err:
        log.error(separator)
        if debug_enabled:
            log.error("\n\n" + traceback.format_exc())
            log.error(separator)
        log.error("\n\n" + str(err) + "\n")
        log.error(separator)
        sys.exit(1)


if __name__ == "__main__":
    main()