学习:socket编程
转自:http://www.byhy.net/tut/py/etc/socket/
Python 语言中socket编程
要进行socket编程,发送网络消息,我们可以使用 Python 内置的 socket 库 。
目前的socket编程,使用的最多的就是通过tcp协议进行网络通讯的。
tcp进行通讯的程序双方,分为服务端和客户端。
tcp 协议进行通讯的双方,是需要先建立一个虚拟连接的。然后双方程序才能发送业务数据信息。
建立tcp虚拟连接是通过著名的 三次握手
进行的。
具体三次握手的细节大家可以参考这篇文章 https://zhuanlan.zhihu.com/p/40499563
我们现在来看一个 tcp协议进行通讯的 socket 服务端程序和客户端程序。
下面是tcp 服务端程序 server.py
# === TCP 服务端程序 server.py ===
# 导入socket 库
from socket import *
# 主机地址为空字符串,表示绑定本机所有网络接口ip地址
# 等待客户端来连接
IP = ''
# 端口号
PORT = 50000
# 定义一次从socket缓冲区最多读入512个字节数据
BUFLEN = 512
# 实例化一个socket对象
# 参数 AF_INET 表示该socket网络层使用IP协议
# 参数 SOCK_STREAM 表示该socket传输层使用tcp协议
listenSocket = socket(AF_INET, SOCK_STREAM)
# socket绑定地址和端口
listenSocket.bind((IP, PORT))
# 使socket处于监听状态,等待客户端的连接请求
# 参数 8 表示 最多接受多少个等待连接的客户端
listenSocket.listen(8)
print(f'服务端启动成功,在{PORT}端口等待客户端连接...')
dataSocket, addr = listenSocket.accept()
print('接受一个客户端连接:', addr)
while True:
# 尝试读取对方发送的消息
# BUFLEN 指定从接收缓冲里最多读取多少字节
recved = dataSocket.recv(BUFLEN)
# 如果返回空bytes,表示对方关闭了连接
# 退出循环,结束消息收发
if not recved:
break
# 读取的字节数据是bytes类型,需要解码为字符串
info = recved.decode()
print(f'收到对方信息: {info}')
# 发送的数据类型必须是bytes,所以要编码
dataSocket.send(f'服务端接收到了信息 {info}'.encode())
# 服务端也调用close()关闭socket
dataSocket.close()
listenSocket.close()
下面是tcp 客户端程序 client.py
# === TCP 客户端程序 client.py ===
from socket import *
IP = '127.0.0.1'
SERVER_PORT = 50000
BUFLEN = 1024
# 实例化一个socket对象,指明协议
dataSocket = socket(AF_INET, SOCK_STREAM)
# 连接服务端socket
dataSocket.connect((IP, SERVER_PORT))
while True:
# 从终端读入用户输入的字符串
toSend = input('>>> ')
if toSend =='exit':
break
# 发送消息,也要编码为 bytes
dataSocket.send(toSend.encode())
# 等待接收服务端的消息
recved = dataSocket.recv(BUFLEN)
# 如果返回空bytes,表示对方关闭了连接
if not recved:
break
# 打印读取的信息
print(recved.decode())
dataSocket.close()
大家保存一下代码运行一遍看看。
注意,要先运行服务段,再运行客户段。
上面代码的细节,请看 上面链接的视频讲解
应用消息格式
为什么要定义消息格式
上面的例子中,我们发送的消息就是要传递的内容。 比如字符串 你好,我是白月黑羽
。
实际上,我们在企业中开发的程序通讯,消息往往是有 格式定义
的。 消息的格式定义可以归入 OSI网络模型的 表示层
。
比如: 定义的消息包括 消息头 和 消息体。
消息头存放消息的格式数据, 比如 消息的长度、类型、状态等等, 而消息体存放具体的传送数据。
对于使用TCP协议传输信息的程序来说,格式定义一定要明确规定 消息的边界
。
因为 TCP协议传输的是 字节流(bytes stream)
, 如果消息中没有指定 边界 或者 长度,接收方就不知道一个完整的消息从字节流的 哪里开始,到 哪里结束。
具体的讲解,请参看上面链接的视频讲解。
指定消息的边界有两种方式:
- 用特殊字节作为消息的结尾符号
可以用消息内容中不可能出现的字节串 (比如 FFFFFF
) 作为消息的结尾字符。
- 在消息开头某个位置,直接指定消息的长度
比如在一个消息的最前面用2个字节表示本消息的长度。
UDP协议通常不需要指定消息边界,因为UDP是数据报协议,应用程序从socket接收到的必定是发送方发送的完整消息。
示例1
我们现在要开发一个实验室的工作站监控系统,包括
-
安装在机房工作站上的 数据采集器 RUS
这个程序作为TCP服务端,获取资源使用数据,简称 RUS (Resource Usage Stat)
-
安装在监控室的管理控制台 AT
这个程序作为TCP客户端,向管理员显示资源使用数据,简称 AT (Admin Terminal)
这是我们白月黑羽实战班的 一个项目实战练习。
作为这个系统的设计者,你可以自行设计 RUS 和 AT 之间的数据传输规范,包括消息 数据格式规范。
下面是一种参考的规范:
-
AT 和 RUS 之间采用 TCP 长连接方式进行通讯
如果中途出现连接断开,AT 作为 TCP 客户端必须进行重连
-
消息整体
每个消息 都是 UTF8 编码的 字符串
由消息头 和消息体组成。
消息头 和消息体之间 用一个 换行符 (UTF8编码后的字节为
0A
)隔开。有如下类型的消息:
-
控制命令
由 AT 发送给 RUS , 下达管理控制命令。
比如:
- pause 暂停数据采集
- resume 恢复数据采集
RUS接收到 控制命令后,必须完成操作后必须回复响应消息,告诉 AT 命令已经接收已经完成
-
数据上报
由 RUS 发送给 AT,汇报采集的资源数据。 AT接收到数据后,应该回复一个接收汇报的响应消息。
-
-
消息头
消息头只包含一个信息: 消息体的长度
消息头用十进制的字符串 表示一个整数的长度
-
消息体
消息体用json格式的字符串 表示数据信息,如下
- 数据上报 RUS -> AT
{ "type" : "report", "info" : { "CPU Usage" : "30%", "Mem usage" : "53%" } }
- 数据上报响应 AT -> RUS
{ "type" : "report-ack" }
- 暂停数据上报命令 AT -> RUS
{ "type" : "pause", "duration" : 200 }
其中 duration 表示暂停上报的时间,以秒为单位
- 恢复数据上报命令 AT -> RUS
{ "type" : "resume" }
- 命令处理响应 RUS -> AT
{ "type" : "cmd-ack", "code" : 200, "info" : "处理成功" }
其中code 是处理结果码,用200表示成功。 info 是处理结果文字描述。
实战班学员请联系老师获取详细的实战练习指导。
示例2
示例1 中,我们给出的参考 接口, 传递的消息都是 放在一个大字符串里面, 然后采用字符串编码为 字节串进行传输的。
这种接口设计的好处就是简单,便于发送时的字节编码操作:消息头和消息体分别进行 UTF8编码,然后字节串拼接即可
接收方处理也简单,直接分离出消息头和消息体,分别进行UTF8解码即可。
我们设计普通应用程序之间的通信,这样就很好,简单就是美,容易开发,容易维护。
但是如果消息接口是在 秒理万机
的计算节点之间的通讯, 这样的接口的缺点就暴露了:消息长,而且编解码耗费处理器资源比较大。
典型的例子,就是通讯设备, 比如 4G核心网的业务处理节点。 它们每秒往往要处理数以万计的认证、鉴权、计费 等消息,采用上述方法会给设备带来巨大负担。
首先,数据都用字符表示,其实是比较浪费带宽的做法。
比如返回码 用 200 这样的字符串表示,就会耗费3个字节,24个比特。 如果处理结果 只有成功和不成功,只需要1个bit 即可, 1表示成功,0表示不成功
其次, json这种复杂语法的编解码算法,需要程序代码进行各种复杂处理(参考一下Python json内置库的代码)是比较耗费CPU 资源的。
可以定义更为简单的数据表达方式,比如像这样:
-
消息头 开头2个字节表示消息的长度
-
消息头 第3个字节表示 消息类型 :
0:暂停命令, 1 :恢复命令 2:命令响应 3:统计上报 4:统计上报响应
-
消息体 数据定义
可以使用类似 Radius/Diameter Attribute-Value Pairs (AVP) 的定义方法
Attribute: 使用一个字节,表示数据种类。
比如
1: CPU 使用率 2: 内存使用率
Length: 使用一个字节,表示信息长度
Value: 表示具体的数据值
这样,前面的示例信息
{ "CPU Usage" : "30%", "Mem usage" : "53%" }
其中
"CPU Usage" : "30%"
, 就像这样用16进制字节表示01011E
"Mem usage" : "53%"
, 就像这样用16进制字节表示020135
合起来就是
01011E020135
对比一下,第一种编码方法
优点:更节省传输带宽,编码解码数据效率更高
缺点:对于人的可读性差,数据表示灵活性较差;
支持多个客户端
上面的服务端代码 只能和一个客户端进行通信。
如果我们同时运行多个客户端,就会发现 后面的客户端程序不能和服务端连接成功。为什么呢?
因为,服务端程序必须不停的对 监听 socket 对象调用 accept()方法,才能不断的接受 新的客户端连接请求。
而且 还需要运行额外的代码 对 多个客户端连接后,返回的多个数据传输socket对象 进行数据的收发。
显然,我们上面的程序没有这样的处理。
因为缺省情况创建的 socket 是 阻塞式
的,进行 accpet调用时,如果没有客户端连接,程序就阻塞在此处,不再执行后续代码。
同样的,调用recv方法,如果没有数据在本socket的接收缓冲,也会阻塞。
所以,通常一个线程里面,没法 不断地 调用 监听socket的 accept方法,同时还能 负责多个 数据传输socket消息的收发。
那么让一个服务端程序 和多个客户端同时连接 并 通信 呢?
聪明的你一定想到了,一个线程不行,就使用多个线程啊。
我们 修改服务端的代码,如下
# === TCP 服务端程序 server.py , 支持多客户端 ===
# 导入socket 库
from socket import *
from threading import Thread
IP = ''
PORT = 50000
BUFLEN = 512
# 这是新线程执行的函数,每个线程负责和一个客户端进行通信
def clientHandler(dataSocket,addr):
while True:
recved = dataSocket.recv(BUFLEN)
# 当对方关闭连接的时候,返回空字符串
if not recved:
print(f'客户端{addr} 关闭了连接' )
break
# 读取的字节数据是bytes类型,需要解码为字符串
info = recved.decode()
print(f'收到{addr}信息: {info}')
dataSocket.send(f'服务端接收到了信息 {info}'.encode())
dataSocket.close()
# 实例化一个socket对象 用来监听客户端连接请求
listenSocket = socket(AF_INET, SOCK_STREAM)
# socket绑定地址和端口
listenSocket.bind((IP, PORT))
listenSocket.listen(8)
print(f'服务端启动成功,在{PORT}端口等待客户端连接...')
while True:
# 在循环中,一直接受新的连接请求
dataSocket, addr = listenSocket.accept() # Establish connection with client.
addr = str(addr)
print(f'一个客户端 {addr} 连接成功' )
# 创建新线程处理和这个客户端的消息收发
th = Thread(target=clientHandler,args=(dataSocket,addr))
th.start()
listenSocket.close()
多线程方式有个缺点。
如果一个服务端要同时处理大量的客户端连接,比如10000个,需要创建10000个线程。
而操作系统通常不可能为一个进程分配这么多的线程。
实际上,我们的服务端程序,大部分时间都是空闲的,都在等待连接请求,等待接受消息,根本不需要这么多的线程来处理。
这种程序通常被称之为 IO bound 程序,也就是说程序的主要时间都是花费在 IO 上面。
这种程序,其实一个线程就足够了。
关键问题是,需要这一个线程 很好的分配 时间, 在有连接请求到来的时候,执行处理连接请求代码,有消息到达socket缓冲的时候,执行读取处理消息的代码。
这种处理方式称之为异步IO。
Python 3 新增了 asyncio 库, 我们可以使用该库来 实现 同时处理多个客户端数据收发。
示例代码如下:
# === TCP 服务端程序 server.py 异步支持多客户端 ===
import asyncio, socket
IP = ''
PORT = 50000
BUFLEN = 512
# 定义处理数据收发的回调
async def handle_echo(reader, writer):
addr = writer.get_extra_info('peername')
while True:
data = await reader.read(100)
if not data:
print(f'客户端{addr}关闭了连接')
writer.close()
break
message = data.decode()
print(f'收到{addr}信息: {message}')
writer.write(data)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, IP, PORT, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('服务端启动成功,在{}端口等待客户端连接...'.format(server.sockets[0].getsockname()[1]))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()