from stomp.connect import StompConnection12
import pyxb.utils.domutils as domutils
import darwinpush.xb.pushport as pp
from darwinpush.parser import Parser
import enum
import multiprocessing
import sys
import threading
import time
import zlib
import logging
# Module-level logger, registered under the "darwinpush" logger name.
log = logging.getLogger("darwinpush")
##### Code for STOMP debugging
#import logging
#console = logging.StreamHandler()
#console.setFormatter(logging.Formatter('[%(asctime)s] %(name)-12s %(levelname)-8s %(message)s'))
#logging.getLogger().addHandler(console)
#logging.getLogger().setLevel(logging.DEBUG)
#LOGGER = logging.getLogger('stomp')
#####
def listener_process(c, q):
    """Entry point for the listener worker process.

    Args:
        c: The listener class (not an instance); it is called with ``q`` to
            build the listener object.
        q: The queue handed to the listener's constructor.
    """
    # Build the listener inside the worker and hand control to its run loop.
    c(q)._run()
def parser_process(q_in, q_out):
    """Entry point for the parser worker process.

    Args:
        q_in: First argument passed to the ``Parser`` constructor.
        q_out: Second argument passed to the ``Parser`` constructor.
    """
    # Construct the parser inside the worker and hand control to its run loop.
    Parser(q_in, q_out).run()
class ErrorType(enum.Enum):
    """Categories of local failure attached to an ``Error`` object."""

    # Used when a received STOMP message body cannot be decompressed.
    DecompressionError = 1

    # Used when handling of the decompressed message raises.
    ParseError = 2
class Error:
    """Describes a failure that occurred while handling a received message.

    Args:
        error_type: The category of the failure (an ``ErrorType`` member).
        message: The payload that was being processed when the failure
            occurred. It is exposed through the ``payload`` property.
        exception: The exception that was raised.
    """

    def __init__(self, error_type, message, exception):
        self._error_type = error_type
        # The parameter is called ``message``; the original code assigned an
        # undefined name ``payload`` here and raised NameError on every call.
        self._payload = message
        self._exception = exception

    @property
    def payload(self):
        """The data that was being processed when the error occurred."""
        return self._payload

    @property
    def error_type(self):
        """The category of the failure."""
        return self._error_type

    @property
    def exception(self):
        """The exception that triggered this error."""
        return self._exception

    def __str__(self):
        # The textual form of an Error is the text of the wrapped exception.
        return str(self._exception)

    def __repr__(self):
        return str(self)
def has_method(_class, _method):
    """Report whether ``_class`` has a callable attribute named ``_method``.

    Args:
        _class: The object (class or instance) to inspect.
        _method: The attribute name to look up.

    Returns:
        True if the attribute exists and is callable, otherwise False.
    """
    # A missing attribute resolves to None, which is not callable.
    candidate = getattr(_class, _method, None)
    return callable(candidate)
class Client:
    """Client for the National Rail Enquiries Darwin Push Port STOMP server.

    You should instantiate an instance of this object, with the required parameters to act as the
    client to the Darwin Push Port. Listeners registered with this object will be passed messages
    that are received from the server once they have been turned into the relevant python object.

    Constructing a Client immediately starts two worker processes: one running
    the supplied listener class and one running the message parser.

    Args:
        stomp_user: Your STOMP user name taken from the National Rail Open Data portal.
        stomp_password: Your STOMP password taken from the National Rail Open Data portal.
        stomp_queue: Your STOMP queue name taken from the National Rail Open Data portal.
        listener: The class object (not an instance of it) for your Listener subclass.
    """

    def __init__(self, stomp_user, stomp_password, stomp_queue, listener):
        self.stomp_user = stomp_user
        self.stomp_password = stomp_password
        self.stomp_queue = stomp_queue

        # Connection state is only populated by connect(); define it up front
        # so reading these attributes beforehand does not raise AttributeError.
        self.connected = False
        self.thread = None
        self.client = None

        # The parser process writes its output onto the listener queue, which
        # the listener process is constructed with.
        self.listener_queue = multiprocessing.Queue()
        self.parser_queue = multiprocessing.Queue()

        self.listener_process = multiprocessing.Process(
            target=listener_process, args=(listener, self.listener_queue)
        )
        self.listener_process.start()

        self.parser_process = multiprocessing.Process(
            target=parser_process, args=(self.parser_queue, self.listener_queue)
        )
        self.parser_process.start()

    def connect(self):
        """ Connect to the Darwin Push Port and start receiving messages."""
        self.connected = True
        self._run()

    def _run(self):
        """Start the daemon thread that opens and holds the STOMP connection."""
        self.thread = threading.Thread(target=self._connect)
        self.thread.daemon = True
        self.thread.start()

    def _connect(self):
        """Thread body: connect the STOMP client, then idle while connected."""
        self.client = StompClient()
        # This object is passed as the callback target; the STOMP client
        # forwards events to the ``_on_*`` methods defined below.
        self.client.connect(self.stomp_user, self.stomp_password, self.stomp_queue, self)
        while self.connected:
            time.sleep(1)

    def _on_message(self, headers, message):
        """Turn a received message into a pyxb object and queue it for parsing.

        Args:
            headers: The headers of the received message (unused here).
            message: The UTF-8 encoded XML payload of the message.
        """
        # Decode the message and parse it as an XML DOM.
        doc = domutils.StringToDOM(message.decode("utf-8"))
        # Parse the record with pyXb.
        m = pp.CreateFromDOM(doc.documentElement)
        # The raw payload travels with the parsed object to the parser process.
        self.parser_queue.put((m, message))

    def _on_error(self, headers, message):
        """Print an error reported through the STOMP client."""
        print("Error: %s, %s" % (headers, message))

    def _on_local_error(self, error):
        """Print an error raised locally while handling a received message."""
        print("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Caught Message Error in Client Thread +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
        print(str(error))
        print("+-+-+-+-+-+-+-+-+-+-+-+-+-+-++-+-+-+-+-+-+-+-+-+-+-+-+-+-++-+-+-+-+-+-+-+-+-+-+-+-+-+-++-+-+-+-+-+-+-+-")

    def run(self):
        """Block the calling thread forever, sleeping one second at a time."""
        while True:
            time.sleep(1)
class StompClient:
    """Thin wrapper around a STOMP 1.2 connection.

    The instance registers itself as the connection's listener and forwards
    each event to the matching ``_on_*`` method of a callback object, when
    that object defines one.
    """

    def connect(self, user, password, queue, callback_object,
                host="datafeeds.nationalrail.co.uk", port=61613):
        """Open the STOMP connection and subscribe to the given queue.

        Args:
            user: STOMP user name.
            password: STOMP password.
            queue: Queue name; it is subscribed to as ``/queue/<queue>``.
            callback_object: Object whose ``_on_*`` methods receive events.
            host: Host name of the STOMP server. Defaults to the address the
                original code hard-coded.
            port: TCP port of the STOMP server. Defaults to the port the
                original code hard-coded.
        """
        log.debug("StompClient.connect()")
        self.cb = callback_object
        # auto_decode=False keeps message bodies as raw bytes, which
        # on_message() needs in order to decompress them.
        self.conn = StompConnection12([(host, port)], auto_decode=False)
        self.conn.set_listener('', self)
        self.conn.start()
        self.conn.connect(user, password)
        self.conn.subscribe("/queue/" + queue, ack='auto', id='1')

    def _forward(self, name, *args):
        """Call ``self.cb.<name>(*args)`` if the callback object defines it."""
        if has_method(self.cb, name):
            getattr(self.cb, name)(*args)

    def on_error(self, headers, message):
        """Forward an error event to ``_on_error``."""
        log.debug("StompClient.onError(headers=%s, message=%s)", headers, message)
        self._forward("_on_error", headers, message)

    def on_connecting(self, host_and_port):
        """Forward a connecting event to ``_on_connecting``."""
        log.debug("StompClient.onConnecting(host_and_port=%s)", host_and_port)
        self._forward("_on_connecting", host_and_port)

    def on_connected(self, headers, body):
        """Forward a connected event to ``_on_connected``."""
        log.debug("StompClient.onConnected(headers=%s, body=%s)", headers, body)
        self._forward("_on_connected", headers, body)

    def on_disconnected(self):
        """Forward a disconnected event to ``_on_disconnected``."""
        log.debug("StompClient.onDisconnected()")
        self._forward("_on_disconnected")

    def on_local_error(self, error):
        """Forward a locally detected ``Error`` to ``_on_local_error``."""
        log.debug("StompClient.onLocalError(error=%s)", error)
        self._forward("_on_local_error", error)

    def on_message(self, headers, message):
        """Decompress a received message and forward it to ``_on_message``.

        A decompression failure is reported through on_local_error() as
        ``ErrorType.DecompressionError``. An exception raised by the
        callback is reported as ``ErrorType.ParseError``.

        Args:
            headers: Headers of the received message.
            message: Raw (compressed) message body.
        """
        log.debug("StompClient.onMessage(headers=%s, body=<truncated>)", headers)
        if not has_method(self.cb, "_on_message"):
            return

        # Stage 1: decompression. It has its own try block so that failures
        # in the later stage are never misreported as decompression errors.
        try:
            # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
            decompressed_data = zlib.decompress(message, 16 + zlib.MAX_WBITS)
        except Exception as e:
            log.exception("Exception occurred decompressing the STOMP message.")
            self.on_local_error(Error(ErrorType.DecompressionError, (headers, message), e))
            return

        # Stage 2: hand the decompressed payload to the callback object.
        try:
            self.cb._on_message(headers, decompressed_data)
        except Exception as e:
            log.exception("Exception occurred parsing DARWIN message: %s.", decompressed_data)
            self.on_local_error(Error(ErrorType.ParseError, decompressed_data, e))