Difference between revisions of "Data Server Variants"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) (New page: =1.1.3= <pre> #!/usr/bin/env python # # Purpose: Threaded data server implementation # # $Id:$ # #--------------------------------------------------------------------- """ Threaded s...) |
PeterHarding (talk | contribs) |
||
Line 934: | Line 934: | ||
= | =Other= | ||
<pre> | <pre> | ||
Line 1,879: | Line 1,879: | ||
</pre> | </pre> | ||
= | =Old Versions= | ||
==1.1.5== | ==1.1.5== | ||
Line 4,367: | Line 4,367: | ||
dvstst.py -> tst.py | dvstst.py -> tst.py | ||
dcl.py -> client.py | dcl.py -> client.py | ||
</pre> | </pre> |
Latest revision as of 12:53, 2 April 2009
1.1.3
#!/usr/bin/env python # # Purpose: Threaded data server implementation # # $Id:$ # #--------------------------------------------------------------------- """ Threaded server model Server side: open a socket on a port, listen for a message from a client, and accept a request and service it. The server spawns a thread to handle each client connection. Threads share global memory space with main thread; This is more portable than fork -- not yet on Windows; This version has been extended to use the standard Python logging module. Add the delimiter to the INI file to allow use of alternate delimiters in transmitted data - so data with embedded commas can be used. """ #--------------------------------------------------------------------- import os import csv import sys import getopt import thread import time import signal import logging #--------------------------------------------------------------------- from socket import * # get socket constructor and constants from datetime import datetime #--------------------------------------------------------------------- __version__ = "1.1.3" __id__ = "@(#) dserver.py [%s] 30/04/2008" check_flg = False daemon_flg = False silent_flg = False terminate_flg = False verbose_flg = False wait_flg = False debug_level = 0 HOST = '' # Host server - '' means localhost PORT = 9578 # Listen on a non-reserved port number sockobj = None dserver_dir = None data_dir = None pid_path = None CONFIGFILE = "dserver.ini" LOGFILE = "dserver.log" PIDFILE = "dserver.pid" tables = [] INVALID = "INVALID" log = None #===================================================================== class Group: Name = None Idx = None Data = None def __init__(self, name): self.Name = name self.Idx = 0 self.Data = [] def __str__(self): s = "Grp %s Len %d" % (self.Name, len(self.Data)) return s def append(self, s): self.Data.append(s) def set(self): if len(self.Data) > 0: self.Idx = 0 else: self.Idx = -1 #--------------------------------------------------------------------- class Table: Count = 0 Valid = False Name = None Type = None Idx = None Data = None def __init__(self, name, type, delimiter=','): self.Name = name self.Type = type self.Delimiter = delimiter self.File = name + ".dat" self.Used = name + ".used" self.Stored = name + ".stored" if self.Type == "CSV": rc = self.read_csv() elif self.Type == "Sequence": rc = self.read_sequence() elif self.Type == "Indexed": rc = self.read_indexed() elif self.Type == "Keyed": rc = self.read_keyed() if rc > 0: self.Valid = True try: self.ufh = open(self.Used, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) try: self.sfh = open(self.Stored, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) #------------------------------------------------------------------ def __str__(self): s = "Table: %-10s Type: %-10s" % (self.Name, self.Type) if self.Valid: s += " * " if self.Type == "CSV": s += " %d rows" % len(self.Data) elif self.Type == "Sequence": s += " Starting value %d" % self.Data elif self.Type == "Indexed": s += " %d rows" % len(self.Data) elif self.Type == "Keyed": s += " %d groups" % len(self.Data) else: s += " " return s #------------------------------------------------------------------ def read_csv(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = [] while True: line = f.readline() if not line: break line = line.strip() self.Data.append(line) f.close() self.Idx = 0 if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_sequence(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) while True: line = f.readline() if not line: break line = line.strip() try: no = int(line) except: no = 0 self.Data = no f.close() return 1 #------------------------------------------------------------------ def read_keyed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) groupName = None group = None self.Data = {} while True: line = f.readline() if not line: break line = line.strip() if (line.find("[") != -1): group_name = line.replace('[','').replace(']','') group = Group(group_name) self.Data[group_name] = group continue elif (line.find("#") != -1): continue elif (len(line) == 0): continue else: group.append(line) f.close() if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_indexed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = {} while True: line = f.readline() if not line: break line = line.strip() (no, data) = line.split(':') self.Data[no] = data f.close() if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def flush(self): if not self.Valid: return ts = datetime.now().strftime('%Y%m%d%H%M%S') self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts) if self.Type == "CSV": self.flush_csv() elif self.Type == "Sequence": self.flush_sequence() elif self.Type == "Indexed": self.flush_indexed() elif self.Type == "Keyed": self.flush_keyed() #------------------------------------------------------------------ def flush_csv(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 i = self.Idx while i < len(self.Data): f.write("%s\n" % self.Data[i]) i += 1 f.close() #------------------------------------------------------------------ def flush_sequence(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 f.write("%d\n" % self.Data) f.close() #------------------------------------------------------------------ def flush_keyed(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 group_keys = self.Data.keys() group_keys.sort() for key in group_keys: f.write("[%s]\n" % key) group = self.Data[key] i = group.Idx while i < len(group.Data): f.write("%s\n" % group.Data[i]) i += 1 f.write("\n") f.close() #------------------------------------------------------------------ def flush_indexed(self): pass #===================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[dserver] %s" % msg #--------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[dserver] %s\n' % msg) #--------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[dserver] %s" % msg #===================================================================== def read_config(): global PORT config_file = data_dir + CONFIGFILE try: f = open(config_file, 'r') except IOError, e: ERROR('Open failed: ' + str(e)) sys.exit(1) config_flg = False definition_flg = False while True: line = f.readline() if not line: break line = line[:-1] line = line.replace('\r','') line = line.strip() if (line.find("#") != -1): continue if (line.find("[Config]") != -1): config_flg = True elif (line.find("Port=") != -1): definition = line.split("=") PORT = definition[1] if (line.find("[Data]") != -1): definition_flg = True elif (line.find("Description=") != -1): definition = line.split("=") (name, type, delimiter) = definition[1].split(":") t = Table(name, type, delimiter) INFO(str(t)) tables.append(t) f.close() #--------------------------------------------------------------------- def get_table_index(name): for i in range(len(tables)): if (tables[i].Name == name): return i return -1 #--------------------------------------------------------------------- def process(str): msg = str.split("|") l = len(msg) if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg)) ts = datetime.now().strftime('%Y%m%d%H%M%S') reply = "None" if (msg[0] == "REG"): name = msg[1].replace('\n','').replace('\r','') idx = get_table_index(name) if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx)) reply = "%d" % idx elif (msg[0] == "REGK"): if (len(msg) != 3): ERROR("[dserver::process] REGK -> Bad Message", msg) elif (msg[0] == "REGI"): if (len(msg) != 2): ERROR("[dserver::process] REGI -> Bad Message", msg) elif (msg[0] == "GETN"): if (len(msg) != 2): ERROR("[dserver::process] GETN -> Bad Message", msg) hdl = int(msg[1]) try: t = tables[hdl] except: t = None if t != None: if t.Type == 'CSV': if (t.Idx < len(t.Data)): reply = t.Data[t.Idx] t.Idx += 1 else: reply = "*Exhausted*" elif t.Type == "Sequence": reply = "%d" % t.Data t.Data += 1 else: reply = "UNKNOWN" t.ufh.write("%s - %s\n" % (ts, reply)) if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply) elif (msg[0] == "GETK"): if (len(msg) != 3): ERROR("[dserver::process] GETK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] try: t = tables[hdl] except: t = None if t != None: try: g = t.Data[grp] except: g = None if g != None: if (g.Idx < len(g.Data)): reply = g.Data[g.Idx] g.Idx += 1 else: reply = "*Exhausted*" t.ufh.write("%s - %s::%s\n" % (ts, grp, reply)) if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply)) elif (msg[0] == "GETI"): if (len(msg) != 3): ERROR("[dserver::process] GETI -> Bad Message", msg) hdl = int(msg[1]) idx = msg[2] try: t = tables[hdl] except: t = None if t != None: try: reply = t.Data[idx] except: reply = "UNDEFINED" t.ufh.write("%s - %s::%s\n" % (ts, idx, reply)) if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply)) elif (msg[0] == "STOC"): if (len(msg) != 3): ERROR("[dserver::process] STOC -> Bad Message", msg) hdl = int(msg[1]) data = msg[2] reply = "0" try: t = tables[hdl] except: t = None if t != None: t.Data.append(data) t.sfh.write("%s - %s\n" % (ts, data)) t.sfh.flush() if debug_level > 1: INFO("STOC %s" % data) reply = "1" if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply)) elif (msg[0] == "STOK"): if (len(msg) != 4): ERROR("[dserver::process] STOK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] data = msg[3] reply = "0" try: t = tables[hdl] except: t = None if t != None: if t.Data.has_key(grp): g = t.Data[grp] else: g = Group(grp) t.Data[grp] = g if g != None: g.Data.append(data) if debug_level > 1: INFO("STOK %s %s" % (grp, data)) t.sfh.write("%s - %s::%s\n" % (ts, grp, data)) reply = "1" if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply)) return reply #--------------------------------------------------------------------- def sig_term(signum, frame): "SIGTERM handler" shutdown() #--------------------------------------------------------------------- def shutdown(): INFO("Server shutdown at %s" % datetime.now()) for i in range(len(tables)): tables[i].flush() try: os.unlink(pid_path) except IOError, e: ERROR('Unlink failed: ' + str(e)) sys.exit(1) sys.exit(0) #--------------------------------------------------------------------- def check_running(): try: pfp = open(pid_path, 'r') except IOError, (errno, strerror): pfp = None # ERROR("I/O error(%s): %s" % (errno, strerror)) except: ERROR("Unexpected error:", sys.exc_info()[0]) raise if pfp: line = pfp.readline() line = line.strip() dserver_pid = int(line) noProcess = 0 try: os.kill(dserver_pid, 0) except OSError, e: if e.errno == 3: noProcess = 1 else: ERROR("kill() failed:" + str(e)) sys.exit(0) if noProcess: INFO("[dserver] Stale dserver pid file!") pfp.close() os.unlink(pid_path) return None else: pfp.close() return dserver_pid return dserver_pid else: return None #--------------------------------------------------------------------- def create_pidfile(): pid = os.getpid() try: pfp = open(pid_path, 'w') except IOError, e: ERROR("Open failed - " + str(e)) sys.exit(0) pfp.write("%d" % pid) pfp.close() INFO("Running server with PID -> %d" % pid) return pid #--------------------------------------------------------------------- def become_daemon(): pid = os.fork() if pid == 0: # In child pid = create_pidfile() time.sleep(1) elif pid == -1: # Should not happen! ERROR("fork() failed!") time.sleep(1) sys.exit(0) else: # In Parent time.sleep(1) sys.exit(0) time.sleep(2) os.setsid() return pid #--------------------------------------------------------------------- def init(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) if daemon_flg: pid = become_daemon() else: pid = create_pidfile() global log log = logging.getLogger('dserver') hdlr = logging.FileHandler(LOGFILE) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("Started processing") read_config() if (not silent_flg): INFO("Server PID is %d" % pid) #--------------------------------------------------------------------- def terminate(): dserver_pid = check_running() if dserver_pid: if (not silent_flg): INFO("Terminating server with pid, %d" % dserver_pid) os.kill(dserver_pid, signal.SIGTERM) if (wait_flg): while True: try: kill(dserver_pid, 0) except OSError, e: if e.errno == 3: break else: ERROR("kill() failed:" + str(e)) sys.exit(0) time.sleep(1) return 0 #--------------------------------------------------------------------- def check(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) else: print "[dserver] Server not running" #==== Socket Server ================================================== def init_connection(): global sockobj sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object sockobj.bind((HOST, PORT)) # bind it to server port number sockobj.listen(10) # allow upto 10 pending connects #--------------------------------------------------------------------- def handle_client(connection): # in spawned thread: reply while True: # read, write a client socket try: request = connection.recv(1024) except: break if debug_level > 0: INFO('[dserver] Request -> "%s"' % request) if not request: break reply = process(request) if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30]) connection.send(reply) connection.close() #--------------------------------------------------------------------- def dispatcher(): while True: # Wait for next connection, connection, address = sockobj.accept() INFO('Host (%s) - Connected at %s' % (address[0], datetime.now())) thread.start_new(handle_client, (connection,)) #===================================================================== def main(): global check_flg global daemon_flg global terminate_flg global verbose_flg global wait_flg global debug_level global dserver_dir global data_dir global pid_path try: opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?") except getopt.error, msg: print __doc__ return 1 try: dserver_dir = os.environ["DSERVER_DIR"] except KeyError, e: print "Set DSERVER_DIR environment variable and rerun!" return 1 wrk_path = os.getcwd() wrk_dir = os.path.basename(wrk_path) # data_dir = dserver_dir + '/DATA/' data_dir = wrk_path + '/DATA/' pid_path = data_dir + PIDFILE os.chdir(data_dir) for o, a in opts: if o == '-d': debug_level += 1 elif o == '-c': check_flg = True elif o == '-D': daemon_flg = True elif o == '-s': tsilent_flg = True elif o == '-T': terminate_flg = True elif o == '-v': verbose_flg = True elif o == '-V': print "[dserver] Version: %s" % __version__ return 1 elif o == '-w': wait_flg = True elif o == '-?': print __doc__ return 1 print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd()) if check_flg: check() return 0 if terminate_flg: terminate() return 0 if (debug_level > 0): print "Debugging level set to %d" % debug_level if args: for arg in args: print arg signal.signal(signal.SIGTERM, sig_term) init() init_connection() dispatcher() return 0 #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[dserver] Interrupted!" shutdown() #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation Problems to fix: To Do: Issues: """
Other
#!/usr/bin/env python # # Purpose: Threaded data server implementation # # $Id:$ # #--------------------------------------------------------------------- """ Threaded server model Server side: open a socket on a port, listen for a message from a client, and accept a request and service it. The server spawns a thread to handle each client connection. Threads share global memory space with main thread; This is more portable than fork -- not yet on Windows; This version has been extended to use the standard Python logging module. Add the delimiter to the INI file to allow use of alternate delimiters in transmitted data - so data with embedded commas can be used. """ #--------------------------------------------------------------------- import os import re import csv import sys import getopt import thread import time import signal import logging #--------------------------------------------------------------------- from socket import * # get socket constructor and constants from datetime import datetime #--------------------------------------------------------------------- __version__ = "1.1.3" __id__ = "@(#) dserver.py [%s] 30/04/2008" check_flg = False daemon_flg = False silent_flg = False terminate_flg = False verbose_flg = False wait_flg = False debug_level = 0 HOST = '' # Host server - '' means localhost PORT = 9575 # Listen on a non-reserved port number sockobj = None dserver_dir = None data_dir = None pid_path = None CONFIGFILE = "dserver.ini" LOGFILE = "dserver.log" PIDFILE = "dserver.pid" tables = [] INVALID = "INVALID" log = None #===================================================================== class Group: Name = None Idx = None Data = None def __init__(self, name): self.Name = name self.Idx = 0 self.Data = [] def __str__(self): s = "Grp %s Len %d" % (self.Name, len(self.Data)) return s def append(self, s): self.Data.append(s) def set(self): if len(self.Data) > 0: self.Idx = 0 else: self.Idx = -1 #--------------------------------------------------------------------- class Table: Count = 0 Valid = False Name = None Type = None Idx = None Data = None def __init__(self, name, type, delimiter=','): self.Name = name self.Type = type self.Delimiter = delimiter self.File = name + ".dat" self.Used = name + ".used" self.Stored = name + ".stored" if self.Type == "CSV": rc = self.read_csv() elif self.Type == "Sequence": rc = self.read_sequence() elif self.Type == "Indexed": rc = self.read_indexed() elif self.Type == "Keyed": rc = self.read_keyed() if rc > 0: self.Valid = True try: self.ufh = open(self.Used, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) try: self.sfh = open(self.Stored, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) #------------------------------------------------------------------ def __str__(self): s = "Table: %-10s Type: %-10s" % (self.Name, self.Type) if self.Valid: s += " * " if self.Type == "CSV": s += " %d rows" % len(self.Data) elif self.Type == "Sequence": s += " Starting value %d" % self.Data elif self.Type == "Indexed": s += " %d rows" % len(self.Data) elif self.Type == "Keyed": s += " %d groups" % len(self.Data) else: s += " " return s #------------------------------------------------------------------ def read_csv(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = [] while True: line = f.readline() if not line: break line = line.strip() self.Data.append(line) f.close() self.Idx = 0 if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_sequence(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) while True: line = f.readline() if not line: break line = line.strip() try: no = int(line) except: no = 0 self.Data = no f.close() return 1 #------------------------------------------------------------------ def read_keyed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) groupName = None group = None self.Data = {} while True: line = f.readline() if not line: break line = line.strip() if (line.find("[") != -1): group_name = line.replace('[','').replace(']','') group = Group(group_name) self.Data[group_name] = group continue elif (line.find("#") != -1): continue elif (len(line) == 0): continue else: group.append(line) f.close() if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_indexed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = {} while True: line = f.readline() if not line: break line = line.strip() (no, data) = line.split(':') self.Data[no] = data f.close() if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def flush(self): if not self.Valid: return ts = datetime.now().strftime('%Y%m%d%H%M%S') self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts) print "Flushing %s" % self.Name if self.Type == "CSV": self.flush_csv() elif self.Type == "Sequence": self.flush_sequence() elif self.Type == "Indexed": self.flush_indexed() elif self.Type == "Keyed": self.flush_keyed() #------------------------------------------------------------------ def flush_csv(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 i = self.Idx while i < len(self.Data): f.write("%s\n" % self.Data[i]) i += 1 f.close() #------------------------------------------------------------------ def flush_sequence(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 f.write("%d\n" % self.Data) f.close() #------------------------------------------------------------------ def flush_keyed(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 group_keys = self.Data.keys() group_keys.sort() for key in group_keys: f.write("[%s]\n" % key) group = self.Data[key] i = group.Idx while i < len(group.Data): f.write("%s\n" % group.Data[i]) i += 1 f.write("\n") f.close() #------------------------------------------------------------------ def flush_indexed(self): pass #===================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[dserver] %s" % msg #--------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[dserver] %s\n' % msg) #--------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[dserver] %s" % msg #===================================================================== def read_config(): global PORT config_file = data_dir + CONFIGFILE try: f = open(config_file, 'r') except IOError, e: ERROR('Open failed: ' + str(e)) sys.exit(1) config_flg = False definition_flg = False while True: line = f.readline() if not line: break line = line[:-1] line = line.replace('\r','') line = line.strip() if (line.find("#") != -1): continue if (line.find("[Config]") != -1): config_flg = True elif (line.find("Port=") != -1): definition = line.split("=") PORT = int(definition[1].strip()) if (line.find("[Data]") != -1): definition_flg = True elif (line.find("Description=") != -1): definition = line.split("=") (name, type, delimiter) = definition[1].split(":") t = Table(name, type, delimiter) INFO(str(t)) tables.append(t) f.close() #--------------------------------------------------------------------- def get_table_index(name): for i in range(len(tables)): if (tables[i].Name == name): return i return -1 #--------------------------------------------------------------------- def process(str): msg = str.split("|") l = len(msg) if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg)) ts = datetime.now().strftime('%Y%m%d%H%M%S') reply = "None" if (msg[0] == "REG"): name = msg[1].replace('\n','').replace('\r','') idx = get_table_index(name) if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx)) reply = "%d" % idx elif (msg[0] == "REGK"): if (len(msg) != 3): ERROR("[dserver::process] REGK -> Bad Message", msg) elif (msg[0] == "REGI"): if (len(msg) != 2): ERROR("[dserver::process] REGI -> Bad Message", msg) elif (msg[0] == "GETN"): if (len(msg) != 2): ERROR("[dserver::process] GETN -> Bad Message", msg) hdl = int(msg[1]) try: t = tables[hdl] except: t = None if t != None: if t.Type == 'CSV': if (t.Idx < len(t.Data)): reply = t.Data[t.Idx] t.Idx += 1 else: reply = "*Exhausted*" elif t.Type == "Sequence": reply = "%d" % t.Data t.Data += 1 else: reply = "UNKNOWN" t.ufh.write("%s - %s\n" % (ts, reply)) if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply) elif (msg[0] == "GETK"): if (len(msg) != 3): ERROR("[dserver::process] GETK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] try: t = tables[hdl] except: t = None if t != None: try: g = t.Data[grp] except: g = None if g != None: if (g.Idx < len(g.Data)): reply = g.Data[g.Idx] reply = re.sub(", *", ",", reply) g.Idx += 1 else: reply = "*Exhausted*" t.ufh.write("%s - %s::%s\n" % (ts, grp, reply)) if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply)) elif (msg[0] == "GETI"): if (len(msg) != 3): ERROR("[dserver::process] GETI -> Bad Message", msg) hdl = int(msg[1]) idx = msg[2] try: t = tables[hdl] except: t = None if t != None: try: reply = t.Data[idx] except: reply = "UNDEFINED" t.ufh.write("%s - %s::%s\n" % (ts, idx, reply)) if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply)) elif (msg[0] == "STOC"): if (len(msg) != 3): ERROR("[dserver::process] STOC -> Bad Message", msg) hdl = int(msg[1]) data = msg[2] reply = "0" try: t = tables[hdl] except: t = None if t != None: t.Data.append(data) t.sfh.write("%s - %s\n" % (ts, data)) t.sfh.flush() if debug_level > 1: INFO("STOC %s" % data) reply = "1" if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply)) elif (msg[0] == "STOK"): if (len(msg) != 4): ERROR("[dserver::process] STOK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] data = msg[3] reply = "0" try: t = tables[hdl] except: t = None if t != None: if t.Data.has_key(grp): g = t.Data[grp] else: g = Group(grp) t.Data[grp] = g if g != None: g.Data.append(data) if debug_level > 1: INFO("STOK %s %s" % (grp, data)) t.sfh.write("%s - %s::%s\n" % (ts, grp, data)) reply = "1" if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply)) return reply #--------------------------------------------------------------------- def sig_term(signum, frame): "SIGTERM handler" shutdown() #--------------------------------------------------------------------- def shutdown(): INFO("Server shutdown at %s" % datetime.now()) print "\n" for i in range(len(tables)): tables[i].flush() print "*SHUTDOWN*" try: os.unlink(pid_path) except IOError, e: ERROR('Unlink failed: ' + str(e)) sys.exit(1) sys.exit(0) #--------------------------------------------------------------------- def check_running(): try: pfp = open(pid_path, 'r') except IOError, (errno, strerror): pfp = None # ERROR("I/O error(%s): %s" % (errno, strerror)) except: ERROR("Unexpected error:", sys.exc_info()[0]) raise if pfp: line = pfp.readline() line = line.strip() dserver_pid = int(line) noProcess = 0 try: os.kill(dserver_pid, 0) except OSError, e: if e.errno == 3: noProcess = 1 else: ERROR("kill() failed:" + str(e)) sys.exit(0) if noProcess: INFO("[dserver] Stale dserver pid file!") pfp.close() os.unlink(pid_path) return None else: pfp.close() return dserver_pid return dserver_pid else: return None #--------------------------------------------------------------------- def create_pidfile(): pid = os.getpid() try: pfp = open(pid_path, 'w') except IOError, e: ERROR("Open failed - " + str(e)) sys.exit(0) pfp.write("%d" % pid) pfp.close() INFO("Running server with PID -> %d" % pid) return pid #--------------------------------------------------------------------- def become_daemon(): pid = os.fork() if pid == 0: # In child pid = create_pidfile() time.sleep(1) elif pid == -1: # Should not happen! ERROR("fork() failed!") time.sleep(1) sys.exit(0) else: # In Parent time.sleep(1) sys.exit(0) time.sleep(2) os.setsid() return pid #--------------------------------------------------------------------- def init(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) if daemon_flg: pid = become_daemon() else: pid = create_pidfile() global log log = logging.getLogger('dserver') hdlr = logging.FileHandler(LOGFILE) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("Started processing") read_config() if (not silent_flg): INFO("Server PID is %d" % pid) print "\nData Loaded..." #--------------------------------------------------------------------- def terminate(): dserver_pid = check_running() if dserver_pid: if (not silent_flg): INFO("Terminating server with pid, %d" % dserver_pid) os.kill(dserver_pid, signal.SIGTERM) if (wait_flg): while True: try: kill(dserver_pid, 0) except OSError, e: if e.errno == 3: break else: ERROR("kill() failed:" + str(e)) sys.exit(0) time.sleep(1) return 0 #--------------------------------------------------------------------- def check(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) else: print "[dserver] Server not running" #==== Socket Server ================================================== def init_connection(): global sockobj sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object sockobj.bind((HOST, PORT)) # bind it to server port number sockobj.listen(10) # allow upto 10 pending connects #--------------------------------------------------------------------- def handle_client(connection): # in spawned thread: reply while True: # read, write a client socket try: request = connection.recv(1024) except: break if debug_level > 0: INFO('[dserver] Request -> "%s"' % request) if not request: break reply = process(request) if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30]) connection.send(reply) connection.close() #--------------------------------------------------------------------- def dispatcher(): while True: # Wait for next connection, connection, address = sockobj.accept() INFO('Host (%s) - Connected at %s' % (address[0], datetime.now())) thread.start_new(handle_client, (connection,)) #===================================================================== def main(): global check_flg global daemon_flg global terminate_flg global verbose_flg global wait_flg global debug_level global dserver_dir global data_dir global pid_path try: opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?") except getopt.error, msg: print __doc__ return 1 try: dserver_dir = os.environ["DSERVER_DIR"] except KeyError, e: print "Set DSERVER_DIR environment variable and rerun!" return 1 wrk_path = os.getcwd() wrk_dir = os.path.basename(wrk_path) # data_dir = dserver_dir + '/DATA/' data_dir = wrk_path + '/DATA/' pid_path = data_dir + PIDFILE os.chdir(data_dir) for o, a in opts: if o == '-d': debug_level += 1 elif o == '-c': check_flg = True elif o == '-D': daemon_flg = True elif o == '-s': tsilent_flg = True elif o == '-T': terminate_flg = True elif o == '-v': verbose_flg = True elif o == '-V': print "[dserver] Version: %s" % __version__ return 1 elif o == '-w': wait_flg = True elif o == '-?': print __doc__ return 1 print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd()) if check_flg: check() return 0 if terminate_flg: terminate() return 0 if (debug_level > 0): print "Debugging level set to %d" % debug_level if args: for arg in args: print arg signal.signal(signal.SIGTERM, sig_term) init() init_connection() dispatcher() return 0 #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[dserver] Interrupted!" shutdown() #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation Problems to fix: To Do: Issues: """
Old Versions
1.1.5
#!/usr/bin/env python # # Purpose: Threaded data server implementation # # $Id:$ # #--------------------------------------------------------------------- """ Threaded server model Server side: open a socket on a port, listen for a message from a client, and accept a request and service it. The server spawns a thread to handle each client connection. Threads share global memory space with main thread; This is more portable than fork -- not yet on Windows; This version has been extended to use the standard Python logging module. Add the delimiter to the INI file to allow use of alternate delimiters in transmitted data - so data with embedded commas can be used. """ #--------------------------------------------------------------------- import os import re import csv import sys import getopt import thread import time import signal import logging #--------------------------------------------------------------------- from socket import * # get socket constructor and constants from datetime import datetime #--------------------------------------------------------------------- __version__ = "1.1.5" __id__ = "@(#) dserver.py [%s] 2008-06-10" % __version__ check_flg = False daemon_flg = False silent_flg = False terminate_flg = False verbose_flg = False wait_flg = False debug_level = 0 HOST = '' # Host server - '' means localhost PORT = 9572 # Listen on a non-reserved port number sockobj = None dserver_dir = None data_dir = None pid_path = None CONFIGFILE = "dserver.ini" LOGFILE = "dserver.log" PIDFILE = "dserver.pid" tables = [] INVALID = "INVALID" log = None #===================================================================== class Group: Name = None Idx = None Data = None def __init__(self, name): self.Name = name self.Idx = 0 self.Data = [] def __str__(self): s = "Grp %s Len %d" % (self.Name, len(self.Data)) return s def append(self, s): self.Data.append(s) def set(self): if len(self.Data) > 0: self.Idx = 0 else: self.Idx = -1 #--------------------------------------------------------------------- class Table: Count = 0 Valid = False Name = None Type = None Idx = None Data = None def __init__(self, name, type, delimiter=','): self.Name = name self.Type = type self.Delimiter = delimiter self.File = name + ".dat" self.Used = name + ".used" self.Stored = name + ".stored" sys.stderr.write("Loading %s\n" % self.Name) sys.stderr.flush() if self.Type == "CSV": rc = self.read_csv() elif self.Type == "Sequence": rc = self.read_sequence() elif self.Type == "Indexed": rc = self.read_indexed() elif self.Type == "Keyed": rc = self.read_keyed() if rc > 0: self.Valid = True try: self.ufh = open(self.Used, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) try: self.sfh = open(self.Stored, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) #------------------------------------------------------------------ def __str__(self): s = "Table: %-22s Type: %-10s" % (self.Name, self.Type) if self.Valid: s += " * " if self.Type == "CSV": s += " %7d rows" % len(self.Data) elif self.Type == "Sequence": s += " Starting value %d" % self.Data elif self.Type == "Indexed": s += " %7d rows" % len(self.Data) elif self.Type == "Keyed": s += " %7d groups" % len(self.Data) else: s += " " return s #------------------------------------------------------------------ def read_csv(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) self.Data = [] while True: line = f.readline() if not line: break line = line.strip() self.Data.append(line) f.close() self.Idx = 0 if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_sequence(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) while True: line = f.readline() if not line: break line = line.strip() try: no = int(line) except: no = 0 self.Data = no f.close() return 1 #------------------------------------------------------------------ def read_keyed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) groupName = None group = None self.Data = {} while True: line = f.readline() if not line: break line = line.strip() if (line.find("[") != -1): group_name = line.replace('[','').replace(']','') group = Group(group_name) self.Data[group_name] = group continue elif (line.find("#") != -1): continue elif (len(line) == 0): continue else: group.append(line) f.close() if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_indexed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) self.Data = {} while True: line = f.readline() if not line: break line = line.strip() try: (no, data) = line.split(':') except ValueError, e: sys.stderr.write('[dserver] Parse failed (%s): %s \n' % (self.File, str(e))) sys.exit(1) self.Data[no] = data f.close() if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def flush(self): if not self.Valid: return ts = datetime.now().strftime('%Y%m%d%H%M%S') self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts) print "Flushing %s" % self.Name if self.Type == "CSV": self.flush_csv() elif self.Type == "Sequence": self.flush_sequence() elif self.Type == "Indexed": self.flush_indexed() elif self.Type == "Keyed": self.flush_keyed() #------------------------------------------------------------------ def flush_csv(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) return 0 i = self.Idx while i < len(self.Data): f.write("%s\n" % self.Data[i]) i += 1 f.close() #------------------------------------------------------------------ def flush_sequence(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) return 0 f.write("%d\n" % self.Data) f.close() #------------------------------------------------------------------ def flush_keyed(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) return 0 group_keys = self.Data.keys() group_keys.sort() for key in group_keys: f.write("[%s]\n" % key) group = self.Data[key] i = group.Idx while i < len(group.Data): f.write("%s\n" % group.Data[i]) i += 1 f.write("\n") f.close() #------------------------------------------------------------------ def flush_indexed(self): pass #===================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[dserver] %s" % msg #--------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[dserver] %s\n' % msg) #--------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[dserver] %s" % msg #===================================================================== def read_config(): global PORT config_file = data_dir + CONFIGFILE try: f = open(config_file, 'r') except IOError, e: ERROR('Open failed: %s' % str(e)) sys.stderr.write('[dserver] Open failed: %s\n' % str(e)) sys.exit(1) config_flg = False definition_flg = False while True: line = f.readline() if not line: break line = line[:-1] line = line.replace('\r','') line = line.strip() if (line.find("#") != -1): continue if (line.find("[Config]") != -1): config_flg = True elif (line.find("Port=") != -1): definition = line.split("=") PORT = int(definition[1].strip()) if (line.find("[Data]") != -1): definition_flg = True elif (line.find("Description=") != -1): definition = line.split("=") (name, type, delimiter) = definition[1].split(":") t = Table(name, type, delimiter) INFO(str(t)) tables.append(t) f.close() #--------------------------------------------------------------------- def get_table_index(name): for i in range(len(tables)): if (tables[i].Name == name): return i return -1 #--------------------------------------------------------------------- def process(str): msg = str.split("|") l = len(msg) if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg)) ts = datetime.now().strftime('%Y%m%d%H%M%S') reply = "None" if (msg[0] == "REG"): name = msg[1].replace('\n','').replace('\r','') idx = get_table_index(name) if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx)) reply = "%d" % idx elif (msg[0] == "REGK"): if (len(msg) != 3): ERROR("[dserver::process] REGK -> Bad Message", msg) elif (msg[0] == "REGI"): if (len(msg) != 2): ERROR("[dserver::process] REGI -> Bad Message", msg) elif (msg[0] == "GETN"): if (len(msg) != 2): ERROR("[dserver::process] GETN -> Bad Message", msg) hdl = int(msg[1]) try: t = tables[hdl] except: t = None if t != None: if t.Type == 'CSV': if (t.Idx < len(t.Data)): reply = t.Data[t.Idx] t.Idx += 1 else: reply = "*Exhausted*" elif t.Type == "Sequence": reply = "%d" % t.Data t.Data += 1 else: reply = "UNKNOWN" t.ufh.write("%s - %s\n" % (ts, reply)) if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply) elif (msg[0] == "GETK"): if (len(msg) != 3): ERROR("[dserver::process] GETK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] try: t = tables[hdl] except: t = None if t != None: try: g = t.Data[grp] except: g = None if g != None: if (g.Idx < len(g.Data)): reply = g.Data[g.Idx] reply = re.sub(", *", ",", reply) g.Idx += 1 else: reply = "*Exhausted*" t.ufh.write("%s - %s::%s\n" % (ts, grp, reply)) if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply)) elif (msg[0] == "GETI"): if (len(msg) != 3): ERROR("[dserver::process] GETI -> Bad Message", msg) hdl = int(msg[1]) idx = msg[2] try: t = tables[hdl] except: t = None if t != None: try: reply = t.Data[idx] except: reply = "UNDEFINED" t.ufh.write("%s - %s::%s\n" % (ts, idx, reply)) if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply)) elif (msg[0] == "STOC"): if (len(msg) != 3): ERROR("[dserver::process] STOC -> Bad Message", msg) hdl = int(msg[1]) data = msg[2] reply = "0" try: t = tables[hdl] except: t = None if t != None: t.Data.append(data) t.sfh.write("%s - %s\n" % (ts, data)) t.sfh.flush() if debug_level > 1: INFO("STOC %s" % data) reply = "1" if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply)) elif (msg[0] == "STOK"): if (len(msg) != 4): ERROR("[dserver::process] STOK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] data = msg[3] reply = "0" try: t = tables[hdl] except: t = None if t != None: if t.Data.has_key(grp): g = t.Data[grp] else: g = Group(grp) t.Data[grp] = g if g != None: g.Data.append(data) if debug_level > 1: INFO("STOK %s %s" % (grp, data)) t.sfh.write("%s - %s::%s\n" % (ts, grp, data)) reply = "1" if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply)) return reply #--------------------------------------------------------------------- def sig_term(signum, frame): "SIGTERM handler" shutdown() #--------------------------------------------------------------------- def shutdown(): INFO("Server shutdown at %s" % datetime.now()) print "\n" for i in range(len(tables)): tables[i].flush() print "*SHUTDOWN*" try: os.unlink(pid_path) except IOError, e: ERROR('Unlink failed: %s' % str(e)) sys.exit(1) sys.exit(0) #--------------------------------------------------------------------- def check_running(): try: pfp = open(pid_path, 'r') except IOError, (errno, strerror): pfp = None # ERROR("I/O error(%s): %s" % (errno, strerror)) except: ERROR("Unexpected error: %s" % sys.exc_info()[0]) raise if pfp: line = pfp.readline() line = line.strip() dserver_pid = int(line) noProcess = 0 try: os.kill(dserver_pid, 0) except OSError, e: if e.errno == 3: noProcess = 1 else: ERROR("kill() failed: %s" % str(e)) sys.exit(0) if noProcess: INFO("[dserver] Stale dserver pid file!") pfp.close() os.unlink(pid_path) return None else: pfp.close() return dserver_pid return dserver_pid else: return None #--------------------------------------------------------------------- def create_pidfile(): pid = os.getpid() try: pfp = open(pid_path, 'w') except IOError, e: ERROR("Open failed: %s" % str(e)) sys.exit(0) pfp.write("%d" % pid) pfp.close() INFO("Running server with PID -> %d" % pid) return pid #--------------------------------------------------------------------- def become_daemon(): pid = os.fork() if pid == 0: # In child pid = create_pidfile() time.sleep(1) elif pid == -1: # Should not happen! ERROR("fork() failed!") time.sleep(1) sys.exit(0) else: # In Parent time.sleep(1) sys.exit(0) time.sleep(2) os.setsid() return pid #--------------------------------------------------------------------- def init(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) if daemon_flg: pid = become_daemon() else: pid = create_pidfile() global log log = logging.getLogger('dserver') hdlr = logging.FileHandler(LOGFILE) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("Started processing") read_config() if (not silent_flg): INFO("Server PID is %d" % pid) print "\nData Loaded..." #--------------------------------------------------------------------- def terminate(): dserver_pid = check_running() if dserver_pid: if (not silent_flg): INFO("Terminating server with pid, %d" % dserver_pid) os.kill(dserver_pid, signal.SIGTERM) if (wait_flg): while True: try: kill(dserver_pid, 0) except OSError, e: if e.errno == 3: break else: ERROR("kill() failed: %s" % str(e)) sys.exit(0) time.sleep(1) return 0 #--------------------------------------------------------------------- def check(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) else: print "[dserver] Server not running" #==== Socket Server ================================================== def init_connection(): global sockobj sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object sockobj.bind((HOST, PORT)) # bind it to server port number sockobj.listen(10) # allow upto 10 pending connects #--------------------------------------------------------------------- def handle_client(connection): # in spawned thread: reply while True: # read, write a client socket try: request = connection.recv(1024) except: break if debug_level > 0: INFO('[dserver] Request -> "%s"' % request) if not request: break reply = process(request) if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30]) connection.send(reply) connection.close() #--------------------------------------------------------------------- def dispatcher(): while True: # Wait for next connection, connection, address = sockobj.accept() INFO('Host (%s) - Connected at %s' % (address[0], datetime.now())) thread.start_new(handle_client, (connection,)) #===================================================================== def main(): global check_flg global daemon_flg global terminate_flg global verbose_flg global wait_flg global debug_level global dserver_dir global data_dir global pid_path try: opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?") except getopt.error, msg: print __doc__ return 1 try: dserver_dir = os.environ["DSERVER_DIR"] except KeyError, e: print "Set DSERVER_DIR environment variable and rerun!" return 1 wrk_path = os.getcwd() wrk_dir = os.path.basename(wrk_path) # data_dir = dserver_dir + '/DATA/' data_dir = wrk_path + '/DATA/' pid_path = data_dir + PIDFILE os.chdir(data_dir) for o, a in opts: if o == '-d': debug_level += 1 elif o == '-c': check_flg = True elif o == '-D': daemon_flg = True elif o == '-s': tsilent_flg = True elif o == '-T': terminate_flg = True elif o == '-v': verbose_flg = True elif o == '-V': print "[dserver] Version: %s" % __version__ return 1 elif o == '-w': wait_flg = True elif o == '-?': print __doc__ return 1 print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd()) if check_flg: check() return 0 if terminate_flg: terminate() return 0 if (debug_level > 0): print "Debugging level set to %d" % debug_level if args: for arg in args: print arg signal.signal(signal.SIGTERM, sig_term) init() init_connection() dispatcher() return 0 #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[dserver] Interrupted!" shutdown() #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation 20080609 plh Added exception handling to read_indexed() 20080609 plh Reformatted exception strings 20080610 plh Reformatted log text for load 20080610 plh Reviewed __id__ and __version__ strings Problems to fix: To Do: Issues: """ <pre> =2.1.0= <pre> #!/usr/bin/env python # # Author: Peter Harding <plh@performiq.com.au> # PerformIQ Pty. Ltd. # Level 6, 170 Queen Street, # MELBOURNE, VIC, 3000 # # Phone: 03 9641 2222 # Fax: 03 9641 2200 # Mobile: 0418 375 085 # # Copyright (C) 1994-2008, Peter Harding # All rights reserved # # Purpose: Threaded data server implementation # #--------------------------------------------------------------------- """ Threaded server model Server side: open a socket on a port, listen for a message from a client, and accept a request and service it. The server spawns a thread to handle each client connection. Threads share global memory space with main thread; This is more portable than fork -- not yet on Windows; This version has been extended to use the standard Python logging module. Add the delimiter to the INI file to allow use of alternate delimiters in transmitted data - so data with embedded commas can be used. """ #--------------------------------------------------------------------- import re import os import csv import sys import time import getopt import signal import thread import marshal import logging #--------------------------------------------------------------------- from socket import * # get socket constructor and constants from datetime import datetime #--------------------------------------------------------------------- __cvsid__ = "$Id:$" __version__ = "2.1.0" __id__ = "@(#) dserver.py [2.1.0] 2008-05-10" check_flg = False daemon_flg = False silent_flg = False terminate_flg = False verbose_flg = False wait_flg = False debug_level = 0 HOST = '' # Host server - '' means localhost PORT = 9572 # Listen on a non-reserved port number sockobj = None dserver_dir = None data_dir = None pid_path = None client_language = None log = None sources = [] CONFIGFILE = "dserver.ini" LOGFILE = "dserver.log" PIDFILE = "dserver.pid" INVALID = 'INVALID' DELIMITER = 'delimiter' TAG_DELIMITER = 'tag_delimiter' COMMENT = re.compile('^#') #===================================================================== class Group: Name = None Idx = None Data = None def __init__(self, name): self.Name = name self.Idx = 0 self.Data = [] self.Comments = [] def __str__(self): s = "Grp %s Len %d" % (self.Name, len(self.Data)) return s def append(self, s): self.Data.append(s) def set(self): if len(self.Data) > 0: self.Idx = 0 else: self.Idx = -1 #--------------------------------------------------------------------- class Source: Count = 0 Valid = False Name = None Type = None Idx = None Data = None def __init__(self, name, source_type, attributes={}, delimiter=None): self.Name = name self.Type = source_type self.File = name + ".dat" self.Used = name + ".used" self.Stored = name + ".stored" self.Comments = [] # print '[dserver] Name: "%s" Type: "%s" Attributes: "%s"' % (self.Name, self.Type, repr(attributes)) if delimiter: self.Delimiter = delimiter elif attributes.has_key(DELIMITER): self.Delimiter = attributes[DELIMITER] else: self.Delimiter = ',' if self.Type == "CSV": rc = self.read_csv() elif self.Type == "Sequence": rc = self.read_sequence() elif self.Type == "Indexed": if attributes.has_key(TAG_DELIMITER): self.tag_delimiter = attributes[TAG_DELIMITER] else: self.tag_delimiter = ':' rc = self.read_indexed() elif self.Type == "Keyed": rc = self.read_keyed() else: print "[dserver] Bad source_type [%s]" % source_type sys.exit(1) self.Size = rc self.Attributes = { 'Type' : self.Type, 'Delimiter' : self.Delimiter, 'Size' : rc } try: self.ufh = open(self.Used, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) try: self.sfh = open(self.Stored, 'a+') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) #------------------------------------------------------------------ def __str__(self): s = "Source: %-10s Type: %-10s" % (self.Name, self.Type) if self.Valid: s += " * " if self.Type == "CSV": s += " %d rows" % len(self.Data) elif self.Type == "Sequence": s += " Starting value %d" % self.Data elif self.Type == "Indexed": s += " %d rows" % len(self.Data) elif self.Type == "Keyed": s += " %d groups" % len(self.Data) else: s += " " return s #------------------------------------------------------------------ def read_csv(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = [] while True: line = f.readline() if not line: break line = line.strip() if COMMENT.match(line): self.Comments.append(line) continue self.Data.append(line) f.close() self.Idx = 0 if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_sequence(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) while True: line = f.readline() if not line: break line = line.strip() if COMMENT.match(line): self.Comments.append(line) continue try: no = int(line) except: no = 0 self.Data = no f.close() return 1 #------------------------------------------------------------------ def read_keyed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) groupName = None group = None self.Data = {} while True: line = f.readline() if not line: break line = line.strip() if (line.find("[") != -1): group_name = line.replace('[','').replace(']','') group = Group(group_name) self.Data[group_name] = group continue if COMMENT.match(line): if group: group.Comments.append(line) else: self.Comments.append(line) elif (len(line) == 0): continue else: group.append(line) f.close() if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def read_indexed(self): try: f = open(self.File, 'r') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') sys.exit(1) self.Data = {} while True: line = f.readline() if not line: break line = line.strip() if COMMENT.match(line): self.Comments.append(line) continue (tag, data) = line.split(self.tag_delimiter) tag = tag.strip() self.Data[tag] = data f.close() if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name)) return len(self.Data) #------------------------------------------------------------------ def flush(self): if not self.Valid: return ts = datetime.now().strftime('%Y%m%d%H%M%S') self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts) if self.Type == "CSV": self.flush_csv() elif self.Type == "Sequence": self.flush_sequence() elif self.Type == "Indexed": self.flush_indexed() elif self.Type == "Keyed": self.flush_keyed() #------------------------------------------------------------------ def flush_csv(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 i = self.Idx for line in self.Comments: f.write("%s\n" % line) while i < len(self.Data): f.write("%s\n" % self.Data[i]) i += 1 f.close() #------------------------------------------------------------------ def flush_sequence(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 for line in self.Comments: f.write("%s\n" % line) f.write("%d\n" % self.Data) f.close() #------------------------------------------------------------------ def flush_keyed(self): os.system(self.BackupCmd) try: f = open(self.File, 'wb') except IOError, e: sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n') return 0 group_keys = self.Data.keys() group_keys.sort() for line in self.Comments: f.write("%s\n" % line) for key in group_keys: f.write("[%s]\n" % key) group = self.Data[key] for line in group.Comments: f.write("%s\n" % line) i = group.Idx while i < len(group.Data): f.write("%s\n" % group.Data[i]) i += 1 f.write("\n") f.close() #------------------------------------------------------------------ def flush_indexed(self): pass #===================================================================== def INFO(msg): if log: log.info(' ' + msg) if verbose_flg: print "[dserver] %s" % msg #--------------------------------------------------------------------- def ERROR(msg): if log: log.error(msg) sys.stderr.write('[dserver] %s\n' % msg) #--------------------------------------------------------------------- def WARNING(msg): if log: log.warning('*****' + msg + '*****') if verbose_flg: print "[dserver] %s" % msg #===================================================================== def read_config(): global PORT config_file = data_dir + CONFIGFILE try: f = open(config_file, 'r') except IOError, e: ERROR('Open failed: ' + str(e)) sys.exit(1) config_flg = False definition_flg = False while True: line = f.readline() if not line: break line = line[:-1] line = line.replace('\r','') line = line.strip() if (line.find("#") != -1): continue if (line.find("[Config]") != -1): config_flg = True elif (line.find("Port=") != -1): definition = line.split("=") PORT = definition[1] if (line.find("[Data]") != -1): definition_flg = True elif (line.find("Description=") != -1): definition = line.split("=") (name, source_type, attribute_str) = definition[1].split(":", 2) try: attributes = eval(attribute_str) except: attributes = {} t = Source(name, source_type, attributes) INFO(str(t)) sources.append(t) f.close() #--------------------------------------------------------------------- def get_source_index(name): for i in range(len(sources)): if (sources[i].Name == name): return i return -1 #--------------------------------------------------------------------- def process(str): global client_language msg = str.split("|") l = len(msg) if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg)) ts = datetime.now().strftime('%Y%m%d%H%M%S') reply = "None" if (msg[0] == "INIT"): client_language = msg[1] elif (msg[0] == "REG"): name = msg[1].replace('\n','').replace('\r','') idx = get_source_index(name) if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx)) if client_language == 'Python': reply = "%d|%s" % (idx, marshal.dumps(sources[idx].Attributes)) else: reply = "%d" % idx elif (msg[0] == "REGK"): if (len(msg) != 3): ERROR("[dserver::process] REGK -> Bad Message", msg) elif (msg[0] == "REGI"): if (len(msg) != 2): ERROR("[dserver::process] REGI -> Bad Message", msg) elif (msg[0] == "GETN"): if (len(msg) != 2): ERROR("[dserver::process] GETN -> Bad Message", msg) hdl = int(msg[1]) try: t = sources[hdl] except: t = None if t != None: if t.Type == 'CSV': if (t.Idx < len(t.Data)): reply = t.Data[t.Idx] t.Idx += 1 else: reply = "*Exhausted*" elif t.Type == "Sequence": reply = "%d" % t.Data t.Data += 1 else: reply = "UNKNOWN" t.ufh.write("%s - %s\n" % (ts, reply)) if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply) elif (msg[0] == "GETK"): if (len(msg) != 3): ERROR("[dserver::process] GETK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] try: t = sources[hdl] except: t = None if t != None: try: g = t.Data[grp] except: g = None if g != None: if (g.Idx < len(g.Data)): reply = g.Data[g.Idx] g.Idx += 1 else: reply = "*Exhausted*" t.ufh.write("%s - %s::%s\n" % (ts, grp, reply)) if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply)) elif (msg[0] == "GETI"): if (len(msg) != 3): ERROR("[dserver::process] GETI -> Bad Message", msg) hdl = int(msg[1]) idx = msg[2] try: t = sources[hdl] except: t = None if t != None: try: reply = t.Data[idx] except: reply = "UNDEFINED" t.ufh.write("%s - %s::%s\n" % (ts, idx, reply)) if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply)) elif (msg[0] == "STOC"): if (len(msg) != 3): ERROR("[dserver::process] STOC -> Bad Message", msg) hdl = int(msg[1]) data = msg[2] reply = "0" try: t = sources[hdl] except: t = None if t != None: t.Data.append(data) t.sfh.write("%s - %s\n" % (ts, data)) t.sfh.flush() if debug_level > 1: INFO("STOC %s" % data) reply = "1" if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply)) elif (msg[0] == "STOK"): if (len(msg) != 4): ERROR("[dserver::process] STOK -> Bad Message", msg) hdl = int(msg[1]) grp = msg[2] data = msg[3] reply = "0" try: t = sources[hdl] except: t = None if t != None: if t.Data.has_key(grp): g = t.Data[grp] else: g = Group(grp) t.Data[grp] = g if g != None: g.Data.append(data) if debug_level > 1: INFO("STOK %s %s" % (grp, data)) t.sfh.write("%s - %s::%s\n" % (ts, grp, data)) reply = "1" if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply)) return reply #--------------------------------------------------------------------- def sig_term(signum, frame): "SIGTERM handler" shutdown() #--------------------------------------------------------------------- def shutdown(): INFO("Server shutdown at %s" % datetime.now()) for i in range(len(sources)): sources[i].flush() try: os.unlink(pid_path) except IOError, e: ERROR('Unlink failed: ' + str(e)) sys.exit(1) sys.exit(0) #--------------------------------------------------------------------- def check_running(): try: pfp = open(pid_path, 'r') except IOError, (errno, strerror): pfp = None # ERROR("I/O error(%s): %s" % (errno, strerror)) except: ERROR("Unexpected error:", sys.exc_info()[0]) raise if pfp: line = pfp.readline() line = line.strip() dserver_pid = int(line) noProcess = 0 try: os.kill(dserver_pid, 0) except OSError, e: if e.errno == 3: noProcess = 1 else: ERROR("kill() failed:" + str(e)) sys.exit(0) if noProcess: INFO("[dserver] Stale dserver pid file!") pfp.close() os.unlink(pid_path) return None else: pfp.close() return dserver_pid return dserver_pid else: return None #--------------------------------------------------------------------- def create_pidfile(): pid = os.getpid() try: pfp = open(pid_path, 'w') except IOError, e: ERROR("Open failed - " + str(e)) sys.exit(0) pfp.write("%d" % pid) pfp.close() INFO("Running server with PID -> %d" % pid) return pid #--------------------------------------------------------------------- def become_daemon(): pid = os.fork() if pid == 0: # In child pid = create_pidfile() time.sleep(1) elif pid == -1: # Should not happen! ERROR("fork() failed!") time.sleep(1) sys.exit(0) else: # In Parent time.sleep(1) sys.exit(0) time.sleep(2) os.setsid() return pid #--------------------------------------------------------------------- def init(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) if daemon_flg: pid = become_daemon() else: pid = create_pidfile() global log log = logging.getLogger('dserver') hdlr = logging.FileHandler(LOGFILE) fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s') hdlr.setFormatter(fmtr) log.addHandler(hdlr) log.setLevel(logging.INFO) INFO("Started processing") read_config() if (not silent_flg): INFO("Server PID is %d" % pid) #--------------------------------------------------------------------- def terminate(): dserver_pid = check_running() if dserver_pid: if (not silent_flg): INFO("Terminating server with pid, %d" % dserver_pid) os.kill(dserver_pid, signal.SIGTERM) if (wait_flg): while True: try: kill(dserver_pid, 0) except OSError, e: if e.errno == 3: break else: ERROR("kill() failed:" + str(e)) sys.exit(0) time.sleep(1) return 0 #--------------------------------------------------------------------- def check(): pid = check_running() if pid: print "[dserver] Server already running! (pid = %d)" % pid sys.exit(0) else: print "[dserver] Server not running" #==== Socket Server ================================================== def init_connection(): global sockobj sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object sockobj.bind((HOST, PORT)) # bind it to server port number sockobj.listen(10) # allow upto 10 pending connects #--------------------------------------------------------------------- def handle_client(connection): # in spawned thread: reply while True: # read, write a client socket try: request = connection.recv(1024) except: break if debug_level > 0: INFO('[dserver] Request -> "%s"' % request) if not request: break reply = process(request) if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30]) connection.send(reply) connection.close() #--------------------------------------------------------------------- def dispatcher(): while True: # Wait for next connection, connection, address = sockobj.accept() INFO('Host (%s) - Connected at %s' % (address[0], datetime.now())) thread.start_new(handle_client, (connection,)) #===================================================================== def main(): global check_flg global daemon_flg global terminate_flg global verbose_flg global wait_flg global debug_level global dserver_dir global data_dir global pid_path try: opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?") except getopt.error, msg: print __doc__ return 1 try: dserver_dir = os.environ["DSERVER_DIR"] except KeyError, e: print "Set DSERVER_DIR environment variable and rerun!" return 1 wrk_path = os.getcwd() wrk_dir = os.path.basename(wrk_path) # data_dir = dserver_dir + '/DATA/' data_dir = wrk_path + '/DATA/' pid_path = data_dir + PIDFILE os.chdir(data_dir) for o, a in opts: if o == '-d': debug_level += 1 elif o == '-c': check_flg = True elif o == '-D': daemon_flg = True elif o == '-s': tsilent_flg = True elif o == '-T': terminate_flg = True elif o == '-v': verbose_flg = True elif o == '-V': print "[dserver] Version: %s" % __version__ return 1 elif o == '-w': wait_flg = True elif o == '-?': print __doc__ return 1 print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd()) if check_flg: check() return 0 if terminate_flg: terminate() return 0 if (debug_level > 0): print "Debugging level set to %d" % debug_level if args: for arg in args: print arg signal.signal(signal.SIGTERM, sig_term) init() init_connection() dispatcher() return 0 #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: try: sys.exit(main()) except KeyboardInterrupt, e: print "[dserver] Interrupted!" shutdown() #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation Problems to fix: To Do: Issues: """
tst.py
#!/usr/bin/env python # # Author: Peter Harding <plh@pha.com.au> # PerformIQ Pty. Ltd. # Level 6, # 179 Queen Street, # MELBOURNE, VIC, 3000 # # Phone: 03 9641 2222 # Fax: 03 9641 2200 # Mobile: 0418 375 085 # # Copyright (C) 1994-2008, Peter Harding # All rights reserved # # #--------------------------------------------------------------------- """ Test example of use of Data Server. Usage: # tst.py -t <table> [-k <key>] The '-t <table>' option is used to specify the name of the table to query The '-i <index>' specifies the index for the indexed data type. Indexes may be either an integer which reurns the ith element of a string - in which case the key to the data set must be a string. The '-k <key>' specifies the key for the keyed data type """ #--------------------------------------------------------------------- import os import sys import getopt import client #--------------------------------------------------------------------- __cvsid__ = "$Id:$" __version__ = "2.0.2" __id__ = "@(#) [2.0.2] tst.py 2008-05-10" #--------------------------------------------------------------------- PORT = 9572 table_name = "Address" indexed = False index = None keyed = False key = None store_flg = False store_data = None debug_flg = False term_flg = False verbose_flg = False #--------------------------------------------------------------------- def process(): ds = client.Connection(port=PORT) if (ds == None): print("Connection to data server failed - is data server process running?\n") return 1 (type_ref, attributes) = ds.RegisterType(table_name) if indexed: size = attributes['Size'] pid = os.getpid() print "My PID is %d" % pid print "Data type \"%s\" registered as %d with attributes %s" % (table_name, type_ref, repr(attributes)) if (store_flg): if keyed: ds.StoreKeyedData(type_ref, key, store_data) else: ds.StoreCsvData(type_ref, store_data) else: if keyed: sp = ds.GetNextKeyed(type_ref, key) elif indexed: sp = ds.GetNextIndexed(type_ref, index) else: sp = ds.GetNext(type_ref) if (sp): print "Buffer is \"%s\"" % sp if sp != None: if len(sp) > 0: for i in range(len(sp)): print "Field %2d: \"%s\"" % (i, sp[i]) else: print "Field: \"%s\"" % None else: print "Type %d exhausted" % (pid, type_ref) #--------------------------------------------------------------------- def main(): global debug_flg global term_flg global verbose_flg global indexed global index global keyed global key global table_name global store_flg global store_data global PORT try: opts, args = getopt.getopt(sys.argv[1:], "dD:hi:I:k:p:s:t:TvV?") except getopt.error, msg: print __doc__, return 1 for o, a in opts: if o == '-d': debug_level += 1 elif o == '-D': debug_level = int(a) elif o == '-i': # Assuming a numeric offset! indexed = True index = int(a) elif o == '-I': # Assuming a string index! indexed = True index = a elif o == '-k': keyed = True key = a elif o == '-p': PORT = int(a) elif o == '-t': table_name = a elif o == '-T': term_flg = True elif o == '-s': print "storing..." store_flg = True store_data = a elif o == '-v': verbose_flg = True elif o == '-V': print "Version: %s" % __version__ return 0 elif o in ('-h', '-?'): print __doc__ return 0 if args: for arg in args: print arg else: pass process() #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: sys.exit(main()) #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation Problems to fix: To Do: Issues: """
client.py
#!/usr/bin/env python # # Author: Peter Harding <plh@performiq.com.au> # PerformIQ Pty. Ltd. # Level 6, 170 Queen Street, # MELBOURNE, VIC, 3000 # # Phone: 03 9641 2222 # Fax: 03 9641 2200 # Mobile: 0418 375 085 # # Copyright (C) 1994-2008, Peter Harding # All rights reserved # #--------------------------------------------------------------------- """ Purpose: Python implementation of DataServer client API Usage: ds = client.Connection(port=PORT) if (ds == None): print("Connection to data server failed - is data server process running?\n") return 1 (type_ref, attributes) = ds.RegisterType(table_name) Then one of: a) Pulling data: if Keyed: sp = ds.GetNextKeyed(type_ref, key) elif Indexed: sp = ds.GetNextIndexed(type_ref, index) else: sp = ds.GetNext(type_ref) a) Storing data: if Keyed: ds.StoreKeyedData(type_ref, key, store_data) else: ds.StoreCsvData(type_ref, store_data) Notes: i) For an indexed type the atributes returned are: { 'type' : 'Indexed', 'no_items' : <NO> } """ #--------------------------------------------------------------------- import os import sys import copy import getopt import marshal #--------------------------------------------------------------------- from socket import * # portable socket interface plus constants #--------------------------------------------------------------------- __cvsid__ = "$Id:$" __id__ = "@(#) [2.0.2] client.py 2008-05-10" __version__ = "2.0.2" HOST = 'localhost' PORT = 9572 verbose_flg = False debug_level = 0 #--------------------------------------------------------------------- class Connection: DELIM = ',' ServerHostname = None # server name, default to 'localhost' ServerPort = None # non-reserved port used by the server sockobj = None Fields = None def __init__(self, server=HOST, port=PORT, debug=0): global debug_level "Initialize TCP/IP socket object and make connection to server:port" self.ServerHostname = server self.ServerPort = port debug_level = debug self.sockobj = socket(AF_INET, SOCK_STREAM) try: self.sockobj.connect((self.ServerHostname, self.ServerPort)) except SocketError, e: sys.stderr.write('[client] Connect failed: ' + str(e) + '\n') sys.exit(1) msg = "INIT|Python" attributes = self.Get(msg) #try: # attributes = self.Get(msg) # except e: # sys.stderr.write('[client] Get failed: ' + str(e) + '\n') # sys.exit(1) print '[client::__init__] attributes "%s"' % attributes self.attributes = marshal.loads(attributes) self.sources = {} #------------------------------------------------------------------ def Get(self, s): "Send s to server and get back response" if self.sockobj != None: self.sockobj.send(s) data = self.sockobj.recv(1024) if debug_level > 0: print '[Client::Get] Sent: "%s" Received: "%s"' % (s, data) return data else: return None #------------------------------------------------------------------ def Close(self): "close socket to send eof to server" if self.sockobj != None: self.sockobj.close() self.sockobj = None #------------------------------------------------------------------ def RegisterType(self, type): msg = "REG|%s" % type # Should I really be using a try: here? - PLH 2008-05-10 try: response = self.Get(msg) except: type_ref = -1 (type_ref, attributes) = response.split('|', 1) type_ref = int(type_ref) attributes = marshal.loads(attributes) self.sources[type_ref] = attributes return (type_ref, attributes) #------------------------------------------------------------------ def GetNext(self, type_ref): msg = "GETN|%d" % type_ref csv_data = self.Get(msg) data = csv_data.split(self.DELIM) return data #------------------------------------------------------------------ def GetNextKeyed(self, type_ref, key): msg = "GETK|%d|%s" % (type_ref, key) csv_data = self.Get(msg) data = csv_data.split(self.DELIM) return data #------------------------------------------------------------------ def GetNextIndexed(self, type_ref, idx): msg = "GETI|%s|%s" % (type_ref, idx) csv_data = self.Get(msg) data = csv_data.split(self.DELIM) return data #------------------------------------------------------------------ def StoreCsvData(self, type_ref, data): msg = "STOC|%d|%s" % (type_ref, data) reply = self.Get(msg) try: rc = int(reply) except: rc = -1 return rc #------------------------------------------------------------------ def StoreKeyedData(self, type_ref, key_ref, data): msg = "STOK|%d|%s|%s" % (type_ref, key_ref, data) reply = self.Get(msg) try: rc = int(reply) except: rc = -1 return rc #------------------------------------------------------------------ def GetField(self, type_ref, i): if (i < len(self.Field[i])): return self.Field[i] else: return None #--------------------------------------------------------------------- def main(): global debug_level global verbose_flg global PORT try: opts, args = getopt.getopt(sys.argv[1:], "dhD:p:vV?") except getopt.error, msg: print __doc__, return 1 for o, a in opts: if o == '-d': debug_level += 1 elif o == '-D': debug_level = int(a) elif o == '-p': PORT = int(a) elif o == '-v': verboseFlg = True elif o == '-V': print "Version: %s" % __version__ return 0 elif o in ( '-h', '-?'): print __doc__ return 0 if args: for arg in args: print "[client] %s" % arg else: pass #--------------------------------------------------------------------- if __name__ == '__main__' or __name__ == sys.argv[0]: sys.exit(main()) #--------------------------------------------------------------------- """ Revision History: Date Who Description -------- --- -------------------------------------------------- 20031014 plh Initial implementation 20080510 plh Refactored as client and Connection rather than dcl Problems to fix: To Do: Issues: """
Sat May 10 13:35:23 AUSEST 2008 =============================== Have added in the return of attributes with RegisterType to allow mutilple values to be passed back from the data server. Specifically, I wanted to be able to get number of records (Size) in Indexed data sources. This will allow vusers implemented in Python to retrieve more information about the data sources. Initially, I wanted to be able to select a random group for the LDAP test. I have also refactored the scripts and reorganised some of the header info. i.e. dvstst.py -> tst.py dcl.py -> client.py