ODBC with Python
DBTable Implementation
1 #
2 # Database wrapper class.
3 #
4 import odbc
5 import os
6 import traceback
7
8 class DBTable:
9 """
10 Wrapper for database table
11 """
12 FIELD_TYPE = 0
13
14 def __init__( self, oConnection, strTable):
15 self.oConnection = oConnection
16 self.strTable = strTable
17
18 oCursor = oConnection.cursor()
19 oCursor.execute( "SELECT * FROM %s" % strTable)
20
21 self.oFields = [ oField[0] for oField in oCursor.description]
22 self.oFieldDescription = dict( [ (oField[0],
23 oField[1:]) for oField in oCursor.description])
24
25 def Select( self, strQuery):
26 """
27 Select records from query
28
29 Takes either SQL of select statement or a dictionary containing
30 field names and values to find.
31 """
32 self.oCursor = self.oConnection.cursor()
33 if type( strQuery) == type(""):
34 self.oCursor.execute( strQuery)
35 else:
36 #
37 # assume query is a dict
38 #
39 self.oCursor.execute( "SELECT * FROM %s WHERE %s" % (self.strTable,
40 self.DictToWhere( strQuery)))
41
42 def FetchOne( self):
43 """
44 Get next row of results
45 Returns a dictionary holding field names and values.
46 """
47 oRow = self.oCursor.fetchone()
48 if oRow:
49 """
50 Build a dictionary to map field name->value
51 """
52 return dict([(self.oFields[i], oRow[i]) for i in range(len(oRow))])
53 else:
54 return None
55
56 def Insert( self, oDict):
57 """
58 Insert a row in the database
59 Takes a dictionary holding field names and values.
60 """
61 strFields = oDict.keys()
62 strValues = []
63 for strField in strFields:
64 strValue = oDict[strField]
65 strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
66
67 strValues.append( self.FormatField( strField, strValue))
68
69 strSQL = "INSERT INTO %s ( %s) VALUES(%s);" % ( self.strTable,
70 ", ".join( strFields),
71 ", ".join( strValues))
72 print strSQL
73 oCursor = self.oConnection.cursor()
74 oCursor.execute( strSQL)
75 self.oConnection.commit()
76
77 def Delete( self, oDict):
78 """
79 Delete row based on dictionary contents
80 Takes a dictionary holding field names and values.
81 """
82 strSQL = "DELETE FROM %s WHERE %s;" % ( self.strTable, self.DictToWhere( oDict))
83 print strSQL
84 oCursor = self.oConnection.cursor()
85 oCursor.execute( strSQL)
86 self.oConnection.commit()
87
88 def DictToWhere( self, oDict):
89 """
90 Convert dictionary to WHERE clause.
91 """
92 strFields = oDict.keys()
93 strExpressions = []
94
95 for strField in strFields:
96 strValue = oDict[strField]
97 strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
98
99 strValue = self.FormatField( strField, strValue)
100
101 strExpressions.append( '%s = %s' % (strField, strValue))
102
103 return " AND ".join( strExpressions)
104
105 def FormatField( self, strField, strValue):
106 """
107 Format a field for an sql statement.
108 """
109 strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
110 if strType == 'STRING':
111 return "'%s'" % str(strValue).replace( "'", "''")
112 elif strType == 'NUMBER':
113 return '%d' % int( strValue)
114 else:
115 raise 'unknown field type %s' % strType
116
RealPyOdbc.py
# Welcome to RealPyODBC
# Version 0.1 beta
# This class help you to connect your python script with ODBC engine.
# I need at least ctypes 0.9.2 for work.
#
# This class is not db-api 2.0 compatible. If you want to help me to do it
# please modify it and send me an e-mail with your work!
# All the comunity will thanks you.
#
# Please send bugs and reports to michele.petrazzo@unipex.it
#
# TO-DO
# Make compatibility with db-api 2.0, so add:
# apilevel, theadsafety, paramstyle, cursor, exceptions, ....
#
# This software if released with MIT Licence
'''
A little example
dsn_test = 'mysql'
user = 'someuser'
od = odbc()
#Dsn list
DSN_list = od.EnumateDSN()
od.ConnectOdbc(dsn_test, user)
#Get tables list
tables = od.GeTables()
#Get fields on the table
cols = od.ColDescr(tables[0])
#Make a query
od.Query('SELECT * FROM %s' % tables[0])
#Get results
print od.fetchmany(2)
print od.fetchall()
#Close before exit
od.close()'''
import sys, os
import ctypes
library = "/usr/lib/libodbc.so"
VERBOSE = 0
#Costants
SQL_FETCH_NEXT = 0x01
SQL_FETCH_FIRST = 0x02
SQL_FETCH_LAST = 0x04
SQL_INVALID_HANDLE = -2
SQL_SUCCESS = 0
SQL_SUCCESS_WITH_INFO = 1
SQL_NO_DATA_FOUND = 100
SQL_NULL_HANDLE = 0
SQL_HANDLE_ENV = 1
SQL_HANDLE_DBC = 2
SQL_HANDLE_DESCR = 4
SQL_HANDLE_STMT = 3
SQL_ATTR_ODBC_VERSION = 200
SQL_OV_ODBC2 = 2
SQL_TABLE_NAMES = 3
SQL_C_CHAR = 1
#Types
SqlTypes = {0:'TYPE_NULL',1:'CHAR',2:'NUMERIC',3:'DECIMAL',4:'INTEGER', \
5:'SMALLINT',6:'FLOAT',7:'REAL',8:'DOUBLE',9:'DATE',10:'TIME',\
11:'TIMESTAMP',12:'VARCHAR'}
#Custom exceptions
class OdbcNoLibrary(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcLibraryError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcInvalidHandle(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class OdbcGenericError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class odbc:
"""This class implement a odbc connection. It use ctypes for work.
"""
def __init__(self):
"""Init variables and connect to the engine"""
self.connect = 0
if sys.platform == 'win32':
self.odbc = ctypes.windll.odbc32
else:
if not os.path.exists(library):
raise OdbcNoLibrary, 'Library %s not found' % library
try:
self.odbc = ctypes.cdll.LoadLibrary(library)
except:
raise OdbcLibraryError, 'Error while loading %s' % library
self.env_h = ctypes.c_int()
self.dbc_h = ctypes.c_int()
self.stmt_h = ctypes.c_int()
self.odbc.SQLAllocHandle.restype = ctypes.c_short
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, ctypes.byref(self.env_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
self.odbc.SQLSetEnvAttr.restype = ctypes.c_short
ret = self.odbc.SQLSetEnvAttr(self.env_h, SQL_ATTR_ODBC_VERSION, SQL_OV_ODBC2, 0)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
self.odbc.SQLAllocHandle.restype = ctypes.c_short
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_DBC, self.env_h, ctypes.byref(self.dbc_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
def ConnectOdbc(self, dsn, user, passwd = ''):
"""Connect to odbc, we need dsn, user and optionally password"""
self.dsn = dsn
self.user = user
self.passwd = passwd
sn = ctypes.create_string_buffer(dsn)
un = ctypes.create_string_buffer(user)
pw = ctypes.create_string_buffer(passwd)
self.odbc.SQLConnect.restype = ctypes.c_short
ret = self.odbc.SQLConnect(self.dbc_h, sn, len(sn), un, len(un), pw, len(pw))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
self.__set_stmt_h()
self.connect = 1
def GeTables(self):
"""Return a list with all tables"""
self.__set_stmt_h()
self.odbc.SQLTables.restype = ctypes.c_short
#We want only tables
t_type = ctypes.create_string_buffer('TABLE')
ret = self.odbc.SQLTables(self.stmt_h, None, 0, None, 0, None, 0, \
ctypes.byref(t_type), len(t_type))
if not ret == SQL_SUCCESS:
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
data = ctypes.create_string_buffer(1024)
buff = ctypes.c_int()
self.__bind(SQL_TABLE_NAMES, data, buff)
return self.__fetch([data])
def Query(self, q):
"""Make a query"""
self.__set_stmt_h()
self.odbc.SQLExecDirect.restype = ctypes.c_short
ret = self.odbc.SQLExecDirect(self.stmt_h, q, len(q))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
def FetchOne(self):
return self._fetch(1)
def FetchMany(self, rows):
return self._fetch(rows)
def FetchAll(self):
return self._fetch()
def NumOfCols(self):
"""Get the number of cols"""
NOC = ctypes.c_int()
self.odbc.SQLNumResultCols.restype = ctypes.c_short
ret = self.odbc.SQLNumResultCols(self.stmt_h, ctypes.byref(NOC))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
return NOC.value
def NumOfRow(self):
"""Get the number of rows"""
NOR = ctypes.c_int()
self.odbc.SQLRowCount.restype = ctypes.c_short
ret = self.odbc.SQLRowCount(self.stmt_h, ctypes.byref(NOR))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
return NOR.value
def ColDescr(self, table):
"""We return a list with a tuple for every col:
field, type, number of digits, allow null"""
self.Query("SELECT * FROM " + table)
NOC = self.NumOfCols()
self.odbc.SQLDescribeCol.restype = ctypes.c_short
CName = ctypes.create_string_buffer(1024)
Cname_ptr = ctypes.c_int()
Ctype = ctypes.c_int()
Csize = ctypes.c_int()
NOdigits = ctypes.c_int()
Allow_nuls = ctypes.c_int()
ColDescr = []
for row in range(1, NOC+1):
ret = self.odbc.SQLDescribeCol(self.stmt_h, row, ctypes.byref(CName), len(CName), ctypes.byref(Cname_ptr),\
ctypes.byref(Ctype),ctypes.byref(Csize),ctypes.byref(NOdigits), ctypes.byref(Allow_nuls))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
if SqlTypes.has_key(Ctype.value):
ColDescr.append((CName.value, SqlTypes[Ctype.value],NOdigits.value,Allow_nuls.value))
else:
ColDescr.append((CName.value, SqlTypes[0],NOdigits.value,Allow_nuls.value))
return ColDescr
def EnumateDSN(self):
"""Return a list with [name, descrition]"""
dsn = ctypes.create_string_buffer(1024)
desc = ctypes.create_string_buffer(1024)
dsn_len = ctypes.c_int()
desc_len = ctypes.c_int()
dsn_list = []
self.odbc.SQLDataSources.restype = ctypes.c_short
while 1:
ret = self.odbc.SQLDataSources(self.env_h, SQL_FETCH_NEXT, \
dsn, len(dsn), ctypes.byref(dsn_len), desc, len(desc), ctypes.byref(desc_len))
if ret == SQL_NO_DATA_FOUND:
break
elif not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, stmt_h, ret)
else:
dsn_list.append((dsn.value, desc.value))
return dsn_list
def ctrl_err(self, ht, h, val_ret):
"""Method for make a control of the errors
We get type of handle, handle, return value
Return a raise with a list"""
state = ctypes.create_string_buffer(5)
NativeError = ctypes.c_int()
Message = ctypes.create_string_buffer(1024*10)
Buffer_len = ctypes.c_int()
err_list = []
number_errors = 1
self.odbc.SQLGetDiagRec.restype = ctypes.c_short
while 1:
ret = self.odbc.SQLGetDiagRec(ht, h, number_errors, state, \
NativeError, Message, len(Message), ctypes.byref(Buffer_len))
if ret == SQL_NO_DATA_FOUND:
#No more data, I can raise
raise OdbcGenericError, err_list
break
elif ret == SQL_INVALID_HANDLE:
#The handle passed is an invalid handle
raise OdbcInvalidHandle, 'SQL_INVALID_HANDLE'
elif ret == SQL_SUCCESS:
err_list.append((state.value, Message.value, NativeError.value))
number_errors += 1
def close(self):
"""Call me before exit, please"""
self.__CloseCursor()
self.__CloseHandle()
def __set_stmt_h(self):
self.__CloseHandle(SQL_HANDLE_STMT, self.stmt_h)
self.odbc.SQLAllocHandle.restype = ctypes.c_short
ret = self.odbc.SQLAllocHandle(SQL_HANDLE_STMT, self.dbc_h, ctypes.byref(self.stmt_h))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
def __fetch(self, cols, NOR = 0):
if not NOR: NOR = self.NumOfRow()
rows = []
self.odbc.SQLFetch.restype = ctypes.c_short
while NOR:
row = []
ret = self.odbc.SQLFetch(self.stmt_h)
if ret == SQL_NO_DATA_FOUND:
break
elif not ret == SQL_SUCCESS:
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
for col in cols:
row.append(col.value)
rows.append(row)
NOR -= 1
return rows
def __bind(self, col, data, buff_indicator):
self.odbc.SQLBindCol.restype = ctypes.c_short
ret = self.odbc.SQLBindCol(self.stmt_h, col, SQL_C_CHAR, ctypes.byref(data), \
len(data), ctypes.byref(buff_indicator))
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_STMT, self.stmt_h, ret)
def _fetch(self, NOR = 0):
col_vars = []
buff = ctypes.c_int()
for col in range(1, self.NumOfCols()+1):
col_vars.append(ctypes.create_string_buffer(1024))
self.__bind(col, col_vars[col -1], buff)
return self.__fetch(col_vars, NOR)
def __CloseCursor(self):
self.odbc.SQLCloseCursor.restype = ctypes.c_short
ret = self.odbc.SQLCloseCursor(self.stmt_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
return
def __CloseHandle(self, ht='', h=0):
self.odbc.SQLFreeHandle.restype = ctypes.c_short
if ht:
if not h.value: return
ret = self.odbc.SQLFreeHandle(ht, h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
return
if self.stmt_h.value:
if VERBOSE: print 's'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_STMT, self.stmt_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.stmt_h, ret)
if self.dbc_h.value:
if self.connect:
if VERBOSE: print 'disc'
self.odbc.SQLDisconnect.restype = ctypes.c_short
ret = self.odbc.SQLDisconnect(self.dbc_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
if VERBOSE: print 'dbc'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_DBC, self.dbc_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_DBC, self.dbc_h, ret)
if self.env_h.value:
if VERBOSE: print 'env'
ret = self.odbc.SQLFreeHandle(SQL_HANDLE_ENV, self.env_h)
if not ret in (SQL_SUCCESS, SQL_SUCCESS_WITH_INFO):
self.ctrl_err(SQL_HANDLE_ENV, self.env_h, ret)
Others
def adodbapi_connection (server, database, username, password):
#
# http://adodbapi.sf.net
#
import adodbapi
connectors = ["Provider=SQLOLEDB"]
connectors.append ("Data Source=%s" % server)
connectors.append ("Initial Catalog=%s" % database)
if username:
connectors.append ("User Id=%s" % username)
connectors.append ("Password=%s" % password)
else:
connectors.append("Integrated Security=SSPI")
return adodbapi.connect (";".join (connectors))
def pymssql_connection (server, database, username, password):
#
# http://pymssql.sf.net
#
import pymssql
if not username:
raise RuntimeError, "Unable to use NT authentication for pymssql"
return pymssql.connect (user=username, password=password,
host=server, database=database)
def pyodbc_connection (server, database, username, password):
#
# http://pyodbc.sf.net
#
import pyodbc
connectors = ["Driver={SQL Server}"]
connectors.append ("Server=%s" % server)
connectors.append ("Database=%s" % database)
if username:
connectors.append ("UID=%s" % username)
connectors.append ("PWD=%s" % password)
else:
connectors.append ("TrustedConnection=Yes")
return pyodbc.connect (";".join (connectors))