Python - MS SQL Server Modules

From PeformIQ Upgrade
Revision as of 14:48, 9 November 2007 by PeterHarding (talk | contribs) (New page: = Python MS SQL Server Interface Modules = * [http://wiki.python.org/moin/SQL_Server Accessing SQL Server - PythonInfo Wiki] * [http://www.object-craft.com.au/projects/mssql/ MS SQL Serv...)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search

Python MS SQL Server Interface Modules

Other Resources

Misc Links

Using mxODBC

How about trying mxODBC ([4]). I found that it's quite easy to install and works great. Besides, the Win32 ODBC module, according to the Python Website, is not currently being maintained by anyone.

Using mxODBC, I found that the following line will create a connection to a database within a CGI script:


# Open connect to Database via ODBC
 db = Odbc.Windows.DriverConnect('DRIVER={Microsoft ODBC for
Oracle};SERVER=Test;UID=userid;PWD=password;')

Obviously, I'm using the ODBC driver for Oracle, so replace this driver with SQL Server.

Good Luck, Paul

"Navtej Riyait" <Navtej.Riyait at ukgateway.net> wrote in message
news:86ioek$iek$1 at lure.pipex.net...
> Hello,
>
> I am trying to create a little test web application in python/HTML using
> IIS. I have setup IIS so that .cgi scripts are sent to the python
> interpreter. I am trying to access a SQL Server database, 'pubs' (using
> Python DB APIs). I am importing odbc, dbi in my script. Database access
> works fine when I run a dbtest script at the command line, but the same bit
> of code gives an error when run in the .cgi script. It is not finding the
> data source name (DSN). I have set up the DSN in the ODBC Driver Manager
> correctly.
>
> I can't see what environment parameters to set to get the .cgi python script
> to access the 'pubs' database. Can anyone help ?
>
>
> Thanks
>

Some Examples

\\

Example 1

import pymssql

con = pymssql.connect(host='192.168.13.122',user='sa',password=,database='tempdb') cur = con.cursor()


query="create table pymssql (no int, fno float, comment varchar(50));" cur.execute(query) print "create table: %d" % cur.rowcount

for x in range(10):

   query="insert into pymssql (no,fno,comment) values (%d,%d.%d,'%dth comment');" % (x+1,x+1,x+1,x+1)
   ret=cur.execute(query)
   print "insert table: %d" % cur.rowcount


for x in range(10):

   query="update pymssql set comment='%dth hahaha.' where no = %d" % (x+1,x+1)
   ret=cur.execute(query)
   print "update table: %d" % cur.rowcount


query="EXEC sp_tables; select * from pymssql;"

for x in range(10):

   cur.execute(query)
   while 1:

print cur.fetchall() if 0 == cur.nextset(): break


query="drop table pymssql;" cur.execute(query) print "drop table: %d" % cur.rowcount

con.commit() con.close()

Example 2

import _mssql

mssql=_mssql.connect('192.168.13.122','sa','')
mssql.select_db('tempdb')

# create db

query="create table pymssql (no int, comment varchar(50));"

ret = mssql.query(query)
if ret:
print "create table: %d" % ret
print mssql.fetch_array()
else:
print mssql.errmsg()


# insert

for x in range(10):
query="insert into pymssql (no,comment) values (%d,'%dth comment');" % (x+1,x+1)
ret=mssql.query(query)
if ret:
print "insert table: %d" % ret
print mssql.fetch_array()

else:
print mssql.errmsg()


# update

for x in range(10):
query="update pymssql set comment='%dth hahaha.' where no = %d" % (x+1,x+1)
ret=mssql.query(query)

if ret:
print "update table: %d" % ret
print mssql.fetch_array()
else:
print mssql.errmsg()


# multiple query and multiple result

query="EXEC sp_tables; select * from pymssql;"
for x in range(10):
if mssql.query(query):
header=mssql.fetch_array()
for y in header:
print y
#print x,header[0][0][0],len(header[0][1][0])

else:
print mssql.errmsg()
print mssql.stdmsg()


# drop table

query="drop table pymssql;"
ret = mssql.query(query)
if ret:
print "drop table: %d" % ret
print mssql.fetch_array()

else:
print mssql.errmsg()


mssql.close()

Other Stuff

MS SQL Server DBI Implementation

This DBI implements the Cursor and Connection objects. It is functional: you can create connections, cursors, do fetchone, fetchall, get rowcount, etc. It uses osql or SQL2005's sqlcmd instead of ODBC or ADO. There is a good sized section with examples to get you started. The SQL2005 support is new, showing improved execution speed for SQL2005's sqlcmd.exe, even when accessing SQL2000 databases.


#dblib.py
#created by Jorge Besada

import os,sys

class Connection:
    def __init__(self,sname,uname='',password='',db='', version=''):
        self.version = version
        self.servername = sname
        self.username = uname
        self.password = password
        self.defdb = db
        self.constr = ''
        if db == '':
            self.defdb = 'master'
        self.connected = 0
        if self.version == None or self.version == "":
            print "Need to pass sql version argument"
            return self
        if self.version == "sql2000" or self.version == "sql7":
            execsql = "osql"
        if self.version == "sql2005":
            execsql = "sqlcmd"
        if self.version == "sybase":
            execsql = "isql"
            print "Sorry, Sybase has not been implemented yet!"
            return self
        if uname == '':
            self.constr = execsql + " -E -S" + self.servername + " -d" + self.defdb + " /w 8192 "
        else:
            self.constr = execsql + " -U" + self.username + " -P" + self.password + " -S" + self.servername + " -d" + self.defdb + " /w 8192 "

        #test connection:
        s = "set nocount on select name from master..syslogins where name = 'sa'"
        lst = os.popen(self.constr + ' -Q' + '"' + s + '"').readlines()

        try:
            if lst[2].strip() == 'sa':
                self.connected = 1
            else:
                self.connected = 0
            c = Cursor()
            c.servername = sname
            c.username = uname
            c.password = password
            c.defdb = db
            c.constr = self.constr
            self.cursor = c
        except IndexError:
            print "Could not connect"

    def commit(self):
        "this is here for compatibility"
        pass

    def close(self):
        self = None
        return self


class Cursor:
    def __init__(self):
        self.defdb = ''
        self.servername = ''
        self.username = ''
        self.password = ''
        self.constr = ''
        self.rowcount = -1
        self.records = []
        self.rowid = 0
        self.sqlfile = "-Q"
        self.colseparator = chr(1) #default column separator
        #this is going to be a list of lists, each one with:
        #name, type_code, display_size, internal_size, precision, scale, null_ok
        self.description = []
        self.fieldnames = []
        self.fieldvalues = []
        self.fieldvalue = []
        #one dictionary by column
        self.dictfield = {'name':'', 'type_code':0,'display_size':0,'internal_size':0,'precision':0, 'scale':0, 'null_ok':0}
        #list of lists
        self.dictfields = []

    #this is for compatibility to allow both types of calls:
    #cursor = connection.cursor() or using cursor = connection.cursor
    def __call__(self):
        c = Cursor()
        return c

    def execute(self, s):
        self.records = []
        lst = os.popen(self.constr + ' -s' + self.colseparator + " " + self.sqlfile + '"' + s + '"').readlines()
        if len(lst) == 0:
            return self.rowcount

        #If we get here we have results
        #rowcount maybe in last line, in this form: (4 rows affected)
        tmplastline = lst[-1]
        if tmplastline[0] == "(":  #there is a rowcount
            lastline = lst[-1]
            spacepos = lastline.index(" ")
            count = lastline[1:spacepos]
            self.rowcount = int(count)
        else:
            #last line has no recordcount, so reset it to 0
            self.records = lst[:]
            self.rowcount = 0
            return self.rowcount

        #if we got here we may have a rowcount and the list with results
        i = 0
        #process metadata if we have it:
        firstline = lst[0]
        lst1 = lst[0].split(self.colseparator)
        self.fieldnames = []
        for x in lst1:
            x1 = x.strip()
            self.fieldnames.append(x1)  #add column name
        #need to make a list for each column name
        self.description = []
        for x in self.fieldnames:
            l = []
            l.append(x)
            for m in range(len(self.dictfield) - 1):
                l.append(0)
            l2 = tuple(l)
            self.description.append(l2)
        self.description = tuple(self.description)

        #Data section: lst[0] is row with column names,skip
        #If the resulting string starts and ends with '-', discard

        for x in lst[1:-1]:
            x0 = ''.join(x)
            x1 = x0.strip()
            if x1 > '' and x1[0] > '-' and x1[-1] > '-':
                self.records.append(x1)
        #reset for each execution
        self.rowid = 0
        return self.rowcount

    #returns one row of the result set, keeps track of the position
    def fetchone(self):
        i = self.rowid
        j = i + 1
        self.rowid = j
        try:
            return tuple(self.records[i].split(self.colseparator))
        except IndexError:
            pass

    #returns whole recordset
    def fetchall(self):
        lst = []
        try:
            for x in range(self.rowid, self.rowcount):
                x1 = tuple(self.records[x].split(self.colseparator))
                lst.append(x1)
        except IndexError:
            pass
        return lst

    def close(self):
        self.records = None
        self = None
        return self

#-----------------------------------------

#Testing harness: we create and drop logins and databases
#Edit connection for desired server name and security options:
#For local server, integrated security
#   c = Connection('(local)',db='pubs', version='sql2000')
#For local server, SQL security
#   c = Connection('(local)','sa','sa password',db='pubs', version='sql2000')
#The first part of the test uses a restored pubs database
#in a SQL2005 instance (local)\sql1, the second test uses the pubs database
#from the default instance in the same server (local machine)


if __name__ == '__main__':
    c = Connection('(local)\sql1',db='pubs', version='sql2005')
    print "Connection string: " + c.constr
    if c.connected == 1:
        print "Connected OK"
    cu = c.cursor
    lst = cu.execute('select * from authors')
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()

    #Several SQL statements test
    lst = cu.execute("sp_addlogin 'test2', 'test2'")
    print 'rowcount=' + str(cu.rowcount)
    lst = cu.execute("select name from master..syslogins where name = 'test2'")
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()

    lst = cu.execute("EXEC sp_droplogin 'test2'")
    print 'rowcount=' + str(cu.rowcount)
    lst = cu.execute("select name from master..syslogins where name = 'test2'")
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()

    lst = cu.execute("CREATE DATABASE test")
    print 'rowcount=' + str(cu.rowcount)
    lst = cu.execute("select name from master..sysdatabases where name = 'test'")
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()

    lst = cu.execute("DROP DATABASE test")
    print 'rowcount=' + str(cu.rowcount)
    lst = cu.execute("select name from master..sysdatabases where name = 'test'")
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()

    print "\n\nRepeating test with SQL2000"
    c = Connection('(local)',db='pubs', version='sql2000')
    print "Connection string: " + c.constr
    if c.connected == 1:
        print "Connected OK"
    cu = c.cursor
    lst = cu.execute('select * from authors')
    print 'rowcount=' + str(cu.rowcount)
    rows = cu.fetchall()
    for x in rows:
        print x
    c.close()