Socket Programming in Python
Jump to navigation
Jump to search
References
- http://docs.python.org/lib/socket-objects.html
- http://docs.python.org/lib/socket-example.html
- http://www.amk.ca/python/howto/sockets/
- http://www.evolt.org/article/Socket_Programming_in_Python/17/60276/
- http://www.example-code.com/python/ssl_client.asp
- http://python.about.com/od/networkingwithpython/ss/beg_web_client_5.htm
- http://www.java2s.com/Code/Python/Network/Socket-Client.htm
- http://pleac.sourceforge.net/pleac_python/sockets.html
- http://www.xmlblaster.org/mhonarc-xmlBlaster-devel/msg01129.html
- http://www.codefetch.com/search?qy=socket&lang=python
- http://www.nightmare.com/pythonwin/async_sockets.html
Also:
SSL Info
Examples
Client
#!/usr/bin/env python
import socket
# The web server to reach; port 80 is the normal http port
server_address = ("www.performiq.com.au", 80)
# An INET (IPv4), STREAMing (TCP) socket, connected to that server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(server_address)
Simple HTTP Server
#!/usr/bin/env python
import socket

# Create an INET, STREAMing socket
# (bound to the name `serversocket`, which is what the rest of the script uses)
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Bind the socket to a public host ip, and a well-known port
serversocket.bind((socket.gethostname(), 80))
# Become a server socket; queue up to 5 pending connections
serversocket.listen(5)

while True:
    # Accept connections from outside
    (clientsocket, address) = serversocket.accept()
    # Now do something with the clientsocket -
    # say, pretend this is a threaded server.
    # NOTE(review): client_thread is not defined on this page. If it is a
    # threading.Thread subclass, run() executes in this thread; start() would
    # be needed to serve clients concurrently -- confirm against its definition.
    ct = client_thread(clientsocket)
    ct.run()
Socket Comm Class
Fixed Length
import socket


class socket_comm:
    """Demonstration class only - coded for clarity, not efficiency.

    Wraps a stream socket and exchanges fixed-length messages of exactly
    MSGLEN bytes.
    """

    # Every message on the wire is exactly this many bytes long.
    MSGLEN = 64

    def __init__(self, sock=None):
        """Wrap *sock*, or create a new INET/STREAM socket when none is given."""
        if sock is None:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.sock = sock

    def connect(self, host, port):
        """Connect the wrapped socket to (host, port)."""
        self.sock.connect((host, port))

    def send(self, msg):
        """Send *msg*, looping until MSGLEN bytes have been written.

        *msg* is expected to be MSGLEN bytes long; a shorter message runs out
        of data before the loop ends and raises RuntimeError.

        Raises:
            RuntimeError: if the socket reports that 0 bytes were sent.
        """
        totalsent = 0
        while totalsent < self.MSGLEN:
            # send() may accept only part of the data, so resend the remainder.
            sent = self.sock.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("Socket connection failed...")
            totalsent += sent

    def recv(self):
        """Receive and return exactly MSGLEN bytes.

        Raises:
            RuntimeError: if the peer closes the connection before MSGLEN
                bytes have arrived.
        """
        chunks = []
        received = 0
        while received < self.MSGLEN:
            # Never ask for more than what is still missing from this message.
            chunk = self.sock.recv(self.MSGLEN - received)
            if not chunk:
                # An empty read means the other side closed the connection.
                raise RuntimeError("Socket connection failed...")
            chunks.append(chunk)
            received += len(chunk)
        return b"".join(chunks)
Variable Length
Matched Pair
Reader
# Echo server program
import socket
HOST = ''      # Empty string: accept connections on all available interfaces
PORT = 11001   # Arbitrary non-privileged port

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.bind((HOST, PORT))
    s.listen(1)
    # Serve a single client, then exit
    conn, addr = s.accept()
    try:
        print('Connected by %s' % (addr,))
        while True:
            data = conn.recv(1024)
            if not data:
                # Empty read: the client closed its side of the connection
                break
            # sendall() keeps writing until every byte is out; plain send()
            # may transmit only part of the buffer and drop the rest.
            conn.sendall(data)
    finally:
        conn.close()
finally:
    # Release the listening socket even if bind/accept/echo raised
    s.close()
Client
# Echo client program
import socket
HOST = 'xxx.com.au'  # The remote host
PORT = 11001         # The same port as used by the server

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect((HOST, PORT))
    # sendall() retries until the whole message is written; the bytes literal
    # is what sockets require on Python 3 and is a plain str on Python 2.
    s.sendall(b'Hello, world')
    # A single recv() may return less than the full echo on a stream socket
    data = s.recv(1024)
finally:
    # Close the socket even when connect/send/recv fails
    s.close()
print('Received %s' % repr(data))
Using SSL
import socket
import ssl

HOST = 'www.verisign.com'
PORT = 443

# The default context verifies the server certificate and its host name.
# (The legacy socket.ssl() wrapper did no verification and no longer exists
# in Python 3.)
context = ssl.create_default_context()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect((HOST, PORT))
    # The TLS handshake happens here; server_hostname enables SNI and
    # host name checking.
    ssl_sock = context.wrap_socket(s, server_hostname=HOST)
    try:
        cert = ssl_sock.getpeercert()
        print(repr(cert['subject']))
        print(repr(cert['issuer']))
        # Send a simple HTTP request -- use httplib (http.client on
        # Python 3) in actual code.
        ssl_sock.sendall(b"GET / HTTP/1.0\r\nHost: www.verisign.com\r\n\r\n")
        # Read a chunk of data.  Will not necessarily
        # read all the data returned by the server.
        data = ssl_sock.recv(1024)
    finally:
        ssl_sock.close()
finally:
    # Also close the original socket object; this is a harmless no-op when
    # wrapping already took ownership of the connection.
    s.close()
Some Utility Functions
Data Format
import socket
# Convert human readable dotted-quad form to the packed 32 bit value
packed_ip = socket.inet_aton("192.160.0.1")
# inet_aton() only parses numeric addresses, so a host name has to be
# resolved to an IP address first
packed_ip = socket.inet_aton(socket.gethostbyname("www.oreilly.com"))
# Convert the packed 32 bit value back to a dotted-quad ip address
ip_adress = socket.inet_ntoa(packed_ip)
# Create socket object: socket.socket(family, type)
socketobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Get socketname
socketobj.getsockname()  # Example, get port address of client
Socket Server
Examples
# Create a Server, calling handler for every client
# You can test it with "telnet localhost 9999"
try:  # Python 3 module name
    from socketserver import TCPServer
    from socketserver import BaseRequestHandler
except ImportError:  # Python 2 module name
    from SocketServer import TCPServer
    from SocketServer import BaseRequestHandler


class LocalHandler(BaseRequestHandler):
    """Request handler that only reports that a request arrived."""

    def handle(self):
        """Print a fixed notice; nothing is read from or written to the client."""
        print("I got an request")
# Loopback address and port the server listens on
address = ("127.0.0.1", 9999)
server = TCPServer(address, LocalHandler)
# Blocks here, dispatching each incoming connection to LocalHandler
server.serve_forever()
This is an extension of the above example...
import time
try:  # Python 3 module name
    from socketserver import TCPServer
    from socketserver import BaseRequestHandler
except ImportError:  # Python 2 module name
    from SocketServer import TCPServer
    from SocketServer import BaseRequestHandler


class MyHandler(BaseRequestHandler):
    """Request handler that asks the client for a name and greets it."""

    def handle(self):
        """Log the connection, prompt for a name, reply with a welcome line."""
        # self.request is the socket object
        print("%s I got an request from ip=%s port=%s" % (
            time.strftime("%Y-%m-%d %H:%M:%S"),
            self.client_address[0],
            self.client_address[1],
        ))
        # sendall() writes the whole buffer; send() may stop part-way.
        self.request.sendall(b"What is your name?\n")
        bufsize = 1024
        # Surrounding whitespace (including the trailing newline) is dropped
        response = self.request.recv(bufsize).strip()  # or recv(bufsize, flags)
        # Built by concatenating bytes so the reply is correct on Python 3 too
        data_to_send = b"Welcome " + response + b"!\n"
        self.request.sendall(data_to_send)  # or sendall(data, flags)
        print("%s connection finnished" % self.client_address[0])
# Loopback address and port the server listens on
address = ("127.0.0.1", 9999)
server = TCPServer(address, MyHandler)
# Blocks here, dispatching each incoming connection to MyHandler
server.serve_forever()
# -----------------
# Using select
import select
import socket
# Objects to be checked for readability
in_list = [mysocket, myfile]
# ...
# Objects to be checked for writability
out_list = []
out_list.append(...)
# Objects to be checked for exceptional conditions
except_list = []
except_list.append(...)

# select() hands back three lists, in the same order as its arguments
(in_, out_, exc_) = select.select(in_list, out_list, except_list, timeout)
reports = (
    ("Can read", in_),
    ("Can write", out_),
    ("Exception on", exc_),
)
for label, ready in reports:
    for fd in ready:
        print(label + " " + str(fd))
# Missing: setting TCP_NODELAY, e.g. sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)