Python - XML Processing
Jump to navigation
Jump to search
Using the XML Module (xml.dom.minidom) to Parse XML
Here is a bit of code I used recently for pulling apart a chunk of XML (a SOAP request).
#!/usr/bin/env python
#--------------------------------------------------------------------------
import re
import sys
import getopt
import pprint
import types
import xml.dom.minidom
# from xml.dom.utils import FileReader
from string import join, split
#--------------------------------------------------------------------------
# Read in a DOM instance, convert it to a Python object.

__version__ = '1.0.0'

# Command-line switches; main() overwrites them from the parsed options.
debug_flg = False
verbose_flg = False

pp = pprint.PrettyPrinter(indent=3)

# Matches a line break followed by optional spaces; recurse_node() and
# display_node() use it to recognise text nodes that contain a line break.
regex = re.compile("\\n *")

# Default input file, replaced by the -f/--file option in main().
filename = '0001.xml'

# Objects currently being filled in by capture().
service = None
supplier = None
customer = None
order = None

# Mode flags of capture(): which object receives the next plain attribute.
in_supplier = False
# capture() declares this name global and reads it, so it has to exist before
# the first call; without this line the read could raise NameError.
in_customer = False
in_order = False
in_address = False

no_orders = 0          # incremented by capture() for every orderID node
curr_obj_type = None
obj = None             # Obj record capture() is currently adding attributes to
hdr_flg = {}           # record types for which dump() already wrote a header
#==========================================================================
class Obj:
    """Generic attribute bag for one record collected by capture().

    capture() stores the record type in ``type`` and adds one attribute per
    captured node; dump() later writes the attributes to a .dat file.
    """

    def __str__(self):
        """Return one ``name -> value`` line for every instance attribute."""
        return ''.join(
            "%s -> %s\n" % (key, value)
            for key, value in self.__dict__.items()
        )
#==========================================================================
class Service:
    """Plain attribute holder; capture() sets one attribute per ``ns1:`` node."""
#==========================================================================
class Supplier:
    """Supplier record; capture() appends every new Customer to ``customers``."""

    def __init__(self):
        # A fresh list per instance, filled by capture() and read by parse().
        self.customers = list()
#==========================================================================
# Customer attribute names, in the order Customer.dump_header() and
# Customer.dump_data() write them as comma-separated columns.
customer_attr = [
'accountNumber',
...
'customerName'
]
class Customer:
    """Customer record that capture() fills in attribute by attribute.

    ``orders`` collects the Order objects captured while this customer is the
    current one.  The columns written by dump_header() and dump_data() are the
    names in the module-level ``customer_attr`` list.
    """

    def __init__(self):
        self.orders = []

    def dump_header(self, fd):
        """Write ``[Customer],`` and every customer_attr name, then a newline.

        Args:
            fd: writable text file object.
        """
        fd.write("[Customer],")
        for name in customer_attr:
            fd.write("%s," % (name,))
        fd.write("\n")

    def dump_data(self, fd):
        """Write this customer's value for every customer_attr name.

        Each value is followed by a comma and the row ends with a newline.
        An attribute that was never captured is written as ``None`` instead of
        raising AttributeError, which matches how Order rows behave thanks to
        that class's None defaults.

        Args:
            fd: writable text file object.
        """
        for name in customer_attr:
            fd.write("%s," % (getattr(self, name, None),))
        fd.write("\n")
#==========================================================================
# Order attribute names, in the order Order.dump_header() and
# Order.dump_data() write them as comma-separated columns.
order_attr = [
'orderID',
'orderStatus',
...
'taxCode'
]
# Maps an order-level node name to the type of Obj record it belongs to.
# capture() does not add names mapped to 'IGNORE' to any Obj record.
attr = {
'orderID' : 'Order',
'orderStatus' : 'IGNORE',
...
'taxCode' : 'Charge'
}
# Obj records collected by capture(), keyed by record type; dump() writes one
# .dat file per key.
objs = {
'Order' : [],
...
'Charge' : []
}
# Order record; capture() sets its attributes from the captured node names.
class Order:
# Class-level None defaults: dump_data() reads attributes with getattr(), so
# an attribute never set on an instance falls back to the value defined here.
orderID = None
...
taxCode = None
def __init__(self):
pass
# Write every order_attr name to fd, each followed by a comma, and a newline.
def dump_header(self, fd):
for i in range(len(order_attr)):
fd.write("%s," % order_attr[i])
fd.write("\n")
# Write this order's value for every order_attr name to fd, each followed by
# a comma, and a newline.
def dump_data(self, fd):
for i in range(len(order_attr)):
fd.write("%s," % getattr(self, order_attr[i]))
fd.write("\n")
#----- Capture node -------------------------------------------------------
def capture(node_name, value):
    """Record one leaf node's text on the object it belongs to.

    Routing depends on the namespace prefix and on mode flags kept in module
    globals:

    * ``ns1:<name>`` sets an attribute on the single Service object.
    * ``ns0:`` names starting with ``apAccountNumber`` create a new Customer,
      append it to the supplier and switch to customer mode.
    * ``ns0:`` names starting with ``orderID`` create a new Order (appended to
      the current customer) and a new 'Order' Obj record, and switch to order
      mode.
    * ``ns0:`` names starting with ``supplier_ID`` switch to supplier mode.
    * Any other ``ns0:<name>`` sets an attribute on the object selected by the
      current mode.  In order mode the ``attr`` table additionally assigns
      the value to an Obj record of the mapped type, starting a new record
      when the type changes; names mapped to 'IGNORE' only reach the Order.

    Names with neither prefix are ignored.

    Args:
        node_name: qualified node name, e.g. ``ns0:orderID``.
        value: text content of the node.

    Raises:
        KeyError: in order mode, when the name is missing from ``attr`` or
            the mapped type is missing from ``objs``.
        AttributeError: when an order node arrives before any customer.
    """
    global service, supplier, customer, order
    global in_supplier, in_customer, in_order, no_orders
    global obj

    # The service and supplier singletons are created on first use.
    if service is None:
        service = Service()
    if supplier is None:
        supplier = Supplier()

    if node_name.startswith('ns1:'):
        setattr(service, node_name.replace('ns1:', ''), value)
        print("service.__dict__ -> '%s'" % service.__dict__)
        return

    if not node_name.startswith('ns0:'):
        return

    name = node_name.replace('ns0:', '')
    print(">>> name : %s" % name)

    if name.startswith('apAccountNumber'):
        in_supplier = False
        in_customer = True
        in_order = False
        customer = Customer()
        print("New customer!")
        supplier.customers.append(customer)
        setattr(customer, name, value)
        return

    if name.startswith('orderID'):
        # The original assigned a misspelt local (`in_suppler`) here, which
        # left supplier mode switched on.
        in_supplier = False
        in_customer = False
        in_order = True
        no_orders += 1
        order = Order()
        obj = Obj()
        obj.type = 'Order'
        objs['Order'].append(obj)
        setattr(obj, name, value)
        customer.orders.append(order)
        setattr(order, name, value)
        return

    if name.startswith('supplier_ID'):
        in_supplier = True
        setattr(supplier, name, value)
        return

    if in_supplier:
        setattr(supplier, name, value)
        print("supplier.__dict__ -> '%s'" % supplier.__dict__)
        return

    if in_customer:
        setattr(customer, name, value)
        print(customer.__dict__)
        return

    if in_order:
        setattr(order, name, value)
        obj_type = attr[name]
        if obj_type != obj.type:
            if obj_type == 'IGNORE':
                return
            # A different record type starts a new Obj record.
            obj = Obj()
            obj.type = obj_type
            objs[obj_type].append(obj)
        # Store the value on the current record.  The original skipped this
        # for the attribute that started a new record, so every record lost
        # its first attribute.
        setattr(obj, name, value)
    return
#----- Recurse node -------------------------------------------------------
def recurse_node(node):
"""\
Walk a DOM node and hand leaf values to capture().

A node with exactly one child, where that child is a '#text' node, is
treated as a leaf: its nodeName and the child's nodeValue are passed to
capture().  Child nodes for which display_node() returns a true value
are visited recursively.  Nothing is returned."""
# print "[recurse_node] START"
# nodeName, childNodes and nodeValue are looked up in the instance __dict__,
# so an attribute that exists only on the node's class counts as missing.
if node.__dict__.has_key('nodeName'):
node_name = node.nodeName
else:
# print "Node name not defined"
node_name = False
if node.__dict__.has_key('childNodes'):
nodes = node.childNodes
else:
nodes = None
# display_node(node)
# No children: the value is only inspected for the verbose message below;
# the result of re.sub() is not used afterwards.
if not nodes:
if node.__dict__.has_key('nodeValue'):
value = node.nodeValue
matched = regex.search(value)
if matched:
value = re.sub(r' *\n *', '', value)
if verbose_flg: print ">>>>>>> Bogus text node!"
return
# Nodes without a usable name are not captured or descended into.
if not node_name:
return
no_nodes = len(nodes)
# Leaf case: a single '#text' child carries the node's value.
if no_nodes == 1:
if nodes[0].nodeName == '#text':
value = nodes[0].nodeValue
if verbose_flg: print "===== Node: %-30s %s" % (node_name, value)
capture(node_name, value)
# NOTE(review): this listing has lost its indentation; confirm which `if`
# the `else:` below pairs with and whether the loop sits inside it.
else:
if verbose_flg: print "@@@@@ Node: %-30s" % node_name
# The loop variable rebinds the `node` parameter.
for node in nodes:
if not display_node(node):
continue
recurse_node(node)
#----- Display node -------------------------------------------------------
def display_node(node):
    """Tell recurse_node() whether node should be visited; optionally print it.

    Args:
        node: DOM node exposing ``nodeName`` and, for text nodes,
            ``nodeValue``.

    Returns:
        False for a '#text' node whose value matches the module-level
        ``regex`` (a line break followed by optional spaces), True for every
        other node.  When ``verbose_flg`` is set, each accepted node is also
        pretty-printed to stdout.
    """
    if node.nodeName == '#text':
        # getattr() instead of probing node.__dict__, so the check also works
        # for node classes that expose nodeValue through a property or slot.
        text = getattr(node, 'nodeValue', None)
        if text is not None and regex.search(text):
            return False

    if verbose_flg:
        print("===== node =====================================================")
        print("Node Name: [%s]" % node.nodeName)
        print("----------------------------------------------------------------")
        pp.pprint(node.__dict__)
        print("\n")
    return True
#----- Clean --------------------------------------------------------------
def clean(filename):
    """Copy filename to 'parse.xml' with every 0xA0 and 0xC2 byte removed.

    The two byte values are removed independently of each other, exactly as
    before.  Together they form the UTF-8 encoding of a no-break space, which
    is presumably what this is meant to strip -- confirm against real input.

    The files are handled as bytes so the result does not depend on the
    platform's text encoding, and both are closed even if reading or writing
    fails.

    Args:
        filename: path of the XML file to read.
    """
    with open(filename, 'rb') as ifd:
        data = ifd.read()
    data = data.replace(b'\xa0', b'').replace(b'\xc2', b'')
    with open('parse.xml', 'wb') as out:
        out.write(data)
#----- Usage --------------------------------------------------------------
def usage():
    """Write the one-line invocation hint to stderr."""
    # NOTE: a second usage() further down the file rebinds this name, so this
    # version is not the one main() ends up calling.
    sys.stderr.write("$ ./parse_order.py [-f <file>]\n")
#----- Main ---------------------------------------------------------------
def parse(filename):
    """Parse an XML file and write the captured customers and orders as CSV.

    The input is first copied to 'parse.xml' by clean(), then loaded with
    minidom and walked by recurse_node(), which fills the module-level
    supplier/customer/order objects through capture().  The result goes to
    'orders.csv', or to stdout when debug_flg is set: one customer header,
    then for each customer its data row, an order header and its order rows.

    Args:
        filename: path of the XML file to process.
    """
    clean(filename)  # -> parse.xml
    dom_obj = xml.dom.minidom.parse('parse.xml')
    recurse_node(dom_obj.childNodes[0])
    print('Processed %d orders' % no_orders)

    ofd = sys.stdout if debug_flg else open('orders.csv', 'w')
    try:
        Customer().dump_header(ofd)
        # supplier is still None when the document produced no capture() call.
        customers = supplier.customers if supplier is not None else []
        for cust in customers:
            print("customer.__dict__ -> '%s'" % cust.__dict__)
            cust.dump_data(ofd)
            Order().dump_header(ofd)
            for cust_order in cust.orders:
                cust_order.dump_data(ofd)
    finally:
        # Never close stdout: dump() prints after parse() returns, and the
        # original unconditional close() broke that in debug mode.
        if ofd is not sys.stdout:
            ofd.close()
#---------------------------------------------------------------------
def dump():
    """Write every collected Obj record to ``dat/<supplier_ID>/<type>.dat``.

    One file is written per key of the module-level ``objs`` dict.  The first
    record of a type that has no entry in ``hdr_flg`` yet is preceded by a
    header row naming its attributes; every record then becomes one row of
    double-quoted, comma-terminated values.  The bookkeeping attribute
    ``type`` is left out of both header and rows.  Each type name and each
    record is also printed to stdout.

    The target directory must already exist.
    """
    for obj_type in objs:
        print(obj_type)
        path = 'dat/%s/%s.dat' % (supplier.supplier_ID, obj_type)
        # The context manager closes the file; the original never closed it.
        with open(path, 'w') as ofp:
            for record in objs[obj_type]:
                # Select columns by attribute name for header and row alike.
                # The original filtered rows by value, which dropped any
                # value equal to the type name and misaligned the columns.
                fields = [key for key in record.__dict__ if key != 'type']
                if obj_type not in hdr_flg:
                    hdr_flg[obj_type] = 1
                    ofp.write(''.join('%s,' % (key,) for key in fields) + '\n')
                row = ''.join(
                    '"%s",' % (getattr(record, key),) for key in fields
                )
                ofp.write(row + '\n')
                print(record)
#--------------------------------------------------------------------------
def usage():
    """Write the command-line synopsis and option summary to stderr.

    The option list mirrors the getopt specification in main(); the previous
    text named the script only and documented none of the options.
    """
    USAGE = """
Usage:
    $ dt.py [-d] [-h] [-f <file>] [-s] [-v]

Options:
    -d, --debug         write the order CSV to stdout instead of orders.csv
    -h, --help          show this message and exit
    -f, --file <file>   XML file to parse (default: 0001.xml)
    -s, --stdout        accepted, currently has no effect
    -v, --verbose       print nodes while the document is walked
"""
    sys.stderr.write(USAGE)
#---------------------------------------------------------------------
def main(argv):
    """Process the command-line options, then run parse() and dump().

    Unknown options print the usage text and exit with status 2; -h/--help
    prints it and exits with status 0.

    Args:
        argv: argument list without the program name.
    """
    global debug_flg
    global verbose_flg
    global filename

    #----- Process command line arguments ----------------------------
    short_opts = "dhf:sv"
    long_opts = ["debug", "help", "file=", "stdout", "verbose"]
    try:
        opts, args = getopt.getopt(argv, short_opts, long_opts)
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for flag, value in opts:
        if flag in ("-h", "--help"):
            usage()
            sys.exit(0)
        if flag in ("-d", "--debug"):
            debug_flg = True
        elif flag in ("-f", "--file"):
            filename = value
        elif flag in ("-v", "--verbose"):
            verbose_flg = True
        # -s/--stdout is accepted but nothing reads it.

    parse(filename)
    dump()
#---------------------------------------------------------------------
# Script entry point: pass the command-line arguments, without the program
# name, to main().
if __name__ == "__main__":
main(sys.argv[1:])
#---------------------------------------------------------------------