Python使用bleak连接低功耗蓝牙读数据和接收通知

2023-04-19  本文已影响0人  itfitness

实现代码

import asyncio
from bleak import BleakClient

# 蓝牙设备的MAC地址
DEVICE_ADDRESS = ""
# 写数据的UUID
SERVICE_WRITE_UUID = "0000fff2-0000-1000-8000-00805f9b34fb"
# 接收通知的UUID
SERVICE_NOTIFY_UUID = "0000fff1-0000-1000-8000-00805f9b34fb"

# 发送的数据(这里是要发送的指令)
openDoor_send = [0x68, 0x06, 0x01]

# 记录数据
watchData = []


async def main(address):
    async with BleakClient(address) as client:
        try:
            # 是否连接
            if not client.is_connected:
                client.connect()
            print(f"Connected: {client.is_connected}")
            # 是否配对
            paired = await client.pair(protection_level=2)
            print(f"Paired: {paired}")
            # 开启通知的接收
            await client.start_notify(SERVICE_NOTIFY_UUID, notify_callback)
            # 循环发送指令
            while client.is_connected and paired:
                print("send data")
                await client.write_gatt_char(SERVICE_WRITE_UUID, bytes(openDoor_send))
                await asyncio.sleep(3.0)
            print('Disconnected')
        except Exception as e:
            print(f"Exception during write_and_listen loop: {e}")
        finally:
            # 结束监听
            await client.stop_notify(SERVICE_NOTIFY_UUID)
            # 断开与蓝牙设备的连接
            await client.disconnect()
            print("结束")


# 回调监听
def notify_callback(sender, data):
    try:
        # 有的数据一次发送不完所以需要判断一下拼接到一起
        for temp in data:
            watchData.append(temp)
        if data.hex().endswith("16"):
            # 处理从设备发来的Notify数据
            print(f"Received notification from {sender}: {data.hex()}")
            print(len(watchData))
            print(f"HeartRateData: {str(watchData[5])}")
            print(f"BloodOxygenData: {str(watchData[24])}")
            print(f"SystolicBloodPressureData: {str(watchData[25])}")
            print(f"DiastolicBloodPressureData: {str(watchData[26])}")
            watchData.clear()
    except Exception as e:
        print(f"Exception during write_and_listen loop: {e}")


if __name__ == '__main__':
    try:
        asyncio.run(main(DEVICE_ADDRESS))
    except:
        pass
上一篇下一篇

猜你喜欢

热点阅读