Browse Source

Merge pull request #42 from PommeDroid/clean_code

Clean Code
master
deeppppp 3 years ago committed by GitHub
parent
commit
b6820f0cf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      .gitignore
  2. 9
      argv/__init__.py
  3. 8
      argv/checkpoints.py
  4. 5
      argv/common.py
  5. 2
      argv/daemon.py
  6. 53
      checkpoints.py
  7. 21
      config.py
  8. 48
      daemon.py
  9. 25
      gpu_info.py
  10. 92
      main.py
  11. 169
      processing/__init__.py
  12. 10
      scripts/build.py
  13. 12
      scripts/setup.py
  14. 69
      transform/__init__.py
  15. 405
      transform/gan/__init__.py
  16. 245
      transform/gan/generator.py
  17. 36
      transform/gan/mask.py
  18. 228
      transform/gan/model.py
  19. 27
      transform/opencv/__init__.py
  20. 62
      transform/opencv/correct.py
  21. 201
      transform/opencv/mask.py
  22. 112
      transform/opencv/resize.py
  23. 23
      transform/opencv/watermark.py
  24. 71
      utils.py

5
.gitignore vendored

@ -60,3 +60,8 @@ venv.bak/ @@ -60,3 +60,8 @@ venv.bak/
# IDE
.idea
# flake
.flake8
requirements_flake8.txt

9
argv/__init__.py

@ -2,7 +2,8 @@ import argparse @@ -2,7 +2,8 @@ import argparse
import copy
import logging
import sys
from config import Config as conf
from config import Config as Conf
from argv.checkpoints import init_checkpoints_sub_parser, check_args_checkpoints_parser, set_args_checkpoints_parser
from argv.common import arg_help, arg_debug, arg_version
from argv.daemon import init_daemon_sub_parser
@ -24,12 +25,12 @@ def run(): @@ -24,12 +25,12 @@ def run():
args = Parser.parser.parse_args()
conf.log = setup_log(logging.DEBUG) if args.debug else setup_log()
Conf.log = setup_log(logging.DEBUG) if args.debug else setup_log()
args = config_args(Parser.parser, args)
conf.log.debug(args)
Conf.log.debug(args)
conf.args = vars(args)
Conf.args = vars(args)
args.func(args)

8
argv/checkpoints.py

@ -2,7 +2,7 @@ import os @@ -2,7 +2,7 @@ import os
import sys
import checkpoints
from config import Config as conf
from config import Config as Conf
from argv.common import arg_help, arg_debug
@ -44,7 +44,7 @@ def check_args_checkpoints_parser(parser, args): @@ -44,7 +44,7 @@ def check_args_checkpoints_parser(parser, args):
def check_arg_checkpoints(parser, args):
conf.log.debug(args.checkpoints)
Conf.log.debug(args.checkpoints)
for _, v in args.checkpoints.items():
if not os.path.isfile(v):
parser.error(
@ -54,7 +54,7 @@ def check_arg_checkpoints(parser, args): @@ -54,7 +54,7 @@ def check_arg_checkpoints(parser, args):
def set_arg_checkpoints(args):
conf.log.debug(args.checkpoints)
Conf.log.debug(args.checkpoints)
args.checkpoints = {
'correct_to_mask': os.path.join(str(args.checkpoints), "cm.lib"),
'maskref_to_maskdet': os.path.join(str(args.checkpoints), "mm.lib"),
@ -75,5 +75,5 @@ def arg_version(parser): @@ -75,5 +75,5 @@ def arg_version(parser):
parser.add_argument(
"-v",
"--version",
action='version', version='checkpoints {}'.format(conf.checkpoints_version)
action='version', version='checkpoints {}'.format(Conf.checkpoints_version)
)

5
argv/common.py

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
from config import Config as conf
from config import Config as Conf
def arg_debug(parser):
@ -23,6 +23,5 @@ def arg_version(parser): @@ -23,6 +23,5 @@ def arg_version(parser):
parser.add_argument(
"-v",
"--version",
action='version', version='%(prog)s {}'.format(conf.version)
action='version', version='%(prog)s {}'.format(Conf.version)
)

2
argv/daemon.py

@ -11,8 +11,6 @@ def init_daemon_sub_parser(subparsers): @@ -11,8 +11,6 @@ def init_daemon_sub_parser(subparsers):
)
daemon_parser.set_defaults(func=daemon.main)
# add daemon arguments
arg_help(daemon_parser)
arg_debug(daemon_parser)

53
checkpoints.py

@ -1,46 +1,59 @@ @@ -1,46 +1,59 @@
"""checkpoints logic."""
import logging
import os
import shutil
import sys
import tempfile
from config import Config as conf
from config import Config as Conf
from utils import setup_log, dl_file, unzip
def main(_):
if sum([1 for x in ["cm.lib", "mm.lib", "mn.lib"] if os.path.isfile(os.path.join(conf.args['checkpoints'], x))]):
conf.log.info("Checkpoints Found In {}".format(conf.args['checkpoints']))
"""
Start checkpoints main logic.
:param _: None
:return: None
"""
if sum([1 for x in ["cm.lib", "mm.lib", "mn.lib"] if os.path.isfile(os.path.join(Conf.args['checkpoints'], x))]):
Conf.log.info("Checkpoints Found In {}".format(Conf.args['checkpoints']))
else:
conf.log.warn("Checkpoints Not Found In {}".format(conf.args['checkpoints']))
conf.log.info("You Can Download Them Using : {} checkpoints download".format(sys.argv[0]))
Conf.log.warn("Checkpoints Not Found In {}".format(Conf.args['checkpoints']))
Conf.log.info("You Can Download Them Using : {} checkpoints download".format(sys.argv[0]))
def download(_):
conf.log = setup_log(logging.DEBUG) if conf.args['debug'] else setup_log()
"""
Start checkpoints download logic.
:param _: None
:return: None
"""
Conf.log = setup_log(logging.DEBUG) if Conf.args['debug'] else setup_log()
tempdir = tempfile.mkdtemp()
cdn_url = conf.checkpoints_cdn.format(conf.checkpoints_version)
temp_zip = os.path.join(tempdir, "{}.zip".format(conf.checkpoints_version))
cdn_url = Conf.checkpoints_cdn.format(Conf.checkpoints_version)
temp_zip = os.path.join(tempdir, "{}.zip".format(Conf.checkpoints_version))
try:
conf.log.info("Downloading {}".format(cdn_url))
dl_file(conf.checkpoints_cdn.format(conf.checkpoints_version), temp_zip)
Conf.log.info("Downloading {}".format(cdn_url))
dl_file(Conf.checkpoints_cdn.format(Conf.checkpoints_version), temp_zip)
conf.log.info("Extracting {}".format(temp_zip))
unzip(temp_zip, conf.args['checkpoints'])
Conf.log.info("Extracting {}".format(temp_zip))
unzip(temp_zip, Conf.args['checkpoints'])
conf.log.info("Moving Checkpoints To Final Location")
Conf.log.info("Moving Checkpoints To Final Location")
for c in ("cm.lib", "mm.lib", "mn.lib"):
if os.path.isfile(os.path.join(conf.args['checkpoints'], c)):
os.remove(os.path.join(conf.args['checkpoints'], c))
shutil.move(os.path.join(conf.args['checkpoints'], 'checkpoints', c), conf.args['checkpoints'])
shutil.rmtree(os.path.join(conf.args['checkpoints'], 'checkpoints'))
if os.path.isfile(os.path.join(Conf.args['checkpoints'], c)):
os.remove(os.path.join(Conf.args['checkpoints'], c))
shutil.move(os.path.join(Conf.args['checkpoints'], 'checkpoints', c), Conf.args['checkpoints'])
shutil.rmtree(os.path.join(Conf.args['checkpoints'], 'checkpoints'))
except Exception as e:
conf.log.error(e)
conf.log.error("Something Gone Bad Download Downloading The Checkpoints")
Conf.log.error(e)
Conf.log.error("Something Gone Bad Download Downloading The Checkpoints")
shutil.rmtree(tempdir)
sys.exit(1)
shutil.rmtree(tempdir)
conf.log.info("Checkpoints Downloaded Successfully")
Conf.log.info("Checkpoints Downloaded Successfully")

21
config.py

@ -1,7 +1,9 @@ @@ -1,7 +1,9 @@
"""Configuration."""
class Config:
"""
Variables Configuration Class
"""
"""Variables Configuration Class."""
version = "v1.1.0"
checkpoints_version = "v0.0.1"
checkpoints_cdn = "https://cdn.dreamnet.tech/releases/checkpoints/{}.zip"
@ -12,14 +14,14 @@ class Config: @@ -12,14 +14,14 @@ class Config:
data_type = 32 # Supported data type i.e. 8, 16, 32 bit
# input/output sizes
batchSize = 1 # input batch size
batch_size = 1 # input batch size
input_nc = 3 # of input image channels
output_nc = 3 # of output image channels
# for setting inputs
# if true, takes images in order to make batches, otherwise takes them randomly
serial_batches = True
nThreads = (
n_threads = (
0
) # threads for loading data. Keep this value at 0! see: https://github.com/pytorch/pytorch/issues/12831
# Maximum number of samples allowed per dataset. If the dataset directory contains more than max_dataset_size,
@ -27,9 +29,9 @@ class Config: @@ -27,9 +29,9 @@ class Config:
max_dataset_size = 1
# for generator
netG = "global" # selects model to use for netG
net_g = "global" # selects model to use for net_g
ngf = 64 # of gen filters in first conv layer
n_downsample_global = 4 # number of downsampling layers in netG
n_downsample_global = 4 # number of downsampling layers in net_g
n_blocks_global = (
9
) # number of residual blocks in the global generator network
@ -53,4 +55,9 @@ class Config: @@ -53,4 +55,9 @@ class Config:
# Multiprocessing
@staticmethod
def multiprocessing():
"""
Return multiprocessing status.
:return: <boolean> True is multiprocessing can be use
"""
return Config.args['gpu_ids'] is None and Config.args['n_cores'] > 1

48
daemon.py

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
"""daemon logic."""
import os
import sys
import time
@ -5,27 +6,40 @@ import time @@ -5,27 +6,40 @@ import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from config import Config as conf
from config import Config as Conf
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.correct import DressToCorrect
from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin
class Watcher:
"""Watch a directory change."""
def __init__(self, watching_dir, out_dir):
"""
Watcher constructor.
:param watching_dir: <string> directory to watch
:param out_dir: <string> directory where save transformations
"""
self.__observer = Observer()
self.__watching_dir = watching_dir
self.__out_dir = out_dir
if not os.path.isdir(self.__watching_dir):
conf.log.error("{} Watching Dir Doesn't Exit.".format(self.__watching_dir))
Conf.log.error("{} Watching Dir Doesn't Exit.".format(self.__watching_dir))
sys.exit(0)
if not os.path.isdir(self.__out_dir):
conf.log.error("{} Output Dir Doesn't Exit.".format(self.__watching_dir))
Conf.log.error("{} Output Dir Doesn't Exit.".format(self.__watching_dir))
sys.exit(0)
def run(self):
"""
Run the Watcher.
:return: None
"""
event_handler = Handler(self.__out_dir)
self.__observer.schedule(event_handler, self.__watching_dir, recursive=True)
self.__observer.start()
@ -36,25 +50,43 @@ class Watcher: @@ -36,25 +50,43 @@ class Watcher:
self.__observer.stop()
except Exception as e:
self.__observer.stop()
conf.log.error(e)
conf.log.error("An Unhandled Error Occurred.")
Conf.log.error(e)
Conf.log.error("An Unhandled Error Occurred.")
sys.exit(1)
self.__observer.join()
class Handler(FileSystemEventHandler):
"""Handle a change in a watch directory."""
def __init__(self, out_dir):
"""
Create an Handler.
:param out_dir: <string> directory where save transformations
"""
self.__out_dir = out_dir
self.__phases = [
DressToCorrect, CorrectToMask, MaskToMaskref, MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude
]
def on_created(self, event):
"""
Call when a file or directory is created.
:param event: <DirCreatedEvent|FileCreatedEvent> trigger event
:return: None
"""
if event.is_directory:
conf.log.debug("Received directory created event {}.".format(event.src_path))
Conf.log.debug("Received directory created event {}.".format(event.src_path))
# TODO Implements this
def main():
Watcher("test_dir", "out_dir").run()
def main(_):
"""
Start daemon main logic.
:param _: None
:return: None
"""
Watcher("test_dir", "out_dir").run()

25
gpu_info.py

@ -1,12 +1,17 @@ @@ -1,12 +1,17 @@
import logging
"""gpu-info logic."""
import json as j
from torch import cuda
import json as j
from config import Config as conf
from utils import setup_log
from config import Config as Conf
def get_info():
"""
Get gpu info.
:return: <dict> gpu info
"""
return {
"has_cuda": cuda.is_available(),
"devices": [] if not cuda.is_available() else [cuda.get_device_name(i) for i in range(cuda.device_count())],
@ -14,10 +19,16 @@ def get_info(): @@ -14,10 +19,16 @@ def get_info():
def main(_):
"""
Start gpu info main logic.
:param _: None
:return: None
"""
info = get_info()
if not conf.args['json']:
conf.log.info("Has Cuda: {}".format(info["has_cuda"]))
if not Conf.args['json']:
Conf.log.info("Has Cuda: {}".format(info["has_cuda"]))
for (i, device) in enumerate(info["devices"]):
conf.log.info("GPU {}: {}".format(i, device))
Conf.log.info("GPU {}: {}".format(i, device))
else:
print(j.dumps(info))

92
main.py

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
"""main logic."""
import os
import sys
import time
@ -6,9 +7,8 @@ from multiprocessing import freeze_support @@ -6,9 +7,8 @@ from multiprocessing import freeze_support
import colorama
import argv
from config import Config as conf
from config import Config as Conf
from utils import check_shape
from processing import SimpleTransform, FolderImageTransform, MultipleImageTransform
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.resize import ImageToCrop, ImageToOverlay, ImageToRescale, ImageToResized, ImageToResizedCrop
@ -17,20 +17,18 @@ from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin @@ -17,20 +17,18 @@ from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin
def main(_):
"""
Main logic entry point
"""
conf.log.info("Welcome to DreamPower")
"""Start main logic."""
Conf.log.info("Welcome to DreamPower")
if conf.args['gpu_ids']:
conf.log.info("GAN Processing Will Use GPU IDs: {}".format(conf.args['gpu_ids']))
if Conf.args['gpu_ids']:
Conf.log.info("GAN Processing Will Use GPU IDs: {}".format(Conf.args['gpu_ids']))
else:
conf.log.info("GAN Processing Will Use CPU")
Conf.log.info("GAN Processing Will Use CPU")
# Processing
start = time.time()
select_processing().run()
conf.log.info("Done! We have taken {} seconds".format(round(time.time() - start, 2)))
Conf.log.info("Done! We have taken {} seconds".format(round(time.time() - start, 2)))
# Exit
sys.exit()
@ -38,52 +36,52 @@ def main(_): @@ -38,52 +36,52 @@ def main(_):
def select_phases():
"""
Select the transformation phases to use following args parameters
Select the transformation phases to use following args parameters.
:return: <ImageTransform[]> list of image transformation
"""
def shift_step(shift_starting=0, shift_ending=0):
if not conf.args['steps']:
conf.args['steps'] = (0, 5)
conf.args['steps'] = (
conf.args['steps'][0] + shift_starting,
conf.args['steps'][1] + shift_ending
if not Conf.args['steps']:
Conf.args['steps'] = (0, 5)
Conf.args['steps'] = (
Conf.args['steps'][0] + shift_starting,
Conf.args['steps'][1] + shift_ending
)
def add_tail(phases, phase):
phases = [phase] + phases
if conf.args['steps'] and conf.args['steps'][0] != 0:
if Conf.args['steps'] and Conf.args['steps'][0] != 0:
shift_step(shift_starting=1)
if conf.args['steps'] and conf.args['steps'][1] == len(phases) - 1:
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases
def add_head(phases, phase):
phases = phases + [phase]
if conf.args['steps'] and conf.args['steps'][1] == len(phases) - 1:
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases
phases = [DressToCorrect, CorrectToMask, MaskToMaskref,
MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude]
if conf.args['overlay']:
if Conf.args['overlay']:
phases = add_tail(phases, ImageToResized)
phases = add_tail(phases, ImageToCrop)
phases = add_head(phases, ImageToOverlay)
elif conf.args['auto_resize']:
elif Conf.args['auto_resize']:
phases = add_tail(phases, ImageToResized)
elif conf.args['auto_resize_crop']:
elif Conf.args['auto_resize_crop']:
phases = add_tail(phases, ImageToResizedCrop)
elif conf.args['auto_rescale']:
elif Conf.args['auto_rescale']:
phases = add_tail(phases, ImageToRescale)
elif os.path.isfile(conf.args['input']):
if not conf.args['ignore_size']:
check_shape(conf.args['input'])
elif os.path.isfile(Conf.args['input']):
if not Conf.args['ignore_size']:
check_shape(Conf.args['input'])
else:
conf.log.warn('Image Size Requirements Unchecked.')
Conf.log.warn('Image Size Requirements Unchecked.')
if conf.args['color_transfer']:
if Conf.args['color_transfer']:
phases = add_head(phases, ColorTransfer)
return phases
@ -91,51 +89,55 @@ def select_phases(): @@ -91,51 +89,55 @@ def select_phases():
def select_processing():
"""
Select the processing to use following args parameters
:return:
Select the processing to use following args parameters.
:return: <Process> a process to run
"""
phases = select_phases()
if os.path.isdir(conf.args['input']):
if os.path.isdir(Conf.args['input']):
process = processing_image_folder(phases)
elif conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, conf.args['n_runs'])
elif Conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, Conf.args['n_runs'])
else:
process = simple_image_processing(phases)
conf.log.debug("Process to execute : {}".format(process))
Conf.log.debug("Process to execute : {}".format(process))
return process
def simple_image_processing(phases):
"""
Define a simple image process ready to run
Define a simple image process ready to run.
:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleTransform> a image process run ready
"""
return SimpleTransform(conf.args['input'], phases, conf.args['output'])
return SimpleTransform(Conf.args['input'], phases, Conf.args['output'])
def multiple_image_processing(phases, n):
def multiple_image_processing(phases, n_runs):
"""
Define a multiple image process ready to run
Define a multiple image process ready to run.
:param phases: <ImageTransform[]> list of image transformation
:param n: number of times to process
:param n_runs: number of times to process
:return: <MultipleTransform> a multiple image process run ready
"""
filename, extension = os.path.splitext(conf.args['output'])
filename, extension = os.path.splitext(Conf.args['output'])
return MultipleImageTransform(
[conf.args['input'] for _ in range(n)],
[Conf.args['input'] for _ in range(n_runs)],
phases,
["{}{}{}".format(filename, i, extension) for i in range(n)]
["{}{}{}".format(filename, i, extension) for i in range(n_runs)]
)
def processing_image_folder(phases):
"""
Define a folder image process ready to run
Define a folder image process ready to run.
:param phases: <ImageTransform[]> list of image transformation
:return: <FolderImageTransform> a image process run ready
"""
return FolderImageTransform(conf.args['input'], phases, conf.args['output'])
return FolderImageTransform(Conf.args['input'], phases, Conf.args['output'])
if __name__ == "__main__":

169
processing/__init__.py

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
"""Processing."""
import json
import os
import pathlib
@ -10,27 +11,27 @@ from multiprocessing.pool import ThreadPool @@ -10,27 +11,27 @@ from multiprocessing.pool import ThreadPool
import cv2
import imageio
from config import Config as conf
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image, json_to_argv, check_shape
from config import Config as Conf
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image
class Process:
"""
Abstract Process Class
"""
class Process:
"""Abstract Process Class."""
def __init__(self, *_args, args=None):
"""
Process Constructor
:param args: <dict> args parameter to run the image transformation (default use conf.args)
Process Constructor.
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
self.__start = time.time()
self._args = conf.args.copy() if args is None else args.copy()
self._args = Conf.args.copy() if args is None else args.copy()
def run(self):
"""
Run the process
Run the process.
:return: None
"""
self._info_start_run()
@ -41,60 +42,69 @@ class Process: @@ -41,60 +42,69 @@ class Process:
def _info_start_run(self):
"""
Logging when the process run begin
Log info when the process run begin.
:return: None
"""
self.__start = time.time()
conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
def _info_end_run(self):
"""
Logging when the process run end
Log info when the process run end.
:return: None
"""
conf.log.info("{} Finish".format(camel_case_to_str(self.__class__.__name__)))
conf.log.debug("{} Done in {} seconds".format(
Conf.log.info("{} Finish".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.debug("{} Done in {} seconds".format(
camel_case_to_str(self.__class__.__name__), round(time.time() - self.__start, 2)))
def _setup(self):
"""
Setup the process to be ready to execute
Configure the process to be ready to execute.
:return: None
"""
pass
def _execute(self):
"""
Execute the process
Execute the process.
:return: None
"""
pass
def _clean(self):
"""
Cleanup a process execution
Cleanup a process execution.
:return: None
"""
def __str__(self):
return str(self.__class__.__name__)
class SimpleTransform(Process):
"""
Simple Transform Class
"""
"""Simple Transform Class."""
def __init__(self, input_path, phases, output_path, args):
"""
Construct a Simple Transform .
:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(input_path, phases, output_path, args)
def __new__(cls, input_path, phases, output_path, args=None):
"""
Create the correct SimpleTransform object (ImageTransform or GiftTransform) corresponding to the input_path format
Create the correct SimpleTransform object corresponding to the input_path format.
:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:return: <ImageTransform|GiftTransform|None> SimpleTransform object corresponding to the input_path format
"""
if os.path.splitext(input_path)[1] == ".gif":
@ -106,17 +116,17 @@ class SimpleTransform(Process): @@ -106,17 +116,17 @@ class SimpleTransform(Process):
class ImageTransform(Process):
"""
Image Processing Class
"""
"""Image Processing Class."""
def __init__(self, input_path, phases, output_path, args=None):
"""
ProcessImage Constructor
Process Image Constructor.
:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:param phases: <ImageTransform[]> list Class of transformation each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self.__phases = phases
@ -125,8 +135,8 @@ class ImageTransform(Process): @@ -125,8 +135,8 @@ class ImageTransform(Process):
self.__starting_step = self._args['steps'][0] if self._args['steps'] else 0
self.__ending_step = self._args['steps'][1] if self._args['steps'] else None
conf.log.debug("All Phases : {}".format(self.__phases))
conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))
Conf.log.debug("All Phases : {}".format(self.__phases))
Conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))
path = self.__altered_path if os.path.isfile(input_path) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))
@ -135,26 +145,27 @@ class ImageTransform(Process): @@ -135,26 +145,27 @@ class ImageTransform(Process):
os.path.join(path, "{}.png".format(p().__class__.__name__))
for p in self.__phases[:self.__starting_step]
]
conf.log.debug(self.__image_steps)
Conf.log.debug(self.__image_steps)
def _info_start_run(self):
super()._info_start_run()
conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))
Conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))
def _setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
conf.log.error(e)
conf.log.error("{} is not able to resume because it not able to load required images. "
Conf.log.error(e)
Conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
conf.log.error("Possible source of this error is that --altered argument is not a correct "
Conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)
def _execute(self):
"""
Execute all phases on the image
Execute all phases on the image.
:return: None
"""
for p in (x(args=self._args) for x in self.__phases[self.__starting_step:self.__ending_step]):
@ -168,38 +179,38 @@ class ImageTransform(Process): @@ -168,38 +179,38 @@ class ImageTransform(Process):
write_image(r, os.path.join(path, "{}.png".format(p.__class__.__name__)))
conf.log.debug("{} Step Image Of {} Execution".format(
Conf.log.debug("{} Step Image Of {} Execution".format(
os.path.join(path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))
write_image(self.__image_steps[-1], self.__output_path)
conf.log.info("{} Created".format(self.__output_path))
conf.log.debug("{} Result Image Of {} Execution"
Conf.log.info("{} Created".format(self.__output_path))
Conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))
return self.__image_steps[-1]
class MultipleImageTransform(Process):
"""
Multiple Image Processing Class
"""
"""Multiple Image Processing Class."""
def __init__(self, input_paths, phases, output_paths, children_process=SimpleTransform, args=None):
"""
ProcessMultipleImages Constructor
Process Multiple Images Constructor.
:param input_paths: <string[]> images path list to process
:param output_paths: <string> images path to write the result
:param children_process: <ImageTransform> Process to use on the list of input
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self._phases = phases
self._input_paths = input_paths
self._output_paths = output_paths
self._process_list = []
self.__multiprocessing = conf.multiprocessing()
self.__multiprocessing = Conf.multiprocessing()
self.__children_process = children_process
def _setup(self):
@ -208,41 +219,44 @@ class MultipleImageTransform(Process): @@ -208,41 +219,44 @@ class MultipleImageTransform(Process):
def _execute(self):
"""
Execute all phases on the list of images
Execute all phases on the list of images.
:return: None
"""
def process_one_image(a):
conf.log.info("Processing Image : {}/{}".format(a[1] + 1, len(self._process_list)))
Conf.log.info("Processing Image : {}/{}".format(a[1] + 1, len(self._process_list)))
a[0].run()
if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
else:
conf.log.debug("Using Multiprocessing")
pool = ThreadPool(conf.args['n_cores'])
Conf.log.debug("Using Multiprocessing")
pool = ThreadPool(Conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()
class FolderImageTransform(MultipleImageTransform):
"""
Folder Image Processing Class
"""
"""Folder Image Processing Class."""
def __init__(self, input_folder_path, phases, output_folder_path, args=None):
"""
FolderImageTransform Constructor
Folder Image Transform Constructor.
:param input_folder_path: <string> path of the folder to process
:param phases: <ImageTransform[]> list of Image Transform to execute
:param output_folder_path: <string> path of the folder where save output
:param args: <dict> processing settings
"""
super().__init__([], phases, [], args=args)
self.__input_folder_path = input_folder_path
self.__output_folder_path = output_folder_path
self.__multiprocessing = conf.multiprocessing()
self.__multiprocessing = Conf.multiprocessing()
def _setup(self):
conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
Conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
self._process_list = [
MultipleImageTransform(
[
@ -256,9 +270,9 @@ class FolderImageTransform(MultipleImageTransform): @@ -256,9 +270,9 @@ class FolderImageTransform(MultipleImageTransform):
'_out',
os.path.splitext(x.path)[1]
)
if not conf.args['output'] else
if not Conf.args['output'] else
os.path.join(
conf.args['output'],
Conf.args['output'],
pathlib.Path(*pathlib.Path(r).parts[1:]),
os.path.basename(x.path)
)
@ -278,37 +292,36 @@ class FolderImageTransform(MultipleImageTransform): @@ -278,37 +292,36 @@ class FolderImageTransform(MultipleImageTransform):
json_path = os.path.join(folder_path, self._args['json_folder_name'])
conf.log.debug("Json Path Setting Path: {}".format(json_path))
Conf.log.debug("Json Path Setting Path: {}".format(json_path))
if not os.path.isfile(json_path):
conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
Conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
return add_folder_altered(self._args)
try:
with open(json_path, 'r') as f:
json_data = json.load(f)
except JSONDecodeError:
conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
Conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
.format(folder_path))
return add_folder_altered(self._args)
try:
from argv import Parser, config_args
a = config_args(Parser.parser, Parser.parser.parse_args(sys.argv[1:]), json_data=json_data)
conf.log.info("Using {} Configuration for processing {} folder. "
Conf.log.info("Using {} Configuration for processing {} folder. "
.format(json_path, folder_path))
return add_folder_altered(a)
except SystemExit:
conf.log.error("Arguments json file {} contains configuration error. "
Conf.log.error("Arguments json file {} contains configuration error. "
"Using Default Configuration".format(json_path))
return add_folder_altered(self._args)
class GifTransform(Process):
"""
GIF Image Processing Class
"""
"""GIF Image Processing Class."""
def __init__(self, input_path, phases, output_path, args=None):
"""
ImageTransformGIF Constructor
Image Transform GIF Constructor.
:param input_path: <string> gif path to process
:param output_path: <string> image path to write the result
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
@ -323,30 +336,32 @@ class GifTransform(Process): @@ -323,30 +336,32 @@ class GifTransform(Process):
def _setup(self):
self.__tmp_dir = tempfile.mkdtemp()
conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
Conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
Conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]
self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]
[write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1]) for i in zip(imgs, self.__temp_input_paths)]
for i in zip(imgs, self.__temp_input_paths):
write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1])
def _execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif
Execute all phases on each frames of the gif and recreate the gif.
:return: None
"""
MultipleImageTransform(self.__temp_input_paths, self.__phases, self.__temp_output_paths, args=self._args).run()
dir = os.path.dirname(self.__output_path)
if dir != '':
os.makedirs(os.path.dirname(self.__output_path), exist_ok=True)
dir_out = os.path.dirname(self.__output_path)
if dir_out != '':
os.makedirs(dir_out, exist_ok=True)
imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])
conf.log.info("{} Gif Created ".format(self.__output_path))
Conf.log.info("{} Gif Created ".format(self.__output_path))
def _clean(self):
shutil.rmtree(self.__tmp_dir)

10
scripts/build.py

@ -1,14 +1,12 @@ @@ -1,14 +1,12 @@
import argparse
import importlib
from importlib import util
import logging
import os
import subprocess
import sys
spec = importlib.util.spec_from_file_location("_common",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = importlib.util.module_from_spec(spec)
spec = util.spec_from_file_location("_common", os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = util.module_from_spec(spec)
spec.loader.exec_module(c)
@ -18,7 +16,7 @@ def add_arg_parser(parser): @@ -18,7 +16,7 @@ def add_arg_parser(parser):
def check_dependencies():
## System & Dependencies Check
# System & Dependencies Check
c.log.debug("OS : {}".format(c.get_os()))
c.log.debug("Python version : {}".format(c.get_python_version()))
@ -87,5 +85,5 @@ if __name__ == '__main__': @@ -87,5 +85,5 @@ if __name__ == '__main__':
if args.debug:
c.log.setLevel(logging.DEBUG)
## Build Cli
# Build Cli
run(args)

12
scripts/setup.py

@ -1,15 +1,13 @@ @@ -1,15 +1,13 @@
import argparse
import fileinput
import importlib
from importlib import util
import logging
import os
import subprocess
import sys
spec = importlib.util.spec_from_file_location("_common",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = importlib.util.module_from_spec(spec)
spec = util.spec_from_file_location("_common", os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = util.module_from_spec(spec)
spec.loader.exec_module(c)
@ -93,16 +91,16 @@ def cli_setup(args, pip_commands_extend=None): @@ -93,16 +91,16 @@ def cli_setup(args, pip_commands_extend=None):
def run(args):
## System & Dependencies Check
# System & Dependencies Check
check_dependencies()
if args.debug:
c.log.setLevel(logging.DEBUG)
## Cli dependencies
# Cli dependencies
pip_commands_extend = (['--user'] if args.pip_user else []) + (['--no-cache-dir'] if args.pip_no_cache_dir else [])
## Pyinstaller
# Pyinstaller
if not args.no_pyinstaller:
pyinstaller(pip_commands_extend)

69
transform/__init__.py

@ -1,49 +1,80 @@ @@ -1,49 +1,80 @@
"""
Image Transformation
"""
"""Images Transforms."""
import time
from config import Config as conf
from config import Config as Conf
from utils import camel_case_to_str
class ImageTransform:
"""
Abstract Image Transformation Class
"""
"""Abstract Image Transformation Class."""
def __init__(self, input_index=(-1,), args=None):
"""
Image Transformation Class Constructor
Image Transformation Class Constructor.
:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
self.__start = time.time()
self.input_index = input_index
self._args = conf.args.copy() if args is None else args.copy()
self._args = Conf.args.copy() if args is None else args.copy()
def run(self, *args):
"""
Run the Image Transform.
:param args: <dict> settings for the transformation
:return: <RGB> image
"""
self.__start = time.time()
self.info_start_run()
self.setup(*args)
r = self.execute(*args)
self.clean(*args)
self._setup(*args)
r = self._execute(*args)
self._clean(*args)
self.info_end_run()
return r
def info_start_run(self):
"""
Log info at the start of the run.
:return: None
"""
self.__start = time.time()
conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
def info_end_run(self):
conf.log.debug("{} Done in {} seconds".format(
"""
Log info at the end of the run.
:return: None
"""
Conf.log.debug("{} Done in {} seconds".format(
camel_case_to_str(self.__class__.__name__), round(time.time() - self.__start, 2)))
def setup(self, *args):
def _setup(self, *args):
"""
Configure the transformation.
:param args: <dict> settings for the transformation
:return: None
"""
pass
def execute(self, *args):
def _execute(self, *args):
"""
Execute the transformation.
:param args: <dict> settings for the transformation
:return: None
"""
pass
def clean(self, *args):
pass
def _clean(self, *args):
"""
Clean the transformation.
:param args: <dict> settings for the transformation
:return: None
"""
pass

405
transform/gan/__init__.py

@ -1,423 +1,62 @@ @@ -1,423 +1,62 @@
import functools
import os
from collections import OrderedDict
"""GAN Transforms."""
import cv2
import numpy as np
import torch
from PIL import Image
from torchvision import transforms as transforms
from config import Config as conf
from config import Config as Conf
from transform import ImageTransform
from transform.gan.generator import tensor2im
from transform.gan.model import DeepModel, DataLoader
class ImageTransformGAN(ImageTransform):
"""
Abstract GAN Image Transformation Class
"""
"""Abstract GAN Image Transformation Class."""
def __init__(self, checkpoint, phase, input_index=(-1,), args=None):
"""
Abstract GAN Image Transformation Class Constructor
Abstract GAN Image Transformation Class Constructor.
:param checkpoint: <string> path to the checkpoint
:param phase: <string> phase name
:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(input_index=input_index, args=args)
self.__checkpoint = checkpoint
self.__phase = phase
self.__gpu_ids = self._args["gpu_ids"]
def setup(self, image):
def _setup(self, *args):
"""
Load Dataset and Model fot the image
:param image: <RGB> image to be transform
Load Dataset and Model fot the image.
:param args: <[RGB]> image to be transform
:return: None
"""
if self.__gpu_ids:
conf.log.debug("GAN Processing Using GPU IDs: {}".format(self.__gpu_ids))
Conf.log.debug("GAN Processing Using GPU IDs: {}".format(self.__gpu_ids))
else:
conf.log.debug("GAN Processing Using CPU")
Conf.log.debug("GAN Processing Using CPU")
c = conf()
c = Conf()
# Load custom phase options:
data_loader = DataLoader(c, image)
data_loader = DataLoader(c, args[0])
self.__dataset = data_loader.load_data()
# Create Model
self.__model = DeepModel()
self.__model.initialize(c, self.__gpu_ids, self.__checkpoint)
def execute(self, image):
def _execute(self, *args):
"""
Excute the GAN Transformation the image
:param image: <RGB> image to transform
Execute the GAN Transformation the image.
:param *args: <[RGB]> image to transform
:return: <RGB> image transformed
"""
for i, data in enumerate(self.__dataset):
mask = None
for data in self.__dataset:
generated = self.__model.inference(data["label"], data["inst"])
im = tensor2im(generated.data[0])
mask = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)
return mask
class DataLoader:
def __init__(self, opt, cv_img):
super(DataLoader, self).__init__()
self.dataset = Dataset()
self.dataset.initialize(opt, cv_img)
self.dataloader = torch.utils.data.DataLoader(
self.dataset,
batch_size=opt.batchSize,
shuffle=not opt.serial_batches,
num_workers=int(opt.nThreads),
)
def load_data(self):
return self.dataloader
def __len__(self):
return 1
class Dataset(torch.utils.data.Dataset):
def __init__(self):
super(Dataset, self).__init__()
def initialize(self, opt, cv_img):
self.opt = opt
self.A = Image.fromarray(cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB))
self.dataset_size = 1
def __getitem__(self, index):
transform_A = get_transform(self.opt)
A_tensor = transform_A(self.A.convert("RGB"))
B_tensor = inst_tensor = feat_tensor = 0
input_dict = {
"label": A_tensor,
"inst": inst_tensor,
"image": B_tensor,
"feat": feat_tensor,
"path": "",
}
return input_dict
def __len__(self):