Implementing a Standard Library

From PeformIQ Upgrade
Jump to navigation Jump to search

Components

I use the following for building up utility scripts.

$ ls -lR ~/lib/py

/home/pharding/lib/py:
total 4
drwxr-xr-x+ 1 pharding Users 0 Feb 20 11:44 performiq

./performiq:
total 15
-rw-r--r-- 1 pharding Users  177 Feb 20 12:26 __init__.py
-rw-r--r-- 1 pharding Users   23 Feb 20 10:52 CONSTANTS.py
-rw-r--r-- 1 pharding Users 1248 Feb 20 12:44 CSV_Reader.py
-rw-r--r-- 1 pharding Users  361 Feb 20 10:55 Enum.py
-rw-r--r-- 1 pharding Users 2121 Feb 20 11:32 Logger.py
-rw-r--r-- 1 pharding Users 1077 Feb 20 12:08 Timer.py
env | grep PYTHONPATH

PYTHONPATH=/home/pharding/lib/py

__init__.py

$ cat __init__.py

import CONSTANTS


from CONSTANTS   import VERSION
from Enum        import Enum
from Logger      import Logger
from Timer       import Timer
from CSV_Reader  import CSV_Reader

CONSTANTS.py

$ cat CONSTANTS.py

VERSION   = "1.0.0"

Enum.py

$ cat Enum.py

#==========================================================================

class Enum(set):
   pass

   #--------------------------------------------------------------------

   def __getattr__(self, name):
      if name in self:
         return name
      raise AttributeError

#==========================================================================

Logger.py

$ cat Logger.py

import os
import sys
import logging

#==========================================================================

class Logger:
    logger = None
    debug  = False

    @classmethod
    def Info(cls, msg):
        global debug_level, verbose_flg

        if not cls.logger:
            cls.Init()

        cls.logger.info(' ' + msg)

        if cls.debug: sys.stderr.write("[%s::INFO]  %s\n" % (cls.name, msg))

    #----------------------------------------------------------------------

    @classmethod
    def Error(cls, msg):
        global debug_level, verbose_flg

        if not cls.logger:
            cls.Init()

        cls.logger.error(msg)

        if cls.debug: sys.stderr.write("[%s::ERROR]  %s\n" % (cls.name, msg))

    #----------------------------------------------------------------------

    @classmethod
    def Warning(cls, msg):
        global debug_level, verbose_flg

        if not cls.logger:
            cls.Init()

        cls.logger.warning('*****' + msg + '*****')

        if cls.debug: sys.stderr.write("[%s::WARNING]  %s\n" % (cls.name, msg))

    #----------------------------------------------------------------------

    @classmethod
    def Init(cls, name='logger', log_dir='/c/temp', debug=False):
        cls.debug = debug
        cls.name  = name
        cls.pid   = os.getpid()

        if cls.debug: sys.stderr.write("[%s::Init] PID is %d\n" % (cls.name, cls.pid))

        cls.log_file = '%s/%s.log' % (log_dir, name)

        try:
            cls.logger  = logging.getLogger(name)
            cls.hdlr    = logging.FileHandler(cls.log_file)
            cls.fmtr    = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

            cls.hdlr.setFormatter(cls.fmtr)
            cls.logger.addHandler(cls.hdlr)
            cls.logger.setLevel(logging.INFO)

            cls.logger.info("===== Started processing %s" % ('=' * 20))
 
            cls.count = 0
        except IOError, msg:
            sys.stderr.write(cls.log_file + ': cannot open: ' + `msg` + '\n')
            sys.exit(1)

#==========================================================================

Timer.py

class Timer:
    t_reference = None

    #----------------------------------------------------------------------

    @classmethod
    def init(cls):
       return float(cls.get_reference_time(init=True)) * 0.001

    #----------------------------------------------------------------------

    @classmethod
    def time(cls):
       return float(cls.get_reference_time()) * 0.001

    #----------------------------------------------------------------------

    @classmethod
    def get_reference_time(cls, init=False):
       t_now  = datetime.now()

       if (init):
          cls.t_reference   = t_now
          t             = 0
       else:
          t_delta       = t_now - cls.t_reference
          t             = ((t_delta.seconds * 1000000) + t_delta.microseconds)/1000.0

       return t

    #----------------------------------------------------------------------

#==========================================================================

CSV_Reader.py

Just so that I can leave it around...

$ cat CSV_Reader.py

import csv
import sys

#=========================================================================

class CSV_Reader:

    #---------------------------------------------------------------------

    @classmethod
    def read(cls, fname, skip=0, obj=None, limit=0):
        csv_data  = []

        try:
            f_in      = open(fname, "rb")
        except IOError, msg:
            sys.stderr.write(cls.log_file + ': cannot open: ' + `msg` + '\n')
            sys.exit(1)

        reader = csv.reader(f_in)

        cnt   = 0

        for row in reader:
           cnt += 1

           if skip > 0:
               skip -= 1
               continue   # Skip headings

           if obj:
               data = obj(row)
           else:
               data = row

           # print data

           csv_data.append(data)

           if limit and (cnt < limit): continue

        f_in.close()  # Explicitly close the file *NOW*

        no_lines  = len(csv_data)

        print "Read %d data items..." % no_lines
        print "                 -> Total data  %d" % cnt

        return csv_data

    #---------------------------------------------------------------------

#=========================================================================

A Test Harness

Use this to test the performiq module...

$ cat tst_module.py 
#!/usr/bin/env python

import time

import performiq

from performiq import Logger

debug_level  = 0
verbose_flg  = True

#==========================================================================

class Data:

    #--------------------------------------------------------------------

    def __init__(self, row):
        self.One = row[0]
        self.Two = row[1]
        self.Three = row[2]
        self.Four = row[3]

    #--------------------------------------------------------------------

    def __str__(self):
        return "%s,%s,%s,%s" % (self.One, self.Two, self.Three, self.Four)

    #--------------------------------------------------------------------

#==========================================================================


print "performiq version [%s]" % performiq.VERSION


print "Set timer - %5.3f" % performiq.Timer.init()

Logger.Init(debug=True)

Logger.Info("Testing - Info...")
Logger.Warning("Testing - Warning...")
Logger.Error("Testing - Error...")

time.sleep(1.4)

print "Get timer - %5.3f" % performiq.Timer.time()

csv_data = """\
a,b,c,d
e,f,g,h
i,j,k,l
"""

csv_file = 'tst.csv'

f_out = open(csv_file, 'w+')

f_out.write(csv_data)
f_out.close()

data = performiq.CSV_Reader.read(csv_file, obj=Data)

if data:
    for d in data:
        print d

Running it:

$ ./tst_module.py 
performiq version [1.0.0]
Set timer - 0.000
[logger::Init] PID is 8404
[logger::INFO]  Testing - Info...
[logger::WARNING]  Testing - Warning...
[logger::ERROR]  Testing - Error...
Get timer - 3.001
Read 3 data items...
                 -> Total data  3
a,b,c,d
e,f,g,h
i,j,k,l

A Variant skel.py

This variant of my skel.py script uses the module.

$ cat skel.py 
#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@performiq.com.au>
#
#                PerformIQ Pty. Ltd.
#                Suite 230,
#                Level 2,
#                1 Queens Road,
#                MELBOURNE, VIC, 3004
#
#                Mobile:  0418 375 085
#
#           @(#) [2.3.01] skel.py 2013-02-20
#
#
# NAME
#   skel.py - Skeleton python script
#
# SYNOPSIS
#   skel.py [-dv]
#
# PARAMETERS
#   See __doc__ below
#
# DESCRIPTION
#   ...
#
# RETURNS
#   0 for successful completion, 1 for any error
#
# FILES
#   ...
#
#--------------------------------------------------------------------------
"""
Usage:

   $ skel.py [-dv] -apn 10y

   $ skel.py [-dv] -o             # ...

Parameters:

   -a              ...
   -p              ...
   -n 10           No of ...
   -o              ...
   -d              Debug
   -v              Verbose

"""
#--------------------------------------------------------------------------

import os
import re
import sys
import time
import getopt
import random
import pickle
import pprint
import logging
import urllib

from datetime import datetime

from performiq import Enum, Logger

#--------------------------------------------------------------------------

__version__   = "2.3.01"
__at_id__     = "@(#)  skel.py  [%s]  2013-02-20" % __version__

verbose_flg   = False

debug_level   = 0

table_name    = "people"
DSERVER_PORT  = 9570
MAX_REQUESTS  = 200

lf            = None
log           = None

LOG_DIR       = "/tmp"
home_dir      = None

p_crlf        = re.compile(r'[\r\n]*')

pp            = pprint.PrettyPrinter(indent=3)

#==========================================================================

class Data:
   TotalCount = 0

   #--------------------------------------------------------------------

   @classmethod
   def count_row(cls):
      cls.TotalCount += 1

   #--------------------------------------------------------------------

   def __init__(self, row):
      Data.count_row()
      cols = row.split(',')
      self.One = cols[0]
      self.Two = cols[1]

#==========================================================================
# And here is the real work...

def do_work(fname):
   Logger.Info("[do_work]")

   fname_in  = "%s.log" % fname
   fname_out = "%s.dat" % fname

   try:
      f_in = open(fname_in, 'r')
   except IOError, msg:
      sys.stderr.write(fname_in + ': cannot open: ' + `msg` + '\n')
      sys.exit(1)

   try:
      f_out = open(fname_out, 'a+')
   except IOError, msg:
      sys.stderr.write(fname_out + ': cannot open: ' + `msg` + '\n')
      sys.exit(1)

   while True:
      line = f_in.readline()

      if not line: break

      #  Truncate EoL markers from end of line

      line = p_crlf.sub('', line)  # or 'line = line[:-1]'

      data = Data(line)

      f_out.write("[%s]\n" % (line, ))

   f_in.close()
   f_out.close()

#=========================================================================

def usage():
   print __doc__

#-------------------------------------------------------------------------

def main(argv):
   global verbose_flg
   global debug_level
   global target
   global home_dir

   try:
      home_dir = os.environ['HOME']
   except:
      print "Set HOME environment variable and re-run"
      sys.exit(0)

   Modes    = Enum(["Info", "Parse", ])

   mode     = Modes.Info
   filename = "test"

   try:
      opts, args = getopt.getopt(argv, "dD:f:hvV?",
              ("debug", "debug-level=", "file=", "help", "verbose", "version"))
   except getopt.error, msg:
      usage()
      return 1

   for opt, arg in opts:
      if opt in ("-?", "-h", "--help"):
         usage()
         return 0
      elif opt in ('-d', '--debug'):
         debug_level    += 1
      elif opt in ('-D', '--debug-level'):
         debug_level     = int(arg)
      elif opt in ('-f', '--file'):
         mode = Modes.Parse
         filename        = arg
      elif opt in ('-v', '--verbose'):
         verbose_flg     = True
      elif opt in ('-v', '--version'):
         print "[skel]  Version: %s" % __version__
         return 1
      else:
         usage()
         return 1

   sys.stderr.write("[skel]  Working directory is %s\n" % os.getcwd())

   if (debug_level > 0): sys.stderr.write("[skel]  Debugging level set to %d\n" % debug_level)

   sys.stderr.flush()

   Logger.Init(name='skel')

   if mode == Modes.Info:
      Logger.Info('Info')
   elif mode == Modes.Parse:
      Logger.Info('Parsing')
      do_work(filename)
   else:
      Logger.Info('Nothing to do')

   return 0

#--------------------------------------------------------------------------

if __name__ == '__main__' or __name__ == sys.argv[0]:
   try:
      sys.exit(main(sys.argv[1:]))
   except KeyboardInterrupt, e:
      print "[skel]  Interrupted!"

#--------------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   ------------------------------------------------------------
   20031014   plh   Initial implementation
   20111101   plh   Add in Enums for modal behaviour
   20130220   plh   Reconstructed performiq module

Problems to fix:

To Do:

Issues:


"""