Difference between revisions of "Implementing a Standard Library"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) |
PeterHarding (talk | contribs) |
||
| (3 intermediate revisions by the same user not shown) | |||
| Line 34: | Line 34: | ||
import CONSTANTS | import CONSTANTS | ||
from CONSTANTS import VERSION | |||
from Enum | from CONSTANTS import VERSION | ||
from Logger | from Enum import Enum | ||
from Timer | from Logger import Logger | ||
from Timer import Timer | |||
from CSV_Reader import CSV_Reader | |||
</pre> | </pre> | ||
| Line 76: | Line 78: | ||
import sys | import sys | ||
import logging | import logging | ||
from datetime import datetime | |||
#========================================================================== | |||
class MyFormatter(logging.Formatter): | |||
converter = datetime.fromtimestamp | |||
def formatTime(self, record, datefmt=None): | |||
ct = self.converter(record.created) | |||
if datefmt: | |||
s = ct.strftime(datefmt)[:23] | |||
else: | |||
t = ct.strftime("%Y-%m-%d %H:%M:%S") | |||
s = "%s.%03d" % (t, record.msecs) | |||
return s | |||
#========================================================================== | #========================================================================== | ||
| Line 82: | Line 100: | ||
logger = None | logger = None | ||
debug = False | debug = False | ||
#---------------------------------------------------------------------- | |||
@classmethod | @classmethod | ||
| Line 135: | Line 155: | ||
cls.logger = logging.getLogger(name) | cls.logger = logging.getLogger(name) | ||
cls.hdlr = logging.FileHandler(cls.log_file) | cls.hdlr = logging.FileHandler(cls.log_file) | ||
cls.fmtr = | cls.fmtr = MyFormatter('%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d,%H:%M:%S.%f') | ||
cls.hdlr.setFormatter(cls.fmtr) | cls.hdlr.setFormatter(cls.fmtr) | ||
| Line 149: | Line 169: | ||
#========================================================================== | #========================================================================== | ||
</pre> | </pre> | ||
| Line 154: | Line 175: | ||
<pre> | <pre> | ||
class Timer: | class Timer: | ||
t_reference = None | t_reference = None | ||
| Line 177: | Line 196: | ||
t_now = datetime.now() | t_now = datetime.now() | ||
if ( | if (init): | ||
cls.t_reference = t_now | cls.t_reference = t_now | ||
t = 0 | t = 0 | ||
Latest revision as of 09:27, 13 March 2013
Components
I use the following for building up utility scripts.
$ ls -lR ~/lib/py /home/pharding/lib/py: total 4 drwxr-xr-x+ 1 pharding Users 0 Feb 20 11:44 performiq ./performiq: total 15 -rw-r--r-- 1 pharding Users 177 Feb 20 12:26 __init__.py -rw-r--r-- 1 pharding Users 23 Feb 20 10:52 CONSTANTS.py -rw-r--r-- 1 pharding Users 1248 Feb 20 12:44 CSV_Reader.py -rw-r--r-- 1 pharding Users 361 Feb 20 10:55 Enum.py -rw-r--r-- 1 pharding Users 2121 Feb 20 11:32 Logger.py -rw-r--r-- 1 pharding Users 1077 Feb 20 12:08 Timer.py
env | grep PYTHONPATH PYTHONPATH=/home/pharding/lib/py
__init__.py
$ cat __init__.py import CONSTANTS from CONSTANTS import VERSION from Enum import Enum from Logger import Logger from Timer import Timer from CSV_Reader import CSV_Reader
CONSTANTS.py
$ cat CONSTANTS.py VERSION = "1.0.0"
Enum.py
$ cat Enum.py
#==========================================================================
class Enum(set):
pass
#--------------------------------------------------------------------
def __getattr__(self, name):
if name in self:
return name
raise AttributeError
#==========================================================================
Logger.py
$ cat Logger.py
import os
import sys
import logging
from datetime import datetime
#==========================================================================
class MyFormatter(logging.Formatter):
converter = datetime.fromtimestamp
def formatTime(self, record, datefmt=None):
ct = self.converter(record.created)
if datefmt:
s = ct.strftime(datefmt)[:23]
else:
t = ct.strftime("%Y-%m-%d %H:%M:%S")
s = "%s.%03d" % (t, record.msecs)
return s
#==========================================================================
class Logger:
logger = None
debug = False
#----------------------------------------------------------------------
@classmethod
def Info(cls, msg):
global debug_level, verbose_flg
if not cls.logger:
cls.Init()
cls.logger.info(' ' + msg)
if cls.debug: sys.stderr.write("[%s::INFO] %s\n" % (cls.name, msg))
#----------------------------------------------------------------------
@classmethod
def Error(cls, msg):
global debug_level, verbose_flg
if not cls.logger:
cls.Init()
cls.logger.error(msg)
if cls.debug: sys.stderr.write("[%s::ERROR] %s\n" % (cls.name, msg))
#----------------------------------------------------------------------
@classmethod
def Warning(cls, msg):
global debug_level, verbose_flg
if not cls.logger:
cls.Init()
cls.logger.warning('*****' + msg + '*****')
if cls.debug: sys.stderr.write("[%s::WARNING] %s\n" % (cls.name, msg))
#----------------------------------------------------------------------
@classmethod
def Init(cls, name='logger', log_dir='/c/temp', debug=False):
cls.debug = debug
cls.name = name
cls.pid = os.getpid()
if cls.debug: sys.stderr.write("[%s::Init] PID is %d\n" % (cls.name, cls.pid))
cls.log_file = '%s/%s.log' % (log_dir, name)
try:
cls.logger = logging.getLogger(name)
cls.hdlr = logging.FileHandler(cls.log_file)
cls.fmtr = MyFormatter('%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d,%H:%M:%S.%f')
cls.hdlr.setFormatter(cls.fmtr)
cls.logger.addHandler(cls.hdlr)
cls.logger.setLevel(logging.INFO)
cls.logger.info("===== Started processing %s" % ('=' * 20))
cls.count = 0
except IOError, msg:
sys.stderr.write(cls.log_file + ': cannot open: ' + `msg` + '\n')
sys.exit(1)
#==========================================================================
Timer.py
class Timer:
t_reference = None
#----------------------------------------------------------------------
@classmethod
def init(cls):
return float(cls.get_reference_time(init=True)) * 0.001
#----------------------------------------------------------------------
@classmethod
def time(cls):
return float(cls.get_reference_time()) * 0.001
#----------------------------------------------------------------------
@classmethod
def get_reference_time(cls, init=False):
t_now = datetime.now()
if (init):
cls.t_reference = t_now
t = 0
else:
t_delta = t_now - cls.t_reference
t = ((t_delta.seconds * 1000000) + t_delta.microseconds)/1000.0
return t
#----------------------------------------------------------------------
#==========================================================================
CSV_Reader.py
Just so that I can leave it around...
$ cat CSV_Reader.py
import csv
import sys
#=========================================================================
class CSV_Reader:
#---------------------------------------------------------------------
@classmethod
def read(cls, fname, skip=0, obj=None, limit=0):
csv_data = []
try:
f_in = open(fname, "rb")
except IOError, msg:
sys.stderr.write(cls.log_file + ': cannot open: ' + `msg` + '\n')
sys.exit(1)
reader = csv.reader(f_in)
cnt = 0
for row in reader:
cnt += 1
if skip > 0:
skip -= 1
continue # Skip headings
if obj:
data = obj(row)
else:
data = row
# print data
csv_data.append(data)
if limit and (cnt < limit): continue
f_in.close() # Explicitly close the file *NOW*
no_lines = len(csv_data)
print "Read %d data items..." % no_lines
print " -> Total data %d" % cnt
return csv_data
#---------------------------------------------------------------------
#=========================================================================
A Test Harness
Use this to test the performiq module...
$ cat tst_module.py
#!/usr/bin/env python
import time
import performiq
from performiq import Logger
debug_level = 0
verbose_flg = True
#==========================================================================
class Data:
#--------------------------------------------------------------------
def __init__(self, row):
self.One = row[0]
self.Two = row[1]
self.Three = row[2]
self.Four = row[3]
#--------------------------------------------------------------------
def __str__(self):
return "%s,%s,%s,%s" % (self.One, self.Two, self.Three, self.Four)
#--------------------------------------------------------------------
#==========================================================================
print "performiq version [%s]" % performiq.VERSION
print "Set timer - %5.3f" % performiq.Timer.init()
Logger.Init(debug=True)
Logger.Info("Testing - Info...")
Logger.Warning("Testing - Warning...")
Logger.Error("Testing - Error...")
time.sleep(1.4)
print "Get timer - %5.3f" % performiq.Timer.time()
csv_data = """\
a,b,c,d
e,f,g,h
i,j,k,l
"""
csv_file = 'tst.csv'
f_out = open(csv_file, 'w+')
f_out.write(csv_data)
f_out.close()
data = performiq.CSV_Reader.read(csv_file, obj=Data)
if data:
for d in data:
print d
Running it:
$ ./tst_module.py
performiq version [1.0.0]
Set timer - 0.000
[logger::Init] PID is 8404
[logger::INFO] Testing - Info...
[logger::WARNING] Testing - Warning...
[logger::ERROR] Testing - Error...
Get timer - 3.001
Read 3 data items...
-> Total data 3
a,b,c,d
e,f,g,h
i,j,k,l
A Variant skel.py
This variant of my skel.py script uses the module.
$ cat skel.py
#!/usr/bin/env python
#
# Author: Peter Harding <plh@performiq.com.au>
#
# PerformIQ Pty. Ltd.
# Suite 230,
# Level 2,
# 1 Queens Road,
# MELBOURNE, VIC, 3004
#
# Mobile: 0418 375 085
#
# @(#) [2.3.01] skel.py 2013-02-20
#
#
# NAME
# skel.py - Skeleton python script
#
# SYNOPSIS
# skel.py [-dv]
#
# PARAMETERS
# See __doc__ below
#
# DESCRIPTION
# ...
#
# RETURNS
# 0 for successful completion, 1 for any error
#
# FILES
# ...
#
#--------------------------------------------------------------------------
"""
Usage:
$ skel.py [-dv] -apn 10y
$ skel.py [-dv] -o # ...
Parameters:
-a ...
-p ...
-n 10 No of ...
-o ...
-d Debug
-v Verbose
"""
#--------------------------------------------------------------------------
import os
import re
import sys
import time
import getopt
import random
import pickle
import pprint
import logging
import urllib
from datetime import datetime
from performiq import Enum, Logger
#--------------------------------------------------------------------------
__version__ = "2.3.01"
__at_id__ = "@(#) skel.py [%s] 2013-02-20" % __version__
verbose_flg = False
debug_level = 0
table_name = "people"
DSERVER_PORT = 9570
MAX_REQUESTS = 200
lf = None
log = None
LOG_DIR = "/tmp"
home_dir = None
p_crlf = re.compile(r'[\r\n]*')
pp = pprint.PrettyPrinter(indent=3)
#==========================================================================
class Data:
TotalCount = 0
#--------------------------------------------------------------------
@classmethod
def count_row(cls):
cls.TotalCount += 1
#--------------------------------------------------------------------
def __init__(self, row):
Data.count_row()
cols = row.split(',')
self.One = cols[0]
self.Two = cols[1]
#==========================================================================
# And here is the real work...
def do_work(fname):
Logger.Info("[do_work]")
fname_in = "%s.log" % fname
fname_out = "%s.dat" % fname
try:
f_in = open(fname_in, 'r')
except IOError, msg:
sys.stderr.write(fname_in + ': cannot open: ' + `msg` + '\n')
sys.exit(1)
try:
f_out = open(fname_out, 'a+')
except IOError, msg:
sys.stderr.write(fname_out + ': cannot open: ' + `msg` + '\n')
sys.exit(1)
while True:
line = f_in.readline()
if not line: break
# Truncate EoL markers from end of line
line = p_crlf.sub('', line) # or 'line = line[:-1]'
data = Data(line)
f_out.write("[%s]\n" % (line, ))
f_in.close()
f_out.close()
#=========================================================================
def usage():
print __doc__
#-------------------------------------------------------------------------
def main(argv):
global verbose_flg
global debug_level
global target
global home_dir
try:
home_dir = os.environ['HOME']
except:
print "Set HOME environment variable and re-run"
sys.exit(0)
Modes = Enum(["Info", "Parse", ])
mode = Modes.Info
filename = "test"
try:
opts, args = getopt.getopt(argv, "dD:f:hvV?",
("debug", "debug-level=", "file=", "help", "verbose", "version"))
except getopt.error, msg:
usage()
return 1
for opt, arg in opts:
if opt in ("-?", "-h", "--help"):
usage()
return 0
elif opt in ('-d', '--debug'):
debug_level += 1
elif opt in ('-D', '--debug-level'):
debug_level = int(arg)
elif opt in ('-f', '--file'):
mode = Modes.Parse
filename = arg
elif opt in ('-v', '--verbose'):
verbose_flg = True
elif opt in ('-v', '--version'):
print "[skel] Version: %s" % __version__
return 1
else:
usage()
return 1
sys.stderr.write("[skel] Working directory is %s\n" % os.getcwd())
if (debug_level > 0): sys.stderr.write("[skel] Debugging level set to %d\n" % debug_level)
sys.stderr.flush()
Logger.Init(name='skel')
if mode == Modes.Info:
Logger.Info('Info')
elif mode == Modes.Parse:
Logger.Info('Parsing')
do_work(filename)
else:
Logger.Info('Nothing to do')
return 0
#--------------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
try:
sys.exit(main(sys.argv[1:]))
except KeyboardInterrupt, e:
print "[skel] Interrupted!"
#--------------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- ------------------------------------------------------------
20031014 plh Initial implementation
20111101 plh Add in Enums for modal behaviour
20130220 plh Reconstructed performiq module
Problems to fix:
To Do:
Issues:
"""