# File-viewer metadata (not part of the program): 109 lines, no EOL, 3.3 KiB, Python
import asyncio
import sys
import argparse

from PIL import Image

from ble.bleak import BleakBackend
from core.advertisement import parse_advertisement
from core.protocol import GiciskyProtocol
from image.conversion import convert_to_gicisky_bytes
from image import optimize


async def select_device(ble_backend):
    """
    Scan for devices and either automatically select the only one or prompt the user to choose.

    Args:
        ble_backend: Object whose awaitable ``scan_devices()`` returns a
            mapping keyed by device MAC; each value is passed unchanged to
            ``parse_advertisement``.

    Returns:
        tuple: (device_mac, model_id). ``(None, None)`` is returned when the
        scan finds nothing or when the selection prompt hits end-of-input.
    """
    print("Scanning for Gicisky devices...")
    devices = await ble_backend.scan_devices()

    if not devices:
        print("No compatible devices found.")
        return None, None

    macs = list(devices.keys())
    if len(macs) == 1:
        # Automatically select the only device
        mac = macs[0]
        print(f"Found one device: {mac}")
    else:
        # Multiple devices found, prompt user to select
        print("Multiple devices found:")
        for number, candidate in enumerate(macs, start=1):
            print(f"{number}. {candidate}")

        while True:
            # Only the prompt/parse can raise, so keep the try body minimal.
            try:
                choice = int(input("Select device number: ")) - 1
            except ValueError:
                print("Invalid input. Please enter a number.")
                continue
            except EOFError:
                # stdin is closed (e.g. piped or non-interactive run): no
                # selection can ever be made, so report "no device" instead
                # of crashing with a traceback.
                print("\nNo selection made.")
                return None, None

            if 0 <= choice < len(macs):
                mac = macs[choice]
                break
            print("Invalid selection. Please try again.")

    # Parse advertisement data to get model ID
    adv = devices[mac]
    device_data = parse_advertisement(adv)
    model_id = device_data.model

    print(f"Selected device: {mac}")
    if model_id is None:
        # The hex format spec below would raise TypeError on None; the caller
        # already treats a None model_id as "nothing to do".
        print("Device model ID: unknown")
    else:
        print(f"Device model ID: 0x{model_id:02X}")

    return mac, model_id


async def main_async(image_path: str, no_optimize: bool = False) -> None:
    """
    Select a device, prepare the image at ``image_path`` and upload it over BLE.

    Args:
        image_path: Path of the image file, opened with ``PIL.Image.open``.
        no_optimize: When true, the image is passed to the converter as
            loaded, skipping ``optimize``.

    Returns:
        None. Returns early, without opening the image or connecting, when
        ``select_device`` yields no MAC or no model ID.
    """
    # Initialize BLE backend (Bleak)
    ble_backend = BleakBackend()

    # Scan and select device
    mac, model_id = await select_device(ble_backend)
    if not mac or model_id is None:
        return

    # Load the image inside a context manager so the underlying file handle
    # is closed once the bytes have been produced; conversion stays inside
    # the block so the pixel data is read while the file is still open.
    with Image.open(image_path) as original_img:
        # Optimize image if not disabled
        if no_optimize:
            processed_img = original_img
            print("Image optimization skipped (--no-optimize flag used)")
        else:
            processed_img = optimize(original_img, model_id)
            print("Image optimized for target device")

        # Convert image to Gicisky format bytes
        img_bytes = convert_to_gicisky_bytes(processed_img, model_id)

    # Connect to the device
    print(f"Connecting to BLE device {mac}...")
    await ble_backend.connect(mac)

    try:
        # Create protocol instance with the BLE transport
        protocol = GiciskyProtocol(ble_backend)

        # Upload image bytes using the Gicisky handshake protocol
        print("Starting image upload...")
        await protocol.upload_image(img_bytes)
    finally:
        # Always release the connection, even when the upload fails, so the
        # device is not left connected after an error.
        print("Disconnecting...")
        await ble_backend.disconnect()

    print("Done.")


if __name__ == "__main__":
    # Command-line entry point: one positional image path plus an opt-out
    # flag for optimization, handed to the async driver.
    cli = argparse.ArgumentParser(
        description="Send an image to a Gicisky device over BLE"
    )
    cli.add_argument("image", help="Path to the image file")
    cli.add_argument(
        "--no-optimize",
        action="store_true",
        help="Skip image optimization for target device",
    )
    options = cli.parse_args()

    upload = main_async(options.image, options.no_optimize)
    asyncio.run(upload)