Browse Source

Remove --gif option & enable gif support for --folder

* gif file are no autodetect
* enable the gif support for --folder #30
tags/v1.2.10
PommeDroid 2 years ago
parent
commit
8f7c67eb1e
6 changed files with 227 additions and 257 deletions
  1. 1
    7
      argv.py
  2. 7
    35
      main.py
  3. 217
    3
      processing/__init__.py
  4. 0
    58
      processing/gif.py
  5. 0
    152
      processing/image.py
  6. 2
    2
      utils.py

+ 1
- 7
argv.py View File

@@ -50,10 +50,8 @@ def config_args(parser, args):
parser.error("Input {} file doesn't exist.".format(args.input))
elif args.folder and not os.path.isdir(args.input):
parser.error("Input {} directory doesn't exist.".format(args.input))
elif not args.folder and not args.gif and os.path.splitext(args.input)[1] not in cv2_supported_extension():
elif not args.folder and os.path.splitext(args.input)[1] not in cv2_supported_extension() + [".gif"]:
parser.error("Input {} file not supported format.".format(args.input))
elif not args.folder and args.gif and os.path.splitext(args.input)[1] != ".gif":
parser.error("Input {} file if not a gif.".format(args.input))

def config_args_out():
if not args.folder and not args.output:
@@ -105,7 +103,6 @@ def run():
action="store_true",
help="Folder mode processing. "
"" # TODO Json config by dir
"Gif not supported atm", # TODO Gif Support
)
processing_mod = parser.add_mutually_exclusive_group()
processing_mod.add_argument(
@@ -152,9 +149,6 @@ def run():
default=0,
help="Pubic hair size scalar best results set to 0 to disable",
)
parser.add_argument(
"--gif", action="store_true", default=False, help="run the processing on a gif"
)
parser.add_argument(
"-n", "--n_runs", type=int, default=1, help="number of times to process input (default: 1)",
)

+ 7
- 35
main.py View File

@@ -8,8 +8,7 @@ import argv
from config import Config as conf
from utils import setup_log, read_image, check_shape

from processing.gif import SimpleGIFTransform
from processing.image import SimpleImageTransform, MultipleImageTransform, FolderImageTransform
from processing import SimpleTransform, FolderImageTransform, MultipleImageTransform
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.resize import ImageToCrop, ImageToOverlay, ImageToRescale, ImageToResized, ImageToResizedCrop
from transform.opencv.correct import DressToCorrect
@@ -89,11 +88,7 @@ def select_processing():
"""
phases = select_phases()
if conf.args['folder']:
process = processing_image_folder(phases)
elif conf.args['gif'] and conf.args['n_runs'] != 1:
process = multiple_gif_processing(phases, conf.args['n_runs'])
elif conf.args['gif']:
process = simple_gif_processing(phases)
process = processing_image_folder(phases)
elif conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, conf.args['n_runs'])
else:
@@ -102,38 +97,13 @@ def select_processing():
return process


def simple_gif_processing(phases):
"""
Define a simple gif process ready to run
:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleGIFTransform> a gif process run ready
"""
return SimpleGIFTransform(conf.args['input'], phases, conf.args['output'])


def multiple_gif_processing(phases, n):
"""
Define a multiple gif process ready to run
:param phases: <ImageTransform[]> list of image transformation
:param n: number of times to process
:return: <MultipleTransform> a multiple gif process run ready
"""
filename, extension = os.path.splitext(conf.args['output'])
return MultipleImageTransform(
[conf.args['input'] for _ in range(n)],
phases,
["{}{}{}".format(filename, i, extension) for i in range(n)],
SimpleGIFTransform
)


def simple_image_processing(phases):
"""
Define a simple image process ready to run
:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleImageTransform> a image process run ready
:return: <SimpleTransform> a image process run ready
"""
return SimpleImageTransform(conf.args['input'], phases, conf.args['output'])
return SimpleTransform(conf.args['input'], phases, conf.args['output'])


def multiple_image_processing(phases, n):
@@ -150,14 +120,16 @@ def multiple_image_processing(phases, n):
["{}{}{}".format(filename, i, extension) for i in range(n)]
)


def processing_image_folder(phases):
"""
Define a folder image process ready to run
:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleImageTransform> a image process run ready
:return: <FolderImageTransform> a image process run ready
"""
return FolderImageTransform(conf.args['input'], phases, conf.args['output'])


if __name__ == "__main__":
freeze_support()
# start_rook()

+ 217
- 3
processing/__init__.py View File

@@ -1,14 +1,21 @@
import os
import shutil
import sys
import tempfile
import time
from multiprocessing.pool import ThreadPool

import cv2
import imageio

from config import Config as conf
from utils import camel_case_to_str
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image


class Process:
"""
Abstract Process Class
"""

def __init__(self):
self.__start = time.time()

@@ -38,4 +45,211 @@ class Process:
pass

def __str__(self):
return str(self.__class__.__name__)
return str(self.__class__.__name__)


class SimpleTransform(Process):
def __init__(self, *args):
super().__init__()

def __new__(cls, input_path, phases, output_path):
if os.path.splitext(input_path)[1] == ".gif":
return GifTransform(input_path, phases, output_path)
elif os.path.splitext(input_path)[1] in cv2_supported_extension():
return SimpleImageTransform(input_path, phases, output_path)
else:
return None


class SimpleImageTransform(Process):
"""
Simple Image Processing Class
"""

def __init__(self, input_path, phases, output_path):
"""
ProcessImage Constructor
:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of transformation each image
"""
super().__init__()
self.__phases = phases
self.__output_path = output_path
self.__altered_path = conf.args['altered']
self.__starting_step = conf.args['steps'][0] if conf.args['steps'] else 0
self.__ending_step = conf.args['steps'][1] if conf.args['steps'] else None

conf.log.debug("All Phases : {}".format(self.__phases))
conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))

self.__image_steps = [input_path] + [
os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__))
for p in self.__phases[:self.__starting_step]
]

def info_start_run(self):
super().info_start_run()
conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))

def setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
conf.log.error(e)
conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)

def execute(self):
"""
Execute all phases on the image
:return: None
"""
for p in self.__phases[len(self.__image_steps) - 1:]:
r = p.run(*[self.__image_steps[i] for i in p.input_index])
self.__image_steps.append(r)

if self.__altered_path:
write_image(r, os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__)))
conf.log.debug("Writing {}, Result of the Execution of {}"
.format(
os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))

write_image(self.__image_steps[-1], self.__output_path)
conf.log.info("{} Created".format(self.__output_path))
conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))

return self.__image_steps[-1]


class MultipleImageTransform(Process):
"""
Multiple Image Processing Class
"""

def __init__(self, input_paths, phases, output_paths, children_process=SimpleTransform):
"""
ProcessMultipleImages Constructor
:param input_paths: <string[]> images path list to process
:param output_paths: <string> images path to write the result
:param children_process: <ImageTransform> Process to use on the list of input
:param phases: <ImageTransform[]> list of transformation use by the process each image
"""
super().__init__()
self._phases = phases
self._input_paths = input_paths
self._output_paths = output_paths
self._process_list = []
self.__multiprocessing = conf.multiprocessing()
self.__children_process = children_process

def setup(self):
# TODO detect GIF or JPEG
self._process_list = [self.__children_process(i[0], self._phases, i[1])
for i in zip(self._input_paths, self._output_paths)]

def execute(self):
"""
Execute all phases on the list of images
:return: None
"""

def process_one_image(a):
conf.log.info("Processing image : {}/{}".format(a[1] + 1, len(self._process_list)))
a[0].run()

if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
else:
conf.log.debug("Using Multiprocessing")
pool = ThreadPool(conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()


class FolderImageTransform(MultipleImageTransform):
"""
Folder Image Processing Class
"""

def __init__(self, input_folder_path, phases, output_folder_path):
"""
FolderImageTransform Constructor
"""
super().__init__([], phases, [])
self.__input_folder_path = input_folder_path
self.__output_folder_path = output_folder_path
self.__multiprocessing = conf.multiprocessing()

def setup(self):
conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
self._process_list = [
MultipleImageTransform(
[
x.path for x in os.scandir(os.path.join(r))
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
],
self._phases,
[
"{}{}{}".format(os.path.splitext(x.path)[0], '_out', os.path.splitext(x.path)[1])
if not conf.args['output'] else os.path.join(conf.args['output'], r, os.path.basename(x.path))
for x in os.scandir(os.path.join(r))
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension() + [".gif"]
]
) for r, _, _ in os.walk(self.__input_folder_path)
]


class GifTransform(Process):
"""
GIF Image Processing Class
"""

def __init__(self, input_path, phases, output_path):
"""
ImageTransformGIF Constructor
:param images: <string> gif path to process
:param output_path: <string> image path to write the result
:param phases: <ImageTransform[]> list of transformation use by the process each image
"""
super().__init__()
self.__phases = phases
self.__input_path = input_path
self.__output_path = output_path
self.__tmp_dir = None
self.__temp_input_paths = []
self.__temp_output_paths = []

def setup(self):
self.__tmp_dir = tempfile.mkdtemp()
conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]

self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]

[write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1]) for i in zip(imgs, self.__temp_input_paths)]

def execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif
:return: None
"""
MultipleImageTransform(self.__temp_input_paths, self.__phases, self.__temp_output_paths).run()

imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])
conf.log.info("{} Gif Created ".format(self.__output_path))

def clean(self):
shutil.rmtree(self.__tmp_dir)

+ 0
- 58
processing/gif.py View File

@@ -1,58 +0,0 @@
import os
import shutil
import tempfile

import cv2
import imageio

from config import Config as conf
from processing import Process
from processing.image import MultipleImageTransform
from utils import write_image


class SimpleGIFTransform(Process):
"""
GIF Image Processing Class
"""

def __init__(self, input_path, phases, output_path):
"""
ImageTransformGIF Constructor
:param images: <string> gif path to process
:param output_path: <string> image path to write the result
:param phases: <ImageTransform[]> list of transformation use by the process each image
"""
super().__init__()
self.__phases = phases
self.__input_path = input_path
self.__output_path = output_path
self.__tmp_dir = None
self.__temp_input_paths = []
self.__temp_output_paths = []

def setup(self):
self.__tmp_dir = tempfile.mkdtemp()
conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]

self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]

[write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1]) for i in zip(imgs, self.__temp_input_paths)]

def execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif
:return: None
"""
MultipleImageTransform(self.__temp_input_paths, self.__phases, self.__temp_output_paths).run()

imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])
conf.log.info("{} Gif Created ".format(self.__output_path))

def clean(self):
shutil.rmtree(self.__tmp_dir)

+ 0
- 152
processing/image.py View File

@@ -1,152 +0,0 @@
import os
import sys
from multiprocessing.pool import ThreadPool

from config import Config as conf
from processing import Process
from utils import read_image, write_image, camel_case_to_str, cv2_supported_extension


class SimpleImageTransform(Process):
"""
Simple Image Processing Class
"""

def __init__(self, input_path, phases, output_path):
"""
ProcessImage Constructor
:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of transformation each image
"""
super().__init__()
self.__phases = phases
self.__output_path = output_path
self.__altered_path = conf.args['altered']
self.__starting_step = conf.args['steps'][0] if conf.args['steps'] else 0
self.__ending_step = conf.args['steps'][1] if conf.args['steps'] else None

conf.log.debug("All Phases : {}".format(self.__phases))
conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))

self.__image_steps = [input_path] + [
os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__))
for p in self.__phases[:self.__starting_step]
]

def info_start_run(self):
super().info_start_run()
conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))

def setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
conf.log.error(e)
conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)

def execute(self):
"""
Execute all phases on the image
:return: None
"""
for p in self.__phases[len(self.__image_steps) - 1:]:
r = p.run(*[self.__image_steps[i] for i in p.input_index])
self.__image_steps.append(r)

if self.__altered_path:
write_image(r, os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__)))
conf.log.debug("Writing {}, Result of the Execution of {}"
.format(
os.path.join(self.__altered_path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))

write_image(self.__image_steps[-1], self.__output_path)
conf.log.info("{} Created".format(self.__output_path))
conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))

return self.__image_steps[-1]


class MultipleImageTransform(Process):
"""
Multiple Image Processing Class
"""

def __init__(self, input_paths, phases, output_paths, children_process=SimpleImageTransform):
"""
ProcessMultipleImages Constructor
:param input_paths: <string[]> images path list to process
:param output_paths: <string> images path to write the result
:param children_process: <ImageTransform> Process to use on the list of input
:param phases: <ImageTransform[]> list of transformation use by the process each image
"""
super().__init__()
self._phases = phases
self._input_paths = input_paths
self._output_paths = output_paths
self._process_list = []
self.__multiprocessing = conf.multiprocessing()
self.__children_process = children_process

def setup(self):
self._process_list = [self.__children_process(i[0], self._phases, i[1])
for i in zip(self._input_paths, self._output_paths)]

def execute(self):
"""
Execute all phases on the list of images
:return: None
"""

def process_one_image(a):
conf.log.info("Processing image : {}/{}".format(a[1] + 1, len(self._process_list)))
a[0].run()

if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
else:
conf.log.debug("Using Multiprocessing")
pool = ThreadPool(conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()


class FolderImageTransform(MultipleImageTransform):
"""
Folder Image Processing Class
"""

def __init__(self, input_folder_path, phases, output_folder_path):
"""
FolderImageTransform Constructor
"""
super().__init__([], phases, [])
self.__input_folder_path = input_folder_path
self.__output_folder_path = output_folder_path
self.__multiprocessing = conf.multiprocessing()

def setup(self):
conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
self._process_list = [
MultipleImageTransform(
[
x.path for x in os.scandir(os.path.join(r))
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension()],
self._phases,
[
"{}{}{}".format(os.path.splitext(x.path)[0], '_out', os.path.splitext(x.path)[1])
if not conf.args['output'] else os.path.join(conf.args['output'], r, os.path.basename(x.path))
for x in os.scandir(os.path.join(r))
if x.is_file() and os.path.splitext(x.path)[1] in cv2_supported_extension()
]
) for r, _, _ in os.walk(self.__input_folder_path)
]

+ 2
- 2
utils.py View File

@@ -112,5 +112,5 @@ def camel_case_to_str(identifier):


def cv2_supported_extension():
return ".bmp", ".dib", ".jpeg", ".jpg", ".jpe", ".jp2", ".png", \
".pbm", ".pgm", "ppm", ".sr", ".ras", ".tiff", ".tif"
return [".bmp", ".dib", ".jpeg", ".jpg", ".jpe", ".jp2", ".png",
".pbm", ".pgm", "ppm", ".sr", ".ras", ".tiff", ".tif"]

Loading…
Cancel
Save