Python进阶9
Python socket编程
引言
sockets
的历史悠久,它们最早在 1971 年的 APPANET 中使用,后来成为1983年发布的Berkeley Software Distribution(BSD)操作系统中的API,称为Berkeley sockets
。
Web服务器和浏览器并不是使用sockets
的唯一程序,各种规模和类型的客户端 - 服务器(client - server)应用程序也得到了广泛使用。
今天,尽管socket API
使用的底层协议已经发展多年,而且已经有新协议出现,但是底层 API 仍然保持不变。
最常见的套接字应用程序类型是客户端 - 服务器(client - server)应用程序,其中一方充当服务器并等待来自客户端的连接。
Socket API介绍
Python
中的socket模块提供了一个到Berkeley sockets API
的接口,其中的主要接口函数如下:
- socket()
- bind()
- listen()
- accept()
- connect()
- connect_ex()
- send()
- recv()
- close()
这些方便使用的接口函数和系统底层的功能调用相一致。
TCP Sockets
我们准备构建一个基于 TCP 协议的socket
对象,为什么使用 TCP 呢,因为:
- 可靠性:如果在传输过程中因为网络原因导致数据包丢失,会有相关机制检测到并且进行重新传输
- 按序到达:一方发送到另一方的数据包是按发送顺序被接收的。
对比之下,UDP 协议是不提供这些保证的,但是它的响应效率更高,资源消耗更少。
TCP 协议并不需要我们自己去实现,在底层都已经实现好了,我们只需要使用Python
的socket
模块,进行协议指定就可以了。socket.SOCK_STREAM
表示使用 TCP 协议,socket.SOCK_DGRAM
表示使用 UDP 协议
我们来看看基于 TCP 协议socket
的 API 调用和数据传送流程图,右边的一列是服务器端(server),左边的一列是客户端(client)。
要实现左边的处于监听状态的server
,我们需要按照顺序调用这样几个函数:
- socket(): 创建一个
socket
对象 - bind(): 关联对应 ip 地址和端口号
- listen(): 允许对象接收其他
socket
的连接 - accept(): 接收其他
socket
的连接,返回一个元组(conn, addr),conn 是一个新的socket
对象,代表这个连接,addr 是连接端的地址信息。
client
调用connect()
时,会通过 TCP 的三次握手,建立连接。当client
连接到server
时,server
会调用accept()
完成这次连接。
双方通过send()
和recv()
来接收和发送数据,最后通过close()
来关闭这次连接,释放资源。一般server
端是不关闭的,会继续等待其他的连接。
Echo Client and Server
刚才我们弄清楚了server
和client
使用socket
进行通信的过程,我们现在要自己进行一个简单的也是经典的实现:server
复述从client
接收的信息。
Echo Server
import socket
HOST = '127.0.0.1' # Standard loopback interface address (localhost)
PORT = 65431 # Port to listen on (non-privileged ports are > 1023)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
socket.socket()
创建了一个socket
对象,它实现了上下文管理器协议,我们直接用 with 语句进行创建即可,而且最后不需要调用close()
函数。
socket()
中的两个参数指明了连接需要的 ip地址类型和传输协议类型,socket.AF_INET 表示使用 IPv4的地址进行连接,socket.SOCK_STREAM 表示使用 TCP 协议进行数据的传输。
bind()
用来将socket
对象和特定的网络对象和端口号进行关联,函数中的两个参数是由创建socket
对象时指定的 ip地址类型 决定的,这里使用的是socket.AF_INET
(IPv4),因此,bind()
函数接收一个元组对象作为参数(HOST, PORT)
-
host
可以是一个主机名,IP地址,或者空字符串。如果使用的是 IP地址,host
必须是 IPv4格式的地址字符串。127.0.0.1
是本地环路的标准写法,因此只有在主机上的进程才能够连接到server
,如果设置为空字符串,它可以接受所有合法 IPv4地址的连接。 -
port
应该是从1 - 65535
的一个整数(0被保留了),它相当于是一个窗口和其他的客户端建立连接,如果想使用1 - 1024
的端口,一些系统可能会要求要有管理员权限。
listen()
使得server
可以接受连接,它可以接受一个参数:backlog
,用来指明系统可以接受的连接数量,虽然同一时刻只能与一端建立连接,但是其他的连接请求可以被放入等待队列中,当前面的连接断开,后面的请求会依次被处理,超过这个数量的连接请求再次发起后,会被server
直接拒绝。
从Python 3.5
开始,这个参数是可选的,如果我们不明确指明,它就采用系统默认值。如果server
端在同一时刻会收到大量的连接请求,通常要把这个值调大一些,在Linux
中,可以在/proc/sys/net/core/somaxconn
看到值的情况,详细请参阅:
accept()
监听连接的建立,是一个阻塞式调用,当有client
连接之后,它会返回一个代表这个连接的新的socket
对象和代表client
地址信息的元组。对于 IPv4 的地址连接,地址信息是 (host, port),对于 IPv6 ,(host, port, flowinfo, scopeid)
有一件事情需要特别注意,accept()
之后,我们获得了一个新的socket
对象,它和server
以及client
都不同,我们用它来进行和client
的通信。
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
conn
是我们新获得的socket
对象,conn.recv()
也是一个阻塞式调用,它会等待底层的 I/O 响应,直到获得数据才继续向下执行。外面的while
循环保证server
端一直监听,通过conn.sendall
将数据再发送回去。
Echo Client
import socket
HOST = '127.0.0.1' # The server's hostname or IP address
PORT = 65431 # The port used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall("Hello, world".encode("utf8"))
data = s.recv(1024)
print('Received', data.decode("utf8"))
和server
相比,client
更加简单,先是创建了一个socket
对象,然后将它和server
连接,通过s.sendall()
将信息发送给server
,通过s.recv()
获得来自server
的数据,然后将其打印输出。
在发送数据时,只支持发送字节型数据,所以我们要将需要发送的数据进行编码,在收到server
端的回应后,将得到的数据进行解码,就能还原出我们能够识别的字符串了。
启动程序
我们要先启动server
端,做好监听准备,然后再启动client
端,进行连接。
这个信息是在client
连接后打印出来的。
可以使用netstat
这个命令查看socket
的状态,更详细使用可以查阅帮助文档。
查看系统中处于监听状态的socket
,过滤出了使用 TCP协议 和 IPv4 地址的对象:
如果先启动了client
,会有下面这个经典的错误:
造成的原因可能是端口号写错了,或者server
根本就没运行,也可能是在server
端存在防火墙阻值了连接建立,下面是一些常见的错误异常:
Exception | errno Constant | Description |
---|---|---|
BlockingIOError | EWOULDBLOCK | Resource temporarily unavailable. For example, in non-blocking mode, when calling send() and the peer is busy and not reading, the send queue (network buffer) is full. Or there are issues with the network. Hopefully this is a temporary condition. |
OSError | ADDRINUSE | Address already in use. Make sure there’s not another process running that’s using the same port number and your server is setting the socket option SO_REUSEADDR: socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1). |
ConnectionResetError | ECONNRESET | Connection reset by peer. The remote process crashed or did not close its socket properly (unclean shutdown). Or there’s a firewall or other device in the network path that’s missing rules or misbehaving. |
TimeoutError | ETIMEDOUT | Operation timed out. No response from peer. |
ConnectionRefusedError | ECONNREFUSED | Connection refused. No application listening on specified port. |
连接的建立
现在我们仔细看一下server
和client
是怎样建通信的:
当使用环路网络((IPv4 address 127.0.0.1 or IPv6 address ::1))的时候,数据没有离开过主机跑到外部的网络。如图所示,环路网络是在主机内部建立的,数据就经过它来发送,从主机上运行的一个程序发送到另一个程序,从主机发到主机。这就是为什么我们喜欢说环路网络和 IP地址 127.0.0.1(IPv4) 或 ::1(IPv6) 都表示主机
如果server
使用的时其他的合法IP地址,它就会通过以太网接口与外部网络建立联系:
如何处理多端连接
echo server
最大的缺点就是它同一时间只能服务一个client
,直到连接的断开,echo client
同样也有不足,当client
进行如下操作时,有可能s.recv()
只返回了一个字节的数据,数据并不完整。
data = s.recv(1024)
这里所设定的参数 1024 表示单次接收的最大数据量,并不是说会返回 1024 字节的数据。在server
中使用的send()
与之类似,调用后它有一个返回值,标示已经发送出去的数据量,可能是小于我们实际要发送的数据量,比如说有 6666 字节的数据要发送,用上面的发送方式要发送很多此才行,也就是说一次调用send()
数据并没有被完整发送,我们需要自己做这个检查来确保数据完整发送了。
因此,这里使用了sendall()
,它会不断地帮我们发送数据直到数据全部发送或者出现错误。
所以,目前有两个问题:
- 怎样同时处理多个连接?
- 怎样调用
send()
和recv()
直到数据全部发送或接收。
要实现并发,传统方法是使用多线程,最近比较流行的方法是使用在Python3.4
中引入的异步IO模块asyncio
。
这里准备用更加传统,但是更容易理解的方式来实现,基于系统底层的一个调用:select()
,Python
中也提供了对应的模块:selectors ,它在原生的实现上进行了封装,通过使用DefaultSelector
,能更加简单的完成任务。
select()
通过了一种机制,它来监听操作发生情况,一旦某个操作准备就绪(一般是读就绪或者是写就绪),然后将需要进行这些操作的应用程序select出来,进行相应的读和写操作。到这里,你可能会发现这并没有实现并发,但是它的响应速度非常快,通过异步操作,足够模拟并发的效果了。
Muti-Connection Client and Server
Multi-Connection Server
import selectors
sel = selectors.DefaultSelector()
# ...
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
和echo server
最大的不同就在于,通过lsock.setblocking(False)
,将这个socket
对象设置成了非阻塞形式,与sel.select()
一起使用,就可以在一个或多个socket
对象上等待事件,然后在数据准备就绪时进行数据的读写操作。
sel.register()
给server
注册了我们需要的事件,对server
来说,我们需要 I/O 可读,从而进行client
发送数据的读入,因此,通过selector.EVENT_READ
来指明。
data
用来存储和socket
有关的任何数据,当sel.select()
返回结果时,它也被返回,我们用它作为一个标志,来追踪拥有读入和写入操作的socket
对象。
接下来是事件循环:
import selectors
sel = selectors.DefaultSelector()
# ...
while True:
events = sel.select(timeout=None)
for key, mask in events:
if key.data is None:
accept_wrapper(key.fileobj)
else:
service_connection(key, mask)
sel.select(timeout=None)
是一个阻塞式调用,直到有socket
对象准备好了 I/O 操作,或者等待时间超过设定的timeout
。它将返回(key, events)
这类元组构成的一个列表,每一个对应一个就绪的socket
对象。
key
是一个SeletorKey
类型的实例,它有一个fileobj
的属性,这个属性就是sokect
对象。
mask
是就绪操作的状态掩码。
如果key.data is None
,我们就知道,这是一个server
对象,于是要调用accept()
方法,用来等待client
的连接。不过我们要调用我们自己的accept_wrapper()
函数,里面还会包含其他的逻辑。
如果key.data is not None
,我们就知道,这是一个client
对象,它带着数据来建立连接啦!然后我们要为它提供服务,于是就调用service_connection(key, mask)
,完成所有的服务逻辑。
def accept_wrapper(sock):
conn, addr = sock.accept() # Should be ready to read
print('accepted connection from', addr)
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
sel.register(conn, events, data=data)
这个函数用来处理与client
的连接,使用conn.setblocking(False)
将该对象设置为非阻塞状态,这正是我们在这个版本的程序中所需要的,否则,整个server
会停止,直到它返回,这意味着其他socket
对象进入等待状态。
然后,使用types.SimplleNamespace()
构建了一个data
对象,存储我们想保存的数据和socket
对象。
因为数据的读写都是通过conn
,所以使用selectors.EVENT_READ | selectors.EVENT_WRITE
,然后用sel.register(conn, events, data=data)
进行注册。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('echoing', repr(data.outb), 'to', data.addr)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
这就时服务逻辑的核心,key
中包含了socket
对象和data
对象,mask
是已经就绪操作的掩码。根据sock
可以读,将数据保存在data.outb
中,这也将成为写出的数据。
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
如果没有接收到数据,说明client
数据发完了,sock
的状态不再被追踪,然后关闭这次连接。
Multi-Connection Client
messages = [b'Message 1 from client.', b'Message 2 from client.']
def start_connections(host, port, num_conns):
server_addr = (host, port)
for i in range(0, num_conns):
connid = i + 1
print('starting connection', connid, 'to', server_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.connect_ex(server_addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in messages),
recv_total=0,
messages=list(messages),
outb=b'')
sel.register(sock, events, data=data)
使用connect_ex()
而不是connect()
,因为connect()
会立即引发BlockingIOError异常。connect_ex()
只返回错误码 errno.EINPROGRESS,而不是在连接正在进行时引发异常。连接完成后,socket
对象就可以进行读写,并通过select()
返回。
连接建立完成后,我们使用了types.SimpleNamespace
构建出和socket
对象一同保存的数据,里面的messages
对我们要发送的数据做了一个拷贝,因为在后续的发送过程中,它会被修改。client
需要发送什么,已经发送了什么以及已经接收了什么都要进行追踪,总共要发送的数据字节数也保存在了data
对象中。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
print('received', repr(recv_data), 'from connection', data.connid)
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print('closing connection', data.connid)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print('sending', repr(data.outb), 'to connection', data.connid)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
client
要追踪来自server
的数据字节数,如果收到的数据字节数和发送的相等,或者有一次没有收到数据,说明数据接收完成,本次服务目的已经达成,就可以关闭这次连接了。
data.outb
用来维护发送的数据,前面提到过,一次发送不一定能将数据全部送出,使用data.outb = data.outb[sent:]
来更新数据的发送。发送完毕后,再messages
中取出数据准备再次发送。
可以在这里看到最后的完整代码:
最后的运行效果如下:
image image还是要先启动server
,进入监听状态,然后client
启动,与server
建立两条连接,要发送的信息有两条,这里分开发送,先将fist message
分别发送到server
,然后再发送second message
。server
端收到信息后进行暂时保存,当两条信息都收到了才开始进行echo,client
端收到完整信息后表示服务结束,断开连接。