python socketpair example

2021-03-23  本文已影响0人  help_youself

 Test the performance of socketpair.
 The Provider sends a number to the Consumer, and sends the next number only after the Consumer's feedback (ack) message has come back.
socket_pair.py

# author zsy
# create: 2021/03/23
import threading
import signal, os
import socket
import select
import errno
import time
import struct 
def TimeMillis():
    """Return the current wall-clock time in whole milliseconds.

    The value is ``time.time()`` scaled to milliseconds and rounded to the
    nearest integer.
    """
    return int(round(time.time() * 1000))
def TimeMillis32():
    """Return the low 32 bits of the current millisecond timestamp."""
    return TimeMillis() & 0xffffffff
class Consumer(object):
    """Receive 4-byte numbers on a non-blocking socket and acknowledge each one.

    A background thread (started with ``loop_start``) waits on an epoll
    object for the socket to become readable, drains it, and answers every
    complete 4-byte message with the bytes ``b"ack"``.  The thread ends once
    ``run_times`` messages were counted, ``stop_thread`` was called, or the
    peer closed the connection.

    Args:
        conn: a connected socket; it is switched to non-blocking mode.
            Assumed to be a stream socket (messages may arrive coalesced or
            split), which is why incoming bytes are re-framed here.
        run_times: number of messages after which the thread stops.
    """

    # One message on the wire: a single unsigned 32-bit integer, network order.
    _FRAME = struct.Struct("!I")
    # epoll.poll() takes its timeout in *seconds*; this bounds how long an
    # idle thread needs to notice that ``terminate`` was set.
    _POLL_TIMEOUT = 10

    def __init__(self, conn, run_times):
        self.conn = conn
        self.conn.setblocking(False)
        self.run_times = run_times
        self.terminate = False
        self._thread = None
        self.msg_count = 0
        # Bytes of an incomplete message, kept until the rest arrives.
        self._pending = b''
        self._epl = select.epoll()
        self._epl.register(self.conn.fileno(), select.EPOLLIN)

    def __del__(self):
        # __init__ may have failed before the epoll object was created.
        epl = getattr(self, "_epl", None)
        if epl is not None:
            epl.close()

    def stop_thread(self):
        """Ask the worker thread to stop and wait until it has finished."""
        self.terminate = True
        if self._thread:
            self._thread.join()

    def is_alive(self):
        """Return True while the worker thread exists and is running."""
        return self._thread is not None and self._thread.is_alive()

    def loop_start(self):
        """Start the worker thread; does nothing if it was already started."""
        if self._thread is not None:
            return
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        """Worker loop: poll the socket until done, stopped, or disconnected."""
        # EPOLLHUP/EPOLLERR are reported even though only EPOLLIN was
        # registered, and they arrive OR-ed together with EPOLLIN.  The mask
        # therefore has to be tested bitwise; an equality test would ignore
        # a hang-up and leave the level-triggered loop spinning.
        wanted = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
        fd = self.conn.fileno()
        while not self.terminate and self.msg_count < self.run_times:
            for ready_fd, events in self._epl.poll(self._POLL_TIMEOUT):
                if ready_fd == fd and events & wanted:
                    self.read_event()

    def read_event(self):
        """Drain everything currently readable and hand it to ``incoming_data``.

        Sets ``terminate`` when the peer has closed the connection or the
        socket reports an error other than "no data available right now".
        """
        chunks = []
        disconnected = False
        while True:
            try:
                msg = self.conn.recv(1500)
            except socket.error as e:
                if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # A real socket failure: nothing more will ever arrive.
                    disconnected = True
                break
            if not msg:
                # Zero bytes from recv() means the peer closed its end.
                disconnected = True
                break
            chunks.append(msg)
        if chunks:
            self.incoming_data(b''.join(chunks))
        if disconnected:
            self.terminate = True

    def incoming_data(self, buffer):
        """Count and acknowledge every complete 4-byte message in ``buffer``.

        Bytes that do not yet form a whole message are stored and prepended
        to the data of the next call.
        """
        data = self._pending + buffer
        usable = len(data) - len(data) % self._FRAME.size
        self._pending = data[usable:]
        for (_num,) in self._FRAME.iter_unpack(data[:usable]):
            self.msg_count += 1
            self.conn.send(b"ack")
class Provider(object):
    """Send increasing numbers over a socket, one per received acknowledgement.

    A background thread (started with ``loop_start``) sends the first number
    and then waits on an epoll object for replies.  Every 3-byte
    acknowledgement increments ``ack_count`` and, while fewer than
    ``run_times`` acknowledgements were seen, triggers the next number.  The
    thread ends when ``run_times`` acknowledgements arrived, ``stop_thread``
    was called, or the peer closed the connection.

    Args:
        conn: a connected socket; it is switched to non-blocking mode.
            Assumed to be a stream socket (replies may arrive coalesced or
            split), which is why incoming bytes are re-framed here.
        run_times: number of acknowledgements after which the thread stops.
    """

    # Length of one acknowledgement; the content itself is not inspected
    # (presumably always b"ack" - verify against the peer implementation).
    _ACK_SIZE = 3
    # epoll.poll() takes its timeout in *seconds*; this bounds how long an
    # idle thread needs to notice that ``terminate`` was set.
    _POLL_TIMEOUT = 10

    def __init__(self, conn, run_times):
        self.conn = conn
        self.conn.setblocking(False)
        self.run_times = run_times
        self.ack_count = 0
        self.num = 0
        self.terminate = False
        self._thread = None
        # Bytes of an incomplete acknowledgement, kept until the rest arrives.
        self._pending = b''
        self._epl = select.epoll()
        self._epl.register(self.conn.fileno(), select.EPOLLIN)

    def __del__(self):
        # __init__ may have failed before the epoll object was created.
        epl = getattr(self, "_epl", None)
        if epl is not None:
            epl.close()

    def stop_thread(self):
        """Ask the worker thread to stop and wait until it has finished."""
        self.terminate = True
        if self._thread:
            self._thread.join()

    def is_alive(self):
        """Return True while the worker thread exists and is running."""
        return self._thread is not None and self._thread.is_alive()

    def loop_start(self):
        """Start the worker thread; does nothing if it was already started."""
        if self._thread is not None:
            return
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        """Worker loop: send the first number, then react to acknowledgements."""
        # With run_times <= 0 there is nothing to exchange, so send nothing.
        if self.ack_count < self.run_times:
            self.send_num()
        # EPOLLHUP/EPOLLERR are reported even though only EPOLLIN was
        # registered, and they arrive OR-ed together with EPOLLIN.  The mask
        # therefore has to be tested bitwise; an equality test would ignore
        # a hang-up and leave the level-triggered loop spinning.
        wanted = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
        fd = self.conn.fileno()
        while not self.terminate and self.ack_count < self.run_times:
            for ready_fd, events in self._epl.poll(self._POLL_TIMEOUT):
                if ready_fd == fd and events & wanted:
                    self.read_event()

    def read_event(self):
        """Drain everything currently readable and hand it to ``incoming_data``.

        Sets ``terminate`` when the peer has closed the connection or the
        socket reports an error other than "no data available right now".
        """
        chunks = []
        disconnected = False
        while True:
            try:
                msg = self.conn.recv(1500)
            except socket.error as e:
                if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # A real socket failure: nothing more will ever arrive.
                    disconnected = True
                break
            if not msg:
                # Zero bytes from recv() means the peer closed its end.
                disconnected = True
                break
            chunks.append(msg)
        if chunks:
            self.incoming_data(b''.join(chunks))
        if disconnected:
            self.terminate = True

    def incoming_data(self, buffer):
        """Process every complete acknowledgement contained in ``buffer``.

        Each acknowledgement bumps ``ack_count`` and ``num`` and, while the
        target has not been reached, sends the next number.  Bytes that do
        not yet form a whole acknowledgement are stored for the next call.
        """
        data = self._pending + buffer
        complete = len(data) // self._ACK_SIZE
        self._pending = data[complete * self._ACK_SIZE:]
        for _ in range(complete):
            self.ack_count += 1
            self.num += 1
            if self.ack_count < self.run_times:
                self.send_num()

    def send_num(self):
        """Send the current number as an unsigned 32-bit big-endian integer."""
        # Mask to 32 bits so a very long run wraps instead of raising
        # struct.error once the counter no longer fits the "!I" format.
        self.conn.send(struct.pack("!I", self.num & 0xffffffff))
# Shutdown flag: flipped to True by signal_handler, polled by the main loop.
Terminate = False


def signal_handler(signum, frame):
    """Signal callback that records a shutdown request.

    Both arguments are ignored; the only effect is setting the module-level
    ``Terminate`` flag to True.
    """
    global Terminate
    Terminate = True
if __name__ == '__main__':
    # Turn the usual termination signals into a cooperative shutdown request.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)   # ctrl+c
    signal.signal(signal.SIGHUP, signal_handler)   # controlling terminal closed
    signal.signal(signal.SIGTSTP, signal_handler)  # ctrl+z
    run_times = 100
    last = TimeMillis32()
    s1, s2 = socket.socketpair()
    try:
        consumer = Consumer(s1, run_times)
        provider = Provider(s2, run_times)
        consumer.loop_start()
        provider.loop_start()
        # Wait for the provider to finish (or for a signal).  Sleeping between
        # checks keeps this thread from spinning and competing with the two
        # worker threads for the interpreter lock, which would distort the
        # very round-trip time this script is measuring.
        while not Terminate and provider.is_alive():
            time.sleep(0.001)
        consumer.stop_thread()
        provider.stop_thread()
    finally:
        # Release both socket ends even if setup or shutdown raised.
        s1.close()
        s2.close()
    # Both timestamps are truncated to 32 bits; masking the difference keeps
    # the elapsed time correct when the counter wraps between the readings.
    delta = (TimeMillis32() - last) & 0xffffffff
    print("stop {}".format(delta))
上一篇下一篇

猜你喜欢

热点阅读