Difference between revisions of "Dataserver"

From PeformIQ Upgrade
Jump to navigation Jump to search
(New page: =dserver.p= <pre> #!/usr/bin/env python # # Purpose: Threaded data server implementation # # $Id:$ # #--------------------------------------------------------------------- """ Thread...)
 
 
(4 intermediate revisions by the same user not shown)
Line 1: Line 1:
=dserver.p=
=dserver.py=


<pre>
<pre>
Line 14: Line 14:


   Server side: open a socket on a port, listen for
   Server side: open a socket on a port, listen for
   a message from a client, and send an echo reply;
   a message from a client, and accept a request and
  echos lines until eof when client closes socket;
   service it.
  spawns a thread to handle each client connection;
  threads share global memory space with main thread;
   this is more portable than fork--not yet on Windows;


   This version has been modified to use the standard Python
  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;
 
   This version has been extended to use the standard Python
   logging module.
   logging module.


Line 440: Line 441:
   l  = len(msg)
   l  = len(msg)


   print "[dserver::process] len %d  msg %s" % (l, msg)
   if debug_flg: INFO("[dserver::process] len %d  msg %s" % (l, msg))


  ts    = datetime.now().strftime('%Y%m%d%H%M%S')
   reply = "None"
   reply = "None"


   if (msg[0] == "REG"):
   if (msg[0] == "REG"):
       name = msg[1]
       name = msg[1].replace('\n','').replace('\r','')
       idx  = get_table_index(name)
       idx  = get_table_index(name)
       print "[dserver::process]  REG -> %d" % idx
       if debug_flg: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))


       reply = "%d" % idx
       reply = "%d" % idx
Line 453: Line 455:
   elif (msg[0] == "REGK"):
   elif (msg[0] == "REGK"):
       if (len(msg) != 3):
       if (len(msg) != 3):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)


   elif (msg[0] == "REGI"):
   elif (msg[0] == "REGI"):
       if (len(msg) != 2):
       if (len(msg) != 2):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)


   elif (msg[0] == "GETN"):
   elif (msg[0] == "GETN"):
       if (len(msg) != 2):
       if (len(msg) != 2):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)
       hdl  = int(msg[1])
       hdl  = int(msg[1])


Line 479: Line 481:
             reply = "%d" % t.Data
             reply = "%d" % t.Data
             t.Data += 1
             t.Data += 1
       print "[dserver::process]  GETN -> %s" % reply
        else:
            reply = "UNKNOWN"
        t.ufh.write("%s - %s" % (ts, reply))
       if debug_flg: INFO("[dserver::process]  GETN -> %s" % reply)


   elif (msg[0] == "GETK"):
   elif (msg[0] == "GETK"):
       if (len(msg) != 3):
       if (len(msg) != 3):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)
       hdl  = int(msg[1])
       hdl  = int(msg[1])
       grp  = msg[2]
       grp  = msg[2]
Line 503: Line 508:
             else:
             else:
               reply = "*Exhausted*"
               reply = "*Exhausted*"
        t.ufh.write("%s - %s:%s" % (ts, grp, reply))


   elif (msg[0] == "GETI"):
   elif (msg[0] == "GETI"):
       if (len(msg) != 3):
       if (len(msg) != 3):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)
       hdl  = int(msg[1])
       hdl  = int(msg[1])
       idx  = msg[2]
       idx  = msg[2]
Line 519: Line 525:
             reply = t.Data[idx]
             reply = t.Data[idx]
         except:
         except:
             pass
             reply = "UNDEFINED"
        t.ufh.write("%s - %s:%s" % (ts, idx, reply))


   elif (msg[0] == "STOC"):
   elif (msg[0] == "STOC"):
       if (len(msg) != 3):
       if (len(msg) != 3):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)
       hdl = int(msg[1])
       hdl   = int(msg[1])
       data = msg[2]
       data = msg[2]
 
       reply = "0"
       reply = "0"


Line 536: Line 542:
       if t != None:
       if t != None:
         s    = t.Data.append(data)
         s    = t.Data.append(data)
         print t.Data
         if debug_flg: INFO(t.Data)
reply = "1"
reply = "1"
        
        
   elif (msg[0] == "STOK"):
   elif (msg[0] == "STOK"):
       if (len(msg) != 4):
       if (len(msg) != 4):
         print "Bad Message", msg
         if debug_flg: ERROR("Bad Message", msg)
       hdl  = int(msg[1])
       hdl  = int(msg[1])
       grp  = msg[2]
       grp  = msg[2]
Line 617: Line 623:


       if noProcess:
       if noProcess:
         print "[dserver]  Stale dserver pid file!"
         INFO("[dserver]  Stale dserver pid file!")
         pfp.close()
         pfp.close()
         os.unlink(pid_path)
         os.unlink(pid_path)
Line 741: Line 747:
def handle_client(connection):            # in spawned thread: reply
def handle_client(connection):            # in spawned thread: reply
   while True:                            # read, write a client socket
   while True:                            # read, write a client socket
       request = connection.recv(1024)
       try:
       print '[dserver]  Request -> "%s"' % request
        request = connection.recv(1024)
       except:
        break
 
      if debug_flg: INFO('[dserver]  Request -> "%s"' % request)
 
       if not request: break
       if not request: break
       reply = process(request)
       reply = process(request)
       print '[dserver]  Reply  -> "%s"' % reply
 
       if debug_flg: INFO('[dserver]  Reply  -> "%s"' % reply)
 
       connection.send(reply)
       connection.send(reply)


Line 861: Line 875:
"""
"""
</pre>
</pre>
Download [http://www.performiq.com.au/kb/images/DATA.tz data server data]


==dcl.py==
==dcl.py==
Line 1,385: Line 1,401:
</pre>
</pre>


==makefile==
==Makefile==


<pre>
<pre>
Line 1,393: Line 1,409:
cp Keyed.master      Keyed.dat
cp Keyed.master      Keyed.dat
cp Sequence.master  Sequence.dat
cp Sequence.master  Sequence.dat
</per>
</pre>
 
=Python ctypes Client=
 
This uses the Windows DLL implemented below as MS Visual Studio 2005 project(s).
 
<pre>
#!/usr/bin/env python
 
import sys
import ctypes
import pprint
 
dserver = ctypes.CDLL('dcl')
 
pprint.pprint(dserver.__dict__)
 
print type(dserver)
 
dserver.dsInit.restype        = ctypes.c_char_p
dserver.dsInit.argtypes        = [ctypes.c_char_p, ctypes.c_int]
dserver.dsRegister.restype    = ctypes.c_int
dserver.dsRegister.argtypes    = [ctypes.c_char_p]
dserver.dsGetNext.restype      = ctypes.c_char_p
dserver.dsGetNext.argtypes    = [ctypes.c_int]
dserver.dsGetKeyed.restype    = ctypes.c_char_p
dserver.dsGetKeyed.argtypes    = [ctypes.c_int, ctypes.c_char_p]
dserver.dsGetIndexed.restype  = ctypes.c_char_p
dserver.dsGetIndexed.argtypes  = [ctypes.c_int, ctypes.c_int]
dserver.dsStore.restype        = ctypes.c_int
dserver.dsStore.argtypes      = [ctypes.c_int, ctypes.c_char_p]
 
s= 100 * ' '
 
dserver.get_buffered(1, s, 100)
 
print s
 
dserver.dsInit('localhost', 9578)
 
dx = dserver.dsRegister('Sequence')
 
dserver.dsGetNext(idx)
</pre>
 
=MS Visual C Client DLL and Test harness=
 
[http://www.performiq.com.au/kb/images/DServer_Client.zip Here] are the zipped up project files...

Latest revision as of 15:48, 7 March 2008

dserver.py

#!/usr/bin/env python
#
#  Purpose: Threaded data server implementation
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
  Threaded server model

  Server side: open a socket on a port, listen for
  a message from a client, and accept a request and
  service it.

  The server spawns a thread to handle each client connection.
  Threads share global memory space with main thread;
  This is more portable than fork -- not yet on Windows;

  This version has been extended to use the standard Python
  logging module.

  Add the delimiter to the INI file to allow use of alternate
  delimiters in transmitted data - so data with embedded commas
  can be used.
"""
#---------------------------------------------------------------------

import os
import csv
import sys
import getopt
import thread
import time
import signal
import logging

#---------------------------------------------------------------------

from socket   import *          # get socket constructor and constants
from datetime import datetime

#---------------------------------------------------------------------

__version__   = "1.1.1"
__id__        = "@(#)  dserver.py  [%s]  05/03/2008"

daemon_flg    = False
debug_flg     = False
silent_flg    = False
terminate_flg = False
verbose_flg   = False
wait_flg      = False

HOST          = ''             #  Host server - '' means localhost
PORT          = 9578           #  Listen on a non-reserved port number

sockobj       = None

dserver_dir   = None
data_dir      = None
pid_path      = None

CONFIGFILE    = "dserver.ini"
LOGFILE       = "dserver.log"
PIDFILE       = "dserver.pid"

tables        = []

INVALID       = "INVALID"

log           = None

#=====================================================================

class Group:
   Name     = None
   Idx      = None
   Data     = None

   def __init__(self, name):
      self.Name = name
      self.Idx  = 0
      self.Data = []

   def __str__(self):
      s = "Grp %s  Len %d" % (self.Name, len(self.Data))
      return s

   def append(self, s):
      self.Data.append(s)

   def set(self):
      if len(self.Data) > 0:
         self.Idx  = 0
      else:
         self.Idx  = -1

#---------------------------------------------------------------------

class Table:
   Count    = 0
   Valid    = False
   Name     = None
   Type     = None
   Idx      = None
   Data     = None

   def __init__(self, name, type, delimiter=','):
      self.Name       = name
      self.Type       = type
      self.Delimiter  = delimiter
      self.File       = name + ".dat"
      self.Used       = name + ".used"

      if self.Type == "CSV":
         rc = self.read_csv()
      elif self.Type == "Sequence":
         rc = self.read_sequence()
      elif self.Type == "Indexed":
         rc = self.read_indexed()
      elif self.Type == "Keyed":
         rc = self.read_keyed()

      if rc > 0:
         self.Valid = True

      try:
        self.ufh = open(self.Used, 'a+')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

   #------------------------------------------------------------------

   def __str__(self):
      s = "Table: %-10s Type: %-10s" % (self.Name, self.Type)

      if self.Valid:
         s += " * "
         if self.Type == "CSV":
            s += " %d rows" % len(self.Data)
         elif self.Type == "Sequence":
            s += " Starting value %d" % self.Data
         elif self.Type == "Indexed":
            s += " %d rows"   % len(self.Data)
         elif self.Type == "Keyed":
            s += " %d groups" % len(self.Data)
      else:
         s += "   "

      return s

   #------------------------------------------------------------------

   def read_csv(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

      self.Data = []

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         self.Data.append(line)

      f.close()

      self.Idx = 0

      if debug_flg: INFO("Read in %d CSV rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_sequence(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         try:
            no = int(line)
         except:
            no = 0

         self.Data = no

      f.close()

      return 1

   #------------------------------------------------------------------

   def read_keyed(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

      groupName  = None
      group      = None

      self.Data  = {}

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         if (line.find("[") != -1):
            group_name            = line.replace('[','').replace(']','')
	    group                 = Group(group_name)
            self.Data[group_name] = group

         elif (line.find("#") != -1):
            continue

         elif (len(line) == 0):
            continue

         else:
            group.append(line)

      f.close()

      if debug_flg: INFO("Read in %d Keyed groups - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def read_indexed(self):
      try:
         f = open(self.File, 'r')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         sys.exit(1)

      self.Data = {}

      while True:
         line = f.readline()

         if not line: break

         line = line.strip()

         (no, data) = line.split(':')

	 self.Data[no] = data

      f.close()

      if debug_flg: INFO("Read in %d indexed rows - %s" % (len(self.Data), self.Name))

      return len(self.Data)

   #------------------------------------------------------------------

   def flush(self):
      if not self.Valid:
         return

      ts = datetime.now().strftime('%Y%m%d%H%M%S')

      self.BackupCmd  = "cp %s.dat %s.%s" % (self.Name, self.Name, ts)

      if self.Type == "CSV":
         self.flush_csv()
      elif self.Type == "Sequence":
         self.flush_sequence()
      elif self.Type == "Indexed":
         self.flush_indexed()
      elif self.Type == "Keyed":
         self.flush_keyed()

   #------------------------------------------------------------------

   def flush_csv(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      i = self.Idx

      while i < len(self.Data):
         f.write("%s\n" % self.Data[i])
         i += 1

      f.close()

   #------------------------------------------------------------------

   def flush_sequence(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      f.write("%d\n" % self.Data)

      f.close()

   #------------------------------------------------------------------

   def flush_keyed(self):
      os.system(self.BackupCmd)

      try:
         f = open(self.File, 'wb')
      except IOError, e:
         sys.stderr.write('[dserver]  Open failed: ' + str(e) + '\n')
         return 0

      group_keys = self.Data.keys()

      group_keys.sort()

      for key in group_keys:
         f.write("[%s]\n" % key)

	 group = self.Data[key]

         i = group.Idx

         while i < len(group.Data):
            f.write("%s\n" % group.Data[i])
            i += 1

         f.write("\n")

      f.close()

   #------------------------------------------------------------------

   def flush_indexed(self):
      pass

#=====================================================================

def INFO(msg):
   if log: log.info(' ' + msg)
   if verbose_flg: print "[dserver]  %s" % msg

#---------------------------------------------------------------------

def ERROR(msg):
   if log: log.error(msg)
   sys.stderr.write('[dserver]  %s\n' % msg)

#---------------------------------------------------------------------

def WARNING(msg):
   if log: log.warning('*****' + msg + '*****')
   if verbose_flg: print "[dserver]  %s" % msg

#=====================================================================

def read_config():
   config_file = data_dir + CONFIGFILE

   try:
      f = open(config_file, 'r')
   except IOError, e:
      ERROR('Open failed: ' + str(e))
      sys.exit(1)

   definition_flg = False

   while True:
      line = f.readline()

      if not line: break

      line = line[:-1]
      line = line.replace('\r','')

      line = line.strip()

      if (line.find("#") != -1): continue

      if (line.find("[Data]") != -1):
         definition_flg = True

      elif (line.find("Description=") != -1):
          description  = line.split("=")

          (name, type, delimiter) = description[1].split(":")

          t = Table(name, type, delimiter)

          INFO(str(t))

          tables.append(t)

   f.close()

#---------------------------------------------------------------------

def get_table_index(name):
   for i in range(len(tables)):
      if (tables[i].Name == name):
         return i

   return -1

#---------------------------------------------------------------------

def process(str):
   msg = str.split("|")
   l   = len(msg)

   if debug_flg: INFO("[dserver::process] len %d  msg %s" % (l, msg))

   ts    = datetime.now().strftime('%Y%m%d%H%M%S')
   reply = "None"

   if (msg[0] == "REG"):
      name = msg[1].replace('\n','').replace('\r','')
      idx  = get_table_index(name)
      if debug_flg: INFO("[dserver::process]  REG '%s' -> %d" % (name, idx))

      reply = "%d" % idx

   elif (msg[0] == "REGK"):
      if (len(msg) != 3):
         if debug_flg: ERROR("Bad Message", msg)

   elif (msg[0] == "REGI"):
      if (len(msg) != 2):
         if debug_flg: ERROR("Bad Message", msg)

   elif (msg[0] == "GETN"):
      if (len(msg) != 2):
         if debug_flg: ERROR("Bad Message", msg)
      hdl  = int(msg[1])

      try:
         t = tables[hdl]
      except:
         t = None

      if t != None:
         if t.Type == 'CSV':
            if (t.Idx < len(t.Data)):
               reply  = t.Data[t.Idx]
               t.Idx += 1
            else:
               reply = "*Exhausted*"
         elif t.Type == "Sequence":
            reply = "%d" % t.Data
            t.Data += 1
         else:
            reply = "UNKNOWN"
         t.ufh.write("%s - %s" % (ts, reply))
      if debug_flg: INFO("[dserver::process]  GETN -> %s" % reply)

   elif (msg[0] == "GETK"):
      if (len(msg) != 3):
         if debug_flg: ERROR("Bad Message", msg)
      hdl  = int(msg[1])
      grp  = msg[2]

      try:
         t = tables[hdl]
      except:
         t = None

      if t != None:
         try:
            g = t.Data[grp]
         except:
            g = None
         if g != None:
            if (g.Idx < len(g.Data)):
               reply  = g.Data[g.Idx]
               g.Idx += 1
            else:
               reply = "*Exhausted*"
         t.ufh.write("%s - %s:%s" % (ts, grp, reply))

   elif (msg[0] == "GETI"):
      if (len(msg) != 3):
         if debug_flg: ERROR("Bad Message", msg)
      hdl  = int(msg[1])
      idx  = msg[2]

      try:
         t = tables[hdl]
      except:
         t = None

      if t != None:
         try:
            reply = t.Data[idx]
         except:
            reply = "UNDEFINED"
         t.ufh.write("%s - %s:%s" % (ts, idx, reply))

   elif (msg[0] == "STOC"):
      if (len(msg) != 3):
         if debug_flg: ERROR("Bad Message", msg)
      hdl   = int(msg[1])
      data  = msg[2]
      reply = "0"

      try:
         t = tables[hdl]
      except:
         t = None

      if t != None:
         s     = t.Data.append(data)
         if debug_flg: INFO(t.Data)
	 reply = "1"
      
   elif (msg[0] == "STOK"):
      if (len(msg) != 4):
         if debug_flg: ERROR("Bad Message", msg)
      hdl  = int(msg[1])
      grp  = msg[2]
      data = msg[3]

      reply = "0"

      try:
         t = tables[hdl]
      except:
         t = None

      if t != None:
         try:
            g = t.Data[grp]
         except:
            g = None
         if g != None:
            data  = g.Data.append(data)
	    reply = "1"

   return reply

#---------------------------------------------------------------------

def sig_term(signum, frame):
   "SIGTERM handler"

   shutdown()

#---------------------------------------------------------------------

def shutdown():
   INFO("Server shutdown at %s" % datetime.now())

   for i in range(len(tables)):
      tables[i].flush()

   try:
      os.unlink(pid_path)
   except IOError, e:
      ERROR('Unlink failed: ' + str(e))
      sys.exit(1)

   sys.exit(0)

#---------------------------------------------------------------------

def check_running():
   try:
      pfp = open(pid_path, 'r')
   except IOError, (errno, strerror):
      pfp = None
      # ERROR("I/O error(%s): %s" % (errno, strerror))
   except:
      ERROR("Unexpected error:", sys.exc_info()[0])
      raise

   if pfp:
      line = pfp.readline()
      line = line.strip()

      dserver_pid   = int(line)

      noProcess    = 0

      try:
         os.kill(dserver_pid, 0)
      except OSError, e:
         if e.errno == 3:
            noProcess = 1
         else:
            ERROR("kill() failed:" + str(e))
            sys.exit(0)

      if noProcess:
         INFO("[dserver]  Stale dserver pid file!")
         pfp.close()
         os.unlink(pid_path)

         return None
      else:
         pfp.close()
         return dserver_pid

      return dserver_pid
   else:
      return None

#---------------------------------------------------------------------

def create_pidfile():
   pid = os.getpid()

   try:
      pfp = open(pid_path, 'w')
   except IOError, e:
      ERROR("Open failed - " + str(e))
      sys.exit(0)

   pfp.write("%d" % pid)

   pfp.close()

   INFO("Running server with PID -> %d" % pid)

   return pid

#---------------------------------------------------------------------

def become_daemon():
   pid = os.fork()

   if pid == 0:                                             # In child
      pid = create_pidfile()
      time.sleep(1)
   elif pid == -1:                                # Should not happen!
      ERROR("fork() failed!")
      time.sleep(1)
      sys.exit(0)
   else:                                                   # In Parent
      time.sleep(1)
      sys.exit(0)

   time.sleep(2)

   os.setsid()

   return pid

#---------------------------------------------------------------------

def init():
   pid = check_running()

   if pid:
      print "[dserver]  Server already running! (pid = %d)" % pid
      sys.exit(0)

   if daemon_flg:
      pid = become_daemon()
   else:
      pid = create_pidfile()

   global log

   log  = logging.getLogger('dserver')
   hdlr = logging.FileHandler(LOGFILE)
   fmtr = logging.Formatter('%(asctime)s %(levelname)s %(message)s')

   hdlr.setFormatter(fmtr)
   log.addHandler(hdlr) 
   log.setLevel(logging.INFO)

   INFO("Started processing")

   read_config()

   if (not silent_flg):
      INFO("Server PID is %d" % pid)

#---------------------------------------------------------------------

def terminate():
   dserver_pid = check_running()

   if dserver_pid:
      if (not silent_flg):
         INFO("Terminating server with pid, %d" % dserver_pid)

      os.kill(dserver_pid, signal.SIGTERM)

      if (wait_flg):
         while True:
            try:
               kill(dserver_pid, 0)
            except OSError, e:
               if e.errno == 3:
                  break
               else:
                  ERROR("kill() failed:" + str(e))
                  sys.exit(0)
 
            time.sleep(1)

   return 0

#==== Socket Server ==================================================

def init_connection():
   global sockobj

   sockobj = socket(AF_INET, SOCK_STREAM)  # make a TCP socket object
   sockobj.bind((HOST, PORT))              # bind it to server port number
   sockobj.listen(10)                      # allow upto 10 pending connects

#---------------------------------------------------------------------

def handle_client(connection):             # in spawned thread: reply
   while True:                             # read, write a client socket
      try:
         request = connection.recv(1024)
      except:
         break

      if debug_flg: INFO('[dserver]  Request -> "%s"' % request)

      if not request: break

      reply = process(request)

      if debug_flg: INFO('[dserver]  Reply   -> "%s"' % reply)

      connection.send(reply)

   connection.close() 

#---------------------------------------------------------------------

def dispatcher():
   while True:
      # Wait for next connection,
      connection, address = sockobj.accept()

      INFO('Host (%s) - Connected at %s' % (address[0], datetime.now()))

      thread.start_new(handle_client, (connection,))

#=====================================================================

def main():
   global daemon_flg
   global debug_flg
   global terminate_flg
   global verbose_flg
   global wait_flg
   global dserver_dir
   global data_dir
   global pid_path

   try:
      opts, args = getopt.getopt(sys.argv[1:], "dDsTvVw?")
   except getopt.error, msg:
      print __doc__
      return 1

   try:
      dserver_dir  = os.environ["DSERVER_DIR"]
   except KeyError, e:
      print "Set DSERVER_DIR environment variable and rerun!"
      return 1

   data_dir = dserver_dir  + '/DATA/'
   pid_path = data_dir + PIDFILE

   wrk_path  = os.getcwd()
   wrk_dir   = os.path.basename(wrk_path)

   os.chdir(data_dir)

   for o, a in opts:
      if o == '-d':
         debug_flg      = True
      elif o == '-D':
         daemon_flg     = True
      elif o == '-s':
         tsilent_flg    = True
      elif o == '-T':
         terminate_flg  = True
      elif o == '-v':
         verbose_flg    = True
      elif o == '-V':
         print "[dserver]  Version: %s" % __version__
         return 1
      elif o == '-w':
         wait_flg       = True
      elif o == '-?':
         print __doc__
         return 1

   print "[dserver]  Server working directory is %s" % os.getcwd()

   if terminate_flg:
      terminate()
      return 0

   if (debug_flg): print "Debugging %s" % debug_flg

   if args:
      for arg in args:
         print arg

   signal.signal(signal.SIGTERM, sig_term)

   init()

   init_connection()

   dispatcher()

   return 0

#---------------------------------------------------------------------

if __name__ == '__main__' or __name__ == sys.argv[0]:
   try:
      sys.exit(main())
   except KeyboardInterrupt, e:
      print "[dserver]  Interrupted!"
      shutdown()

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""

Download data server data

dcl.py

#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@pha.com.au>
#                P. L. Harding & Associates Pty. Ltd.
#                P. O. Box 6195,
#                MELBOURNE, VIC, 3004
# 
#                Phone:   03 9641 2222
#                Fax:     03 9641 2200
#                Mobile:  0418 375 085
# 
#          Copyright (C) 1994-2003, Peter Harding
#
#          @(#) [1.3.12] dsvtst.py 01/01/2003
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
Python implementation of DataServer client API
"""

#---------------------------------------------------------------------

import os
import sys
import getopt

#---------------------------------------------------------------------

from socket import *        # portable socket interface plus constants

#---------------------------------------------------------------------

__version__ = "1.0.0"

HOST        = 'localhost'
PORT        = 9578

#---------------------------------------------------------------------

class dcl:
   DELIM          = ','
   ServerHostname = None    # server name, default to 'localhost'
   ServerPort     = None    # non-reserved port used by the server
   sockobj        = None
   Fields         = None

   def __init__(self, server=HOST, port=PORT):
      "Initialize TCP/IP socket object and make connection to server:port"

      self.ServerHostname = server
      self.ServerPort     = port

      self.sockobj = socket(AF_INET, SOCK_STREAM) 
      self.sockobj.connect((self.ServerHostname, self.ServerPort))

   #------------------------------------------------------------------

   def Get(self, s):
      "Send s to server and get back response"

      if self.sockobj != None:
         self.sockobj.send(s)

         data = self.sockobj.recv(1024)

         print '[dcl::Get]  Client received:', `data`

         return data
      else:
         return None

   #------------------------------------------------------------------

   def Close(self):
      "close socket to send eof to server"

      if self.sockobj != None:
         self.sockobj.close()
         self.sockobj = None

   #------------------------------------------------------------------

   def RegisterType(self, type):
      msg    = "REG|%s" % type

      try:
         handle = int(self.Get(msg))
      except:
         handle = -1

      return handle

   #------------------------------------------------------------------

   def GetNext(self, typeRef):
      msg     = "GETN|%d" % typeRef
      csv_data = self.Get(msg)
      data    = csv_data.split(',')

      return data

   #------------------------------------------------------------------

   def GetNextKeyed(self, typeRef, key):
      msg     = "GETK|%d|%s" % (typeRef, key)
      csv_data = self.Get(msg)
      data    = csv_data.split(',')

      return data

   #------------------------------------------------------------------

   def GetNextIndexed(self, typeRef, idx):
      msg     = "GETI|%d|%s" % (typeRef, idx)
      csv_data = self.Get(msg)
      data    = csv_data.split(',')

      return data

   #------------------------------------------------------------------

   def StoreCsvData(self, typeRef, csv_data):
      msg     = "STOC|%d|%s" % (typeRef, csv_data)
      reply   = self.Get(msg)

      try:
         rc = int(reply)
      except:
         rc = -1

      return rc

   #------------------------------------------------------------------

   def StoreKeyedData(self, typeRef, keyRef, csv_data):
      msg     = "STOK|%d|%s" % (typeRef, keyRef, csv_data)
      reply   = self.Get(msg)

      try:
         rc = int(reply)
      except:
         rc = -1

      return rc

   #------------------------------------------------------------------

   def GetField(self, typeRef, i):
      if (i < len(self.Field[i])):
         return self.Field[i]
      else:
         return None

#---------------------------------------------------------------------

def main():
   global debugFlg
   global verboseFlg

   try:
      opts, args = getopt.getopt(sys.argv[1:], "dvV?")
   except getopt.error, msg:
      print __doc__,
      return 1

   for o, a in opts:
      if o == '-d':
         debugFlg = 1
      elif o == '-v':
         verboseFlg = 1
      elif o == '-V':
         print "Version: %s" % __version__
         return 0
      elif o == '-?':
         print __doc__()
         return 0

   if args:
      for arg in args:
         print "[dcl] %s" % arg
   else:
      pass

#---------------------------------------------------------------------

if __name__ == '__main__' or __name__ == sys.argv[0]:
   sys.exit(main())

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""


dsvtst.py

#!/usr/bin/env python
#
#       Author:  Peter Harding  <plh@pha.com.au>
#                P. L. Harding & Associates Pty. Ltd.
#                P. O. Box 6195,
#                MELBOURNE, VIC, 3004
# 
#                Phone:   03 9641 2222
#                Fax:     03 9641 2200
#                Mobile:  0418 375 085
# 
#          Copyright (C) 1994-2003, Peter Harding
#
#          @(#) [1.3.12] dsvtst.py 01/01/2003
#
#  $Id:$
#
#---------------------------------------------------------------------

"""
Python implementation of Data Server test 

Usage: dsvtst.py -t <table> [-k <key>]

The '-t <table>' option is used to specift the name of the table to
query

The '-i <index>' specifies the index for the indexed data type

The '-k <key>'   specifies the key for the keyed data type
"""

#---------------------------------------------------------------------

import os
import sys
import getopt

import dcl

#---------------------------------------------------------------------

__version__ = "1.0.0"

#---------------------------------------------------------------------

table_name                = "Address"

indexed                   = 0
indexNo                   = 0

keyed                     = 0
key_name                  = None
key_ref                    = None

store_flg                 = 0
store_data                = None

debug_flg                 = 0
term_flg                  = 0
verbose_flg               = 0

#---------------------------------------------------------------------

def process():
   ds = dcl.dcl()

   if (ds == None):
      print("Connection to data server failed - is data server process running?\n")
      return 1

   type_ref  = ds.RegisterType(table_name)

   pid      = os.getpid()

   print "My PID is %d" % pid

   print "Data type \"%s\" registered as %d" % (table_name,  type_ref)

   if (store_flg):
      if keyed:
         ds.StoreKeyedData(type_ref, key_ref, store_data)
      else:
         ds.StoreCsvData(type_ref, store_data)
   else:
      if keyed:
          sp  = ds.GetNextKeyed(type_ref, key_name)
      elif indexed:
          sp  = ds.GetNextIndexed(type_ref, indexNo)
      else:
          sp  = ds.GetNext(type_ref)

      if (sp):
         print "Buffer is \"%s\"" % sp

      if sp != None:
         if len(sp) > 0:
            for i in range(len(sp)):
               print "Field %2d: \"%s\"" % (i, sp[i])
         else:
            print "Field: \"%s\"" % None
      else:
        print "Type %d exhausted" % (pid, type_ref)
#  }

#---------------------------------------------------------------------

def main():
   global debug_flg
   global term_flg
   global verbose_flg
   global indexed
   global indexNo
   global keyed
   global key_name
   global table_name
   global store_flg
   global store_data

   try:
      opts, args = getopt.getopt(sys.argv[1:], "di:k:s:t:TvV?")
   except getopt.error, msg:
      print __doc__,
      return 1

   for o, a in opts:
      if o == '-d':
         debug_flg    = 1
      elif o == '-i':
         indexed      = 1
         indexNo      = int(a)
      elif o == '-k':
         keyed        = 1
         key_name     = a
      elif o == '-t':
         table_name   = a
      elif o == '-T':
         term_flg     = 1
      elif o == '-s':
         store_flg    = 1
         store_data   = a
      elif o == '-v':
         verbose_flg  = 1
      elif o == '-V':
         print "Version: %s" % __version__
         return 0
      elif o == '-?':
         print __doc__()
         return 0

   if args:
      for arg in args:
         print arg
   else:
      pass

   process()

#---------------------------------------------------------------------

if __name__ == '__main__' or __name__ == sys.argv[0]:
   sys.exit(main())

#---------------------------------------------------------------------

"""
Revision History:

     Date     Who   Description
   --------   ---   --------------------------------------------------
   20031014   plh   Initial implementation

Problems to fix:

To Do:

Issues:

"""


Data

dserver.ini


#----- Example dserver data files --------------------------

[Data]
Description=Address:CSV:|
# Description=Stock:CSV:,
Description=Sequence:Sequence:
Description=Indexed:Indexed:,
Description=Keyed:Keyed:,

Address.dat

Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
Level 21,150 Lonsdale Street,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000
AAA,Melbourne,Victoria,3000
BBB,Melbourne,Victoria,3000
CCC,Melbourne,Victoria,3000

Keyed.dat

[a100]
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC

[a101]
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC

[b100]
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC
AAA,BBB,CCC

Makefile

reset:
	cp Address.master    Address.dat
	cp Indexed.master    Indexed.dat
	cp Keyed.master      Keyed.dat
	cp Sequence.master   Sequence.dat

Python ctypes Client

This uses the Windows DLL implemented below as MS Visual Studio 2005 project(s).

#!/usr/bin/env python

import sys
import ctypes
import pprint

dserver = ctypes.CDLL('dcl')

pprint.pprint(dserver.__dict__)

print type(dserver)

dserver.dsInit.restype         = ctypes.c_char_p
dserver.dsInit.argtypes        = [ctypes.c_char_p, ctypes.c_int]
dserver.dsRegister.restype     = ctypes.c_int
dserver.dsRegister.argtypes    = [ctypes.c_char_p]
dserver.dsGetNext.restype      = ctypes.c_char_p
dserver.dsGetNext.argtypes     = [ctypes.c_int]
dserver.dsGetKeyed.restype     = ctypes.c_char_p
dserver.dsGetKeyed.argtypes    = [ctypes.c_int, ctypes.c_char_p]
dserver.dsGetIndexed.restype   = ctypes.c_char_p
dserver.dsGetIndexed.argtypes  = [ctypes.c_int, ctypes.c_int]
dserver.dsStore.restype        = ctypes.c_int
dserver.dsStore.argtypes       = [ctypes.c_int, ctypes.c_char_p]

s= 100 * ' '

dserver.get_buffered(1, s, 100)

print s

dserver.dsInit('localhost', 9578)

dx = dserver.dsRegister('Sequence')

dserver.dsGetNext(idx)

MS Visual C Client DLL and Test harness

Here are the zipped up project files...