Using ADO with Python

From PeformIQ Upgrade
Jump to navigation Jump to search

Examples

jet.py

#!/usr/bin/env python
"""Walk-through: opening and querying a Jet (.mdb) database through DAO."""

import win32com.client

# Instantiate the Jet engine.
# NOTE(review): the ".35" suffix selects a specific DAO version; adjust the
# ProgID to whichever DAO library is registered on your machine.
engine = win32com.client.Dispatch("DAO.DBEngine.35")

# Determine the path to your database and use the following syntax:
db = engine.OpenDatabase(r"c:\temp\mydb.mdb")

# Now you have an instance of the engine and an instance of a database
# object. Next we will open a recordset. Let's pretend we have a table in our
# database called 'customers'. To open this table for editing/perusing, you
# might use the following syntax.
rs = db.OpenRecordset("customers")

# Release the first recordset before the name is reused below.
rs.Close()

# You can also do the following:
rs = db.OpenRecordset("select * from customers where state = 'OH'")

# You can run action queries using the DAO Execute method. Example:
db.Execute("delete * from customers where balancetype = 'overdue' and name = 'bill'")

# Be careful, there is no undo!  :)

# The EOF property is available, so you can do things like:
try:
    while not rs.EOF:
        print(rs.Fields("State").Value)
        rs.MoveNext()
finally:
    # Always hand the recordset and the database back to the engine.
    rs.Close()
    db.Close()

jet2sql.py

#!/usr/bin/env python
#
# jet2sql.py - Matthew C Keranen <mck@mpinet.net> [07/12/2000]
# --------------------------------------------------------------------
# Creates ANSI SQL DDL from a MS Jet database file, useful for reverse
# engineering database designs in E/R tools.
#
# Requires DAO 3.6 library.
# --------------------------------------------------------------------
# Usage: python jet2sql.py infile.MDB outfile.SQL

import sys, string, pythoncom, win32com.client

# EnsureDispatch loads (generating it on first use) the makepy wrapper for the
# DAO type library.  win32com.client.constants only exposes the dbXxx values
# once such a wrapper has been loaded, and the jetReverse class in this file
# reads those constants for its type, index and relation tests.
daoEngine = win32com.client.gencache.EnsureDispatch('DAO.DBEngine.36')
const = win32com.client.constants

class jetReverse:
    """Reverse-engineer a MS Jet database into SQL DDL text.

    The constructor opens the database through the module-level ``daoEngine``.
    The ``write*`` methods each take one DAO object (table definition,
    relation or query definition) and emit the matching DDL statements through
    ``writeLine``.

    NOTE(review): descriptions in ``comment on ...`` statements are wrapped in
    double quotes exactly like identifiers, and embedded quotes are not
    escaped; confirm this is what the consuming tool expects.
    """

    def __init__ (self, infile, sqlout=None):
        """Open ``infile`` for reading.

        infile -- path of the Jet database file.
        sqlout -- optional file-like object that receives the generated SQL.
                  When omitted, ``writeLine`` falls back to the module-level
                  name ``sqlfile`` (the behaviour of the script entry point).
        """
        self.jetfilename = infile
        self.sqlout = sqlout
        self.dtbs = daoEngine.OpenDatabase(infile)

    def terminate(self):
        """Close the database opened by the constructor; safe to call twice."""
        dtbs = getattr(self, "dtbs", None)
        if dtbs is not None:
            dtbs.Close()
            self.dtbs = None

    @staticmethod
    def _quote(text):
        """Return ``text`` wrapped in double quotes."""
        return chr(34) + text + chr(34)

    def _description(self, daoObject):
        """Return the object's "Description" property, or "" when it has none."""
        try:
            return daoObject.Properties("Description").Value
        except pythoncom.com_error:
            # The lookup raises a COM error when the property is not defined.
            return ""

    def writeTable(self, currTabl):
        """Emit ``create table``, comments and indexes for one table definition."""
        tablName = self._quote(currTabl.Name)
        self.writeLine('\ncreate table ' + tablName, "", 1)
        self.writeLine('(', "", 1)

        # Write Columns.  writeColumn leaves the last column's line open so
        # that an optional check / primary key clause can add the comma first.
        cols = list(currTabl.Fields)
        for pos, col in enumerate(cols):
            self.writeColumn(col.Name, col.Type, col.Size, col.Required,
                             col.Attributes, col.DefaultValue,
                             col.ValidationRule, len(cols) - pos - 1)

        # Validation Rule
        tablRule = currTabl.ValidationRule
        if tablRule:
            self.writeLine("", ",", 1)  # add a comma and CR previous line
            self.writeLine("    check(" + tablRule + ") ", "", 0)

        # Primary Key
        pk = self.getPrimaryKey(currTabl)
        if pk:
            self.writeLine("", ",", 1)  # add a comma and CR previous line
            self.writeLine(pk, "", 0)

        # End of table
        self.writeLine("", "", 1)  # terminate previous line
        self.writeLine(');', "", 1)

        # Write table comment
        descr = self._description(currTabl)
        if descr:
            self.writeLine("comment on table " + tablName + " is "
                           + self._quote(descr) + ";", "", 1)

        # Write column comments
        for col in cols:
            descr = self._description(col)
            if descr:
                self.writeLine("comment on column " + tablName + "."
                               + self._quote(col.Name) + " is "
                               + self._quote(descr) + ";", "", 1)

        # Write Indexes
        self.writeIndexes(currTabl)

    def writeColumn(self, colName, colType, length, requird, attributes, default, check, colRix):
        """Emit one column definition line of a ``create table`` statement.

        colRix is the 0-based index of the column counted from the right;
        0 marks the last column, whose line is left without comma or newline.
        """
        isCounter = attributes & const.dbAutoIncrField

        if colType == const.dbByte:
            dataType = "Byte"
        elif colType == const.dbInteger:
            dataType = "Integer"
        elif colType == const.dbSingle:
            dataType = "Single"
        elif colType == const.dbDouble:
            dataType = "Double"
        elif colType == const.dbDate:
            dataType = "DateTime"
        elif colType == const.dbLongBinary:
            dataType = "OLE"
        elif colType == const.dbMemo:
            dataType = "Memo"
        elif colType == const.dbCurrency:
            dataType = "Currency"
        elif colType == const.dbLong:
            if isCounter:
                dataType = "Counter"
            else:
                dataType = "LongInteger"
        elif colType == const.dbText:
            if length == 0:
                dataType = "Text"
            else:
                dataType = "char(" + str(length) + ")"
        elif colType == const.dbBoolean:
            dataType = "Bit"
            # NOTE(review): only the literal "Yes" maps to 1; every other
            # default (including none at all) is written as 0.
            if default == "Yes":
                default = "1"
            else:
                default = "0"
        else:
            # Any type not listed above is written as text.
            if length == 0:
                dataType = "Text"
            else:
                dataType = "Text(" + str(length) + ")"

        defaultStr = ""
        if default:
            defaultStr = "default " + default + " "

        checkStr = ""
        if check:
            checkStr = "check(" + check + ") "

        mandatory = ""
        if requird or isCounter:
            mandatory = "not null "

        sql = ("    " + self._quote(colName) + " " + dataType + " "
               + defaultStr + checkStr + mandatory)
        if colRix > 0:
            self.writeLine(sql, ",", 1)
        else:
            self.writeLine(sql, "", 0)

    def getPrimaryKey(self, currTabl):
        """Return the ``primary key (...)`` clause of the table, or ""."""
        for idx in currTabl.Indexes:
            if idx.Primary:
                keyCols = ",".join([self._quote(col.Name) for col in idx.Fields])
                # The column list of a table constraint must be parenthesised.
                return "    primary key (" + keyCols + ")"
        return ""

    def writeIndexes(self, currTabl):
        """Emit one ``create index`` statement per index of the table."""
        tablName = currTabl.Name
        for nIdx, idx in enumerate(currTabl.Indexes):
            # Replace the stored index name with one derived from the table.
            idxName = idx.Name
            if idx.Primary:
                idxName = tablName + "_PK"
            elif idxName[:9] == "REFERENCE":
                idxName = tablName + "_FK" + idxName[10:]
            else:
                idxName = tablName + "_IX" + str(nIdx)

            sql = "create "
            if idx.Unique:
                sql = sql + "unique "
            if idx.Clustered:
                sql = sql + "clustered "
            sql = sql + "index " + self._quote(idxName)
            sql = sql + " on " + self._quote(tablName) + " ("

            # Write Index Columns
            parts = []
            for col in idx.Fields:
                if col.Attributes & const.dbDescending:
                    direction = " desc"
                else:
                    direction = " asc"
                parts.append(self._quote(col.Name) + direction)

            self.writeLine(sql + ",".join(parts) + " );", "", 1)

    def writeForeignKey(self, currRefr):
        """Emit an ``alter table ... add foreign key`` statement for a relation."""
        self.writeLine("\nalter table " + self._quote(currRefr.ForeignTable), "", 1)

        relCols = list(currRefr.Fields)

        # Referencing columns (names on the foreign table side).
        fkCols = ",".join([self._quote(col.ForeignName) for col in relCols])
        self.writeLine("    add foreign key (" + fkCols + ")", "", 1)

        # Referenced columns (names on the primary table side).
        refCols = ",".join([self._quote(col.Name) for col in relCols])
        sql = "    references " + self._quote(currRefr.Table) + " (" + refCols + ")"
        if currRefr.Attributes & const.dbRelationUpdateCascade:
            sql = sql + " on update cascade"
        if currRefr.Attributes & const.dbRelationDeleteCascade:
            sql = sql + " on delete cascade"
        self.writeLine(sql + ";", "", 1)

    def writeQuery(self, currQry):
        """Emit a ``create view`` statement (plus comment) for a query definition."""
        qryName = self._quote(currQry.Name)
        self.writeLine("\ncreate view " + qryName + " as", "", 1)

        # Write Query text, dropping the carriage returns stored in it.
        self.writeLine(currQry.SQL.replace(chr(13), ""), "", 1)

        # Write Query comment, terminated like every other comment statement.
        descr = self._description(currQry)
        if descr:
            self.writeLine("comment on table " + qryName + " is "
                           + self._quote(descr) + ";", "", 1)

    def writeLine(self,strLine, delimit, newline):
        """Write ``strLine``, then ``delimit`` if non-empty, then a newline if asked.

        Used for controlling where lines terminate with a comma or other
        continuation mark.
        """
        out = getattr(self, "sqlout", None)
        if out is None:
            # Legacy path: the script entry point publishes its output file
            # under the module-level name ``sqlfile``.
            out = sqlfile
        out.write(strLine)
        if delimit:
            out.write(delimit)
        if newline:
            out.write('\n')


if __name__ == '__main__':
    # Both the input database and the output file are required: sys.argv[2]
    # is read below, so fewer than three entries cannot work.
    if len(sys.argv) < 3:
        print("Usage: jet2sql.py infile.mdb outfile.sql")
        sys.exit(1)

    jetEng = jetReverse(sys.argv[1])
    try:
        outfile = sys.argv[2]

        # jetReverse.writeLine() writes through the module-level name
        # "sqlfile", so the handle has to be bound under exactly this name.
        sqlfile = open(outfile, 'w')
        try:
            print("\nReverse engineering %s to %s" % (jetEng.jetfilename, outfile))

            # Tables (system and temporary tables are skipped)
            sys.stdout.write("\n   Tables")
            for tabl in jetEng.dtbs.TableDefs:
                sys.stdout.write(".")
                if tabl.Name[:4] not in ("MSys", "~TMP"):
                    jetEng.writeTable(tabl)

            # Relations / FKs
            sys.stdout.write("\n   Relations")
            for fk in jetEng.dtbs.Relations:
                sys.stdout.write(".")
                jetEng.writeForeignKey(fk)

            # Queries
            sys.stdout.write("\n   Queries")
            for qry in jetEng.dtbs.QueryDefs:
                sys.stdout.write(".")
                jetEng.writeQuery(qry)

            print("\n   Done\n")
        finally:
            # Close the output even when the export fails part-way through.
            sqlfile.close()
    finally:
        jetEng.terminate()


msjet.py


		# attempt to attach to the database - if that fails, create a new one
		# use the file name as the base name for the database
		self.dbName = os.path.splitext(fileName)[0]
		dsn = 'Provider=Microsoft.Jet.OLEDB.4.0;' + \
				'Jet OLEDB:Engine Type=5;' + \
				'Data Source=' + 'test.mdb'

		# try to create the database
		catalog = win32com.client.Dispatch('ADOX.Catalog') 
		try:
			catalog.Create(dsn)
			catalog.ActiveConnection = dsn
		except:
			raise "Can't connect to database table"


		# create a table with the appropriate field names
		table = win32com.client.Dispatch('ADOX.Table') 
		self.rs = win32com.client.Dispatch('ADODB.Recordset')
		table.Name = "TabName"

		Column = win32com.client.Dispatch("ADOX.Column")
		Column.Name = "AutoNumber"
		Column.Type = 3 # adInteger
		Column.ParentCatalog = catalog
		Column.Properties('Autoincrement').Value = win32com.client.pywintypes.TRUE
		table.Columns.Append(Column)

		Key = win32com.client.Dispatch("ADOX.Key")
		Key.Name = "PrimaryKey"
		Key.Type = 1 #win32.com.client.constants.adKeyPrimary
		Key.RelatedTable = "TabName"
		Key.Columns.append("AutoNumber")
		table.Keys.Append(Key)

		# add other fields using table.Columns.Append()


		catalog.Tables.Append(table)
		del table
		del catalog



odbc.py

#!/usr/bin/env python
"""Query a table through the odbc module and show date conversions."""

import dbi           # ODBC modules (dbi is imported ahead of odbc, as before)
import odbc
import time          # standard time module

QUERY = """
    SELECT country_id, name, insert_change_date
    FROM country
    ORDER BY name
    """

dbc = odbc.odbc(     # open a database connection
    'sample/monty/spam'  # 'datasource/user/password'
    )
crsr = dbc.cursor()  # create a cursor
try:
    crsr.execute(QUERY)           # execute some SQL

    print('Column descriptions:')  # show column descriptions
    for col in crsr.description:
        print('  %s' % (col,))

    result = crsr.fetchall()      # fetch the results all at once
    if not result:
        # Nothing to index into below; say so instead of raising IndexError.
        print('\nNo rows returned.')
    else:
        print('\nFirst result row:\n  %s' % (result[0],))  # show first result row

        print('\nDate conversions:')   # play with dbiDate object
        date = result[0][-1]           # last selected column: insert_change_date
        fmt = '  %-25s%-20s'
        print(fmt % ('standard string:', str(date)))
        print(fmt % ('seconds since epoch:', float(date)))
        timeTuple = time.localtime(date)
        print(fmt % ('time tuple:', timeTuple))
        print(fmt % ('user defined:', time.strftime('%d %B %Y', timeTuple)))
finally:
    # Release the cursor and the connection whether or not the query worked.
    crsr.close()
    dbc.close()