Python - Serial Comms Example
Overview
This script uses the win32file module (part of the pywin32 Windows extensions for Python) to implement RS-232 serial communications over a Windows COM port.
Script
#!/usr/bin/env python
'''
This code was inspired by Willy Heineman's post to comp.lang.python on
7/31/2001 in which he advised anyone needing such a module to write their
own. I had tried several serial packages and either could not get them to
work or they didn't have all the features I needed. So following Willy's
advice, I wrote something tailored to my particular needs.

Additional features could be added but you will need to consult the
documentation for the underlying Windows functions (CreateFile, WriteFile,
ReadFile, etc.) as you will not find a great deal of help in the
documentation for win32file. This can be found in the MSVS Help or perhaps
the Microsoft web site. You will also need to install Mark Hammond's
Windows Extensions to Python, which you can find at www.activestate.com.

Thanks Willy.

Gary Richardson

*Comment inserted by Rob.*
'''

import win32file as wf
import string  # no longer used by Comm; kept so the module-level name stays available


class Comm:
    """Exclusive, non-overlapped (blocking) connection to a serial port.

    All data is handled as byte strings: pass byte strings to sendCmd()
    and keep ``termChar`` a byte string. On Python 2 a plain ``str`` is
    already a byte string.

    Instances can be used as context managers; the port is closed when the
    ``with`` block exits.

    Attributes:
        handle:   the handle returned by CreateFile, or None after close().
        termChar: terminator that ends a reply in readPort(); b'\\r' by
                  default.
    """

    def __init__(self, port='COM1', baudRate=wf.CBR_9600,
                 stopBits=wf.ONESTOPBIT, parity=wf.NOPARITY):
        """Open and configure the port.

        Args:
            port:     device name handed to CreateFile.
            baudRate: one of the win32file CBR_* constants.
            stopBits: one of the win32file *STOPBIT(S) constants.
            parity:   one of the win32file *PARITY constants.

        The port is always configured for 8 data bits with DTR disabled.
        """
        self.handle = wf.CreateFile(port,
                                    wf.GENERIC_READ | wf.GENERIC_WRITE,
                                    0,     # Exclusive access
                                    None,  # No security attributes
                                    wf.OPEN_EXISTING,
                                    0,     # Not overlapped
                                    0)     # Zero required for comm devices
        # Set the size of the input and output buffers.
        wf.SetupComm(self.handle, 1000, 1000)
        # Discard anything left over in either direction.
        wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR)

        dcb = wf.GetCommState(self.handle)
        dcb.BaudRate = baudRate
        dcb.StopBits = stopBits
        dcb.Parity = parity
        dcb.fDtrControl = wf.DTR_CONTROL_DISABLE
        dcb.ByteSize = 8
        wf.SetCommState(self.handle, dcb)

        # 1 sec ReadFile timeout. Field order follows the Win32 COMMTIMEOUTS
        # structure: read interval, read total multiplier, read total
        # constant (ms), write total multiplier, write total constant (ms).
        ct = 0xFFFFFFFF, 0xFFFFFFFF, 1000, 10, 10
        wf.SetCommTimeouts(self.handle, ct)

        self.termChar = b'\r'  # See readPort

    def __enter__(self):
        """Return the open connection for use in a ``with`` statement."""
        return self

    def __exit__(self, excType, excValue, traceback):
        """Close the port; any exception from the block still propagates."""
        self.close()

    def close(self):
        """Close the port handle. Calling close() again is a no-op."""
        if self.handle is not None:
            wf.CloseHandle(self.handle)
            self.handle = None

    def controlDTR(self, state):
        """Set the DTR line when ``state`` is true, clear it otherwise."""
        if state:
            cmd = wf.SETDTR
        else:
            cmd = wf.CLRDTR
        wf.EscapeCommFunction(self.handle, cmd)

    def sendCmd(self, command, maxReplyLength=40):
        """Send a command and wait for a reply.

        The reply string must end with the termination character or be
        maxReplyLength long or longer.

        Args:
            command:        byte string to transmit.
            maxReplyLength: reply length at which reading stops even if no
                            terminator has been seen.

        Returns:
            The reply data, or None if the write was short or the read
            timed out.
        """
        # Drop stale data so the reply belongs to this command only.
        wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR)
        _ec, nbytes = wf.WriteFile(self.handle, command, None)
        if nbytes != len(command):
            print('sendCmd - write error %s %s' % (nbytes, len(command)))
            return None
        return self.readPort(maxReplyLength)

    def readPort(self, maxChars):
        """Read until the terminator arrives or maxChars bytes are collected.

        Each ReadFile call asks for up to maxChars bytes, so the result may
        be longer than maxChars.

        Returns:
            The accumulated data, or None when a read returns nothing
            (timeout) before either stop condition is met.
        """
        chunks = []
        received = 0
        while True:
            _rc, chunk = wf.ReadFile(self.handle, maxChars, None)
            chunks.append(chunk)
            received += len(chunk)
            if self.termChar in chunk:
                break
            if received >= maxChars:
                break
            if len(chunk) == 0:
                return None  # Timeout
        return b''.join(chunks)


if __name__ == '__main__':
    import time

    # time.clock() was removed in Python 3.8; use perf_counter when present.
    timer = getattr(time, 'perf_counter', None) or time.clock

    def firstMismatch(sent, received):
        """Return the first index where ``received`` differs from ``sent``.

        A reply shorter than ``sent`` counts as a mismatch at the index where
        it ends. Bytes beyond len(sent) are ignored. Returns -1 on a match.
        """
        for k in range(len(sent)):
            if k >= len(received) or sent[k] != received[k]:
                return k
        return -1

    def testDTR():
        # This code toggles the DTR line 4 times at 3 second intervals.
        dtr = 1
        with Comm() as com:
            for x in range(4):
                print('dtr= %s' % dtr)
                print('')
                com.controlDTR(dtr)
                dtr = dtr ^ 1
                print('')
                time.sleep(3.0)
        print('done')

    def loopbackTest():
        """
        This code performs a loopback test. A long string of characters is
        transmitted and the received data is compared with what was sent.
        The receive timeout can be tested by unplugging the loopback
        connector.
        """
        cmd = (b"Now is the time for all good men. The quick brown fox "
               b"jumped over the lazy dog's back.") * 3 + b"\r"
        replies = []
        total = 0.0
        totalChars = 0
        with Comm() as com:
            for x in range(5):
                totalChars += len(cmd)
                ts = timer()
                data = com.sendCmd(cmd, maxReplyLength=len(cmd))
                total += timer() - ts
                if data:
                    notok = firstMismatch(cmd, data)
                    if notok >= 0:
                        print('Error at %d' % notok)
                else:
                    print('Timeout')
                replies.append(data)
        for reply in replies:
            print(reply)
        a, b = total, total / totalChars
        # Guard against a zero elapsed time from a coarse timer.
        rate = 1 / b if b else float('inf')
        print("Total elapsed time: %3.1f\nAverage time per character: %7.5f\nCharacters per sec: %5.1f" % (a, b, rate))
        print('')
        print('')

    loopbackTest()