Browse Source

Clean Code

* Fix issues highlighted in Codacy
* Code style PEP8 (naming, imports, ...)
* Improve Docstring comments
tags/v1.2.5
PommeDroid 10 months ago
parent
commit
a5bdc476da

+ 5
- 0
.gitignore View File

@@ -60,3 +60,8 @@ venv.bak/

# IDE
.idea


# flake
.flake8
requirements_flake8.txt

+ 5
- 4
argv/__init__.py View File

@@ -2,7 +2,8 @@ import argparse
import copy
import logging
import sys
from config import Config as conf

from config import Config as Conf
from argv.checkpoints import init_checkpoints_sub_parser, check_args_checkpoints_parser, set_args_checkpoints_parser
from argv.common import arg_help, arg_debug, arg_version
from argv.daemon import init_daemon_sub_parser
@@ -24,12 +25,12 @@ def run():

args = Parser.parser.parse_args()

conf.log = setup_log(logging.DEBUG) if args.debug else setup_log()
Conf.log = setup_log(logging.DEBUG) if args.debug else setup_log()
args = config_args(Parser.parser, args)

conf.log.debug(args)
Conf.log.debug(args)

conf.args = vars(args)
Conf.args = vars(args)
args.func(args)



+ 4
- 4
argv/checkpoints.py View File

@@ -2,7 +2,7 @@ import os
import sys

import checkpoints
from config import Config as conf
from config import Config as Conf
from argv.common import arg_help, arg_debug


@@ -44,7 +44,7 @@ def check_args_checkpoints_parser(parser, args):


def check_arg_checkpoints(parser, args):
conf.log.debug(args.checkpoints)
Conf.log.debug(args.checkpoints)
for _, v in args.checkpoints.items():
if not os.path.isfile(v):
parser.error(
@@ -54,7 +54,7 @@ def check_arg_checkpoints(parser, args):


def set_arg_checkpoints(args):
conf.log.debug(args.checkpoints)
Conf.log.debug(args.checkpoints)
args.checkpoints = {
'correct_to_mask': os.path.join(str(args.checkpoints), "cm.lib"),
'maskref_to_maskdet': os.path.join(str(args.checkpoints), "mm.lib"),
@@ -75,5 +75,5 @@ def arg_version(parser):
parser.add_argument(
"-v",
"--version",
action='version', version='checkpoints {}'.format(conf.checkpoints_version)
action='version', version='checkpoints {}'.format(Conf.checkpoints_version)
)

+ 2
- 3
argv/common.py View File

@@ -1,4 +1,4 @@
from config import Config as conf
from config import Config as Conf


def arg_debug(parser):
@@ -23,6 +23,5 @@ def arg_version(parser):
parser.add_argument(
"-v",
"--version",
action='version', version='%(prog)s {}'.format(conf.version)
action='version', version='%(prog)s {}'.format(Conf.version)
)


+ 0
- 2
argv/daemon.py View File

@@ -11,8 +11,6 @@ def init_daemon_sub_parser(subparsers):
)
daemon_parser.set_defaults(func=daemon.main)



# add daemon arguments
arg_help(daemon_parser)
arg_debug(daemon_parser)

+ 33
- 20
checkpoints.py View File

@@ -1,46 +1,59 @@
"""checkpoints logic."""
import logging
import os
import shutil
import sys
import tempfile

from config import Config as conf
from config import Config as Conf
from utils import setup_log, dl_file, unzip


def main(_):
if sum([1 for x in ["cm.lib", "mm.lib", "mn.lib"] if os.path.isfile(os.path.join(conf.args['checkpoints'], x))]):
conf.log.info("Checkpoints Found In {}".format(conf.args['checkpoints']))
"""
Start checkpoints main logic.

:param _: None
:return: None
"""
if sum([1 for x in ["cm.lib", "mm.lib", "mn.lib"] if os.path.isfile(os.path.join(Conf.args['checkpoints'], x))]):
Conf.log.info("Checkpoints Found In {}".format(Conf.args['checkpoints']))
else:
conf.log.warn("Checkpoints Not Found In {}".format(conf.args['checkpoints']))
conf.log.info("You Can Download Them Using : {} checkpoints download".format(sys.argv[0]))
Conf.log.warn("Checkpoints Not Found In {}".format(Conf.args['checkpoints']))
Conf.log.info("You Can Download Them Using : {} checkpoints download".format(sys.argv[0]))


def download(_):
conf.log = setup_log(logging.DEBUG) if conf.args['debug'] else setup_log()
"""
Start checkpoints download logic.

:param _: None
:return: None
"""
Conf.log = setup_log(logging.DEBUG) if Conf.args['debug'] else setup_log()
tempdir = tempfile.mkdtemp()
cdn_url = conf.checkpoints_cdn.format(conf.checkpoints_version)
temp_zip = os.path.join(tempdir, "{}.zip".format(conf.checkpoints_version))
cdn_url = Conf.checkpoints_cdn.format(Conf.checkpoints_version)
temp_zip = os.path.join(tempdir, "{}.zip".format(Conf.checkpoints_version))

try:
conf.log.info("Downloading {}".format(cdn_url))
dl_file(conf.checkpoints_cdn.format(conf.checkpoints_version), temp_zip)
Conf.log.info("Downloading {}".format(cdn_url))
dl_file(Conf.checkpoints_cdn.format(Conf.checkpoints_version), temp_zip)

conf.log.info("Extracting {}".format(temp_zip))
unzip(temp_zip, conf.args['checkpoints'])
Conf.log.info("Extracting {}".format(temp_zip))
unzip(temp_zip, Conf.args['checkpoints'])

conf.log.info("Moving Checkpoints To Final Location")
Conf.log.info("Moving Checkpoints To Final Location")

for c in ("cm.lib", "mm.lib", "mn.lib"):
if os.path.isfile(os.path.join(conf.args['checkpoints'], c)):
os.remove(os.path.join(conf.args['checkpoints'], c))
shutil.move(os.path.join(conf.args['checkpoints'], 'checkpoints', c), conf.args['checkpoints'])
shutil.rmtree(os.path.join(conf.args['checkpoints'], 'checkpoints'))
if os.path.isfile(os.path.join(Conf.args['checkpoints'], c)):
os.remove(os.path.join(Conf.args['checkpoints'], c))
shutil.move(os.path.join(Conf.args['checkpoints'], 'checkpoints', c), Conf.args['checkpoints'])
shutil.rmtree(os.path.join(Conf.args['checkpoints'], 'checkpoints'))

except Exception as e:
conf.log.error(e)
conf.log.error("Something Gone Bad Download Downloading The Checkpoints")
Conf.log.error(e)
Conf.log.error("Something Gone Bad Download Downloading The Checkpoints")
shutil.rmtree(tempdir)
sys.exit(1)
shutil.rmtree(tempdir)
conf.log.info("Checkpoints Downloaded Successfully")
Conf.log.info("Checkpoints Downloaded Successfully")

+ 14
- 7
config.py View File

@@ -1,7 +1,9 @@
"""Configuration."""


class Config:
"""
Variables Configuration Class
"""
"""Variables Configuration Class."""

version = "v1.1.0"
checkpoints_version = "v0.0.1"
checkpoints_cdn = "https://cdn.dreamnet.tech/releases/checkpoints/{}.zip"
@@ -12,14 +14,14 @@ class Config:
data_type = 32 # Supported data type i.e. 8, 16, 32 bit

# input/output sizes
batchSize = 1 # input batch size
batch_size = 1 # input batch size
input_nc = 3 # of input image channels
output_nc = 3 # of output image channels

# for setting inputs
# if true, takes images in order to make batches, otherwise takes them randomly
serial_batches = True
nThreads = (
n_threads = (
0
) # threads for loading data. Keep this value at 0! see: https://github.com/pytorch/pytorch/issues/12831
# Maximum number of samples allowed per dataset. If the dataset directory contains more than max_dataset_size,
@@ -27,9 +29,9 @@ class Config:
max_dataset_size = 1

# for generator
netG = "global" # selects model to use for netG
net_g = "global" # selects model to use for net_g
ngf = 64 # of gen filters in first conv layer
n_downsample_global = 4 # number of downsampling layers in netG
n_downsample_global = 4 # number of downsampling layers in net_g
n_blocks_global = (
9
) # number of residual blocks in the global generator network
@@ -53,4 +55,9 @@ class Config:
# Multiprocessing
@staticmethod
def multiprocessing():
"""
Return multiprocessing status.

:return: <boolean> True is multiprocessing can be use
"""
return Config.args['gpu_ids'] is None and Config.args['n_cores'] > 1

+ 40
- 8
daemon.py View File

@@ -1,3 +1,4 @@
"""daemon logic."""
import os
import sys
import time
@@ -5,27 +6,40 @@ import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from config import Config as conf
from config import Config as Conf
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.correct import DressToCorrect
from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin


class Watcher:
"""Watch a directory change."""

def __init__(self, watching_dir, out_dir):
"""
Watcher constructor.

:param watching_dir: <string> directory to watch
:param out_dir: <string> directory where save transformations
"""
self.__observer = Observer()
self.__watching_dir = watching_dir
self.__out_dir = out_dir

if not os.path.isdir(self.__watching_dir):
conf.log.error("{} Watching Dir Doesn't Exit.".format(self.__watching_dir))
Conf.log.error("{} Watching Dir Doesn't Exit.".format(self.__watching_dir))
sys.exit(0)

if not os.path.isdir(self.__out_dir):
conf.log.error("{} Output Dir Doesn't Exit.".format(self.__watching_dir))
Conf.log.error("{} Output Dir Doesn't Exit.".format(self.__watching_dir))
sys.exit(0)

def run(self):
"""
Run the Watcher.

:return: None
"""
event_handler = Handler(self.__out_dir)
self.__observer.schedule(event_handler, self.__watching_dir, recursive=True)
self.__observer.start()
@@ -36,25 +50,43 @@ class Watcher:
self.__observer.stop()
except Exception as e:
self.__observer.stop()
conf.log.error(e)
conf.log.error("An Unhandled Error Occurred.")
Conf.log.error(e)
Conf.log.error("An Unhandled Error Occurred.")
sys.exit(1)
self.__observer.join()


class Handler(FileSystemEventHandler):
"""Handle a change in a watch directory."""

def __init__(self, out_dir):
"""
Create an Handler.

:param out_dir: <string> directory where save transformations
"""
self.__out_dir = out_dir
self.__phases = [
DressToCorrect, CorrectToMask, MaskToMaskref, MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude
]

def on_created(self, event):
"""
Call when a file or directory is created.

:param event: <DirCreatedEvent|FileCreatedEvent> trigger event
:return: None
"""
if event.is_directory:
conf.log.debug("Received directory created event {}.".format(event.src_path))
Conf.log.debug("Received directory created event {}.".format(event.src_path))
# TODO Implements this


def main():
Watcher("test_dir", "out_dir").run()
def main(_):
"""
Start daemon main logic.

:param _: None
:return: None
"""
Watcher("test_dir", "out_dir").run()

+ 18
- 7
gpu_info.py View File

@@ -1,12 +1,17 @@
import logging
"""gpu-info logic."""
import json as j

from torch import cuda
import json as j
from config import Config as conf
from utils import setup_log

from config import Config as Conf


def get_info():
"""
Get gpu info.

:return: <dict> gpu info
"""
return {
"has_cuda": cuda.is_available(),
"devices": [] if not cuda.is_available() else [cuda.get_device_name(i) for i in range(cuda.device_count())],
@@ -14,10 +19,16 @@ def get_info():


def main(_):
"""
Start gpu info main logic.

:param _: None
:return: None
"""
info = get_info()
if not conf.args['json']:
conf.log.info("Has Cuda: {}".format(info["has_cuda"]))
if not Conf.args['json']:
Conf.log.info("Has Cuda: {}".format(info["has_cuda"]))
for (i, device) in enumerate(info["devices"]):
conf.log.info("GPU {}: {}".format(i, device))
Conf.log.info("GPU {}: {}".format(i, device))
else:
print(j.dumps(info))

+ 47
- 45
main.py View File

@@ -1,3 +1,4 @@
"""main logic."""
import os
import sys
import time
@@ -6,9 +7,8 @@ from multiprocessing import freeze_support
import colorama

import argv
from config import Config as conf
from config import Config as Conf
from utils import check_shape

from processing import SimpleTransform, FolderImageTransform, MultipleImageTransform
from transform.gan.mask import CorrectToMask, MaskrefToMaskdet, MaskfinToNude
from transform.opencv.resize import ImageToCrop, ImageToOverlay, ImageToRescale, ImageToResized, ImageToResizedCrop
@@ -17,20 +17,18 @@ from transform.opencv.mask import MaskToMaskref, MaskdetToMaskfin


def main(_):
"""
Main logic entry point
"""
conf.log.info("Welcome to DreamPower")
"""Start main logic."""
Conf.log.info("Welcome to DreamPower")

if conf.args['gpu_ids']:
conf.log.info("GAN Processing Will Use GPU IDs: {}".format(conf.args['gpu_ids']))
if Conf.args['gpu_ids']:
Conf.log.info("GAN Processing Will Use GPU IDs: {}".format(Conf.args['gpu_ids']))
else:
conf.log.info("GAN Processing Will Use CPU")
Conf.log.info("GAN Processing Will Use CPU")

# Processing
start = time.time()
select_processing().run()
conf.log.info("Done! We have taken {} seconds".format(round(time.time() - start, 2)))
Conf.log.info("Done! We have taken {} seconds".format(round(time.time() - start, 2)))

# Exit
sys.exit()
@@ -38,52 +36,52 @@ def main(_):

def select_phases():
"""
Select the transformation phases to use following args parameters
Select the transformation phases to use following args parameters.

:return: <ImageTransform[]> list of image transformation
"""

def shift_step(shift_starting=0, shift_ending=0):
if not conf.args['steps']:
conf.args['steps'] = (0, 5)
conf.args['steps'] = (
conf.args['steps'][0] + shift_starting,
conf.args['steps'][1] + shift_ending
if not Conf.args['steps']:
Conf.args['steps'] = (0, 5)
Conf.args['steps'] = (
Conf.args['steps'][0] + shift_starting,
Conf.args['steps'][1] + shift_ending
)

def add_tail(phases, phase):
phases = [phase] + phases
if conf.args['steps'] and conf.args['steps'][0] != 0:
if Conf.args['steps'] and Conf.args['steps'][0] != 0:
shift_step(shift_starting=1)
if conf.args['steps'] and conf.args['steps'][1] == len(phases) - 1:
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

def add_head(phases, phase):
phases = phases + [phase]
if conf.args['steps'] and conf.args['steps'][1] == len(phases) - 1:
if Conf.args['steps'] and Conf.args['steps'][1] == len(phases) - 1:
shift_step(shift_ending=1)
return phases

phases = [DressToCorrect, CorrectToMask, MaskToMaskref,
MaskrefToMaskdet, MaskdetToMaskfin, MaskfinToNude]

if conf.args['overlay']:
if Conf.args['overlay']:
phases = add_tail(phases, ImageToResized)
phases = add_tail(phases, ImageToCrop)
phases = add_head(phases, ImageToOverlay)
elif conf.args['auto_resize']:
elif Conf.args['auto_resize']:
phases = add_tail(phases, ImageToResized)
elif conf.args['auto_resize_crop']:
elif Conf.args['auto_resize_crop']:
phases = add_tail(phases, ImageToResizedCrop)
elif conf.args['auto_rescale']:
elif Conf.args['auto_rescale']:
phases = add_tail(phases, ImageToRescale)
elif os.path.isfile(conf.args['input']):
if not conf.args['ignore_size']:
check_shape(conf.args['input'])
elif os.path.isfile(Conf.args['input']):
if not Conf.args['ignore_size']:
check_shape(Conf.args['input'])
else:
conf.log.warn('Image Size Requirements Unchecked.')
Conf.log.warn('Image Size Requirements Unchecked.')

if conf.args['color_transfer']:
if Conf.args['color_transfer']:
phases = add_head(phases, ColorTransfer)

return phases
@@ -91,51 +89,55 @@ def select_phases():

def select_processing():
"""
Select the processing to use following args parameters
:return:
Select the processing to use following args parameters.

:return: <Process> a process to run
"""
phases = select_phases()
if os.path.isdir(conf.args['input']):
if os.path.isdir(Conf.args['input']):
process = processing_image_folder(phases)
elif conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, conf.args['n_runs'])
elif Conf.args['n_runs'] != 1:
process = multiple_image_processing(phases, Conf.args['n_runs'])
else:
process = simple_image_processing(phases)
conf.log.debug("Process to execute : {}".format(process))
Conf.log.debug("Process to execute : {}".format(process))
return process


def simple_image_processing(phases):
"""
Define a simple image process ready to run
Define a simple image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:return: <SimpleTransform> a image process run ready
"""
return SimpleTransform(conf.args['input'], phases, conf.args['output'])
return SimpleTransform(Conf.args['input'], phases, Conf.args['output'])


def multiple_image_processing(phases, n):
def multiple_image_processing(phases, n_runs):
"""
Define a multiple image process ready to run
Define a multiple image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:param n: number of times to process
:param n_runs: number of times to process
:return: <MultipleTransform> a multiple image process run ready
"""
filename, extension = os.path.splitext(conf.args['output'])
filename, extension = os.path.splitext(Conf.args['output'])
return MultipleImageTransform(
[conf.args['input'] for _ in range(n)],
[Conf.args['input'] for _ in range(n_runs)],
phases,
["{}{}{}".format(filename, i, extension) for i in range(n)]
["{}{}{}".format(filename, i, extension) for i in range(n_runs)]
)


def processing_image_folder(phases):
"""
Define a folder image process ready to run
Define a folder image process ready to run.

:param phases: <ImageTransform[]> list of image transformation
:return: <FolderImageTransform> a image process run ready
"""
return FolderImageTransform(conf.args['input'], phases, conf.args['output'])
return FolderImageTransform(Conf.args['input'], phases, Conf.args['output'])


if __name__ == "__main__":

+ 92
- 77
processing/__init__.py View File

@@ -1,3 +1,4 @@
"""Processing."""
import json
import os
import pathlib
@@ -10,27 +11,27 @@ from multiprocessing.pool import ThreadPool

import cv2
import imageio
from config import Config as conf
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image, json_to_argv, check_shape

from config import Config as Conf
from utils import camel_case_to_str, cv2_supported_extension, read_image, write_image

class Process:
"""
Abstract Process Class

"""
class Process:
"""Abstract Process Class."""

def __init__(self, *_args, args=None):
"""
Process Constructor
:param args: <dict> args parameter to run the image transformation (default use conf.args)
Process Constructor.

:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
self.__start = time.time()
self._args = conf.args.copy() if args is None else args.copy()
self._args = Conf.args.copy() if args is None else args.copy()

def run(self):
"""
Run the process
Run the process.

:return: None
"""
self._info_start_run()
@@ -41,60 +42,69 @@ class Process:

def _info_start_run(self):
"""
Logging when the process run begin
Log info when the process run begin.

:return: None
"""
self.__start = time.time()
conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))

def _info_end_run(self):
"""
Logging when the process run end
Log info when the process run end.

:return: None
"""
conf.log.info("{} Finish".format(camel_case_to_str(self.__class__.__name__)))
conf.log.debug("{} Done in {} seconds".format(
Conf.log.info("{} Finish".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.debug("{} Done in {} seconds".format(
camel_case_to_str(self.__class__.__name__), round(time.time() - self.__start, 2)))

def _setup(self):
"""
Setup the process to be ready to execute
Configure the process to be ready to execute.

:return: None
"""
pass

def _execute(self):
"""
Execute the process
Execute the process.

:return: None
"""
pass

def _clean(self):
"""
Cleanup a process execution
Cleanup a process execution.

:return: None
"""

def __str__(self):
return str(self.__class__.__name__)


class SimpleTransform(Process):
"""
Simple Transform Class
"""
"""Simple Transform Class."""

def __init__(self, input_path, phases, output_path, args):
"""
Construct a Simple Transform .

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(input_path, phases, output_path, args)

def __new__(cls, input_path, phases, output_path, args=None):
"""
Create the correct SimpleTransform object (ImageTransform or GiftTransform) corresponding to the input_path format
Create the correct SimpleTransform object corresponding to the input_path format.

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param phases: <ImageTransform[]> list of Class transformation each image
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:return: <ImageTransform|GiftTransform|None> SimpleTransform object corresponding to the input_path format
"""
if os.path.splitext(input_path)[1] == ".gif":
@@ -106,17 +116,17 @@ class SimpleTransform(Process):


class ImageTransform(Process):
"""
Image Processing Class
"""
"""Image Processing Class."""

def __init__(self, input_path, phases, output_path, args=None):
"""
ProcessImage Constructor
Process Image Constructor.

:param input_path: <string> original image path to process
:param output_path: <string> image path to write the result.
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
:param phases: <ImageTransform[]> list Class of transformation each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self.__phases = phases
@@ -125,8 +135,8 @@ class ImageTransform(Process):
self.__starting_step = self._args['steps'][0] if self._args['steps'] else 0
self.__ending_step = self._args['steps'][1] if self._args['steps'] else None

conf.log.debug("All Phases : {}".format(self.__phases))
conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))
Conf.log.debug("All Phases : {}".format(self.__phases))
Conf.log.debug("To Be Executed Phases : {}".format(self.__phases[self.__starting_step:self.__ending_step]))

path = self.__altered_path if os.path.isfile(input_path) or not self._args.get('folder_altered') \
else os.path.join(self._args['folder_altered'], os.path.basename(self.__output_path))
@@ -135,26 +145,27 @@ class ImageTransform(Process):
os.path.join(path, "{}.png".format(p().__class__.__name__))
for p in self.__phases[:self.__starting_step]
]
conf.log.debug(self.__image_steps)
Conf.log.debug(self.__image_steps)

def _info_start_run(self):
super()._info_start_run()
conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))
Conf.log.info("Processing on {}".format(str(self.__image_steps)[2:-2]))

def _setup(self):
try:
self.__image_steps = [read_image(x) if isinstance(x, str) else x for x in self.__image_steps]
except FileNotFoundError as e:
conf.log.error(e)
conf.log.error("{} is not able to resume because it not able to load required images. "
Conf.log.error(e)
Conf.log.error("{} is not able to resume because it not able to load required images. "
.format(camel_case_to_str(self.__class__.__name__)))
conf.log.error("Possible source of this error is that --altered argument is not a correct "
Conf.log.error("Possible source of this error is that --altered argument is not a correct "
"directory path that contains valid images.")
sys.exit(1)

def _execute(self):
"""
Execute all phases on the image
Execute all phases on the image.

:return: None
"""
for p in (x(args=self._args) for x in self.__phases[self.__starting_step:self.__ending_step]):
@@ -168,38 +179,38 @@ class ImageTransform(Process):

write_image(r, os.path.join(path, "{}.png".format(p.__class__.__name__)))

conf.log.debug("{} Step Image Of {} Execution".format(
Conf.log.debug("{} Step Image Of {} Execution".format(
os.path.join(path, "{}.png".format(p.__class__.__name__)),
camel_case_to_str(p.__class__.__name__),
))

write_image(self.__image_steps[-1], self.__output_path)
conf.log.info("{} Created".format(self.__output_path))
conf.log.debug("{} Result Image Of {} Execution"
Conf.log.info("{} Created".format(self.__output_path))
Conf.log.debug("{} Result Image Of {} Execution"
.format(self.__output_path, camel_case_to_str(self.__class__.__name__)))

return self.__image_steps[-1]


class MultipleImageTransform(Process):
"""
Multiple Image Processing Class
"""
"""Multiple Image Processing Class."""

def __init__(self, input_paths, phases, output_paths, children_process=SimpleTransform, args=None):
"""
ProcessMultipleImages Constructor
Process Multiple Images Constructor.

:param input_paths: <string[]> images path list to process
:param output_paths: <string> images path to write the result
:param children_process: <ImageTransform> Process to use on the list of input
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
:param args: <dict> processing settings
"""
super().__init__(args=args)
self._phases = phases
self._input_paths = input_paths
self._output_paths = output_paths
self._process_list = []
self.__multiprocessing = conf.multiprocessing()
self.__multiprocessing = Conf.multiprocessing()
self.__children_process = children_process

def _setup(self):
@@ -208,41 +219,44 @@ class MultipleImageTransform(Process):

def _execute(self):
"""
Execute all phases on the list of images
Execute all phases on the list of images.

:return: None
"""

def process_one_image(a):
conf.log.info("Processing Image : {}/{}".format(a[1] + 1, len(self._process_list)))
Conf.log.info("Processing Image : {}/{}".format(a[1] + 1, len(self._process_list)))
a[0].run()

if not self.__multiprocessing:
for x in zip(self._process_list, range(len(self._process_list))):
process_one_image(x)
else:
conf.log.debug("Using Multiprocessing")
pool = ThreadPool(conf.args['n_cores'])
Conf.log.debug("Using Multiprocessing")
pool = ThreadPool(Conf.args['n_cores'])
pool.map(process_one_image, zip(self._process_list, range(len(self._process_list))))
pool.close()
pool.join()


class FolderImageTransform(MultipleImageTransform):
"""
Folder Image Processing Class
"""
"""Folder Image Processing Class."""

def __init__(self, input_folder_path, phases, output_folder_path, args=None):
"""
FolderImageTransform Constructor
Folder Image Transform Constructor.

:param input_folder_path: <string> path of the folder to process
:param phases: <ImageTransform[]> list of Image Transform to execute
:param output_folder_path: <string> path of the folder where save output
:param args: <dict> processing settings
"""
super().__init__([], phases, [], args=args)
self.__input_folder_path = input_folder_path
self.__output_folder_path = output_folder_path
self.__multiprocessing = conf.multiprocessing()
self.__multiprocessing = Conf.multiprocessing()

def _setup(self):
conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
Conf.log.debug([(r, d, f) for r, d, f in os.walk(self.__input_folder_path)])
self._process_list = [
MultipleImageTransform(
[
@@ -256,9 +270,9 @@ class FolderImageTransform(MultipleImageTransform):
'_out',
os.path.splitext(x.path)[1]
)
if not conf.args['output'] else
if not Conf.args['output'] else
os.path.join(
conf.args['output'],
Conf.args['output'],
pathlib.Path(*pathlib.Path(r).parts[1:]),
os.path.basename(x.path)
)
@@ -278,37 +292,36 @@ class FolderImageTransform(MultipleImageTransform):

json_path = os.path.join(folder_path, self._args['json_folder_name'])

conf.log.debug("Json Path Setting Path: {}".format(json_path))
Conf.log.debug("Json Path Setting Path: {}".format(json_path))
if not os.path.isfile(json_path):
conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
Conf.log.info("No Json File Settings Found In {}. Using Default Configuration. ".format(folder_path))
return add_folder_altered(self._args)
try:
with open(json_path, 'r') as f:
json_data = json.load(f)
except JSONDecodeError:
conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
Conf.log.info("Json File Settings {} Is Not In Valid JSON Format. Using Default Configuration. "
.format(folder_path))
return add_folder_altered(self._args)
try:
from argv import Parser, config_args
a = config_args(Parser.parser, Parser.parser.parse_args(sys.argv[1:]), json_data=json_data)
conf.log.info("Using {} Configuration for processing {} folder. "
Conf.log.info("Using {} Configuration for processing {} folder. "
.format(json_path, folder_path))
return add_folder_altered(a)
except SystemExit:
conf.log.error("Arguments json file {} contains configuration error. "
Conf.log.error("Arguments json file {} contains configuration error. "
"Using Default Configuration".format(json_path))
return add_folder_altered(self._args)


class GifTransform(Process):
"""
GIF Image Processing Class
"""
"""GIF Image Processing Class."""

def __init__(self, input_path, phases, output_path, args=None):
"""
ImageTransformGIF Constructor
Image Transform GIF Constructor.

:param input_path: <string> gif path to process
:param output_path: <string> image path to write the result
:param phases: <ImageTransform[]> list of Class transformation use by the process each image
@@ -323,30 +336,32 @@ class GifTransform(Process):

def _setup(self):
self.__tmp_dir = tempfile.mkdtemp()
conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
Conf.log.debug("Temporay dir is {}".format(self.__tmp_dir))
imgs = imageio.mimread(self.__input_path)
conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
Conf.log.info("GIF have {} Frames To Process".format(len(imgs)))
self.__temp_input_paths = [os.path.join(self.__tmp_dir, "intput_{}.png".format(i))
for i in range(len(imgs))]

self.__temp_output_paths = [os.path.join(self.__tmp_dir, "output_{}.png".format(i))
for i in range(len(imgs))]

[write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1]) for i in zip(imgs, self.__temp_input_paths)]
for i in zip(imgs, self.__temp_input_paths):
write_image(cv2.cvtColor(i[0], cv2.COLOR_RGB2BGR), i[1])

def _execute(self):
"""
Execute all phases on each frames of the gif and recreate the gif
Execute all phases on each frames of the gif and recreate the gif.

:return: None
"""
MultipleImageTransform(self.__temp_input_paths, self.__phases, self.__temp_output_paths, args=self._args).run()

dir = os.path.dirname(self.__output_path)
if dir != '':
os.makedirs(os.path.dirname(self.__output_path), exist_ok=True)
dir_out = os.path.dirname(self.__output_path)
if dir_out != '':
os.makedirs(dir_out, exist_ok=True)
imageio.mimsave(self.__output_path, [imageio.imread(i) for i in self.__temp_output_paths])

conf.log.info("{} Gif Created ".format(self.__output_path))
Conf.log.info("{} Gif Created ".format(self.__output_path))

def _clean(self):
shutil.rmtree(self.__tmp_dir)

+ 4
- 6
scripts/build.py View File

@@ -1,14 +1,12 @@
import argparse
import importlib
from importlib import util
import logging
import os
import subprocess
import sys

spec = importlib.util.spec_from_file_location("_common",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = importlib.util.module_from_spec(spec)
spec = util.spec_from_file_location("_common", os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = util.module_from_spec(spec)
spec.loader.exec_module(c)


@@ -18,7 +16,7 @@ def add_arg_parser(parser):


def check_dependencies():
## System & Dependencies Check
# System & Dependencies Check
c.log.debug("OS : {}".format(c.get_os()))
c.log.debug("Python version : {}".format(c.get_python_version()))

@@ -87,5 +85,5 @@ if __name__ == '__main__':
if args.debug:
c.log.setLevel(logging.DEBUG)

#Build Cli
# Build Cli
run(args)

+ 5
- 7
scripts/setup.py View File

@@ -1,15 +1,13 @@
import argparse
import fileinput
import importlib
from importlib import util
import logging
import os
import subprocess
import sys

spec = importlib.util.spec_from_file_location("_common",
os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = importlib.util.module_from_spec(spec)
spec = util.spec_from_file_location("_common", os.path.join(os.path.dirname(os.path.abspath(__file__)), "./_common.py"))
c = util.module_from_spec(spec)
spec.loader.exec_module(c)


@@ -93,16 +91,16 @@ def cli_setup(args, pip_commands_extend=None):


def run(args):
## System & Dependencies Check
# System & Dependencies Check
check_dependencies()

if args.debug:
c.log.setLevel(logging.DEBUG)

## Cli dependencies
# Cli dependencies
pip_commands_extend = (['--user'] if args.pip_user else []) + (['--no-cache-dir'] if args.pip_no_cache_dir else [])

## Pyinstaller
# Pyinstaller
if not args.no_pyinstaller:
pyinstaller(pip_commands_extend)


+ 50
- 19
transform/__init__.py View File

@@ -1,49 +1,80 @@
"""
Image Transformation
"""
"""Images Transforms."""
import time

from config import Config as conf
from config import Config as Conf
from utils import camel_case_to_str


class ImageTransform:
"""
Abstract Image Transformation Class
"""
"""Abstract Image Transformation Class."""

def __init__(self, input_index=(-1,), args=None):
"""
Image Transformation Class Constructor
Image Transformation Class Constructor.

:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
self.__start = time.time()
self.input_index = input_index
self._args = conf.args.copy() if args is None else args.copy()
self._args = Conf.args.copy() if args is None else args.copy()

def run(self, *args):
"""
Run the Image Transform.

:param args: <dict> settings for the transformation
:return: <RGB> image
"""
self.__start = time.time()
self.info_start_run()
self.setup(*args)
r = self.execute(*args)
self.clean(*args)
self._setup(*args)
r = self._execute(*args)
self._clean(*args)
self.info_end_run()
return r

def info_start_run(self):
"""
Log info at the start of the run.

:return: None
"""
self.__start = time.time()
conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))
Conf.log.info("Executing {}".format(camel_case_to_str(self.__class__.__name__)))

def info_end_run(self):
conf.log.debug("{} Done in {} seconds".format(
"""
Log info at the end of the run.

:return: None
"""
Conf.log.debug("{} Done in {} seconds".format(
camel_case_to_str(self.__class__.__name__), round(time.time() - self.__start, 2)))

def setup(self, *args):
def _setup(self, *args):
"""
Configure the transformation.

:param args: <dict> settings for the transformation
:return: None
"""
pass

def execute(self, *args):
def _execute(self, *args):
"""
Execute the transformation.

:param args: <dict> settings for the transformation
:return: None
"""
pass

def clean(self, *args):
pass
def _clean(self, *args):
"""
Clean the transformation.

:param args: <dict> settings for the transformation
:return: None
"""
pass

+ 22
- 383
transform/gan/__init__.py View File

@@ -1,423 +1,62 @@
import functools
import os
from collections import OrderedDict
"""GAN Transforms."""

import cv2
import numpy as np
import torch
from PIL import Image
from torchvision import transforms as transforms

from config import Config as conf
from config import Config as Conf
from transform import ImageTransform
from transform.gan.generator import tensor2im
from transform.gan.model import DeepModel, DataLoader


class ImageTransformGAN(ImageTransform):
"""
Abstract GAN Image Transformation Class
"""
"""Abstract GAN Image Transformation Class."""

def __init__(self, checkpoint, phase, input_index=(-1,), args=None):
"""
Abstract GAN Image Transformation Class Constructor
Abstract GAN Image Transformation Class Constructor.

:param checkpoint: <string> path to the checkpoint
:param phase: <string> phase name
:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(input_index=input_index, args=args)
self.__checkpoint = checkpoint
self.__phase = phase
self.__gpu_ids = self._args["gpu_ids"]

def setup(self, image):
def _setup(self, *args):
"""
Load Dataset and Model fot the image
:param image: <RGB> image to be transform
Load Dataset and Model for the image.

:param args: <[RGB]> image to be transform
:return: None
"""
if self.__gpu_ids:
conf.log.debug("GAN Processing Using GPU IDs: {}".format(self.__gpu_ids))
Conf.log.debug("GAN Processing Using GPU IDs: {}".format(self.__gpu_ids))
else:
conf.log.debug("GAN Processing Using CPU")
Conf.log.debug("GAN Processing Using CPU")

c = conf()
c = Conf()

# Load custom phase options:
data_loader = DataLoader(c, image)
data_loader = DataLoader(c, args[0])
self.__dataset = data_loader.load_data()

# Create Model
self.__model = DeepModel()
self.__model.initialize(c, self.__gpu_ids, self.__checkpoint)

def execute(self, image):
def _execute(self, *args):
"""
Excute the GAN Transformation the image
:param image: <RGB> image to transform
Execute the GAN Transformation on the image.

:param *args: <[RGB]> image to transform
:return: <RGB> image transformed
"""
for i, data in enumerate(self.__dataset):
mask = None
for data in self.__dataset:
generated = self.__model.inference(data["label"], data["inst"])
im = tensor2im(generated.data[0])
mask = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)
return mask


class DataLoader:
def __init__(self, opt, cv_img):
super(DataLoader, self).__init__()

self.dataset = Dataset()
self.dataset.initialize(opt, cv_img)

self.dataloader = torch.utils.data.DataLoader(
self.dataset,
batch_size=opt.batchSize,
shuffle=not opt.serial_batches,
num_workers=int(opt.nThreads),
)

def load_data(self):
return self.dataloader

def __len__(self):
return 1


class Dataset(torch.utils.data.Dataset):
def __init__(self):
super(Dataset, self).__init__()

def initialize(self, opt, cv_img):
self.opt = opt

self.A = Image.fromarray(cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB))
self.dataset_size = 1

def __getitem__(self, index):
transform_A = get_transform(self.opt)
A_tensor = transform_A(self.A.convert("RGB"))

B_tensor = inst_tensor = feat_tensor = 0

input_dict = {
"label": A_tensor,
"inst": inst_tensor,
"image": B_tensor,
"feat": feat_tensor,
"path": "",
}

return input_dict

def __len__(self):
return 1


class DeepModel(torch.nn.Module):
def initialize(self, opt, gpu_ids, checkpoints_dir):

self.opt = opt
self.checkpoints_dir = checkpoints_dir

if gpu_ids is None:
self.gpu_ids = []
else:
self.gpu_ids = gpu_ids

self.netG = self.__define_G(
opt.input_nc,
opt.output_nc,
opt.ngf,
opt.netG,
opt.n_downsample_global,
opt.n_blocks_global,
opt.n_local_enhancers,
opt.n_blocks_local,
opt.norm,
self.gpu_ids,
)

# load networks
self.__load_network(self.netG)

def inference(self, label, inst):

# Encode Inputs
input_label, inst_map, _, _ = self.__encode_input(label, inst, infer=True)

# Fake Generation
input_concat = input_label

with torch.no_grad():
fake_image = self.netG.forward(input_concat)

return fake_image

# helper loading function that can be used by subclasses
def __load_network(self, network):

save_path = os.path.join(self.checkpoints_dir)

state_dict = torch.load(save_path)

if len(self.gpu_ids) > 1:
new_state_dict = OrderedDict()

for k, v in state_dict.items():
name = "module." + k # add `module.`
new_state_dict[name] = v
else:
new_state_dict = state_dict

network.load_state_dict(new_state_dict)

def __encode_input(
self, label_map, inst_map=None, real_image=None, feat_map=None, infer=False
):
if len(self.gpu_ids) > 0:
input_label = label_map.data.cuda() # GPU
else:
input_label = label_map.data # CPU

return input_label, inst_map, real_image, feat_map

def __weights_init(self, m):
classname = m.__class__.__name__
if classname.find("Conv") != -1:
m.weight.data.normal_(0.0, 0.02)
elif classname.find("BatchNorm2d") != -1:
m.weight.data.normal_(1.0, 0.02)
m.bias.data.fill_(0)

def __define_G(
self,
input_nc,
output_nc,
ngf,
netG,
n_downsample_global=3,
n_blocks_global=9,
n_local_enhancers=1,
n_blocks_local=3,
norm="instance",
gpu_ids=[],
):
norm_layer = self.__get_norm_layer(norm_type=norm)

# model
netG = GlobalGenerator(
input_nc, output_nc, ngf, n_downsample_global, n_blocks_global, norm_layer
)

if len(gpu_ids) > 1:
# print(
# "Using",
# len(gpu_ids),
# "of",
# torch.cuda.device_count(),
# "GPUs available.",
# )

netG = torch.nn.DataParallel(netG, gpu_ids)

if len(gpu_ids) > 0:
netG.cuda(gpu_ids[0])

netG.apply(self.__weights_init)

return netG

def __get_norm_layer(self, norm_type="instance"):
norm_layer = functools.partial(torch.nn.InstanceNorm2d, affine=False)
return norm_layer


##############################################################################
# Generator
##############################################################################


class GlobalGenerator(torch.nn.Module):
def __init__(
self,
input_nc,
output_nc,
ngf=64,
n_downsampling=3,
n_blocks=9,
norm_layer=torch.nn.BatchNorm2d,
padding_type="reflect",
):
assert n_blocks >= 0
super(GlobalGenerator, self).__init__()

activation = torch.nn.ReLU(True)
# activation = torch.nn.DataParallel(activation)

model = [
torch.nn.ReflectionPad2d(3),
torch.nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0),
norm_layer(ngf),
activation,
]
# downsample
for i in range(n_downsampling):
mult = 2 ** i
model += [
torch.nn.Conv2d(
ngf * mult, ngf * mult * 2, kernel_size=3, stride=2, padding=1
),
norm_layer(ngf * mult * 2),
activation,
]

# resnet blocks
mult = 2 ** n_downsampling
for i in range(n_blocks):
model += [
ResnetBlock(
ngf * mult,
padding_type=padding_type,
activation=activation,
norm_layer=norm_layer,
)
]

# upsample
for i in range(n_downsampling):
mult = 2 ** (n_downsampling - i)
model += [
torch.nn.ConvTranspose2d(
ngf * mult,
int(ngf * mult / 2),
kernel_size=3,
stride=2,
padding=1,
output_padding=1,
),
norm_layer(int(ngf * mult / 2)),
activation,
]
model += [
torch.nn.ReflectionPad2d(3),
torch.nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0),
torch.nn.Tanh(),
]

self.model = torch.nn.Sequential(*model)
# self.model = torch.nn.DataParallel(self.model)

def forward(self, input):
return self.model(input)


# Define a resnet block


class ResnetBlock(torch.nn.Module):
def __init__(
self,
dim,
padding_type,
norm_layer,
activation=torch.nn.ReLU(True),
use_dropout=False,
):
super(ResnetBlock, self).__init__()
self.conv_block = self.__build_conv_block(
dim, padding_type, norm_layer, activation, use_dropout
)

def __build_conv_block(
self, dim, padding_type, norm_layer, activation, use_dropout
):
conv_block = []
p = 0
if padding_type == "reflect":
conv_block += [torch.nn.ReflectionPad2d(1)]
elif padding_type == "replicate":
conv_block += [torch.nn.ReplicationPad2d(1)]
elif padding_type == "zero":
p = 1
else:
raise NotImplementedError("padding [%s] is not implemented" % padding_type)

conv_block += [
torch.nn.Conv2d(dim, dim, kernel_size=3, padding=p),
norm_layer(dim),
activation,
]
if use_dropout:
conv_block += [torch.nn.Dropout(0.5)]

p = 0
if padding_type == "reflect":
conv_block += [torch.nn.ReflectionPad2d(1)]
elif padding_type == "replicate":
conv_block += [torch.nn.ReplicationPad2d(1)]
elif padding_type == "zero":
p = 1
else:
raise NotImplementedError("padding [%s] is not implemented" % padding_type)
conv_block += [
torch.nn.Conv2d(dim, dim, kernel_size=3, padding=p),
norm_layer(dim),
]

return torch.nn.Sequential(*conv_block)

def forward(self, x):
out = x + self.conv_block(x)
return out


# Data utils:


def get_transform(opt, method=Image.BICUBIC, normalize=True):
transform_list = []

base = float(2 ** opt.n_downsample_global)
if opt.netG == "local":
base *= 2 ** opt.n_local_enhancers
transform_list.append(
transforms.Lambda(lambda img: __make_power_2(img, base, method))
)

transform_list += [transforms.ToTensor()]

if normalize:
transform_list += [transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
return transforms.Compose(transform_list)


def __make_power_2(img, base, method=Image.BICUBIC):
ow, oh = img.size
h = int(round(oh / base) * base)
w = int(round(ow / base) * base)
if (h == oh) and (w == ow):
return img
return img.resize((w, h), method)


# Converts a Tensor into a Numpy array
# |imtype|: the desired type of the converted numpy array


def tensor2im(image_tensor, imtype=np.uint8, normalize=True):
if isinstance(image_tensor, list):
image_numpy = []
for i in range(len(image_tensor)):
image_numpy.append(tensor2im(image_tensor[i], imtype, normalize))
return image_numpy
image_numpy = image_tensor.cpu().float().numpy()
if normalize:
image_numpy = (np.transpose(image_numpy, (1, 2, 0)) + 1) / 2.0 * 255.0
else:
image_numpy = np.transpose(image_numpy, (1, 2, 0)) * 255.0
image_numpy = np.clip(image_numpy, 0, 255)
if image_numpy.shape[2] == 1 or image_numpy.shape[2] > 3:
image_numpy = image_numpy[:, :, 0]
return image_numpy.astype(imtype)

+ 245
- 0
transform/gan/generator.py View File

@@ -0,0 +1,245 @@
"""GAN generator."""
import numpy as np
import torch
from PIL import Image
from torchvision import transforms as transforms


class GlobalGenerator(torch.nn.Module):
    """Global Generator: encoder / resnet-core / decoder image-to-image network."""

    def __init__(
        self,
        input_nc,
        output_nc,
        ngf=64,
        n_downsampling=3,
        n_blocks=9,
        norm_layer=torch.nn.BatchNorm2d,
        padding_type="reflect",
    ):
        """
        Global Generator Constructor.

        :param input_nc: <int> number of input channels
        :param output_nc: <int> number of output channels
        :param ngf: <int> number of filters in the first convolution layer
        :param n_downsampling: <int> number of downsampling (and mirrored upsampling) stages
        :param n_blocks: <int> number of resnet blocks at the bottleneck (must be >= 0)
        :param norm_layer: <callable> normalization layer class (e.g. torch.nn.BatchNorm2d)
        :param padding_type: <string> padding type forwarded to the resnet blocks
        :raise AssertionError: if n_blocks is negative
        """
        if n_blocks < 0:
            raise AssertionError("n_blocks must be >= 0, got {}".format(n_blocks))
        super(GlobalGenerator, self).__init__()

        activation = torch.nn.ReLU(True)

        # Input block: reflection padding keeps the 7x7 convolution size-preserving.
        model = [
            torch.nn.ReflectionPad2d(3),
            torch.nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0),
            norm_layer(ngf),
            activation,
        ]

        # Downsample: each stage halves the spatial size and doubles the channels.
        for i in range(n_downsampling):
            mult = 2 ** i
            model += [
                torch.nn.Conv2d(
                    ngf * mult, ngf * mult * 2, kernel_size=3, stride=2, padding=1
                ),
                norm_layer(ngf * mult * 2),
                activation,
            ]

        # Resnet blocks operate at the lowest resolution.
        mult = 2 ** n_downsampling
        for _ in range(n_blocks):
            model += [
                ResnetBlock(
                    ngf * mult,
                    padding_type=padding_type,
                    activation=activation,
                    norm_layer=norm_layer,
                )
            ]

        # Upsample: mirror of the downsampling stages (stride-2 transposed convs).
        for i in range(n_downsampling):
            mult = 2 ** (n_downsampling - i)
            model += [
                torch.nn.ConvTranspose2d(
                    ngf * mult,
                    int(ngf * mult / 2),
                    kernel_size=3,
                    stride=2,
                    padding=1,
                    output_padding=1,
                ),
                norm_layer(int(ngf * mult / 2)),
                activation,
            ]

        # Output block: Tanh maps the result into [-1, 1].
        model += [
            torch.nn.ReflectionPad2d(3),
            torch.nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0),
            torch.nn.Tanh(),
        ]

        self.model = torch.nn.Sequential(*model)

    def forward(self, i):
        """
        Global Generator forward.

        :param i: <torch.Tensor> input batch, shaped (N, input_nc, H, W)
        :return: <torch.Tensor> generated batch, shaped (N, output_nc, H, W)
        """
        return self.model(i)


class ResnetBlock(torch.nn.Module):
    """Define a resnet block (residual branch of two 3x3 convolutions)."""

    def __init__(
        self,
        dim,
        padding_type,
        norm_layer,
        activation=None,
        use_dropout=False,
    ):
        """
        Resnet Block constructor.

        :param dim: <int> number of input and output channels
        :param padding_type: <string> one of "reflect", "replicate" or "zero"
        :param norm_layer: <callable> normalization layer class
        :param activation: <torch.nn.Module|None> activation module (default: ReLU(True))
        :param use_dropout: <boolean> if true a Dropout(0.5) layer is inserted
        """
        super(ResnetBlock, self).__init__()

        if activation is None:
            activation = torch.nn.ReLU(True)

        self.conv_block = self.__build_conv_block(
            dim, padding_type, norm_layer, activation, use_dropout
        )

    @staticmethod
    def _padding(padding_type):
        """
        Map a padding type to (extra padding layers, conv padding amount).

        :param padding_type: <string> one of "reflect", "replicate" or "zero"
        :return: <(list, int)> padding layers to prepend and padding for Conv2d
        :raise NotImplementedError: for an unknown padding type
        """
        if padding_type == "reflect":
            return [torch.nn.ReflectionPad2d(1)], 0
        if padding_type == "replicate":
            return [torch.nn.ReplicationPad2d(1)], 0
        if padding_type == "zero":
            return [], 1
        raise NotImplementedError("padding [%s] is not implemented" % padding_type)

    @staticmethod
    def __build_conv_block(dim, padding_type, norm_layer, activation, use_dropout):
        """Build the two-convolution residual branch as a Sequential."""
        # First convolution: pad -> conv -> norm -> activation.
        pad_layers, p = ResnetBlock._padding(padding_type)
        conv_block = pad_layers + [
            torch.nn.Conv2d(dim, dim, kernel_size=3, padding=p),
            norm_layer(dim),
            activation,
        ]
        if use_dropout:
            conv_block += [torch.nn.Dropout(0.5)]

        # Second convolution: pad -> conv -> norm (no activation before the skip add).
        # _padding is called again so the padding layer instances are not shared.
        pad_layers, p = ResnetBlock._padding(padding_type)
        conv_block += pad_layers + [
            torch.nn.Conv2d(dim, dim, kernel_size=3, padding=p),
            norm_layer(dim),
        ]

        return torch.nn.Sequential(*conv_block)

    def forward(self, x):
        """
        Resnet Block forward.

        :param x: <torch.Tensor> input, shaped (N, dim, H, W)
        :return: <torch.Tensor> x + conv_block(x), same shape as x
        """
        out = x + self.conv_block(x)
        return out


def get_transform(opt, method=Image.BICUBIC, normalize=True):
    """
    Build the image pre-processing pipeline.

    :param opt: <Config> configuration
    :param method: <> resampling method used when resizing
    :param normalize: <boolean> if true a Normalize step is appended
    :return: <transforms.Compose> composed transformation pipeline
    """
    # Spatial sizes must be multiples of the total downsampling factor.
    base = float(2 ** opt.n_downsample_global)
    if opt.net_g == "local":
        base *= 2 ** opt.n_local_enhancers

    pipeline = [
        transforms.Lambda(lambda img: make_power_2(img, base, method)),
        transforms.ToTensor(),
    ]

    if normalize:
        pipeline.append(transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)))
    return transforms.Compose(pipeline)


def make_power_2(img, base, method=Image.BICUBIC):
    """
    Resize an image so both sides are multiples of base.

    :param img: <Image> PIL image
    :param base: <float> factor both dimensions must be a multiple of
    :param method: <> PIL resampling method
    :return: <Image> resized image, or img unchanged when already aligned
    """
    width, height = img.size
    # Round each dimension to the nearest multiple of base.
    target_h = int(round(height / base) * base)
    target_w = int(round(width / base) * base)
    if target_h == height and target_w == width:
        return img
    return img.resize((target_w, target_h), method)


def tensor2im(image_tensor, imtype=np.uint8, normalize=True):
    """
    Convert a Tensor into a Numpy array.

    :param image_tensor: <torch.Tensor|list> CHW image tensor, or a list of them
    :param imtype: <type> the desired dtype of the converted numpy array
    :param normalize: <boolean> if true values are mapped from [-1, 1] to [0, 255]
    :return: <np.ndarray|list> HWC image array (HW when single-channel), or a list
    """
    # A list of tensors is converted element-wise.
    if isinstance(image_tensor, list):
        return [tensor2im(t, imtype, normalize) for t in image_tensor]

    arr = image_tensor.cpu().float().numpy()
    arr = np.transpose(arr, (1, 2, 0))  # CHW -> HWC
    if normalize:
        arr = (arr + 1) / 2.0 * 255.0
    else:
        arr = arr * 255.0
    arr = np.clip(arr, 0, 255)
    # Squeeze grayscale (or drop extra channels beyond RGB) to a 2D image.
    if arr.shape[2] == 1 or arr.shape[2] > 3:
        arr = arr[:, :, 0]
    return arr.astype(imtype)

+ 17
- 19
transform/gan/mask.py View File

@@ -1,20 +1,20 @@
"""GAN Mask Transforms."""
from transform.gan import ImageTransformGAN
from config import Config as conf
from config import Config as Conf


class CorrectToMask(ImageTransformGAN):
"""
Correct -> Mask [GAN]
"""
"""Correct -> Mask [GAN]."""

def __init__(self, input_index=(-1,), args=None):
"""
CorrectToMask Constructor
Correct To Mask constructor.

:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(
(args if args is not None else conf.args)['checkpoints']["correct_to_mask"],
(args if args is not None else Conf.args)['checkpoints']["correct_to_mask"],
"correct_to_mask",
input_index=input_index,
args=args
@@ -22,18 +22,17 @@ class CorrectToMask(ImageTransformGAN):


class MaskrefToMaskdet(ImageTransformGAN):
"""
Maskref -> Maskdet [GAN]
"""
"""Maskref -> Maskdet [GAN]."""

def __init__(self, input_index=(-1,), args=None):
"""
MaskrefToMaskdet Constructor
Maskref To Maskdet constructor.

:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(
(args if args is not None else conf.args)['checkpoints']["maskref_to_maskdet"],
(args if args is not None else Conf.args)['checkpoints']["maskref_to_maskdet"],
"maskref_to_maskdet",
input_index=input_index,
args=args
@@ -41,18 +40,17 @@ class MaskrefToMaskdet(ImageTransformGAN):


class MaskfinToNude(ImageTransformGAN):
"""
Maskfin -> Nude [GAN]
"""
"""Maskfin -> Nude [GAN]."""

def __init__(self, input_index=(-1,), args=None):
"""
MaskfinToNude Constructor
Maskfin To Nude constructor.

:param input_index: <tuple> index where to take the inputs (default is (-1) for previous transformation)
:param args: <dict> args parameter to run the image transformation (default use conf.args)
:param args: <dict> args parameter to run the image transformation (default use Conf.args)
"""
super().__init__(
(args if args is not None else conf.args)['checkpoints']["maskfin_to_nude"],
(args if args is not None else Conf.args)['checkpoints']["maskfin_to_nude"],
"maskfin_to_nude",
input_index=input_index,
args=args

+ 228
- 0
transform/gan/model.py View File

@@ -0,0 +1,228 @@
"""GAN Model."""
import functools
import os
from collections import OrderedDict

import cv2
import torch
from PIL import Image

from transform.gan.generator import GlobalGenerator, get_transform


class DataLoader:
    """Dataset loader class wrapping a single-image torch DataLoader."""

    def __init__(self, opt, cv_img):
        """
        Construct Data loader.

        :param opt: <Config> configuration to use
        :param cv_img: <RGB> image
        """
        super(DataLoader, self).__init__()

        dataset = Dataset()
        dataset.initialize(opt, cv_img)
        self.dataset = dataset

        self.dataloader = torch.utils.data.DataLoader(
            dataset,
            batch_size=opt.batch_size,
            shuffle=not opt.serial_batches,
            num_workers=int(opt.n_threads),
        )

    def load_data(self):
        """
        Return loaded data.

        :return: <torch.utils.data.DataLoader> the wrapped torch data loader
        """
        return self.dataloader

    def __len__(self):
        """
        Redefine __len___ for DataLoader.

        :return: <int> 1 (a single image is loaded)
        """
        return 1


class Dataset(torch.utils.data.Dataset):
    """Dataset class holding a single in-memory image."""

    def __init__(self):
        """Dataset Constructor."""
        super(Dataset, self).__init__()

    def initialize(self, opt, cv_img):
        """
        Initialize the Dataset.

        :param opt: <Config> configuration to use
        :param cv_img: <> OpenCV (BGR) image
        :return: None
        """
        self.opt = opt
        # OpenCV images are BGR; convert to RGB before wrapping in PIL.
        self.A = Image.fromarray(cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB))
        self.dataset_size = 1

    def __getitem__(self, index):
        """
        Redefine Dataset __getitem__.

        :param index: <int> item index (ignored, single-image dataset)
        :return: <dict> model input dict with keys label/inst/image/feat/path
        """
        pipeline = get_transform(self.opt)
        label_tensor = pipeline(self.A.convert("RGB"))

        # Only the label is populated; the other tensors are placeholders.
        placeholder = 0
        return {
            "label": label_tensor,
            "inst": placeholder,
            "image": placeholder,
            "feat": placeholder,
            "path": "",
        }

    def __len__(self):
        """
        Redefine __len___ for Dataset.

        :return: <int> 1 (a single image is held)
        """
        return 1


class DeepModel(torch.nn.Module):
"""Deep Model."""

def initialize(self, opt, gpu_ids, checkpoints_dir):
    """
    Deep Model initialize.

    :param opt: <Config> configuration to use
    :param gpu_ids: <int[]|None> gpu ids to use (None = cpu)
    :param checkpoints_dir: <string> path to the directory where models are
    :return: None
    """
    self.opt = opt
    self.checkpoints_dir = checkpoints_dir

    # None means CPU processing: normalize to an empty gpu id list.
    self.gpu_ids = gpu_ids if gpu_ids is not None else []

    # Build the generator network from the configuration.
    self.net_g = self.__define_g(
        opt.input_nc,
        opt.output_nc,
        opt.ngf,
        opt.net_g,
        opt.n_downsample_global,
        opt.n_blocks_global,
        opt.n_local_enhancers,
        opt.n_blocks_local,
        opt.norm,
        self.gpu_ids,
    )

    # Load the checkpoint weights into the generator.
    self.__load_network(self.net_g)

def inference(self, label, inst):
    """
    Infer an image.

    :param label: <> label tensor
    :param inst: <> instance map
    :return: <> generated (fake) image tensor
    """
    # Encode inputs; only the encoded label is used for generation.
    input_label, _, _, _ = self.__encode_input(label, inst, infer=True)

    # Fake generation without gradient tracking.
    with torch.no_grad():
        fake_image = self.net_g.forward(input_label)

    return fake_image

# helper loading function that can be used by subclasses
def __load_network(self, network):

save_path = os.path.join(self.checkpoints_dir)

state_dict = torch.load(save_path)

if len(self.gpu_ids) > 1:
new_state_dict = OrderedDict()

for k, v in state_dict.items():
name = "module." + k # add `module.`