Data Server Variants
Jump to navigation
Jump to search
1.1.3
#!/usr/bin/env python
#
# Purpose: Threaded data server implementation
#
# $Id:$
#
#---------------------------------------------------------------------
"""
Threaded server model
Server side: open a socket on a port, listen for
a message from a client, and accept a request and
service it.
The server spawns a thread to handle each client connection.
Threads share global memory space with main thread;
This is more portable than fork -- not yet on Windows;
This version has been extended to use the standard Python
logging module.
Add the delimiter to the INI file to allow use of alternate
delimiters in transmitted data - so data with embedded commas
can be used.
"""
#---------------------------------------------------------------------
import os
import csv
import sys
import getopt
import thread
import time
import signal
import logging
#---------------------------------------------------------------------
from socket import * # get socket constructor and constants
from datetime import datetime
#---------------------------------------------------------------------
__version__ = "1.1.3"
__id__ = "@(#) dserver.py [%s] 30/04/2008"
check_flg = False
daemon_flg = False
silent_flg = False
terminate_flg = False
verbose_flg = False
wait_flg = False
debug_level = 0
HOST = '' # Host server - '' means localhost
PORT = 9578 # Listen on a non-reserved port number
sockobj = None
dserver_dir = None
data_dir = None
pid_path = None
CONFIGFILE = "dserver.ini"
LOGFILE = "dserver.log"
PIDFILE = "dserver.pid"
tables = []
INVALID = "INVALID"
log = None
#=====================================================================
class Group:
Name = None
Idx = None
Data = None
def __init__(self, name):
self.Name = name
self.Idx = 0
self.Data = []
def __str__(self):
s = "Grp %s Len %d" % (self.Name, len(self.Data))
return s
def append(self, s):
self.Data.append(s)
def set(self):
if len(self.Data) > 0:
self.Idx = 0
else:
self.Idx = -1
#---------------------------------------------------------------------
class Table:
Count = 0
Valid = False
Name = None
Type = None
Idx = None
Data = None
def __init__(self, name, type, delimiter=','):
self.Name = name
self.Type = type
self.Delimiter = delimiter
self.File = name + ".dat"
self.Used = name + ".used"
self.Stored = name + ".stored"
if self.Type == "CSV":
rc = self.read_csv()
elif self.Type == "Sequence":
rc = self.read_sequence()
elif self.Type == "Indexed":
rc = self.read_indexed()
elif self.Type == "Keyed":
rc = self.read_keyed()
if rc > 0:
self.Valid = True
try:
self.ufh = open(self.Used, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
try:
self.sfh = open(self.Stored, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
#------------------------------------------------------------------
def __str__(self):
s = "Table: %-10s Type: %-10s" % (self.Name, self.Type)
if self.Valid:
s += " * "
if self.Type == "CSV":
s += " %d rows" % len(self.Data)
elif self.Type == "Sequence":
s += " Starting value %d" % self.Data
elif self.Type == "Indexed":
s += " %d rows" % len(self.Data)
elif self.Type == "Keyed":
s += " %d groups" % len(self.Data)
else:
s += " "
return s
#------------------------------------------------------------------
def read_csv(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = []
while True:
line = f.readline()
if not line: break
line = line.strip()
self.Data.append(line)
f.close()
self.Idx = 0
if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_sequence(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
while True:
line = f.readline()
if not line: break
line = line.strip()
try:
no = int(line)
except:
no = 0
self.Data = no
f.close()
return 1
#------------------------------------------------------------------
def read_keyed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
groupName = None
group = None
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
if (line.find("[") != -1):
group_name = line.replace('[','').replace(']','')
group = Group(group_name)
self.Data[group_name] = group
continue
elif (line.find("#") != -1):
continue
elif (len(line) == 0):
continue
else:
group.append(line)
f.close()
if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_indexed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
(no, data) = line.split(':')
self.Data[no] = data
f.close()
if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def flush(self):
if not self.Valid:
return
ts = datetime.now().strftime('%Y%m%d%H%M%S')
self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)
if self.Type == "CSV":
self.flush_csv()
elif self.Type == "Sequence":
self.flush_sequence()
elif self.Type == "Indexed":
self.flush_indexed()
elif self.Type == "Keyed":
self.flush_keyed()
#------------------------------------------------------------------
def flush_csv(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
i = self.Idx
while i < len(self.Data):
f.write("%s\n" % self.Data[i])
i += 1
f.close()
#------------------------------------------------------------------
def flush_sequence(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
f.write("%d\n" % self.Data)
f.close()
#------------------------------------------------------------------
def flush_keyed(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
group_keys = self.Data.keys()
group_keys.sort()
for key in group_keys:
f.write("[%s]\n" % key)
group = self.Data[key]
i = group.Idx
while i < len(group.Data):
f.write("%s\n" % group.Data[i])
i += 1
f.write("\n")
f.close()
#------------------------------------------------------------------
def flush_indexed(self):
pass
#=====================================================================
def INFO(msg):
if log: log.info(' ' + msg)
if verbose_flg: print "[dserver] %s" % msg
#---------------------------------------------------------------------
def ERROR(msg):
if log: log.error(msg)
sys.stderr.write('[dserver] %s\n' % msg)
#---------------------------------------------------------------------
def WARNING(msg):
if log: log.warning('*****' + msg + '*****')
if verbose_flg: print "[dserver] %s" % msg
#=====================================================================
def read_config():
global PORT
config_file = data_dir + CONFIGFILE
try:
f = open(config_file, 'r')
except IOError, e:
ERROR('Open failed: ' + str(e))
sys.exit(1)
config_flg = False
definition_flg = False
while True:
line = f.readline()
if not line: break
line = line[:-1]
line = line.replace('\r','')
line = line.strip()
if (line.find("#") != -1): continue
if (line.find("[Config]") != -1):
config_flg = True
elif (line.find("Port=") != -1):
definition = line.split("=")
PORT = definition[1]
if (line.find("[Data]") != -1):
definition_flg = True
elif (line.find("Description=") != -1):
definition = line.split("=")
(name, type, delimiter) = definition[1].split(":")
t = Table(name, type, delimiter)
INFO(str(t))
tables.append(t)
f.close()
#---------------------------------------------------------------------
def get_table_index(name):
for i in range(len(tables)):
if (tables[i].Name == name):
return i
return -1
#---------------------------------------------------------------------
def process(str):
msg = str.split("|")
l = len(msg)
if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg))
ts = datetime.now().strftime('%Y%m%d%H%M%S')
reply = "None"
if (msg[0] == "REG"):
name = msg[1].replace('\n','').replace('\r','')
idx = get_table_index(name)
if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx))
reply = "%d" % idx
elif (msg[0] == "REGK"):
if (len(msg) != 3):
ERROR("[dserver::process] REGK -> Bad Message", msg)
elif (msg[0] == "REGI"):
if (len(msg) != 2):
ERROR("[dserver::process] REGI -> Bad Message", msg)
elif (msg[0] == "GETN"):
if (len(msg) != 2):
ERROR("[dserver::process] GETN -> Bad Message", msg)
hdl = int(msg[1])
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Type == 'CSV':
if (t.Idx < len(t.Data)):
reply = t.Data[t.Idx]
t.Idx += 1
else:
reply = "*Exhausted*"
elif t.Type == "Sequence":
reply = "%d" % t.Data
t.Data += 1
else:
reply = "UNKNOWN"
t.ufh.write("%s - %s\n" % (ts, reply))
if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply)
elif (msg[0] == "GETK"):
if (len(msg) != 3):
ERROR("[dserver::process] GETK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
g = t.Data[grp]
except:
g = None
if g != None:
if (g.Idx < len(g.Data)):
reply = g.Data[g.Idx]
g.Idx += 1
else:
reply = "*Exhausted*"
t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))
if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply))
elif (msg[0] == "GETI"):
if (len(msg) != 3):
ERROR("[dserver::process] GETI -> Bad Message", msg)
hdl = int(msg[1])
idx = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
reply = t.Data[idx]
except:
reply = "UNDEFINED"
t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))
if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply))
elif (msg[0] == "STOC"):
if (len(msg) != 3):
ERROR("[dserver::process] STOC -> Bad Message", msg)
hdl = int(msg[1])
data = msg[2]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
t.Data.append(data)
t.sfh.write("%s - %s\n" % (ts, data))
t.sfh.flush()
if debug_level > 1: INFO("STOC %s" % data)
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply))
elif (msg[0] == "STOK"):
if (len(msg) != 4):
ERROR("[dserver::process] STOK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
data = msg[3]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Data.has_key(grp):
g = t.Data[grp]
else:
g = Group(grp)
t.Data[grp] = g
if g != None:
g.Data.append(data)
if debug_level > 1: INFO("STOK %s %s" % (grp, data))
t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply))
return reply
#---------------------------------------------------------------------
def sig_term(signum, frame):
"SIGTERM handler"
shutdown()
#---------------------------------------------------------------------
def shutdown():
INFO("Server shutdown at %s" % datetime.now())
for i in range(len(tables)):
tables[i].flush()
try:
os.unlink(pid_path)
except IOError, e:
ERROR('Unlink failed: ' + str(e))
sys.exit(1)
sys.exit(0)
#---------------------------------------------------------------------
def check_running():
try:
pfp = open(pid_path, 'r')
except IOError, (errno, strerror):
pfp = None
# ERROR("I/O error(%s): %s" % (errno, strerror))
except:
ERROR("Unexpected error:", sys.exc_info()[0])
raise
if pfp:
line = pfp.readline()
line = line.strip()
dserver_pid = int(line)
noProcess = 0
try:
os.kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
noProcess = 1
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
if noProcess:
INFO("[dserver] Stale dserver pid file!")
pfp.close()
os.unlink(pid_path)
return None
else:
pfp.close()
return dserver_pid
return dserver_pid
else:
return None
#---------------------------------------------------------------------
def create_pidfile():
pid = os.getpid()
try:
pfp = open(pid_path, 'w')
except IOError, e:
ERROR("Open failed - " + str(e))
sys.exit(0)
pfp.write("%d" % pid)
pfp.close()
INFO("Running server with PID -> %d" % pid)
return pid
#---------------------------------------------------------------------
def become_daemon():
pid = os.fork()
if pid == 0: # In child
pid = create_pidfile()
time.sleep(1)
elif pid == -1: # Should not happen!
ERROR("fork() failed!")
time.sleep(1)
sys.exit(0)
else: # In Parent
time.sleep(1)
sys.exit(0)
time.sleep(2)
os.setsid()
return pid
#---------------------------------------------------------------------
def init():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
if daemon_flg:
pid = become_daemon()
else:
pid = create_pidfile()
global log
log = logging.getLogger('dserver')
hdlr = logging.FileHandler(LOGFILE)
fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(fmtr)
log.addHandler(hdlr)
log.setLevel(logging.INFO)
INFO("Started processing")
read_config()
if (not silent_flg):
INFO("Server PID is %d" % pid)
#---------------------------------------------------------------------
def terminate():
dserver_pid = check_running()
if dserver_pid:
if (not silent_flg):
INFO("Terminating server with pid, %d" % dserver_pid)
os.kill(dserver_pid, signal.SIGTERM)
if (wait_flg):
while True:
try:
kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
break
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
time.sleep(1)
return 0
#---------------------------------------------------------------------
def check():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
else:
print "[dserver] Server not running"
#==== Socket Server ==================================================
def init_connection():
global sockobj
sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object
sockobj.bind((HOST, PORT)) # bind it to server port number
sockobj.listen(10) # allow upto 10 pending connects
#---------------------------------------------------------------------
def handle_client(connection): # in spawned thread: reply
while True: # read, write a client socket
try:
request = connection.recv(1024)
except:
break
if debug_level > 0: INFO('[dserver] Request -> "%s"' % request)
if not request: break
reply = process(request)
if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30])
connection.send(reply)
connection.close()
#---------------------------------------------------------------------
def dispatcher():
while True:
# Wait for next connection,
connection, address = sockobj.accept()
INFO('Host (%s) - Connected at %s' % (address[0], datetime.now()))
thread.start_new(handle_client, (connection,))
#=====================================================================
def main():
global check_flg
global daemon_flg
global terminate_flg
global verbose_flg
global wait_flg
global debug_level
global dserver_dir
global data_dir
global pid_path
try:
opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
except getopt.error, msg:
print __doc__
return 1
try:
dserver_dir = os.environ["DSERVER_DIR"]
except KeyError, e:
print "Set DSERVER_DIR environment variable and rerun!"
return 1
wrk_path = os.getcwd()
wrk_dir = os.path.basename(wrk_path)
# data_dir = dserver_dir + '/DATA/'
data_dir = wrk_path + '/DATA/'
pid_path = data_dir + PIDFILE
os.chdir(data_dir)
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-c':
check_flg = True
elif o == '-D':
daemon_flg = True
elif o == '-s':
tsilent_flg = True
elif o == '-T':
terminate_flg = True
elif o == '-v':
verbose_flg = True
elif o == '-V':
print "[dserver] Version: %s" % __version__
return 1
elif o == '-w':
wait_flg = True
elif o == '-?':
print __doc__
return 1
print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd())
if check_flg:
check()
return 0
if terminate_flg:
terminate()
return 0
if (debug_level > 0): print "Debugging level set to %d" % debug_level
if args:
for arg in args:
print arg
signal.signal(signal.SIGTERM, sig_term)
init()
init_connection()
dispatcher()
return 0
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
try:
sys.exit(main())
except KeyboardInterrupt, e:
print "[dserver] Interrupted!"
shutdown()
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
Problems to fix:
To Do:
Issues:
"""
Other
#!/usr/bin/env python
#
# Purpose: Threaded data server implementation
#
# $Id:$
#
#---------------------------------------------------------------------
"""
Threaded server model
Server side: open a socket on a port, listen for
a message from a client, and accept a request and
service it.
The server spawns a thread to handle each client connection.
Threads share global memory space with main thread;
This is more portable than fork -- not yet on Windows;
This version has been extended to use the standard Python
logging module.
Add the delimiter to the INI file to allow use of alternate
delimiters in transmitted data - so data with embedded commas
can be used.
"""
#---------------------------------------------------------------------
import os
import re
import csv
import sys
import getopt
import thread
import time
import signal
import logging
#---------------------------------------------------------------------
from socket import * # get socket constructor and constants
from datetime import datetime
#---------------------------------------------------------------------
__version__ = "1.1.3"
__id__ = "@(#) dserver.py [%s] 30/04/2008"
check_flg = False
daemon_flg = False
silent_flg = False
terminate_flg = False
verbose_flg = False
wait_flg = False
debug_level = 0
HOST = '' # Host server - '' means localhost
PORT = 9575 # Listen on a non-reserved port number
sockobj = None
dserver_dir = None
data_dir = None
pid_path = None
CONFIGFILE = "dserver.ini"
LOGFILE = "dserver.log"
PIDFILE = "dserver.pid"
tables = []
INVALID = "INVALID"
log = None
#=====================================================================
class Group:
Name = None
Idx = None
Data = None
def __init__(self, name):
self.Name = name
self.Idx = 0
self.Data = []
def __str__(self):
s = "Grp %s Len %d" % (self.Name, len(self.Data))
return s
def append(self, s):
self.Data.append(s)
def set(self):
if len(self.Data) > 0:
self.Idx = 0
else:
self.Idx = -1
#---------------------------------------------------------------------
class Table:
Count = 0
Valid = False
Name = None
Type = None
Idx = None
Data = None
def __init__(self, name, type, delimiter=','):
self.Name = name
self.Type = type
self.Delimiter = delimiter
self.File = name + ".dat"
self.Used = name + ".used"
self.Stored = name + ".stored"
if self.Type == "CSV":
rc = self.read_csv()
elif self.Type == "Sequence":
rc = self.read_sequence()
elif self.Type == "Indexed":
rc = self.read_indexed()
elif self.Type == "Keyed":
rc = self.read_keyed()
if rc > 0:
self.Valid = True
try:
self.ufh = open(self.Used, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
try:
self.sfh = open(self.Stored, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
#------------------------------------------------------------------
def __str__(self):
s = "Table: %-10s Type: %-10s" % (self.Name, self.Type)
if self.Valid:
s += " * "
if self.Type == "CSV":
s += " %d rows" % len(self.Data)
elif self.Type == "Sequence":
s += " Starting value %d" % self.Data
elif self.Type == "Indexed":
s += " %d rows" % len(self.Data)
elif self.Type == "Keyed":
s += " %d groups" % len(self.Data)
else:
s += " "
return s
#------------------------------------------------------------------
def read_csv(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = []
while True:
line = f.readline()
if not line: break
line = line.strip()
self.Data.append(line)
f.close()
self.Idx = 0
if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_sequence(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
while True:
line = f.readline()
if not line: break
line = line.strip()
try:
no = int(line)
except:
no = 0
self.Data = no
f.close()
return 1
#------------------------------------------------------------------
def read_keyed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
groupName = None
group = None
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
if (line.find("[") != -1):
group_name = line.replace('[','').replace(']','')
group = Group(group_name)
self.Data[group_name] = group
continue
elif (line.find("#") != -1):
continue
elif (len(line) == 0):
continue
else:
group.append(line)
f.close()
if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_indexed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
(no, data) = line.split(':')
self.Data[no] = data
f.close()
if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def flush(self):
if not self.Valid:
return
ts = datetime.now().strftime('%Y%m%d%H%M%S')
self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)
print "Flushing %s" % self.Name
if self.Type == "CSV":
self.flush_csv()
elif self.Type == "Sequence":
self.flush_sequence()
elif self.Type == "Indexed":
self.flush_indexed()
elif self.Type == "Keyed":
self.flush_keyed()
#------------------------------------------------------------------
def flush_csv(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
i = self.Idx
while i < len(self.Data):
f.write("%s\n" % self.Data[i])
i += 1
f.close()
#------------------------------------------------------------------
def flush_sequence(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
f.write("%d\n" % self.Data)
f.close()
#------------------------------------------------------------------
def flush_keyed(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
group_keys = self.Data.keys()
group_keys.sort()
for key in group_keys:
f.write("[%s]\n" % key)
group = self.Data[key]
i = group.Idx
while i < len(group.Data):
f.write("%s\n" % group.Data[i])
i += 1
f.write("\n")
f.close()
#------------------------------------------------------------------
def flush_indexed(self):
pass
#=====================================================================
def INFO(msg):
if log: log.info(' ' + msg)
if verbose_flg: print "[dserver] %s" % msg
#---------------------------------------------------------------------
def ERROR(msg):
if log: log.error(msg)
sys.stderr.write('[dserver] %s\n' % msg)
#---------------------------------------------------------------------
def WARNING(msg):
if log: log.warning('*****' + msg + '*****')
if verbose_flg: print "[dserver] %s" % msg
#=====================================================================
def read_config():
global PORT
config_file = data_dir + CONFIGFILE
try:
f = open(config_file, 'r')
except IOError, e:
ERROR('Open failed: ' + str(e))
sys.exit(1)
config_flg = False
definition_flg = False
while True:
line = f.readline()
if not line: break
line = line[:-1]
line = line.replace('\r','')
line = line.strip()
if (line.find("#") != -1): continue
if (line.find("[Config]") != -1):
config_flg = True
elif (line.find("Port=") != -1):
definition = line.split("=")
PORT = int(definition[1].strip())
if (line.find("[Data]") != -1):
definition_flg = True
elif (line.find("Description=") != -1):
definition = line.split("=")
(name, type, delimiter) = definition[1].split(":")
t = Table(name, type, delimiter)
INFO(str(t))
tables.append(t)
f.close()
#---------------------------------------------------------------------
def get_table_index(name):
for i in range(len(tables)):
if (tables[i].Name == name):
return i
return -1
#---------------------------------------------------------------------
def process(str):
msg = str.split("|")
l = len(msg)
if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg))
ts = datetime.now().strftime('%Y%m%d%H%M%S')
reply = "None"
if (msg[0] == "REG"):
name = msg[1].replace('\n','').replace('\r','')
idx = get_table_index(name)
if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx))
reply = "%d" % idx
elif (msg[0] == "REGK"):
if (len(msg) != 3):
ERROR("[dserver::process] REGK -> Bad Message", msg)
elif (msg[0] == "REGI"):
if (len(msg) != 2):
ERROR("[dserver::process] REGI -> Bad Message", msg)
elif (msg[0] == "GETN"):
if (len(msg) != 2):
ERROR("[dserver::process] GETN -> Bad Message", msg)
hdl = int(msg[1])
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Type == 'CSV':
if (t.Idx < len(t.Data)):
reply = t.Data[t.Idx]
t.Idx += 1
else:
reply = "*Exhausted*"
elif t.Type == "Sequence":
reply = "%d" % t.Data
t.Data += 1
else:
reply = "UNKNOWN"
t.ufh.write("%s - %s\n" % (ts, reply))
if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply)
elif (msg[0] == "GETK"):
if (len(msg) != 3):
ERROR("[dserver::process] GETK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
g = t.Data[grp]
except:
g = None
if g != None:
if (g.Idx < len(g.Data)):
reply = g.Data[g.Idx]
reply = re.sub(", *", ",", reply)
g.Idx += 1
else:
reply = "*Exhausted*"
t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))
if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply))
elif (msg[0] == "GETI"):
if (len(msg) != 3):
ERROR("[dserver::process] GETI -> Bad Message", msg)
hdl = int(msg[1])
idx = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
reply = t.Data[idx]
except:
reply = "UNDEFINED"
t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))
if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply))
elif (msg[0] == "STOC"):
if (len(msg) != 3):
ERROR("[dserver::process] STOC -> Bad Message", msg)
hdl = int(msg[1])
data = msg[2]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
t.Data.append(data)
t.sfh.write("%s - %s\n" % (ts, data))
t.sfh.flush()
if debug_level > 1: INFO("STOC %s" % data)
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply))
elif (msg[0] == "STOK"):
if (len(msg) != 4):
ERROR("[dserver::process] STOK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
data = msg[3]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Data.has_key(grp):
g = t.Data[grp]
else:
g = Group(grp)
t.Data[grp] = g
if g != None:
g.Data.append(data)
if debug_level > 1: INFO("STOK %s %s" % (grp, data))
t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply))
return reply
#---------------------------------------------------------------------
def sig_term(signum, frame):
"SIGTERM handler"
shutdown()
#---------------------------------------------------------------------
def shutdown():
INFO("Server shutdown at %s" % datetime.now())
print "\n"
for i in range(len(tables)):
tables[i].flush()
print "*SHUTDOWN*"
try:
os.unlink(pid_path)
except IOError, e:
ERROR('Unlink failed: ' + str(e))
sys.exit(1)
sys.exit(0)
#---------------------------------------------------------------------
def check_running():
try:
pfp = open(pid_path, 'r')
except IOError, (errno, strerror):
pfp = None
# ERROR("I/O error(%s): %s" % (errno, strerror))
except:
ERROR("Unexpected error:", sys.exc_info()[0])
raise
if pfp:
line = pfp.readline()
line = line.strip()
dserver_pid = int(line)
noProcess = 0
try:
os.kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
noProcess = 1
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
if noProcess:
INFO("[dserver] Stale dserver pid file!")
pfp.close()
os.unlink(pid_path)
return None
else:
pfp.close()
return dserver_pid
return dserver_pid
else:
return None
#---------------------------------------------------------------------
def create_pidfile():
pid = os.getpid()
try:
pfp = open(pid_path, 'w')
except IOError, e:
ERROR("Open failed - " + str(e))
sys.exit(0)
pfp.write("%d" % pid)
pfp.close()
INFO("Running server with PID -> %d" % pid)
return pid
#---------------------------------------------------------------------
def become_daemon():
pid = os.fork()
if pid == 0: # In child
pid = create_pidfile()
time.sleep(1)
elif pid == -1: # Should not happen!
ERROR("fork() failed!")
time.sleep(1)
sys.exit(0)
else: # In Parent
time.sleep(1)
sys.exit(0)
time.sleep(2)
os.setsid()
return pid
#---------------------------------------------------------------------
def init():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
if daemon_flg:
pid = become_daemon()
else:
pid = create_pidfile()
global log
log = logging.getLogger('dserver')
hdlr = logging.FileHandler(LOGFILE)
fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(fmtr)
log.addHandler(hdlr)
log.setLevel(logging.INFO)
INFO("Started processing")
read_config()
if (not silent_flg):
INFO("Server PID is %d" % pid)
print "\nData Loaded..."
#---------------------------------------------------------------------
def terminate():
dserver_pid = check_running()
if dserver_pid:
if (not silent_flg):
INFO("Terminating server with pid, %d" % dserver_pid)
os.kill(dserver_pid, signal.SIGTERM)
if (wait_flg):
while True:
try:
kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
break
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
time.sleep(1)
return 0
#---------------------------------------------------------------------
def check():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
else:
print "[dserver] Server not running"
#==== Socket Server ==================================================
def init_connection():
global sockobj
sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object
sockobj.bind((HOST, PORT)) # bind it to server port number
sockobj.listen(10) # allow upto 10 pending connects
#---------------------------------------------------------------------
def handle_client(connection): # in spawned thread: reply
while True: # read, write a client socket
try:
request = connection.recv(1024)
except:
break
if debug_level > 0: INFO('[dserver] Request -> "%s"' % request)
if not request: break
reply = process(request)
if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30])
connection.send(reply)
connection.close()
#---------------------------------------------------------------------
def dispatcher():
while True:
# Wait for next connection,
connection, address = sockobj.accept()
INFO('Host (%s) - Connected at %s' % (address[0], datetime.now()))
thread.start_new(handle_client, (connection,))
#=====================================================================
def main():
global check_flg
global daemon_flg
global terminate_flg
global verbose_flg
global wait_flg
global debug_level
global dserver_dir
global data_dir
global pid_path
try:
opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
except getopt.error, msg:
print __doc__
return 1
try:
dserver_dir = os.environ["DSERVER_DIR"]
except KeyError, e:
print "Set DSERVER_DIR environment variable and rerun!"
return 1
wrk_path = os.getcwd()
wrk_dir = os.path.basename(wrk_path)
# data_dir = dserver_dir + '/DATA/'
data_dir = wrk_path + '/DATA/'
pid_path = data_dir + PIDFILE
os.chdir(data_dir)
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-c':
check_flg = True
elif o == '-D':
daemon_flg = True
elif o == '-s':
tsilent_flg = True
elif o == '-T':
terminate_flg = True
elif o == '-v':
verbose_flg = True
elif o == '-V':
print "[dserver] Version: %s" % __version__
return 1
elif o == '-w':
wait_flg = True
elif o == '-?':
print __doc__
return 1
print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd())
if check_flg:
check()
return 0
if terminate_flg:
terminate()
return 0
if (debug_level > 0): print "Debugging level set to %d" % debug_level
if args:
for arg in args:
print arg
signal.signal(signal.SIGTERM, sig_term)
init()
init_connection()
dispatcher()
return 0
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
try:
sys.exit(main())
except KeyboardInterrupt, e:
print "[dserver] Interrupted!"
shutdown()
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
Problems to fix:
To Do:
Issues:
"""
Old Versions
1.1.5
#!/usr/bin/env python
#
# Purpose: Threaded data server implementation
#
# $Id:$
#
#---------------------------------------------------------------------
"""
Threaded server model
Server side: open a socket on a port, listen for
a message from a client, and accept a request and
service it.
The server spawns a thread to handle each client connection.
Threads share global memory space with main thread;
This is more portable than fork -- not yet on Windows;
This version has been extended to use the standard Python
logging module.
Add the delimiter to the INI file to allow use of alternate
delimiters in transmitted data - so data with embedded commas
can be used.
"""
#---------------------------------------------------------------------
import os
import re
import csv
import sys
import getopt
import thread
import time
import signal
import logging
#---------------------------------------------------------------------
from socket import * # get socket constructor and constants
from datetime import datetime
#---------------------------------------------------------------------
__version__ = "1.1.5"
__id__ = "@(#) dserver.py [%s] 2008-06-10" % __version__
check_flg = False
daemon_flg = False
silent_flg = False
terminate_flg = False
verbose_flg = False
wait_flg = False
debug_level = 0
HOST = '' # Host server - '' means localhost
PORT = 9572 # Listen on a non-reserved port number
sockobj = None
dserver_dir = None
data_dir = None
pid_path = None
CONFIGFILE = "dserver.ini"
LOGFILE = "dserver.log"
PIDFILE = "dserver.pid"
tables = []
INVALID = "INVALID"
log = None
#=====================================================================
class Group:
Name = None
Idx = None
Data = None
def __init__(self, name):
self.Name = name
self.Idx = 0
self.Data = []
def __str__(self):
s = "Grp %s Len %d" % (self.Name, len(self.Data))
return s
def append(self, s):
self.Data.append(s)
def set(self):
if len(self.Data) > 0:
self.Idx = 0
else:
self.Idx = -1
#---------------------------------------------------------------------
class Table:
Count = 0
Valid = False
Name = None
Type = None
Idx = None
Data = None
def __init__(self, name, type, delimiter=','):
self.Name = name
self.Type = type
self.Delimiter = delimiter
self.File = name + ".dat"
self.Used = name + ".used"
self.Stored = name + ".stored"
sys.stderr.write("Loading %s\n" % self.Name)
sys.stderr.flush()
if self.Type == "CSV":
rc = self.read_csv()
elif self.Type == "Sequence":
rc = self.read_sequence()
elif self.Type == "Indexed":
rc = self.read_indexed()
elif self.Type == "Keyed":
rc = self.read_keyed()
if rc > 0:
self.Valid = True
try:
self.ufh = open(self.Used, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
try:
self.sfh = open(self.Stored, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
#------------------------------------------------------------------
def __str__(self):
s = "Table: %-22s Type: %-10s" % (self.Name, self.Type)
if self.Valid:
s += " * "
if self.Type == "CSV":
s += " %7d rows" % len(self.Data)
elif self.Type == "Sequence":
s += " Starting value %d" % self.Data
elif self.Type == "Indexed":
s += " %7d rows" % len(self.Data)
elif self.Type == "Keyed":
s += " %7d groups" % len(self.Data)
else:
s += " "
return s
#------------------------------------------------------------------
def read_csv(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
self.Data = []
while True:
line = f.readline()
if not line: break
line = line.strip()
self.Data.append(line)
f.close()
self.Idx = 0
if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_sequence(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
while True:
line = f.readline()
if not line: break
line = line.strip()
try:
no = int(line)
except:
no = 0
self.Data = no
f.close()
return 1
#------------------------------------------------------------------
def read_keyed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
groupName = None
group = None
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
if (line.find("[") != -1):
group_name = line.replace('[','').replace(']','')
group = Group(group_name)
self.Data[group_name] = group
continue
elif (line.find("#") != -1):
continue
elif (len(line) == 0):
continue
else:
group.append(line)
f.close()
if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_indexed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
try:
(no, data) = line.split(':')
except ValueError, e:
sys.stderr.write('[dserver] Parse failed (%s): %s \n' % (self.File, str(e)))
sys.exit(1)
self.Data[no] = data
f.close()
if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def flush(self):
if not self.Valid:
return
ts = datetime.now().strftime('%Y%m%d%H%M%S')
self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)
print "Flushing %s" % self.Name
if self.Type == "CSV":
self.flush_csv()
elif self.Type == "Sequence":
self.flush_sequence()
elif self.Type == "Indexed":
self.flush_indexed()
elif self.Type == "Keyed":
self.flush_keyed()
#------------------------------------------------------------------
def flush_csv(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
return 0
i = self.Idx
while i < len(self.Data):
f.write("%s\n" % self.Data[i])
i += 1
f.close()
#------------------------------------------------------------------
def flush_sequence(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
return 0
f.write("%d\n" % self.Data)
f.close()
#------------------------------------------------------------------
def flush_keyed(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
return 0
group_keys = self.Data.keys()
group_keys.sort()
for key in group_keys:
f.write("[%s]\n" % key)
group = self.Data[key]
i = group.Idx
while i < len(group.Data):
f.write("%s\n" % group.Data[i])
i += 1
f.write("\n")
f.close()
#------------------------------------------------------------------
def flush_indexed(self):
pass
#=====================================================================
def INFO(msg):
if log: log.info(' ' + msg)
if verbose_flg: print "[dserver] %s" % msg
#---------------------------------------------------------------------
def ERROR(msg):
if log: log.error(msg)
sys.stderr.write('[dserver] %s\n' % msg)
#---------------------------------------------------------------------
def WARNING(msg):
if log: log.warning('*****' + msg + '*****')
if verbose_flg: print "[dserver] %s" % msg
#=====================================================================
def read_config():
global PORT
config_file = data_dir + CONFIGFILE
try:
f = open(config_file, 'r')
except IOError, e:
ERROR('Open failed: %s' % str(e))
sys.stderr.write('[dserver] Open failed: %s\n' % str(e))
sys.exit(1)
config_flg = False
definition_flg = False
while True:
line = f.readline()
if not line: break
line = line[:-1]
line = line.replace('\r','')
line = line.strip()
if (line.find("#") != -1): continue
if (line.find("[Config]") != -1):
config_flg = True
elif (line.find("Port=") != -1):
definition = line.split("=")
PORT = int(definition[1].strip())
if (line.find("[Data]") != -1):
definition_flg = True
elif (line.find("Description=") != -1):
definition = line.split("=")
(name, type, delimiter) = definition[1].split(":")
t = Table(name, type, delimiter)
INFO(str(t))
tables.append(t)
f.close()
#---------------------------------------------------------------------
def get_table_index(name):
for i in range(len(tables)):
if (tables[i].Name == name):
return i
return -1
#---------------------------------------------------------------------
def process(str):
msg = str.split("|")
l = len(msg)
if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg))
ts = datetime.now().strftime('%Y%m%d%H%M%S')
reply = "None"
if (msg[0] == "REG"):
name = msg[1].replace('\n','').replace('\r','')
idx = get_table_index(name)
if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx))
reply = "%d" % idx
elif (msg[0] == "REGK"):
if (len(msg) != 3):
ERROR("[dserver::process] REGK -> Bad Message", msg)
elif (msg[0] == "REGI"):
if (len(msg) != 2):
ERROR("[dserver::process] REGI -> Bad Message", msg)
elif (msg[0] == "GETN"):
if (len(msg) != 2):
ERROR("[dserver::process] GETN -> Bad Message", msg)
hdl = int(msg[1])
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Type == 'CSV':
if (t.Idx < len(t.Data)):
reply = t.Data[t.Idx]
t.Idx += 1
else:
reply = "*Exhausted*"
elif t.Type == "Sequence":
reply = "%d" % t.Data
t.Data += 1
else:
reply = "UNKNOWN"
t.ufh.write("%s - %s\n" % (ts, reply))
if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply)
elif (msg[0] == "GETK"):
if (len(msg) != 3):
ERROR("[dserver::process] GETK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
g = t.Data[grp]
except:
g = None
if g != None:
if (g.Idx < len(g.Data)):
reply = g.Data[g.Idx]
reply = re.sub(", *", ",", reply)
g.Idx += 1
else:
reply = "*Exhausted*"
t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))
if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply))
elif (msg[0] == "GETI"):
if (len(msg) != 3):
ERROR("[dserver::process] GETI -> Bad Message", msg)
hdl = int(msg[1])
idx = msg[2]
try:
t = tables[hdl]
except:
t = None
if t != None:
try:
reply = t.Data[idx]
except:
reply = "UNDEFINED"
t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))
if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply))
elif (msg[0] == "STOC"):
if (len(msg) != 3):
ERROR("[dserver::process] STOC -> Bad Message", msg)
hdl = int(msg[1])
data = msg[2]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
t.Data.append(data)
t.sfh.write("%s - %s\n" % (ts, data))
t.sfh.flush()
if debug_level > 1: INFO("STOC %s" % data)
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply))
elif (msg[0] == "STOK"):
if (len(msg) != 4):
ERROR("[dserver::process] STOK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
data = msg[3]
reply = "0"
try:
t = tables[hdl]
except:
t = None
if t != None:
if t.Data.has_key(grp):
g = t.Data[grp]
else:
g = Group(grp)
t.Data[grp] = g
if g != None:
g.Data.append(data)
if debug_level > 1: INFO("STOK %s %s" % (grp, data))
t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply))
return reply
#---------------------------------------------------------------------
def sig_term(signum, frame):
"SIGTERM handler"
shutdown()
#---------------------------------------------------------------------
def shutdown():
INFO("Server shutdown at %s" % datetime.now())
print "\n"
for i in range(len(tables)):
tables[i].flush()
print "*SHUTDOWN*"
try:
os.unlink(pid_path)
except IOError, e:
ERROR('Unlink failed: %s' % str(e))
sys.exit(1)
sys.exit(0)
#---------------------------------------------------------------------
def check_running():
try:
pfp = open(pid_path, 'r')
except IOError, (errno, strerror):
pfp = None
# ERROR("I/O error(%s): %s" % (errno, strerror))
except:
ERROR("Unexpected error: %s" % sys.exc_info()[0])
raise
if pfp:
line = pfp.readline()
line = line.strip()
dserver_pid = int(line)
noProcess = 0
try:
os.kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
noProcess = 1
else:
ERROR("kill() failed: %s" % str(e))
sys.exit(0)
if noProcess:
INFO("[dserver] Stale dserver pid file!")
pfp.close()
os.unlink(pid_path)
return None
else:
pfp.close()
return dserver_pid
return dserver_pid
else:
return None
#---------------------------------------------------------------------
def create_pidfile():
pid = os.getpid()
try:
pfp = open(pid_path, 'w')
except IOError, e:
ERROR("Open failed: %s" % str(e))
sys.exit(0)
pfp.write("%d" % pid)
pfp.close()
INFO("Running server with PID -> %d" % pid)
return pid
#---------------------------------------------------------------------
def become_daemon():
pid = os.fork()
if pid == 0: # In child
pid = create_pidfile()
time.sleep(1)
elif pid == -1: # Should not happen!
ERROR("fork() failed!")
time.sleep(1)
sys.exit(0)
else: # In Parent
time.sleep(1)
sys.exit(0)
time.sleep(2)
os.setsid()
return pid
#---------------------------------------------------------------------
def init():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
if daemon_flg:
pid = become_daemon()
else:
pid = create_pidfile()
global log
log = logging.getLogger('dserver')
hdlr = logging.FileHandler(LOGFILE)
fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(fmtr)
log.addHandler(hdlr)
log.setLevel(logging.INFO)
INFO("Started processing")
read_config()
if (not silent_flg):
INFO("Server PID is %d" % pid)
print "\nData Loaded..."
#---------------------------------------------------------------------
def terminate():
dserver_pid = check_running()
if dserver_pid:
if (not silent_flg):
INFO("Terminating server with pid, %d" % dserver_pid)
os.kill(dserver_pid, signal.SIGTERM)
if (wait_flg):
while True:
try:
kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
break
else:
ERROR("kill() failed: %s" % str(e))
sys.exit(0)
time.sleep(1)
return 0
#---------------------------------------------------------------------
def check():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
else:
print "[dserver] Server not running"
#==== Socket Server ==================================================
def init_connection():
global sockobj
sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object
sockobj.bind((HOST, PORT)) # bind it to server port number
sockobj.listen(10) # allow upto 10 pending connects
#---------------------------------------------------------------------
def handle_client(connection): # in spawned thread: reply
while True: # read, write a client socket
try:
request = connection.recv(1024)
except:
break
if debug_level > 0: INFO('[dserver] Request -> "%s"' % request)
if not request: break
reply = process(request)
if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30])
connection.send(reply)
connection.close()
#---------------------------------------------------------------------
def dispatcher():
while True:
# Wait for next connection,
connection, address = sockobj.accept()
INFO('Host (%s) - Connected at %s' % (address[0], datetime.now()))
thread.start_new(handle_client, (connection,))
#=====================================================================
def main():
global check_flg
global daemon_flg
global terminate_flg
global verbose_flg
global wait_flg
global debug_level
global dserver_dir
global data_dir
global pid_path
try:
opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
except getopt.error, msg:
print __doc__
return 1
try:
dserver_dir = os.environ["DSERVER_DIR"]
except KeyError, e:
print "Set DSERVER_DIR environment variable and rerun!"
return 1
wrk_path = os.getcwd()
wrk_dir = os.path.basename(wrk_path)
# data_dir = dserver_dir + '/DATA/'
data_dir = wrk_path + '/DATA/'
pid_path = data_dir + PIDFILE
os.chdir(data_dir)
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-c':
check_flg = True
elif o == '-D':
daemon_flg = True
elif o == '-s':
tsilent_flg = True
elif o == '-T':
terminate_flg = True
elif o == '-v':
verbose_flg = True
elif o == '-V':
print "[dserver] Version: %s" % __version__
return 1
elif o == '-w':
wait_flg = True
elif o == '-?':
print __doc__
return 1
print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd())
if check_flg:
check()
return 0
if terminate_flg:
terminate()
return 0
if (debug_level > 0): print "Debugging level set to %d" % debug_level
if args:
for arg in args:
print arg
signal.signal(signal.SIGTERM, sig_term)
init()
init_connection()
dispatcher()
return 0
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
try:
sys.exit(main())
except KeyboardInterrupt, e:
print "[dserver] Interrupted!"
shutdown()
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
20080609 plh Added exception handling to read_indexed()
20080609 plh Reformatted exception strings
20080610 plh Reformatted log text for load
20080610 plh Reviewed __id__ and __version__ strings
Problems to fix:
To Do:
Issues:
"""
<pre>
=2.1.0=
<pre>
#!/usr/bin/env python
#
# Author: Peter Harding <plh@performiq.com.au>
# PerformIQ Pty. Ltd.
# Level 6, 170 Queen Street,
# MELBOURNE, VIC, 3000
#
# Phone: 03 9641 2222
# Fax: 03 9641 2200
# Mobile: 0418 375 085
#
# Copyright (C) 1994-2008, Peter Harding
# All rights reserved
#
# Purpose: Threaded data server implementation
#
#---------------------------------------------------------------------
"""
Threaded server model
Server side: open a socket on a port, listen for
a message from a client, and accept a request and
service it.
The server spawns a thread to handle each client connection.
Threads share global memory space with main thread;
This is more portable than fork -- not yet on Windows;
This version has been extended to use the standard Python
logging module.
Add the delimiter to the INI file to allow use of alternate
delimiters in transmitted data - so data with embedded commas
can be used.
"""
#---------------------------------------------------------------------
import re
import os
import csv
import sys
import time
import getopt
import signal
import thread
import marshal
import logging
#---------------------------------------------------------------------
from socket import * # get socket constructor and constants
from datetime import datetime
#---------------------------------------------------------------------
__cvsid__ = "$Id:$"
__version__ = "2.1.0"
__id__ = "@(#) dserver.py [2.1.0] 2008-05-10"
check_flg = False
daemon_flg = False
silent_flg = False
terminate_flg = False
verbose_flg = False
wait_flg = False
debug_level = 0
HOST = '' # Host server - '' means localhost
PORT = 9572 # Listen on a non-reserved port number
sockobj = None
dserver_dir = None
data_dir = None
pid_path = None
client_language = None
log = None
sources = []
CONFIGFILE = "dserver.ini"
LOGFILE = "dserver.log"
PIDFILE = "dserver.pid"
INVALID = 'INVALID'
DELIMITER = 'delimiter'
TAG_DELIMITER = 'tag_delimiter'
COMMENT = re.compile('^#')
#=====================================================================
class Group:
Name = None
Idx = None
Data = None
def __init__(self, name):
self.Name = name
self.Idx = 0
self.Data = []
self.Comments = []
def __str__(self):
s = "Grp %s Len %d" % (self.Name, len(self.Data))
return s
def append(self, s):
self.Data.append(s)
def set(self):
if len(self.Data) > 0:
self.Idx = 0
else:
self.Idx = -1
#---------------------------------------------------------------------
class Source:
Count = 0
Valid = False
Name = None
Type = None
Idx = None
Data = None
def __init__(self, name, source_type, attributes={}, delimiter=None):
self.Name = name
self.Type = source_type
self.File = name + ".dat"
self.Used = name + ".used"
self.Stored = name + ".stored"
self.Comments = []
# print '[dserver] Name: "%s" Type: "%s" Attributes: "%s"' % (self.Name, self.Type, repr(attributes))
if delimiter:
self.Delimiter = delimiter
elif attributes.has_key(DELIMITER):
self.Delimiter = attributes[DELIMITER]
else:
self.Delimiter = ','
if self.Type == "CSV":
rc = self.read_csv()
elif self.Type == "Sequence":
rc = self.read_sequence()
elif self.Type == "Indexed":
if attributes.has_key(TAG_DELIMITER):
self.tag_delimiter = attributes[TAG_DELIMITER]
else:
self.tag_delimiter = ':'
rc = self.read_indexed()
elif self.Type == "Keyed":
rc = self.read_keyed()
else:
print "[dserver] Bad source_type [%s]" % source_type
sys.exit(1)
self.Size = rc
self.Attributes = {
'Type' : self.Type,
'Delimiter' : self.Delimiter,
'Size' : rc
}
try:
self.ufh = open(self.Used, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
try:
self.sfh = open(self.Stored, 'a+')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
#------------------------------------------------------------------
def __str__(self):
s = "Source: %-10s Type: %-10s" % (self.Name, self.Type)
if self.Valid:
s += " * "
if self.Type == "CSV":
s += " %d rows" % len(self.Data)
elif self.Type == "Sequence":
s += " Starting value %d" % self.Data
elif self.Type == "Indexed":
s += " %d rows" % len(self.Data)
elif self.Type == "Keyed":
s += " %d groups" % len(self.Data)
else:
s += " "
return s
#------------------------------------------------------------------
def read_csv(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = []
while True:
line = f.readline()
if not line: break
line = line.strip()
if COMMENT.match(line):
self.Comments.append(line)
continue
self.Data.append(line)
f.close()
self.Idx = 0
if debug_level > 5: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_sequence(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
while True:
line = f.readline()
if not line: break
line = line.strip()
if COMMENT.match(line):
self.Comments.append(line)
continue
try:
no = int(line)
except:
no = 0
self.Data = no
f.close()
return 1
#------------------------------------------------------------------
def read_keyed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
groupName = None
group = None
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
if (line.find("[") != -1):
group_name = line.replace('[','').replace(']','')
group = Group(group_name)
self.Data[group_name] = group
continue
if COMMENT.match(line):
if group:
group.Comments.append(line)
else:
self.Comments.append(line)
elif (len(line) == 0):
continue
else:
group.append(line)
f.close()
if debug_level > 5: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def read_indexed(self):
try:
f = open(self.File, 'r')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
sys.exit(1)
self.Data = {}
while True:
line = f.readline()
if not line: break
line = line.strip()
if COMMENT.match(line):
self.Comments.append(line)
continue
(tag, data) = line.split(self.tag_delimiter)
tag = tag.strip()
self.Data[tag] = data
f.close()
if debug_level > 5: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))
return len(self.Data)
#------------------------------------------------------------------
def flush(self):
if not self.Valid:
return
ts = datetime.now().strftime('%Y%m%d%H%M%S')
self.BackupCmd = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)
if self.Type == "CSV":
self.flush_csv()
elif self.Type == "Sequence":
self.flush_sequence()
elif self.Type == "Indexed":
self.flush_indexed()
elif self.Type == "Keyed":
self.flush_keyed()
#------------------------------------------------------------------
def flush_csv(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
i = self.Idx
for line in self.Comments:
f.write("%s\n" % line)
while i < len(self.Data):
f.write("%s\n" % self.Data[i])
i += 1
f.close()
#------------------------------------------------------------------
def flush_sequence(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
for line in self.Comments:
f.write("%s\n" % line)
f.write("%d\n" % self.Data)
f.close()
#------------------------------------------------------------------
def flush_keyed(self):
os.system(self.BackupCmd)
try:
f = open(self.File, 'wb')
except IOError, e:
sys.stderr.write('[dserver] Open failed: ' + str(e) + '\n')
return 0
group_keys = self.Data.keys()
group_keys.sort()
for line in self.Comments:
f.write("%s\n" % line)
for key in group_keys:
f.write("[%s]\n" % key)
group = self.Data[key]
for line in group.Comments:
f.write("%s\n" % line)
i = group.Idx
while i < len(group.Data):
f.write("%s\n" % group.Data[i])
i += 1
f.write("\n")
f.close()
#------------------------------------------------------------------
def flush_indexed(self):
pass
#=====================================================================
def INFO(msg):
if log: log.info(' ' + msg)
if verbose_flg: print "[dserver] %s" % msg
#---------------------------------------------------------------------
def ERROR(msg):
if log: log.error(msg)
sys.stderr.write('[dserver] %s\n' % msg)
#---------------------------------------------------------------------
def WARNING(msg):
if log: log.warning('*****' + msg + '*****')
if verbose_flg: print "[dserver] %s" % msg
#=====================================================================
def read_config():
global PORT
config_file = data_dir + CONFIGFILE
try:
f = open(config_file, 'r')
except IOError, e:
ERROR('Open failed: ' + str(e))
sys.exit(1)
config_flg = False
definition_flg = False
while True:
line = f.readline()
if not line: break
line = line[:-1]
line = line.replace('\r','')
line = line.strip()
if (line.find("#") != -1): continue
if (line.find("[Config]") != -1):
config_flg = True
elif (line.find("Port=") != -1):
definition = line.split("=")
PORT = definition[1]
if (line.find("[Data]") != -1):
definition_flg = True
elif (line.find("Description=") != -1):
definition = line.split("=")
(name, source_type, attribute_str) = definition[1].split(":", 2)
try:
attributes = eval(attribute_str)
except:
attributes = {}
t = Source(name, source_type, attributes)
INFO(str(t))
sources.append(t)
f.close()
#---------------------------------------------------------------------
def get_source_index(name):
for i in range(len(sources)):
if (sources[i].Name == name):
return i
return -1
#---------------------------------------------------------------------
def process(str):
global client_language
msg = str.split("|")
l = len(msg)
if debug_level > 1: INFO("[dserver::process] len %d msg %s" % (l, msg))
ts = datetime.now().strftime('%Y%m%d%H%M%S')
reply = "None"
if (msg[0] == "INIT"):
client_language = msg[1]
elif (msg[0] == "REG"):
name = msg[1].replace('\n','').replace('\r','')
idx = get_source_index(name)
if debug_level > 0: INFO("[dserver::process] REG '%s' -> %d" % (name, idx))
if client_language == 'Python':
reply = "%d|%s" % (idx, marshal.dumps(sources[idx].Attributes))
else:
reply = "%d" % idx
elif (msg[0] == "REGK"):
if (len(msg) != 3):
ERROR("[dserver::process] REGK -> Bad Message", msg)
elif (msg[0] == "REGI"):
if (len(msg) != 2):
ERROR("[dserver::process] REGI -> Bad Message", msg)
elif (msg[0] == "GETN"):
if (len(msg) != 2):
ERROR("[dserver::process] GETN -> Bad Message", msg)
hdl = int(msg[1])
try:
t = sources[hdl]
except:
t = None
if t != None:
if t.Type == 'CSV':
if (t.Idx < len(t.Data)):
reply = t.Data[t.Idx]
t.Idx += 1
else:
reply = "*Exhausted*"
elif t.Type == "Sequence":
reply = "%d" % t.Data
t.Data += 1
else:
reply = "UNKNOWN"
t.ufh.write("%s - %s\n" % (ts, reply))
if debug_level > 2: INFO("[dserver::process] GETN -> %s" % reply)
elif (msg[0] == "GETK"):
if (len(msg) != 3):
ERROR("[dserver::process] GETK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
try:
t = sources[hdl]
except:
t = None
if t != None:
try:
g = t.Data[grp]
except:
g = None
if g != None:
if (g.Idx < len(g.Data)):
reply = g.Data[g.Idx]
g.Idx += 1
else:
reply = "*Exhausted*"
t.ufh.write("%s - %s::%s\n" % (ts, grp, reply))
if debug_level > 2: INFO("[dserver::process] GETK %s -> %s" % (grp, reply))
elif (msg[0] == "GETI"):
if (len(msg) != 3):
ERROR("[dserver::process] GETI -> Bad Message", msg)
hdl = int(msg[1])
idx = msg[2]
try:
t = sources[hdl]
except:
t = None
if t != None:
try:
reply = t.Data[idx]
except:
reply = "UNDEFINED"
t.ufh.write("%s - %s::%s\n" % (ts, idx, reply))
if debug_level > 2: INFO("[dserver::process] GETI %s -> %s" % (idx, reply))
elif (msg[0] == "STOC"):
if (len(msg) != 3):
ERROR("[dserver::process] STOC -> Bad Message", msg)
hdl = int(msg[1])
data = msg[2]
reply = "0"
try:
t = sources[hdl]
except:
t = None
if t != None:
t.Data.append(data)
t.sfh.write("%s - %s\n" % (ts, data))
t.sfh.flush()
if debug_level > 1: INFO("STOC %s" % data)
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOC %s -> %s" % (data, reply))
elif (msg[0] == "STOK"):
if (len(msg) != 4):
ERROR("[dserver::process] STOK -> Bad Message", msg)
hdl = int(msg[1])
grp = msg[2]
data = msg[3]
reply = "0"
try:
t = sources[hdl]
except:
t = None
if t != None:
if t.Data.has_key(grp):
g = t.Data[grp]
else:
g = Group(grp)
t.Data[grp] = g
if g != None:
g.Data.append(data)
if debug_level > 1: INFO("STOK %s %s" % (grp, data))
t.sfh.write("%s - %s::%s\n" % (ts, grp, data))
reply = "1"
if debug_level > 2: INFO("[dserver::process] STOK %s %s -> %s" % (grp, data, reply))
return reply
#---------------------------------------------------------------------
def sig_term(signum, frame):
"SIGTERM handler"
shutdown()
#---------------------------------------------------------------------
def shutdown():
INFO("Server shutdown at %s" % datetime.now())
for i in range(len(sources)):
sources[i].flush()
try:
os.unlink(pid_path)
except IOError, e:
ERROR('Unlink failed: ' + str(e))
sys.exit(1)
sys.exit(0)
#---------------------------------------------------------------------
def check_running():
try:
pfp = open(pid_path, 'r')
except IOError, (errno, strerror):
pfp = None
# ERROR("I/O error(%s): %s" % (errno, strerror))
except:
ERROR("Unexpected error:", sys.exc_info()[0])
raise
if pfp:
line = pfp.readline()
line = line.strip()
dserver_pid = int(line)
noProcess = 0
try:
os.kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
noProcess = 1
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
if noProcess:
INFO("[dserver] Stale dserver pid file!")
pfp.close()
os.unlink(pid_path)
return None
else:
pfp.close()
return dserver_pid
return dserver_pid
else:
return None
#---------------------------------------------------------------------
def create_pidfile():
pid = os.getpid()
try:
pfp = open(pid_path, 'w')
except IOError, e:
ERROR("Open failed - " + str(e))
sys.exit(0)
pfp.write("%d" % pid)
pfp.close()
INFO("Running server with PID -> %d" % pid)
return pid
#---------------------------------------------------------------------
def become_daemon():
pid = os.fork()
if pid == 0: # In child
pid = create_pidfile()
time.sleep(1)
elif pid == -1: # Should not happen!
ERROR("fork() failed!")
time.sleep(1)
sys.exit(0)
else: # In Parent
time.sleep(1)
sys.exit(0)
time.sleep(2)
os.setsid()
return pid
#---------------------------------------------------------------------
def init():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
if daemon_flg:
pid = become_daemon()
else:
pid = create_pidfile()
global log
log = logging.getLogger('dserver')
hdlr = logging.FileHandler(LOGFILE)
fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(fmtr)
log.addHandler(hdlr)
log.setLevel(logging.INFO)
INFO("Started processing")
read_config()
if (not silent_flg):
INFO("Server PID is %d" % pid)
#---------------------------------------------------------------------
def terminate():
dserver_pid = check_running()
if dserver_pid:
if (not silent_flg):
INFO("Terminating server with pid, %d" % dserver_pid)
os.kill(dserver_pid, signal.SIGTERM)
if (wait_flg):
while True:
try:
kill(dserver_pid, 0)
except OSError, e:
if e.errno == 3:
break
else:
ERROR("kill() failed:" + str(e))
sys.exit(0)
time.sleep(1)
return 0
#---------------------------------------------------------------------
def check():
pid = check_running()
if pid:
print "[dserver] Server already running! (pid = %d)" % pid
sys.exit(0)
else:
print "[dserver] Server not running"
#==== Socket Server ==================================================
def init_connection():
global sockobj
sockobj = socket(AF_INET, SOCK_STREAM) # make a TCP socket object
sockobj.bind((HOST, PORT)) # bind it to server port number
sockobj.listen(10) # allow upto 10 pending connects
#---------------------------------------------------------------------
def handle_client(connection): # in spawned thread: reply
while True: # read, write a client socket
try:
request = connection.recv(1024)
except:
break
if debug_level > 0: INFO('[dserver] Request -> "%s"' % request)
if not request: break
reply = process(request)
if debug_level > 0: INFO('[dserver] Reply -> "%s..."' % reply[0:30])
connection.send(reply)
connection.close()
#---------------------------------------------------------------------
def dispatcher():
while True:
# Wait for next connection,
connection, address = sockobj.accept()
INFO('Host (%s) - Connected at %s' % (address[0], datetime.now()))
thread.start_new(handle_client, (connection,))
#=====================================================================
def main():
global check_flg
global daemon_flg
global terminate_flg
global verbose_flg
global wait_flg
global debug_level
global dserver_dir
global data_dir
global pid_path
try:
opts, args = getopt.getopt(sys.argv[1:], "cdDsTvVw?")
except getopt.error, msg:
print __doc__
return 1
try:
dserver_dir = os.environ["DSERVER_DIR"]
except KeyError, e:
print "Set DSERVER_DIR environment variable and rerun!"
return 1
wrk_path = os.getcwd()
wrk_dir = os.path.basename(wrk_path)
# data_dir = dserver_dir + '/DATA/'
data_dir = wrk_path + '/DATA/'
pid_path = data_dir + PIDFILE
os.chdir(data_dir)
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-c':
check_flg = True
elif o == '-D':
daemon_flg = True
elif o == '-s':
tsilent_flg = True
elif o == '-T':
terminate_flg = True
elif o == '-v':
verbose_flg = True
elif o == '-V':
print "[dserver] Version: %s" % __version__
return 1
elif o == '-w':
wait_flg = True
elif o == '-?':
print __doc__
return 1
print "[dserver] Listening on port %s - running from %s" % (PORT, os.getcwd())
if check_flg:
check()
return 0
if terminate_flg:
terminate()
return 0
if (debug_level > 0): print "Debugging level set to %d" % debug_level
if args:
for arg in args:
print arg
signal.signal(signal.SIGTERM, sig_term)
init()
init_connection()
dispatcher()
return 0
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
try:
sys.exit(main())
except KeyboardInterrupt, e:
print "[dserver] Interrupted!"
shutdown()
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
Problems to fix:
To Do:
Issues:
"""
tst.py
#!/usr/bin/env python
#
# Author: Peter Harding <plh@pha.com.au>
# PerformIQ Pty. Ltd.
# Level 6,
# 179 Queen Street,
# MELBOURNE, VIC, 3000
#
# Phone: 03 9641 2222
# Fax: 03 9641 2200
# Mobile: 0418 375 085
#
# Copyright (C) 1994-2008, Peter Harding
# All rights reserved
#
#
#---------------------------------------------------------------------
"""
Test example of use of Data Server.
Usage:
# tst.py -t <table> [-k <key>]
The '-t <table>' option is used to specify the name
of the table to query
The '-i <index>' specifies the index for the indexed
data type. Indexes may be either an integer which
reurns the ith element of a string - in which case
the key to the data set must be a string.
The '-k <key>' specifies the key for the keyed data
type
"""
#---------------------------------------------------------------------
import os
import sys
import getopt
import client
#---------------------------------------------------------------------
__cvsid__ = "$Id:$"
__version__ = "2.0.2"
__id__ = "@(#) [2.0.2] tst.py 2008-05-10"
#---------------------------------------------------------------------
PORT = 9572
table_name = "Address"
indexed = False
index = None
keyed = False
key = None
store_flg = False
store_data = None
debug_flg = False
term_flg = False
verbose_flg = False
#---------------------------------------------------------------------
def process():
ds = client.Connection(port=PORT)
if (ds == None):
print("Connection to data server failed - is data server process running?\n")
return 1
(type_ref, attributes) = ds.RegisterType(table_name)
if indexed:
size = attributes['Size']
pid = os.getpid()
print "My PID is %d" % pid
print "Data type \"%s\" registered as %d with attributes %s" % (table_name, type_ref, repr(attributes))
if (store_flg):
if keyed:
ds.StoreKeyedData(type_ref, key, store_data)
else:
ds.StoreCsvData(type_ref, store_data)
else:
if keyed:
sp = ds.GetNextKeyed(type_ref, key)
elif indexed:
sp = ds.GetNextIndexed(type_ref, index)
else:
sp = ds.GetNext(type_ref)
if (sp):
print "Buffer is \"%s\"" % sp
if sp != None:
if len(sp) > 0:
for i in range(len(sp)):
print "Field %2d: \"%s\"" % (i, sp[i])
else:
print "Field: \"%s\"" % None
else:
print "Type %d exhausted" % (pid, type_ref)
#---------------------------------------------------------------------
def main():
global debug_flg
global term_flg
global verbose_flg
global indexed
global index
global keyed
global key
global table_name
global store_flg
global store_data
global PORT
try:
opts, args = getopt.getopt(sys.argv[1:], "dD:hi:I:k:p:s:t:TvV?")
except getopt.error, msg:
print __doc__,
return 1
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-D':
debug_level = int(a)
elif o == '-i': # Assuming a numeric offset!
indexed = True
index = int(a)
elif o == '-I': # Assuming a string index!
indexed = True
index = a
elif o == '-k':
keyed = True
key = a
elif o == '-p':
PORT = int(a)
elif o == '-t':
table_name = a
elif o == '-T':
term_flg = True
elif o == '-s':
print "storing..."
store_flg = True
store_data = a
elif o == '-v':
verbose_flg = True
elif o == '-V':
print "Version: %s" % __version__
return 0
elif o in ('-h', '-?'):
print __doc__
return 0
if args:
for arg in args:
print arg
else:
pass
process()
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
sys.exit(main())
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
Problems to fix:
To Do:
Issues:
"""
client.py
#!/usr/bin/env python
#
# Author: Peter Harding <plh@performiq.com.au>
# PerformIQ Pty. Ltd.
# Level 6, 170 Queen Street,
# MELBOURNE, VIC, 3000
#
# Phone: 03 9641 2222
# Fax: 03 9641 2200
# Mobile: 0418 375 085
#
# Copyright (C) 1994-2008, Peter Harding
# All rights reserved
#
#---------------------------------------------------------------------
"""
Purpose:
Python implementation of DataServer client API
Usage:
ds = client.Connection(port=PORT)
if (ds == None):
print("Connection to data server failed - is data server process running?\n")
return 1
(type_ref, attributes) = ds.RegisterType(table_name)
Then one of:
a) Pulling data:
if Keyed:
sp = ds.GetNextKeyed(type_ref, key)
elif Indexed:
sp = ds.GetNextIndexed(type_ref, index)
else:
sp = ds.GetNext(type_ref)
a) Storing data:
if Keyed:
ds.StoreKeyedData(type_ref, key, store_data)
else:
ds.StoreCsvData(type_ref, store_data)
Notes:
i) For an indexed type the atributes returned are:
{
'type' : 'Indexed',
'no_items' : <NO>
}
"""
#---------------------------------------------------------------------
import os
import sys
import copy
import getopt
import marshal
#---------------------------------------------------------------------
from socket import * # portable socket interface plus constants
#---------------------------------------------------------------------
__cvsid__ = "$Id:$"
__id__ = "@(#) [2.0.2] client.py 2008-05-10"
__version__ = "2.0.2"
HOST = 'localhost'
PORT = 9572
verbose_flg = False
debug_level = 0
#---------------------------------------------------------------------
class Connection:
DELIM = ','
ServerHostname = None # server name, default to 'localhost'
ServerPort = None # non-reserved port used by the server
sockobj = None
Fields = None
def __init__(self, server=HOST, port=PORT, debug=0):
global debug_level
"Initialize TCP/IP socket object and make connection to server:port"
self.ServerHostname = server
self.ServerPort = port
debug_level = debug
self.sockobj = socket(AF_INET, SOCK_STREAM)
try:
self.sockobj.connect((self.ServerHostname, self.ServerPort))
except SocketError, e:
sys.stderr.write('[client] Connect failed: ' + str(e) + '\n')
sys.exit(1)
msg = "INIT|Python"
attributes = self.Get(msg)
#try:
# attributes = self.Get(msg)
# except e:
# sys.stderr.write('[client] Get failed: ' + str(e) + '\n')
# sys.exit(1)
print '[client::__init__] attributes "%s"' % attributes
self.attributes = marshal.loads(attributes)
self.sources = {}
#------------------------------------------------------------------
def Get(self, s):
"Send s to server and get back response"
if self.sockobj != None:
self.sockobj.send(s)
data = self.sockobj.recv(1024)
if debug_level > 0: print '[Client::Get] Sent: "%s" Received: "%s"' % (s, data)
return data
else:
return None
#------------------------------------------------------------------
def Close(self):
"close socket to send eof to server"
if self.sockobj != None:
self.sockobj.close()
self.sockobj = None
#------------------------------------------------------------------
def RegisterType(self, type):
msg = "REG|%s" % type
# Should I really be using a try: here? - PLH 2008-05-10
try:
response = self.Get(msg)
except:
type_ref = -1
(type_ref, attributes) = response.split('|', 1)
type_ref = int(type_ref)
attributes = marshal.loads(attributes)
self.sources[type_ref] = attributes
return (type_ref, attributes)
#------------------------------------------------------------------
def GetNext(self, type_ref):
msg = "GETN|%d" % type_ref
csv_data = self.Get(msg)
data = csv_data.split(self.DELIM)
return data
#------------------------------------------------------------------
def GetNextKeyed(self, type_ref, key):
msg = "GETK|%d|%s" % (type_ref, key)
csv_data = self.Get(msg)
data = csv_data.split(self.DELIM)
return data
#------------------------------------------------------------------
def GetNextIndexed(self, type_ref, idx):
msg = "GETI|%s|%s" % (type_ref, idx)
csv_data = self.Get(msg)
data = csv_data.split(self.DELIM)
return data
#------------------------------------------------------------------
def StoreCsvData(self, type_ref, data):
msg = "STOC|%d|%s" % (type_ref, data)
reply = self.Get(msg)
try:
rc = int(reply)
except:
rc = -1
return rc
#------------------------------------------------------------------
def StoreKeyedData(self, type_ref, key_ref, data):
msg = "STOK|%d|%s|%s" % (type_ref, key_ref, data)
reply = self.Get(msg)
try:
rc = int(reply)
except:
rc = -1
return rc
#------------------------------------------------------------------
def GetField(self, type_ref, i):
if (i < len(self.Field[i])):
return self.Field[i]
else:
return None
#---------------------------------------------------------------------
def main():
global debug_level
global verbose_flg
global PORT
try:
opts, args = getopt.getopt(sys.argv[1:], "dhD:p:vV?")
except getopt.error, msg:
print __doc__,
return 1
for o, a in opts:
if o == '-d':
debug_level += 1
elif o == '-D':
debug_level = int(a)
elif o == '-p':
PORT = int(a)
elif o == '-v':
verboseFlg = True
elif o == '-V':
print "Version: %s" % __version__
return 0
elif o in ( '-h', '-?'):
print __doc__
return 0
if args:
for arg in args:
print "[client] %s" % arg
else:
pass
#---------------------------------------------------------------------
if __name__ == '__main__' or __name__ == sys.argv[0]:
sys.exit(main())
#---------------------------------------------------------------------
"""
Revision History:
Date Who Description
-------- --- --------------------------------------------------
20031014 plh Initial implementation
20080510 plh Refactored as client and Connection rather than dcl
Problems to fix:
To Do:
Issues:
"""
Sat May 10 13:35:23 AUSEST 2008
===============================
Have added in the return of attributes with RegisterType to allow mutilple values
to be passed back from the data server. Specifically, I wanted to be able to get
number of records (Size) in Indexed data sources. This will allow vusers
implemented in Python to retrieve more information about the data sources.
Initially, I wanted to be able to select a random group for the LDAP test.
I have also refactored the scripts and reorganised some of the header info.
i.e.
dvstst.py -> tst.py
dcl.py -> client.py