Python - Serial Comms Example
Jump to navigation
Jump to search
Overview
This script uses the win32file module to implement RS232 communications via a COMM port.
Script
#!/usr/bin/env pyhthon
'''
This code was inspired by Willy Heineman's post to
comp.lang.python on 7/31/2001 in which he advised
anyone needing such a module to write their own. I had
tried several serial packages and either could not get
them to work or they didn't have all the features I
needed. So following Willy's advice, I wrote something
tailored to my particular needs. Additional features
could be added but you will need to consult the
documentation for the underlying Windows functions
(CreateFile, WriteFile, ReadFile, etc.) as you will
not find a great deal of help in the documentation for
win32file. This can be found in the MSVS Help or
perhaps the Microsoft web site.
You will also need to install Mark Hammond's Windows
Extensions to Python, which you can find at
www.activestate.com.
Thanks Willy.
Gary Richardson
*Comment inserted by Rob.*
'''
import win32file as wf
import string
class Comm:
def __init__(self, port='COM1', baudRate=wf.CBR_9600, stopBits=wf.ONESTOPBIT, parity=wf.NOPARITY ):
self.handle = wf.CreateFile(port,
wf.GENERIC_READ | wf.GENERIC_WRITE,
0, # Exclusive access
None, # No security attributes
wf.OPEN_EXISTING,
0, # Not overlapped
0) # Zero required for comm devices
wf.SetupComm(self.handle, 1000, 1000) #Set the size of the input and output buffers
wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR )
dcb = wf.GetCommState(self.handle)
dcb.BaudRate = baudRate
dcb.StopBits = stopBits
dcb.Parity = parity
dcb.fDtrControl = wf.DTR_CONTROL_DISABLE
dcb.ByteSize = 8
wf.SetCommState(self.handle, dcb)
ct = 0xFFFFFFFF, 0xFFFFFFFF, 1000, 10, 10 # 1 sec ReadFile timeout
wf.SetCommTimeouts(self.handle, ct)
self.termChar = '\r' # See readPort
def close(self):
wf.CloseHandle(self.handle)
def controlDTR(self, state):
if state:
cmd = wf.SETDTR
else:
cmd = wf.CLRDTR
wf.EscapeCommFunction(self.handle, cmd)
def sendCmd(self, command, maxReplyLength=40):
# Send a command and wait for a reply.
# The reply string must end with the termination character or be maxReplyLength long or longer.
wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR )
ec, nbytes = wf.WriteFile(self.handle, command, None)
if nbytes != len(command):
print 'sendCmd - write error', nbytes, len(command)
return None
data = self.readPort(maxReplyLength)
return data
def readPort(self, maxChars):
data = ""
while 1:
rc, chars = wf.ReadFile(self.handle, maxChars, None)
data += chars
if string.count(chars, self.termChar):
break
if len(data) >= maxChars:
break
if len(chars) == 0:
return None # Timeout
return data
if __name__ == '__main__':
import time
def testDTR():
# This code toggles the DTR line 4 times at 3 second intervals.
dtr = 1
com = Comm()
for x in range(4):
print 'dtr=', dtr
print
com.controlDTR(dtr)
dtr = dtr ^ 1
print
time.sleep(3.0)
com.close()
print 'done'
def loopbackTest():
""" This code performs a loopback test. A long string of characters is transmitted and the received
data is compared with what was sent.
The receive timeout can be tested by unplugging the loopback connector.
"""
com = Comm()
td = []
total = 0.0
totalChars = 0
for x in range(5):
cmd = "Now is the time for all good men. The quick brown fox jumped over the lazy dog's back."*3 + "\r"
totalChars += len(cmd)
ts = time.clock()
data = com.sendCmd(cmd, maxReplyLength=len(cmd))
total += time.clock() - ts
notok = -1
if data:
for k in range(len(cmd)):
if cmd[k] != data[k]:
notok = k
break
if notok >= 0:
print 'Error at %d' % (k)
else:
print 'Timeout'
td.append(data)
com.close()
for n in td: print n
a, b = total, total/totalChars
print "Total elapsed time: %3.1f\nAverage time per character: %7.5f\nCharacters per sec: %5.1f" % (a, b, 1/b)
print
print
loopbackTest()