Implementing a Standard Library
Jump to navigation
Jump to search
Components
I use the following for building up utility scripts.
$ ls -lR ~/lib/py
/home/pharding/lib/py:
total 4
drwxr-xr-x+ 1 pharding Users    0 Feb 20 11:44 performiq

./performiq:
total 15
-rw-r--r-- 1 pharding Users  177 Feb 20 12:26 __init__.py
-rw-r--r-- 1 pharding Users   23 Feb 20 10:52 CONSTANTS.py
-rw-r--r-- 1 pharding Users 1248 Feb 20 12:44 CSV_Reader.py
-rw-r--r-- 1 pharding Users  361 Feb 20 10:55 Enum.py
-rw-r--r-- 1 pharding Users 2121 Feb 20 11:32 Logger.py
-rw-r--r-- 1 pharding Users 1077 Feb 20 12:08 Timer.py
$ env | grep PYTHONPATH
PYTHONPATH=/home/pharding/lib/py
__init__.py
$ cat __init__.py import CONSTANTS from CONSTANTS import VERSION from Enum import Enum from Logger import Logger from Timer import Timer from CSV_Reader import CSV_Reader
CONSTANTS.py
$ cat CONSTANTS.py VERSION = "1.0.0"
Enum.py
#==========================================================================

class Enum(set):
    """Minimal enumeration: a set of names readable as attributes.

    ``Enum(["Info", "Parse"]).Info`` evaluates to the string ``"Info"``.
    All regular ``set`` operations remain available.
    """

    #--------------------------------------------------------------------

    def __getattr__(self, name):
        """Return *name* itself when it is a member of the set.

        Python only calls this hook for attributes that normal lookup did
        not find, so the inherited ``set`` methods are unaffected.

        Raises:
            AttributeError: if *name* is not a member; the message names
                the missing member.
        """
        if name in self:
            return name

        raise AttributeError(
            "%s has no member %r" % (type(self).__name__, name))

#==========================================================================
Logger.py
import os
import sys
import logging

from datetime import datetime

#==========================================================================

class MyFormatter(logging.Formatter):
    """Formatter whose timestamps are built from a ``datetime`` object."""

    # Produces a datetime (rather than a time.struct_time) so that '%f'
    # can be used in a datefmt string.
    converter = datetime.fromtimestamp

    def formatTime(self, record, datefmt=None):
        """Return the creation time of *record* as text.

        With *datefmt* the result of ``strftime`` is cut to 23 characters;
        for the '%Y-%m-%d,%H:%M:%S.%f' format used by ``Logger.Init`` that
        trims the microseconds down to milliseconds.  Without *datefmt*
        the form is 'YYYY-MM-DD HH:MM:SS.mmm'.
        """
        ct = self.converter(record.created)

        if datefmt:
            return ct.strftime(datefmt)[:23]

        t = ct.strftime("%Y-%m-%d %H:%M:%S")
        return "%s.%03d" % (t, record.msecs)

#==========================================================================

class Logger:
    """Class-level facade over one file-backed ``logging`` logger.

    All state lives on the class, so callers use ``Logger.Info(...)`` etc.
    without creating an instance.  The first logging call runs ``Init()``
    with its defaults if ``Init`` has not been called explicitly.
    """

    logger = None       # logging.Logger in use; None until Init() runs
    hdlr = None         # FileHandler attached to `logger`
    debug = False       # when True, messages are echoed to stderr
    name = 'logger'     # tag used in the stderr echo

    #----------------------------------------------------------------------

    @classmethod
    def _echo(cls, level, msg):
        """Mirror *msg* to stderr when debug mode is on."""
        if cls.debug:
            sys.stderr.write("[%s::%s] %s\n" % (cls.name, level, msg))

    #----------------------------------------------------------------------

    @classmethod
    def Info(cls, msg):
        """Log *msg* at INFO level (written with one leading space)."""
        if not cls.logger:
            cls.Init()

        cls.logger.info(' ' + msg)
        cls._echo('INFO', msg)

    #----------------------------------------------------------------------

    @classmethod
    def Error(cls, msg):
        """Log *msg* at ERROR level."""
        if not cls.logger:
            cls.Init()

        cls.logger.error(msg)
        cls._echo('ERROR', msg)

    #----------------------------------------------------------------------

    @classmethod
    def Warning(cls, msg):
        """Log *msg* at WARNING level, wrapped in '*****' markers."""
        if not cls.logger:
            cls.Init()

        cls.logger.warning('*****' + msg + '*****')
        cls._echo('WARNING', msg)

    #----------------------------------------------------------------------

    @classmethod
    def Init(cls, name='logger', log_dir='/c/temp', debug=False):
        """Attach a file handler writing to ``<log_dir>/<name>.log``.

        Parameters:
            name     Logger name; also the log file's base name.
            log_dir  Directory that receives the log file.
            debug    When True, echo every message to stderr as well.

        Calling ``Init`` again replaces the handler installed by the
        previous call instead of stacking a second one (which would write
        every record twice).

        Exits the process with status 1 if the log file cannot be opened.
        """
        cls.debug = debug
        cls.name = name
        cls.pid = os.getpid()

        if cls.debug:
            sys.stderr.write("[%s::Init] PID is %d\n" % (cls.name, cls.pid))

        cls.log_file = '%s/%s.log' % (log_dir, name)

        # Drop the handler left behind by an earlier Init()
        if cls.logger is not None and cls.hdlr is not None:
            cls.logger.removeHandler(cls.hdlr)
            cls.hdlr.close()
            cls.hdlr = None

        try:
            hdlr = logging.FileHandler(cls.log_file)
        except IOError as msg:
            sys.stderr.write(cls.log_file + ': cannot open: ' + repr(msg) + '\n')
            sys.exit(1)

        cls.logger = logging.getLogger(name)
        cls.hdlr = hdlr
        cls.fmtr = MyFormatter('%(asctime)s %(levelname)s %(message)s',
                               datefmt='%Y-%m-%d,%H:%M:%S.%f')
        cls.hdlr.setFormatter(cls.fmtr)
        cls.logger.addHandler(cls.hdlr)
        cls.logger.setLevel(logging.INFO)

        cls.logger.info("===== Started processing %s" % ('=' * 20))

        cls.count = 0

#==========================================================================
Timer.py
from datetime import datetime

#==========================================================================

class Timer:
    """Class-level stopwatch measuring time since a reference instant.

    ``Timer.init()`` sets the reference; ``Timer.time()`` returns the
    seconds elapsed since then.
    """

    t_reference = None      # datetime of the last init(); None until set

    #----------------------------------------------------------------------

    @classmethod
    def init(cls):
        """Reset the reference instant to now and return 0.0 (seconds)."""
        return float(cls.get_reference_time(init=True)) * 0.001

    #----------------------------------------------------------------------

    @classmethod
    def time(cls):
        """Return the seconds elapsed since the reference instant."""
        return float(cls.get_reference_time()) * 0.001

    #----------------------------------------------------------------------

    @classmethod
    def get_reference_time(cls, init=False):
        """Return the milliseconds elapsed since the reference instant.

        With ``init=True``, or when no reference has been set yet, the
        reference becomes the current time and 0 is returned.
        """
        t_now = datetime.now()

        if init or cls.t_reference is None:
            cls.t_reference = t_now
            return 0

        t_delta = t_now - cls.t_reference

        # timedelta.seconds alone wraps every 24 hours - fold the days in
        whole_seconds = t_delta.days * 86400 + t_delta.seconds

        return (whole_seconds * 1000000 + t_delta.microseconds) / 1000.0

    #----------------------------------------------------------------------

#==========================================================================
CSV_Reader.py
Just so that I can leave it around...
import csv
import sys

#=========================================================================

class CSV_Reader:
    """Load the rows of a CSV file into a list."""

    #---------------------------------------------------------------------

    @staticmethod
    def _open(fname):
        """Open *fname* in the mode the csv module expects on this Python."""
        if sys.version_info[0] >= 3:
            # Python 3: text mode, newline handling left to the csv module
            return open(fname, "r", newline="")

        return open(fname, "rb")

    #---------------------------------------------------------------------

    @classmethod
    def read(cls, fname, skip=0, obj=None, limit=0):
        """Read *fname* and return its rows as a list.

        Parameters:
            fname  Path of the CSV file.
            skip   Number of leading rows (headings) to discard.
            obj    Optional callable applied to each row (a list of
                   strings); its result is stored instead of the row.
            limit  Stop once this many rows - skipped rows included -
                   have been read and a data row has been stored.
                   0 means no limit.

        Prints the number of data items kept and of rows read.

        Exits the process with status 1 if the file cannot be opened.
        """
        csv_data = []

        try:
            f_in = cls._open(fname)
        except IOError as msg:
            sys.stderr.write(str(fname) + ': cannot open: ' + repr(msg) + '\n')
            sys.exit(1)

        cnt = 0

        try:
            for row in csv.reader(f_in):
                cnt += 1

                if skip > 0:
                    skip -= 1
                    continue   # Skip headings

                csv_data.append(obj(row) if obj else row)

                if limit and cnt >= limit:
                    break
        finally:
            f_in.close()   # Close even if obj(row) raised

        print("Read %d data items..." % len(csv_data))
        print(" -> Total data %d" % cnt)

        return csv_data

    #---------------------------------------------------------------------

#=========================================================================
A Test Harness
Use this to test the performiq module...
#!/usr/bin/env python
"""Smoke test for the performiq package.

Exercises VERSION, Timer, Logger and CSV_Reader in turn and prints the
results to stdout.
"""

import time

import performiq

from performiq import Logger

debug_level = 0
verbose_flg = True

#==========================================================================

class Data:
    """One CSV row with exactly four columns."""

    #--------------------------------------------------------------------

    def __init__(self, row):
        """Store the first four fields of *row* (a list of strings)."""
        self.One = row[0]
        self.Two = row[1]
        self.Three = row[2]
        self.Four = row[3]

    #--------------------------------------------------------------------

    def __str__(self):
        """Return the four fields re-joined with commas."""
        return "%s,%s,%s,%s" % (self.One, self.Two, self.Three, self.Four)

    #--------------------------------------------------------------------

#==========================================================================

# Single-argument print(...) produces the same output on Python 2 and 3

print("performiq version [%s]" % performiq.VERSION)

print("Set timer - %5.3f" % performiq.Timer.init())

Logger.Init(debug=True)

Logger.Info("Testing - Info...")
Logger.Warning("Testing - Warning...")
Logger.Error("Testing - Error...")

time.sleep(1.4)

print("Get timer - %5.3f" % performiq.Timer.time())

csv_data = """\
a,b,c,d
e,f,g,h
i,j,k,l
"""

csv_file = 'tst.csv'

# 'with' closes the file even if the write fails
with open(csv_file, 'w+') as f_out:
    f_out.write(csv_data)

data = performiq.CSV_Reader.read(csv_file, obj=Data)

if data:
    for d in data:
        print(d)
Running it:
$ ./tst_module.py performiq version [1.0.0] Set timer - 0.000 [logger::Init] PID is 8404 [logger::INFO] Testing - Info... [logger::WARNING] Testing - Warning... [logger::ERROR] Testing - Error... Get timer - 3.001 Read 3 data items... -> Total data 3 a,b,c,d e,f,g,h i,j,k,l
A Variant skel.py
This variant of my skel.py script uses the module.
#!/usr/bin/env python
#
#  Author:  Peter Harding  <plh@performiq.com.au>
#
#           PerformIQ Pty. Ltd.
#           Suite 230,
#           Level 2,
#           1 Queens Road,
#           MELBOURNE, VIC, 3004
#
#           Mobile:  0418 375 085
#
#  @(#) [2.3.01] skel.py 2013-02-20
#
#
# NAME
#   skel.py - Skeleton python script
#
# SYNOPSIS
#   skel.py [-dv]
#
# PARAMETERS
#   See __doc__ below
#
# DESCRIPTION
#   ...
#
# RETURNS
#   0 for successful completion, 1 for any error
#
# FILES
#   ...
#
#--------------------------------------------------------------------------
"""
Usage:

   $ skel.py [-dv] -apn 10y

   $ skel.py [-dv] -o

   # ...

Parameters:

   -a           ...
   -p           ...
   -n 10        No of ...
   -o           ...
   -d           Debug
   -v           Verbose
"""
#--------------------------------------------------------------------------

import os
import re
import sys
import time
import getopt
import random
import pickle
import pprint
import logging
import urllib

from datetime import datetime

from performiq import Enum, Logger

#--------------------------------------------------------------------------

__version__   = "2.3.01"
__at_id__     = "@(#)  skel.py  [%s]  2013-02-20" % __version__

verbose_flg   = False

debug_level   = 0

table_name    = "people"
DSERVER_PORT  = 9570
MAX_REQUESTS  = 200

lf            = None
log           = None
LOG_DIR       = "/tmp"

home_dir      = None

# Matches any run of CR/LF characters (used to strip end-of-line markers)
p_crlf        = re.compile(r'[\r\n]*')

pp            = pprint.PrettyPrinter(indent=3)

#==========================================================================

class Data:
    """First two comma-separated fields of one input line.

    ``TotalCount`` records how many instances have been created.
    """

    TotalCount = 0

    #--------------------------------------------------------------------

    @classmethod
    def count_row(cls):
        """Add one to the running instance count."""
        cls.TotalCount += 1

    #--------------------------------------------------------------------

    def __init__(self, row):
        """Split *row* on commas and keep its first two fields."""
        Data.count_row()

        fields = row.split(',')

        self.One = fields[0]
        self.Two = fields[1]

#==========================================================================
# And here is the real work...
def do_work(fname):
    """Copy ``<fname>.log`` to ``<fname>.dat``, one bracketed line per line.

    Each input line has its CR/LF characters removed, is parsed into a
    ``Data`` record (which updates ``Data.TotalCount``) and is appended to
    the output file as ``[line]``.

    Exits the process with status 1 if either file cannot be opened.
    """
    Logger.Info("[do_work]")

    fname_in = "%s.log" % fname
    fname_out = "%s.dat" % fname

    try:
        f_in = open(fname_in, 'r')
    except IOError as msg:
        sys.stderr.write(fname_in + ': cannot open: ' + repr(msg) + '\n')
        sys.exit(1)

    try:
        f_out = open(fname_out, 'a+')
    except IOError as msg:
        f_in.close()
        sys.stderr.write(fname_out + ': cannot open: ' + repr(msg) + '\n')
        sys.exit(1)

    try:
        for line in f_in:
            # Truncate EoL markers from end of line
            line = p_crlf.sub('', line)

            # Constructed for its effect on Data.TotalCount
            Data(line)

            f_out.write("[%s]\n" % (line, ))
    finally:
        # Release both files even if a line fails to parse
        f_in.close()
        f_out.close()

#=========================================================================

def usage():
    """Print the module docstring as the usage message."""
    print(__doc__)

#-------------------------------------------------------------------------

def main(argv):
    """Parse the command line in *argv* and run the selected mode.

    Options:
        -?, -h, --help         Print usage and return 0
        -d, --debug            Increase the debug level by one
        -D n, --debug-level=n  Set the debug level to n
        -f name, --file=name   Process <name>.log (selects Parse mode)
        -v, --verbose          Set the verbose flag
        -V, --version          Print the version and return 0

    Returns 0 on success and 1 on a command-line error.  Exits with
    status 1 if the HOME environment variable is not set.
    """
    global verbose_flg
    global debug_level
    global home_dir

    try:
        home_dir = os.environ['HOME']
    except KeyError:
        print("Set HOME environment variable and re-run")
        sys.exit(1)

    Modes = Enum(["Info", "Parse", ])

    mode = Modes.Info
    filename = "test"

    try:
        opts, args = getopt.getopt(argv, "dD:f:hvV?",
                ("debug", "debug-level=", "file=", "help", "verbose", "version"))
    except getopt.error:
        usage()
        return 1

    for opt, arg in opts:
        if opt in ("-?", "-h", "--help"):
            usage()
            return 0
        elif opt in ('-d', '--debug'):
            debug_level += 1
        elif opt in ('-D', '--debug-level'):
            debug_level = int(arg)
        elif opt in ('-f', '--file'):
            mode = Modes.Parse
            filename = arg
        elif opt in ('-v', '--verbose'):
            verbose_flg = True
        elif opt in ('-V', '--version'):
            # '-V' matches the option string passed to getopt; testing for
            # '-v' here could never succeed after the verbose branch above
            print("[skel]  Version: %s" % __version__)
            return 0
        else:
            usage()
            return 1

    sys.stderr.write("[skel]  Working directory is %s\n" % os.getcwd())

    if (debug_level > 0):
        sys.stderr.write("[skel]  Debugging level set to %d\n" % debug_level)

    sys.stderr.flush()

    Logger.Init(name='skel')

    if mode == Modes.Info:
        Logger.Info('Info')
    elif mode == Modes.Parse:
        Logger.Info('Parsing')
        do_work(filename)
    else:
        Logger.Info('Nothing to do')

    return 0

#--------------------------------------------------------------------------

# Run main() when executed as a script; its return value becomes the
# process exit status.
if __name__ == '__main__' or __name__ == sys.argv[0]:
    try:
        sys.exit(main(sys.argv[1:]))
    except KeyboardInterrupt:
        print("[skel]  Interrupted!")

#--------------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   ------------------------------------------------------------
   20031014   plh   Initial implementation
   20111101   plh   Add in Enums for modal behaviour
   20130220   plh   Reconstructed performiq module

Problems to fix:

To Do:

Issues:

"""