Browse Source

Setting HW parameters from GUI

master
Carlos Reding 1 year ago
parent
commit
404ddd2966

+ 41
- 4
puppeteer/__init__.py View File

@@ -23,7 +23,13 @@

from ._core import SetupDevice, SetProtocol
from ._dataparser import export_img_data, export_queue, house_keeper
import time
import time, subprocess, pickle, os


# Location of hardware settings:
hw_fName = "/hwsettings.hws"
Path = os.getcwd() + "/.srv"


def Controller(USR_INSTRUCTIONS, Connection, cancel_switch, ConnectionStatus,
BoxQueue, ProtocolStatus):
@@ -137,8 +143,8 @@ def QueueStatus(QueueFile, Connection, BUFFER_SIZE=64):
"""
export_queue(QueueFile, Connection, BUFFER_SIZE) # TODO: change method name from export_queue to report_progress?

def ExportData(Protocol, ProcessData, Connection, Status, BUFFER_SIZE=64,
Test=False):
def ExportData(Protocol, ProcessData, QueueFile, Connection, Status,
BUFFER_SIZE=64, Test=False):
"""
Export IMG or numerical information. Use variable `Status' to detect
wheter a user is connected through GUI or not.
@@ -164,5 +170,36 @@ def ExportData(Protocol, ProcessData, Connection, Status, BUFFER_SIZE=64,
print("All done.")
Connection.sendall("::Protocol run successfully::".encode())
# House keeping. Always. Test or not.
house_keeper(Protocol._dir_info, Connection, BUFFER_SIZE)
house_keeper(Protocol._dir_info, QueueFile, Connection, BUFFER_SIZE)

def CancelProtocol(Protocol, Routine=".cancel_protocol.sh"):
"""
Cancels a running protocol with IMMEDIATE effect. The use of flags with
multithreading cannot cancel a running protocol until the next iteration
be it within a minute or an hour. `CancelProtocol' soft-restarts the
light modulator and, therefore, protocols are cancelled immediately.
"""
# Stop hardware
Protocol._device._camera.close()
Protocol._device._heater.stop()
Protocol._device._servo.stop()
Protocol._device._light_panel.stop()
# Restart it all...
LiMO_Path = "/home/client/.srv/"
subprocess.call([".", LiMO_Path + Routine])

def CameraRepetitions():
"""
Routine to detect the number of replicates defined by the user through
the graphical user interface (GUI). If this setting does not exist,
default to 3.
"""
if os.path.exists(Path + hw_fName):
hwSettings = pickle.load(open(Path + hw_fName, "rb"))
Replicates = hwSettings["cameraRepetitions"]
Lag_per_replicate = hwSettings["SHUTTER_SPEED"]
else:
Replicates = 3
Lag_per_replicate = 0.25
return Replicates, Lag_per_replicate

BIN
puppeteer/__pycache__/__init__.cpython-37.pyc View File


BIN
puppeteer/__pycache__/_core.cpython-37.pyc View File


+ 90
- 35
puppeteer/_core.py View File

@@ -11,24 +11,72 @@ from ._treeparser import GenerateTree
from . import _dataparser as dp
import cv2

# Set PINs (MODE == BCM).
GPIO_PIN_RGB = [17, 27, 22] # RGB mode, simulated white Ws = R + 0.6*G + B.
GPIO_PIN_M = 17 # Monocolour mode (white).
FILTER_PIN = 3 # Servo holding filters.
HEATER_PIN = 10 # Heating unit.

# Default buffer size in bytes
BUFFER_SIZE = 64

# Temperature sampling frequency in seconds.
TEMPERATURE_SAMPLING_FREQ = 3.0
TEMPERATURE_TOLERANCE = 1.0

# Default buffer size in bytes
BUFFER_SIZE = 64
# Location of hardware settings:
hw_fName = "/hwsettings.hws"
Path = os.getcwd() + "/.srv"

# Load settings defined by in GUI, otherwise use defaults:
if os.path.exists(Path + hw_fName):
hwSettings = pickle.load(open(Path + hw_fName, "rb"))
# Set PINs (MODE == BCM).
GPIO_PIN_RGB = hwSettings["GPIO_PIN_RGB"] # RGB mode, simulated white Ws = R + G + B.
GPIO_PIN_M = hwSettings["GPIO_PIN_M"] # Monocolour mode (white).
FILTER_PIN = hwSettings["FILTER_PIN"] # Servo holding filters.
HEATER_PIN = hwSettings["HEATER_PIN"] # Heating unit.
__FILTER_PWM__ = hwSettings["filterSet"] # Filter locations
# Default camera settings
__SHUTTER_SPEED__ = hwSettings["SHUTTER_SPEED"] # Shutter speed time in SECONDS. Defaults to 0.25s
DEFAULT_ISO = hwSettings["DEFAULT_ISO"]
ISO_F1 = hwSettings["ISO_F1"]
ISO_F2 = hwSettings["ISO_F2"]
ISO_F3 = hwSettings["ISO_F3"]
# Light frequencies
DEFAULT_LIGHT_FREQ = hwSettings["DEFAULT_LIGHT_FREQ"]
LIGHT_FREQ_F1 = hwSettings["LIGHT_FREQ_F1"]
LIGHT_FREQ_F2 = hwSettings["LIGHT_FREQ_F2"]
LIGHT_FREQ_F3 = hwSettings["LIGHT_FREQ_F3"]
# Replicates
__REPLICATES__ = hwSettings["cameraRepetitions"]
else:
# Set PINs (MODE == BCM).
GPIO_PIN_RGB = [17, 27, 22] # RGB mode, simulated white Ws = R + G + B.
GPIO_PIN_M = 17 # Monocolour mode (white).
FILTER_PIN = 3 # Servo holding filters.
HEATER_PIN = 10 # Heating unit.
__FILTER_PWM__ = dict() # Filter locations
__FILTER_PWM__['Filter_1'] = 1775 # 475nm band-pass.
__FILTER_PWM__['Filter_2'] = 1460 # 520nm band-pass (centre).
__FILTER_PWM__['Filter_3'] = 1130 # 590 long-pass + 610nm short-pass.
__FILTER_PWM__['No_Filter'] = 850 # No filter (RGB image).
# Default camera settings
__SHUTTER_SPEED__ = 0.25 # Shutter speed time in SECONDS. Defaults to 0.25s
DEFAULT_ISO = 10
ISO_F1 = 100
ISO_F2 = 100
ISO_F3 = 400
# Light frequencies
DEFAULT_LIGHT_FREQ = 20000
LIGHT_FREQ_F1 = 200
LIGHT_FREQ_F2 = 200
LIGHT_FREQ_F3 = 200
# Replicates
__REPLICATES__ = 3

# Default camera settings
DEFAULT_ISO = 10
ISO_F1 = 100
ISO_F2 = 100
ISO_F3 = 400

def _img_settings(Camera, Filter=None):
"""
@@ -50,7 +98,7 @@ def _img_settings(Camera, Filter=None):
Camera.hflip = True # Ensures well A1 is in the bottom-left.
Camera.awb_mode = 'off'
Camera.awb_gains = (1.45, 1.45)
SHUTTER_SPEED = 0.25 # Shutter speed time in SECONDS. Defaults to 0.25s
SHUTTER_SPEED = __SHUTTER_SPEED__
if Filter == "Filter_1":
Camera.iso = ISO_F1
elif Filter == "Filter_2":
@@ -111,7 +159,7 @@ def img_acquisition_routine(self, light_range, wavelength, photo_path,

def img_acquisition_routine_triplicate(self, light_intensity, wavelength,
photo_path, file_name, flag=False,
Replicates=3):
Replicates=__REPLICATES__):
"""
# FIX: CHANGE DOC.
# Main image acquisition routine.\n
@@ -134,17 +182,18 @@ def img_acquisition_routine_triplicate(self, light_intensity, wavelength,
"""
for r in range(Replicates):
self._device._modulate_LED_intensity(light_intensity, channel=wavelength)
if wavelength == 1:
t.sleep(1)
else:
t.sleep(0.1) # Get's rid of French flag issue... why?
# if wavelength == 1:
# t.sleep(1)
# else:
# t.sleep(0.1) # Get's rid of French flag issue... why?
t.sleep(__SHUTTER_SPEED__)
yield photo_path + file_name + str(r) + self._pic_ext
if light_intensity == 255 or flag is True:
yield photo_path + "findWells" + self._pic_ext
# When finished, switch OFF
self._device._modulate_LED_intensity(0, channel=wavelength)

def _average_image(self, photo_path, Replicate_name, Replicates=3):
def _average_image(self, photo_path, Replicate_name, Replicates=__REPLICATES__):
"""
Load the relicate
"""
@@ -186,20 +235,26 @@ class SetupDevice:
different from its past position.
"""
# Define servo pulsewidth for each filter position.
filter_pws = dict()
# __FILTER_PWM__ = dict()
# if len(self._GPIO_PIN) == 1:
# __FILTER_PWM__['No_Filter'] = 0 # Monowavelength contains no servo (yet?).
# elif len(self._GPIO_PIN) == 3:
# __FILTER_PWM__['Filter_1'] = 1775 # 475nm band-pass.
# __FILTER_PWM__['Filter_2'] = 1460 # 520nm band-pass (centre).
# __FILTER_PWM__['Filter_3'] = 1130 # 590 long-pass + 610nm short-pass.
# __FILTER_PWM__['No_Filter'] = 850 # No filter (RGB image).
# else:
# raise ValueError("Channels must be 1 < chnl < 3, but ",
# str(len(self._GPIO_PIN)), " were specified.")
# Define servo pulsewidth for each filter position.
if len(self._GPIO_PIN) == 1:
filter_pws['No_Filter'] = 0 # Monowavelength contains no servo (yet?).
elif len(self._GPIO_PIN) == 3:
filter_pws['Filter_1'] = 1775 # 475nm band-pass.
filter_pws['Filter_2'] = 1460 # 520nm band-pass (centre).
filter_pws['Filter_3'] = 1130 # 590 long-pass + 610nm short-pass.
filter_pws['No_Filter'] = 850 # No filter (RGB image).
FILTER_PWM = dict()
FILTER_PWM['No_Filter'] = 0 # Monowavelength contains no servo (yet?).
else:
raise ValueError("Channels must be 1 < chnl < 3, but ",
str(len(self._GPIO_PIN)), " were specified.")
FILTER_PWM = __FILTER_PWM__
# Set filter.
if selected_filter != "expose_filters": # Helps with calibration.
new_pwm = filter_pws[selected_filter]
new_pwm = __FILTER_PWM__[selected_filter]
if new_pwm != current_pwm:
self._servo.set_servo_pulsewidth(self._FILTER_PIN, new_pwm) # Adjust.
t.sleep(1) # Gives the servo time to move....
@@ -207,7 +262,7 @@ class SetupDevice:
current_pwm = new_pwm
return current_pwm
elif selected_filter == "expose_filters":
return sorted(filter_pws)
return sorted(FILTER_PWM)
return 0

def _set_temperature(self, current_temperature, target_temperature,
@@ -282,12 +337,12 @@ class SetupDevice:
light_panel = dev()
if len(self._GPIO_PIN) == 1:
light_panel.set_mode(self._GPIO_PIN[0], OUTPUT)
light_panel.set_PWM_frequency(self._GPIO_PIN[0], 20000) # Hz (20MHz max with -s 2).
light_panel.set_PWM_frequency(self._GPIO_PIN[0], DEFAULT_LIGHT_FREQ) # Hz (20KHz max with -s 2).
light_panel.write(self._GPIO_PIN[0], 0) # Init off state.
elif len(self._GPIO_PIN) == 3:
for pin in self._GPIO_PIN:
light_panel.set_mode(pin, OUTPUT)
light_panel.set_PWM_frequency(pin, 20000) # Hz (20MHz max with -s 2).
light_panel.set_PWM_frequency(pin, DEFAULT_LIGHT_FREQ) # Hz (20KHz max with -s 2).
# light_panel.write(pin, 0) # Init off state.
else:
raise ValueError("Channels must be 1 < chnl < 3, but ",
@@ -750,7 +805,7 @@ class SetProtocol:
# cancel_protocol.is_set() is now True. STOP.
if temperature is not 0:
temperature_event.set()
temperature_modulation.join()
temperature_modulation.join(timout=2.0)
log = str("Protocol cancelled.")
print(log)
# Retrieve data available until this point.
@@ -773,7 +828,7 @@ class SetProtocol:
init_time = t.time() # Used to account for plate reading time.
for flt, wavelength in zip(self._filters, self._wavelengths):
if flt == "Filter_2":
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], 25)
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], LIGHT_FREQ_F2)
# Set camera on-the-fly to adapt to different wavelenghts.
# Because this step happens with any filter, it does not require
# a post-read reset.
@@ -785,7 +840,7 @@ class SetProtocol:
self._data_acquisition_triplicate(light_range, current_time, flt,
wavelength, singleRead)
if flt == "Filter_2":
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], 20000)
self._device._light_panel.set_PWM_frequency(self._device._GPIO_PIN[wavelength], DEFAULT_LIGHT_FREQ)
# Turn LED off
self._device._modulate_LED_intensity(0, channel=wavelength)

+ 3
- 1
puppeteer/_dataparser/data_handler.py View File

@@ -136,13 +136,15 @@ def export_queue(QueueFile, connection, BUFFER_SIZE):
log = str("Queue sent successfully.\n")
print(log)

def house_keeper(dir_info, connection, BUFFER_SIZE):
def house_keeper(dir_info, QueueFile, connection, BUFFER_SIZE):
localPath = dir_info.root_path + dir_info.protocol_path + "/"
print("IMG data transferred successfuly. Erasing local data...")
cli = _get_response(connection, BUFFER_SIZE)
print(cli)
if cli.decode() == "Remove local data":
sh.rmtree(localPath)
if QueueFile:
os.remove(QueueFile)
print("...local data erased.")



+ 8
- 5
pupuicon/_corelib.py View File

@@ -1,6 +1,6 @@
import netifaces as ni
import socket, pickle
from .translator import ProtocolHandler
from .translator import ProtocolHandler, HardwareHandler

def _get_response(Connection, BUFFER_SIZE):
return Connection.recv(BUFFER_SIZE)
@@ -161,11 +161,9 @@ class NetworkOperator():
return Stream

def _retrieve_protocol(self, ProtocolSize, Connection, BUFFER_SIZE):
print("Retrieving protocol...")
ProtocolDump = Connection.recv(BUFFER_SIZE)
if len(ProtocolDump) < ProtocolSize:
ProtocolDump += Connection.recv(BUFFER_SIZE)
print("Done.")
return ProtocolDump

def _manage_connection(self, Connection, BUFFER_SIZE):
@@ -180,9 +178,7 @@ class NetworkOperator():
"""
handshake = self._retrieve_data(Connection, BUFFER_SIZE) # Waits for client.
# Handshake, what do I do?
print(handshake)
if handshake.decode() == "Run Protocol":
print("I need protocol...")
## .lif file opened in client side, then sent over. # THIS HAS
## The box keeps no hard copy of protocols files (.lif) # CHANGED!
InstructionSize = int(self._retrieve_data(Connection, BUFFER_SIZE).decode())
@@ -190,6 +186,13 @@ class NetworkOperator():
instructions_dump = self._retrieve_protocol(InstructionSize, Connection, BUFFER_SIZE**3)
UserInstructions = ProtocolHandler(instructions_dump)
return UserInstructions
elif handshake.decode() == "Hardware Settings":
SettingSize = int(self._retrieve_data(Connection, BUFFER_SIZE).decode())
Connection.sendall(str("Acknowledged").encode()) # Unlock to proceed.
settings_dump = self._retrieve_protocol(SettingSize, Connection, BUFFER_SIZE**3)
HardwareHandler(settings_dump) # Settings stored in disk. Nothing to return.
# BUT: return handshake to avoid errors with srv_controller.
return "Update Settings"
else:
return handshake.decode()


+ 1
- 1
pupuicon/translator/__init__.py View File

@@ -1,4 +1,4 @@
"""
Treat translator thoughtfully.
"""
from ._core import ProtocolHandler
from ._core import ProtocolHandler, HardwareHandler

+ 66
- 5
pupuicon/translator/_core.py View File

@@ -1,10 +1,12 @@
import pickle, sys, os
import numpy as np
import datetime as dt
from .handlers import translateUIToHardware # TODO: Fix weird dependency.
from .handlers import translateUIToHardware # TODO: Fix weird dependency (up to Rob).
from .handlers import hardwareVariables # TODO: Fix weird dependency (up to Rob).

# HACK: handlers.modname is _NOT_ idenfified as modname
sys.modules['translateUIToHardware'] = translateUIToHardware
sys.modules['hardwareVariables'] = hardwareVariables

class ProtocolHandler:
"""
@@ -13,10 +15,10 @@ class ProtocolHandler:
def _LIF_loader(self, protocol_dump):
"""
The user protocol (UserProtocol) is sent over the network to the
device as a pickle. This pickle, through protocol_Handler, will
device as a pickle. This pickle, through ProtocolHandler, will
then be loaded in the device and translated from Rob's protocol
to what the library puppeteer actually requires. For testing
purposes, protocol_Handler is a local pickle file with .lif (LIMO
purposes, ProtocolHandler is a local pickle file with .lif (LIMO
File) extension. The real use will _NOT_ involve local files.
"""
return pickle.loads(protocol_dump)
@@ -132,9 +134,12 @@ class ProtocolHandler:
QueueWait = QueueSteps # Do not insert otherwise (grabSettings)
# Generate Queue.
# Define QueueFile @ USR **home** directory.
cwd = os.getcwd().split('/') # splits path into array of strings
# cwd = os.getcwd().split('/') # splits path into array of strings
cwd = os.getcwd() # splits path into array of strings
srv_dir = "/.srv"
fName = "/queue.q"
QueueFile = '/'.join(cwd[:3]) + fName # [:3] ensures that path is /home/USR/fName. TODO: apply to treeparser.
# QueueFile = '/'.join(cwd[:3]) + fName # [:3] ensures that path is /home/USR/fName. FIX: Do we want user to see this?
QueueFile = cwd + srv_dir + fName # Stored in path hidden from user.
with open(QueueFile, "w") as fOut:
fOut.writelines("Date, Time, Status, Waiting time (min)\n")
for step, waiting_time in zip(QueueInfo, QueueWait):
@@ -152,3 +157,59 @@ class ProtocolHandler:
self.QueueFile = self._createQueue(self.delay,
UserInstructions.localTime,
UserInstructions.queueTimes)


class HardwareHandler():
"""
Retrieve hardware settings given by user and implement them.
"""
def _Settings_loader(self, settings_dump):
"""
The hardware settings (hwSettings) is sent over the network to the
device as a pickle. This pickle, through HardwareHandler, will
save this settings into disk..
"""
return pickle.loads(settings_dump)
def _Arrange_settings(self, Settings):
hwSettings = dict()
# GPIO information
hwSettings["GPIO_PIN_RGB"] = Settings.GPIO_PIN_RGB
hwSettings["GPIO_PIN_M"] = Settings.GPIO_PIN_M
hwSettings["FILTER_PIN"] = Settings.FILTER_PIN
hwSettings["HEATER_PIN"] = Settings.HEATER_PIN
# Filter position (in PWM)
hwSettings["filterSet"] = Settings.filter_pws
# Camera settings
hwSettings["DEFAULT_ISO"] = Settings.DEFAULT_ISO
hwSettings["ISO_F1"] = Settings.ISO_F1
hwSettings["ISO_F2"] = Settings.ISO_F2
hwSettings["ISO_F3"] = Settings.ISO_F3
hwSettings["SHUTTER_SPEED"] = Settings.SHUTTER_SPEED
# Light settings
hwSettings["DEFAULT_LIGHT_FREQ"] = Settings.DEFAULT_LIGHT_FREQ
hwSettings["LIGHT_FREQ_F1"] = Settings.LIGHT_FREQ_F1
hwSettings["LIGHT_FREQ_F2"] = Settings.LIGHT_FREQ_F2
hwSettings["LIGHT_FREQ_F3"] = Settings.LIGHT_FREQ_F3
# General settings
hwSettings["cameraRepetitions"] = Settings.cameraRepetitions
return hwSettings
def _Store_hwSettings(self, Settings):
"""
Store hardware settings into disk. The light modulator will seek
these settings every time it is booted up.
"""
# Set paths:
hw_fName = "/hwsettings.hws"
Path = os.getcwd() + "/.srv"
# Arrange settings from hws to something meaningful:
hwSettings = self._Arrange_settings(Settings)
# Save in disk:
pickle.dump(hwSettings, open(Path + hw_fName, "wb"))

def __init__(self, settings_dump):
# Load settings
Settings = self._Settings_loader(settings_dump)
# Store in disk
self._Store_hwSettings(Settings)

Loading…
Cancel
Save