Difference between revisions of "Skel.py"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) |
PeterHarding (talk | contribs) |
||
(11 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
=Intro= | |||
Some skeleton Python scripts... | |||
Also check out - [[Python Skeleton Scripts]] - and - [[Implementing a Standard Library]] | |||
=Using Enums= | |||
<pre> | |||
#!/usr/bin/env python27 | |||
# | |||
# @(#) [1.0.01] skel.py 2012-02-04 | |||
# | |||
# | |||
# NAME | |||
# skel.py - Skeleton python script | |||
# | |||
# SYNOPSIS | |||
# skel.py [-dv] | |||
# | |||
# PARAMETERS | |||
# See __doc__ below | |||
# | |||
# DESCRIPTION | |||
# ... | |||
# | |||
# RETURNS | |||
# 0 for successful completion, 1 for any error | |||
# | |||
# FILES | |||
# ... | |||
# | |||
#-------------------------------------------------------------------------- | |||
""" | |||
Usage: | |||
$ skel.py [-dv] -apn 10y | |||
$ skel.py [-dv] -o # ... | |||
Parameters: | |||
-a ... | |||
-p ... | |||
-n 10 No of ... | |||
-o ... | |||
-d Debug | |||
-v Verbose | |||
""" | |||
#-------------------------------------------------------------------------- | |||
import os | |||
import re | |||
import sys | |||
import time | |||
import getopt | |||
import random | |||
import pickle | |||
import pprint | |||
import logging | |||
import urllib | |||
from datetime import datetime | |||
#-------------------------------------------------------------------------- | |||
__version__ = "1.2.0" | |||
__id__ = "@(#) skel.py [%s] 2011-11-01" | |||
verbose_flg = False | |||
debug_level = 0 | |||
table_name = "people" | |||
DSERVER_PORT = 9570 | |||
MAX_REQUESTS = 200 | |||
lf = None | |||
log = None | |||
targets = { | |||
'dev' : { 'host': 'ldap-dev', 'port' : 2389, 'type' : 'OracleOID' }, | |||
'prod' : { 'host': 'openldap', 'port' : 2389, 'type' : 'OpenLDAP' }, | |||
} | |||
LOG_DIR = "/tmp" | |||
home_dir = None | |||
p_crlf = re.compile(r'[\r\n]*') | |||
pp = pprint.PrettyPrinter(indent=3) | |||
#========================================================================== | |||
class Enum(set): | |||
pass | |||
#-------------------------------------------------------------------- | |||
def __getattr__(self, name): | |||
if name in self: | |||
return name | |||
raise AttributeError | |||
#========================================================================== | |||
class Data: | |||
TotalCount = 0 | |||
#-------------------------------------------------------------------- | |||
@classmethod | |||
def count_row(cls): | |||
cls.TotalCount += 1 | |||
#-------------------------------------------------------------------- | |||
def __init__(self, row): | |||
Data.count_row() | |||
cols = row.split(',') | |||
self.One = cols[0] | |||
self.Two = cols[1] | |||
#========================================================================== | |||
def INFO(msg): | |||
if log: log.info(' ' + msg) | |||
if verbose_flg: print "[skel] %s" % msg | |||
#-------------------------------------------------------------------------- | |||
def ERROR(msg): | |||
if log: log.error(msg) | |||
sys.stderr.write('[skel] %s\n' % msg) | |||
#-------------------------------------------------------------------------- | |||
def WARNING(msg): | |||
if log: log.warning('*****' + msg + '*****') | |||
if verbose_flg: print "[skel] %s" % msg | |||
#-------------------------------------------------------------------------- | |||
def init_logging(fname=None): | |||
global lf | |||
global log | |||
pid = os.getpid() | |||
if debug_level > 0: print "[skel] My PID is %d" % pid | |||
if not fname: | |||
fname = '%s/%s.log' % (LOG_DIR, 'skel') | |||
log = logging.getLogger('skel') | |||
hdlr = logging.FileHandler(fname) | |||
fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') | |||
hdlr.setFormatter(fmtr) | |||
log.addHandler(hdlr) | |||
log.setLevel(logging.INFO) | |||
INFO("===== Started processing %s" % ('=' * 50)) | |||
cnt = 0 | |||
#========================================================================= | |||
def the_time(): | |||
t = float(ref_time(False)) * 0.001 | |||
return t | |||
#------------------------------------------------------------------------------ | |||
t_reference = None | |||
def ref_time(flg): | |||
global t_reference | |||
t_now = datetime.now() | |||
if (flg): | |||
t_reference = t_now | |||
t = 0 | |||
else: | |||
t_delta = t_now - t_reference | |||
t = ((t_delta.seconds * 1000000) + t_delta.microseconds)/1000.0 | |||
return t | |||
#========================================================================= | |||
# And here is the real work... | |||
def csv_reader(fname): | |||
csv_data = [] | |||
f_in = open(fname, "rb") | |||
reader = csv.reader(f_in) | |||
cnt = 0 | |||
total = 0 | |||
for row in reader: | |||
cnt += 1 | |||
if cnt < 2: continue # Skip headings | |||
data = Data(row) | |||
if data.TotalCount < 1000: continue | |||
total += data.TotalCount | |||
# print data | |||
csv_data.append(data) | |||
f_in.close() # Explicitly close the file *NOW* | |||
no_lines = len(csv_data) | |||
print "Read %d data items..." % no_lines | |||
print " -> Total data %d" % total | |||
return csv_data | |||
#------------------------------------------------------------------------------ | |||
def do_work(fname): | |||
INFO("[do_work]") | |||
fname_in = "%s.log" % fname | |||
fname_out = "%s.dat" % fname | |||
try: | |||
f_in = open(fname_in, 'r') | |||
except IOError, msg: | |||
sys.stderr.write(fname_in + ': cannot open: ' + `msg` + '\n') | |||
sys.exit(1) | |||
try: | |||
f_out = open(fname_out, 'a+') | |||
except IOError, msg: | |||
sys.stderr.write(fname_out + ': cannot open: ' + `msg` + '\n') | |||
sys.exit(1) | |||
while True: | |||
line = f_in.readline() | |||
if not line: break | |||
# Truncate EoL markers from end of line | |||
line = p_crlf.sub('', line) # or 'line = line[:-1]' | |||
data = Data(line) | |||
f_out.write("[%s]\n" % (line, )) | |||
f_in.close() | |||
f_out.close() | |||
#========================================================================= | |||
def usage(): | |||
print __doc__ | |||
#------------------------------------------------------------------------- | |||
def main(argv): | |||
global verbose_flg | |||
global debug_level | |||
global target | |||
global home_dir | |||
try: | |||
home_dir = os.environ['HOME'] | |||
except: | |||
print "Set HOME environment variable and re-run" | |||
sys.exit(0) | |||
Modes = Enum(["Info", "Parse", ]) | |||
mode = Modes.Info | |||
filename = "test" | |||
try: | |||
opts, args = getopt.getopt(argv, "dD:f:hvV?", | |||
("debug", "debug-level=", "file=", "help", "verbose", "version")) | |||
except getopt.error, msg: | |||
usage() | |||
return 1 | |||
for opt, arg in opts: | |||
if opt in ("-?", "-h", "--help"): | |||
usage() | |||
return 0 | |||
elif opt in ('-d', '--debug'): | |||
debug_level += 1 | |||
elif opt in ('-D', '--debug-level'): | |||
debug_level = int(arg) | |||
elif opt in ('-f', '--file'): | |||
mode = Modes.Parse | |||
filename = arg | |||
elif opt in ('-v', '--verbose'): | |||
verbose_flg = True | |||
elif opt in ('-v', '--version'): | |||
print "[skel] Version: %s" % __version__ | |||
return 1 | |||
else: | |||
usage() | |||
return 1 | |||
sys.stderr.write("[skel] Working directory is %s\n" % os.getcwd()) | |||
if (debug_level > 0): sys.stderr.write("[skel] Debugging level set to %d\n" % debug_level) | |||
sys.stderr.flush() | |||
init_logging() | |||
if mode == Modes.Info: | |||
INFO('Info') | |||
elif mode == Modes.Parse: | |||
INFO('Parsing') | |||
do_work(filename) | |||
else: | |||
INFO('Nothing to do') | |||
return 0 | |||
#-------------------------------------------------------------------------- | |||
if __name__ == '__main__' or __name__ == sys.argv[0]: | |||
try: | |||
sys.exit(main(sys.argv[1:])) | |||
except KeyboardInterrupt, e: | |||
print "[skel] Interrupted!" | |||
#-------------------------------------------------------------------------- | |||
""" | |||
Revision History: | |||
Date Who Description | |||
-------- --- ------------------------------------------------------------ | |||
20031014 plh Initial implementation | |||
20111101 plh Add in Enums for modal behaviour | |||
Problems to fix: | |||
To Do: | |||
Issues: | |||
""" | |||
</pre> | |||
=Version Incorporating Python Logging= | =Version Incorporating Python Logging= | ||
Line 10: | Line 367: | ||
# Purpose: Skeleton Python script | # Purpose: Skeleton Python script | ||
# | # | ||
# | # | ||
#------------------------------------------------------------------------------- | #------------------------------------------------------------------------------- | ||
Line 193: | Line 549: | ||
<pre> | <pre> | ||
#!/usr/bin/env python | #!/usr/bin/env python | ||
# | # | ||
# @(#) [1.0.01] skel.py 2006-03-07 | # @(#) [1.0.01] skel.py 2006-03-07 | ||
Line 343: | Line 685: | ||
</pre> | </pre> | ||
= | =A Really Old Version= | ||
<pre> | <pre> | ||
#!/usr/bin/env python | #!/usr/bin/env python | ||
# | # | ||
# | # | ||
# | # Purpose: Skeleton python script | ||
# | # | ||
# | # | ||
Line 384: | Line 717: | ||
import getopt | import getopt | ||
import string | import string | ||
import | import random | ||
#--------------------------------------------------------------------- | #--------------------------------------------------------------------- | ||
Line 458: | Line 791: | ||
""" | """ | ||
</pre> | </pre> | ||
[[Category:Python]] | |||
[[Category:Examples]] |
Latest revision as of 09:18, 13 March 2013
Intro
Some skeleton Python scripts...
Also check out - Python Skeleton Scripts - and - Implementing a Standard Library
Using Enums
#!/usr/bin/env python27 # # @(#) [1.0.01] skel.py 2012-02-04 # # # NAME # skel.py - Skeleton python script # # SYNOPSIS # skel.py [-dv] # # PARAMETERS # See __doc__ below # # DESCRIPTION # ... # # RETURNS # 0 for successful completion, 1 for any error # # FILES # ... # #-------------------------------------------------------------------------- """ Usage: $ skel.py [-dv] -apn 10y $ skel.py [-dv] -o # ... Parameters: -a ... -p ... -n 10 No of ... -o ... -d Debug -v Verbose """ #-------------------------------------------------------------------------- import os import re import sys import time import getopt import random import pickle import pprint import logging import urllib from datetime import datetime #-------------------------------------------------------------------------- __version__ = "1.2.0" __id__ = "@(#) skel.py [%s] 2011-11-01" verbose_flg = False debug_level = 0 table_name = "people" DSERVER_PORT = 9570 MAX_REQUESTS = 200 lf = None log = None targets = { 'dev' : { 'host': 'ldap-dev', 'port' : 2389, 'type' : 'OracleOID' }, 'prod' : { 'host': 'openldap', 'port' : 2389, 'type' : 'OpenLDAP' }, } LOG_DIR = "/tmp" home_dir = None p_crlf = re.compile(r'[\r\n]*') pp = pprint.PrettyPrinter(indent=3) #========================================================================== class Enum(set): pass #-------------------------------------------------------------------- def __getattr__(self, name): if name in self: return name raise AttributeError #========================================================================== class Data: TotalCount = 0 #-------------------------------------------------------------------- @classmethod def count_row(cls): cls.TotalCount += 1 #-------------------------------------------------------------------- def __init__(self, row): Data.count_row() cols = row.split(',') self.One = cols[0] self.Two = cols[1] #========================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[skel] %s" % msg #-------------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[skel] %s\n' % msg) #-------------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[skel] %s" % msg #-------------------------------------------------------------------------- def init_logging(fname=None): global lf global log pid = os.getpid() if debug_level > 0: print "[skel] My PID is %d" % pid if not fname: fname = '%s/%s.log' % (LOG_DIR, 'skel') log = logging.getLogger('skel') hdlr = logging.FileHandler(fname) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("===== Started processing %s" % ('=' * 50)) cnt = 0 #========================================================================= def the_time(): t = float(ref_time(False)) * 0.001 return t #------------------------------------------------------------------------------ t_reference = None def ref_time(flg): global t_reference t_now = datetime.now() if (flg): t_reference = t_now t = 0 else: t_delta = t_now - t_reference t = ((t_delta.seconds * 1000000) + t_delta.microseconds)/1000.0 return t #========================================================================= # And here is the real work... def csv_reader(fname): csv_data = [] f_in = open(fname, "rb") reader = csv.reader(f_in) cnt = 0 total = 0 for row in reader: cnt += 1 if cnt < 2: continue # Skip headings data = Data(row) if data.TotalCount < 1000: continue total += data.TotalCount # print data csv_data.append(data) f_in.close() # Explicitly close the file *NOW* no_lines = len(csv_data) print "Read %d data items..." % no_lines print " -> Total data %d" % total return csv_data #------------------------------------------------------------------------------ def do_work(fname): INFO("[do_work]") fname_in = "%s.log" % fname fname_out = "%s.dat" % fname try: f_in = open(fname_in, 'r') except IOError, msg: sys.stderr.write(fname_in + ': cannot open: ' + `msg` + '\n') sys.exit(1) try: f_out = open(fname_out, 'a+') except IOError, msg: sys.stderr.write(fname_out + ': cannot open: ' + `msg` + '\n') sys.exit(1) while True: line = f_in.readline() if not line: break # Truncate EoL markers from end of line line = p_crlf.sub('', line) # or 'line = line[:-1]' data = Data(line) f_out.write("[%s]\n" % (line, )) f_in.close() f_out.close() #========================================================================= def usage(): print __doc__ #------------------------------------------------------------------------- def main(argv): global verbose_flg global debug_level global target global home_dir try: home_dir = os.environ['HOME'] except: print "Set HOME environment variable and re-run" sys.exit(0) Modes = Enum(["Info", "Parse", ]) mode = Modes.Info filename = "test" try: opts, args = getopt.getopt(argv, "dD:f:hvV?", ("debug", "debug-level=", "file=", "help", "verbose", "version")) except getopt.error, msg: usage() return 1 for opt, arg in opts: if opt in ("-?", "-h", "--help"): usage() return 0 elif opt in ('-d', '--debug'): debug_level += 1 elif opt in ('-D', '--debug-level'): debug_level = int(arg) elif opt in ('-f', '--file'): mode = Modes.Parse filename = arg elif opt in ('-v', '--verbose'): verbose_flg = True elif opt in ('-v', '--version'): print "[skel] Version: %s" % __version__ return 1 else: usage() return 1 sys.stderr.write("[skel] Working directory is %s\n" % os.getcwd()) if (debug_level > 0): sys.stderr.write("[skel] Debugging level set to %d\n" % debug_level) sys.stderr.flush() init_logging() if mode == Modes.Info: INFO('Info') elif mode == Modes.Parse: INFO('Parsing') do_work(filename) else: INFO('Nothing to do') return 0 #-------------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main(sys.argv[1:])) except KeyboardInterrupt, e: print "[skel] Interrupted!" #-------------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- ------------------------------------------------------------ 20031014 plh Initial implementation 20111101 plh Add in Enums for modal behaviour Problems to fix: To Do: Issues: """
Version Incorporating Python Logging
#!/usr/bin/env python # # Purpose: Skeleton Python script # # #------------------------------------------------------------------------------- """ Skeleton Python script ... """ #------------------------------------------------------------------------------- import os import re import sys import getopt import logging #------------------------------------------------------------------------------- from datetime import datetime #------------------------------------------------------------------------------- __version__ = "1.0.0" __id__ = "@(#) skel.py [%s] 2008-05-03" debug_level = 0 verbose_flg = False LOGFILE = "log/skel.log" PIDFILE = "DATA/skel.pid" tables = [] log = None pid = None #=============================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[skel] %s" % msg #------------------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[skel] %s\n' % msg) #------------------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[skel] %s" % msg #=============================================================================== def init(): global log global pid pid = os.getpid() log = logging.getLogger('skel') hdlr = logging.FileHandler(LOGFILE) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("Started processing with PID %d" % pid) if (not verbose_flg): INFO("PID is %d" % pid) #=============================================================================== def the_time(): t = float(ref_time(False)) * 0.001 return t #------------------------------------------------------------------------------ t_reference = None def ref_time(flg): global t_reference t_now = datetime.now() if (flg): t_reference = t_now t = 0 else: t_delta = t_now - t_reference t = ((t_delta.seconds * 1000000) + t_delta.microseconds)/1000.0 return t #=============================================================================== def main(): global verbose_flg global debug_level try: opts, args = getopt.getopt(sys.argv[1:], "dD:vVw?") except getopt.error, msg: print __doc__ return 1 try: terminal_type = os.environ["TERM"] except KeyError, e: print "Set TERM environment variable and rerun!" return 1 wrk_path = os.getcwd() wrk_dir = os.path.basename(wrk_path) data_dir = wrk_dir + '/DATA/' pid_path = data_dir + PIDFILE # os.chdir(data_dir) for o, a in opts: if o == '-d': debug_level += 1 elif o == '-D': debug_level = int(a) elif o == '-v': verbose_flg = True elif o == '-V': print "[skel] Version: %s" % __version__ return 1 elif o == '-?': print __doc__ return 1 print "[skel] Working directory is %s" % os.getcwd() if (debug_level > 0): print "Debugging level set to %d" % debug_level if args: for arg in args: print arg init() return 0 #------------------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[skel] Interrupted!" #------------------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- ------------------------------------------------------------ 20031014 plh Initial implementation Problems to fix: To Do: Issues: """
Version Incorporating Dates
#!/usr/bin/env python # # @(#) [1.0.01] skel.py 2006-03-07 # # # NAME # skel.py - Sample script # # SYNOPSIS # skel.py [-dv] [-D <DATE>|-S <STARTDATE>|-E <ENDDATE>] # # PARAMETERS # -D <DATE> Date (in ISO format # -d Debug # -v Verbose # # DESCRIPTION # ... # # RETURNS # 0 for successful completion, 1 for any error # # FILES # ... # # $Id:$ # #--------------------------------------------------------------------- """ ... """ #--------------------------------------------------------------------- import sys import getopt #--------------------------------------------------------------------- debug_level = 0 verbose_flg = True #--------------------------------------------------------------------- def is_weekday(dow): if ((dow >= 0) and (dow <=4)): return 1 else: return 0 #--------------------------------------------------------------------- def to_datetime(date): date = int(date) mday = date % 100 month = ( date % 10000) / 100 year = date / 10000 return datetime.date(day=mday, month=month, year=year) #--------------------------------------------------------------------- def main(): global debug_flg global verbose_flg if None: try: userId = os.environ['USER'] except: print "***** USER environment variable not set" # sys.exit(0) try: opts, args = getopt.getopt(sys.argv[1:], "dD:E:S:vVY?") except getopt.error, msg: print __doc__ return 1 date = None start_date = None end_date = None for o, a in opts: if o == '-?': print __doc__ return 1 elif o == '-d': debug_level += 1 elif o == '-D': date = to_datetime(a) elif o == '-E': end_date = to_datetime(a) elif o == '-S': start_date = to_datetime(a) elif o == '-Y': date = datetime.date.today() - datetime.timedelta(days=1) elif o == '-v': verbose_flg = True elif o == '-V': print "Version: %s" % __version__ return 1 else: return 1 return 1 #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[skel] Interrupted!" #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20030920 plh Initial implementation 20031002 plh Cleaned up args in main(). Added '-h', '-?', '-V' Problems to fix: To Do: Issues: """
A Really Old Version
#!/usr/bin/env python # # # Purpose: Skeleton python script # # # Copyright (C) Peter Harding, 2003 # All rights reserved # # $Id: skel.py,v 1.1.1.1 2003/10/22 11:57:35 zyx Exp $ # #--------------------------------------------------------------------- """ Skeleton Python Framework Usage: skel.py -g <no> -v The '-g <no>' option is used to specify the number of groups. The '-v' turns on verbose mode. """ #--------------------------------------------------------------------- import os import sys import getopt import string import random #--------------------------------------------------------------------- __version__ = "1.0.0" wrk = None no = None #--------------------------------------------------------------------- def process(): pass #--------------------------------------------------------------------- def main(): global debugFlg global no global verboseFlg try: opts, args = getopt.getopt(sys.argv[1:], "dg:vV?") except getopt.error, msg: print __doc__, return 1 for o, a in opts: if o == '-d': debugFlg = 1 elif o == '-g': no = int(a) elif o == '-v': verboseFlg = 1 elif o == '-V': print "Version: %s" % __version__ return 0 elif o == '-?': print __doc__() return 0 if args: for arg in args: process arg global wrk wrk = os.getcwd() process() #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: sys.exit(main()) #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20030920 plh Initial implementation 20031002 plh Cleaned up args in main(). Added '-h', '-?', '-V' Problems to fix: To Do: Issues: """