import re
import socket
import threading
import scarab
from dripline.core import Service, ThrowReply
import logging
logger = logging.getLogger(__name__)
__all__ = []
__all__.append('EthernetSCPIService')
[docs]class EthernetSCPIService(Service):
'''
A fairly generic subclass of Service for connecting to ethernet-capable instruments/devices.
In is developed for and tested against devices with a SCPI-compliant command set, but may be usable with devices which do not strictly conform.
In particular, devices must support sending a response to every command received (either natively, or via SCPI's command composition) and responses are expected to include a termination marking complete transmission.
'''
def __init__(self,
socket_timeout=1.0,
socket_info=('localhost', 1234),
cmd_at_reconnect=['*OPC?'],
reconnect_test='1',
command_terminator='',
response_terminator=None,
reply_echo_cmd=False,
**kwargs
):
'''
Args:
socket_timeout (int): number of seconds to wait for a reply from the device before timeout.
socket_info (tuple or string): either socket.socket.connect argument tuple, or string that
parses into one.
cmd_at_reconnect ([str,...]): a list of commands to send to the device every time the socket
connection is estabilished note that these will be sent on *every* connection, which may be
disruptive to ongoing activity.
reconnect_test (str): expected return from the last command in the cmd_at_reconnect list, must
match exactly or the reconnect is deemed a failure
command_terminator (str): string to be post-pended to commands, indicating to the device that
the transmission is complete (often \\r, \\n, or \\r\\n - where escaping depends on string types)
response_terminator (str): string added to the end of a reply from the device, indicates the
end of the reply
reply_echo_cmd (bool): indicates that the device includes the the received command in its reply
'''
Service.__init__(self, **kwargs)
if isinstance(socket_info, str):
logger.debug(f"Formatting socket_info: {socket_info}")
re_str = "\([\"'](\S+)[\"'], ?(\d+)\)"
(ip,port) = re.findall(re_str,socket_info)[0]
socket_info = (ip,int(port))
if response_terminator is None or response_terminator == '':
raise ThrowReply('service_error_invalid_value', f"Invalid response terminator: <{repr(response_terminator)}>! Expect string")
if not isinstance(cmd_at_reconnect, list) or len(cmd_at_reconnect)==0:
if cmd_at_reconnect is not None:
raise ThrowReply('service_error_invalid_value', f"Invalid cmd_at_reconnect: <{repr(cmd_at_reconnect)}>! Expect non-zero length list")
self.alock = threading.Lock()
self.socket = socket.socket()
self.socket_timeout = float(socket_timeout)
self.socket_info = socket_info
self.cmd_at_reconnect = cmd_at_reconnect
self.reconnect_test = reconnect_test
self.command_terminator = command_terminator
self.response_terminator = response_terminator
self.reply_echo_cmd = reply_echo_cmd
self._reconnect()
def _reconnect(self):
'''
Method establishing socket to ethernet instrument.
Called by __init__ or send (on failed communication).
'''
self.socket.close()
self.socket = socket.socket()
try:
self.socket = socket.create_connection(self.socket_info, self.socket_timeout)
except (socket.error, socket.timeout) as err:
logger.warning(f"connection {self.socket_info} refused: {err}")
raise ThrowReply('resource_error_connection', f"Unable to establish ethernet socket {self.socket_info}")
logger.info(f"Ethernet socket {self.socket_info} established")
# Lantronix xDirect adapters have no query options
if self.cmd_at_reconnect is None:
return
commands = self.cmd_at_reconnect[:]
# Agilent/Keysight instruments give an unprompted *IDN? response on
# connection. This must be cleared before communicating with a blank
# listen or all future queries will be offset.
while commands[0] is None:
logger.debug("Emptying reconnect buffer")
commands.pop(0)
self._listen(blank_command=True)
response = self._send_commands(commands)
# Final cmd_at_reconnect should return '1' to test connection.
if response[-1] != self.reconnect_test:
self.socket.close()
logger.warning(f"Failed connection test. Response was {response}")
# exceptions.DriplineHardwareConnectionError
raise ThrowReply('resource_error_connection', "Failed connection test.")
[docs] def send_to_device(self, commands, **kwargs):
'''
Standard device access method to communicate with instrument.
NEVER RENAME THIS METHOD!
commands (list||None): list of command(s) to send to the instrument following (re)connection to the instrument, still must return a reply!
: if impossible, set as None to skip
'''
if isinstance(commands, str):
commands = [commands]
self.alock.acquire()
try:
data = self._send_commands(commands)
except socket.error as err:
logger.warning(f"socket.error <{err}> received, attempting reconnect")
self._reconnect()
data = self._send_commands(commands)
logger.critical("Ethernet connection reestablished")
# exceptions.DriplineHardwareResponselessError
except Exception as err:
logger.critical(str(err))
try:
self._reconnect()
data = self._send_commands(commands)
logger.critical("Query successful after ethernet connection recovered")
except socket.error: # simply trying to make it possible to catch the error below
logger.critical("Ethernet reconnect failed, dead socket")
raise ThrowReply('resource_error_connection', "Broken ethernet socket")
except Exception as err: ##TODO handle all exceptions, that seems questionable
logger.critical("Query failed after successful ethernet socket reconnect")
raise ThrowReply('resource_error_no_response', err)
finally:
self.alock.release()
to_return = ';'.join(data)
logger.debug(f"should return:\n{to_return}")
return to_return
def _send_commands(self, commands):
'''
Take a list of commands, send to instrument and receive responses, do any necessary formatting.
commands (list||None): list of command(s) to send to the instrument following (re)connection to the instrument, still must return a reply!
: if impossible, set as None to skip
'''
all_data=[]
for command in commands:
command += self.command_terminator
logger.debug(f"sending: {command.encode()}")
self.socket.send(command.encode())
if command == self.command_terminator:
blank_command = True
else:
blank_command = False
data = self._listen(blank_command)
if self.reply_echo_cmd:
if data.startswith(command):
data = data[len(command):]
elif not blank_command:
raise ThrowReply('device_error_connection', f'Bad ethernet query return: {data}')
logger.info(f"sync: {repr(command)} -> {repr(data)}")
all_data.append(data)
return all_data
def _listen(self, blank_command=False):
'''
Query socket for response.
blank_comands (bool): flag which is True when command is exactly the command terminator
'''
data = ''
try:
while True:
data += self.socket.recv(1024).decode(errors='replace')
if data.endswith(self.response_terminator):
terminator = self.response_terminator
break
# Special exception for disconnect of prologix box to avoid infinite loop
if data == '':
raise ThrowReply('resource_error_no_response', "Empty socket.recv packet")
except socket.timeout:
logger.warning(f"socket.timeout condition met; received:\n{repr(data)}")
if blank_command == False:
raise ThrowReply('resource_error_no_response', "Unexpected socket.timeout")
terminator = ''
logger.debug(repr(data))
data = data[0:data.rfind(terminator)]
return data