物联网IOT创意开发

自己动手编写一个熊孩子监视器(U型遥控器,Python 3,多线

2020-04-22  本文已影响0人  TypeY

本篇所展示的脚本均无加密处理,请在安全的网络环境下使用。本篇所展示内容均开源开放,但其使用务必遵循当地法律法规!

熊孩子在家上网课偷偷玩电脑?是时候处理一下了!

借此机会展示一下图传和多线程同时收发的脚本,脚本不是很完善但是希望能做到抛砖引玉。

目标

1.捕捉熊孩子的电脑桌面并且发送到USwitch;

2.通过摄像头观察熊孩子是否在认真听课;

3.点点手机就可以弹出窗口警告熊孩子认真学习;

4.合理的资源占用,随时停止发送图像,随时启用/停用摄像头;

5.多线程,多客户端,简单配对,可以做到父母亲友同时监督 :)

下面是一些效果图:

同时启用摄像头和屏幕捕捉 弹出窗口警告 弹出窗口警告

准备工作

要有网,有网很重要 (手机和熊孩子的电脑连接同一个路由器,或者你有自己的服务器...)

0.Windows平台的脚本已经打包好可以直接下载使用,因此你可以直接跳过后边所有的步骤,直接跳到“USwitch端的配置”学习如何使用。(备用下载地址,提取码:5yam)

1.安装必备的库(windows 在 cmd 执行,linux 在终端执行):

pip install opencv-python
pip install pillow
pip install numpy 

2.在手机上安装U型遥控器

在手机上安装U型遥控器

脚本和注释

脚本的后缀由.py修改为.pyw可以在windows平台上实现无弹窗运行,可以手动设置为开机启动项。(运行后仍可在任务管理器中找到)

from socket import *
from threading import Thread
from PIL import ImageGrab
from io import BytesIO
from numpy import array
from sys import platform
from math import ceil
import  cv2

#弹窗警告功能暂时只针对win平台
if platform == "win32" or platform == "win64":
    ISWIN = True
    import ctypes

Use_WebCam=False
WebCam_starting=False


#接收端ip列表,当接收端地址从其中移除将不再向其发送
#支持多端接收
Running=[]
#发送图像的分辨率
Res_x=690
Res_y=360

#配置UDP
r_host = ''
r_port = 127  #电脑的接收端口
s_port = 9921 #手机的接收端,使用4位数的端口(受部分安卓系统限制)
bufsize = 1024 
r_addr = (r_host,r_port)
r_udpServer = socket(AF_INET,SOCK_DGRAM) 
r_udpServer.bind(r_addr)

#接收线程
class Rec_Thread(Thread):
    def __init__(self, server: socket):
        super().__init__()
        self.server = server
    def run(self):
        global Running
        global Use_WebCam
        while 1:
            data,addr = self.server.recvfrom(bufsize)
            data=data.decode()
            if data=="exit":
                Running=[]
                if Use_WebCam:
                    Use_WebCam=False
                    try:
                        cap.release()
                    except:
                        pass
                exit(0)
            if data=="start" and not addr[0] in Running:
                Running.append(addr[0]) #将发来start命令的ip地址添加到列表
                stream_thread = Send_Thread(addr[0]) #创建发送线程
                stream_thread.start()
            elif data=="stop" and addr[0] in Running:
                Running.remove(addr[0])
            elif data=="warning":
                Msg_Box_thread = Msg_Box(0)
                Msg_Box_thread.start()
            elif data=="camera" and not WebCam_starting: #摄像头启动有延迟请耐心
                if Use_WebCam:
                    Use_WebCam=False
                    try:
                        cap.release()
                    except:
                        pass
                else:
                    Use_WebCam=True
            else:
                Msg_Box_thread = Msg_Box(1,addr[0],data)
                Msg_Box_thread.start()

                
#发送线程
class  Send_Thread(Thread):
    def __init__(self,host):
        super().__init__()
        self.host = host
        self.addr = (host,s_port)
        self.udpClient = socket(AF_INET,SOCK_DGRAM)
    def run(self):
        while self.host in Running: 
            captureImage = ImageGrab.grab() #捕获屏幕截图
            captureImage = captureImage.resize((Res_x,Res_y))
            #转变为cv格式以便编码发送
            captureImage = cv2.cvtColor(array(captureImage), cv2.COLOR_RGB2BGR)
            #如果启用摄像头将进行图片叠加
            if Use_WebCam:
                try:
                    get, image_np = cap.read()
                    if get :
                        rows, cols = image_np.shape[:2]
                        captureImage[:rows, :cols]=image_np
                    elif not WebCam_starting:
                        Cam_thread = Set_Webcam()
                        Cam_thread.start()
                except:
                    if not WebCam_starting:
                        Cam_thread = Set_Webcam()
                        Cam_thread.start()

            #captureImage.resize((690,360))
            data=cv2.imencode(".jpg",captureImage,[cv2.IMWRITE_JPEG_QUALITY, 30])[1].tobytes()
            cut=int(ceil(len(data)/(bufsize)))#计算切片数
            strr="size;"+str(cut)# 通知手机开始接收切片
            self.udpClient.sendto(strr.encode(),self.addr)
            for i in range(cut):
              self.udpClient.sendto(data[i*int(bufsize):(i+1)*int(bufsize)],self.addr)#切片并且发送
            self.udpClient.sendto(("end").encode(),self.addr)#通知手机显示图片
        self.udpClient.sendto(("clear").encode(),self.addr)#结束,清理手机上显示的图片,V1.2.7以后版本可用


#弹窗警告功能
class  Msg_Box(Thread):
    def __init__(self,Type,addr="",Text=""):
        super().__init__()
        self.Type=Type
        self.addr=addr
        self.Text=Text
    def run(self):
        if ISWIN:
            if self.Type == 0: #警告
                ctypes.windll.user32.MessageBoxW(0,  "专心学习!", "不专心警告:",16+4096)
            if self.Type == 1: #其他消息
                ctypes.windll.user32.MessageBoxW(0,  self.Text, "来自:"+self.addr,1+4096)
        
#异步摄像头启动
class  Set_Webcam(Thread):
    def run(self):
        global WebCam_starting
        global cap
        WebCam_starting=True
        cap = cv2.VideoCapture(0)
        cap.set(3,int(Res_x/3.0))
        cap.set(4,int(Res_y/3.0))
        WebCam_starting=False
        
# 获取本机ip
def get_host_ip():
    ip='127.0.0.1'
    try:
        s=socket(AF_INET,SOCK_DGRAM)
        s.connect(('8.8.8.8',80))
        ip=s.getsockname()[0]
    finally:
        s.close()
    return ip

        
    


server_thread = Rec_Thread(r_udpServer)
server_thread.start()
#提示本机ip便于接收
Msg_Box_thread = Msg_Box(1,"熊孩子监视器","本机IP:"+get_host_ip()+"\n接收端口:"+str(r_port))
Msg_Box_thread.start()



理论上,上面的脚本可运行在任何平台,但是弹窗提醒只支持Windows平台,截屏函数只支持Windows和MacOS,其他平台有需要的,可以自行替换对应内容。

USwitch端的配置

1.和熊孩子电脑连接同一个路由器

2.进入编辑模式,点击左上角的“地球”,设置一个全局IP(电脑端脚本运行时会显示那个IP)

设置全局IP

3.创建按钮,脚本可以接受"start","stop","warning","camera"等命令,你可以选择性的将它们创建成按钮,由于我们已经设置了全局IP,所以这里每个按钮的的IP将不再重要,相比之下我们更关心端口(127),和发送的消息("start","stop","warning","camera"等)。

创建按钮g

4.设置图传,现版本图传在FPSView中,同样这里最重要的是开启图传,把接收端口设置为9921(如脚本),注意接收端口可以更改,
但是不可以小于四位数,不可以是已经被占用的端口(检查你的会话界面)

图传设置

5.去试一下吧!

同样你可以设置一个会话界面,生成各种自定义的弹窗,这里就不一一介绍了。

本教程到此就结束:)

APP下载

在手机上安装U型遥控器
上一篇下一篇

猜你喜欢

热点阅读