Difference between revisions of "Experimenting with HTTP"
Jump to navigation
Jump to search
PeterHarding (talk | contribs) (New page: =An Experimental Reader= <pre> #!/usr/bin/env python import re import zlib import gzip import socket import StringIO #-------------------------------------------------------------------...) |
PeterHarding (talk | contribs) |
||
| (3 intermediate revisions by the same user not shown) | |||
| Line 1: | Line 1: | ||
=An Experimental HTTP Reader= | |||
<pre> | |||
cat go.py | |||
#!/usr/bin/env python | |||
#----------------------------------------------------------------------- | |||
""" | |||
So far does: | |||
1) Chunked Transfer-Encoding | |||
2) gzip Content-Encoding | |||
""" | |||
#----------------------------------------------------------------------- | |||
import re | |||
import zlib | |||
import gzip | |||
import socket | |||
import StringIO | |||
#----------------------------------------------------------------------- | |||
HOST = 'www.performiq.com.au' | |||
PORT = 80 | |||
status = None | |||
HEADER = 0 | |||
BODY = 1 | |||
context = HEADER | |||
data_encoding = None | |||
tranfer_encoding = None | |||
chunk_length = None | |||
body = '' | |||
p_Encoding = re.compile('Transfer-Encoding') | |||
#----------------------------------------------------------------------- | |||
# 'Accept' : 'text/plain, text/html', | |||
# 'Accept-Encoding' : 'gzip, deflate', | |||
""" | |||
get_headers = { | |||
'Accept' : '*/*', | |||
'Accept-Encoding' : 'gzip, deflate', | |||
'Accept-Language' : 'en-au', | |||
'Host' : HOST, | |||
'Connection' : 'Keep-Alive', | |||
'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727' | |||
} | |||
post_headers = { | |||
'Content-Type' : 'application/x-www-form-urlencoded', | |||
'Accept' : 'image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*', | |||
'Accept-Encoding' : 'gzip, deflate', | |||
'Accept-Language' : 'en-au', | |||
'Host' : HOST, | |||
'Connection' : 'Keep-Alive', | |||
'Cache-Control' : 'no-cache', | |||
'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)', | |||
} | |||
""" | |||
#----------------------------------------------------------------------- | |||
class Response: | |||
pass | |||
#------------------------------------------------------------------ | |||
def __init__(self): | |||
self.status = 0 | |||
#------------------------------------------------------------------ | |||
#----------------------------------------------------------------------- | |||
class Connection: | |||
sent = False | |||
#------------------------------------------------------------------ | |||
def __init__(self, host, port=80): | |||
self.host = host | |||
self.port = port | |||
# Create an INET, STREAMing socket | |||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |||
self.sock.connect((host, port)) | |||
self.residue = '' | |||
#------------------------------------------------------------------ | |||
def send(self, request): | |||
self.sent = True | |||
return self.sock.send(request) | |||
#------------------------------------------------------------------ | |||
def getresponse(self): | |||
self.pkt_cnt = 0 | |||
self.buffer = '' | |||
self.residue = '' | |||
self.headers = None | |||
done = False | |||
content_encoding = None | |||
while True: | |||
if not self.headers: | |||
self.get_headers() | |||
else: # Body... | |||
if self.headers.has_key('Transfer-Encoding'): | |||
transfer_encoding = self.headers['Transfer-Encoding'] | |||
if self.headers.has_key('Content-Encoding'): | |||
content_encoding = self.headers['Content-Encoding'] | |||
print "content_encoding -> ", content_encoding | |||
if transfer_encoding == 'chunked': | |||
done = self.get_chunked_body() | |||
if done: break | |||
print len(self.buffer) | |||
if content_encoding == 'gzip': | |||
data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(self.page_body)) | |||
page_body = data.read() | |||
else: | |||
page_body = self.page_body | |||
self.sent = False | |||
return page_body | |||
#------------------------------------------------------------------ | |||
def recv(self): | |||
self.buffer += self.sock.recv(1024) | |||
#------------------------------------------------------------------ | |||
def read(self, buf): | |||
pass | |||
#------------------------------------------------------------------ | |||
def close(self): | |||
pass | |||
#------------------------------------------------------------------ | |||
def get_headers(self): | |||
print "Get Headers" | |||
self.status = None | |||
self.headers = {} | |||
while True: | |||
if len(self.buffer) == 0: self.recv() | |||
idx = self.buffer.find('\r\n') | |||
if idx >= 0: | |||
line = self.buffer[:idx] | |||
self.buffer = self.buffer[idx+2:] | |||
if len(line) > 0: | |||
if self.status: | |||
idx = line.find(': ') | |||
if idx > 0: | |||
tag = line[:idx] | |||
value = line[idx+2:] | |||
print "Tag [%s] Value [%s]" % (tag, value) | |||
self.headers[tag] = value | |||
continue | |||
else: | |||
if line.find('HTTP') == 0: | |||
info = line.split(' ', 2) | |||
self.status = int(info[1]) | |||
continue | |||
else: | |||
print self.status, self.headers | |||
break | |||
#------------------------------------------------------------------ | |||
def get_chunked_body(self): | |||
self.chunk_length = None | |||
page_body = '' | |||
while True: | |||
if self.chunk_length: | |||
if len(self.buffer) < self.chunk_length: | |||
break | |||
else: | |||
data = self.buffer[:self.chunk_length] | |||
page_body += data | |||
# print "PLH >>%s<<" % page_body | |||
self.buffer = self.buffer[self.chunk_length:] | |||
print "Chopped out %d bytes" % self.chunk_length | |||
self.chunk_length = None | |||
idx = self.buffer.find('\r\n') | |||
self.buffer = self.buffer[2:] | |||
# print "[%s] -> %d - %d" % (self.buffer, len(self.buffer), idx) | |||
continue | |||
else: # <CR><LF> delimited text | |||
idx = self.buffer.find('\r\n') | |||
print idx | |||
if idx >= 0: | |||
line = self.buffer[:idx] | |||
self.buffer = self.buffer[idx+2:] | |||
if not self.chunk_length: | |||
print "chunk -> [%s]" % line | |||
self.chunk_length = int(line, 16) | |||
print "chunk_length -> %d [%d]" % (self.chunk_length, len(self.buffer)) | |||
if len(self.buffer) < self.chunk_length: | |||
print ">>> Read some more..." | |||
self.recv() | |||
# print self.buffer | |||
if self.chunk_length == 0: # Should be done... | |||
print "# Should be done..." | |||
idx = self.buffer.find('\r\n') | |||
print "idx -> %d" % idx | |||
if idx == 0: | |||
finished = True | |||
self.page_body = page_body | |||
break | |||
elif len(s) < chunk_length: | |||
finished = True | |||
self.page_body = page_body | |||
break | |||
else: | |||
continue | |||
else: | |||
self.recv() | |||
return True | |||
#----------------------------------------------------------------------- | |||
def dump(x): | |||
while i < len(x): | |||
print "%s %04x" % (repr(x[i]), ord(x[i])) | |||
i += 1 | |||
#----------------------------------------------------------------------- | |||
# Accept-Encoding: gzip, deflate | |||
request = """\ | |||
GET /test/ HTTP/1.1 | |||
Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */* | |||
Accept-Encoding: gzip, deflate | |||
Accept-Language: en-au | |||
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727) | |||
Host: www.performiq.com.au | |||
Connection: Keep-Alive | |||
""" | |||
#----------------------------------------------------------------------- | |||
def get(): | |||
s = Connection(HOST, PORT) | |||
print "len(request) = %d" % len(request) | |||
n = s.send(request) | |||
print "send() -> %d" % n | |||
page = s.getresponse() | |||
print len(page) | |||
#----------------------------------------------------------------------- | |||
def main(): | |||
get() | |||
#----------------------------------------------------------------------- | |||
main() | |||
#----------------------------------------------------------------------- | |||
</pre> | |||
=An Experimental Reader= | =An Experimental Reader= | ||
| Line 12: | Line 322: | ||
#----------------------------------------------------------------------- | #----------------------------------------------------------------------- | ||
URL = 'www.performiq.com.au' | URL = 'www.performiq.com.au' | ||
PORT = 80 | PORT = 80 | ||
| Line 169: | Line 476: | ||
request = """\ | request = """\ | ||
GET / | GET /test HTTP/1.1 | ||
Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */* | Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */* | ||
Accept-Encoding: gzip, deflate | Accept-Encoding: gzip, deflate | ||
Accept-Language: en-au | Accept-Language: en-au | ||
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727) | User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727) | ||
Host: | Host: www.performiq.com.au | ||
Connection: Keep-Alive | Connection: Keep-Alive | ||
| Line 274: | Line 581: | ||
#------------------------------------------------------------------------------- | #------------------------------------------------------------------------------- | ||
URL = 'http:// | URL = 'http://www.performiq.com.au/test' | ||
referer = 'http:// | referer = 'http://www.performiq.com.au/' | ||
uagent = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)' | uagent = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)' | ||
| Line 456: | Line 763: | ||
#------------------------------------------------------------------------------- | #------------------------------------------------------------------------------- | ||
</pre> | </pre> | ||
[[Category:Internet]] | [[Category:Internet]] | ||
[[Category:Python]] | [[Category:Python]] | ||
[[Category: | [[Category:Python httplib]] | ||
Latest revision as of 15:06, 1 August 2015
An Experimental HTTP Reader
cat go.py
#!/usr/bin/env python
#-----------------------------------------------------------------------
"""
So far does:
1) Chunked Transfer-Encoding
2) gzip Content-Encoding
"""
#-----------------------------------------------------------------------
import re
import zlib
import gzip
import socket
import StringIO
#-----------------------------------------------------------------------
# Server and port that get() hands to Connection().
HOST = 'www.performiq.com.au'
PORT = 80
# Module-level parser state.
# NOTE(review): these look like leftovers from a function-based version of
# the reader -- confirm they are still needed before relying on them.
status = None
HEADER = 0
BODY = 1
context = HEADER
data_encoding = None
# NOTE(review): 'tranfer' is presumably a misspelling of 'transfer'.
tranfer_encoding = None
chunk_length = None
body = ''
p_Encoding = re.compile('Transfer-Encoding')
#-----------------------------------------------------------------------
# 'Accept' : 'text/plain, text/html',
# 'Accept-Encoding' : 'gzip, deflate',
# The triple-quoted text below is a bare string expression, so the two
# header dictionaries inside it are never actually defined at runtime.
"""
get_headers = {
'Accept' : '*/*',
'Accept-Encoding' : 'gzip, deflate',
'Accept-Language' : 'en-au',
'Host' : HOST,
'Connection' : 'Keep-Alive',
'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727'
}
post_headers = {
'Content-Type' : 'application/x-www-form-urlencoded',
'Accept' : 'image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*',
'Accept-Encoding' : 'gzip, deflate',
'Accept-Language' : 'en-au',
'Host' : HOST,
'Connection' : 'Keep-Alive',
'Cache-Control' : 'no-cache',
'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)',
}
"""
#-----------------------------------------------------------------------
class Response:
    """Placeholder response object that carries only a status code."""

    #------------------------------------------------------------------
    def __init__(self):
        # Initial status value; nothing in this class changes it.
        self.status = 0
    #------------------------------------------------------------------
#-----------------------------------------------------------------------
class Connection:
    """Bare-bones HTTP/1.1 client connection over a plain TCP socket.

    Typical use: ``send()`` a raw request string, then call
    ``getresponse()`` to obtain the decoded response body.  Received data is
    accumulated in ``self.buffer`` and handled as ``str`` throughout.

    Supported framing: chunked Transfer-Encoding, Content-Length, and
    read-until-close.  Supported Content-Encoding: gzip/x-gzip and deflate.

    Attributes set while reading a response:
        status      -- integer status code from the status line (or None)
        headers     -- dict of header name -> value, names as received
        page_body   -- the body before Content-Encoding is removed
    """

    sent = False

    #------------------------------------------------------------------
    def __init__(self, host, port=80):
        """Connect to (host, port); the socket is closed again if that fails."""
        self.host = host
        self.port = port
        self.buffer = ''
        self.residue = ''
        self.headers = None
        self.status = None
        self.page_body = ''
        self.chunk_length = None
        # Create an INET, STREAMing socket
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
        except socket.error:
            # Do not leak the descriptor when the connect attempt fails.
            self.sock.close()
            raise

    #------------------------------------------------------------------
    def send(self, request):
        """Send the complete request and return the number of bytes sent.

        sendall() is used because a plain send() may transmit only part of
        the data; the return value stays an integer count for callers.
        """
        self.sent = True
        self.sock.sendall(request)
        return len(request)

    #------------------------------------------------------------------
    def getresponse(self):
        """Read one response and return its body with Content-Encoding removed.

        Raises socket.error if the peer closes the connection before the
        headers or a length-delimited body are complete.
        """
        self.pkt_cnt = 0
        self.buffer = ''
        self.residue = ''
        self.headers = None
        self.page_body = ''

        self.get_headers()

        transfer_encoding = self._header('Transfer-Encoding', '').strip().lower()
        content_encoding = self._header('Content-Encoding')
        print("content_encoding ->  %s" % content_encoding)

        if transfer_encoding == 'chunked':
            self.get_chunked_body()
        else:
            # Previously this case never terminated; read a plain body instead.
            self._get_plain_body()

        print(len(self.buffer))
        page_body = self._decode_body(self.page_body, content_encoding)
        self.sent = False
        return page_body

    #------------------------------------------------------------------
    def recv(self):
        """Append up to 1024 bytes from the socket to the buffer.

        Returns the number of bytes received; 0 means the peer closed the
        connection.
        """
        data = self.sock.recv(1024)
        self.buffer += data
        return len(data)

    #------------------------------------------------------------------
    def read(self, buf):
        """Unimplemented placeholder kept for interface compatibility."""
        pass

    #------------------------------------------------------------------
    def close(self):
        """Close the underlying socket; safe to call more than once."""
        if getattr(self, 'sock', None) is not None:
            self.sock.close()
            self.sock = None

    #------------------------------------------------------------------
    def get_headers(self):
        """Parse the status line and headers into self.status / self.headers.

        Reading stops at the first empty line.  Lines seen before the status
        line that do not start with 'HTTP' are ignored.
        """
        print("Get Headers")
        self.status = None
        self.headers = {}
        while True:
            line = self._read_line()
            if not line:
                # Blank line: end of the header section.
                print("%s %s" % (self.status, self.headers))
                break
            if self.status is None:
                if line.startswith('HTTP'):
                    self.status = int(line.split(' ', 2)[1])
                continue
            # Split on the first colon only; the space after it is optional.
            tag, sep, value = line.partition(':')
            if sep and tag:
                value = value.strip()
                print("Tag [%s] Value [%s]" % (tag, value))
                self.headers[tag] = value

    #------------------------------------------------------------------
    def get_chunked_body(self):
        """Read a chunked body into self.page_body and return True.

        Each chunk is "<hex size>[;extensions]CRLF<data>CRLF"; a zero-size
        chunk, optional trailer lines and an empty line end the body.
        """
        parts = []
        while True:
            size_line = self._read_line()
            print("chunk -> [%s]" % size_line)
            # Anything after ';' is a chunk extension and is ignored.
            self.chunk_length = int(size_line.split(';', 1)[0].strip(), 16)
            print("chunk_length -> %d [%d]" % (self.chunk_length, len(self.buffer)))

            if self.chunk_length == 0:
                print("# Should be done...")
                # Skip trailer lines up to and including the empty line.
                while self._read_line():
                    pass
                break

            # Wait for the whole chunk and its trailing CRLF before slicing.
            needed = self.chunk_length + 2
            while len(self.buffer) < needed:
                print(">>> Read some more...")
                self._recv_or_fail()
            parts.append(self.buffer[:self.chunk_length])
            self.buffer = self.buffer[needed:]
            print("Chopped out %d bytes" % self.chunk_length)

        self.page_body = ''.join(parts)
        return True

    #------------------------------------------------------------------
    def _header(self, name, default=None):
        """Return the value of header *name*, matched case-insensitively."""
        wanted = name.lower()
        for tag, value in self.headers.items():
            if tag.lower() == wanted:
                return value
        return default

    #------------------------------------------------------------------
    def _recv_or_fail(self):
        """Receive more data, raising socket.error if the peer has closed."""
        if not self.recv():
            raise socket.error("connection closed before the response was complete")

    #------------------------------------------------------------------
    def _read_line(self):
        """Remove and return the next CRLF-terminated line from the buffer."""
        while True:
            idx = self.buffer.find('\r\n')
            if idx >= 0:
                line = self.buffer[:idx]
                self.buffer = self.buffer[idx + 2:]
                return line
            # No complete line buffered yet: fetch more instead of spinning.
            self._recv_or_fail()

    #------------------------------------------------------------------
    def _get_plain_body(self):
        """Read a non-chunked body: Content-Length bytes, else until close."""
        if self.status in (204, 304):
            # These responses never carry a body.
            self.page_body = ''
            return
        length = self._header('Content-Length')
        if length is not None:
            length = int(length)
            while len(self.buffer) < length:
                self._recv_or_fail()
            self.page_body = self.buffer[:length]
            self.buffer = self.buffer[length:]
        else:
            while self.recv():
                pass
            self.page_body = self.buffer
            self.buffer = ''

    #------------------------------------------------------------------
    def _decode_body(self, body, content_encoding):
        """Undo gzip/x-gzip or deflate Content-Encoding; otherwise pass through."""
        encoding = (content_encoding or '').strip().lower()
        if not body:
            return body
        if encoding in ('gzip', 'x-gzip'):
            return gzip.GzipFile('', 'rb', 9, StringIO.StringIO(body)).read()
        if encoding == 'deflate':
            return zlib.decompress(body)
        return body
#-----------------------------------------------------------------------
def dump(x):
    """Print every character of *x* as its repr followed by its code in hex.

    One line is printed per character, e.g. ``'A' 0041``.  The original loop
    read its index before ever assigning it, so any call raised
    UnboundLocalError; iterating directly avoids the index altogether.
    """
    for ch in x:
        print("%s %04x" % (repr(ch), ord(ch)))
#-----------------------------------------------------------------------
# Accept-Encoding: gzip, deflate
request = """\
GET /test/ HTTP/1.1
Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*
Accept-Encoding: gzip, deflate
Accept-Language: en-au
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)
Host: www.performiq.com.au
Connection: Keep-Alive
"""
#-----------------------------------------------------------------------
def get():
    """Send the canned request to HOST:PORT and print the body length.

    The connection is closed in a ``finally`` block so that it is released
    even when sending or reading the response raises.
    """
    s = Connection(HOST, PORT)
    try:
        print("len(request) = %d" % len(request))
        n = s.send(request)
        print("send() -> %d" % n)
        page = s.getresponse()
        print(len(page))
    finally:
        s.close()
#-----------------------------------------------------------------------
def main():
    """Script entry point: run the single GET experiment."""
    get()
#-----------------------------------------------------------------------
# NOTE: called unconditionally, so importing this module also runs main().
main()
#-----------------------------------------------------------------------
An Experimental Reader
#!/usr/bin/env python
import re
import zlib
import gzip
import socket
import StringIO
#-----------------------------------------------------------------------
# Host name and port that setup() connects to.
URL = 'www.performiq.com.au'
PORT = 80
p_Encoding = re.compile('Transfer-Encoding')
#-----------------------------------------------------------------------
# 'Accept' : 'text/plain, text/html',
# 'Accept-Encoding' : 'gzip, deflate',
# The triple-quoted text below is a bare string expression, so the header
# dictionaries inside it are never defined at runtime.
# NOTE(review): it refers to SITE, which is not defined in the visible source.
"""
get_headers = {
'Accept-Encoding' : 'gzip, deflate',
'Accept' : '*/*',
'Accept-Language' : 'en-au',
'Host' : SITE,
'Connection' : 'Keep-Alive',
'User-Agent' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727'
}
post_headers = {
'Accept:' : 'image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*',
'Accept-Language:' : 'en-au',
'Content-Type:' : 'application/x-www-form-urlencoded',
'Accept-Encoding:' : 'gzip, deflate',
'User-Agent:' : 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)',
'Host' : SITE,
'Connection:' : 'Keep-Alive',
'Cache-Control:' : 'no-cache',
}
"""
#-----------------------------------------------------------------------
def dump(x):
    """Print every character of *x* as its repr followed by its code in hex.

    One line is printed per character, e.g. ``'A' 0041``.  The original loop
    read its index before ever assigning it, so any call raised
    UnboundLocalError; iterating directly avoids the index altogether.
    """
    for ch in x:
        print("%s %04x" % (repr(ch), ord(ch)))
#-----------------------------------------------------------------------
# Parser state shared between successive parse() calls.
headers = {}              # header name -> value, filled in by parse()
status = None             # integer status code once the status line is seen
HEADER = 0                # context value: still reading the header section
BODY = 1                  # context value: reading the body
context = HEADER
data_encoding = None
# parse() declares and reads the global 'transfer_encoding'.  It was only
# ever defined here under a misspelled name, so a non-chunked response hit a
# NameError.  Define the correctly spelled name and keep the old spelling as
# an alias in case anything refers to it.
transfer_encoding = None
tranfer_encoding = None
chunk_length = None       # size of the chunk currently being collected
body = ''                 # chunk data accumulated so far
# parse(s): incremental HTTP response parser driven by module-level state.
# It consumes what it can from the text in `s` and returns (finished, s),
# where `s` is the unconsumed remainder or, once finished, the accumulated
# body.  State lives in the globals status/body/context/transfer_encoding/
# chunk_length (and the module-level `headers` dict) between calls.
# NOTE(review): the indentation of this listing has been lost, so the nesting
# of the final `else` branches cannot be determined from the text alone; the
# statements are left exactly as found.
def parse(s):
global status
global body
global context
global transfer_encoding
global chunk_length
cnt = 0
finished = False
while True:
# A chunk size is pending: wait until the whole chunk is available.
if chunk_length:
if len(s) < chunk_length:
break
else:
data = s[:chunk_length]
body += data
s = s[chunk_length:]
print "Chopped out %d bytes" % chunk_length
chunk_length = None
idx = s.find('\r\n')
# Drops two characters after the chunk data -- presumably the chunk's
# trailing CRLF; this assumes those characters have already arrived.
s = s[2:]
# print "[%s] -> %d - %d" % (s, len(s), idx)
continue
else: # <CR><LF> delimited text
idx = s.find('\r\n')
if idx >= 0:
l = s[:idx]
s = s[idx+2:]
if context == HEADER:
if len(l) > 0:
# Once the status is known, non-empty lines are "Tag: value" headers.
if status:
idx = l.find(': ')
if idx > 0:
tag = l[:idx]
value = l[idx+2:]
print "Tag [%s] Value [%s]" % (tag, value)
headers[tag] = value
continue
else:
# Status line: the second space-separated field is the status code.
if l.find('HTTP') == 0:
info = l.split(' ', 2)
status = int(info[1])
continue
else:
# Empty line: end of headers, switch to body parsing.
print status, headers
context = BODY
if headers.has_key('Transfer-Encoding'):
if headers['Transfer-Encoding'] == 'chunked':
transfer_encoding = 'chunked'
else:
# NOTE(review): this reads the global `transfer_encoding`, which this
# function assigns only for chunked responses -- verify that it is defined
# at module level for the non-chunked case.
if transfer_encoding == 'chunked':
if not chunk_length:
print "chunk -> [%s]" % l
# The line is the chunk size in hexadecimal.
chunk_length = int(l, 16)
print "chunk_length -> %d [%d]" % (chunk_length, len(s))
# print s
if chunk_length == 0: # Should be done...
print "# Should be done..."
idx = s.find('\r\n')
print "idx -> %d" % idx
# A zero-size chunk followed directly by CRLF ends the body; the
# accumulated body is handed back in place of the remainder.
if idx == 0:
finished = True
s = body
break
if len(s) < chunk_length:
break
else:
continue
else:
print "Should not get here!"
break
else:
break
return (finished, s)
#-----------------------------------------------------------------------
def setup():
    """Open a TCP connection to (URL, PORT) and return the connected socket."""
    address = (URL, PORT)
    # Create an INET, STREAMing socket
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.connect(address)
    return conn
#-----------------------------------------------------------------------
# Accept-Encoding: gzip, deflate
request = """\
GET /test HTTP/1.1
Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, application/x-shockwave-flash, */*
Accept-Encoding: gzip, deflate
Accept-Language: en-au
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)
Host: www.performiq.com.au
Connection: Keep-Alive
"""
def get():
    """Send the canned request, feed the reply through parse(), print the page.

    Changes from the original loop:
      * an empty recv() (peer closed) now raises socket.error instead of
        looping forever;
      * the socket is always closed;
      * the body is gunzipped only when the response says it is gzip-encoded.
    """
    s = setup()
    try:
        print("len(request) = %d" % len(request))
        # sendall() guarantees the whole request goes out.
        s.sendall(request)
        n = len(request)
        print("send() -> %d" % n)
        residue = ''
        while True:
            received = s.recv(1024)
            if not received:
                raise socket.error("connection closed before the response was complete")
            # parse() returns the unconsumed tail, which must be prepended to
            # the next packet; once done it returns the whole body instead.
            (done, residue) = parse(residue + received)
            print(">**> %s" % done)
            if done:
                break
    finally:
        s.close()
    print(len(residue))
    # print zlib.decompress(residue)
    if headers.get('Content-Encoding') == 'gzip':
        data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(residue))
        page = data.read()
    else:
        page = residue
    print(page)
#-----------------------------------------------------------------------
def main():
    """Script entry point: run the single GET experiment."""
    get()
#-----------------------------------------------------------------------
# NOTE: called unconditionally, so importing this module also runs main().
main()
#-----------------------------------------------------------------------
"""
# zlib.decompressobj().decompress('x\x9c' + binary_str)
So you can do the same from your Python code. Just add ('Accept-Encoding', 'gzip,deflate') to the request headers. Check the following code chunk:
opener = urllib2.build_opener()
opener.addheaders = [('Referer', referer),
('User-Agent', uagent),
('Accept-Encoding', 'gzip,deflate')]
usock = opener.open(url)
url = usock.geturl()
data = decode(usock)
usock.close()
return data
Note the decode() function used in the code. Yes, you have to decode the content (if it's compressed).
def decode (page):
encoding = page.info().get("Content-Encoding")
if encoding in ('gzip', 'x-gzip', 'deflate'):
content = page.read()
if encoding == 'deflate':
data = StringIO.StringIO(zlib.decompress(content))
else:
data = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(content))
page = data.read()
return page
"""
Handling gzip,deflate Content-Encoding
#!/usr/bin/env python
import sys
import gzip
import getopt
import urllib2
import StringIO
#-------------------------------------------------------------------------------
# Page fetched by get_page().
URL = 'http://www.performiq.com.au/test'
# Values sent as the Referer and User-Agent request headers.
referer = 'http://www.performiq.com.au/'
uagent = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)'
#-------------------------------------------------------------------------------
def get_page():
    """Fetch URL while advertising gzip/deflate support; return decode()'s result.

    The response object is closed in a ``finally`` block so that it is
    released even if decode() raises.
    """
    opener = urllib2.build_opener()
    opener.addheaders = [
        ('Referer', referer),
        ('User-Agent', uagent),
        ('Accept-Encoding', 'gzip,deflate'),
    ]
    usock = opener.open(URL)
    try:
        # geturl() reports the final URL after any redirects.
        url = usock.geturl()
        print("[[%s]]" % url)
        page = decode(usock)
    finally:
        usock.close()
    return page
#-------------------------------------------------------------------------------
def decode(page):
    """Return the body of response *page* with its Content-Encoding removed.

    *page* must provide ``info()`` (an object with ``get``) and ``read()``.
    For gzip, x-gzip and deflate responses the raw compressed bytes are also
    saved to 'gzip.dat' in the current directory.

    Fixes over the original:
      * ``zlib`` is imported here because this script's import list lacks it,
        so the deflate branch used to raise NameError;
      * 'gzip.dat' is opened in binary mode and always closed;
      * an unencoded response now yields its body rather than the response
        object itself, so the return type is consistent.
    """
    import zlib

    info = page.info()
    print(info)
    encoding = info.get("Content-Encoding")
    content = page.read()
    if encoding not in ('gzip', 'x-gzip', 'deflate'):
        return content

    with open('gzip.dat', 'wb') as f_gzip:
        f_gzip.write(content)

    if encoding == 'deflate':
        return zlib.decompress(content)
    return gzip.GzipFile('', 'rb', 9, StringIO.StringIO(content)).read()
#-------------------------------------------------------------------------------
def usage():
    """Write the command-line usage text to standard error."""
    lines = ["", "Usage:", "$ get_page.py", ""]
    sys.stderr.write("\n".join(lines))
#-------------------------------------------------------------------------------
def main(argv):
    """Parse command-line options, then fetch and print the page.

    Options:
        -h, --help          show usage and exit with status 0
        -d, --debug         raise the debug level by one
        -D N, --debug_cnt=N set the debug level to N
        -v, --verbose       set the module-level verbose_flg to True

    Unknown options, or a non-integer N, print the usage text and exit with
    status 2.
    """
    global verbose_flg

    # Must start at 0: the original incremented it before any assignment, so
    # passing -d raised UnboundLocalError.
    debug_lvl = 0

    #----- Process command line arguments ----------------------------
    try:
        opts, args = getopt.getopt(argv, "dD:hv",
                                   ["debug", "debug_cnt=", "help", "verbose"])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif opt in ("-d", "--debug"):
            debug_lvl += 1
        elif opt in ("-D", "--debug_cnt"):
            try:
                debug_lvl = int(arg)
            except ValueError:
                usage()
                sys.exit(2)
        elif opt in ("-v", "--verbose"):
            verbose_flg = True

    page = get_page()
    print(page)
#-------------------------------------------------------------------------------
if __name__ == "__main__":
    main(sys.argv[1:])
#-------------------------------------------------------------------------------
Using GZIP Module
#!/usr/bin/env python
import sys
import gzip
import getopt
import StringIO
#-------------------------------------------------------------------------------
def decode():
    """Gunzip 'gzip.dat' from the current directory and print the result.

    The file is opened in binary mode (it holds compressed bytes) and closed
    by the ``with`` block even if decompression fails.  GzipFile reads
    directly from the open file, so no intermediate copy is needed.
    """
    with open('gzip.dat', 'rb') as f_gzip:
        page = gzip.GzipFile('', 'rb', 9, f_gzip).read()
    print(page)
#-------------------------------------------------------------------------------
def usage():
    """Write the command-line usage text to standard error."""
    lines = ["", "Usage:", "$ unzip.py", ""]
    sys.stderr.write("\n".join(lines))
#-------------------------------------------------------------------------------
def main(argv):
    """Parse command-line options, then decompress and print 'gzip.dat'.

    Options:
        -h, --help          show usage and exit with status 0
        -d, --debug         raise the debug level by one
        -D N, --debug_cnt=N set the debug level to N
        -v, --verbose       set the module-level verbose_flg to True

    Unknown options, or a non-integer N, print the usage text and exit with
    status 2.
    """
    global verbose_flg

    # Must start at 0: the original incremented it before any assignment, so
    # passing -d raised UnboundLocalError.
    debug_lvl = 0

    #----- Process command line arguments ----------------------------
    try:
        opts, args = getopt.getopt(argv, "dD:hv",
                                   ["debug", "debug_cnt=", "help", "verbose"])
    except getopt.GetoptError:
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(0)
        elif opt in ("-d", "--debug"):
            debug_lvl += 1
        elif opt in ("-D", "--debug_cnt"):
            try:
                debug_lvl = int(arg)
            except ValueError:
                usage()
                sys.exit(2)
        elif opt in ("-v", "--verbose"):
            verbose_flg = True

    decode()
#-------------------------------------------------------------------------------
if __name__ == "__main__":
    main(sys.argv[1:])
#-------------------------------------------------------------------------------