Difference between revisions of "Python - XML Processing"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) |
PeterHarding (talk | contribs) |
||
Line 14: | Line 14: | ||
import xml.dom.minidom | import xml.dom.minidom | ||
# from xml.dom.utils import FileReader | # from xml.dom.utils import FileReader | ||
Line 37: | Line 35: | ||
service = None | service = None | ||
supplier = None | |||
customer = None | customer = None | ||
order = None | order = None | ||
in_supplier = False | |||
in_order = False | in_order = False | ||
in_address = False | in_address = False | ||
Line 77: | Line 75: | ||
#========================================================================== | #========================================================================== | ||
class | class Supplier: | ||
pass | pass | ||
Line 86: | Line 84: | ||
customer_attr = [ | customer_attr = [ | ||
' | 'accountNumber', | ||
... | |||
'customerName' | |||
' | |||
] | ] | ||
Line 114: | Line 111: | ||
'orderID', | 'orderID', | ||
'orderStatus', | 'orderStatus', | ||
... | |||
'taxCode' | 'taxCode' | ||
] | ] | ||
Line 159: | Line 119: | ||
'orderID' : 'Order', | 'orderID' : 'Order', | ||
'orderStatus' : 'IGNORE', | 'orderStatus' : 'IGNORE', | ||
... | |||
'taxCode' : 'Charge' | 'taxCode' : 'Charge' | ||
} | } | ||
Line 222: | Line 126: | ||
objs = { | objs = { | ||
'Order' : [], | 'Order' : [], | ||
... | |||
'Charge' : [] | 'Charge' : [] | ||
} | } | ||
Line 233: | Line 133: | ||
class Order: | class Order: | ||
orderID = None | orderID = None | ||
... | |||
taxCode = None | taxCode = None | ||
Line 288: | Line 150: | ||
#----- Recurse node ------------------------------------------------------- | #----- Recurse node ------------------------------------------------------- | ||
def capture(node_name, value): | def capture(node_name, value): | ||
global service, | global service, supplier, customer, order | ||
global | global in_supplier, in_customer, in_order, no_orders | ||
global obj | global obj | ||
Line 315: | Line 159: | ||
service = Service() | service = Service() | ||
if not | if not supplier: | ||
supplier = Supplier() | |||
if re.match(' | if re.match('ns1:', node_name): | ||
(name, cnt) = re.subn(' | (name, cnt) = re.subn('ns1:', '', node_name) | ||
setattr(service, name, value) | setattr(service, name, value) | ||
print "service.__dict__ -> '%s'" % service.__dict__ | print "service.__dict__ -> '%s'" % service.__dict__ | ||
Line 330: | Line 174: | ||
if re.match('apAccountNumber', name): | if re.match('apAccountNumber', name): | ||
in_supplier = False | |||
in_customer = True | in_customer = True | ||
in_order = False | in_order = False | ||
Line 338: | Line 182: | ||
print "New customer!" | print "New customer!" | ||
supplier.customers.append(customer) | |||
setattr(customer, name, value) | setattr(customer, name, value) | ||
# print customer.__dict__ | # print customer.__dict__ | ||
Line 344: | Line 188: | ||
if re.match('orderID', name): | if re.match('orderID', name): | ||
in_suppler = False | |||
in_customer = False | in_customer = False | ||
in_order = True | in_order = True | ||
Line 361: | Line 205: | ||
return | return | ||
if re.match(' | if re.match('supplier_ID', name): | ||
in_supplier = True | |||
setattr( | setattr(supplier, name, value) | ||
# print | # print supplier.__dict__ | ||
return | return | ||
if | if in_supplier: | ||
setattr( | setattr(supplier, name, value) | ||
print " | print "supplier.__dict__ -> '%s'" % supplier.__dict__ | ||
return | return | ||
Line 536: | Line 380: | ||
Customer().dump_header(ofd) | Customer().dump_header(ofd) | ||
for customer in | for customer in supplier.customers: | ||
print "customer.__dict__ -> '%s'" % customer.__dict__ | print "customer.__dict__ -> '%s'" % customer.__dict__ | ||
customer.dump_data(ofd) | customer.dump_data(ofd) | ||
Line 550: | Line 394: | ||
for obj_type in objs.keys(): | for obj_type in objs.keys(): | ||
print obj_type | print obj_type | ||
ofp = open('dat/%s/%s.dat' % ( | ofp = open('dat/%s/%s.dat' % (supplier.supplier_ID, obj_type), 'w') | ||
obj_list = objs[obj_type] | obj_list = objs[obj_type] | ||
for obj in obj_list: | for obj in obj_list: |
Latest revision as of 11:44, 2 April 2009
Using the XML Module (xml.dom.minidom) to Parse XML
Here is a bit of code I used recently for pulling apart a chunk of XML (a SOAP request).
#!/usr/bin/env python
#--------------------------------------------------------------------------
"""Read in a DOM instance, convert it to a Python object.

The script parses an XML document (a SOAP request) with xml.dom.minidom,
walks the node tree and collects the leaf values into Service / Supplier /
Customer / Order objects plus a table of generic ``Obj`` records keyed by
record type.  The result is written out as ``orders.csv`` and as one
``dat/<supplier_ID>/<type>.dat`` file per record type.

The code is written so that it runs unchanged on Python 2.6+ and Python 3.
"""

import getopt
import os
import pprint
import re
import sys
import types
import xml.dom.minidom
# from xml.dom.utils import FileReader

try:
    # These helpers only exist in the Python 2 ``string`` module.  The import
    # is kept from the original listing but must not stop the script from
    # loading on Python 3.
    from string import join, split
except ImportError:
    pass

#--------------------------------------------------------------------------

__version__ = '1.0.0'

debug_flg = False
verbose_flg = False
stdout_flg = False      # set by -s/--stdout

pp = pprint.PrettyPrinter(indent=3)

# Matches the "newline + indentation" text nodes found between elements.
regex = re.compile("\\n *")

filename = '0001.xml'

# Objects currently being populated by capture().
service = None
supplier = None
customer = None
order = None

# Which object the next plain attribute belongs to.
in_supplier = False
in_customer = False     # was missing: capture() reads it before any write
in_order = False
in_address = False

no_orders = 0

curr_obj_type = None
obj = None              # the generic record currently being filled
hdr_flg = {}            # record types for which dump() has written a header

#==========================================================================

class Obj(object):
    """Generic attribute bag; capture() stores the record type in ``.type``."""

    def __init__(self):
        pass

    def __str__(self):
        """Return one ``name -> value`` line per instance attribute."""
        return ''.join("%s -> %s\n" % (key, getattr(self, key))
                       for key in self.__dict__)

#==========================================================================

class Service(object):
    """Holds the values of the ``ns1:`` elements."""

    def __init__(self):
        pass

#==========================================================================

class Supplier(object):
    """Holds the supplier attributes and the list of its customers."""

    def __init__(self):
        self.customers = []

#==========================================================================

# NOTE(review): capture() starts a customer on 'apAccountNumber' while this
# list names 'accountNumber' - confirm which element name is correct.
customer_attr = [
    'accountNumber',
    # ... (remaining attribute names were elided in the published listing)
    'customerName',
]

class Customer(object):
    """Holds the customer attributes and the list of its orders."""

    def __init__(self):
        self.orders = []

    def dump_header(self, fd):
        """Write the customer CSV header row to the file object *fd*."""
        fd.write("[Customer],")
        for name in customer_attr:
            fd.write("%s," % name)
        fd.write("\n")

    def dump_data(self, fd):
        """Write this customer's CSV row to the file object *fd*.

        Attributes that were never captured are written as ``None``, which
        is what Order rows already show for their unset class defaults.
        """
        for name in customer_attr:
            fd.write("%s," % getattr(self, name, None))
        fd.write("\n")

#==========================================================================

order_attr = [
    'orderID',
    'orderStatus',
    # ... (remaining attribute names were elided in the published listing)
    'taxCode',
]

# Maps an order-level element name to the record type it belongs to;
# 'IGNORE' marks elements that never start or extend a generic record.
attr = {
    'orderID'     : 'Order',
    'orderStatus' : 'IGNORE',
    # ... (remaining entries were elided in the published listing)
    'taxCode'     : 'Charge',
}

# Generic records collected by capture(), grouped by record type.
objs = {
    'Order'  : [],
    # ... (remaining record types were elided in the published listing)
    'Charge' : [],
}

class Order(object):
    """Holds the attributes of one order; class attributes are the defaults."""

    orderID = None
    # ... (remaining defaults were elided in the published listing)
    taxCode = None

    def __init__(self):
        pass

    def dump_header(self, fd):
        """Write the order CSV header row to the file object *fd*."""
        for name in order_attr:
            fd.write("%s," % name)
        fd.write("\n")

    def dump_data(self, fd):
        """Write this order's CSV row to *fd*; unset attributes give ``None``."""
        for name in order_attr:
            fd.write("%s," % getattr(self, name, None))
        fd.write("\n")

#----- Capture a leaf value -----------------------------------------------

def capture(node_name, value):
    """Store one leaf element value on the object it belongs to.

    node_name -- qualified element name, e.g. ``ns0:orderID``
    value     -- text content of the element

    ``ns1:`` elements go to the Service object.  ``ns0:`` elements are routed
    by name: ``supplier_ID`` selects the supplier, ``apAccountNumber`` starts
    a new customer, ``orderID`` starts a new order, and anything else is
    attached to whichever of those was selected last.  Elements with any
    other prefix are ignored.

    Raises KeyError for an order-level element missing from ``attr``, and
    AttributeError if an ``orderID`` arrives before any customer was started.
    """
    global service, supplier, customer, order
    global in_supplier, in_customer, in_order, no_orders
    global obj

    if service is None:
        service = Service()

    if supplier is None:
        supplier = Supplier()

    if node_name.startswith('ns1:'):
        name = node_name.replace('ns1:', '')
        setattr(service, name, value)
        print("service.__dict__ -> '%s'" % service.__dict__)
        return

    if not node_name.startswith('ns0:'):
        # No known prefix: there is no attribute name to store it under.
        return

    name = node_name.replace('ns0:', '')
    print(">>> name : %s" % name)

    if name.startswith('apAccountNumber'):
        in_supplier = False
        in_customer = True
        in_order = False
        customer = Customer()
        print("New customer!")
        supplier.customers.append(customer)
        setattr(customer, name, value)
        return

    if name.startswith('orderID'):
        in_supplier = False     # the original assigned a misspelt 'in_suppler'
        in_customer = False
        in_order = True
        no_orders += 1
        order = Order()
        obj = Obj()
        obj.type = 'Order'
        objs['Order'].append(obj)
        setattr(obj, name, value)
        customer.orders.append(order)
        setattr(order, name, value)
        return

    if name.startswith('supplier_ID'):
        in_supplier = True
        setattr(supplier, name, value)
        return

    if in_supplier:
        setattr(supplier, name, value)
        print("supplier.__dict__ -> '%s'" % supplier.__dict__)
        return

    if in_customer:
        setattr(customer, name, value)
        print(customer.__dict__)
        return

    if in_order:
        setattr(order, name, value)
        obj_type = attr[name]
        if obj_type == obj.type:
            # Same record type as the current record: add this attribute.
            setattr(obj, name, value)
        else:
            if obj_type == 'IGNORE':
                return
            # A different record type starts here: open a new record.  The
            # attribute that triggered the switch is stored on it as well;
            # the original dropped it, losing the first field of every
            # such record.
            obj = Obj()
            obj.type = obj_type
            objs.setdefault(obj_type, []).append(obj)
            setattr(obj, name, value)
        return

#----- Recurse node -------------------------------------------------------

def recurse_node(node):
    """Walk *node* depth-first and pass every leaf element to capture().

    A leaf element is one whose only child is a text node; its name and text
    are handed to capture().  Any other element is descended into, skipping
    the children that display_node() rejects (indentation text nodes).
    Nodes without children are ignored.
    """
    node_name = getattr(node, 'nodeName', None)
    nodes = getattr(node, 'childNodes', None)

    if not nodes:
        # Childless node (text, comment, empty element): nothing to capture.
        if verbose_flg and getattr(node, 'nodeValue', None) is not None:
            print(">>>>>>> Bogus text node!")
        return

    if not node_name:
        return

    if len(nodes) == 1 and nodes[0].nodeName == '#text':
        value = nodes[0].nodeValue
        if verbose_flg:
            print("===== Node: %-30s %s" % (node_name, value))
        capture(node_name, value)
        return

    # Also reached for an element whose single child is another element
    # (XML written without indentation); the original skipped that subtree.
    if verbose_flg:
        print("@@@@@ Node: %-30s" % node_name)

    for child in nodes:
        if display_node(child):
            recurse_node(child)

#----- Display node -------------------------------------------------------

def display_node(node):
    """Return False for indentation text nodes, True for everything else.

    A text node counts as indentation when its value matches ``regex``
    (a newline followed by optional spaces).  In verbose mode every accepted
    node is pretty-printed.
    """
    if node.nodeName == '#text':
        value = getattr(node, 'nodeValue', None)
        if value is not None and regex.search(value):
            return False

    if verbose_flg:
        print("===== node =====================================================")
        print("Node Name: [%s]" % node.nodeName)
        print("----------------------------------------------------------------")
        pp.pprint(getattr(node, '__dict__', {}))
        print("\n")

    return True

#----- Clean --------------------------------------------------------------

def clean(filename, cleaned='parse.xml'):
    """Copy *filename* to *cleaned* with all 0xC2 and 0xA0 bytes removed.

    filename -- path of the raw XML file
    cleaned  -- path of the copy to write (default ``parse.xml``)

    The file is handled as bytes so the result is the same on Python 2 and 3.
    """
    # NOTE(review): 0xC2 0xA0 is presumably a UTF-8 no-break space; removing
    # each byte independently (as the original did) also alters any other
    # multi-byte sequence that contains one of them - confirm this is wanted.
    with open(filename, 'rb') as ifd:
        data = ifd.read()

    data = data.replace(b'\xa0', b'')
    data = data.replace(b'\xc2', b'')

    with open(cleaned, 'wb') as out:
        out.write(data)

#----- Main parse ---------------------------------------------------------

def parse(filename):
    """Parse *filename* and write the customers and their orders as CSV.

    The input is first cleaned into ``parse.xml``, which is then parsed and
    walked with recurse_node().  The CSV goes to ``orders.csv``, or to stdout
    when the debug or stdout flag is set.
    """
    clean(filename)    # -> parse.xml

    dom_obj = xml.dom.minidom.parse('parse.xml')

    # documentElement is the root element even when a comment or processing
    # instruction precedes it; childNodes[0] is not.
    recurse_node(dom_obj.documentElement)

    print('Processed %d orders' % no_orders)

    to_stdout = debug_flg or stdout_flg
    if to_stdout:
        ofd = sys.stdout
    else:
        ofd = open('orders.csv', 'w')

    try:
        Customer().dump_header(ofd)

        customers = []
        if supplier is not None:
            customers = supplier.customers

        for cust in customers:
            print("customer.__dict__ -> '%s'" % cust.__dict__)
            cust.dump_data(ofd)

            Order().dump_header(ofd)

            for cust_order in cust.orders:
                cust_order.dump_data(ofd)
    finally:
        # Never close sys.stdout: dump() still prints after parse() returns.
        if not to_stdout:
            ofd.close()

#---------------------------------------------------------------------

def dump():
    """Write one ``dat/<supplier_ID>/<type>.dat`` file per record type.

    Each file starts with a header row naming the columns (omitted when the
    type has no records), followed by one row of quoted values per record.
    The columns are the union of the attribute names of all records of that
    type, in first-seen order, so rows stay aligned with the header; a record
    lacking an attribute gets an empty value.  The output directory is
    created if necessary.
    """
    if supplier is None or not hasattr(supplier, 'supplier_ID'):
        sys.stderr.write("No supplier_ID captured - nothing to dump\n")
        return

    out_dir = 'dat/%s' % supplier.supplier_ID
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    for obj_type in objs:
        print(obj_type)

        records = objs[obj_type]

        # 'type' only holds the record type and is not a data column.  The
        # original filtered the header by key but the rows by value, which
        # misaligned them.
        columns = []
        for record in records:
            for key in record.__dict__:
                if key != 'type' and key not in columns:
                    columns.append(key)

        with open('%s/%s.dat' % (out_dir, obj_type), 'w') as ofp:
            if records:
                hdr_flg[obj_type] = 1
                ofp.write(''.join('%s,' % col for col in columns) + '\n')

            for record in records:
                row = ''.join('"%s",' % getattr(record, col, '')
                              for col in columns)
                ofp.write(row + '\n')
                print(record)

#----- Usage --------------------------------------------------------------

def usage():
    """Write the command line summary to stderr."""
    USAGE = """
  Usage:

    $ %s [-d] [-v] [-s] [-f <file>]

      -d, --debug     write the CSV to stdout instead of orders.csv
      -s, --stdout    write the CSV to stdout instead of orders.csv
      -v, --verbose   trace every node visited
      -f, --file      XML file to parse (default: %s)
      -h, --help      show this message

""" % (sys.argv[0], filename)

    sys.stderr.write(USAGE)

#---------------------------------------------------------------------

def main(argv):
    """Process the command line in *argv*, then parse the file and dump it."""
    global debug_flg
    global verbose_flg
    global stdout_flg
    global filename

    #----- Process command line arguments ----------------------------

    try:
        opts, args = getopt.getopt(argv, "dhf:sv",
                 ["debug", "help", "file=", "stdout", "verbose"])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ("-d", "--debug"):
            debug_flg = True
        elif opt in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif opt in ("-f", "--file"):
            filename = arg
        elif opt in ("-s", "--stdout"):
            stdout_flg = True
        elif opt in ("-v", "--verbose"):
            verbose_flg = True

    parse(filename)

    dump()

#---------------------------------------------------------------------

if __name__ == "__main__":
    main(sys.argv[1:])

#---------------------------------------------------------------------