心率广播接收器

1. 介绍

长话短说,这段时间夜梦看一些主播玩游戏的时候会把心率显示出来(借助OBS里面的插件就可以实现)。但夜梦不想用OBS,所以就借助codex写了一个小程序,用来接收手表广播的心率。

下载链接(exe软件,下载后直接双击使用即可):心率广播接收器

主界面:

手表打开心率广播后,打开“心率广播接收器”。

直接点击扫描设备,应该可以看到很多蓝牙设备(夜梦这里把UNKNOW设备给过滤了),这里夜梦使用的是华为手环11,可以正常被检测到且链接:

选中手表,然后点击连接设备,下面就会出现心率了,桌面小悬浮窗也会正常显示(心率每秒更新一次):

上面的“警告”为心率警告,如果指定时间内平均心率超过指定值,或者瞬时值超过指定值,桌面悬浮窗就会闪烁提示。

桌面悬浮窗(默认显示在左上角)也有小功能,直接点击悬浮窗可以展开心率曲线(最近半小时):

如果不想用了,可以选择断开连接或者直接退出程序。

2. 代码

import sys
import time
import json
import asyncio
from pathlib import Path
from collections import deque

from bleak import BleakScanner, BleakClient
from PyQt6.QtCore import QPoint, QPointF, Qt, QTimer
from PyQt6.QtGui import QAction, QColor, QFont, QLinearGradient, QPainter, QPainterPath, QPen
from PyQt6.QtWidgets import (
    QApplication,
    QWidget,
    QPushButton,
    QListWidget,
    QLabel,
    QVBoxLayout,
    QHBoxLayout,
    QGridLayout,
    QGroupBox,
    QCheckBox,
    QSpinBox,
    QMenu,
    QStyle,
    QSystemTrayIcon,
)
from qasync import QEventLoop


HR_UUID = "00002A37-0000-1000-8000-00805f9b34fb"
HEART = "❤️"
HISTORY_SECONDS = 30 * 60
NO_DATA_SECONDS = 20
SETTINGS_FILE = Path(__file__).with_name("heart_settings.json")

DEFAULT_ALERTS = {
    "enabled": True,
    "avg_window_minutes": 5,
    "avg_threshold": 95,
    "high_threshold": 120,
    "low_threshold": 50,
}


def load_settings():
    try:
        return json.loads(SETTINGS_FILE.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_settings(settings):
    SETTINGS_FILE.write_text(
        json.dumps(settings, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )


class DesktopHeartWindow(QWidget):
    def __init__(self, owner):
        super().__init__()
        self.owner = owner

        self.setWindowFlags(
            Qt.WindowType.FramelessWindowHint
            | Qt.WindowType.WindowStaysOnTopHint
            | Qt.WindowType.Tool
        )
        self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)

        self.collapsed_size = (150, 58)
        self.expanded_size = (440, 280)
        self.expanded = False
        self.hr = None
        self.history = []
        self.stats = {}
        self.alert_message = ""
        self.drag_pos = None
        self.press_pos = QPoint()
        self.dragging = False
        self.alert_active = False
        self.flash_on = False

        self.flash_timer = QTimer(self)
        self.flash_timer.setInterval(550)
        self.flash_timer.timeout.connect(self.toggle_flash)

        self.resize(*self.collapsed_size)
        self.move(30, 30)

    def update_hr(self, hr, history, stats):
        self.hr = hr
        self.history = list(history)
        self.stats = stats
        self.update()

    def reset_hr(self):
        self.hr = None
        self.stats = {}
        self.set_alert(False, "")
        self.update()

    def set_alert(self, active, message=""):
        self.alert_message = message

        if self.alert_active == active:
            self.update()
            return

        self.alert_active = active
        self.flash_on = active

        if active:
            self.flash_timer.start()
        else:
            self.flash_timer.stop()

        self.update()

    def toggle_flash(self):
        if not self.alert_active:
            return

        self.flash_on = not self.flash_on
        self.update()

    def toggle_expanded(self):
        self.expanded = not self.expanded
        self.resize(*(self.expanded_size if self.expanded else self.collapsed_size))
        self.update()

    def reset_position(self):
        self.move(30, 30)

    def contextMenuEvent(self, event):
        menu = QMenu(self)

        expand_action = QAction("收起曲线" if self.expanded else "展开曲线", self)
        expand_action.triggered.connect(self.toggle_expanded)
        menu.addAction(expand_action)

        reset_action = QAction("重置位置", self)
        reset_action.triggered.connect(self.reset_position)
        menu.addAction(reset_action)

        show_action = QAction("显示主窗口", self)
        show_action.triggered.connect(self.owner.show_main_window)
        menu.addAction(show_action)

        disconnect_action = QAction("断开连接", self)
        disconnect_action.setEnabled(self.owner.is_connected())
        disconnect_action.triggered.connect(self.owner.request_disconnect)
        menu.addAction(disconnect_action)

        menu.addSeparator()

        quit_action = QAction("退出程序", self)
        quit_action.triggered.connect(self.owner.quit_app)
        menu.addAction(quit_action)

        menu.exec(event.globalPos())

    def mousePressEvent(self, event):
        if event.button() != Qt.MouseButton.LeftButton:
            return

        self.drag_pos = event.globalPosition().toPoint()
        self.press_pos = event.position().toPoint()
        self.dragging = False

    def mouseMoveEvent(self, event):
        if self.drag_pos:
            current_pos = event.globalPosition().toPoint()
            delta = current_pos - self.drag_pos

            if delta.manhattanLength() > 2:
                self.dragging = True

            self.move(self.x() + delta.x(), self.y() + delta.y())
            self.drag_pos = current_pos

    def mouseReleaseEvent(self, event):
        release_pos = event.position().toPoint()
        click_distance = (release_pos - self.press_pos).manhattanLength()

        if not self.dragging and click_distance < 6:
            self.toggle_expanded()

        self.drag_pos = None
        self.dragging = False

    def paintEvent(self, event):
        painter = QPainter(self)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)

        rect = self.rect().adjusted(1, 1, -1, -1)
        gradient = QLinearGradient(0, 0, rect.width(), rect.height())

        if self.alert_active and self.flash_on:
            gradient.setColorAt(0, QColor(255, 218, 190, 245))
            gradient.setColorAt(1, QColor(255, 105, 82, 245))
            border_color = QColor(220, 76, 48)
            text_color = QColor(120, 32, 20)
        else:
            gradient.setColorAt(0, QColor(225, 245, 255, 235))
            gradient.setColorAt(1, QColor(188, 225, 248, 235))
            border_color = QColor(125, 185, 220)
            text_color = QColor(26, 87, 125)

        painter.setPen(QPen(border_color, 1))
        painter.setBrush(gradient)
        painter.drawRoundedRect(rect, 16, 16)

        painter.setPen(text_color)
        painter.setFont(QFont("Microsoft YaHei", 20, QFont.Weight.Bold))
        text = f"{HEART} {self.hr}" if self.hr is not None else f"{HEART} --"
        painter.drawText(18, 10, self.width() - 36, 38, Qt.AlignmentFlag.AlignVCenter, text)

        if not self.expanded:
            return

        avg_hr = self.stats.get("avg_hr")
        avg_window = self.stats.get("avg_window_minutes", 5)
        min_30m = self.stats.get("min_30m")
        max_30m = self.stats.get("max_30m")

        summary = "等待心率数据"
        if avg_hr is not None:
            summary = f"{avg_window}分钟均值 {avg_hr:.1f}  |  30分钟 {min_30m}-{max_30m} BPM"

        painter.setFont(QFont("Microsoft YaHei", 9))
        painter.setPen(QColor(72, 126, 160))
        painter.drawText(20, 56, self.width() - 40, 20, Qt.AlignmentFlag.AlignLeft, summary)

        chart_top = 82
        if self.alert_message:
            chart_top = 106
            painter.setPen(QColor(185, 58, 35))
            painter.drawText(
                20,
                78,
                self.width() - 40,
                20,
                Qt.AlignmentFlag.AlignLeft,
                self.alert_message,
            )

        chart_rect = self.rect().adjusted(20, chart_top, -20, -24)
        painter.setPen(Qt.PenStyle.NoPen)
        painter.setBrush(QColor(255, 255, 255, 130))
        painter.drawRoundedRect(chart_rect, 12, 12)

        self.draw_chart(painter, chart_rect.adjusted(14, 12, -14, -16))

    def draw_chart(self, painter, rect):
        now = time.time()
        points = [(ts, hr) for ts, hr in self.history if now - ts <= HISTORY_SECONDS]

        painter.setPen(QPen(QColor(178, 214, 235), 1))
        for i in range(4):
            y = rect.top() + i * rect.height() / 3
            painter.drawLine(QPointF(rect.left(), y), QPointF(rect.right(), y))

        if not points:
            painter.setPen(QColor(90, 137, 165))
            painter.setFont(QFont("Microsoft YaHei", 10))
            painter.drawText(rect, Qt.AlignmentFlag.AlignCenter, "等待心率数据")
            return

        values = [hr for _, hr in points]
        min_hr = max(40, min(values) - 5)
        max_hr = min(220, max(values) + 5)

        if max_hr - min_hr < 20:
            middle = (max_hr + min_hr) / 2
            min_hr = max(40, int(middle - 10))
            max_hr = min(220, int(middle + 10))

        path = QPainterPath()

        for index, (ts, hr) in enumerate(points):
            age = now - ts
            x = rect.right() - (age / HISTORY_SECONDS) * rect.width()
            y = rect.bottom() - ((hr - min_hr) / (max_hr - min_hr)) * rect.height()

            if index == 0:
                path.moveTo(x, y)
            else:
                path.lineTo(x, y)

        painter.setPen(QPen(QColor(36, 145, 205), 3))
        painter.drawPath(path)

        last_hr = points[-1][1]
        painter.setPen(QColor(26, 87, 125))
        painter.setFont(QFont("Microsoft YaHei", 9))
        painter.drawText(rect.left(), rect.top() - 2, f"{max_hr} BPM")
        painter.drawText(rect.left(), rect.bottom() + 14, f"{min_hr} BPM")
        painter.drawText(rect.right() - 86, rect.top() - 2, f"当前 {last_hr} BPM")


class HeartGUI(QWidget):
    def __init__(self):
        super().__init__()

        self.loop = asyncio.get_event_loop()
        self.settings = load_settings()
        self.alert_settings = {**DEFAULT_ALERTS, **self.settings.get("alerts", {})}
        self.client = None
        self.notifying = False
        self.devices = []
        self.seen_addresses = set()
        self.scanning = False
        self.manual_disconnect = False
        self.reconnect_task = None
        self.exiting = False
        self.hr_history = deque()
        self.last_data_time = None
        self.current_alert_message = ""

        self.desktop_window = DesktopHeartWindow(self)
        self.desktop_window.show()

        self.setWindowTitle("心率广播接收器")
        self.resize(500, 700)

        self.title = QLabel("心率广播接收器")
        self.title.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.title.setObjectName("title")

        self.statusLabel = QLabel("请选择设备并连接")
        self.statusLabel.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.statusLabel.setObjectName("statusLabel")

        self.alertLabel = QLabel("异常提示:无")
        self.alertLabel.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.alertLabel.setObjectName("alertLabel")

        self.scanBtn = QPushButton("扫描设备")
        self.lastDeviceBtn = QPushButton("连接上次设备")
        self.connectBtn = QPushButton("连接设备")

        self.listWidget = QListWidget()

        self.hrLabel = QLabel(f"{HEART} -- BPM")
        self.hrLabel.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.hrLabel.setObjectName("heartLabel")

        layout = QVBoxLayout()
        layout.setContentsMargins(18, 18, 18, 18)
        layout.setSpacing(12)
        layout.addWidget(self.title)
        layout.addWidget(self.statusLabel)
        layout.addWidget(self.alertLabel)

        scanLayout = QHBoxLayout()
        scanLayout.addWidget(self.scanBtn)
        scanLayout.addWidget(self.lastDeviceBtn)
        layout.addLayout(scanLayout)
        layout.addWidget(self.listWidget)

        self.build_alert_settings_ui(layout)

        btnLayout = QHBoxLayout()
        btnLayout.addWidget(self.connectBtn)
        layout.addLayout(btnLayout)
        layout.addWidget(self.hrLabel)
        self.setLayout(layout)
        self.apply_theme()

        self.setup_tray()
        self.update_last_device_button()

        self.no_data_timer = QTimer(self)
        self.no_data_timer.setInterval(5000)
        self.no_data_timer.timeout.connect(self.check_no_data_alert)
        self.no_data_timer.start()

        self.scanBtn.clicked.connect(lambda: asyncio.create_task(self.scan()))
        self.lastDeviceBtn.clicked.connect(lambda: asyncio.create_task(self.connect_last_device()))
        self.connectBtn.clicked.connect(lambda: asyncio.create_task(self.toggle_connection()))

    def build_alert_settings_ui(self, parent_layout):
        group = QGroupBox("告警设置")
        grid = QGridLayout()
        grid.setHorizontalSpacing(10)
        grid.setVerticalSpacing(8)

        self.alertEnabledBox = QCheckBox("启用告警")
        self.alertEnabledBox.setChecked(bool(self.alert_settings["enabled"]))
        grid.addWidget(self.alertEnabledBox, 0, 0, 1, 2)

        self.avgWindowSpin = QSpinBox()
        self.avgWindowSpin.setRange(1, 30)
        self.avgWindowSpin.setSuffix(" 分钟")
        self.avgWindowSpin.setValue(int(self.alert_settings["avg_window_minutes"]))
        grid.addWidget(QLabel("均值窗口"), 1, 0)
        grid.addWidget(self.avgWindowSpin, 1, 1)

        self.avgThresholdSpin = QSpinBox()
        self.avgThresholdSpin.setRange(40, 220)
        self.avgThresholdSpin.setSuffix(" BPM")
        self.avgThresholdSpin.setValue(int(self.alert_settings["avg_threshold"]))
        grid.addWidget(QLabel("均值过高"), 2, 0)
        grid.addWidget(self.avgThresholdSpin, 2, 1)

        self.highThresholdSpin = QSpinBox()
        self.highThresholdSpin.setRange(40, 240)
        self.highThresholdSpin.setSuffix(" BPM")
        self.highThresholdSpin.setValue(int(self.alert_settings["high_threshold"]))
        grid.addWidget(QLabel("瞬时过高"), 3, 0)
        grid.addWidget(self.highThresholdSpin, 3, 1)

        self.lowThresholdSpin = QSpinBox()
        self.lowThresholdSpin.setRange(30, 120)
        self.lowThresholdSpin.setSuffix(" BPM")
        self.lowThresholdSpin.setValue(int(self.alert_settings["low_threshold"]))
        grid.addWidget(QLabel("瞬时过低"), 4, 0)
        grid.addWidget(self.lowThresholdSpin, 4, 1)

        group.setLayout(grid)
        parent_layout.addWidget(group)

        self.alertEnabledBox.toggled.connect(self.save_alert_settings)
        self.avgWindowSpin.valueChanged.connect(self.save_alert_settings)
        self.avgThresholdSpin.valueChanged.connect(self.save_alert_settings)
        self.highThresholdSpin.valueChanged.connect(self.save_alert_settings)
        self.lowThresholdSpin.valueChanged.connect(self.save_alert_settings)

    def setup_tray(self):
        self.tray = QSystemTrayIcon(self)
        icon = QApplication.style().standardIcon(QStyle.StandardPixmap.SP_ComputerIcon)
        self.tray.setIcon(icon)
        self.tray.setToolTip("心率广播接收器")

        menu = QMenu(self)

        show_action = QAction("显示主窗口", self)
        show_action.triggered.connect(self.show_main_window)
        menu.addAction(show_action)

        disconnect_action = QAction("断开连接", self)
        disconnect_action.triggered.connect(self.request_disconnect)
        menu.addAction(disconnect_action)

        menu.addSeparator()

        quit_action = QAction("退出程序", self)
        quit_action.triggered.connect(self.quit_app)
        menu.addAction(quit_action)

        self.tray.setContextMenu(menu)
        self.tray.activated.connect(self.on_tray_activated)
        self.tray.show()

    def apply_theme(self):
        self.setStyleSheet(
            """
            QWidget {
                background: #eaf7ff;
                color: #174c68;
                font-family: "Microsoft YaHei";
                font-size: 14px;
            }
            QLabel#title {
                color: #165f86;
                font-size: 22px;
                font-weight: bold;
                padding: 8px;
            }
            QLabel#statusLabel, QLabel#alertLabel {
                color: #386f8b;
                background: #dff3ff;
                border: 1px solid #b3ddf3;
                border-radius: 10px;
                padding: 8px;
            }
            QLabel#alertLabel[alert="true"] {
                color: #8a2718;
                background: #ffe0d2;
                border: 1px solid #ff9f85;
            }
            QLabel#heartLabel {
                color: #115b80;
                background: #d4efff;
                border: 1px solid #9fd4f1;
                border-radius: 16px;
                padding: 18px;
                font-size: 34px;
                font-weight: bold;
            }
            QGroupBox {
                border: 1px solid #a9d8f0;
                border-radius: 12px;
                margin-top: 10px;
                padding: 12px;
                font-weight: bold;
                background: #f4fbff;
            }
            QGroupBox::title {
                subcontrol-origin: margin;
                left: 12px;
                padding: 0 6px;
            }
            QSpinBox {
                background: #f7fcff;
                border: 1px solid #a9d8f0;
                border-radius: 8px;
                padding: 5px 30px 5px 8px;
                min-height: 28px;
                selection-background-color: #bce6fb;
            }
            QSpinBox::up-button, QSpinBox::down-button {
                subcontrol-origin: border;
                width: 24px;
                background: #d7f1ff;
                border-left: 1px solid #a9d8f0;
            }
            QSpinBox::up-button {
                subcontrol-position: top right;
                border-top-right-radius: 8px;
                border-bottom: 1px solid #a9d8f0;
            }
            QSpinBox::down-button {
                subcontrol-position: bottom right;
                border-bottom-right-radius: 8px;
            }
            QSpinBox::up-button:hover, QSpinBox::down-button:hover {
                background: #bce6fb;
            }
            QSpinBox::up-button:pressed, QSpinBox::down-button:pressed {
                background: #8fd0f4;
            }
            QSpinBox::up-arrow {
                image: none;
                width: 0;
                height: 0;
                border-left: 5px solid transparent;
                border-right: 5px solid transparent;
                border-bottom: 6px solid #2d7fa8;
            }
            QSpinBox::down-arrow {
                image: none;
                width: 0;
                height: 0;
                border-left: 5px solid transparent;
                border-right: 5px solid transparent;
                border-top: 6px solid #2d7fa8;
            }
            QPushButton {
                background: #8fd0f4;
                color: #08384f;
                border: 1px solid #67b8e4;
                border-radius: 10px;
                padding: 10px 12px;
                font-weight: bold;
            }
            QPushButton:hover {
                background: #a7dcf7;
            }
            QPushButton:pressed {
                background: #6ebfe9;
            }
            QPushButton:disabled {
                background: #c5e4f3;
                color: #6d8da0;
            }
            QListWidget {
                background: #f7fcff;
                border: 1px solid #a9d8f0;
                border-radius: 12px;
                padding: 8px;
                outline: none;
            }
            QListWidget QScrollBar:vertical {
                background: #e6f6ff;
                width: 12px;
                margin: 8px 3px 8px 0;
                border-radius: 6px;
            }
            QListWidget QScrollBar::handle:vertical {
                background: #9fd4f1;
                min-height: 32px;
                border-radius: 6px;
            }
            QListWidget QScrollBar::handle:vertical:hover {
                background: #76c3eb;
            }
            QListWidget QScrollBar::handle:vertical:pressed {
                background: #4fb0df;
            }
            QListWidget QScrollBar::add-line:vertical,
            QListWidget QScrollBar::sub-line:vertical {
                height: 0;
                border: none;
                background: transparent;
            }
            QListWidget QScrollBar::add-page:vertical,
            QListWidget QScrollBar::sub-page:vertical {
                background: transparent;
            }
            QListWidget QScrollBar:horizontal {
                background: #e6f6ff;
                height: 12px;
                margin: 0 8px 3px 8px;
                border-radius: 6px;
            }
            QListWidget QScrollBar::handle:horizontal {
                background: #9fd4f1;
                min-width: 32px;
                border-radius: 6px;
            }
            QListWidget QScrollBar::handle:horizontal:hover {
                background: #76c3eb;
            }
            QListWidget QScrollBar::handle:horizontal:pressed {
                background: #4fb0df;
            }
            QListWidget QScrollBar::add-line:horizontal,
            QListWidget QScrollBar::sub-line:horizontal {
                width: 0;
                border: none;
                background: transparent;
            }
            QListWidget QScrollBar::add-page:horizontal,
            QListWidget QScrollBar::sub-page:horizontal {
                background: transparent;
            }
            QListWidget::item {
                padding: 8px;
                border-radius: 8px;
            }
            QListWidget::item:selected {
                background: #bce6fb;
                color: #0c4968;
            }
            """
        )

    def is_connected(self):
        return bool(self.client and self.client.is_connected)

    def update_status(self, text):
        self.statusLabel.setText(text)
        self.tray.setToolTip(f"心率广播接收器 - {text}")

    def update_alert_label(self, message):
        self.current_alert_message = message
        self.alertLabel.setText(f"异常提示:{message}" if message else "异常提示:无")
        self.alertLabel.setProperty("alert", "true" if message else "false")
        self.alertLabel.style().unpolish(self.alertLabel)
        self.alertLabel.style().polish(self.alertLabel)
        self.desktop_window.set_alert(bool(message), message)

    def save_alert_settings(self):
        self.alert_settings = {
            "enabled": self.alertEnabledBox.isChecked(),
            "avg_window_minutes": self.avgWindowSpin.value(),
            "avg_threshold": self.avgThresholdSpin.value(),
            "high_threshold": self.highThresholdSpin.value(),
            "low_threshold": self.lowThresholdSpin.value(),
        }
        self.settings["alerts"] = self.alert_settings
        save_settings(self.settings)
        self.evaluate_alerts()

    def update_last_device_button(self):
        last_device = self.settings.get("last_device", {})
        self.lastDeviceBtn.setEnabled(bool(last_device.get("address")))

    def remember_device(self, device):
        self.settings["last_device"] = {
            "name": device.name or "已保存设备",
            "address": device.address,
        }
        save_settings(self.settings)
        self.update_last_device_button()

    async def scan(self):
        if self.scanning:
            return

        self.scanning = True
        self.listWidget.clear()
        self.devices = []
        self.seen_addresses.clear()

        self.scanBtn.setText("扫描中...")
        self.scanBtn.setEnabled(False)
        self.update_status("正在扫描设备")

        def add_device(device, advertisement_data):
            name = device.name or advertisement_data.local_name

            if not name or name.strip().lower() == "unknown":
                return

            if device.address in self.seen_addresses:
                return

            self.seen_addresses.add(device.address)
            self.devices.append(device)
            self.listWidget.addItem(f"{name} | {device.address}")

        try:
            scanner = BleakScanner(detection_callback=add_device)
            await scanner.start()
            await asyncio.sleep(8)
            await scanner.stop()
        except Exception as e:
            self.update_status("扫描失败")
            print(e)
        finally:
            self.scanning = False
            self.scanBtn.setEnabled(True)
            self.scanBtn.setText("重新扫描")

            if self.devices:
                self.update_status(f"扫描完成,发现 {len(self.devices)} 个设备")
            else:
                self.update_status("未发现可显示设备")

    async def toggle_connection(self):
        if self.is_connected():
            await self.disconnect()
        else:
            await self.connect_selected_device()

    async def connect_selected_device(self):
        idx = self.listWidget.currentRow()

        if idx < 0:
            self.update_status("请先选择一个设备")
            return

        device = self.devices[idx]
        await self.connect_to_device(device, save_device=True)

    async def connect_last_device(self):
        last_device = self.settings.get("last_device", {})
        address = last_device.get("address")

        if not address:
            self.update_status("没有保存过的设备")
            return

        name = last_device.get("name", "上次设备")
        await self.connect_to_address(address, name, save_device=False)

    async def connect_to_device(self, device, save_device):
        if save_device:
            self.remember_device(device)

        await self.connect_to_address(
            device.address,
            device.name or "已选择设备",
            save_device=False,
        )

    async def connect_to_address(self, address, name, save_device=False):
        self.manual_disconnect = False
        self.connectBtn.setText("连接中...")
        self.connectBtn.setEnabled(False)
        self.update_status(f"正在连接 {name}")

        try:
            if self.is_connected():
                await self.disconnect(reset_button=False, manual=True)
                self.manual_disconnect = False

            self.client = BleakClient(
                address,
                disconnected_callback=self.handle_ble_disconnect,
            )
            await self.client.connect()
            await self.client.start_notify(HR_UUID, self.hr_callback)

            self.notifying = True
            self.last_data_time = time.time()
            self.connectBtn.setText("断开连接")
            self.update_status(f"已连接 {name}")

            if save_device:
                self.settings["last_device"] = {"name": name, "address": address}
                save_settings(self.settings)
                self.update_last_device_button()

        except Exception as e:
            self.client = None
            self.notifying = False
            self.connectBtn.setText("连接失败,重试")
            self.update_status("连接失败")
            print(e)
        finally:
            self.connectBtn.setEnabled(True)

    async def disconnect(self, reset_button=True, manual=True):
        self.manual_disconnect = manual
        self.connectBtn.setText("断开中...")
        self.connectBtn.setEnabled(False)
        self.update_status("正在断开连接")

        try:
            if self.client and self.client.is_connected:
                if self.notifying:
                    try:
                        await self.client.stop_notify(HR_UUID)
                    except Exception as e:
                        print(e)

                await self.client.disconnect()

        except Exception as e:
            print(e)
        finally:
            self.client = None
            self.notifying = False
            self.last_data_time = None
            self.hrLabel.setText(f"{HEART} -- BPM")
            self.desktop_window.reset_hr()
            self.update_alert_label("")

            if reset_button:
                self.connectBtn.setText("连接设备")
                self.connectBtn.setEnabled(True)
                self.update_status("已断开连接")

    def handle_ble_disconnect(self, client):
        if self.exiting or self.manual_disconnect:
            return

        self.loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self.handle_unexpected_disconnect())
        )

    async def handle_unexpected_disconnect(self):
        if self.exiting:
            return

        self.client = None
        self.notifying = False
        self.last_data_time = None
        self.connectBtn.setText("连接设备")
        self.connectBtn.setEnabled(True)
        self.desktop_window.reset_hr()
        self.update_alert_label("")
        self.update_status("连接已断开,准备自动重连")

        if self.reconnect_task and not self.reconnect_task.done():
            return

        self.reconnect_task = asyncio.create_task(self.reconnect_last_device())

    async def reconnect_last_device(self):
        last_device = self.settings.get("last_device", {})
        address = last_device.get("address")
        name = last_device.get("name", "上次设备")

        if not address:
            self.update_status("连接已断开,没有可自动重连的设备")
            return

        for attempt in range(1, 6):
            if self.manual_disconnect or self.exiting or self.is_connected():
                return

            self.update_status(f"自动重连 {name},第 {attempt}/5 次")
            await asyncio.sleep(5)

            if self.manual_disconnect or self.exiting:
                return

            await self.connect_to_address(address, name, save_device=False)

            if self.is_connected():
                return

        self.update_status("自动重连失败,请手动连接")

    def calculate_stats(self, now):
        avg_window_seconds = self.alert_settings["avg_window_minutes"] * 60
        recent_avg_values = [
            value for timestamp, value in self.hr_history
            if now - timestamp <= avg_window_seconds
        ]
        recent_30m = [value for _, value in self.hr_history]
        avg_hr = sum(recent_avg_values) / len(recent_avg_values) if recent_avg_values else None

        return {
            "avg_hr": avg_hr,
            "avg_window_minutes": self.alert_settings["avg_window_minutes"],
            "min_30m": min(recent_30m) if recent_30m else None,
            "max_30m": max(recent_30m) if recent_30m else None,
        }

    def build_alert_message(self, hr, stats):
        if not self.alert_settings["enabled"]:
            return ""

        alerts = []
        avg_hr = stats.get("avg_hr")

        if avg_hr is not None and avg_hr > self.alert_settings["avg_threshold"]:
            alerts.append(
                f"{self.alert_settings['avg_window_minutes']}分钟均值 {avg_hr:.1f} > "
                f"{self.alert_settings['avg_threshold']}"
            )

        if hr > self.alert_settings["high_threshold"]:
            alerts.append(f"瞬时心率 {hr} > {self.alert_settings['high_threshold']}")

        if hr < self.alert_settings["low_threshold"]:
            alerts.append(f"瞬时心率 {hr} < {self.alert_settings['low_threshold']}")

        return ";".join(alerts)

    def evaluate_alerts(self):
        if not self.hr_history:
            self.update_alert_label("")
            return

        now = time.time()
        hr = self.hr_history[-1][1]
        stats = self.calculate_stats(now)
        message = self.build_alert_message(hr, stats)
        self.update_alert_label(message)
        self.desktop_window.update_hr(hr, self.hr_history, stats)

    def check_no_data_alert(self):
        if not self.is_connected() or not self.alert_settings["enabled"]:
            return

        if self.last_data_time is None:
            return

        if time.time() - self.last_data_time > NO_DATA_SECONDS:
            message = f"超过 {NO_DATA_SECONDS} 秒未收到心率数据"
            self.update_alert_label(message)

    def add_heart_rate(self, hr):
        now = time.time()
        self.last_data_time = now
        self.hr_history.append((now, hr))

        while self.hr_history and now - self.hr_history[0][0] > HISTORY_SECONDS:
            self.hr_history.popleft()

        stats = self.calculate_stats(now)
        message = self.build_alert_message(hr, stats)

        self.hrLabel.setText(f"{HEART} {hr} BPM")
        self.desktop_window.update_hr(hr, self.hr_history, stats)
        self.update_alert_label(message)

    def hr_callback(self, sender, data):
        flags = data[0]

        if flags & 0x01:
            hr = int.from_bytes(data[1:3], "little")
        else:
            hr = data[1]

        self.add_heart_rate(hr)

    def request_disconnect(self):
        asyncio.create_task(self.disconnect())

    def show_main_window(self):
        self.show()
        self.raise_()
        self.activateWindow()

    def on_tray_activated(self, reason):
        if reason == QSystemTrayIcon.ActivationReason.DoubleClick:
            self.show_main_window()

    def closeEvent(self, event):
        if self.exiting:
            event.accept()
            return

        event.ignore()
        self.hide()
        self.update_status("主窗口已隐藏,程序仍在运行")

    def quit_app(self):
        self.exiting = True
        self.manual_disconnect = True
        QApplication.quit()


app = QApplication(sys.argv)
app.setQuitOnLastWindowClosed(False)

loop = QEventLoop(app)
asyncio.set_event_loop(loop)

window = HeartGUI()
window.show()

with loop:
    loop.run_forever()
本文为夜梦星尘原创文章。
文章作者:夜梦星尘
文章链接:心率广播接收器
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自夜梦星尘
支持作者:夜梦星尘的爱发电
上一篇
下一篇