Python - Serial Comms Example

From PeformIQ Upgrade
Revision as of 10:30, 1 August 2015 by PeterHarding (talk | contribs) (→‎Script)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Overview

This script uses the win32file module to implement RS232 communications via a COMM port.

Script

#!/usr/bin/env pyhthon

'''
This code was inspired by Willy Heineman's post to
comp.lang.python on 7/31/2001 in which he advised
anyone needing such a module to write their own. I had
tried several serial packages and either could not get
them to work or they didn't have all the features I
needed. So following Willy's advice, I wrote something
tailored to my particular needs. Additional features
could be added but you will need to consult the
documentation for the underlying Windows functions
(CreateFile, WriteFile, ReadFile, etc.) as you will
not find a great deal of help in the documentation for
win32file. This can be found in the MSVS Help or
perhaps the Microsoft web site.

You will also need to install Mark Hammond's Windows
Extensions to Python, which you can find at
www.activestate.com. 

Thanks Willy.

Gary Richardson

*Comment inserted by Rob.*
'''

import win32file as wf
import string

class Comm: 
   def __init__(self, port='COM1', baudRate=wf.CBR_9600, stopBits=wf.ONESTOPBIT, parity=wf.NOPARITY ):
      self.handle = wf.CreateFile(port,
                                 wf.GENERIC_READ |  wf.GENERIC_WRITE,
                                 0,                                                   # Exclusive access
                                 None,                                             # No security attributes
                                  wf.OPEN_EXISTING,
                                 0,                                                   # Not overlapped
                                 0)                                                   # Zero required for comm devices
      wf.SetupComm(self.handle, 1000, 1000)   #Set the size of the input and output buffers
      wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR )
      dcb = wf.GetCommState(self.handle)
      dcb.BaudRate = baudRate
      dcb.StopBits = stopBits
      dcb.Parity = parity
      dcb.fDtrControl = wf.DTR_CONTROL_DISABLE
      dcb.ByteSize = 8
      wf.SetCommState(self.handle, dcb)
      ct = 0xFFFFFFFF, 0xFFFFFFFF, 1000, 10, 10      # 1 sec ReadFile timeout
      wf.SetCommTimeouts(self.handle, ct)
      self.termChar = '\r'      # See readPort

   def close(self):
      wf.CloseHandle(self.handle)

   def controlDTR(self, state):
      if state:
         cmd = wf.SETDTR
      else:
         cmd = wf.CLRDTR
      wf.EscapeCommFunction(self.handle, cmd)
      
   def  sendCmd(self, command, maxReplyLength=40):
      # Send a command and wait for a reply.
      # The reply string must end with the termination character or be maxReplyLength long or longer.
      wf.PurgeComm(self.handle, wf.PURGE_TXCLEAR | wf.PURGE_RXCLEAR )
      ec, nbytes = wf.WriteFile(self.handle, command, None)
      if nbytes != len(command):
         print 'sendCmd - write error', nbytes, len(command)
         return None
      data = self.readPort(maxReplyLength)
      return data

   def readPort(self, maxChars):
      data = ""
      while 1:
         rc, chars = wf.ReadFile(self.handle, maxChars, None)
         data += chars
         if string.count(chars, self.termChar):
            break
         if len(data) >= maxChars:
            break
         if len(chars) == 0:
            return None               # Timeout
      return data
         

if __name__ == '__main__':
   import time

   def testDTR():
      # This code toggles the DTR line 4 times at 3 second intervals.
      dtr = 1
      com = Comm()
      for x in range(4):
         print 'dtr=', dtr
         print
         com.controlDTR(dtr)
         dtr = dtr ^ 1
         print
         time.sleep(3.0)
      com.close()
      print 'done'


   def loopbackTest():
      """ This code performs a loopback test. A long string of characters is transmitted and the received
          data is compared with what was sent.
          The receive timeout can be tested by unplugging the loopback connector.
      """
      com = Comm()
      td = []
      total = 0.0
      totalChars = 0
      for x in range(5):
         cmd = "Now is the time for all good men. The quick brown fox jumped over the lazy dog's back."*3 + "\r"
         totalChars += len(cmd)
         ts = time.clock()
         data = com.sendCmd(cmd, maxReplyLength=len(cmd))
         total += time.clock() - ts
         notok = -1
         if data:
            for k in range(len(cmd)):
               if cmd[k] != data[k]:
                  notok = k
                  break
            if notok >= 0:
               print 'Error at %d' % (k)
         else:
            print 'Timeout'
         td.append(data)
      com.close()
      for n in td: print n
      a, b = total, total/totalChars
      print "Total elapsed time: %3.1f\nAverage time per character: %7.5f\nCharacters per sec: %5.1f" % (a, b, 1/b)

   print
   print
   loopbackTest()