Using ADO with Python
Jump to navigation
Jump to search
Examples
jet.py
#!/usr/bin/env Python
"""
#Instantiate the Jet engine.
"""
import win32com.client
engine = win32com.client.Dispatch("DAO.DBEngine.35")
#Determine the path to your database and use the following syntax:
db = engine.OpenDatabase(r"c:\temp\mydb.mdb")
"""
Now you have an instance of the engine and an instance of a database object. Next we will open a recordset. Lets pretend we have a table in our database called 'customers'. To open this table for editing/perusing, you might use the following syntax.
"""
rs = db.OpenRecordset("customers")
# You can also do the following:
rs = db.OpenRecordset("select * from customers where state = 'OH'")
# You can run action queries using the dao execute method. Example:
db.Execute("delete * from customers where balancetype = 'overdue' and name = 'bill'")
#be careful, there is no undo! :)
# The EOF property is available, so you can do things like:
while not rs.EOF:
print rs.Fields("State").Value
rs.MoveNext()
jet2sql.py
#!/usr/bin/env python
#
# jet2sql.py - Matthew C Keranen <mck@mpinet.net> [07/12/2000]
# --------------------------------------------------------------------
# Creates ANSI SQL DDL from a MS Jet database file, useful for reverse
# engineering database designs in E/R tools.
#
# Requires DAO 3.6 library.
# --------------------------------------------------------------------
# Usage: python jet2sql.py infile.MDB outfile.SQL
import sys, string, pythoncom, win32com.client
const = win32com.client.constants
daoEngine = win32com.client.Dispatch('DAO.DBEngine.36')
class jetReverse:
def __init__ (self, infile):
self.jetfilename=infile
self.dtbs = daoEngine.OpenDatabase(infile)
return
def terminate(self):
return
def writeTable(self, currTabl):
self.writeLine('\ncreate table ' + chr(34) + currTabl.Name + chr(34),"",1)
self.writeLine('(',"",1)
# Write Columns
cn=0
for col in currTabl.Fields:
cn = cn +1
self.writeColumn(col.Name, col.Type, col.Size, col.Required, col.Attributes, col.DefaultValue, col.ValidationRule, currTabl.Fields.Count-cn)
# Validation Rule
tablRule = currTabl.ValidationRule
if tablRule <> "":
tablRule = " check(" + tablRule + ") "
self.writeLine("",",",1) # add a comma and CR previous line
self.writeLine(tablRule,"",0)
# Primary Key
pk=self.getPrimaryKey(currTabl)
if pk <> "":
self.writeLine("",",",1) # add a comma and CR previous line
self.writeLine(pk,"",0)
# End of table
self.writeLine("","",1) # terminate previous line
self.writeLine(');',"",1)
# Write table comment
try: sql = currTabl.Properties("Description").Value
except pythoncom.com_error: sql=""
if sql <> "":
sql = "comment on table " + chr(34) + currTabl.Name + chr(34) + " is " + chr(34) + sql + chr(34) +";"
self.writeLine(sql,"",1)
# Write column comments
for col in currTabl.Fields:
try: sql = col.Properties("Description").Value
except pythoncom.com_error: sql=""
if sql <> "":
sql = "comment on column " + chr(34) + currTabl.Name + chr(34) + "." + chr(34) + col.Name + chr(34) + " is " + chr(34) + sql + chr(34) + ";"
self.writeLine(sql,"",1)
# Write Indexes
self.writeIndexes(currTabl)
return
def writeColumn(self, colName, colType, length, requird, attributes, default, check, colRix):
# colRix: 0 based index of column from right side. 0 indicates rightmost column
if colType == const.dbByte: dataType = "Byte"
elif colType == const.dbInteger: dataType = "Integer"
elif colType == const.dbSingle: dataType = "Single"
elif colType == const.dbDouble: dataType = "Double"
elif colType == const.dbDate: dataType = "DateTime"
elif colType == const.dbLongBinary: dataType = "OLE"
elif colType == const.dbMemo: dataType = "Memo"
elif colType == const.dbCurrency: dataType = "Currency"
elif colType == const.dbLong:
if (attributes & const.dbAutoIncrField): dataType = "Counter"
else: dataType = "LongInteger"
elif colType == const.dbText:
if length == 0: dataType = "Text"
else: dataType = "char("+str(length)+")"
elif colType == const.dbBoolean:
dataType = "Bit"
if default == "Yes": default = "1"
else: default = "0"
else:
if length == 0: dataType = "Text"
else: dataType = "Text("+str(length)+")"
if default <> "":
defaultStr = "default " + default + " "
else: defaultStr = ""
if check <> "":
checkStr = "check(" + check + ") "
else:
checkStr = ""
if requird or (attributes & const.dbAutoIncrField):
mandatory = "not null "
else:
mandatory = ""
sql = " " + chr(34) + colName + chr(34) + " " + dataType + " " + defaultStr + checkStr + mandatory
if colRix > 0:
self.writeLine(sql,",",1)
else:
self.writeLine(sql,"",0)
return
def getPrimaryKey(self, currTabl):
# Get primary key fields
sql = ""
for idx in currTabl.Indexes:
if idx.Primary:
idxName = idx.Name
sql = " primary key "
cn=0
for col in idx.Fields:
cn=cn+1
sql = sql + chr(34) + col.Name + chr(34)
if idx.Fields.Count > cn : sql = sql + ","
return sql
def writeIndexes(self, currTabl):
# Write index definition
nIdx = -1
for idx in currTabl.Indexes:
nIdx = nIdx + 1
idxName = idx.Name
tablName = currTabl.Name
if idx.Primary:
idxName = tablName + "_PK"
elif idxName[:9] == "REFERENCE":
idxName = tablName + "_FK" + idxName[10:]
else:
idxName = tablName + "_IX" + str(nIdx)
sql = "create "
if idx.Unique: sql = sql + "unique "
if idx.Clustered: sql = sql + "clustered "
sql = sql + "index " + chr(34) + idxName + chr(34)
sql = sql + " on " + chr(34) + tablName + chr(34) + " ("
# Write Index Columns
cn=0
for col in idx.Fields:
cn = cn + 1
sql = sql + chr(34) + col.Name + chr(34)
if col.Attributes & const.dbDescending:
sql = sql + " desc"
else:
sql = sql + " asc"
if idx.Fields.Count > cn: sql = sql + ","
sql=sql + " );"
self.writeLine(sql,"",1)
return
def writeForeignKey(self, currRefr):
# Export foreign key
sql = "\nalter table " + chr(34) + currRefr.ForeignTable + chr(34)
self.writeLine(sql,"",1)
sql = " add foreign key ("
cn = 0
for col in currRefr.Fields:
cn = cn + 1
sql = sql + chr(34) + col.ForeignName + chr(34)
if currRefr.Fields.Count > cn: sql = sql + ","
sql = sql + ")"
self.writeLine(sql,"",1)
sql = " references " + chr(34) + currRefr.Table + chr(34) + " ("
cn = 0
for col in currRefr.Fields:
cn = cn + 1
sql = sql + chr(34) + col.Name + chr(34)
if currRefr.Fields.Count > cn: sql = sql + ","
sql = sql + ")"
if (currRefr.Attributes & const.dbRelationUpdateCascade) <> 0:
sql = sql + " on update cascade"
if (currRefr.Attributes & const.dbRelationDeleteCascade) <> 0:
sql = sql + " on delete cascade"
sql=sql+";"
self.writeLine(sql,"",1)
return
def writeQuery(self, currQry):
sql = "\ncreate view " + chr(34) + currQry.Name + chr(34) + " as"
self.writeLine(sql,"",1)
# Write Query text
sql=string.replace(currQry.SQL,chr(13),"") # Get rid of extra linefeeds
self.writeLine(sql,"",1)
# Write Query comment
try: sql = currQry.Properties("Description").Value
except pythoncom.com_error: sql=""
if sql <> "":
sql = "comment on table " + chr(34) + currQry.Name + chr(34) + " is " + chr(34) + sql + chr(34)
self.writeLine(sql,"",1)
return
def writeLine(self,strLine, delimit, newline):
# Used for controlling where lines terminate with a comma or other continuation mark
sqlfile.write(strLine)
if delimit: sqlfile.write(delimit)
if newline: sqlfile.write('\n')
return
if __name__ == '__main__':
if len(sys.argv)<2:
print "Usage: jet2sql.py infile.mdb outfile.sql"
else:
jetEng = jetReverse(sys.argv[1])
outfile = sys.argv[2]
sqlfile = open(outfile,'w')
print "\nReverse engineering %s to %s" % (jetEng.jetfilename, outfile)
# Tables
sys.stdout.write("\n Tables")
for tabl in jetEng.dtbs.TableDefs:
sys.stdout.write(".")
if tabl.Name[:4] <> "MSys" and tabl.Name[:4] <> "~TMP":
jetEng.writeTable(tabl)
# Relations / FKs
sys.stdout.write("\n Relations")
for fk in jetEng.dtbs.Relations:
sys.stdout.write(".")
jetEng.writeForeignKey(fk)
# Queries
sys.stdout.write("\n Queries")
for qry in jetEng.dtbs.QueryDefs:
sys.stdout.write(".")
jetEng.writeQuery(qry)
print "\n Done\n"
# Done
sqlfile.close()
jetEng.terminate()
msjet.py
# attempt to attach to the database - if that fails, create a new one
# use the file name as the base name for the database
self.dbName = os.path.splitext(fileName)[0]
dsn = 'Provider=Microsoft.Jet.OLEDB.4.0;' + \
'Jet OLEDB:Engine Type=5;' + \
'Data Source=' + 'test.mdb'
# try to create the database
catalog = win32com.client.Dispatch('ADOX.Catalog')
try:
catalog.Create(dsn)
catalog.ActiveConnection = dsn
except:
raise "Can't connect to database table"
# create a table with the appropriate field names
table = win32com.client.Dispatch('ADOX.Table')
self.rs = win32com.client.Dispatch('ADODB.Recordset')
table.Name = "TabName"
Column = win32com.client.Dispatch("ADOX.Column")
Column.Name = "AutoNumber"
Column.Type = 3 # adInteger
Column.ParentCatalog = catalog
Column.Properties('Autoincrement').Value = win32com.client.pywintypes.TRUE
table.Columns.Append(Column)
Key = win32com.client.Dispatch("ADOX.Key")
Key.Name = "PrimaryKey"
Key.Type = 1 #win32.com.client.constants.adKeyPrimary
Key.RelatedTable = "TabName"
Key.Columns.append("AutoNumber")
table.Keys.Append(Key)
# add other fields using table.Columns.Append()
catalog.Tables.Append(table)
del table
del catalog
odbc.py
#!/usr/bin/env python
import dbi, odbc # ODBC modules
import time # standard time module
dbc = odbc.odbc( # open a database connection
'sample/monty/spam' # 'datasource/user/password'
)
crsr = dbc.cursor() # create a cursor
crsr.execute( # execute some SQL
"""
SELECT country_id, name, insert_change_date
FROM country
ORDER BY name
"""
)
print 'Column descriptions:' # show column descriptions
for col in crsr.description:
print ' ', col
result = crsr.fetchall() # fetch the results all at once
print '\nFirst result row:\n ', result[0] # show first result row
print '\nDate conversions:' # play with dbiDate object
date = result[0][-1]
fmt = ' %-25s%-20s'
print fmt % ('standard string:', str(date))
print fmt % ('seconds since epoch:', float(date))
timeTuple = time.localtime(date)
print fmt % ('time tuple:', timeTuple)
print fmt % ('user defined:', time.strftime('%d %B %Y', timeTuple))