gicisky/examples/send_bleak.py
2025-11-01 00:16:33 +01:00

109 lines
No EOL
3.3 KiB
Python

import asyncio
import sys
import argparse
from PIL import Image
from ble.bleak import BleakBackend
from core.advertisement import parse_advertisement
from core.protocol import GiciskyProtocol
from image.conversion import convert_to_gicisky_bytes
from image import optimize
async def select_device(ble_backend):
    """
    Scan for devices and choose the one to use.

    When exactly one device is found it is selected automatically; when
    several are found the user is prompted on stdin to pick one by number.
    The advertisement data of the chosen device is then parsed to obtain
    its model ID.

    Args:
        ble_backend: Object with an awaitable ``scan_devices()`` whose result
            is a mapping keyed by device MAC address, with the advertisement
            data passed to ``parse_advertisement`` as values.

    Returns:
        tuple: ``(device_mac, model_id)``, or ``(None, None)`` when no
        compatible device was found or stdin reached end-of-file before a
        selection was made.
    """
    print("Scanning for Gicisky devices...")
    devices = await ble_backend.scan_devices()
    if not devices:
        print("No compatible devices found.")
        return None, None

    if len(devices) == 1:
        # Automatically select the only device
        mac = next(iter(devices))
        print(f"Found one device: {mac}")
    else:
        # Multiple devices found, prompt user to select
        print("Multiple devices found:")
        macs = list(devices)
        for number, candidate in enumerate(macs, start=1):
            print(f"{number}. {candidate}")
        while True:
            # NOTE(review): input() blocks the event loop while waiting for
            # the user; presumably acceptable for this CLI - confirm that
            # nothing else needs the loop at this point.
            try:
                choice = int(input("Select device number: ")) - 1
            except ValueError:
                print("Invalid input. Please enter a number.")
                continue
            except EOFError:
                # stdin is closed or exhausted (e.g. non-interactive run):
                # re-prompting can never succeed, so report "no device"
                # the same way as an empty scan instead of crashing.
                print("\nNo selection made (end of input).")
                return None, None
            if 0 <= choice < len(macs):
                mac = macs[choice]
                break
            print("Invalid selection. Please try again.")

    # Parse advertisement data to get model ID
    adv = devices[mac]
    device_data = parse_advertisement(adv)
    model_id = device_data.model
    print(f"Selected device: {mac}")
    print(f"Device model ID: 0x{model_id:02X}")
    return mac, model_id
async def main_async(image_path: str, no_optimize: bool = False):
    """
    Send one image file to a Gicisky device over BLE.

    Steps, in order: scan for and select a device, load the image, optionally
    optimize it for the device model, convert it to Gicisky bytes, connect,
    upload, and disconnect.

    Args:
        image_path: Path of the image file to open.
        no_optimize: When true, skip ``optimize`` and convert the image as
            loaded.

    Returns:
        None. Returns early, without opening the image or connecting, when
        ``select_device`` yields no MAC address or no model ID.
    """
    # Initialize BLE backend (Bleak)
    ble_backend = BleakBackend()

    # Scan and select device
    mac, model_id = await select_device(ble_backend)
    if not mac or model_id is None:
        return

    # Load the image inside a context manager so the underlying file is
    # closed once the bytes have been produced.
    with Image.open(image_path) as original_img:
        # Optimize image if not disabled
        if no_optimize:
            processed_img = original_img
            print("Image optimization skipped (--no-optimize flag used)")
        else:
            processed_img = optimize(original_img, model_id)
            print("Image optimized for target device")

        # Convert image to Gicisky format bytes
        img_bytes = convert_to_gicisky_bytes(processed_img, model_id)

    # Connect to the device
    print(f"Connecting to BLE device {mac}...")
    await ble_backend.connect(mac)
    try:
        # Create protocol instance with the BLE transport
        protocol = GiciskyProtocol(ble_backend)

        # Upload image bytes using the Gicisky handshake protocol
        print("Starting image upload...")
        await protocol.upload_image(img_bytes)
    finally:
        # Always release the BLE connection, even when the upload fails;
        # any upload exception still propagates after the disconnect.
        print("Disconnecting...")
        await ble_backend.disconnect()
    print("Done.")
if __name__ == "__main__":
    # Command-line entry point: one positional image path plus an opt-out
    # switch for the optimization step, handed to the async workflow.
    cli = argparse.ArgumentParser(
        description="Send an image to a Gicisky device over BLE"
    )
    cli.add_argument("image", help="Path to the image file")
    cli.add_argument(
        "--no-optimize",
        action="store_true",
        help="Skip image optimization for target device",
    )
    options = cli.parse_args()
    upload = main_async(options.image, no_optimize=options.no_optimize)
    asyncio.run(upload)