89 lines
No EOL
4.1 KiB
Python
89 lines
No EOL
4.1 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
|
|
from typing import Callable, Awaitable, Optional, Dict
|
|
|
|
from core.protocol import SERVICE_UUID, CHAR_CMD_UUID, CHAR_IMG_UUID
|
|
from logger.logger import GiciskyLogger, LogCategory
|
|
from .interface import BLEInterface, Advertisement
|
|
|
|
|
|
class BleakBackend(BLEInterface):
    """
    Bleak-based implementation of the BLEInterface abstraction.

    This backend handles BLE communication using the Bleak library but exposes
    a backend-agnostic interface so users can swap in another library (e.g.,
    BlueZ D-Bus or a custom GATT client) without modifying higher-level logic.
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        """
        Create a backend with no active connection.

        Args:
            logger: Optional standard-library logger; it is handed to
                GiciskyLogger, whose category-tagged methods are used for
                all log output of this class.
        """
        # Set by connect() once the link is confirmed; cleared by disconnect().
        self.client: Optional[BleakClient] = None
        # Optional notification hook; no method of this class reads it.
        self.on_notify: Optional[Callable[[int, bytes], Awaitable[None]]] = None
        self.logger = GiciskyLogger(logger)

    async def connect(self, mac: str) -> None:
        """
        Connect to a BLE device by MAC address.

        Args:
            mac: Address of the device, passed unchanged to BleakClient.

        Raises:
            ConnectionError: If the client reports that it is not connected
                after the connect call returns.
        """
        client = BleakClient(mac)
        await client.connect()
        if not client.is_connected:
            self.logger.error(f"Failed to connect to {mac}", LogCategory.CONNECTION)
            raise ConnectionError(f"Failed to connect to {mac}")
        # Publish the client only after the link is confirmed, so the
        # "if not self.client" guards in write()/start_notify() never pass
        # on a client whose connection attempt failed.
        self.client = client
        self.logger.info(f"Connected to {mac}", LogCategory.CONNECTION)

    async def write(self, characteristic: str, data: bytes, response: bool = False) -> None:
        """
        Write data to a characteristic.

        Args:
            characteristic: Identifier of the target GATT characteristic.
            data: Payload to write.
            response: Forwarded to Bleak's ``write_gatt_char`` as ``response``.

        Raises:
            RuntimeError: If no client is set (connect() has not succeeded).
        """
        if not self.client:
            self.logger.error("Not connected to a device", LogCategory.CONNECTION)
            raise RuntimeError("Not connected to a device")
        await self.client.write_gatt_char(characteristic, data, response=response)
        self.logger.debug(f"Wrote {len(data)} bytes to {characteristic}", LogCategory.DATA_TRANSFER)

    async def start_notify(self, characteristic: str, callback: Callable[[bytes], Awaitable[None]]) -> None:
        """
        Start receiving notifications from a specific characteristic.

        The callback is invoked with a single argument: the notification
        payload as immutable ``bytes``. It may be an async function or a
        plain function; if the call returns a coroutine or future, that
        result is awaited.

        Args:
            characteristic: Identifier of the GATT characteristic to subscribe to.
            callback: Callable taking the payload ``bytes``.

        Raises:
            RuntimeError: If no client is set (connect() has not succeeded).
        """
        if not self.client:
            self.logger.error("Not connected to a device", LogCategory.CONNECTION)
            raise RuntimeError("Not connected to a device")

        async def handler(sender: BleakGATTCharacteristic, data: bytearray):
            # Adapter between Bleak's (sender, data) signature and the
            # single-argument callback of this interface.
            self.logger.debug(f"Received notification from {characteristic}", LogCategory.NOTIFICATION)
            outcome = callback(bytes(data))
            # Inspect the call result rather than the callable itself: this
            # also awaits coroutines produced by async callable objects or
            # by lambdas wrapping an async function.
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                await outcome

        await self.client.start_notify(characteristic, handler)
        self.logger.info(f"Notifications started on {characteristic}", LogCategory.CONNECTION)

    async def disconnect(self) -> None:
        """
        Disconnect safely from the device.

        Best-effort: errors raised while disconnecting are logged and not
        re-raised, and the stored client is always cleared. Does nothing
        when no client is set.
        """
        if not self.client:
            return
        try:
            await self.client.disconnect()
            self.logger.info("Disconnected successfully", LogCategory.CONNECTION)
        except EOFError:
            # An EOFError during disconnect is treated as a clean shutdown.
            # (The previous "e is EOFError" identity test compared an
            # instance with the class and could never be true.)
            self.logger.info("Disconnected successfully (EOF)", LogCategory.CONNECTION)
        except Exception as e:
            self.logger.error(f"Disconnect error: {e}", LogCategory.CONNECTION)
        finally:
            self.client = None

    async def scan_devices(self, mac: Optional[str] = None) -> Dict[str, Advertisement]:
        """
        Scan for devices that advertise SERVICE_UUID.

        Args:
            mac: If given, only a device whose reported address equals this
                value exactly is returned; otherwise all matching devices.

        Returns:
            Mapping of device address to an Advertisement holding the device
            name, advertised service UUIDs and manufacturer data.
        """
        self.logger.info("Starting scan for devices...", LogCategory.CONNECTION)
        # With return_adv=True each value is a (device, advertisement) pair.
        discovered = await BleakScanner.discover(return_adv=True)
        ret = {
            device.address: Advertisement(
                name=device.name,
                service_uuids=adv.service_uuids,
                manufacturer_data=adv.manufacturer_data,
            )
            for device, adv in discovered.values()
            if (mac is None or mac == device.address) and SERVICE_UUID in adv.service_uuids
        }
        self.logger.info(f"Found {len(discovered)} devices, {len(ret)} of which are compatible", LogCategory.CONNECTION)
        return ret