Python进阶9

2019-02-03  本文已影响0人  MetaT1an

Python socket编程

引言

sockets的历史悠久,它们最早在 1971 年的 APPANET 中使用,后来成为1983年发布的Berkeley Software Distribution(BSD)操作系统中的API,称为Berkeley sockets

Web服务器和浏览器并不是使用sockets的唯一程序,各种规模和类型的客户端 - 服务器(client - server)应用程序也得到了广泛使用。

今天,尽管socket API使用的底层协议已经发展多年,而且已经有新协议出现,但是底层 API 仍然保持不变。

最常见的套接字应用程序类型是客户端 - 服务器(client - server)应用程序,其中一方充当服务器并等待来自客户端的连接。

Socket API介绍

Python中的socket模块提供了一个到Berkeley sockets API的接口,其中的主要接口函数如下:

这些方便使用的接口函数和系统底层的功能调用相一致。

TCP Sockets

我们准备构建一个基于 TCP 协议的socket对象,为什么使用 TCP 呢,因为:

对比之下,UDP 协议是不提供这些保证的,但是它的响应效率更高,资源消耗更少。

TCP 协议并不需要我们自己去实现,在底层都已经实现好了,我们只需要使用Pythonsocket模块,进行协议指定就可以了。socket.SOCK_STREAM表示使用 TCP 协议,socket.SOCK_DGRAM表示使用 UDP 协议

我们来看看基于 TCP 协议socket的 API 调用和数据传送流程图,右边的一列是服务器端(server),左边的一列是客户端(client)。

image

要实现左边的处于监听状态的server,我们需要按照顺序调用这样几个函数:

client调用connect()时,会通过 TCP 的三次握手,建立连接。当client连接到server时,server会调用accept()完成这次连接。

双方通过send()recv()来接收和发送数据,最后通过close()来关闭这次连接,释放资源。一般server端是不关闭的,会继续等待其他的连接。

Echo Client and Server

刚才我们弄清楚了serverclient使用socket进行通信的过程,我们现在要自己进行一个简单的也是经典的实现:server复述从client接收的信息。

Echo Server

import socket

HOST = '127.0.0.1'  # Standard loopback interface address (localhost)
PORT = 65431       # Port to listen on (non-privileged ports are > 1023)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    conn, addr = s.accept()
    with conn:
        print('Connected by', addr)
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)

socket.socket()创建了一个socket对象,它实现了上下文管理器协议,我们直接用 with 语句进行创建即可,而且最后不需要调用close()函数。

socket()中的两个参数指明了连接需要的 ip地址类型传输协议类型,socket.AF_INET 表示使用 IPv4的地址进行连接,socket.SOCK_STREAM 表示使用 TCP 协议进行数据的传输。

bind()用来将socket对象和特定的网络对象和端口号进行关联,函数中的两个参数是由创建socket对象时指定的 ip地址类型 决定的,这里使用的是socket.AF_INET(IPv4),因此,bind()函数接收一个元组对象作为参数(HOST, PORT)

listen()使得server可以接受连接,它可以接受一个参数:backlog,用来指明系统可以接受的连接数量,虽然同一时刻只能与一端建立连接,但是其他的连接请求可以被放入等待队列中,当前面的连接断开,后面的请求会依次被处理,超过这个数量的连接请求再次发起后,会被server直接拒绝。

Python 3.5开始,这个参数是可选的,如果我们不明确指明,它就采用系统默认值。如果server端在同一时刻会收到大量的连接请求,通常要把这个值调大一些,在Linux中,可以在/proc/sys/net/core/somaxconn看到值的情况,详细请参阅:

accept()监听连接的建立,是一个阻塞式调用,当有client连接之后,它会返回一个代表这个连接的新的socket对象和代表client地址信息的元组。对于 IPv4 的地址连接,地址信息是 (host, port),对于 IPv6 ,(host, port, flowinfo, scopeid)

有一件事情需要特别注意,accept()之后,我们获得了一个新的socket对象,它和server以及client都不同,我们用它来进行和client的通信。

conn, addr = s.accept()
with conn:
    print('Connected by', addr)
    while True:
        data = conn.recv(1024)
        if not data:
            break
        conn.sendall(data)

conn是我们新获得的socket对象,conn.recv()也是一个阻塞式调用,它会等待底层的 I/O 响应,直到获得数据才继续向下执行。外面的while循环保证server端一直监听,通过conn.sendall将数据再发送回去。

Echo Client

import socket

HOST = '127.0.0.1'  # The server's hostname or IP address
PORT = 65431        # The port used by the server

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    s.sendall("Hello, world".encode("utf8"))
    data = s.recv(1024)

print('Received', data.decode("utf8"))

server相比,client更加简单,先是创建了一个socket对象,然后将它和server连接,通过s.sendall()将信息发送给server,通过s.recv()获得来自server的数据,然后将其打印输出。

在发送数据时,只支持发送字节型数据,所以我们要将需要发送的数据进行编码,在收到server端的回应后,将得到的数据进行解码,就能还原出我们能够识别的字符串了。

启动程序

我们要先启动server端,做好监听准备,然后再启动client端,进行连接。

这个信息是在client连接后打印出来的。

image image

可以使用netstat这个命令查看socket的状态,更详细使用可以查阅帮助文档。

查看系统中处于监听状态的socket,过滤出了使用 TCP协议 和 IPv4 地址的对象:

image

如果先启动了client,会有下面这个经典的错误:

image

造成的原因可能是端口号写错了,或者server根本就没运行,也可能是在server端存在防火墙阻值了连接建立,下面是一些常见的错误异常:

Exception errno Constant Description
BlockingIOError EWOULDBLOCK Resource temporarily unavailable. For example, in non-blocking mode, when calling send() and the peer is busy and not reading, the send queue (network buffer) is full. Or there are issues with the network. Hopefully this is a temporary condition.
OSError ADDRINUSE Address already in use. Make sure there’s not another process running that’s using the same port number and your server is setting the socket option SO_REUSEADDR: socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1).
ConnectionResetError ECONNRESET Connection reset by peer. The remote process crashed or did not close its socket properly (unclean shutdown). Or there’s a firewall or other device in the network path that’s missing rules or misbehaving.
TimeoutError ETIMEDOUT Operation timed out. No response from peer.
ConnectionRefusedError ECONNREFUSED Connection refused. No application listening on specified port.

连接的建立

现在我们仔细看一下serverclient是怎样建通信的:

image

当使用环路网络((IPv4 address 127.0.0.1 or IPv6 address ::1))的时候,数据没有离开过主机跑到外部的网络。如图所示,环路网络是在主机内部建立的,数据就经过它来发送,从主机上运行的一个程序发送到另一个程序,从主机发到主机。这就是为什么我们喜欢说环路网络和 IP地址 127.0.0.1(IPv4) 或 ::1(IPv6) 都表示主机

如果server使用的时其他的合法IP地址,它就会通过以太网接口与外部网络建立联系:

image

如何处理多端连接

echo server最大的缺点就是它同一时间只能服务一个client,直到连接的断开,echo client同样也有不足,当client进行如下操作时,有可能s.recv()只返回了一个字节的数据,数据并不完整。

data = s.recv(1024)

这里所设定的参数 1024 表示单次接收的最大数据量,并不是说会返回 1024 字节的数据。在server中使用的send()与之类似,调用后它有一个返回值,标示已经发送出去的数据量,可能是小于我们实际要发送的数据量,比如说有 6666 字节的数据要发送,用上面的发送方式要发送很多此才行,也就是说一次调用send()数据并没有被完整发送,我们需要自己做这个检查来确保数据完整发送了。

因此,这里使用了sendall(),它会不断地帮我们发送数据直到数据全部发送或者出现错误。

所以,目前有两个问题:

要实现并发,传统方法是使用多线程,最近比较流行的方法是使用在Python3.4中引入的异步IO模块asyncio

这里准备用更加传统,但是更容易理解的方式来实现,基于系统底层的一个调用:select()Python中也提供了对应的模块:selectors ,它在原生的实现上进行了封装,通过使用DefaultSelector,能更加简单的完成任务。

select()通过了一种机制,它来监听操作发生情况,一旦某个操作准备就绪(一般是读就绪或者是写就绪),然后将需要进行这些操作的应用程序select出来,进行相应的读和写操作。到这里,你可能会发现这并没有实现并发,但是它的响应速度非常快,通过异步操作,足够模拟并发的效果了。

Muti-Connection Client and Server

Multi-Connection Server

import selectors

sel = selectors.DefaultSelector()
# ...
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)

echo server最大的不同就在于,通过lsock.setblocking(False),将这个socket对象设置成了非阻塞形式,与sel.select()一起使用,就可以在一个或多个socket对象上等待事件,然后在数据准备就绪时进行数据的读写操作。

sel.register()server注册了我们需要的事件,对server来说,我们需要 I/O 可读,从而进行client发送数据的读入,因此,通过selector.EVENT_READ来指明。

data用来存储和socket有关的任何数据,当sel.select()返回结果时,它也被返回,我们用它作为一个标志,来追踪拥有读入和写入操作的socket对象。

接下来是事件循环:

import selectors
sel = selectors.DefaultSelector()

# ...

while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        if key.data is None:
            accept_wrapper(key.fileobj)
        else:
            service_connection(key, mask)

sel.select(timeout=None)是一个阻塞式调用,直到有socket对象准备好了 I/O 操作,或者等待时间超过设定的timeout。它将返回(key, events)这类元组构成的一个列表,每一个对应一个就绪的socket对象。

key是一个SeletorKey类型的实例,它有一个fileobj的属性,这个属性就是sokect对象。

mask是就绪操作的状态掩码。

如果key.data is None,我们就知道,这是一个server对象,于是要调用accept()方法,用来等待client的连接。不过我们要调用我们自己的accept_wrapper()函数,里面还会包含其他的逻辑。

如果key.data is not None,我们就知道,这是一个client对象,它带着数据来建立连接啦!然后我们要为它提供服务,于是就调用service_connection(key, mask),完成所有的服务逻辑。

def accept_wrapper(sock):
    conn, addr = sock.accept()  # Should be ready to read
    print('accepted connection from', addr)
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data)

这个函数用来处理与client的连接,使用conn.setblocking(False)将该对象设置为非阻塞状态,这正是我们在这个版本的程序中所需要的,否则,整个server会停止,直到它返回,这意味着其他socket对象进入等待状态。

然后,使用types.SimplleNamespace()构建了一个data对象,存储我们想保存的数据和socket对象。

因为数据的读写都是通过conn,所以使用selectors.EVENT_READ | selectors.EVENT_WRITE,然后用sel.register(conn, events, data=data)进行注册。

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            data.outb += recv_data
        else:
            print('closing connection to', data.addr)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print('echoing', repr(data.outb), 'to', data.addr)
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]

这就时服务逻辑的核心,key中包含了socket对象和data对象,mask是已经就绪操作的掩码。根据sock可以读,将数据保存在data.outb中,这也将成为写出的数据。

if recv_data:
    data.outb += recv_data
else:
    print('closing connection to', data.addr)
    sel.unregister(sock)
    sock.close()

如果没有接收到数据,说明client数据发完了,sock的状态不再被追踪,然后关闭这次连接。

Multi-Connection Client

messages = [b'Message 1 from client.', b'Message 2 from client.']


def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print('starting connection', connid, 'to', server_addr)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(connid=connid,
                                     msg_total=sum(len(m) for m in messages),
                                     recv_total=0,
                                     messages=list(messages),
                                     outb=b'')
        sel.register(sock, events, data=data)

使用connect_ex()而不是connect(),因为connect()会立即引发BlockingIOError异常。connect_ex()只返回错误码 errno.EINPROGRESS,而不是在连接正在进行时引发异常。连接完成后,socket对象就可以进行读写,并通过select()返回。

连接建立完成后,我们使用了types.SimpleNamespace构建出和socket对象一同保存的数据,里面的messages对我们要发送的数据做了一个拷贝,因为在后续的发送过程中,它会被修改。client需要发送什么,已经发送了什么以及已经接收了什么都要进行追踪,总共要发送的数据字节数也保存在了data对象中。

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            print('received', repr(recv_data), 'from connection', data.connid)
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print('closing connection', data.connid)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print('sending', repr(data.outb), 'to connection', data.connid)
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]

client要追踪来自server的数据字节数,如果收到的数据字节数和发送的相等,或者有一次没有收到数据,说明数据接收完成,本次服务目的已经达成,就可以关闭这次连接了。

data.outb用来维护发送的数据,前面提到过,一次发送不一定能将数据全部送出,使用data.outb = data.outb[sent:]来更新数据的发送。发送完毕后,再messages中取出数据准备再次发送。

可以在这里看到最后的完整代码:

最后的运行效果如下:

image image

还是要先启动server,进入监听状态,然后client启动,与server建立两条连接,要发送的信息有两条,这里分开发送,先将fist message分别发送到server,然后再发送second messageserver端收到信息后进行暂时保存,当两条信息都收到了才开始进行echoclient端收到完整信息后表示服务结束,断开连接。

上一篇下一篇

猜你喜欢

热点阅读