Constantly Monitoring A Tcp Streaming Feed For Data In Python
Solution 1:
There are several technical challenges presented in your question.
First is the simple matter of connecting to the server and retrieving the data. As you can see in connect() below, that is pretty simple: just create a socket (s = socket.socket()) and connect it (s.connect(('hostname', port_number))).
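For comparison, the standard library's socket.create_connection() combines those two steps and also accepts a timeout. The sketch below assumes a throwaway listener on 127.0.0.1 standing in for the Pelmorex server, so it runs offline; the port is chosen by the OS and the <heartbeat/> payload is purely illustrative.

```python
import socket

# Throwaway local server standing in for the real feed (port 0 = any free port).
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
host, port = listener.getsockname()

# create_connection() resolves the address, creates a socket and connects it;
# the timeout is set on the new socket before it attempts to connect.
client = socket.create_connection((host, port), timeout=5)
server_side, _addr = listener.accept()
server_side.sendall(b'<heartbeat/>')  # illustrative payload only

# recv() may return fewer bytes than requested, so loop until all have arrived.
expected = len(b'<heartbeat/>')
received = b''
while len(received) < expected:
    chunk = client.recv(expected - len(received))
    if not chunk:  # peer closed early
        break
    received += chunk

for sock in (client, server_side, listener):
    sock.close()
print(received)
```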
The next problem is retrieving the data in a useful form. The socket natively provides .recv(), but I wanted something with a file-like interface. Python's socket objects offer one through the .makefile() method (return s.makefile('rb')).
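Here is a small sketch of what makefile('rb') gives you, using socket.socketpair() as a stand-in for the network connection (the <alert/> bytes are a made-up payload). It also shows one detail worth knowing for a live feed: on a blocking socket, the buffered reader's read(n) keeps reading until n bytes arrive or the peer closes, whereas read1(n) makes at most one underlying read and returns whatever is already available.

```python
import socket

# A connected pair of sockets stands in for client and server.
sender, receiver = socket.socketpair()

# makefile('rb') wraps the socket in a buffered binary file object.
reader = receiver.makefile('rb')

sender.sendall(b'<alert/>')
# read1() does at most one underlying recv(), so it returns the bytes that have
# arrived instead of waiting for the full 2048 the way read(2048) would.
first = reader.read1(2048)

sender.close()        # the peer hangs up...
rest = reader.read()  # ...so read() hits end of file and returns b''

reader.close()
receiver.close()
print(first, rest)
```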
Now we get to the hard part. XML documents are typically stored one document per file, or sent one document per TCP transmission, so the end of a document is easily discovered from an end-of-file indication or from a Content-Length: header. Consequently, none of the Python XML APIs have a mechanism for dealing with multiple XML documents in one file or in one string. I wrote xml_partition() to solve that problem. xml_partition() consumes data from a file-like object and yields each XML document from the stream. (Note: the XML documents must be pressed together; no whitespace is allowed after the final > of a document.)
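To make the problem concrete, here is a small sketch; the sample documents and the first_document_end() helper are hypothetical. ElementTree's one-shot ET.fromstring() rejects two concatenated documents, while an ET.XMLPullParser fed one '>'-terminated piece at a time can report where the first document ends by comparing the counts of start and end events, the same test xml_partition() applies.

```python
import xml.etree.ElementTree as ET

# Two complete documents pressed together, as they might arrive on one stream.
stream = b'<alert><id>1</id></alert><alert><id>2</id></alert>'

# A one-shot parser treats everything after the first root element as an error.
try:
    ET.fromstring(stream)
    error = None
except ET.ParseError as exc:
    error = str(exc)


def first_document_end(data):
    """Return the index just past the first complete document in data, or None.

    Feeds the data one '>'-terminated piece at a time and counts start/end
    events; the document is complete once the two counts match again."""
    parser = ET.XMLPullParser(['start', 'end'])
    starts = ends = 0
    pos = 0
    while True:
        gt = data.find(b'>', pos)
        if gt == -1:
            return None  # no complete tag left: the document is unfinished
        parser.feed(data[pos:gt + 1])
        pos = gt + 1
        for event, _elem in parser.read_events():
            if event == 'start':
                starts += 1
            else:
                ends += 1
        if starts == ends > 0:
            return pos


print(error)
print(first_document_end(stream))
```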
Finally, there is a short test program (alerts()) which connects to the stream and reads a few of the XML documents, storing each in its own file.
Here, in its entirety, is a program for downloading emergency alerts from the National Alert Aggregation & Dissemination System operated by Pelmorex.
```python
import socket
import xml.etree.ElementTree as ET

def connect():
    'Connect to pelmorex data stream and return a file-like object'
    # Set up the socket
    s = socket.socket()
    s.connect(('streaming1.naad-adna.pelmorex.com', 8080))
    return s.makefile('rb')

# We have to consume the XML data in bits and pieces
# so that we can stop precisely at the boundary between
# streamed XML documents. This function ensures that
# nothing follows a '>' in any XML fragment.
def partition(s, pattern):
    'Consume a file-like object, and yield parts defined by pattern'
    # Note: read(2048) on a buffered socket file blocks until 2048 bytes
    # have arrived or the connection is closed.
    data = s.read(2048)
    while data:
        left, middle, data = data.partition(pattern)
        while left or middle:
            yield left
            yield middle
            left, middle, data = data.partition(pattern)
        data = s.read(2048)

# Split the incoming XML stream into fragments (much smaller
# than an XML document). The end of each XML document
# is guaranteed to align with the end of a fragment.
# Use an XML parser to determine the actual end of
# a document. Whenever the parser signals the end
# of an XML document, yield what we have so far and
# start a new parser.
def xml_partition(s):
    'Read multiple XML documents from one data stream'
    parser = None
    for part in partition(s, b'>'):
        if parser is None:
            parser = ET.XMLPullParser(['start', 'end'])
            starts = ends = 0
            xml = []
        xml.append(part)
        parser.feed(part)
        for event, elem in parser.read_events():
            starts += event == "start"
            ends += event == "end"
        if starts == ends > 0:
            # We have reached the end of the XML doc
            parser.close()
            parser = None
            yield b''.join(xml)

# Typical usage:
def alerts():
    for i, xml in enumerate(xml_partition(connect())):
        # The XML is a bytes object that contains the undecoded
        # XML stream. You'll probably want to parse it and
        # somehow display the alert.
        # I'm just saving it to a file.
        with open('alert%d.xml' % i, 'wb') as fp:
            fp.write(xml)
        if i == 3:
            break

def test():
    # A test function that uses multiple XML documents in one
    # file. This avoids the wait for a natural-disaster alert.
    with open('multi.xml', 'rb') as fp:
        print(list(xml_partition(fp)))

if __name__ == '__main__':
    alerts()
```
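The fragmenting step can be watched in isolation with a standalone sketch. split_after() is a hypothetical helper that mirrors partition() above, except that it joins each '>' onto the text before it and never yields an empty piece; it runs over an in-memory io.BytesIO with a deliberately tiny read size so that tags straddle read boundaries.

```python
import io


def split_after(fp, pattern=b'>', size=2048):
    """Yield pieces of fp's data; each ends with pattern or at a read boundary.

    Hypothetical helper mirroring partition(): the separator stays attached to
    the text before it, and empty pieces are never produced."""
    data = fp.read(size)
    while data:
        while data:
            left, sep, data = data.partition(pattern)
            yield left + sep  # non-empty, because data was non-empty
        data = fp.read(size)


# Two documents, read 5 bytes at a time so tags straddle read boundaries.
original = b'<a><b>hi</b></a><c/>'
pieces = list(split_after(io.BytesIO(original), size=5))
print(pieces)
# [b'<a>', b'<b', b'>', b'hi</', b'b>', b'</a', b'>', b'<c/>']
```

Every piece either ends with '>' or stops at a read boundary, so the '>' that closes a document is always the last byte of some piece; that is what lets xml_partition() stop exactly between documents.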