Python - XML Processing

From PeformIQ Upgrade
Revision as of 15:05, 21 April 2008 by PeterHarding (talk | contribs)
Jump to navigation Jump to search

Using XML Module (xml.dom.minidom) to Parse XML

Here is a bit of code I used recently for pulling apart a chunk of XML (a SOAP request).

#!/usr/bin/env python
#--------------------------------------------------------------------------

import re
import sys
import getopt
import pprint
import types

import xml.dom.minidom

import ebdm

# from xml.dom.utils import FileReader
from string import join, split    

#--------------------------------------------------------------------------

"""
Read in a DOM instance, convert it to a Python object
"""

__version__    = '1.0.0'

# Command-line switches; main() flips these from -d / -v.
debug_flg      = False
verbose_flg    = False

pp             = pprint.PrettyPrinter(indent=3)
# Matches a newline followed by any number of spaces.
regex          = re.compile("\\n *")

# Default input file; main() replaces it when -f/--file is given.
filename       = '0001.xml'

# Objects currently being filled in by capture().
service        = None
cdp            = None
customer       = None
order          = None

# Which of the objects above receives the next plain ns0: attribute.
# in_customer must exist before capture() first reads it, otherwise the
# 'global in_customer' lookup there fails with NameError.
in_cdp         = False
in_customer    = False
in_order       = False
in_address     = False

no_orders      = 0

curr_obj_type  = None
obj            = None

# Object types for which dump() has already written a header line.
hdr_flg        = {}

#==========================================================================

class Obj:
   """Plain attribute bag; callers attach whatever attributes they need."""

   def __init__(self):
      pass

   def __str__(self):
      """Return one "name -> value" line per instance attribute."""
      rows = ["%s -> %s\n" % (name, getattr(self, name))
              for name in self.__dict__]
      return ''.join(rows)

#==========================================================================

class Service:
   """Receives the values of esb:-prefixed nodes.

   Instances start empty; capture() attaches one attribute per node
   with setattr().
   """

   def __init__(self):
      pass

#==========================================================================

class CDP:
   """Top-level container: its own attributes plus a list of customers."""

   def __init__(self):
      # Every instance gets its own list; capture() appends Customer objects.
      self.customers = list()

#==========================================================================

# Customer fields, in the column order used by Customer.dump_header()
# and Customer.dump_data().
customer_attr = [
   'apAccountNumber',
   'cdpAccountNumber',
   'cdpCustomerID',
   'cdpCustomerName'
]

class Customer:
   """One customer record together with the orders captured for it.

   The four fields named in customer_attr default to None at class level
   (the same convention Order uses), so dump_data() can write a row even
   when some of them never appeared in the input.
   """

   apAccountNumber   = None
   cdpAccountNumber  = None
   cdpCustomerID     = None
   cdpCustomerName   = None

   def __init__(self):
      self.orders = []

   def dump_header(self, fd):
      """Write the "[Customer]," marker and the field names to *fd* as one line."""
      fd.write("[Customer],")
      for name in customer_attr:
         fd.write("%s," % name)
      fd.write("\n")

   def dump_data(self, fd):
      """Write this customer's field values to *fd* as one comma-terminated line."""
      for name in customer_attr:
         fd.write("%s," % getattr(self, name))
      fd.write("\n")

#==========================================================================

# Order fields, in the column order used by Order.dump_header() and
# Order.dump_data().  Every name listed here has a class-level default
# of None on Order.
order_attr = [
   'orderID',
   'orderStatus',
   'orderDateTime',
   'readyDateTime',
   'invoiceDateTime',
   'numberOfItems',
   'workCentreID',
   'customerCostCentre',
   'orderRef1',
   'orderRef1Desc',
   'orderRef2',
   'orderRef2Desc',
   'orderRef3',
   'orderRef3Desc',
   'otherCustomerRef',
   'otherCustomerRefDesc',
   'address1',
   'address2',
   'address3',
   'suburb',
   'state',
   'postcode',
   'entryWeight',
   'cubedWeight',
   'actualWeight',
   'height',
   'length',
   'width',
   'serviceType',
   'serviceArea',
   'serviceCodeSet',
   'serviceCode',
   'serviceDesc',
   'apProductCode',
   'transactionID',
   'transactionType',
   'chargeType',
   'chargeCode',
   'chargeCodeDesc',
   'priceCharged',
   'taxCode'
]


# Maps an order-level node name (ns0: prefix already stripped) to the
# group it belongs to.  capture() compares the group with the current
# Obj's 'type' to decide whether a new Obj has to be started; names
# mapped to 'IGNORE' are still stored on the Order but never on an Obj.
attr = {
   'orderID'                       : 'Order',
   'orderStatus'                   : 'IGNORE',
   'orderDateTime'                 : 'IGNORE',
   'readyDateTime'                 : 'IGNORE',
   'invoiceDateTime'               : 'IGNORE',
   'numberOfItems'                 : 'Order',
   'workCentreID'                  : 'Order',
   'customerCostCentre'            : 'Order',

   'orderRef1'                     : 'CustRef',
   'orderRef1Desc'                 : 'CustRef',
   'orderRef2'                     : 'CustRef',
   'orderRef2Desc'                 : 'CustRef',
   'orderRef3'                     : 'CustRef',
   'orderRef3Desc'                 : 'CustRef',
   'otherCustomerRef'              : 'CustRef',
   'otherCustomerRefDesc'          : 'CustRef',

   'address1'                      : 'Address',
   'address2'                      : 'Address',
   'address3'                      : 'Address',
   'suburb'                        : 'Address',
   'postcode'                      : 'Address',
   'state'                         : 'Address',

   'entryWeight'                   : 'Weight',
   'cubedWeight'                   : 'Weight',
   'actualWeight'                  : 'Weight',

   'height'                        : 'Dimension',
   'length'                        : 'Dimension',
   'width'                         : 'Dimension',

   'schedPickUpDateTime'           : 'IGNORE',
   'pickUpTimeliness'              : 'IGNORE',
   'schedDeliveryDateTime'         : 'IGNORE',
   'deliveryTimeliness'            : 'IGNORE',
   'pickupSignatureRequired'       : 'IGNORE',
   'pickupSignatureReceived'       : 'IGNORE',
   'actualPickUpDateTime'          : 'IGNORE',
   'latePickUpIndicator'           : 'IGNORE',
   'deliverySignatureRequired'     : 'IGNORE',
   'deliverySignatureReceived'     : 'IGNORE',
   'actualDeliveryDateTime'        : 'IGNORE',
   'lateDeliveryIndicator'         : 'IGNORE',

   'serviceType'                   : 'Service',
   'serviceArea'                   : 'Service',
   'serviceCodeSet'                : 'Service',
   'serviceCode'                   : 'Service',
   'serviceDesc'                   : 'Service',
   'apProductCode'                 : 'Service',

   'transactionID'                 : 'Charge',
   'transactionType'               : 'Charge',
   'chargeType'                    : 'Charge',
   'chargeCode'                    : 'Charge',
   'chargeCodeDesc'                : 'Charge',
   'priceCharged'                  : 'Charge',
   'taxCode'                       : 'Charge'
}


# Captured Obj instances, keyed by group name.  capture() appends to
# these lists and dump() writes each list to its own .dat file.
objs = {
   'Order'     : [],
   'CustRef'   : [],
   'Address'   : [],
   'Weight'    : [],
   'Dimension' : [],
   'Service'   : [],
   'Charge'    : []
}


class Order:
   """One order; every known field defaults to None until it is captured."""

   # Order
   orderID                    = None
   orderStatus                = None
   orderDateTime              = None
   readyDateTime              = None
   invoiceDateTime            = None
   numberOfItems              = None
   workCentreID               = None
   customerCostCentre         = None

   # Customer references
   orderRef1                  = None
   orderRef1Desc              = None
   orderRef2                  = None
   orderRef2Desc              = None
   orderRef3                  = None
   orderRef3Desc              = None
   otherCustomerRef           = None
   otherCustomerRefDesc       = None

   # Address
   address1                   = None
   address2                   = None
   address3                   = None
   suburb                     = None
   state                      = None
   postcode                   = None

   # Weight
   entryWeight                = None
   cubedWeight                = None
   actualWeight               = None

   # Dimension
   height                     = None
   length                     = None
   width                      = None

   # Service
   serviceType                = None
   serviceArea                = None
   serviceCodeSet             = None
   serviceCode                = None
   serviceDesc                = None
   apProductCode              = None

   # Charge
   transactionID              = None
   transactionType            = None
   chargeType                 = None
   chargeCode                 = None
   chargeCodeDesc             = None
   priceCharged               = None
   taxCode                    = None

   def __init__(self):
      pass

   def dump_header(self, fd):
      """Write the order_attr field names to *fd* as one comma-terminated line."""
      for field in order_attr:
         fd.write("%s," % field)
      fd.write("\n")

   def dump_data(self, fd):
      """Write this order's values, in order_attr order, to *fd* as one line."""
      for field in order_attr:
         fd.write("%s," % getattr(self, field))
      fd.write("\n")

#----- Capture node -------------------------------------------------------
"""
  Node: esb:SvcName                    receiveBilling
  Node: esb:SvcVersion                 1.0
  Node: esb:RequestDt                  2007-06-20T11:36:00
  Node: esb:ComponentID                Test
  Node: esb:ComponentName              Test Application
  Node: ns0:cdpID                      AaE
  Node: ns0:consortiumID               APCBE
  Node: ns0:sourceSystemID             AAE PSft
  Node: ns0:sequenceID                 1
  Node: ns0:notificationEmail          xxx@xxx.com.au
  Node: ns0:apAccountNumber            5322201
  Node: ns0:cdpAccountNumber           2922230
  Node: ns0:cdpCustomerID              2822275
  Node: ns0:cdpCustomerName            XXXXX
  Node: ns0:orderID                    PH0000000097
  Node: ns0:orderDateTime              2003-12-31T12:00:00.000+10:00
"""

def capture(node_name, value):
   """Store one leaf node's text on the object it belongs to.

   Routing, decided by the node name:

     esb:<name>           attribute <name> on the Service object
     ns0:apAccountNumber  starts a new Customer (appended to cdp.customers)
     ns0:orderID          starts a new Order (appended to customer.orders)
                          and a new 'Order' Obj in objs
     ns0:cdpID            attribute on the CDP object; switches to cdp mode
     ns0:<other>          attribute on whichever of cdp / customer / order
                          is currently being filled in

   Names carrying neither prefix are ignored.  While an order is being
   filled in, each attribute is also copied onto a grouped Obj (see the
   'attr' table): a change of group starts a new Obj, and names mapped
   to 'IGNORE' -- or missing from the table -- are not copied.

   node_name -- qualified node name, e.g. 'ns0:orderID'
   value     -- text content of the node
   """
   global service, cdp, customer, order
   global in_cdp, in_customer, in_order, no_orders
   global obj

   if not service:
      service = Service()

   if not cdp:
      cdp = CDP()

   if re.match('esb:', node_name):
      name = re.sub('esb:', '', node_name)
      setattr(service, name, value)
      print("service.__dict__ -> '%s'" % service.__dict__)
      return

   if not re.match('ns0:', node_name):
      return

   name = re.sub('ns0:', '', node_name)

   print(">>> name : %s" % name)

   if re.match('apAccountNumber', name):
      in_cdp      = False
      in_customer = True
      in_order    = False

      customer    = Customer()

      print("New customer!")

      cdp.customers.append(customer)
      setattr(customer, name, value)
      return

   if re.match('orderID', name):
      in_cdp      = False
      in_customer = False
      in_order    = True
      no_orders += 1

      order    = Order()
      obj      = Obj()
      obj.type = 'Order'

      objs['Order'].append(obj)

      setattr(obj, name, value)
      customer.orders.append(order)
      setattr(order, name, value)
      return

   if re.match('cdpID', name):
      in_cdp = True
      setattr(cdp, name, value)
      return

   if in_cdp:
      setattr(cdp, name, value)
      print("cdp.__dict__ -> '%s'" % cdp.__dict__)
      return

   if in_customer:
      setattr(customer, name, value)
      print(customer.__dict__)
      return

   if in_order:
      setattr(order, name, value)

      # A tag that is not in the table is kept on the Order but, like an
      # explicit 'IGNORE' entry, is not copied onto any grouped Obj.
      group = attr.get(name, 'IGNORE')
      if group == 'IGNORE':
         return

      if group != obj.type:
         # Group changed: start a new Obj for it.
         obj      = Obj()
         obj.type = group
         objs[group].append(obj)

      # Store the attribute on the current Obj -- including the one that
      # triggered the creation of a new Obj, so the first field of each
      # group is not lost.
      setattr(obj, name, value)
      return

#----- Recurse node -------------------------------------------------------

def recurse_node(node):
   """Walk a DOM subtree and pass every leaf element to capture().

   An element whose only child is a '#text' node is a leaf: its node
   name and that text are handed to capture().  Any other element is a
   container: each child accepted by display_node() is visited
   recursively.  Nodes that have no children or no node name are
   skipped.

   node -- a DOM node (normally an element) to start from
   """
   # getattr() instead of looking inside node.__dict__, so attributes that
   # are supplied at class level or through slots/properties are seen too.
   node_name = getattr(node, 'nodeName', None)
   children  = getattr(node, 'childNodes', None)

   if not children:
      if verbose_flg: print(">>>>>>> Bogus text node!")
      return

   if not node_name:
      return

   if len(children) == 1 and children[0].nodeName == '#text':
      value = children[0].nodeValue

      if verbose_flg: print("=====  Node: %-30s %s" % (node_name, value))

      capture(node_name, value)
      return

   # Container element.  This also covers an element whose single child is
   # itself an element: there is no text to capture, so descend into it.
   if verbose_flg: print("@@@@@  Node: %-30s" % node_name)

   for child in children:
      if not display_node(child):
         continue
      recurse_node(child)

#----- Display node -------------------------------------------------------

def display_node(node):
   """Tell recurse_node() whether *node* is worth visiting.

   Returns False for a '#text' node whose value matches the module-level
   'regex' (a newline followed by optional spaces), True for every other
   node.  With verbose_flg set, each accepted node is also printed.
   """
   is_text = (node.nodeName == '#text')

   if is_text and 'nodeValue' in node.__dict__:
      if regex.search(node.nodeValue):
         return False

   if verbose_flg:
      print("===== node =====================================================")
      print("Node Name:  [%s]" % node.nodeName)
      print("----------------------------------------------------------------")
      pp.pprint(node.__dict__)
      print("\n")

   return True

#----- Clean --------------------------------------------------------------

def clean(filename, outfile='parse.xml'):
   """Copy *filename* to *outfile* with every 0xA0 and 0xC2 byte removed.

   filename -- path of the file to read
   outfile  -- path of the cleaned copy; defaults to 'parse.xml', the
               name parse() reads back

   The two bytes together are the UTF-8 encoding of U+00A0 (no-break
   space), which is presumably what this is meant to strip.  Note that
   each byte is removed wherever it occurs, not only as that pair.

   The files are handled in binary mode so that the byte values are
   compared as-is, and 'with' guarantees both are closed even when the
   read or write fails.
   """
   with open(filename, 'rb') as ifd:
      data = ifd.read()

   data = data.replace(b'\xa0', b'')
   data = data.replace(b'\xc2', b'')

   with open(outfile, 'wb') as out:
      out.write(data)

#----- Usage --------------------------------------------------------------

def usage():
   """Write the parse_order.py invocation line to stderr.

   NOTE: a second usage() is defined further down in this file; because
   it is defined later, that is the one the name refers to at run time.
   """
   message = """\

      $ ./parse_order.py [-f <file>]

   """

   sys.stderr.write(message)

#----- Parse --------------------------------------------------------------

def parse(filename):
   """Parse *filename* and write the captured customers and orders as CSV.

   Steps:
     1. clean() copies the input to 'parse.xml' with stray bytes removed.
     2. The cleaned file is parsed with minidom and walked with
        recurse_node(), which fills the module-level cdp/customer/order
        state through capture().
     3. A customer header line, then for every customer its data line, an
        order header line and one line per order, are written to
        'orders.csv' -- or to stdout when debug_flg is set.

   filename -- path of the XML file to process
   """
   clean(filename)  # -> parse.xml

   dom_obj = xml.dom.minidom.parse('parse.xml')

   # documentElement is the root element even when the document starts
   # with a comment, processing instruction or DOCTYPE; childNodes[0]
   # would be that leading node instead.
   recurse_node(dom_obj.documentElement)

   print('Processed %d orders' % no_orders)

   if debug_flg:
      ofd = sys.stdout
   else:
      ofd = open('orders.csv', 'w')

   try:
      Customer().dump_header(ofd)

      # cdp stays None when capture() was never called (nothing to report).
      if cdp is not None:
         for customer in cdp.customers:
            print("customer.__dict__ -> '%s'" % customer.__dict__)
            customer.dump_data(ofd)
            Order().dump_header(ofd)
            for order in customer.orders:
               order.dump_data(ofd)
   finally:
      # Only close what was opened here: closing sys.stdout would make
      # every later print fail.
      if ofd is not sys.stdout:
         ofd.close()

#---------------------------------------------------------------------

def dump():
   """Write every captured Obj list to 'dat/<cdpID>/<type>.dat'.

   Each file gets a header line with the attribute names of its first
   record, followed by one line of double-quoted values per record.  The
   bookkeeping attribute 'type' is left out of both header and rows.
   Every type name and every record is also printed to stdout.

   The directory 'dat/<cdpID>' must already exist.
   """
   for obj_type in objs.keys():
      print(obj_type)
      ofp = open('dat/%s/%s.dat' % (cdp.cdpID, obj_type), 'w')
      try:
         for idx, rec in enumerate(objs[obj_type]):
            # Skip 'type' by attribute NAME for header and row alike, so a
            # field whose value happens to equal the type name is kept and
            # the columns stay aligned.
            fields = [key for key in rec.__dict__.keys() if key != 'type']

            if idx == 0:
               # The file was just truncated by open(..., 'w'), so its
               # first record always gets a header -- also when dump() is
               # called more than once.
               hdr_flg[obj_type] = 1
               ofp.write(''.join(['%s,' % key for key in fields]) + '\n')

            ofp.write(''.join(['"%s",' % getattr(rec, key) for key in fields]) + '\n')

            print(rec)
      finally:
         ofp.close()

#--------------------------------------------------------------------------

def usage():
   """Write the command-line synopsis and option summary to stderr.

   The program name is taken from sys.argv[0]; the options listed are
   the ones main() passes to getopt.
   """
   lines = [
      "",
      "  Usage:",
      "",
      "    $ %s [-d] [-h] [-f <file>] [-s] [-v]",
      "",
      "  Options:",
      "",
      "    -d, --debug         write the order CSV to stdout instead of orders.csv",
      "    -h, --help          show this message and exit",
      "    -f, --file <file>   XML file to parse",
      "    -s, --stdout        accepted; currently has no effect",
      "    -v, --verbose       print a trace of the nodes visited",
      "",
      "",
   ]

   sys.stderr.write("\n".join(lines) % sys.argv[0])
   
#---------------------------------------------------------------------

def main(argv):
   """Process the command-line options, then run parse() and dump().

   argv -- the argument list without the program name

   An unrecognised option prints the usage text and exits with status 2;
   -h/--help prints it and exits with status 0.
   """
   global debug_flg
   global verbose_flg
   global filename
   global pp

   #----- Process command line arguments ----------------------------

   short_opts = "dhf:sv"
   long_opts  = ["debug", "help", "file=", "stdout", "verbose"]

   try:
      opts, args = getopt.getopt(argv, short_opts, long_opts)
   except getopt.GetoptError:
      usage()
      sys.exit(2)

   for opt, arg in opts:
      if opt in ("-d", "--debug"):
         debug_flg = True
         continue
      if opt in ("-h", "--help"):
         usage()
         sys.exit(0)
      if opt in ("-f", "--file"):
         filename = arg
         continue
      if opt in ("-s", "--stdout"):
         # Local to main() and not read anywhere in it.
         stdout_flg = True
         continue
      if opt in ("-v", "--verbose"):
         verbose_flg = True

   parse(filename)
   dump()
   dump()

#---------------------------------------------------------------------

# Run as a script: pass the command-line arguments (program name removed)
# to main().
if __name__ == "__main__":
   main(sys.argv[1:])

#---------------------------------------------------------------------

Sample data file