Browse Source

core lib update

master
Carlos Reding 1 year ago
parent
commit
213035cef7

+ 4
- 3
puppeteer/__init__.py View File

@@ -22,7 +22,7 @@
"""

from ._core import SetupDevice, SetProtocol
from ._dataparser import export_img_data, export_queue
from ._dataparser import export_img_data, export_queue, house_keeper
import time

def Controller(USR_INSTRUCTIONS, Connection, cancel_switch, ConnectionStatus,
@@ -160,8 +160,9 @@ def ExportData(Protocol, ProcessData, Connection, Status, BUFFER_SIZE=64,
# Export temperature record _ONLY IF_ running a normal protocol.
if Test is False:
export_img_data(Protocol._dir_info, None, Connection, BUFFER_SIZE)
# House keeping.
# house_keeper(Protocol._dir_info)
if Status.is_set:
print("All done.")
Connection.sendall("::Protocol run successfully::".encode())
# House keeping. Always. Test or not.
house_keeper(Protocol._dir_info, Connection, BUFFER_SIZE)


+ 146
- 93
puppeteer/_core.py View File

@@ -27,8 +27,48 @@ BUFFER_SIZE = 64
# Default camera settings
DEFAULT_ISO = 10
ISO_F1 = 100
ISO_F2 = 100
ISO_F3 = 400

def _img_settings(Camera, Filter=None):
"""
Define camera settings. Auto-exposure or auto-white-balance result
in aleatory camera settings depending upon light conditions. _WE DO
NOT WANT THIS_. Turn them off and fix the camera values, making the
images independent from light conditions.\n

The default camera resolution in imdev V2 is HD720p (1280x720), but
it is not able to capture the plate (partial sensor area). The
smallest resolution using full sensor area is 1640x922, which offers
a good compromise between quality (data) and speed. HD1080p
resolution takes longer to acquire and process, and it also is
unable to capture a full microplate due to a different partial
sensor area used. Information about these limitations can be found
at https://imdev.readthedocs.io/en/release-1.13/fov.html#sensor-modes
"""
Camera.exposure_mode = 'auto' # Unlock Settings.
Camera.hflip = True # Ensures well A1 is in the bottom-left.
Camera.awb_mode = 'off'
Camera.awb_gains = (1.45, 1.45)
SHUTTER_SPEED = 0.25 # Shutter speed time in SECONDS. Defaults to 0.25s
if Filter == "Filter_1":
Camera.iso = ISO_F1
elif Filter == "Filter_2":
SHUTTER_SPEED = 5
Camera.iso = ISO_F2
elif Filter == "Filter_3":
Camera.iso = ISO_F3
else:
Camera.iso = DEFAULT_ISO
Camera.shutter_speed = int(SHUTTER_SPEED * 10**6) # Translate SHUTTER_SPEED to µs.
# max_res = Camera.MAX_RESOLUTION # How big can the image be?
# Camera.resolution = (int(max_res[0]), int(max_res[1]))
Camera.resolution = (1640, 1232) # Smallest resolution with full FoV (partial FoV results in plate not fully captured).
# imdev v2 has issues with 'exposure_mode = off' causing black img.
t.sleep(0.1) # Solves those issues (WTF?) as long as it's _BEFORE_ exposure_mode.
Camera.exposure_mode = 'off' # Lock Settings.
return Camera

def img_acquisition_routine(self, light_range, wavelength, photo_path,
file_name, flag=False, calibrate=False):
"""
@@ -42,10 +82,10 @@ def img_acquisition_routine(self, light_range, wavelength, photo_path,
This image is stored locally in `photo_path`.
`flag=False`:
Additionaly, an extra picture is saved
if the highest light intensity is used or else, `flag=True`. Defaults to
FALSE.\n
Additionaly, an extra picture is saved if the highest light intensity is
used or else, `flag=True`. Defaults to FALSE.\n
`calibrate=False':
The flag `calibrate' generates images at all intensities and filters
that are stored locally in the device, used for blank correction in the
case data is processed locally. Defaults to FALSE.
@@ -69,6 +109,55 @@ def img_acquisition_routine(self, light_range, wavelength, photo_path,
else:
self._device._modulate_LED_intensity(0, channel=wavelength)

def img_acquisition_routine_triplicate(self, light_intensity, wavelength,
photo_path, file_name, flag=False,
Replicates=3):
"""
# FIX: CHANGE DOC.
# Main image acquisition routine.\n
# This method returns a generator that screens through `light_range` and
# changes the LED intensity accordingly for each `wavelength`. A picture
# is taken at every iteration. The resulting `file_name` is a composite
# name, that includes metadata such as time, that here complemented by
# adding the corresponding `light_intensity` value (i.e. `Read_0s_255`).
# This image is stored locally in `photo_path`.
# `flag=False`:
# Additionaly, an extra picture is saved if the highest light intensity is
# used or else, `flag=True`. Defaults to FALSE.\n
# `calibrate=False':
# The flag `calibrate' generates images at all intensities and filters
# that are stored locally in the device, used for blank correction in the
# case data is processed locally. Defaults to FALSE.
"""
for r in range(Replicates):
self._device._modulate_LED_intensity(light_intensity, channel=wavelength)
if wavelength == 1:
t.sleep(1)
else:
t.sleep(0.1) # Get's rid of French flag issue... why?
yield photo_path + file_name + str(r) + self._pic_ext
if light_intensity == 255 or flag is True:
yield photo_path + "findWells" + self._pic_ext
# When finished, switch OFF
self._device._modulate_LED_intensity(0, channel=wavelength)

def _average_image(self, photo_path, Replicate_name, Replicates=3):
"""
Load the relicate
"""
for r in range(Replicates):
file_name = Replicate_name + str(r) + self._pic_ext
r_img = cv2.imread(photo_path + file_name)
if r == 0:
img_container = np.zeros_like(r_img.astype('int')) # WORKAROUND: np.mean wraps up uint8, so 256 becomes 0.
img_container += r_img.astype('int')
os.remove(photo_path + file_name) # Removes physical copy.
del r_img, file_name # Free up memory.
return np.uint8(img_container / Replicates)

def _get_response(Connection, BUFFER_SIZE):
return Connection.recv(BUFFER_SIZE)

@@ -199,7 +288,7 @@ class SetupDevice:
for pin in self._GPIO_PIN:
light_panel.set_mode(pin, OUTPUT)
light_panel.set_PWM_frequency(pin, 20000) # Hz (20MHz max with -s 2).
light_panel.write(pin, 0) # Init off state.
# light_panel.write(pin, 0) # Init off state.
else:
raise ValueError("Channels must be 1 < chnl < 3, but ",
str(len(self._GPIO_PIN)), " were specified.")
@@ -257,7 +346,7 @@ class SetupDevice:
# Camera
try:
camera = imdev()
camera = self._img_settings(camera)
camera = _img_settings(camera) # Set camera defaults.
log = str("Camera... [OK]")
status = str("OK")
except:
@@ -329,85 +418,14 @@ class SetupDevice:
if cli.decode() == "Acknowledged":
Connection.sendall(pickle.dumps(self._status_list))

def _img_settings(self, Camera):
"""
Define camera settings. Auto-exposure or auto-white-balance result
in aleatory camera settings depending upon light conditions. _WE DO
NOT WANT THIS_. Turn them off and fix the camera values, making the
images independent from light conditions.\n

The default camera resolution in imdev V2 is HD720p (1280x720), but
it is not able to capture the plate (partial sensor area). The
smallest resolution using full sensor area is 1640x922, which offers
a good compromise between quality (data) and speed. HD1080p
resolution takes longer to acquire and process, and it also is
unable to capture a full microplate due to a different partial
sensor area used. Information about these limitations can be found
at https://imdev.readthedocs.io/en/release-1.13/fov.html#sensor-modes
"""
Camera.hflip = True # Ensures well A1 is in the bottom-left.
Camera.awb_mode = 'off'
Camera.awb_gains = (1.45, 1.45)
SHUTTER_SPEED = 0.25 # Shutter speed time in SECONDS.
Camera.shutter_speed = int(SHUTTER_SPEED * 10**6) # Translate SHUTTER_SPEED to µs.
Camera.iso = DEFAULT_ISO
# max_res = Camera.MAX_RESOLUTION # How big can the image be?
# Camera.resolution = (int(max_res[0]), int(max_res[1]))
Camera.resolution = (1640, 1232) # Smallest resolution with full FoV (partial FoV results in plate not fully captured).
# imdev v2 has issues with 'exposure_mode = off' causing black img.
t.sleep(0.1) # Solves those issues (WTF?) as long as it's _BEFORE_ exposure_mode.
Camera.exposure_mode = 'off'
return Camera

# def _modulate_LED_intensity(self, light_intensity, channel=None):
# """ Change LED intensity for a given filter (channel). 'filter' is a
# function from python's core library, hence the use of 'channel'
# instead. Light panel is switched off prior to changing the filter
# to avoid the overlap of multiple wavelengths (LEDs). This step is
# fast enough to pass unnotices).\n

# In case of an RGB panel, the white light is emulated by reducing
# the green LED by 40% (visually checked).\n

# GPIO settings for 'rgb' plate_type : R (600nm, 0), G (GFP/YFP, 1)
# and B (CFP, 2).
# """
# available_filters = sorted(list(self._filter_set))
# if channel is None:
# Single LED light panel, only one wavelength.
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[0],
# light_intensity)
# else:
# # Turn off _ALL_ LEDs prior to change settings.
# # Otherwise, multiple wavelengths will overlap.
# for pin in self._GPIO_PIN:
# self._light_panel.write(pin, 0)

# if channel == available_filters[0]: # CFP
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[2],
# light_intensity)
# elif channel == available_filters[1]: # GFP/YFP
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[1],
# light_intensity)
# elif channel == available_filters[2]: # OD
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[0],
# light_intensity)
# elif channel == available_filters[3]:
# # Simulated white light based on RGB LEDs
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[0],
# light_intensity)
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[1],
# int(light_intensity * 0.6))
# self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[2],
# light_intensity)

def _modulate_LED_intensity(self, light_intensity, channel=None):
"""
Change LED intensity for a given filter (channel). 'filter' is a
function from python's core library, hence the use of 'channel'
instead. Light panel is switched off prior to changing the filter
to avoid the overlap of multiple wavelengths (LEDs). This step is
fast enough to pass unnotices).\n
fast enough to pass unnotices) *** NOT IT DOES NOT!!! IT
IS NOTICEABLE!!!. STEP REMOVED ***\n

In case of an RGB panel, the white light is emulated by reducing
the green LED by 40% (visually checked).\n
@@ -420,15 +438,12 @@ class SetupDevice:
self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[0],
light_intensity)
else:
# Turn off _ALL_ LEDs prior to change settings.
# Otherwise, multiple wavelengths will overlap.
for pin in self._GPIO_PIN:
self._light_panel.write(pin, 0)

if channel == 0: # CFP
self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[2],
light_intensity)
elif channel == 1: # GFP/YFP
if light_intensity > 200:
light_intensity = 200 # Dutycycle is 255 means permanently ON, regardless of frequency.
self._light_panel.set_PWM_dutycycle(self._GPIO_PIN[1],
light_intensity)
elif channel == 2: # OD
@@ -539,6 +554,42 @@ class SetProtocol:
use_video_port=False, quality=90)
return 0

def _data_acquisition_triplicate(self, light_range, time, flt, wavelength,
singleRead):
""" Takes a picture at a given light_intensity. Files are named based
on light intensity, time and filter used. """
photo_path = self._dir_info.root_path + self._dir_info.protocol_path +\
self._dir_info.img_data_path + flt + "/"
Replicate_name = "Replicate_"
# This method optimises photography time by 1) using JPEG hardware
# acceleration, 2) avoiding the overheads of initialising cameras'
# still pipeline (init still port + encoder for every picture) and 3)
# eliminating overheads of initialising preview port. use_video_port
# can accelerate this further, but at the expense of a lower quality
# image.
for light_intensity in light_range:
self._device._camera.capture_sequence(img_acquisition_routine_triplicate(self,
light_intensity,
wavelength,
photo_path,
Replicate_name,
flag=singleRead),
format='jpeg', burst=True,
use_video_port=False, quality=90)
# Now generate `average' image
mean_img = _average_image(self, photo_path, Replicate_name)
# If no custom names provided, use a preset name.
if self._notes == '':
file_name = self._pic_file_name + str(time) + "s_" +\
str(light_intensity).zfill(3) + self._pic_ext
else:
file_name = self._pic_file_name + str(time) + "s_" +\
str(light_intensity).zfill(3)+ self._notes +\
self._pic_ext
cv2.imwrite(photo_path + file_name, mean_img)
del mean_img # Free memory.
return 0

def _detect_well_info(self, recalculate_wells,
plate_reference="findWells.jpg"):
"""
@@ -721,18 +772,20 @@ class SetProtocol:
# Read plate
init_time = t.time() # Used to account for plate reading time.
for flt, wavelength in zip(self._filters, self._wavelengths):
# Re-set ISO value to brighten up these filters...
if flt is "Filter_1":
self._device._camera.iso = ISO_F1
elif flt is "Filter_3":
self._device._camera.iso = ISO_F3
if flt == "Filter_2":
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], 25)
# Set camera on-the-fly to adapt to different wavelenghts.
# Because this step happens with any filter, it does not require
# a post-read reset.
self._device._camera = _img_settings(self._device._camera,
Filter=flt)
servo_state = self._device.set_filter(flt, current_pwm=servo_state)
self._data_acquisition(light_range, current_time, flt,
# self._data_acquisition(light_range, current_time, flt,
# wavelength, singleRead)
self._data_acquisition_triplicate(light_range, current_time, flt,
wavelength, singleRead)
# Restore ISO value to DEFAULT_ISO to prevent overexposure
# in other filters.
if flt is "Filter_1" or flt is "Filter_3":
self._device._camera.iso = DEFAULT_ISO
if flt == "Filter_2":
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], 20000)
# Turn LED off
self._device._modulate_LED_intensity(0, channel=wavelength)

+ 2
- 1
puppeteer/_dataparser/__init__.py View File

@@ -3,4 +3,5 @@ asd
"""
from .plate_sniffer import detect_wells as wells_id
from .plate_reader import PlateParser as read_plate
from .data_handler import export_numeric_data, export_img_data, export_queue
from .data_handler import export_numeric_data, export_img_data,\
export_queue, house_keeper

+ 10
- 9
puppeteer/_dataparser/data_handler.py View File

@@ -109,12 +109,6 @@ def export_img_data(dir_info, flt, connection, BUFFER_SIZE):
fIn.close()
# Once file exported, delete photos to save space.
connection.sendall(str("::DATA TRANSFER SUCCESS::").encode())
print("IMG data transferred successfuly. Erasing local data...")
cli = _get_response(connection, BUFFER_SIZE)
print(cli)
if cli.decode() == "Remove local data":
sh.rmtree(localPath)
print("...local data erased.")
else:
log = str("::FAILURE: cannot export any data::")
print(log)
@@ -142,6 +136,13 @@ def export_queue(QueueFile, connection, BUFFER_SIZE):
log = str("Queue sent successfully.\n")
print(log)

# def house_keeper(dir_info):
# localPath = dir_info.root_path + dir_info.protocol_path
# sh.rmtree(localPath)
def house_keeper(dir_info, connection, BUFFER_SIZE):
localPath = dir_info.root_path + dir_info.protocol_path + "/"
print("IMG data transferred successfuly. Erasing local data...")
cli = _get_response(connection, BUFFER_SIZE)
print(cli)
if cli.decode() == "Remove local data":
sh.rmtree(localPath)
print("...local data erased.")



+ 2
- 0
pupuicon/_corelib.py View File

@@ -118,6 +118,8 @@ class InitNetwork(object):
self._PORT = PORT
self._MAX_CONNECTIONS = MAX_CONNECTIONS
self.Socket, self.Connection, self.Curr_Interface = self._init_connection(BUFFER_SIZE)
# Before handing over the conection to the GUI, check if any data has
# not been transferred:

class NetworkOperator():
"""

Loading…
Cancel
Save