python socketpair example
2021-03-23 本文已影响0人
help_youself
Test the performance of socketpair.
The Provider sends a number to the Consumer. Only after the feedback (ack) message comes back does the Provider send the next number.
socket_pair.py
# author zsy
# create: 2021/03/23
import threading
import signal, os
import socket
import select
import errno
import time
import struct
def TimeMillis():
    """Return the current wall-clock time as integer milliseconds.

    The value is ``time.time()`` scaled to milliseconds and rounded to the
    nearest integer.
    """
    return int(round(time.time() * 1000))
def TimeMillis32():
    """Return the current millisecond timestamp truncated to 32 bits.

    Only the low 32 bits of ``TimeMillis()`` are kept, so the value wraps
    around to 0 after 0xffffffff.
    """
    return TimeMillis() & 0xffffffff
class Consumer(object):
    """Receiving side of the ping-pong test.

    Reads big-endian unsigned 32-bit numbers from ``conn`` and answers each
    one with the bytes ``b"ack"``.  The polling loop runs in a background
    thread started by ``loop_start()`` and ends once ``run_times`` numbers
    have been received, ``stop_thread()`` is called, or the peer goes away.

    Args:
        conn: A connected socket; it is switched to non-blocking mode.
        run_times: Number of messages to receive before the loop exits.
    """

    # epoll.poll() takes its timeout in SECONDS (float).  The original
    # literal 10 therefore meant a 10 s wait, which could stall
    # stop_thread() for that long; 10 ms keeps shutdown responsive.
    POLL_TIMEOUT = 0.01

    _FRAME = struct.Struct("!I")  # one message: network-order uint32
    _ACK = b"ack"                 # feedback sent for every message

    def __init__(self, conn, run_times):
        self.conn = conn
        self.conn.setblocking(False)
        self.run_times = run_times
        self.terminate = False
        self._thread = None
        self.msg_count = 0
        self.last_num = None   # most recently decoded number, if any
        self._pending = b""    # bytes of an incomplete trailing frame
        self._epl = select.epoll()
        self._epl.register(self.conn.fileno(), select.EPOLLIN)

    def __del__(self):
        # __init__ may have failed before _epl was assigned.
        epl = getattr(self, "_epl", None)
        if epl is not None:
            epl.close()

    def stop_thread(self):
        """Ask the polling loop to exit and wait for the thread to finish."""
        self.terminate = True
        if self._thread:
            self._thread.join()

    def is_alive(self):
        """Return True while the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def loop_start(self):
        """Start the background polling thread (no-op if already started)."""
        if self._thread is not None:
            return
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        """Poll the socket until done, terminated, or the peer is gone."""
        fd = self.conn.fileno()
        # EPOLLHUP/EPOLLERR are OR-ed into the event mask by the kernel, so
        # test bits instead of comparing for equality; otherwise a closed
        # peer (EPOLLIN|EPOLLHUP) is never read and the loop spins forever.
        wanted = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
        while not self.terminate and self.msg_count < self.run_times:
            for ready_fd, events in self._epl.poll(self.POLL_TIMEOUT):
                if ready_fd == fd and events & wanted:
                    self.read_event()

    def read_event(self):
        """Drain everything currently readable and hand it to incoming_data.

        Sets ``terminate`` when the peer has closed the connection (recv
        returns no data) or the socket reports a hard error.
        """
        chunks = []
        try:
            while True:
                msg = self.conn.recv(1500)
                if not msg:
                    # Orderly shutdown by the peer: nothing more will come.
                    self.terminate = True
                    break
                chunks.append(msg)
        except BlockingIOError:
            # EAGAIN / EWOULDBLOCK: the socket is drained for now.
            pass
        except OSError:
            # Any other socket error leaves the connection unusable.
            self.terminate = True
        if chunks:
            self.incoming_data(b"".join(chunks))

    def incoming_data(self, buffer):
        """Decode every complete 4-byte number in ``buffer`` and ack each.

        A stream socket may deliver several numbers in one read or split a
        number across reads, so leftover bytes are kept until the rest of
        the frame arrives.
        """
        data = self._pending + buffer
        size = self._FRAME.size
        usable = len(data) - len(data) % size
        self._pending = data[usable:]
        for offset in range(0, usable, size):
            (self.last_num,) = self._FRAME.unpack_from(data, offset)
            self.msg_count += 1
            self.conn.send(self._ACK)
class Provider(object):
    """Sending side of the ping-pong test.

    Sends a counter as a big-endian unsigned 32-bit number on ``conn`` and
    sends the next one only after feedback has been read back.  The loop
    runs in a background thread started by ``loop_start()`` and ends once
    ``run_times`` acknowledgements have been counted, ``stop_thread()`` is
    called, or the peer goes away.

    Args:
        conn: A connected socket; it is switched to non-blocking mode.
        run_times: Number of acknowledgements to collect before exiting.
    """

    # epoll.poll() takes its timeout in SECONDS (float).  The original
    # literal 10 therefore meant a 10 s wait, which could stall
    # stop_thread() for that long; 10 ms keeps shutdown responsive.
    POLL_TIMEOUT = 0.01

    def __init__(self, conn, run_times):
        self.conn = conn
        self.conn.setblocking(False)
        self.run_times = run_times
        self.ack_count = 0
        self.num = 0
        self.terminate = False
        self._thread = None
        self._epl = select.epoll()
        self._epl.register(self.conn.fileno(), select.EPOLLIN)

    def __del__(self):
        # __init__ may have failed before _epl was assigned.
        epl = getattr(self, "_epl", None)
        if epl is not None:
            epl.close()

    def stop_thread(self):
        """Ask the polling loop to exit and wait for the thread to finish."""
        self.terminate = True
        if self._thread:
            self._thread.join()

    def is_alive(self):
        """Return True while the background thread is running."""
        return self._thread is not None and self._thread.is_alive()

    def loop_start(self):
        """Start the background polling thread (no-op if already started)."""
        if self._thread is not None:
            return
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        """Send the first number, then poll for feedback until finished."""
        # Same condition incoming_data() uses before every later send, so
        # nothing is sent when no round trips were requested.
        if self.ack_count < self.run_times:
            self.send_num()
        fd = self.conn.fileno()
        # EPOLLHUP/EPOLLERR are OR-ed into the event mask by the kernel, so
        # test bits instead of comparing for equality; otherwise a closed
        # peer (EPOLLIN|EPOLLHUP) is never read and the loop spins forever.
        wanted = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
        while not self.terminate and self.ack_count < self.run_times:
            for ready_fd, events in self._epl.poll(self.POLL_TIMEOUT):
                if ready_fd == fd and events & wanted:
                    self.read_event()

    def read_event(self):
        """Drain everything currently readable and hand it to incoming_data.

        Sets ``terminate`` when the peer has closed the connection (recv
        returns no data) or the socket reports a hard error.
        """
        chunks = []
        try:
            while True:
                msg = self.conn.recv(1500)
                if not msg:
                    # Orderly shutdown by the peer: nothing more will come.
                    self.terminate = True
                    break
                chunks.append(msg)
        except BlockingIOError:
            # EAGAIN / EWOULDBLOCK: the socket is drained for now.
            pass
        except OSError:
            # Any other socket error leaves the connection unusable.
            self.terminate = True
        if chunks:
            self.incoming_data(b"".join(chunks))

    def incoming_data(self, buffer):
        """Count one acknowledgement and send the next number if needed.

        The payload is not inspected: each drained read counts as exactly
        one acknowledgement, which holds because only one number is ever in
        flight at a time.
        """
        self.ack_count += 1
        self.num += 1
        if self.ack_count < self.run_times:
            self.send_num()

    def send_num(self):
        """Send the current counter as a network-order unsigned 32-bit int."""
        # Mask so a counter past 2**32 - 1 wraps instead of raising
        # struct.error.
        self.conn.send(struct.pack("!I", self.num & 0xffffffff))
# Set to True by signal_handler(); the main loop polls it to know when to stop.
Terminate = False


def signal_handler(signum, frame):
    """Signal callback: request shutdown by raising the Terminate flag.

    Both arguments are supplied by the ``signal`` module and are ignored.
    """
    global Terminate
    Terminate = True
if __name__ == '__main__':
    # Run run_times ping-pong round trips between a Provider and a Consumer
    # over a socketpair and print the elapsed time in milliseconds.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)   # ctrl+c
    signal.signal(signal.SIGHUP, signal_handler)   # controlling terminal closed
    signal.signal(signal.SIGTSTP, signal_handler)  # ctrl+z
    run_times = 100
    last = TimeMillis32()
    s1, s2 = socket.socketpair()
    consumer = Consumer(s1, run_times)
    provider = Provider(s2, run_times)
    consumer.loop_start()
    provider.loop_start()
    # Sleep between checks instead of spinning: a busy loop here burns a
    # full core and competes for the GIL with the two worker threads that
    # are being timed.  A signal still interrupts the sleep promptly.
    while not Terminate and provider.is_alive():
        time.sleep(0.001)
    consumer.stop_thread()
    provider.stop_thread()
    s1.close()
    s2.close()
    # Mask the difference so it stays correct if the 32-bit millisecond
    # counter wrapped around between the two readings.
    delta = (TimeMillis32() - last) & 0xffffffff
    print("stop {}".format(delta))