Using ADO with Python
Jump to navigation
Jump to search
Examples
jet.py
#!/usr/bin/env Python """ #Instantiate the Jet engine. """ import win32com.client engine = win32com.client.Dispatch("DAO.DBEngine.35") #Determine the path to your database and use the following syntax: db = engine.OpenDatabase(r"c:\temp\mydb.mdb") """ Now you have an instance of the engine and an instance of a database object. Next we will open a recordset. Lets pretend we have a table in our database called 'customers'. To open this table for editing/perusing, you might use the following syntax. """ rs = db.OpenRecordset("customers") # You can also do the following: rs = db.OpenRecordset("select * from customers where state = 'OH'") # You can run action queries using the dao execute method. Example: db.Execute("delete * from customers where balancetype = 'overdue' and name = 'bill'") #be careful, there is no undo! :) # The EOF property is available, so you can do things like: while not rs.EOF: print rs.Fields("State").Value rs.MoveNext()
jet2sql.py
#!/usr/bin/env python # # jet2sql.py - Matthew C Keranen <mck@mpinet.net> [07/12/2000] # -------------------------------------------------------------------- # Creates ANSI SQL DDL from a MS Jet database file, useful for reverse # engineering database designs in E/R tools. # # Requires DAO 3.6 library. # -------------------------------------------------------------------- # Usage: python jet2sql.py infile.MDB outfile.SQL import sys, string, pythoncom, win32com.client const = win32com.client.constants daoEngine = win32com.client.Dispatch('DAO.DBEngine.36') class jetReverse: def __init__ (self, infile): self.jetfilename=infile self.dtbs = daoEngine.OpenDatabase(infile) return def terminate(self): return def writeTable(self, currTabl): self.writeLine('\ncreate table ' + chr(34) + currTabl.Name + chr(34),"",1) self.writeLine('(',"",1) # Write Columns cn=0 for col in currTabl.Fields: cn = cn +1 self.writeColumn(col.Name, col.Type, col.Size, col.Required, col.Attributes, col.DefaultValue, col.ValidationRule, currTabl.Fields.Count-cn) # Validation Rule tablRule = currTabl.ValidationRule if tablRule <> "": tablRule = " check(" + tablRule + ") " self.writeLine("",",",1) # add a comma and CR previous line self.writeLine(tablRule,"",0) # Primary Key pk=self.getPrimaryKey(currTabl) if pk <> "": self.writeLine("",",",1) # add a comma and CR previous line self.writeLine(pk,"",0) # End of table self.writeLine("","",1) # terminate previous line self.writeLine(');',"",1) # Write table comment try: sql = currTabl.Properties("Description").Value except pythoncom.com_error: sql="" if sql <> "": sql = "comment on table " + chr(34) + currTabl.Name + chr(34) + " is " + chr(34) + sql + chr(34) +";" self.writeLine(sql,"",1) # Write column comments for col in currTabl.Fields: try: sql = col.Properties("Description").Value except pythoncom.com_error: sql="" if sql <> "": sql = "comment on column " + chr(34) + currTabl.Name + chr(34) + "." + chr(34) + col.Name + chr(34) + " is " + chr(34) + sql + chr(34) + ";" self.writeLine(sql,"",1) # Write Indexes self.writeIndexes(currTabl) return def writeColumn(self, colName, colType, length, requird, attributes, default, check, colRix): # colRix: 0 based index of column from right side. 0 indicates rightmost column if colType == const.dbByte: dataType = "Byte" elif colType == const.dbInteger: dataType = "Integer" elif colType == const.dbSingle: dataType = "Single" elif colType == const.dbDouble: dataType = "Double" elif colType == const.dbDate: dataType = "DateTime" elif colType == const.dbLongBinary: dataType = "OLE" elif colType == const.dbMemo: dataType = "Memo" elif colType == const.dbCurrency: dataType = "Currency" elif colType == const.dbLong: if (attributes & const.dbAutoIncrField): dataType = "Counter" else: dataType = "LongInteger" elif colType == const.dbText: if length == 0: dataType = "Text" else: dataType = "char("+str(length)+")" elif colType == const.dbBoolean: dataType = "Bit" if default == "Yes": default = "1" else: default = "0" else: if length == 0: dataType = "Text" else: dataType = "Text("+str(length)+")" if default <> "": defaultStr = "default " + default + " " else: defaultStr = "" if check <> "": checkStr = "check(" + check + ") " else: checkStr = "" if requird or (attributes & const.dbAutoIncrField): mandatory = "not null " else: mandatory = "" sql = " " + chr(34) + colName + chr(34) + " " + dataType + " " + defaultStr + checkStr + mandatory if colRix > 0: self.writeLine(sql,",",1) else: self.writeLine(sql,"",0) return def getPrimaryKey(self, currTabl): # Get primary key fields sql = "" for idx in currTabl.Indexes: if idx.Primary: idxName = idx.Name sql = " primary key " cn=0 for col in idx.Fields: cn=cn+1 sql = sql + chr(34) + col.Name + chr(34) if idx.Fields.Count > cn : sql = sql + "," return sql def writeIndexes(self, currTabl): # Write index definition nIdx = -1 for idx in currTabl.Indexes: nIdx = nIdx + 1 idxName = idx.Name tablName = currTabl.Name if idx.Primary: idxName = tablName + "_PK" elif idxName[:9] == "REFERENCE": idxName = tablName + "_FK" + idxName[10:] else: idxName = tablName + "_IX" + str(nIdx) sql = "create " if idx.Unique: sql = sql + "unique " if idx.Clustered: sql = sql + "clustered " sql = sql + "index " + chr(34) + idxName + chr(34) sql = sql + " on " + chr(34) + tablName + chr(34) + " (" # Write Index Columns cn=0 for col in idx.Fields: cn = cn + 1 sql = sql + chr(34) + col.Name + chr(34) if col.Attributes & const.dbDescending: sql = sql + " desc" else: sql = sql + " asc" if idx.Fields.Count > cn: sql = sql + "," sql=sql + " );" self.writeLine(sql,"",1) return def writeForeignKey(self, currRefr): # Export foreign key sql = "\nalter table " + chr(34) + currRefr.ForeignTable + chr(34) self.writeLine(sql,"",1) sql = " add foreign key (" cn = 0 for col in currRefr.Fields: cn = cn + 1 sql = sql + chr(34) + col.ForeignName + chr(34) if currRefr.Fields.Count > cn: sql = sql + "," sql = sql + ")" self.writeLine(sql,"",1) sql = " references " + chr(34) + currRefr.Table + chr(34) + " (" cn = 0 for col in currRefr.Fields: cn = cn + 1 sql = sql + chr(34) + col.Name + chr(34) if currRefr.Fields.Count > cn: sql = sql + "," sql = sql + ")" if (currRefr.Attributes & const.dbRelationUpdateCascade) <> 0: sql = sql + " on update cascade" if (currRefr.Attributes & const.dbRelationDeleteCascade) <> 0: sql = sql + " on delete cascade" sql=sql+";" self.writeLine(sql,"",1) return def writeQuery(self, currQry): sql = "\ncreate view " + chr(34) + currQry.Name + chr(34) + " as" self.writeLine(sql,"",1) # Write Query text sql=string.replace(currQry.SQL,chr(13),"") # Get rid of extra linefeeds self.writeLine(sql,"",1) # Write Query comment try: sql = currQry.Properties("Description").Value except pythoncom.com_error: sql="" if sql <> "": sql = "comment on table " + chr(34) + currQry.Name + chr(34) + " is " + chr(34) + sql + chr(34) self.writeLine(sql,"",1) return def writeLine(self,strLine, delimit, newline): # Used for controlling where lines terminate with a comma or other continuation mark sqlfile.write(strLine) if delimit: sqlfile.write(delimit) if newline: sqlfile.write('\n') return if __name__ == '__main__': if len(sys.argv)<2: print "Usage: jet2sql.py infile.mdb outfile.sql" else: jetEng = jetReverse(sys.argv[1]) outfile = sys.argv[2] sqlfile = open(outfile,'w') print "\nReverse engineering %s to %s" % (jetEng.jetfilename, outfile) # Tables sys.stdout.write("\n Tables") for tabl in jetEng.dtbs.TableDefs: sys.stdout.write(".") if tabl.Name[:4] <> "MSys" and tabl.Name[:4] <> "~TMP": jetEng.writeTable(tabl) # Relations / FKs sys.stdout.write("\n Relations") for fk in jetEng.dtbs.Relations: sys.stdout.write(".") jetEng.writeForeignKey(fk) # Queries sys.stdout.write("\n Queries") for qry in jetEng.dtbs.QueryDefs: sys.stdout.write(".") jetEng.writeQuery(qry) print "\n Done\n" # Done sqlfile.close() jetEng.terminate()
msjet.py
# attempt to attach to the database - if that fails, create a new one # use the file name as the base name for the database self.dbName = os.path.splitext(fileName)[0] dsn = 'Provider=Microsoft.Jet.OLEDB.4.0;' + \ 'Jet OLEDB:Engine Type=5;' + \ 'Data Source=' + 'test.mdb' # try to create the database catalog = win32com.client.Dispatch('ADOX.Catalog') try: catalog.Create(dsn) catalog.ActiveConnection = dsn except: raise "Can't connect to database table" # create a table with the appropriate field names table = win32com.client.Dispatch('ADOX.Table') self.rs = win32com.client.Dispatch('ADODB.Recordset') table.Name = "TabName" Column = win32com.client.Dispatch("ADOX.Column") Column.Name = "AutoNumber" Column.Type = 3 # adInteger Column.ParentCatalog = catalog Column.Properties('Autoincrement').Value = win32com.client.pywintypes.TRUE table.Columns.Append(Column) Key = win32com.client.Dispatch("ADOX.Key") Key.Name = "PrimaryKey" Key.Type = 1 #win32.com.client.constants.adKeyPrimary Key.RelatedTable = "TabName" Key.Columns.append("AutoNumber") table.Keys.Append(Key) # add other fields using table.Columns.Append() catalog.Tables.Append(table) del table del catalog
odbc.py
#!/usr/bin/env python import dbi, odbc # ODBC modules import time # standard time module dbc = odbc.odbc( # open a database connection 'sample/monty/spam' # 'datasource/user/password' ) crsr = dbc.cursor() # create a cursor crsr.execute( # execute some SQL """ SELECT country_id, name, insert_change_date FROM country ORDER BY name """ ) print 'Column descriptions:' # show column descriptions for col in crsr.description: print ' ', col result = crsr.fetchall() # fetch the results all at once print '\nFirst result row:\n ', result[0] # show first result row print '\nDate conversions:' # play with dbiDate object date = result[0][-1] fmt = ' %-25s%-20s' print fmt % ('standard string:', str(date)) print fmt % ('seconds since epoch:', float(date)) timeTuple = time.localtime(date) print fmt % ('time tuple:', timeTuple) print fmt % ('user defined:', time.strftime('%d %B %Y', timeTuple))