Python ODBC Wrapper

From PeformIQ Upgrade
Revision as of 21:12, 15 November 2011 by PeterHarding (talk | contribs) (→‎ODBC with Python)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

ODBC with Python

I never managed to find the time to do much with these pieces of code. Found them while searching for a solution to a testing project some time ago.

Ultimately I found that I could use either Windows Python or ActiveState Python - both of which have an ODBC built in - from within cygwin which allowed me to build a number of command line sccripts which could be use to extract data from databases and transform it into a data file which could be used to parameterize LoadRunner scripts.

DBTable Implementation

   1  #
   2  # Database wrapper class.
   3  #
   4  import odbc
   5  import os
   6  import traceback
   7  
   8  class DBTable:
   9      """
  10      Wrapper for database table
  11      """
  12      FIELD_TYPE = 0
  13  
  14      def __init__( self, oConnection, strTable):
  15          self.oConnection = oConnection
  16          self.strTable = strTable
  17  
  18          oCursor = oConnection.cursor()
  19          oCursor.execute( "SELECT * FROM %s" % strTable)
  20  
  21          self.oFields = [ oField[0] for oField in oCursor.description]
  22          self.oFieldDescription = dict( [ (oField[0],
  23                                        oField[1:]) for oField in oCursor.description])
  24  
  25      def Select( self, strQuery):
  26          """
  27          Select records from query
  28  
  29          Takes either SQL of select statement or a dictionary containing
  30          field names and values to find.
  31          """
  32          self.oCursor = self.oConnection.cursor()
  33          if type( strQuery) == type(""):
  34              self.oCursor.execute( strQuery)
  35          else:
  36              #
  37              # assume query is a dict
  38              #
  39              self.oCursor.execute( "SELECT * FROM %s WHERE %s" % (self.strTable,
  40                                                          self.DictToWhere( strQuery)))
  41  
  42      def FetchOne( self):
  43          """
  44          Get next row of results
  45          Returns a dictionary holding field names and values.
  46          """
  47          oRow = self.oCursor.fetchone()
  48          if oRow:
  49              """
  50              Build a dictionary to map field name->value
  51              """
  52              return dict([(self.oFields[i], oRow[i]) for i in range(len(oRow))])
  53          else:
  54              return None
  55  
  56      def Insert( self, oDict):
  57          """
  58          Insert a row in the database
  59          Takes a dictionary holding field names and values.
  60          """
  61          strFields = oDict.keys()
  62          strValues = []
  63          for strField in strFields:
  64              strValue = oDict[strField]
  65              strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
  66  
  67              strValues.append( self.FormatField( strField, strValue))
  68  
  69          strSQL = "INSERT INTO %s ( %s) VALUES(%s);" % ( self.strTable,
  70                                                           ", ".join( strFields),
  71                                                           ", ".join( strValues))
  72          print strSQL
  73          oCursor = self.oConnection.cursor()
  74          oCursor.execute( strSQL)
  75          self.oConnection.commit()
  76  
  77      def Delete( self, oDict):
  78          """
  79          Delete row based on dictionary contents
  80          Takes a dictionary holding field names and values.
  81          """
  82          strSQL = "DELETE FROM %s WHERE %s;" % ( self.strTable, self.DictToWhere( oDict))
  83          print strSQL
  84          oCursor = self.oConnection.cursor()
  85          oCursor.execute( strSQL)
  86          self.oConnection.commit()
  87  
  88      def DictToWhere( self, oDict):
  89          """
  90          Convert dictionary to WHERE clause.
  91          """
  92          strFields = oDict.keys()
  93          strExpressions = []
  94  
  95          for strField in strFields:
  96              strValue = oDict[strField]
  97              strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
  98  
  99              strValue = self.FormatField( strField, strValue)
 100  
 101              strExpressions.append( '%s = %s' % (strField, strValue))
 102  
 103          return " AND ".join( strExpressions)
 104  
 105      def FormatField( self, strField, strValue):
 106          """
 107          Format a field for an sql statement.
 108          """
 109          strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
 110          if strType == 'STRING':
 111              return "'%s'" % str(strValue).replace( "'", "''")
 112          elif strType == 'NUMBER':
 113              return '%d' % int( strValue)
 114          else:
 115              raise 'unknown field type %s' % strType
 116  

RealPyOdbc.py

# Welcome to RealPyODBC
# Version 0.1 beta
# This class help you to connect your python script with ODBC engine.
# I need at least ctypes 0.9.2 for work.
#
# This class is not db-api 2.0 compatible. If you want to help me to do it
# please modify it and send me an e-mail with your work!
# All the comunity will thanks you.
#
# Please send bugs and reports to michele.petrazzo@unipex.it
#
# TO-DO
# Make compatibility with db-api 2.0, so add:
# apilevel, theadsafety, paramstyle, cursor, exceptions, ....
#
# This software if released with MIT Licence

'''
A little example

dsn_test = 'mysql'
user = 'someuser'

od = odbc()
#Dsn list
DSN_list = od.EnumateDSN()
od.ConnectOdbc(dsn_test, user)
#Get tables list
tables = od.GeTables()
#Get fields on the table
cols = od.ColDescr(tables[0])
#Make a query
od.Query('SELECT * FROM %s' % tables[0])
#Get results
print od.fetchmany(2)
print od.fetchall()
#Close before exit
od.close()'''

import sys, os
import ctypes

library = "/usr/lib/libodbc.so"
VERBOSE = 0    

#Costants
SQL_FETCH_NEXT = 0x01
SQL_FETCH_FIRST = 0x02
SQL_FETCH_LAST = 0x04

SQL_INVALID_HANDLE = -2
SQL_SUCCESS = 0
SQL_SUCCESS_WITH_INFO = 1
SQL_NO_DATA_FOUND = 100

SQL_NULL_HANDLE = 0
SQL_HANDLE_ENV = 1
SQL_HANDLE_DBC = 2
SQL_HANDLE_DESCR = 4
SQL_HANDLE_STMT = 3

SQL_ATTR_ODBC_VERSION = 200
SQL_OV_ODBC2 = 2

SQL_TABLE_NAMES = 3
SQL_C_CHAR = 1

#Types
SqlTypes = {0:'TYPE_NULL',1:'CHAR',2:'NUMERIC',3:'DECIMAL',4:'INTEGER', \
    5:'SMALLINT',6:'FLOAT',7:'REAL',8:'DOUBLE',9:'DATE',10:'TIME',\
    11:'TIMESTAMP',12:'VARCHAR'}

#Custom exceptions
class OdbcNoLibrary(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)
class OdbcLibraryError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)
class OdbcInvalidHandle(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)
class OdbcGenericError(Exception):
    def __init__(self, value):
        self.value = value
    def __str__(self):
        return repr(self.value)


class odbc:
    """This class implement a odbc connection. It use ctypes for work.
    """
    def __init__(self):
        """Init variables and connect to the engine"""
        self.connect = 0
        if sys.platform == 'win32':
            self.odbc = ctypes.windll.odbc32
        else:
            if not os.path.exists(library):
                raise OdbcNoLibrary, 'Library %s not found' % library
            try:
                self.odbc = ctypes.cdll.LoadLibrary(library)
            except:
                raise OdbcLibraryError, 'Error while loading %s' % library
        
        self.env_h = ctypes.c_int()
        self.dbc_h = ctypes.c_int()
        self.stmt_h = ctypes.c_int()

        self.odbc.SQLAllocHandle.restype = ctypes.c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, ctypes.byref(self.env_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)

        self.odbc.SQLSetEnvAttr.restype = ctypes.c_short
        ret = self.odbc.SQLSetEnvAttr(self.env_h, SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC2, 0)
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)

        self.odbc.SQLAllocHandle.restype = ctypes.c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_DBC, self.env_h, ctypes.byref(self.dbc_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)

    def ConnectOdbc(self, dsn, user, passwd = ''):
        """Connect to odbc, we need dsn, user and optionally password"""
        self.dsn = dsn
        self.user = user
        self.passwd = passwd

        sn = ctypes.create_string_buffer(dsn)
        un = ctypes.create_string_buffer(user)        
        pw = ctypes.create_string_buffer(passwd)
        self.odbc.SQLConnect.restype = ctypes.c_short
        ret = self.odbc.SQLConnect(self.dbc_h, sn, len(sn), un, len(un), pw, len(pw))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
        self.__set_stmt_h()
        self.connect = 1

    def GeTables(self):
        """Return a list with all tables"""
        self.__set_stmt_h()
        self.odbc.SQLTables.restype = ctypes.c_short
        #We want only tables
        t_type = ctypes.create_string_buffer('TABLE')
        ret = self.odbc.SQLTables(self.stmt_h, None, 0, None, 0, None, 0, \
            ctypes.byref(t_type), len(t_type))
        if not ret == SQL_SUCCESS:
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
        data = ctypes.create_string_buffer(1024)
        buff = ctypes.c_int()
        self.__bind(SQL_TABLE_NAMES, data, buff)
        return self.__fetch([data])

    def Query(self, q):
        """Make a query"""
        self.__set_stmt_h()
        self.odbc.SQLExecDirect.restype = ctypes.c_short
        ret = self.odbc.SQLExecDirect(self.stmt_h, q, len(q))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)

    def FetchOne(self):
        return self._fetch(1)
        
    def FetchMany(self, rows):
        return self._fetch(rows)

    def FetchAll(self):
        return self._fetch()

    def NumOfCols(self):
        """Get the number of cols"""
        NOC = ctypes.c_int()
        self.odbc.SQLNumResultCols.restype = ctypes.c_short
        ret = self.odbc.SQLNumResultCols(self.stmt_h, ctypes.byref(NOC))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
        return NOC.value

    def NumOfRow(self):
        """Get the number of rows"""
        NOR = ctypes.c_int()
        self.odbc.SQLRowCount.restype = ctypes.c_short
        ret = self.odbc.SQLRowCount(self.stmt_h, ctypes.byref(NOR))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
        return NOR.value        

    def ColDescr(self, table):
        """We return a list with a tuple for every col:
        field, type, number of digits, allow null"""
        self.Query("SELECT * FROM " + table)
        NOC = self.NumOfCols()
        self.odbc.SQLDescribeCol.restype = ctypes.c_short
        CName = ctypes.create_string_buffer(1024)
        Cname_ptr = ctypes.c_int()
        Ctype = ctypes.c_int()
        Csize = ctypes.c_int()
        NOdigits = ctypes.c_int()
        Allow_nuls = ctypes.c_int()
        ColDescr = []
        for row in range(1, NOC+1):
            ret = self.odbc.SQLDescribeCol(self.stmt_h, row, ctypes.byref(CName), len(CName), ctypes.byref(Cname_ptr),\
                ctypes.byref(Ctype),ctypes.byref(Csize),ctypes.byref(NOdigits), ctypes.byref(Allow_nuls))
            if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
            if SqlTypes.has_key(Ctype.value):
                ColDescr.append((CName.value, SqlTypes[Ctype.value],NOdigits.value,Allow_nuls.value))
            else:
                ColDescr.append((CName.value, SqlTypes[0],NOdigits.value,Allow_nuls.value))
        return ColDescr

    def EnumateDSN(self):
        """Return a list with [name, descrition]"""
        dsn = ctypes.create_string_buffer(1024)
        desc = ctypes.create_string_buffer(1024)
        dsn_len = ctypes.c_int()
        desc_len = ctypes.c_int()
        dsn_list = []
        self.odbc.SQLDataSources.restype = ctypes.c_short
        while 1:
            ret = self.odbc.SQLDataSources(self.env_h, SQL_FETCH_NEXT, \
                dsn, len(dsn), ctypes.byref(dsn_len), desc, len(desc), ctypes.byref(desc_len))
            if ret == SQL_NO_DATA_FOUND:
                break
            elif not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)
            else:
                dsn_list.append((dsn.value, desc.value))
        return dsn_list

    def ctrl_err(self, ht, h, val_ret):
        """Method for make a control of the errors
        We get type of handle, handle, return value
        Return a raise with a list"""
        state = ctypes.create_string_buffer(5)
        NativeError = ctypes.c_int()
        Message = ctypes.create_string_buffer(1024*10)
        Buffer_len = ctypes.c_int()
        err_list = []
        number_errors = 1
        self.odbc.SQLGetDiagRec.restype = ctypes.c_short
        while 1:
            ret = self.odbc.SQLGetDiagRec(ht, h, number_errors, state, \
                NativeError, Message, len(Message), ctypes.byref(Buffer_len))
            if ret == SQL_NO_DATA_FOUND:
                #No more data, I can raise
                raise OdbcGenericError, err_list
                break
            elif ret == SQL_INVALID_HANDLE:
                #The handle passed is an invalid handle
                raise OdbcInvalidHandle, 'SQL_INVALID_HANDLE'
            elif ret == SQL_SUCCESS:
                err_list.append((state.value, Message.value, NativeError.value))
                number_errors += 1

    def close(self):
        """Call me before exit, please"""
        self.__CloseCursor()
        self.__CloseHandle()

    def __set_stmt_h(self):
        self.__CloseHandle(SQL_HANDLE_STMT, self.stmt_h)
        self.odbc.SQLAllocHandle.restype = ctypes.c_short
        ret = self.odbc.SQLAllocHandle(SQL_HANDLE_STMT, self.dbc_h, ctypes.byref(self.stmt_h))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)

    def __fetch(self, cols, NOR = 0):
        if not NOR: NOR = self.NumOfRow()
        rows = []
        self.odbc.SQLFetch.restype = ctypes.c_short
        while NOR:
            row = []
            ret = self.odbc.SQLFetch(self.stmt_h)
            if ret == SQL_NO_DATA_FOUND:
                break
            elif not ret == SQL_SUCCESS:
                self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
            for col in cols:
                row.append(col.value)
            rows.append(row)
            NOR -= 1
        return rows

    def __bind(self, col, data, buff_indicator):
        self.odbc.SQLBindCol.restype = ctypes.c_short
        ret = self.odbc.SQLBindCol(self.stmt_h, col, SQL_C_CHAR, ctypes.byref(data), \
          len(data), ctypes.byref(buff_indicator))
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)

    def _fetch(self, NOR = 0):
        col_vars = []
        buff = ctypes.c_int()
        for col in range(1, self.NumOfCols()+1):
            col_vars.append(ctypes.create_string_buffer(1024))
            self.__bind(col, col_vars[col -1], buff)
        return self.__fetch(col_vars, NOR)

    def __CloseCursor(self):
        self.odbc.SQLCloseCursor.restype = ctypes.c_short
        ret = self.odbc.SQLCloseCursor(self.stmt_h)
        if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
            self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
        return

    def __CloseHandle(self, ht='', h=0):
        self.odbc.SQLFreeHandle.restype = ctypes.c_short
        if ht:
            if not h.value: return
            ret = self.odbc.SQLFreeHandle(ht, h)
            if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
            return
        if self.stmt_h.value:
            if VERBOSE: print 's'
            ret = self.odbc.SQLFreeHandle(SQL_HANDLE_STMT, self.stmt_h)
            if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
        if self.dbc_h.value:
            if self.connect:
                if VERBOSE: print 'disc'
                self.odbc.SQLDisconnect.restype = ctypes.c_short
                ret = self.odbc.SQLDisconnect(self.dbc_h)
                if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                    self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
            if VERBOSE: print 'dbc'
            ret = self.odbc.SQLFreeHandle(SQL_HANDLE_DBC, self.dbc_h)
            if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
        if self.env_h.value:
            if VERBOSE: print 'env'
            ret = self.odbc.SQLFreeHandle(SQL_HANDLE_ENV, self.env_h)
            if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
                self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)

Others

def adodbapi_connection (server, database, username, password):
   #
   # http://adodbapi.sf.net
   #
   import adodbapi
   connectors = ["Provider=SQLOLEDB"]
   connectors.append ("Data Source=%s" % server)
   connectors.append ("Initial Catalog=%s" % database)
   if username:
       connectors.append ("User Id=%s" % username)
       connectors.append ("Password=%s" % password)
   else:
       connectors.append("Integrated Security=SSPI")
   return adodbapi.connect (";".join (connectors))

def pymssql_connection (server, database, username, password):
   #
   # http://pymssql.sf.net
   #
   import pymssql
   if not username:
     raise RuntimeError, "Unable to use NT authentication for pymssql"
   return pymssql.connect (user=username, password=password,
host=server, database=database)

def pyodbc_connection (server, database, username, password):
   #
   # http://pyodbc.sf.net
   #
   import pyodbc
   connectors = ["Driver={SQL Server}"]
   connectors.append ("Server=%s" % server)
   connectors.append ("Database=%s" % database)
   if username:
       connectors.append ("UID=%s" % username)
       connectors.append ("PWD=%s" % password)
   else:
       connectors.append ("TrustedConnection=Yes")
   return pyodbc.connect (";".join (connectors))