Browse Source

Processing Module Refactoring

tags/v1.2.10
PommeDroid 2 years ago
parent
commit
6aec2d3ba2
6 changed files with 373 additions and 351 deletions
  1. 14
    75
      main.py
  2. 69
    276
      processing/__init__.py
  3. 91
    0
      processing/folder.py
  4. 62
    0
      processing/gif.py
  5. 81
    0
      processing/image.py
  6. 56
    0
      processing/multiple.py

+ 14
- 75
main.py View File

@@ -8,12 +8,9 @@ import colorama

import argv
from config import Config as Conf
from utils import check_shape
from processing import SimpleTransform, FolderImageTransform, MultipleImageTransform
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.resize import ImageToCrop, ImageToOverlay, ImageToRescale, ImageToResized, ImageToResizedCrop
from transform.opencv.correct import DressToCorrect, ColorTransfer
from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin
from processing import SimpleProcessing
from processing.folder import FolderImageProcessing
from processing.multiple import MultipleImageProcessing


def main(_):
@@ -34,110 +31,52 @@ def main(_):
sys.exit()


def select_phases():
"""
Select the transformation phases to use following args parameters.

:return: <ImageTransform[]> list of image transformation
"""
def shift_step(shift_starting=0, shift_ending=0):
if not Conf.args['steps']:
Conf.args['steps'] = (0, 5)
Conf.args['steps'] = (
Conf.args['steps'][0] + shift_starting,
Conf.args['steps'][1] + shift_ending
)

def add_tail(phases, phase):
phases = [phase] + phases
if Conf.args['steps'] and Conf.args['steps'][0] != 0:
shift_step(shift_starting=1)
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

def add_head(phases, phase):
phases = phases + [phase]
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

phases = [DressToCorrect, CorrectToMask, MaskToMaskref,
MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude]

if Conf.args['overlay']:
phases = add_tail(phases, ImageToResized)
phases = add_tail(phases, ImageToCrop)
phases = add_head(phases, ImageToOverlay)
elif Conf.args['auto_resize']:
phases = add_tail(phases, ImageToResized)
elif Conf.args['auto_resize_crop']:
phases = add_tail(phases, ImageToResizedCrop)
elif Conf.args['auto_rescale']:
phases = add_tail(phases, ImageToRescale)
elif os.path.isfile(Conf.args['input']):
if not Conf.args['ignore_size']:
check_shape(Conf.args['input'])
else:
Conf.log.warn('Image Size Requirements Unchecked.')

if Conf.args['color_transfer']:
phases = add_head(phases, ColorTransfer)

return phases


def select_processing():
"""
Select the processing to use following args parameters.

:return: <Process> a process to run
"""
phases = select_phases()
if os.path.isdir(Conf.args['input']):
process = processing_image_folder(phases)
process = processing_image_folder()
elif Conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, Conf.args['n_runs'])
process = multiple_image_processing()
else:
process = simple_image_processing(phases)
process = simple_image_processing()
Conf.log.debug("Process to execute : {}".format(process))
return process


def simple_image_processing(phases):
def simple_image_processing():
"""
Define a simple image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleTransform> a image process run ready
"""
return SimpleTransform(Conf.args['input'], phases, Conf.args['output'])
return SimpleProcessing()


def multiple_image_processing(phases, n_runs):
def multiple_image_processing():
"""
Define a multiple image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:param n_runs: number of times to process
:return: <MultipleTransform> a multiple image process run ready
"""
filename, extension = os.path.splitext(Conf.args['output'])
return MultipleImageTransform(
[Conf.args['input'] for _ in range(n_runs)],
phases,
["{}{}{}".format(filename, i, extension) for i in range(n_runs)]
)
Conf.args['input'] = [Conf.args['input'] for _ in range(Conf.args['n_runs'])]
Conf.args['output'] = ["{}{}{}".format(filename, i, extension) for i in range(Conf.args['n_runs'])]
return MultipleImageProcessing()


def processing_image_folder(phases):
def processing_image_folder():
"""
Define a folder image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:return: <FolderImageTransform> a image process run ready
"""
return FolderImageTransform(Conf.args['input'], phases, Conf.args['output'])
return FolderImageProcessing()


if __name__ == "__main__":

+ 69
- 276
processing/__init__.py View File

@@ -1,25 +1,19 @@
"""Processing."""
import json
import os
import pathlib
import shutil
import sys
import tempfile
import time
from json import JSONDecodeError
from multiprocessing.pool import ThreadPool

import cv2
import imageio

from config import Config as Conf
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.correct import DressToCorrect, ColorTransfer
from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin
from transform.opencv.resize import ImageToResized, ImageToCrop, ImageToOverlay, ImageToResizedCrop, ImageToRescale
from utils import camel_case_to_str, cv2_supported_extension, check_shape


class Process:
class Processing:
"""Abstract Process Class."""

def __init__(self, *_args, args=None):
def __init__(self, args=None):
"""
Process Constructor.

@@ -83,285 +77,84 @@ class Process:
"""


class SimpleTransform(Process):
class SimpleProcessing(Processing):
"""Simple Transform Class."""

def __init__(self, input_path, phases, output_path, args):
def __init__(self, args=None):
"""
Construct a Simple Transform .
Construct a Simple Processing .

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(input_path, phases, output_path, args)
super().__init__(args)

def __new__(cls, input_path, phases, output_path, args=None):
def __new__(cls, args=None):
"""
Create the correct SimpleTransform object corresponding to the input_path format.

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:return: <ImageTransform|GiftTransform|None> SimpleTransform object corresponding to the input_path format
:return: <ImageProcessing|GiftProcessing|None> SimpleTransform object corresponding to the input_path format
"""
if os.path.splitext(input_path)[1] == ".gif":
return GifTransform(input_path, phases, output_path, args=args)
elif os.path.splitext(input_path)[1] in cv2_supported_extension():
return ImageTransform(input_path, phases, output_path, args=args)
args = Conf.args.copy() if args is None else args.copy()

if os.path.splitext(args['input'])[1] == ".gif":
from processing.gif import GifProcessing
return GifProcessing(args=args)
elif os.path.splitext(args['input'])[1] in cv2_supported_extension():
from processing.image import ImageProcessing
return ImageProcessing(args=args)
else:
return None


class ImageTransform(Process):
"""Image Processing Class."""

def __init__(self, input_path, phases, output_path, args=None):
"""
Process Image Constructor.

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:param phases: <ImageTransform[]> list Class of transformation each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self.__phases = phases
self.__output_path = output_path
self.__altered_path = self._args['altered']
self.__starting_step = self._args['steps'][0] if self._args['steps'] else 0
self.__ending_step = self._args['steps'][1] if self._args['steps'] else None

Conf.log.debug("All Phases : {}".format(self.__phases))
Conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))

path = self.__altered_path if os.path.isfile(input_path) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))

self.__image_steps = [input_path] + [
os.path.join(path, "{}.png".format(p().__class__.__name__))
for p in self.__phases[:self.__starting_step]
]
Conf.log.debug(self.__image_steps)

def _info_start_run(self):
super()._info_start_run()
Conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))

def _setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
Conf.log.error(e)
Conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
Conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)

def _execute(self):
"""
Execute all phases on the image.

:return: None
"""
for p in (x(args=self._args) for x in self.__phases[self.__starting_step:self.__ending_step]):
r = p.run(*[self.__image_steps[i] for i in p.input_index])
self.__image_steps.append(r)

if self.__altered_path:
path = self.__altered_path \
if os.path.isfile(self._args['input']) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))

write_image(r, os.path.join(path, "{}.png".format(p.__class__.__name__)))

Conf.log.debug("{} Step Image Of {} Execution".format(
os.path.join(path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))

write_image(self.__image_steps[-1], self.__output_path)
Conf.log.info("{} Created".format(self.__output_path))
Conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))

return self.__image_steps[-1]


class MultipleImageTransform(Process):
"""Multiple Image Processing Class."""

def __init__(self, input_paths, phases, output_paths, children_process=SimpleTransform, args=None):
"""
Process Multiple Images Constructor.

:param input_paths: <string[]> images path list to process
:param output_paths: <string> images path to write the result
:param children_process: <ImageTransform> Process to use on the list of input
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self._phases = phases
self._input_paths = input_paths
self._output_paths = output_paths
self._process_list = []
self.__multiprocessing = Conf.multiprocessing()
self.__children_process = children_process

def _setup(self):
self._process_list = [self.__children_process(i[0], self._phases, i[1], args=self._args)
for i in zip(self._input_paths, self._output_paths)]

def _execute(self):
"""
Execute all phases on the list of images.

:return: None
"""
def process_one_image(a):
Conf.log.info("Processing Image : {}/{}".format(a[1] + 1, len(self._process_list)))
a[0].run()

if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
def select_phases(args):
"""
Select the transformation phases to use following args parameters.

:return: <ImageTransform[]> list of image transformation
"""
def shift_step(shift_starting=0, shift_ending=0):
if not args['steps']:
args['steps'] = (0, 5)
args['steps'] = (
args['steps'][0] + shift_starting,
args['steps'][1] + shift_ending
)

def add_tail(phases, phase):
phases = [phase] + phases
if args['steps'] and args['steps'][0] != 0:
shift_step(shift_starting=1)
if args['steps'] and args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

def add_head(phases, phase):
phases = phases + [phase]
if args['steps'] and args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

phases = [DressToCorrect, CorrectToMask, MaskToMaskref,
MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude]
Conf.log.debug(args)
if args['overlay']:
phases = add_tail(phases, ImageToResized)
phases = add_tail(phases, ImageToCrop)
phases = add_head(phases, ImageToOverlay)
elif args['auto_resize']:
phases = add_tail(phases, ImageToResized)
elif args['auto_resize_crop']:
phases = add_tail(phases, ImageToResizedCrop)
elif args['auto_rescale']:
phases = add_tail(phases, ImageToRescale)
elif os.path.isfile(args['input']):
if not args['ignore_size']:
check_shape(args['input'])
else:
Conf.log.debug("Using Multiprocessing")
pool = ThreadPool(Conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()
Conf.log.warn('Image Size Requirements Unchecked.')

if args['color_transfer']:
phases = add_head(phases, ColorTransfer)

class FolderImageTransform(MultipleImageTransform):
"""Folder Image Processing Class."""

def __init__(self, input_folder_path, phases, output_folder_path, args=None):
"""
Folder Image Transform Constructor.

:param input_folder_path: <string> path of the folder to process
:param phases: <ImageTransform[]> list of Image Transform to execute
:param output_folder_path: <string> path of the folder where save output
:param args: <dict> processing settings
"""
super().__init__([], phases, [], args=args)
self.__input_folder_path = input_folder_path
self.__output_folder_path = output_folder_path
self.__multiprocessing = Conf.multiprocessing()

def _setup(self):
Conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
self._process_list = [
MultipleImageTransform(
[
x.path for x in os.scandir(r)
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
],
self._phases,
[
"{}{}{}".format(
os.path.splitext(x.path)[0],
'_out',
os.path.splitext(x.path)[1]
)
if not Conf.args['output'] else
os.path.join(
Conf.args['output'],
pathlib.Path(*pathlib.Path(r).parts[1:]),
os.path.basename(x.path)
)
for x in os.scandir(r)
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
],
args=self.__get_folder_args(r)
) for r, _, _ in os.walk(self.__input_folder_path)
]

def __get_folder_args(self, folder_path):
def add_folder_altered(args):
if args['altered']:
args['folder_altered'] = os.path.join(args['altered'],
pathlib.Path(*pathlib.Path(folder_path).parts[1:]))
return args

json_path = os.path.join(folder_path, self._args['json_folder_name'])

Conf.log.debug("Json Path Setting Path: {}".format(json_path))
if not os.path.isfile(json_path):
Conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
return add_folder_altered(self._args)
try:
with open(json_path, 'r') as f:
json_data = json.load(f)
except JSONDecodeError:
Conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
.format(folder_path))
return add_folder_altered(self._args)
try:
from argv import Parser, config_args
a = config_args(Parser.parser, Parser.parser.parse_args(sys.argv[1:]), json_data=json_data)
Conf.log.info("Using {} Configuration for processing {} folder. "
.format(json_path, folder_path))
return add_folder_altered(a)
except SystemExit:
Conf.log.error("Arguments json file {} contains configuration error. "
"Using Default Configuration".format(json_path))
return add_folder_altered(self._args)


class GifTransform(Process):
"""GIF Image Processing Class."""

def __init__(self, input_path, phases, output_path, args=None):
"""
Image Transform GIF Constructor.

:param input_path: <string> gif path to process
:param output_path: <string> image path to write the result
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
"""
super().__init__(args=args)
self.__phases = phases
self.__input_path = input_path
self.__output_path = output_path
self.__tmp_dir = None
self.__temp_input_paths = []
self.__temp_output_paths = []

def _setup(self):
self.__tmp_dir = tempfile.mkdtemp()
Conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
Conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]

self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]

for i in zip(imgs, self.__temp_input_paths):
write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1])

def _execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif.

:return: None
"""
MultipleImageTransform(self.__temp_input_paths, self.__phases, self.__temp_output_paths, args=self._args).run()

dir_out = os.path.dirname(self.__output_path)
if dir_out != '':
os.makedirs(dir_out, exist_ok=True)
imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])

Conf.log.info("{} Gif Created ".format(self.__output_path))

def _clean(self):
shutil.rmtree(self.__tmp_dir)
return phases

+ 91
- 0
processing/folder.py View File

@@ -0,0 +1,91 @@
"""Folder Image Transform Processing."""
import copy
import json
import os
import pathlib
import sys
from json import JSONDecodeError

from config import Config as Conf
from processing import select_phases
from processing.multiple import MultipleImageProcessing
from utils import cv2_supported_extension


class FolderImageProcessing(MultipleImageProcessing):
"""Folder Image Processing Class."""

def __init__(self, args=None):
"""
Folder Image Transform Constructor.

:param args: <dict> args parameter to run images transformations (default use Conf.args
"""
super().__init__(args=args)
self.__input_folder_path = self._args['input']
self.__output_folder_path = self._args['output']
self.__multiprocessing = Conf.multiprocessing()

def _setup(self):
Conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])

for r, _, _ in os.walk(self.__input_folder_path):
args = copy.deepcopy(self._args)
args['input'] = [
x.path for x in os.scandir(r)
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
]
args['phases'] = select_phases(self._args)
args['output'] = [
"{}{}{}".format(
os.path.splitext(x.path)[0],
'_out',
os.path.splitext(x.path)[1]
)
if not Conf.args['output'] else
os.path.join(
Conf.args['output'],
pathlib.Path(*pathlib.Path(r).parts[1:]),
os.path.basename(x.path)
)
for x in os.scandir(r)
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
]

self._process_list.append(
MultipleImageProcessing(
args=self.__get_folder_args(args, r)
)
)

@staticmethod
def __get_folder_args(args, folder_path):
def add_folder_altered(args):
if args['altered']:
args['folder_altered'] = os.path.join(args['altered'],
pathlib.Path(*pathlib.Path(folder_path).parts[1:]))
return args

json_path = os.path.join(folder_path, args['json_folder_name'])

Conf.log.debug("Json Path Setting Path: {}".format(json_path))
if not os.path.isfile(json_path):
Conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
return add_folder_altered(args)
try:
with open(json_path, 'r') as f:
json_data = json.load(f)
except JSONDecodeError:
Conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
.format(folder_path))
return add_folder_altered(args)
try:
from argv import Parser, config_args
a = config_args(Parser.parser, Parser.parser.parse_args(sys.argv[1:]), json_data=json_data)
Conf.log.info("Using {} Configuration for processing {} folder. "
.format(json_path, folder_path))
return add_folder_altered(a)
except SystemExit:
Conf.log.error("Arguments json file {} contains configuration error. "
"Using Default Configuration".format(json_path))
return add_folder_altered(args)

+ 62
- 0
processing/gif.py View File

@@ -0,0 +1,62 @@
"""GIF Transform Processing."""
import os
import shutil
import tempfile

import cv2
import imageio

from config import Config as Conf
from processing import Processing, select_phases
from processing.multiple_image import MultipleImageProcessing
from utils import write_image


class GifProcessing(Processing):
"""GIF Image Processing Class."""

def __init__(self, args=None):
"""
Image Transform GIF Constructor.

:param args: <dict> args parameter to run images transformations (default use Conf.args)
"""
super().__init__(args=args)
self.__phases = select_phases(self._args)
self.__input_path = args['input_path']
self.__output_path = args['output_path']
self.__tmp_dir = None
self.__temp_input_paths = []
self.__temp_output_paths = []

def _setup(self):
self.__tmp_dir = tempfile.mkdtemp()
Conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
Conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]

self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]

for i in zip(imgs, self.__temp_input_paths):
write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1])

def _execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif.

:return: None
"""
MultipleImageProcessing(self.__temp_input_paths, self.__phases, self.__temp_output_paths, args=self._args).run()

dir_out = os.path.dirname(self.__output_path)
if dir_out != '':
os.makedirs(dir_out, exist_ok=True)
imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])

Conf.log.info("{} Gif Created ".format(self.__output_path))

def _clean(self):
shutil.rmtree(self.__tmp_dir)

+ 81
- 0
processing/image.py View File

@@ -0,0 +1,81 @@
"""Image Transform Processing."""
import os
import sys

from config import Config as Conf
from processing import Processing, select_phases
from utils import read_image, camel_case_to_str, write_image


class ImageProcessing(Processing):
"""Image Processing Class."""

def __init__(self, args=None):
"""
Process Image Constructor.

:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(args=args)
self.__phases = select_phases(self._args)
self.__input_path = self._args['input']
self.__output_path = self._args['output']
self.__altered_path = self._args['altered']
self.__starting_step = self._args['steps'][0] if self._args['steps'] else 0
self.__ending_step = self._args['steps'][1] if self._args['steps'] else None

Conf.log.debug("All Phases : {}".format(self.__phases))
Conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))

path = self.__altered_path if os.path.isfile(self.__input_path) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))

self.__image_steps = [self.__input_path] + [
os.path.join(path, "{}.png".format(p().__class__.__name__))
for p in self.__phases[:self.__starting_step]
]
Conf.log.debug(self.__image_steps)

def _info_start_run(self):
super()._info_start_run()
Conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))

def _setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
Conf.log.error(e)
Conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
Conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)

def _execute(self):
"""
Execute all phases on the image.

:return: None
"""
for p in (x(args=self._args) for x in self.__phases[self.__starting_step:self.__ending_step]):
r = p.run(*[self.__image_steps[i] for i in p.input_index])
self.__image_steps.append(r)

if self.__altered_path:
path = self.__altered_path \
if os.path.isfile(self._args['input']) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))

write_image(r, os.path.join(path, "{}.png".format(p.__class__.__name__)))

Conf.log.debug("{} Step Image Of {} Execution".format(
os.path.join(path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))

write_image(self.__image_steps[-1], self.__output_path)
Conf.log.info("{} Created".format(self.__output_path))
Conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))

return self.__image_steps[-1]

+ 56
- 0
processing/multiple.py View File

@@ -0,0 +1,56 @@
"""Multiple Image Transform Processing."""
import copy
from multiprocessing.pool import ThreadPool

from config import Config as Conf
from processing import Processing, SimpleProcessing
from utils import camel_case_to_str


class MultipleImageProcessing(Processing):
"""Multiple Image Processing Class."""

def __init__(self, args=None, children_process=SimpleProcessing):
"""
Process Multiple Images Constructor.

:param children_process: <ImageTransform> Process to use on the list of input
:param args: args: <dict> args parameter to run images transformations (default use Conf.args)
"""
super().__init__(args=args)
self._input_paths = self._args['input']
self._output_paths = self._args['output']
self._process_list = []
self.__multiprocessing = Conf.multiprocessing()
self.__children_process = children_process

def _setup(self):
self._process_list = []

for input_path, output_path in zip(self._input_paths, self._output_paths):
args = copy.deepcopy(self._args)
args['input'] = input_path
args['output'] = output_path
self._process_list.append(self.__children_process(args=args))

def _execute(self):
"""
Execute all phases on the list of images.

:return: None
"""
def process_one_image(a):
Conf.log.info("{} : {}/{}".format(
camel_case_to_str(self.__class__.__name__), a[1] + 1, len(self._process_list)
))
a[0].run()

if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
else:
Conf.log.debug("Using Multiprocessing")
pool = ThreadPool(Conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()

Loading…
Cancel
Save