python自学TCP/IP网络编程随笔-生活工作点滴

Python TCP编程

2019-07-07  本文已影响5人  Recalcitrant

Python网络编程之TCP

一、TCP协议

TCP协议,传输控制协议(Transmission Control Protocol,TCP)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC793定义。
TCP通信需要经过创建连接、数据传送、终止连接三个步骤。

二、TCP控制台消息传输示例(单进程)

服务器端:

from socket import *

HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)

# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)

# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))

tcpServerSocket.listen(5)

try:
    while True:
        print('服务器正在运行,等待客户端连接...')

        # client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
        client_socket, client_address = tcpServerSocket.accept()
        print('客户端{}已连接!'.format(client_address))

        try:
            while True:
                # 接收客户端发来的数据,阻塞,直到有数据到来
                # 事实上,除非当前客户端关闭后,才会跳转到外层的while循环,即一次只能服务一个客户
                # 如果客户端关闭了连接,data是空字符串
                data = client_socket.recv(2048)
                if data:
                    print('接收到消息 {}({} bytes) 来自 {}'.format(data.decode('utf-8'), len(data), client_address))
                    # 返回响应数据,将客户端发送来的数据原样返回
                    client_socket.send(data)
                    print('发送消息 {} 至 {}'.format(data.decode('utf-8'), client_address))
                else:
                    print('客户端 {} 已断开!'.format(client_address))
                    break
        finally:
            # 关闭为这个客户端服务的socket
            client_socket.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    tcpServerSocket.close()

客户端:

from socket import *

HOST = '127.0.0.1'
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)

tcpClientSocket = socket(AF_INET, SOCK_STREAM)
tcpClientSocket.connect(ADDRESS)

while True:
    data = input('>')
    if not data:
        break

    # 发送数据
    tcpClientSocket.send(data.encode('utf-8'))
    # 接收数据
    data, ADDR = tcpClientSocket.recvfrom(BUFSIZ)
    if not data:
        break
    print("服务器端响应:", data.decode('utf-8'))

print("链接已断开!")
tcpClientSocket.close()

三、TCP控制台消息传输示例(多进程)

import os
from socket import *
from multiprocessing import Process


def client_handler(client_socket, client_address):
    """
    接收各个客户端发来的数据,并原样返回
    :param client_socket:
    :param client_address:
    :return:
    """
    try:
        while True:
            # 接收客户端发来的数据,阻塞,直到有数据到来
            # 如果客户端关闭了连接,data是空字符串
            data = client_socket.recv(2048)
            if data:
                print('子进程 [PID: {}]: 接收到消息 {}({} bytes) 来自 {}'.format(os.getpid(), data.decode('utf-8'), len(data), client_address))
                # 返回响应数据,将客户端发送来的数据原样返回
                client_socket.send(data)
                print('子进程 [PID: {}]: 发送 {} 至 {}'.format(os.getpid(), data.decode('utf-8'), client_address))
            else:
                print('子进程 [PID: {}]: 客户端 {} 已断开!'.format(os.getpid(), client_address))
                break
    finally:
        # 关闭为这个客户端服务的socket
        client_socket.close()

HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)

# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)

# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
tcpServerSocket.listen(5)

try:
    while True:
        print('主进程 [PID: {}]: 服务器正在运行,等待客户端连接...'.format(os.getpid()))

        # 主进程只用来负责监听新的客户连接
        # client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
        client_socket, client_address = tcpServerSocket.accept()
        print('主进程 [PID: {}]: 客户端 {} 已连接!'.format(os.getpid(), client_address))

        # 为每个新的客户连接创建一个子进程,用来处理客户数据
        client = Process(target=client_handler, args=(client_socket, client_address))
        client.start()
        # 子进程已经复制了一份client_sock,所以主进程中可以关闭此client_sock
        client_socket.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    tcpServerSocket.close()

四、TCP控制台消息传输示例(多线程)

上面多进程版本的问题在于,为每个客户端连接都分别创建一个进程,如果同时有10000个客户连接,操作系统不可能创建10000个进程,那样系统开销会非常大,内存会被耗尽,导致系统崩溃。就算没有崩溃,使用了虚拟内存,那么性能将急剧下降。同时,这么多个进程,CPU进行进程间切换(上下文切换)的代价也无比巨大,最终的结果就是大部分时间都花在进程切换上了,而为客户提供服务的时间几乎没有

虽然可以使用进程池concurrent.futures.ProcessPoolExecutor创建固定数量的进程,一旦有客户端关闭了连接后,对应的进程就可以重新为下一个新的客户连接服务,但是多进程间的上下文切换的代价还是太大

多线程版本比多进程版本的系统开销小几个数量级,操作系统可以同时开启更多的线程,而线程间的调度切换比多进程也小很多。

from socket import *
import threading


def client_handler(client_socket, client_address):
    """
    接收各个客户端发来的数据,并原样返回
    :param client_socket:
    :param client_address:
    :return:
    """
    try:
        while True:
            # 接收客户端发来的数据,阻塞,直到有数据到来
            # 如果客户端关闭了连接,data是空字符串
            data = client_socket.recv(4096)
            if data:
                print('子线程 [{}]: 接收到消息 {}({} bytes) 来自 {}'.format(threading.current_thread().name, data, len(data), client_address))
                # 返回响应数据,将客户端发送来的数据原样返回
                client_socket.send(data)
                print('子线程 [{}]: 发送 {} 至 {}'.format(threading.current_thread().name, data, client_address))
            else:
                print('子线程 [{}]: 客户端 {} 已断开!'.format(threading.current_thread().name, client_address))
                break
    finally:
        # 关闭为这个客户端服务的socket
        client_socket.close()


HOST = ''
PORT = 9000
BUFSIZ = 1024
ADDRESS = (HOST, PORT)

# 创建监听socket
tcpServerSocket = socket(AF_INET, SOCK_STREAM)

# 绑定IP地址和固定端口
tcpServerSocket.bind(ADDRESS)
print("服务器启动,监听端口{}...".format(ADDRESS[1]))

# socket默认是主动连接,调用listen()函数将socket变为被动连接,这样就可以接收客户端连接了
tcpServerSocket.listen(5)

try:
    while True:
        print('主线程 [{}]: 服务器正在运行,等待客户端连接...'.format(threading.current_thread().name))

        # 主进程只用来负责监听新的客户连接
        # client_socket是专为这个客户端服务的socket,client_address是包含客户端IP和端口的元组
        client_socket, client_address = tcpServerSocket.accept()
        print('主线程 [{}]: 客户端 {} 已连接!'.format(threading.current_thread().name, client_address))

        # 为每个新的客户连接创建一个线程,用来处理客户数据
        client = threading.Thread(target=client_handler, args=(client_socket, client_address))
        client.start()

        # 因为主线程与子线程共享client_socket,所以在主线程中不能关闭client_socket
        # client_socket.close()
finally:
    # 关闭监听socket,不再响应其它客户端连接
    tcpServerSocket.close()

五、TCP控制台消息传输示例(线程池)

服务器端:

from socket import *
import logging as logger
import concurrent.futures as futures

logger.basicConfig(level=logger.DEBUG)


class TCPServer:
    def __init__(self, host='', port=9000):
        self.HOST = host
        self.PORT = port
        self.BUFSIZ = 1024
        self.ADDRESS = (self.HOST, self.PORT)
        self.clients = []
        self.ex = futures.ThreadPoolExecutor(max_workers=3)

        self.tcpServerSocket = socket(AF_INET, SOCK_STREAM)
        self.tcpServerSocket.bind(self.ADDRESS)
        logger.info("服务器启动,监听端口{}...".format(self.ADDRESS))
        self.tcpServerSocket.listen(5)

    def launch(self):
        while True:
            print('服务器正在运行,等待客户端连接...')
            client_socket, client_address = self.tcpServerSocket.accept()
            self.ex.submit(self.response, client_socket, client_address)
            print('客户端 {} 已连接!'.format(client_address))
            self.clients.append((client_socket, client_address))

    def response(self, client_socket, client_address):
        try:
            while True:
                data = client_socket.recv(self.BUFSIZ)
                if data:
                    print('接收到消息 {}({} bytes) 来自 {}'.format(data.decode('utf-8'), len(data), client_address))
                    for client in self.clients:
                        sock = client[0]
                        addr = client[1]
                        if sock != client_socket:
                            info = "{}:{}>>{}".format(addr[0], str(addr[1]), data.decode('utf-8'))
                            logger.info("向客户端{}:{}发送数据{}".format(addr[0], str(addr[1]), data.decode('utf-8')))

                            sock.send(info.encode('utf-8'))
                else:
                    print("客户端{}已断开!".format(client_address))
                    self.clients.remove((client_socket, client_address))
                    break
        finally:
            client_socket.close()


def main():
    ts = TCPServer()
    ts.launch()


main()

客户端:

from socket import *
import concurrent.futures as futures


class TCPClient:
    def __init__(self, host='127.0.0.1', port=9000):
        self.HOST = host
        self.PORT = port
        self.BUFSIZ = 1024
        self.ADDRESS = (self.HOST, self.PORT)
        self.tcpClientSocket = socket(AF_INET, SOCK_STREAM)
        self.tcpClientSocket.connect(self.ADDRESS)

    def send(self, msg):
        """
        向服务器端发送信息
        :param msg:
        :return:
        """
        self.tcpClientSocket.send(msg.encode('utf-8'))

    def receive(self):
        try:
            while True:
                data = self.tcpClientSocket.recv(self.BUFSIZ)
                if not data:
                    break
                print("接收到服务器端消息:{}".format(data.decode('utf-8')))
        finally:
            print("连接已断开!")
            self.tcpClientSocket.close()


def main():
    ex = futures.ThreadPoolExecutor(max_workers=1)
    tc = TCPClient()
    ex.submit(tc.receive)

    while True:
        data = input('>')
        if not data:
            print("连接已断开!")
            tc.tcpClientSocket.close()
            break
        tc.send(data)


main()
上一篇下一篇

猜你喜欢

热点阅读