Difference between revisions of "Python pymssql and dblib Examples"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) (New page: <pre> import pymssql con = pymssql.connect(host='192.168.13.122',user='sa',password='',database='tempdb') cur = con.cursor() query="create table pymssql (no int, fno float, comment va...) |
PeterHarding (talk | contribs) |
||
Line 1: | Line 1: | ||
=Using psmssql Module= | |||
<pre> | <pre> | ||
Line 40: | Line 40: | ||
con.commit() | con.commit() | ||
con.close() | con.close() | ||
</pre> | |||
=Using DBLIB= | |||
dblib - derived from the origain Sybase mechanism - was used with MS SqlServer until SqlServer 2000 version. It has been replaced in subsequent releases. | |||
LoadRunner uses this method for it native access to SqlServer databases - which becomes problematic with perfoprmance testing modern SqlServer releases. | |||
<pre> | |||
#dblib.py | |||
#created by Jorge Besada | |||
import os,sys | |||
class Connection: | |||
def __init__(self,sname,uname='',password='',db='', version=''): | |||
self.version = version | |||
self.servername = sname | |||
self.username = uname | |||
self.password = password | |||
self.defdb = db | |||
self.constr = '' | |||
if db == '': | |||
self.defdb = 'master' | |||
self.connected = 0 | |||
if self.version == None or self.version == "": | |||
print "Need to pass sql version argument" | |||
return self | |||
if self.version == "sql2000" or self.version == "sql7": | |||
execsql = "osql" | |||
if self.version == "sql2005": | |||
execsql = "sqlcmd" | |||
if self.version == "sybase": | |||
execsql = "isql" | |||
print "Sorry, Sybase has not been implemented yet!" | |||
return self | |||
if uname == '': | |||
self.constr = execsql + " -E -S" + self.servername + " -d" + self.defdb + " /w 8192 " | |||
else: | |||
self.constr = execsql + " -U" + self.username + " -P" + self.password + " -S" + self.servername + " -d" + self.defdb + " /w 8192 " | |||
#test connection: | |||
s = "set nocount on select name from master..syslogins where name = 'sa'" | |||
lst = os.popen(self.constr + ' -Q' + '"' + s + '"').readlines() | |||
try: | |||
if lst[2].strip() == 'sa': | |||
self.connected = 1 | |||
else: | |||
self.connected = 0 | |||
c = Cursor() | |||
c.servername = sname | |||
c.username = uname | |||
c.password = password | |||
c.defdb = db | |||
c.constr = self.constr | |||
self.cursor = c | |||
except IndexError: | |||
print "Could not connect" | |||
def commit(self): | |||
"this is here for compatibility" | |||
pass | |||
def close(self): | |||
self = None | |||
return self | |||
class Cursor: | |||
def __init__(self): | |||
self.defdb = '' | |||
self.servername = '' | |||
self.username = '' | |||
self.password = '' | |||
self.constr = '' | |||
self.rowcount = -1 | |||
self.records = [] | |||
self.rowid = 0 | |||
self.sqlfile = "-Q" | |||
self.colseparator = chr(1) #default column separator | |||
#this is going to be a list of lists, each one with: | |||
#name, type_code, display_size, internal_size, precision, scale, null_ok | |||
self.description = [] | |||
self.fieldnames = [] | |||
self.fieldvalues = [] | |||
self.fieldvalue = [] | |||
#one dictionary by column | |||
self.dictfield = {'name':'', 'type_code':0,'display_size':0,'internal_size':0,'precision':0, 'scale':0, 'null_ok':0} | |||
#list of lists | |||
self.dictfields = [] | |||
#this is for compatibility to allow both types of calls: | |||
#cursor = connection.cursor() or using cursor = connection.cursor | |||
def __call__(self): | |||
c = Cursor() | |||
return c | |||
def execute(self, s): | |||
self.records = [] | |||
lst = os.popen(self.constr + ' -s' + self.colseparator + " " + self.sqlfile + '"' + s + '"').readlines() | |||
if len(lst) == 0: | |||
return self.rowcount | |||
#If we get here we have results | |||
#rowcount maybe in last line, in this form: (4 rows affected) | |||
tmplastline = lst[-1] | |||
if tmplastline[0] == "(": #there is a rowcount | |||
lastline = lst[-1] | |||
spacepos = lastline.index(" ") | |||
count = lastline[1:spacepos] | |||
self.rowcount = int(count) | |||
else: | |||
#last line has no recordcount, so reset it to 0 | |||
self.records = lst[:] | |||
self.rowcount = 0 | |||
return self.rowcount | |||
#if we got here we may have a rowcount and the list with results | |||
i = 0 | |||
#process metadata if we have it: | |||
firstline = lst[0] | |||
lst1 = lst[0].split(self.colseparator) | |||
self.fieldnames = [] | |||
for x in lst1: | |||
x1 = x.strip() | |||
self.fieldnames.append(x1) #add column name | |||
#need to make a list for each column name | |||
self.description = [] | |||
for x in self.fieldnames: | |||
l = [] | |||
l.append(x) | |||
for m in range(len(self.dictfield) - 1): | |||
l.append(0) | |||
l2 = tuple(l) | |||
self.description.append(l2) | |||
self.description = tuple(self.description) | |||
#Data section: lst[0] is row with column names,skip | |||
#If the resulting string starts and ends with '-', discard | |||
for x in lst[1:-1]: | |||
x0 = ''.join(x) | |||
x1 = x0.strip() | |||
if x1 > '' and x1[0] > '-' and x1[-1] > '-': | |||
self.records.append(x1) | |||
#reset for each execution | |||
self.rowid = 0 | |||
return self.rowcount | |||
#returns one row of the result set, keeps track of the position | |||
def fetchone(self): | |||
i = self.rowid | |||
j = i + 1 | |||
self.rowid = j | |||
try: | |||
return tuple(self.records[i].split(self.colseparator)) | |||
except IndexError: | |||
pass | |||
#returns whole recordset | |||
def fetchall(self): | |||
lst = [] | |||
try: | |||
for x in range(self.rowid, self.rowcount): | |||
x1 = tuple(self.records[x].split(self.colseparator)) | |||
lst.append(x1) | |||
except IndexError: | |||
pass | |||
return lst | |||
def close(self): | |||
self.records = None | |||
self = None | |||
return self | |||
#----------------------------------------- | |||
#Testing harness: we create and drop logins and databases | |||
#Edit connection for desired server name and security options: | |||
#For local server, integrated security | |||
# c = Connection('(local)',db='pubs', version='sql2000') | |||
#For local server, SQL security | |||
# c = Connection('(local)','sa','sa password',db='pubs', version='sql2000') | |||
#The first part of the test uses a restored pubs database | |||
#in a SQL2005 instance (local)\sql1, the second test uses the pubs database | |||
#from the default instance in the same server (local machine) | |||
if __name__ == '__main__': | |||
c = Connection('(local)\sql1',db='pubs', version='sql2005') | |||
print "Connection string: " + c.constr | |||
if c.connected == 1: | |||
print "Connected OK" | |||
cu = c.cursor | |||
lst = cu.execute('select * from authors') | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
#Several SQL statements test | |||
lst = cu.execute("sp_addlogin 'test2', 'test2'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
lst = cu.execute("select name from master..syslogins where name = 'test2'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
lst = cu.execute("EXEC sp_droplogin 'test2'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
lst = cu.execute("select name from master..syslogins where name = 'test2'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
lst = cu.execute("CREATE DATABASE test") | |||
print 'rowcount=' + str(cu.rowcount) | |||
lst = cu.execute("select name from master..sysdatabases where name = 'test'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
lst = cu.execute("DROP DATABASE test") | |||
print 'rowcount=' + str(cu.rowcount) | |||
lst = cu.execute("select name from master..sysdatabases where name = 'test'") | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
print "\n\nRepeating test with SQL2000" | |||
c = Connection('(local)',db='pubs', version='sql2000') | |||
print "Connection string: " + c.constr | |||
if c.connected == 1: | |||
print "Connected OK" | |||
cu = c.cursor | |||
lst = cu.execute('select * from authors') | |||
print 'rowcount=' + str(cu.rowcount) | |||
rows = cu.fetchall() | |||
for x in rows: | |||
print x | |||
c.close() | |||
</pre> | </pre> | ||
[[Category:Python]] | [[Category:Python]] | ||
[[Category:Database]] | [[Category:Database]] |
Revision as of 19:23, 30 April 2008
Using psmssql Module
import pymssql con = pymssql.connect(host='192.168.13.122',user='sa',password='',database='tempdb') cur = con.cursor() query="create table pymssql (no int, fno float, comment varchar(50));" cur.execute(query) print "create table: %d" % cur.rowcount for x in range(10): query="insert into pymssql (no,fno,comment) values (%d,%d.%d,'%dth comment');" % (x+1,x+1,x+1,x+1) ret=cur.execute(query) print "insert table: %d" % cur.rowcount for x in range(10): query="update pymssql set comment='%dth hahaha.' where no = %d" % (x+1,x+1) ret=cur.execute(query) print "update table: %d" % cur.rowcount query="EXEC sp_tables; select * from pymssql;" for x in range(10): cur.execute(query) while 1: print cur.fetchall() if 0 == cur.nextset(): break query="drop table pymssql;" cur.execute(query) print "drop table: %d" % cur.rowcount con.commit() con.close()
Using DBLIB
dblib - derived from the origain Sybase mechanism - was used with MS SqlServer until SqlServer 2000 version. It has been replaced in subsequent releases.
LoadRunner uses this method for it native access to SqlServer databases - which becomes problematic with perfoprmance testing modern SqlServer releases.
#dblib.py #created by Jorge Besada import os,sys class Connection: def __init__(self,sname,uname='',password='',db='', version=''): self.version = version self.servername = sname self.username = uname self.password = password self.defdb = db self.constr = '' if db == '': self.defdb = 'master' self.connected = 0 if self.version == None or self.version == "": print "Need to pass sql version argument" return self if self.version == "sql2000" or self.version == "sql7": execsql = "osql" if self.version == "sql2005": execsql = "sqlcmd" if self.version == "sybase": execsql = "isql" print "Sorry, Sybase has not been implemented yet!" return self if uname == '': self.constr = execsql + " -E -S" + self.servername + " -d" + self.defdb + " /w 8192 " else: self.constr = execsql + " -U" + self.username + " -P" + self.password + " -S" + self.servername + " -d" + self.defdb + " /w 8192 " #test connection: s = "set nocount on select name from master..syslogins where name = 'sa'" lst = os.popen(self.constr + ' -Q' + '"' + s + '"').readlines() try: if lst[2].strip() == 'sa': self.connected = 1 else: self.connected = 0 c = Cursor() c.servername = sname c.username = uname c.password = password c.defdb = db c.constr = self.constr self.cursor = c except IndexError: print "Could not connect" def commit(self): "this is here for compatibility" pass def close(self): self = None return self class Cursor: def __init__(self): self.defdb = '' self.servername = '' self.username = '' self.password = '' self.constr = '' self.rowcount = -1 self.records = [] self.rowid = 0 self.sqlfile = "-Q" self.colseparator = chr(1) #default column separator #this is going to be a list of lists, each one with: #name, type_code, display_size, internal_size, precision, scale, null_ok self.description = [] self.fieldnames = [] self.fieldvalues = [] self.fieldvalue = [] #one dictionary by column self.dictfield = {'name':'', 'type_code':0,'display_size':0,'internal_size':0,'precision':0, 'scale':0, 'null_ok':0} #list of lists self.dictfields = [] #this is for compatibility to allow both types of calls: #cursor = connection.cursor() or using cursor = connection.cursor def __call__(self): c = Cursor() return c def execute(self, s): self.records = [] lst = os.popen(self.constr + ' -s' + self.colseparator + " " + self.sqlfile + '"' + s + '"').readlines() if len(lst) == 0: return self.rowcount #If we get here we have results #rowcount maybe in last line, in this form: (4 rows affected) tmplastline = lst[-1] if tmplastline[0] == "(": #there is a rowcount lastline = lst[-1] spacepos = lastline.index(" ") count = lastline[1:spacepos] self.rowcount = int(count) else: #last line has no recordcount, so reset it to 0 self.records = lst[:] self.rowcount = 0 return self.rowcount #if we got here we may have a rowcount and the list with results i = 0 #process metadata if we have it: firstline = lst[0] lst1 = lst[0].split(self.colseparator) self.fieldnames = [] for x in lst1: x1 = x.strip() self.fieldnames.append(x1) #add column name #need to make a list for each column name self.description = [] for x in self.fieldnames: l = [] l.append(x) for m in range(len(self.dictfield) - 1): l.append(0) l2 = tuple(l) self.description.append(l2) self.description = tuple(self.description) #Data section: lst[0] is row with column names,skip #If the resulting string starts and ends with '-', discard for x in lst[1:-1]: x0 = ''.join(x) x1 = x0.strip() if x1 > '' and x1[0] > '-' and x1[-1] > '-': self.records.append(x1) #reset for each execution self.rowid = 0 return self.rowcount #returns one row of the result set, keeps track of the position def fetchone(self): i = self.rowid j = i + 1 self.rowid = j try: return tuple(self.records[i].split(self.colseparator)) except IndexError: pass #returns whole recordset def fetchall(self): lst = [] try: for x in range(self.rowid, self.rowcount): x1 = tuple(self.records[x].split(self.colseparator)) lst.append(x1) except IndexError: pass return lst def close(self): self.records = None self = None return self #----------------------------------------- #Testing harness: we create and drop logins and databases #Edit connection for desired server name and security options: #For local server, integrated security # c = Connection('(local)',db='pubs', version='sql2000') #For local server, SQL security # c = Connection('(local)','sa','sa password',db='pubs', version='sql2000') #The first part of the test uses a restored pubs database #in a SQL2005 instance (local)\sql1, the second test uses the pubs database #from the default instance in the same server (local machine) if __name__ == '__main__': c = Connection('(local)\sql1',db='pubs', version='sql2005') print "Connection string: " + c.constr if c.connected == 1: print "Connected OK" cu = c.cursor lst = cu.execute('select * from authors') print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close() #Several SQL statements test lst = cu.execute("sp_addlogin 'test2', 'test2'") print 'rowcount=' + str(cu.rowcount) lst = cu.execute("select name from master..syslogins where name = 'test2'") print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close() lst = cu.execute("EXEC sp_droplogin 'test2'") print 'rowcount=' + str(cu.rowcount) lst = cu.execute("select name from master..syslogins where name = 'test2'") print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close() lst = cu.execute("CREATE DATABASE test") print 'rowcount=' + str(cu.rowcount) lst = cu.execute("select name from master..sysdatabases where name = 'test'") print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close() lst = cu.execute("DROP DATABASE test") print 'rowcount=' + str(cu.rowcount) lst = cu.execute("select name from master..sysdatabases where name = 'test'") print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close() print "\n\nRepeating test with SQL2000" c = Connection('(local)',db='pubs', version='sql2000') print "Connection string: " + c.constr if c.connected == 1: print "Connected OK" cu = c.cursor lst = cu.execute('select * from authors') print 'rowcount=' + str(cu.rowcount) rows = cu.fetchall() for x in rows: print x c.close()