#!/usr/bin/python3
import logging
import re
import time
from typing import Optional
import pyvisa
from .data_classes import (
HeatersStatus,
HumidityStatus,
OperationMode,
TemperatureStatus,
TestAreaState,
)
from .exceptions import MonitorError, SettingError
_LOGGER = logging.getLogger(__name__)
[docs]
class EspecPr3j:
"""
Implements the basic operation of the environmental chamber.
Args:
`hostname (Optional[str])`: Host name of the environmental chamber. Default is
None. If None, the resource_path must be provided. Can't be used with
resource_path. It can be an IP address.
`temperature_accuracy (Optional[float])`: The accuracy considered when setting
the temperature. Default is 0.5.
`humidity_accuracy (Optional[float])`: The accuracy considered when setting the
humidity. Default is 3.0.
`resource_path (Optional[str])`: The resource path of the environmental chamber.
If None, the hostname must be provided. Can't be used with hostname. Default
is None.
`resource_namager (Optional[pyvisa.ResourceManager])`: An optional PyVISA
resource manager. If None, the default one is used. Default is None.
`communication_timeout (Optional[int])`: The communication timeout in
milliseconds. Default is 5000.
"""
MONITOR_COMMAND_DELAY = 0.2
"""Delay in seconds when sending a command to the environmental chamber
(program-related delay is 0.3)"""
SETTING_COMMAND_DELAY = 0.5
"""Delay in seconds when sending a setting command to the environmental chamber
(program-related delay is 1)"""
LINE_TERMINATION = "\r\n"
"""The line termination character used by the environmental chamber"""
TCP_PORT = 57732
"""The TCP port of the environmental chamber"""
def __init__(
self,
hostname: Optional[str] = None,
temperature_accuracy: Optional[float] = None,
humidity_accuracy: Optional[float] = None,
resource_path: Optional[str] = None,
resource_manager: Optional[pyvisa.ResourceManager] = None,
communication_timeout: Optional[int] = None,
):
assert (hostname is None) or (resource_path is None)
assert (hostname is not None) or (resource_path is not None)
self.hostname = hostname
"""The IP address of the environmental chamber"""
self.temperature_accuracy = temperature_accuracy or 0.5
"""The accuracy considered when setting the temperature"""
self.humidity_accuracy = humidity_accuracy or 3.0
"""The accuracy considered when setting the humidity"""
# we try to connect to the environmental chamber just to see if there is an
# error
self._resource_manager = resource_manager or pyvisa.ResourceManager()
if resource_path is None:
resource_path = f"TCPIP0::{self.hostname}" # noqa E231
resource_path += f"::{self.TCP_PORT}::SOCKET" # noqa E231
self.resource_path = resource_path
"""Resource path of the environmental chamber"""
self._chamber = self._resource_manager.open_resource(self.resource_path)
_LOGGER.debug(f"Connected to the environmental chamber at {self.resource_path}")
self._chamber.write_termination = self.LINE_TERMINATION
self._chamber.read_termination = self.LINE_TERMINATION
self._chamber.timeout = communication_timeout or 5000
def _target_temperature_reached(self) -> bool:
"""
Checks if the current temperature is within the target temperature range.
"""
temperature_status = self.get_temperature_status()
current = temperature_status.current_temperature
target = temperature_status.target_temperature
_LOGGER.debug(
f"Current temperature: {current}°C, Target temperature: {target} +-"
f"{self.temperature_accuracy}°C"
)
return abs(current - target) <= self.temperature_accuracy
def _target_humidity_reached(self) -> bool:
"""
Checks if the current humidity is within the target humidity range. If the
humidity control is disabled, it always returns True.
"""
humidity_status = self.get_humidity_status()
current = humidity_status.current_humidity
target = humidity_status.target_humidity
if target is None:
_LOGGER.debug("Humidity control is disabled")
return True
_LOGGER.debug(
f"Current humidity: {current}%, Target humidity: {target} +-"
f"{self.humidity_accuracy}%"
)
return abs(current - target) <= self.humidity_accuracy
[docs]
def get_temperature_status(self) -> TemperatureStatus:
"""
Gets the temperature status of the environmental chamber. This includes the
current temperature, set temperature, upper limit, and lower limit.
Raises:
`MonitorError`: If an error occurred when getting the temperature status.
"""
# send the request to the chamber
response = self._chamber.query("TEMP?", delay=self.MONITOR_COMMAND_DELAY)
# data format: [current temp, set temp, upper limit, lower limit]
pattern = re.compile(
r"(?P<current>\d+\.\d+)"
r",(?P<target>\d+\.\d+)"
r",(?P<upper>\d+\.\d+)"
r",(?P<lower>\d+\.\d+)"
)
match = pattern.match(response)
if match is None:
_LOGGER.error("Failed to get the temperature status")
_LOGGER.debug(f"Response: '{response}'")
raise MonitorError("Failed to get the temperature status")
# convert into float numbers
temperature_status = TemperatureStatus(
current_temperature=float(match["current"]),
target_temperature=float(match["target"]),
upper_limit=float(match["upper"]),
lower_limit=float(match["lower"]),
)
return temperature_status
[docs]
def get_humidity_status(self) -> HumidityStatus:
"""
Gets the humidity status of the environmental chamber. This includes the current
humidity, set humidity, upper limit, and lower limit.
Raises:
`MonitorError`: If an error occurred when getting the humidity status.
"""
# send the request to the chamber
response = self._chamber.query("HUMI?", delay=self.MONITOR_COMMAND_DELAY)
# data format: [current humi, set humi, upper limit, lower limit]
pattern = re.compile(
r"(?P<current>\d+)"
r",(?P<target>OFF|\d+)"
r",(?P<upper>\d+)"
r",(?P<lower>\d+)"
)
match = pattern.match(response)
if match is None:
_LOGGER.error("Failed to get the temperature status")
_LOGGER.debug(f"Response: '{response}'")
raise MonitorError("Failed to get the humidity status")
if match["target"] == "OFF":
target_humidity = None
else:
target_humidity = float(match["target"])
# convert into float numbers
humidity_status = HumidityStatus(
current_humidity=float(match["current"]),
target_humidity=target_humidity,
upper_limit=float(match["upper"]),
lower_limit=float(match["lower"]),
)
return humidity_status
[docs]
def set_target_temperature(self, temperature: float):
"""
Sets the target temperature of the environmental chamber.
Args:
`temperature`: The target temperature to set in Celsius.
Raises:
`ClimateChamberSettingError`: If an error occurred when setting the
target temperature.
"""
# sets the temp of the chamber, temperature
_LOGGER.debug(f"Setting target temperature to {temperature}°C")
response = self._chamber.query(
f"TEMP, S{temperature:.1f}", delay=self.SETTING_COMMAND_DELAY # noqa E231
)
# verify the response
response_pattern = re.compile(r"OK:TEMP, S\d+.\d+")
if not response_pattern.match(response):
_LOGGER.error("Failed to set the target temperature")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the target temperature")
[docs]
def set_target_humidity(self, humidity: Optional[float] = None):
"""
Sets the target humidity of the environmental chamber.
Args:
`humidity`: The target humidity to set in percentage. If None, the humidity
control is disabled.
Raises:
`ClimateChamberSettingError`: If an error occurred when setting the
target humidity.
"""
if humidity is None:
_LOGGER.debug("Disabling humidity control")
response = self._chamber.query(
"HUMI, SOFF", delay=self.SETTING_COMMAND_DELAY
)
response_pattern = re.compile(r"OK:HUMI, SOFF")
else:
# sets the humidity of the chamber, (float) humidity
_LOGGER.debug(f"Setting target humidity to {humidity}%")
response = self._chamber.query(
f"HUMI, S{humidity}", delay=self.SETTING_COMMAND_DELAY
)
response_pattern = re.compile(r"OK:HUMI, S\d+.*\d*")
# verify the response
if not response_pattern.match(response):
_LOGGER.error("Failed to set the target humidity")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the target humidity")
[docs]
def close(self):
"""
Closes the connection to the environmental chamber.
"""
_LOGGER.debug("Closing the connection to the environmental chamber")
self._chamber.close()
[docs]
def get_test_area_state(self) -> TestAreaState:
"""
Get the chamber test area state.
"""
response: str = self._chamber.query("MON?", delay=self.MONITOR_COMMAND_DELAY)
# output data format: [temp, humid, op-state, num. of alarms]
pattern = re.compile(
r"(?P<temp>\d+\.\d+),(?P<humid>\d+),(?P<state>\w+),(?P<alarms>\d+)"
)
match = pattern.match(response)
if match is None:
_LOGGER.error("Failed to get the test area state")
_LOGGER.debug(f"Response: '{response}'")
raise MonitorError("Failed to get the test area state")
test_area_state = TestAreaState(
current_temperature=float(match["temp"]),
current_humidity=float(match["humid"]),
operation_state=OperationMode.from_str(match["state"]),
number_of_alarms=int(match["alarms"]),
)
return test_area_state
[docs]
def set_temperature_limits(self, upper_limit: float, lower_limit: float):
"""
Sets the upper and lower temperature limits for the chamber.
Args:
`upper_limit`: The temperature upper limit in Celsius.
`lower_limit`: The temperature lower limit in Celsius.
Raises:
`ClimateChamberSettingError`: If an error occurred when setting the
temperature limits.
"""
_LOGGER.debug(
f"Setting temperature limits to {upper_limit}°C and {lower_limit}°C"
)
response = self._chamber.query(f"TEMP, H{upper_limit: 0.1f}")
response_pattern = re.compile(r"OK:TEMP, H \d+.\d+")
if not response_pattern.match(response):
_LOGGER.error("Failed to set the upper temperature limit")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the upper temperature limit")
response = self._chamber.query(f"TEMP, L{lower_limit: 0.1f}")
response_pattern = re.compile(r"OK:TEMP, L \d+.\d+")
if not response_pattern.match(response):
_LOGGER.error("Failed to set the lower temperature limit")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the lower temperature limit")
[docs]
def set_humidity_limits(self, upper_limit: float, lower_limit: float):
"""
Sets the upper and lower humidity limits for the chamber
Args:
`upper_limit`: The humidity upper limit.
`lower_limit`: The humidity lower limit.
Raises:
`ClimateChamberSettingError`: If an error occurred when setting the
humidity limits.
"""
_LOGGER.debug(f"Setting humidity limits to {upper_limit}% and {lower_limit}%")
response = self._chamber.query("HUMI, H" + str(upper_limit))
response_pattern = re.compile(r"OK:HUMI, H\d+")
if not response_pattern.match(response):
_LOGGER.error("Failed to set the upper humidity limit")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the upper humidity limit")
response = self._chamber.query("HUMI, L" + str(lower_limit))
response_pattern = re.compile(r"OK:HUMI, L\d+")
if not response_pattern.match(response):
_LOGGER.error("Failed to set the lower humidity limit")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the lower humidity limit")
[docs]
def get_mode(self) -> OperationMode:
"""
Gets the operation mode of the environmental chamber.
"""
response = self._chamber.query("MODE?", delay=self.MONITOR_COMMAND_DELAY)
pattern = re.compile(r"(?P<mode>\w+)")
match = pattern.match(response)
if match is None:
_LOGGER.error("Failed to get the operation mode")
_LOGGER.debug(f"Response: '{response}'")
raise MonitorError("Failed to get the operation mode")
return OperationMode.from_str(response)
[docs]
def set_mode(self, mode: OperationMode):
"""
Sets the operation mode of the environmental chamber.
Args:
`mode`: The operation mode to set.
"""
# sets the mode of the chamber:
_LOGGER.debug(f"Setting operation mode to {mode}")
response = self._chamber.query(
f"MODE, {mode}", delay=self.SETTING_COMMAND_DELAY
)
response_pattern = re.compile(r"OK:MODE, (?P<mode>\w+)")
match = response_pattern.match(response)
if match is None:
_LOGGER.error("Failed to set the operation mode")
_LOGGER.debug(f"Response: '{response}'")
raise SettingError("Failed to set the operation mode")
set_mode = OperationMode.from_str(match["mode"])
if set_mode != mode:
_LOGGER.error(
f"Operation mode not set correctly"
f" (current: {match['mode']}, expected: {mode})"
)
_LOGGER.debug(f"Response: '{response}'")
_LOGGER.debug(f"Mode: '{match['mode']}'")
raise SettingError("Failed to set the operation mode")
return response
[docs]
def set_constant_condition(
self,
temperature: float,
humidity: Optional[float] = None,
stable_time=60.0,
poll_interval=1.0,
):
"""
Sets the environmental chamber to a constant temperature and humidity condition
and waits until the setpoints are reached and stable.
Args:
`temperature`: The temperature to set in Celsius.
`humidity`: The humidity to set in percentage. Default is None (humidity
control is disabled)
`stable_time`: The time in seconds to wait until the setpoints are stable.
Default is 60.
`poll_interval`: The time in seconds to wait between each check.
Default is 1.
"""
_LOGGER.debug(f"Setting constant condition {temperature}°C, {humidity}%")
self.set_target_temperature(temperature)
self.set_target_humidity(humidity)
self.set_mode(OperationMode.CONSTANT)
start_time = time.time()
_LOGGER.debug("Waiting for the setpoints to be reached")
while True:
stable = (
self._target_temperature_reached() and self._target_humidity_reached()
)
if not stable:
_LOGGER.debug("Setpoints not reached yet")
start_time = time.time()
if time.time() - start_time >= stable_time:
_LOGGER.debug("Setpoints reached and stable")
break
time.sleep(poll_interval)
[docs]
def get_heater_percentage(self) -> HeatersStatus:
"""
Gets the output of the heaters
"""
response = self._chamber.query("%?", delay=self.MONITOR_COMMAND_DELAY)
pattern = re.compile(r"\d+,(?P<temp>\d+\.\d+),(?P<humid>\d+\.\d+)")
match = pattern.match(response)
if match is None:
_LOGGER.error("Failed to get the heaters status")
_LOGGER.debug(f"Response: '{response}'")
raise MonitorError("Failed to get the heaters status")
heaters = HeatersStatus(
temperature_heater=float(match["temp"]),
humidity_heater=float(match["humid"]),
)
return heaters
def __del__(self):
self.close()