Spawn.py

From PeformIQ Upgrade
Revision as of 15:28, 2 May 2008 by PeterHarding (talk | contribs) (New page: =Overview= This script is specifically targetted to run test.py which takes a single mandatory argument - the connector ID of the ODBC connection to use. So the script uses a list of con...)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Overview

This script is specifically targeted at running test.py, which takes a single mandatory argument - the connector ID of the ODBC connection to use. The script therefore uses a list of connector numbers as run tokens.

#! /usr/bin/env python
#
#    $Id:$
#
#  Purpose:  Spawn multiple test scripts in a controlled way with logging
#
#-------------------------------------------------------------------------------

import sys
import re
import os
import time
import getopt
import string
import logging
import datetime

#-------------------------------------------------------------------------------

__version__        = '1.2.0'

debug_level        = 0        # Debug verbosity; init() prints the PID when this is > 0
verbose_flg        = False    # When True, INFO/WARNING also print to stdout

log                = None     # Logger object, bound by init()

stagger_delay      = 3        # Seconds added to each successive child's start-up sleep
duration           = None     # datetime.timedelta built from -D/--duration; None = single pass
no_connectors      = 3        # Number of connector IDs to generate (main() caps this at 10)
connectors         = []       # Connector IDs still waiting to be launched (the run tokens)
procs              = {}       # Maps child PID -> connector ID for children still running

#===============================================================================

def INFO(msg):
   """Record an informational message.

   Writes `msg` to the module logger once init() has bound `log`, and
   echoes it to stdout when `verbose_flg` is set.
   """
   if log:
      log.info(' ' + msg)

   # Tag console output as the spawner's own: the forked test.py children
   # inherit the same terminal and already use the '[test]' tag, while the
   # rest of this script prints '[spawn]'.
   if verbose_flg:
      print("[spawn]  %s" % msg)

#-------------------------------------------------------------------------------

def ERROR(msg):
   """Record an error message.

   Writes `msg` to the module logger once init() has bound `log`, and
   always writes it to stderr regardless of `verbose_flg`.
   """
   if log:
      log.error(msg)

   # '[spawn]' rather than '[test]': the children share this stderr and use
   # the '[test]' tag themselves.
   sys.stderr.write('[spawn]  %s\n' % msg)

#-------------------------------------------------------------------------------

def WARNING(msg):
   """Record a warning message.

   Writes `msg`, wrapped in '*****' markers, to the module logger once
   init() has bound `log`, and echoes the plain message to stdout when
   `verbose_flg` is set.
   """
   if log:
      log.warning('*****' + msg + '*****')

   # '[spawn]' rather than '[test]': the children share this terminal and
   # use the '[test]' tag themselves.
   if verbose_flg:
      print("[spawn]  %s" % msg)

#===============================================================================

def init():
   """Set up file logging for the spawner and log a start-up banner.

   Binds the module-level `log` to the 'spawn' logger, writing to
   'log/spawn.log' at INFO level.  The 'log' directory is created first when
   it is missing, because logging.FileHandler does not create parent
   directories and would otherwise abort the run before anything is spawned.

   Raises OSError if the directory cannot be created.
   """
   global log

   if debug_level > 0:
      print("My PID is %d" % os.getpid())

   try:
      os.makedirs('log')
   except OSError:
      # An existing directory is the normal case; anything else is fatal
      if not os.path.isdir('log'):
         raise

   log  = logging.getLogger('spawn')
   hdlr = logging.FileHandler('log/spawn.log')
   fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   hdlr.setFormatter(fmtr)
   log.addHandler(hdlr)
   log.setLevel(logging.INFO)

   INFO("===== Started processing ==================================")

#-------------------------------------------------------------------------

def do_child(connector, delay):
   """Turn the forked child into a test.py run for one connector.

   Sleeps `delay` seconds, then replaces the process image with ./test.py,
   passing the connector number via -c.  This function never returns: when
   the exec itself fails, the child reports the error on stderr and
   terminates with status 127 instead of letting the exception unwind
   through the copy of the parent's call stack it inherited from fork().
   """
   time.sleep(delay)

   try:
      os.execv("./test.py", ["./test.py", "-c", "%d" % connector])
   except OSError:
      sys.stderr.write("[spawn]  Could not exec ./test.py for connector %d - %s\n"
                       % (connector, sys.exc_info()[1]))
      sys.stderr.flush()
      # os._exit() leaves immediately, without running the parent's exit
      # handlers a second time in the child
      os._exit(127)

#-------------------------------------------------------------------------

def spawn():
   """Fork one test process per connector and supervise them until done.

   Connector IDs are taken from the front of the module-level `connectors`
   list; each gets a forked child (see do_child) whose start-up sleep is
   `stagger_delay` seconds longer than the previous one.  Running children
   are tracked in `procs` (PID -> connector).

   Once every connector has been launched the function waits for children
   to exit.  While `duration` is set and has not yet elapsed, the connector
   of an exited child is queued again so a fresh test process is started on
   it; otherwise the function returns when the last child has been reaped.

   Returns immediately (after logging the FINISHED banner) when there are
   no connectors and no children, rather than calling os.wait() with
   nothing to wait for.
   """
   global stagger_delay

   delay   = 0
   t_start = datetime.datetime.now()

   # Loop while there is work left: something to launch or something to reap
   while connectors or procs:
      if connectors:
         connector = connectors.pop(0)
         child_pid = os.fork()

         if child_pid == 0:
            # Child: does not come back from here
            do_child(connector, delay)
         else:
            INFO("Spawned test process (PID %d) on connector %d" % (child_pid, connector))
            procs[child_pid] = connector

         delay += stagger_delay
         continue

      # Launch queue is empty: from now on `delay` stops growing, so every
      # re-launched child sleeps for the value accumulated so far
      stagger_delay = 0

      (pid, status) = os.wait()

      connector = procs.pop(pid)

      INFO("cleaned up child process (PID %d) with connector %d" % (pid, connector))

      # Still inside the requested run time: put the connector back in the
      # queue so the next iteration starts a new test on it
      if duration and (datetime.datetime.now() - t_start) < duration:
         connectors.append(connector)

   INFO("***** FINISHED *****")

#-------------------------------------------------------------------------------

def usage():
   """Write the command line synopsis and option summary to stderr."""
   USAGE = """

     Usage:

       $ spawn.py [options]

     Options:

       -n, --no <no_to_spawn>     Number of ODBC connectors to use (default 3, max 10)
       -s, --stagger <seconds>    Start-up delay added per test process (default 3)
       -D, --duration <minutes>   Keep re-starting finished tests for this long
       -d                         Increase the debug level by one
       -v, --verbose              Echo log messages to stdout
       -V, --version              Print the version and exit
       -h, --help                 Print this message and exit

   """

   sys.stderr.write(USAGE)
   
#-------------------------------------------------------------------------------

def main(argv):
   """Parse the command line, build the connector list and run the spawner.

   `argv` is the argument list without the program name.  Recognised
   options update the module-level settings (`debug_level`, `verbose_flg`,
   `no_connectors`, `duration`, `stagger_delay`); connector IDs 1..N are
   then appended to `connectors` before init() and spawn() are called.

   Exits with status 2 on an unknown option, a non-integer option value or
   a connector count below 1, and with status 0 after --help / --version.
   """
   # debug_level must be declared global here: it is assigned below, and
   # without the declaration "-d" fails with UnboundLocalError
   global debug_level, verbose_flg, no_connectors, duration, stagger_delay

   #----- Process command line arguments ----------------------------

   duration_min = None

   try:
      # "debug=" takes a value; without the '=' getopt always hands over ''
      opts, args = getopt.getopt(argv, "dD:hn:s:vV",
              ["debug=", "duration=", "help", "no=", "stagger=", "verbose", "version"])
   except getopt.GetoptError:
      usage()
      sys.exit(2)

   opt = arg = None

   try:
      for opt, arg in opts:
         if opt in ("-h", "--help"):
            usage()
            sys.exit(0)
         elif opt == "-d":
            debug_level                += 1
         elif opt == "--debug":
            debug_level                 = int(arg)
         elif opt in ("-D", "--duration"):
            duration_min                = int(arg)
         elif opt in ("-s", "--stagger"):
            stagger_delay               = int(arg)
         elif opt in ("-n", "--no"):
            no_connectors               = int(arg)
            if no_connectors > 10:
               no_connectors = 10
               sys.stderr.write("Only 10 ODBC connectors available!\n")
               sys.stderr.flush()
         elif opt in ("-v", "--verbose"):
            verbose_flg = True
         elif opt in ("-V", "--version"):
            print("Version: %s" % __version__)
            sys.exit(0)
   except ValueError:
      # int() rejected the value of the option being processed
      sys.stderr.write("[spawn]  Option %s expects an integer, got '%s'\n" % (opt, arg))
      usage()
      sys.exit(2)

   # With nothing to launch, spawn() would have no child to wait for
   if no_connectors < 1:
      sys.stderr.write("[spawn]  At least one connector is required (-n %d given)\n" % no_connectors)
      sys.exit(2)

   if duration_min:
      print("[spawn]  Running for a duration of %d minutes" % duration_min)
      duration = datetime.timedelta(minutes=duration_min)

   for i in range(no_connectors):
      connectors.append(i + 1)

   print("[spawn]  Using these ODBC connectors - %s" % connectors)

   init()

   spawn()

#---------------------------------------------------------------------

# Run only when executed as a script; main() receives the arguments without
# the program name.
if __name__ == "__main__":
   main(sys.argv[1:])

#---------------------------------------------------------------------

test.py

Note: this script is run with the ActiveState Python binary in order to make use of its ODBC functionality.

#!/c/PROGRA~1/ActiveState/Python/python.exe
#
#-------------------------------------------------------------------------------

import os
import sys
import dbi
import odbc
import time
import getopt
import logging

from datetime import datetime

#-------------------------------------------------------------------------------

from RTE import dcl

#-------------------------------------------------------------------------------

__version__   = "1.0.0"
__id__        = "@(#)  skel.py  [%s]  05/03/2008"

verbose_flg   = False          # When True, INFO/WARNING also print to stdout

debug_level   = 0              # Debug verbosity; raised by -d, set by -D


table_name    = "Manifests"    # Type name registered with the data server in query()
PORT          = 9579           # Port passed to dcl.dcl() in query()
MAX_REQUESTS  = 10             # Number of queries per run; overridden by -M

log           = None           # Logger object, bound by init()
handle        = 'ODBC_%02d'    # Template for the string passed to odbc.odbc()
connector_no  = 1              # Connector number; overridden by -c

#===============================================================================

def INFO(msg):
   """Log `msg` at INFO level; in verbose mode also echo it to stdout.

   Nothing is logged until init() has bound the module-level `log`.
   """
   if log:
      log.info(' ' + msg)

   if verbose_flg:
      console_line = "[test]  %s" % msg
      print(console_line)

#-------------------------------------------------------------------------------

def ERROR(msg):
   """Log `msg` at ERROR level and always report it on stderr.

   Nothing is logged until init() has bound the module-level `log`; the
   stderr line is written either way.
   """
   if log:
      log.error(msg)

   stderr_line = '[test]  %s\n' % msg
   sys.stderr.write(stderr_line)

#-------------------------------------------------------------------------------

def WARNING(msg):
   """Log `msg` at WARNING level; in verbose mode also echo it to stdout.

   The logged text is wrapped in '*****' markers, the console text is not.
   Nothing is logged until init() has bound the module-level `log`.
   """
   if log:
      log.warning('*****' + msg + '*****')

   if verbose_flg:
      console_line = "[test]  %s" % msg
      print(console_line)

#===============================================================================

def the_time():
   """Return the stopwatch reading in seconds.

   Takes the value reported by ref_time(False) - milliseconds since the
   stopwatch was last started - and scales it by 0.001.
   """
   elapsed_ms = float(ref_time(False))

   return elapsed_ms * 0.001

#------------------------------------------------------------------------------

t_reference = None

def ref_time(flg):
   """Start, or read, the module's single stopwatch.

   flg true  - remember "now" as the reference point and return 0.
   flg false - return the time elapsed since the reference point, in
               milliseconds (float).

   Reading the stopwatch before it was ever started starts it and returns
   0, instead of failing on the subtraction from an unset reference.
   """
   global t_reference

   t_now = datetime.now()

   if flg or t_reference is None:
      t_reference = t_now
      return 0

   t_delta = t_now - t_reference

   # timedelta.seconds only holds the part below one day, so the whole days
   # have to be added back in for long intervals
   whole_seconds = (t_delta.days * 86400) + t_delta.seconds

   return ((whole_seconds * 1000000) + t_delta.microseconds) / 1000.0

#===============================================================================

def query(no_queries, connector_no):
   """Run up to `no_queries` timed stored procedure calls on one connector.

   A manifest number is fetched from the data server (type `table_name`)
   for each call, dbp_get_Article_history_for_internet is executed for it
   over the ODBC connection `handle % connector_no`, and the execute time
   plus the number of rows fetched is written to stderr and the log.  The
   loop stops early when the data server has no more values.

   Returns 1 when the data server connection is not available, otherwise 0.
   The cursor and the ODBC connection are closed on every exit path.
   """
   ds = dcl.dcl(port=PORT)

   if ds is None:
      print("Connection to data server failed - is data server process running?\n")
      return 1

   type_ref = ds.RegisterType(table_name)

   if debug_level > 0:
      print("Data type \"%s\" registered as %d" % (table_name, type_ref))

   s   = odbc.odbc(handle % connector_no)
   cur = s.cursor()

   try:
      for cnt in range(1, no_queries + 1):
         sp = ds.GetNext(type_ref)

         if sp is None:
            print("Type %d exhausted" % type_ref)
            break

         manifest = sp[0]

         # NOTE(review): the manifest value is interpolated straight into
         # the SQL text - consider parameter binding if the data server's
         # values cannot be trusted
         qry = "exec dbp_get_Article_history_for_internet %s" % manifest

         # Only the execute call is timed; fetching the rows is not
         ref_time(True)

         cur.execute(qry)

         t_query = the_time()

         row_cnt = 0

         while True:
            rec = cur.fetchmany(10)
            if not rec:
               break
            row_cnt += len(rec)

         msg = "%02d ManifestNo %s  [%.3f] sec  %3d rows returned" % (cnt, manifest, t_query, row_cnt)

         sys.stderr.write("%s\n" % msg)
         sys.stderr.flush()

         INFO(msg)
   finally:
      cur.close()
      s.close()

   return 0

#-------------------------------------------------------------------------------

def init():
   """Set up the per-connector log file and log a start-up banner.

   Binds the module-level `log` to the 'test' logger, writing to
   'log/hcodevj86_NN.log' (NN = `connector_no`, so it must be set before
   this is called) at INFO level.  The 'log' directory is created first
   when it is missing, because logging.FileHandler does not create parent
   directories.

   Raises OSError if the directory cannot be created.
   """
   global log

   if debug_level > 0:
      print("My PID is %d" % os.getpid())

   try:
      os.makedirs('log')
   except OSError:
      # Already present - possibly just created by a sibling test process -
      # is fine; anything else is fatal
      if not os.path.isdir('log'):
         raise

   log  = logging.getLogger('test')
   hdlr = logging.FileHandler('log/hcodevj86_%02d.log' % connector_no)
   fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   hdlr.setFormatter(fmtr)
   log.addHandler(hdlr)
   log.setLevel(logging.INFO)

   INFO("===== Started processing ==================================")

#===============================================================================

_USAGE = """
  Usage:

    $ test.py [-c <connector_no>] [-M <max_requests>] [-d] [-D <level>] [-v] [-V] [-?]

  Options:

    -c <connector_no>   ODBC connector number to use (default 1)
    -M <max_requests>   Number of queries to run (default 10)
    -d                  Increase the debug level by one
    -D <level>          Set the debug level
    -v                  Echo log messages to stdout
    -V                  Print the version and exit
    -?                  Print this message and exit
"""

def main():
   """Parse sys.argv, set up logging and run the query loop.

   Options update the module-level settings (`connector_no`, `debug_level`,
   `MAX_REQUESTS`, `verbose_flg`).  Any positional arguments are echoed to
   stdout and otherwise ignored.

   Returns 1 after a usage error, after -? / -V, or when query() reports a
   failure; returns 0 otherwise.  The usage text is printed explicitly
   because this module has no docstring, so printing __doc__ showed "None".
   """
   global verbose_flg
   global debug_level
   global connector_no
   global MAX_REQUESTS

   try:
      opts, args = getopt.getopt(sys.argv[1:], "c:dD:M:vVw?")
   except getopt.error:
      print(_USAGE)
      return 1

   o = a = None

   try:
      for o, a in opts:
         if o == '-?':
            print(_USAGE)
            return 1
         elif o == '-c':
            connector_no   = int(a)
         elif o == '-d':
            debug_level   += 1
         elif o == '-D':
            debug_level    = int(a)
         elif o == '-M':
            MAX_REQUESTS   = int(a)
         elif o == '-v':
            verbose_flg    = True
         elif o == '-V':
            print("[test]  Version: %s" % __version__)
            return 1
         else:
            print(_USAGE)
            return 1
   except ValueError:
      # int() rejected the value of the option being processed
      sys.stderr.write("[test]  Option %s expects an integer, got '%s'\n" % (o, a))
      print(_USAGE)
      return 1

   sys.stderr.write("[test]  Working directory is %s - Using ODBC connector %d\n" % (os.getcwd(), connector_no))
   sys.stderr.flush()

   if debug_level > 0:
      print("Debugging level set to %d" % debug_level)

   for arg in args:
      print(arg)

   init()

   # Pass a failure status from query() on as the exit status; a None
   # return counts as success
   return query(MAX_REQUESTS, connector_no) or 0

#-------------------------------------------------------------------------------

# Script entry point: main()'s return value becomes the process exit status.
# NOTE(review): the second condition fires when the module name equals
# sys.argv[0] - presumably for some launcher; confirm it is still needed.
if __name__ == '__main__' or __name__ == sys.argv[0]:
   try:
      sys.exit(main())
   except KeyboardInterrupt, e:
      # Ctrl-C: report the interruption instead of dumping a traceback
      print "[test]  Interrupted!"

#-------------------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   ------------------------------------------------------------
   20080428   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""