You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

_core.py 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import pickle, sys, os
  2. import numpy as np
  3. import datetime as dt
  4. from .handlers import translateUIToHardware # TODO: Fix weird dependency (up to Rob).
  5. from .handlers import hardwareVariables # TODO: Fix weird dependency (up to Rob).
# HACK: the modules imported from `.handlers` are _NOT_ identified under their
# bare module names, so register them explicitly in `sys.modules`.
# NOTE(review): presumably needed so that pickles referencing these bare
# module names can be loaded below -- confirm with the sender side.
sys.modules['translateUIToHardware'] = translateUIToHardware
sys.modules['hardwareVariables'] = hardwareVariables
  9. class ProtocolHandler:
  10. """
  11. Rob's LIF format (RLF?) to Puppeteer translation layer.
  12. """
  13. def _LIF_loader(self, protocol_dump):
  14. """
  15. The user protocol (UserProtocol) is sent over the network to the
  16. device as a pickle. This pickle, through ProtocolHandler, will
  17. then be loaded in the device and translated from Rob's protocol
  18. to what the library puppeteer actually requires. For testing
  19. purposes, ProtocolHandler is a local pickle file with .lif (LIMO
  20. File) extension. The real use will _NOT_ involve local files.
  21. """
  22. return pickle.loads(protocol_dump)
  23. def _translate(self, UserProtocol):
  24. """
  25. Translate information contained in Rob's pickle into something that
  26. can be implemented in puppeteer, the library controlling the device.
  27. """
  28. # Temperature
  29. self.temperature = UserProtocol.temperature
  30. # Light intensity range
  31. self.light_intensities = np.arange(UserProtocol.lightIntensityRangeMin,
  32. UserProtocol.lightIntensityRangeMax+1, # Blame python 0-index
  33. UserProtocol.lightIntensityRangeStep)
  34. if len(self.light_intensities) == 0:
  35. # If `grabSetting' mode...
  36. self.light_intensities = ( UserProtocol.lightIntensityRangeMax, )
  37. # LiMO filters:
  38. # GUI's filters are labelled 0..3, where 1 is Filter_1, 2 Filter_2,...
  39. # and 0 is No_Filter. To match GUI's and LiMO's order, subtract 1 unit
  40. # and replace `-1' (No_Filter) with 3 as No_Filter is the last filter
  41. # in LiMO.
  42. self.user_filters = [ f[1] - 1 for f in UserProtocol.filterToLEDPairing
  43. if f[1] >= 0 ]
  44. if -1 in self.user_filters:
  45. for w in range(self.user_filters.count(-1)):
  46. idx = self.user_filters.index(-1)
  47. self.user_filters[idx] = 3
  48. if len(self.user_filters) == 1:
  49. self.user_filters = tuple(self.user_filters) # Singleton. `int' is not iterable.
  50. # LiMO wavelenghts:
  51. # The GUI's wavelengths are laballed 0..3, where 0 corresponds to the
  52. # WHITE light. In a multi-LED light panel containing RED (1), GREEN (2),
  53. # and BLUE (3) wavelengths WHITE is achieved by mixing RGB in a certain
  54. # proportion. To match the GUI's and LiMO's order, subtract 1 unit and
  55. # replace `-1' (WHITE) with 3, as the GPIOs are 0, 1, and 2 for R, G, B.
  56. self.user_wavelengths = [ f[0] - 1 for f in UserProtocol.filterToLEDPairing
  57. if f[0] >= 0 ]
  58. if -1 in self.user_wavelengths:
  59. for w in range(self.user_wavelengths.count(-1)):
  60. idx = self.user_wavelengths.index(-1)
  61. self.user_wavelengths[idx] = 3
  62. if len(self.user_wavelengths) == 1:
  63. self.user_wavelengths = tuple(self.user_wavelengths) # Singleton. `int' is not iterable.
  64. # LiMO data-processing flag
  65. self.process_data = UserProtocol.doDataAnalysisOnLimo
  66. # Assay length and read frequency (_DEPRECATED_, now all
  67. # protocols have length zero. Keep code just in case.
  68. if UserProtocol.readFrequencyPerHour != 0:
  69. # Read frequency.
  70. user_freq = 60 / UserProtocol.readFrequencyPerHour # from 1/h to 1/min
  71. user_freq_SS, read_freq_MM = np.modf(user_freq)
  72. read_freq_SS = str(int(user_freq_SS * 60)).zfill(2)
  73. read_freq_MM = str(int(read_freq_MM)).zfill(2)
  74. if user_freq_SS == 0.0:
  75. read_freq_HH = str(int(read_freq_SS)).zfill(2)
  76. else:
  77. read_freq_HH = "00"
  78. # Required format is string "HH:MM:SS":
  79. self.frequency = ":".join([read_freq_HH, read_freq_MM, read_freq_SS])
  80. # Protocol duration.
  81. user_dur_MM, assay_HH = np.modf(UserProtocol.protocolDurationInHours)
  82. assay_SS, assay_MM = np.modf(user_dur_MM * 60)
  83. assay_HH = str(int(assay_HH)).zfill(2)
  84. assay_MM = str(int(assay_MM)).zfill(2)
  85. assay_SS = str(int(assay_SS)).zfill(2)
  86. # Required format is string "HH:MM:SS":
  87. self.length = ":".join([assay_HH, assay_MM, assay_SS])
  88. # Delay
  89. delay_HH = int(UserProtocol.protocolDelayHours)
  90. delay_MM = int(UserProtocol.protocolDelayMinutes)
  91. self.delay = delay_MM * 60 + delay_HH * 60 ** 2
  92. else:
  93. # This means take _ONE_ read, immediately.
  94. # TODO: What about a `kinetic' option (continuous reading for enzymes?)
  95. self.frequency = "00:00:00"
  96. self.length = "00:00:00"
  97. # Delay
  98. delay_HH = int(UserProtocol.protocolDelayHours)
  99. delay_MM = int(UserProtocol.protocolDelayMinutes)
  100. self.delay = delay_MM * 60 + delay_HH * 60 ** 2
  101. return self
  102. def _createQueue(self, Delay, QueueTime, QueueSteps):
  103. """
  104. Creates local queue to keep track of a) what steps have been
  105. requested by the user, and b) what steps have been completed.
  106. The user can then access the latter to keep track of progress.\n
  107. `QueueTime' _must_ be a datetime structure. `QueueSteps' is
  108. provided in *MINUTES*.
  109. """
  110. # Create list of expected read times.
  111. QueueInfo = list()
  112. for readQueued in QueueSteps:
  113. etr = QueueTime + dt.timedelta(minutes=readQueued)
  114. QueueInfo.append(list([etr.strftime("%F"), # Date
  115. etr.strftime("%T"), # Time (HH:MM:SS)
  116. "Queued"])) # Status. Queue by default.
  117. # Estimate read frequency (in *MINUTES*) from QueueSteps. Needs initial
  118. # incubation time.
  119. if len(QueueSteps) > 1:
  120. QueueWait = [next_step - prev_step for prev_step, next_step
  121. in zip(QueueSteps[:-1], QueueSteps[1:])]
  122. QueueWait.insert(0, Delay) # Insert incubation time for job 1.
  123. else:
  124. QueueWait = QueueSteps # Do not insert otherwise (grabSettings)
  125. # Generate Queue.
  126. # Define QueueFile @ USR **home** directory.
  127. # cwd = os.getcwd().split('/') # splits path into array of strings
  128. cwd = os.getcwd() # splits path into array of strings
  129. srv_dir = "/.srv"
  130. fName = "/queue.q"
  131. # QueueFile = '/'.join(cwd[:3]) + fName # [:3] ensures that path is /home/USR/fName. FIX: Do we want user to see this?
  132. QueueFile = cwd + srv_dir + fName # Stored in path hidden from user.
  133. with open(QueueFile, "w") as fOut:
  134. fOut.writelines("Date, Time, Status, Waiting time (min)\n")
  135. for step, waiting_time in zip(QueueInfo, QueueWait):
  136. fOut.writelines(', '.join([field for field in step]) +
  137. ", " + str(waiting_time) + "\n")
  138. return QueueWait, QueueInfo, QueueFile
  139. def __init__(self, protocol_dump):
  140. # Load instructions
  141. UserInstructions = self._LIF_loader(protocol_dump)
  142. # Translate instructions as per L|MO.
  143. self._translate(UserInstructions.LiMOinstructions)
  144. # Create Queue
  145. self.Queue, self.QueueInfo,\
  146. self.QueueFile = self._createQueue(self.delay,
  147. UserInstructions.localTime,
  148. UserInstructions.queueTimes)
  149. class HardwareHandler():
  150. """
  151. Retrieve hardware settings given by user and implement them.
  152. """
  153. def _Settings_loader(self, settings_dump):
  154. """
  155. The hardware settings (hwSettings) is sent over the network to the
  156. device as a pickle. This pickle, through HardwareHandler, will
  157. save this settings into disk..
  158. """
  159. return pickle.loads(settings_dump)
  160. def _Arrange_settings(self, Settings):
  161. hwSettings = dict()
  162. # GPIO information
  163. hwSettings["GPIO_PIN_RGB"] = Settings.GPIO_PIN_RGB
  164. hwSettings["GPIO_PIN_M"] = Settings.GPIO_PIN_M
  165. hwSettings["FILTER_PIN"] = Settings.FILTER_PIN
  166. hwSettings["HEATER_PIN"] = Settings.HEATER_PIN
  167. # Filter position (in PWM)
  168. hwSettings["filterSet"] = Settings.filter_pws
  169. # Camera settings
  170. hwSettings["DEFAULT_ISO"] = Settings.DEFAULT_ISO
  171. hwSettings["ISO_F1"] = Settings.ISO_F1
  172. hwSettings["ISO_F2"] = Settings.ISO_F2
  173. hwSettings["ISO_F3"] = Settings.ISO_F3
  174. hwSettings["SHUTTER_SPEED"] = Settings.SHUTTER_SPEED
  175. # Light settings
  176. hwSettings["DEFAULT_LIGHT_FREQ"] = Settings.DEFAULT_LIGHT_FREQ
  177. hwSettings["LIGHT_FREQ_F1"] = Settings.LIGHT_FREQ_F1
  178. hwSettings["LIGHT_FREQ_F2"] = Settings.LIGHT_FREQ_F2
  179. hwSettings["LIGHT_FREQ_F3"] = Settings.LIGHT_FREQ_F3
  180. # General settings
  181. hwSettings["cameraRepetitions"] = Settings.cameraRepetitions
  182. return hwSettings
  183. def _Store_hwSettings(self, Settings):
  184. """
  185. Store hardware settings into disk. The light modulator will seek
  186. these settings every time it is booted up.
  187. """
  188. # Set paths:
  189. hw_fName = "/hwsettings.hws"
  190. Path = os.getcwd() + "/.srv"
  191. # Arrange settings from hws to something meaningful:
  192. hwSettings = self._Arrange_settings(Settings)
  193. # Save in disk:
  194. pickle.dump(hwSettings, open(Path + hw_fName, "wb"))
  195. def __init__(self, settings_dump):
  196. # Load settings
  197. Settings = self._Settings_loader(settings_dump)
  198. # Store in disk
  199. self._Store_hwSettings(Settings)