Python - XML Processing

From PeformIQ Upgrade
Revision as of 11:44, 2 April 2009 by PeterHarding (talk | contribs)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Using the XML Module (xml.dom.minidom) to Parse XML

Here is a bit of code I used recently for pulling apart a chunk of XML (a SOAP request).

#!/usr/bin/env python
#--------------------------------------------------------------------------

import re
import sys
import getopt
import pprint
import types

import xml.dom.minidom

# from xml.dom.utils import FileReader
from string import join, split    

#--------------------------------------------------------------------------

"""
Read in a DOM instance, convert it to a Python object
"""

__version__    = '1.0.0'

# Command-line switches; main() overwrites these from the parsed options.
debug_flg      = False
verbose_flg    = False

pp             = pprint.PrettyPrinter(indent=3)
# Matches a newline followed by any number of spaces; recurse_node() and
# display_node() use it to recognise layout-only text nodes.
regex          = re.compile("\\n *")

# Default input file; main() replaces it when -f/--file is given.
filename       = '0001.xml'

# Records being built by capture().
service        = None
supplier       = None
customer       = None
order          = None

# Parser state used by capture() to decide which record receives a value.
# in_customer must exist at module level: capture() declares it global and
# reads it, so leaving it undefined raises NameError on the first read.
in_supplier    = False
in_customer    = False
in_order       = False
in_address     = False

# Number of orders seen so far (incremented by capture()).
no_orders      = 0

curr_obj_type  = None
obj            = None

# Object types for which dump() has already written a header line.
hdr_flg        = {}

#==========================================================================

class Obj:
   """Plain attribute container; capture() attaches a `type` plus data fields."""

   def __init__(self):
      pass

   def __str__(self):
      """Return one "name -> value" line for every instance attribute."""
      rendered = ["%s -> %s\n" % (name, getattr(self, name))
                  for name in self.__dict__]
      return ''.join(rendered)

#==========================================================================

class Service:
   """Empty record; capture() sets the values of `ns1:` nodes on it by name."""

   def __init__(self):
      """Create the record with no attributes."""
      return None

#==========================================================================

class Supplier:
   """Supplier record that owns the list of customers found in the document."""

   def __init__(self):
      # capture() appends a Customer here each time it starts a new one.
      self.customers = list()

#==========================================================================

# Customer attribute names, in the column order written by
# Customer.dump_header() and Customer.dump_data().
# NOTE(review): the `...` line appears to mark entries omitted from this
# listing; the complete list is not visible here.
customer_attr = [
   'accountNumber',
...
   'customerName'
]

class Customer:
   """Customer record: fields are set by capture(), orders are collected here."""

   def __init__(self):
      # Order instances are appended by capture() while this customer is current.
      self.orders = list()

   def dump_header(self, fd):
      """Write the "[Customer]," marker and every customer_attr name to fd."""
      fd.write("[Customer],")
      for column in customer_attr:
         fd.write("%s," % column)
      fd.write("\n")

   def dump_data(self, fd):
      """Write this customer's value for every customer_attr name to fd.

      Raises AttributeError if one of the named attributes was never set.
      """
      for column in customer_attr:
         fd.write("%s," % getattr(self, column))
      fd.write("\n")

#==========================================================================

# Order attribute names, in the column order written by Order.dump_header()
# and Order.dump_data().
# NOTE(review): the `...` lines in the three literals below appear to mark
# entries omitted from this listing; the complete contents are not visible here.
order_attr = [
   'orderID',
   'orderStatus',
...
   'taxCode'
]


# Maps an order-level attribute name to the object type it belongs to
# (e.g. 'Order', 'Charge'), or to 'IGNORE'; capture() looks names up here.
attr = {
   'orderID'                       : 'Order',
   'orderStatus'                   : 'IGNORE',
...
   'taxCode'                       : 'Charge'
}


# Obj instances collected per object type; filled by capture(), written by dump().
objs = {
   'Order'     : [],
...
   'Charge'    : []
}


class Order:
   """Order record; capture() sets its fields by name with setattr()."""

   # Class-level defaults of None: getattr() in dump_data() falls back to
   # these for any field that was not set on the instance.
   # NOTE(review): the `...` line appears to mark attributes omitted from
   # this listing.
   orderID                    = None
...
   taxCode                    = None

   def __init__(self):
      pass

   def dump_header(self, fd):
      """Write every order_attr name to fd as one comma-terminated line."""
      for i in range(len(order_attr)):
         fd.write("%s," % order_attr[i])
      fd.write("\n")

   def dump_data(self, fd):
      """Write this order's value for every order_attr name to fd."""
      for i in range(len(order_attr)):
         fd.write("%s," % getattr(self, order_attr[i]))
      fd.write("\n")

#----- Capture ------------------------------------------------------------

def capture(node_name, value):
   """Store one leaf node's value on the record it belongs to.

   node_name -- qualified element name; only names starting with 'ns1:' or
                'ns0:' are handled, anything else is ignored.
   value     -- the element's text value.

   'ns1:' values are set on the module-level Service.  For 'ns0:' values
   the unprefixed name selects the target:

     * a name starting with 'apAccountNumber' starts a new Customer;
     * a name starting with 'orderID' starts a new Order (and a new Obj of
       type 'Order') on the current customer;
     * a name starting with 'supplier_ID' switches to supplier mode;
     * otherwise the value goes to whichever of supplier / customer / order
       is currently active.

   While an order is active, the value is also filed on an Obj chosen through
   the `attr` table: the current Obj if the type matches, a fresh Obj if the
   type changes, nothing if the type is 'IGNORE'.

   Raises KeyError if an order-level name is missing from `attr` (or its
   type is missing from `objs`).
   """
   global service, supplier, customer, order
   global in_supplier, in_customer, in_order, no_orders
   global obj

   if not service:
      service = Service()

   if not supplier:
      supplier = Supplier()

   if re.match('ns1:', node_name):
      name = re.sub('ns1:', '', node_name)
      setattr(service, name, value)
      print("service.__dict__ -> '%s'" % service.__dict__)
      return

   if not re.match('ns0:', node_name):
      return

   name = re.sub('ns0:', '', node_name)

   print(">>> name : %s" % name)

   if re.match('apAccountNumber', name):
      in_supplier = False
      in_customer = True
      in_order    = False

      customer    = Customer()

      print("New customer!")

      supplier.customers.append(customer)
      setattr(customer, name, value)
      return

   if re.match('orderID', name):
      # Must clear the real flag (the original assigned a misspelled local),
      # otherwise later order fields are written onto the supplier.
      in_supplier = False
      in_customer = False
      in_order    = True
      no_orders += 1

      order    = Order()
      obj      = Obj()
      obj.type = 'Order'

      objs['Order'].append(obj)

      setattr(obj, name, value)
      customer.orders.append(order)
      setattr(order, name, value)
      return

   if re.match('supplier_ID', name):
      in_supplier = True
      setattr(supplier, name, value)
      return

   if in_supplier:
      setattr(supplier, name, value)
      print("supplier.__dict__ -> '%s'" % supplier.__dict__)
      return

   if in_customer:
      setattr(customer, name, value)
      print(customer.__dict__)
      return

   if in_order:
      setattr(order, name, value)
      obj_type = attr[name]
      if obj_type != obj.type:
         if obj_type == 'IGNORE':
            return
         # The type changed: start a new object of that type.
         obj      = Obj()
         obj.type = obj_type
         objs[obj_type].append(obj)

      # Store the value on the current object -- including the attribute
      # that triggered creation of a new one, which was previously dropped.
      setattr(obj, name, value)
      return

#----- Recurse node -------------------------------------------------------

def recurse_node(node):
   """Walk a DOM subtree and hand every text-only element to capture().

   node -- a DOM node as produced by xml.dom.minidom.

   An element whose only child is a text node is treated as a leaf: its name
   and text are passed to capture().  Any other element is treated as a
   container: each child accepted by display_node() is visited recursively.
   Nodes without children, or without a name, are skipped.

   NOTE(review): nodeName / childNodes are looked up in the instance
   __dict__ rather than with hasattr(), so only attributes stored on the
   instance count.  That depends on how the minidom implementation stores
   them -- confirm on the Python version in use.
   """
   if 'nodeName' in node.__dict__:
      node_name = node.nodeName
   else:
      node_name = False

   if 'childNodes' in node.__dict__:
      children = node.childNodes
   else:
      children = None

   if not children:
      if verbose_flg: print(">>>>>>> Bogus text node!")
      return

   if not node_name:
      return

   # Leaf element: exactly one child and that child is text.
   if len(children) == 1 and children[0].nodeName == '#text':
      value = children[0].nodeValue

      if verbose_flg: print("=====  Node: %-30s %s" % (node_name, value))

      capture(node_name, value)
      return

   # Container element.  A single non-text child also lands here; the
   # original referenced an unassigned `value` in that case and crashed.
   if verbose_flg: print("@@@@@  Node: %-30s" % node_name)

   for child in children:
      if display_node(child):
         recurse_node(child)

#----- Display node -------------------------------------------------------

def display_node(node):
   """Tell the caller whether node should be visited; dump it when verbose.

   Returns False for a '#text' node whose instance-level nodeValue contains
   a newline (as matched by the module-level `regex`), True for every other
   node.  When verbose_flg is set, nodes that return True are also printed.
   """
   is_text = node.nodeName == '#text'
   if is_text and 'nodeValue' in node.__dict__:
      if regex.search(node.nodeValue):
         return False

   if verbose_flg:
      print("===== node =====================================================")
      print("Node Name:  [%s]" % node.nodeName)
      print("----------------------------------------------------------------")
      pp.pprint(node.__dict__)
      print("\n")

   return True

#----- Clean --------------------------------------------------------------

def clean(filename):
   """Copy filename to 'parse.xml' with characters 0xA0 and 0xC2 removed.

   filename -- path of the file to read.

   The output is always written to 'parse.xml' in the current directory.
   Both files are closed even if reading or writing fails.

   NOTE(review): 0xC2 0xA0 is the UTF-8 encoding of a no-break space, which
   is presumably what this targets; every other occurrence of either value
   is removed as well -- confirm that is acceptable for the input data.
   """
   ifd = open(filename, 'r')
   try:
      data = ifd.read()
   finally:
      ifd.close()

   # 0xA0 and 0xC2 are the values the original octal literals 0240 and 0302
   # denote; hex is used because that octal spelling is Python-2-only.
   data = data.replace(chr(0xA0), '')
   data = data.replace(chr(0xC2), '')

   out = open('parse.xml', 'w')
   try:
      out.write(data)
   finally:
      out.close()

#----- Usage --------------------------------------------------------------

def usage():
   """Write the parse_order.py invocation line to stderr.

   NOTE: a second usage() is defined further down in this file; being
   defined later, that one is what callers actually get.
   """
   sys.stderr.write("""\

      $ ./parse_order.py [-f <file>]

   """)

#----- Parse --------------------------------------------------------------

def parse(filename):
   """Parse an XML file and write its customers and orders as CSV.

   filename -- path of the XML file to process.

   Steps: clean() copies the file to 'parse.xml'; the DOM is built from that
   copy; recurse_node() walks it, filling the module-level records; finally
   one header line plus, per customer, a data line, an order header and one
   line per order are written.  Output goes to stdout when debug_flg is set,
   otherwise to 'orders.csv'.
   """
   clean(filename)  # -> parse.xml

   dom_obj = xml.dom.minidom.parse('parse.xml')

   # documentElement is the root element even when a comment or processing
   # instruction precedes it, which childNodes[0] would pick up instead.
   recurse_node(dom_obj.documentElement)

   print('Processed %d orders' % no_orders)

   if debug_flg:
      ofd = sys.stdout
   else:
      ofd = open('orders.csv', 'w')

   try:
      Customer().dump_header(ofd)

      # supplier is still None when the document produced no captured values.
      if supplier is None:
         customers = []
      else:
         customers = supplier.customers

      for cust in customers:
         print("customer.__dict__ -> '%s'" % cust.__dict__)
         cust.dump_data(ofd)
         Order().dump_header(ofd)
         for cust_order in cust.orders:
            cust_order.dump_data(ofd)
   finally:
      # Never close sys.stdout: dump() still prints after parse() returns.
      if ofd is not sys.stdout:
         ofd.close()

#---------------------------------------------------------------------

def dump():
   """Write every collected object to 'dat/<supplier_ID>/<type>.dat'.

   One file is written per key of `objs`.  Each file starts with a header
   line naming the first object's attributes, followed by one line of quoted
   values per object; the bookkeeping attribute `type` is left out of both.
   Each object is also printed to stdout.

   The target directory must already exist.  Each row lists that object's
   own attributes, so objects of one type are expected to carry the same
   attribute set for the columns to line up.
   """
   for obj_type in objs.keys():
      print(obj_type)
      ofp = open('dat/%s/%s.dat' % (supplier.supplier_ID, obj_type), 'w')
      try:
         # Tracked per file: the file is truncated on every call, so the
         # header has to be written on every call as well.
         header_written = False
         for item in objs[obj_type]:
            # Skip the bookkeeping attribute by NAME.  The original row
            # filter compared the VALUE with the type name, dropping any
            # field that happened to equal it and misaligning the row.
            names = [name for name in item.__dict__.keys() if name != 'type']

            if not header_written:
               header_written = True
               hdr_flg[obj_type] = 1
               ofp.write(''.join(['%s,' % name for name in names]) + '\n')

            ofp.write(''.join(['"%s",' % getattr(item, name) for name in names]) + '\n')

            print(item)
      finally:
         ofp.close()

#--------------------------------------------------------------------------

def usage():
   """Write the command-line synopsis and the supported options to stderr.

   The program name is taken from sys.argv[0]; the option list mirrors the
   getopt specification used by main().
   """
   USAGE = """
   Usage:

     $ %s [-d] [-s] [-v] [-f <file>]

   Options:

     -d, --debug         write the order CSV to stdout instead of orders.csv
     -f, --file <file>   XML file to parse (default: 0001.xml)
     -h, --help          show this message and exit
     -s, --stdout        accepted; currently has no effect
     -v, --verbose       print nodes as they are visited

""" % sys.argv[0]

   sys.stderr.write(USAGE)
   
#---------------------------------------------------------------------

def main(argv):
   """Parse the command-line options, then run parse() and dump().

   argv -- the argument list without the program name.

   An unrecognised option prints the usage text and exits with status 2;
   -h/--help prints it and exits with status 0.
   """
   global debug_flg, verbose_flg, filename, pp

   short_opts = "dhf:sv"
   long_opts  = ["debug", "help", "file=", "stdout", "verbose"]

   try:
      opts, args = getopt.getopt(argv, short_opts, long_opts)
   except getopt.GetoptError:
      usage()
      sys.exit(2)

   for opt, arg in opts:
      if opt in ("-h", "--help"):
         usage()
         sys.exit(0)

      if opt in ("-d", "--debug"):
         debug_flg = True
      elif opt in ("-f", "--file"):
         filename = arg
      elif opt in ("-s", "--stdout"):
         # Local only: nothing in this function reads it afterwards.
         stdout_flg = True
      elif opt in ("-v", "--verbose"):
         verbose_flg = True

   parse(filename)
   dump()
   dump()

#---------------------------------------------------------------------

# Script entry point: pass the command-line arguments (program name
# stripped) to main() when the file is executed directly.
if __name__ == "__main__":
   main(sys.argv[1:])

#---------------------------------------------------------------------

Sample data file