commit 735e5de32859e6d4341eb04423a6003cad82d8e1 Author: MassiveBox Date: Sat Nov 1 00:16:33 2025 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9dd3a0d --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +venv +.lingma +dist/ +*.egg-info/ +build/ +__pycache__/ +*.pyc +.env +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/gicisky.iml b/.idea/gicisky.iml new file mode 100644 index 0000000..4975be3 --- /dev/null +++ b/.idea/gicisky.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..4a094a7 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,16 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..64df4a4 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,10 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..39203c8 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..08fce05 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 MassiveBox + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..9e440ba --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +include requirements.txt +include README.md +include LICENSE \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..faf47d5 --- /dev/null +++ b/README.md @@ -0,0 +1,82 @@ +# 🏷 Gicisky Python Library + +A Python library for interacting with Gicisky electronic ink display tags via Bluetooth Low Energy (BLE). + +## â„šī¸ Features + +- **Advertisement parsing**: Discover compatible devices and get their info (battery level, model, hardware and software +version) +- **Image uploading**: Upload images to your ESL, with all the features it supports (including third color and compression!) +- **Bluetooth library independent**: Use the provided [Bleak backend](ble/bleak.py) or implement the [BLE Interface](ble/interface.py) +to use any other Bluetooth library. +- **Image conversion**: Provide any image and let the library convert it for you to the device's format. + +## 📱 Supported Devices + +- EPD 2.1" BWR (0x0B) +- EPD 2.9" BWR (0x33) +- EPD 4.2" BWR (0x4B) +- EPD 7.5" BWR (0x2B) +- TFT 2.1" BW (0xA0) + +## âŦ‡ī¸ Installation + +```bash +pip install gicisky +``` + +Or install from source: + +```bash +git clone https://git.massive.box/massivebox/gicisky +cd gicisky +pip install -r requirements.txt +``` + +## â–ļī¸ Quick Start + +Check out the [examples](examples)! + +```bash +python3 examples/send_bleak.py ~/path/to/image.png +python3 examples/send_bleak.py --no-optimize ~/path/to/image.png # Without dithering +``` + +## âš™ī¸ Components + +1. **BLE Interface** ([ble/](file:///home/massive/Dev/gicisky/ble/)): Handles Bluetooth Low Energy communication +2. **Core Protocol** ([core/](file:///home/massive/Dev/gicisky/core/)): Implements the Gicisky communication protocol, independent of the Bluetooth library +3. **Image Processing** ([image/](file:///home/massive/Dev/gicisky/image/)): `conversion` formats images to the Gicisky format, `optimizer` (optional) uses +dithering and letterboxing for better results +4. **Logging** ([logger/](file:///home/massive/Dev/gicisky/logger/)): Provides detailed logging capabilities + +## 🧱 Requirements + +- Python 3.7+ +- bleak (for BLE communication) +- PIL/Pillow (for image processing) +- numpy (for image manipulation) + +Install all requirements: + +```bash +pip install -r requirements.txt +``` + +## â¤ī¸ Credits + +- For much of the original work on the protocol: [atc1441/ATC_GICISKY_ESL](https://github.com/atc1441/ATC_GICISKY_ESL) +- For most of the advertisement parsing logic: [eigger/hass-gicisky](https://github.com/eigger/hass-gicisky) + +## 🤝 Support the Project + +Thanks for your interest in supporting the project. + +- Help me by opening issues and creating pull requests: all contributions are welcome! +- If you want to contribute financially, take a look [here](https://s.massive.box/donate). Thanks a lot! +- If you haven't bought your Gigisky ESL yet, please buy it through my [AliExpress affiliate link](https://s.click.aliexpress.com/e/_c2IOUkF5). +I will earn a small commission from your order, but it will not cost you anything. Thanks! + +## 📚 License + +MIT License \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..8f9d0eb --- /dev/null +++ b/__init__.py @@ -0,0 +1,7 @@ +""" +Gicisky - A Python library for interacting with Gicisky electronic ink display tags. +""" + +__version__ = "0.1.0" +__author__ = "Your Name" +__email__ = "your.email@example.com" \ No newline at end of file diff --git a/ble/__init__.py b/ble/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ble/bleak.py b/ble/bleak.py new file mode 100644 index 0000000..5b42613 --- /dev/null +++ b/ble/bleak.py @@ -0,0 +1,89 @@ +import asyncio +import logging + +from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner +from typing import Callable, Awaitable, Optional, Dict + +from core.protocol import SERVICE_UUID, CHAR_CMD_UUID, CHAR_IMG_UUID +from logger.logger import GiciskyLogger, LogCategory +from .interface import BLEInterface, Advertisement + + +class BleakBackend(BLEInterface): + """ + Bleak-based implementation of the BLEInterface abstraction. + + This backend handles BLE communication using the Bleak library but exposes + a backend-agnostic interface so users can swap in another library (e.g., + BlueZ D-Bus or a custom GATT client) without modifying higher-level logic. + """ + + def __init__(self, logger: Optional[logging.Logger] = None): + self.client: Optional[BleakClient] = None + self.on_notify: Optional[Callable[[int, bytes], Awaitable[None]]] = None + self.logger = GiciskyLogger(logger) + + async def connect(self, mac: str): + """Connect to a BLE device by MAC address.""" + self.client = BleakClient(mac) + await self.client.connect() + if not self.client.is_connected: + self.logger.error(f"Failed to connect to {mac}", LogCategory.CONNECTION) + raise ConnectionError(f"Failed to connect to {mac}") + self.logger.info(f"Connected to {mac}", LogCategory.CONNECTION) + + async def write(self, characteristic: str, data: bytes, response: bool = False): + """Write data to a characteristic.""" + if not self.client: + self.logger.error("Not connected to a device", LogCategory.CONNECTION) + raise RuntimeError("Not connected to a device") + await self.client.write_gatt_char(characteristic, data, response=response) + self.logger.debug(f"Wrote {len(data)} bytes to {characteristic}", LogCategory.DATA_TRANSFER) + + async def start_notify(self, characteristic: str, callback: Callable[[bytes], Awaitable[None]]): + """ + Start receiving notifications from a specific characteristic. + The callback must be an async function taking (handle, data). + """ + if not self.client: + self.logger.error("Not connected to a device", LogCategory.CONNECTION) + raise RuntimeError("Not connected to a device") + + async def handler(sender: BleakGATTCharacteristic, data: bytearray): + # Wrap bleak's callback in async-safe call + self.logger.debug(f"Received notification from {characteristic}", LogCategory.NOTIFICATION) + if asyncio.iscoroutinefunction(callback): + await callback(bytes(data)) + else: + callback(bytes(data)) + + await self.client.start_notify(characteristic, handler) + self.logger.info(f"Notifications started on {characteristic}", LogCategory.CONNECTION) + + async def disconnect(self): + """Disconnect safely from the device.""" + if self.client: + try: + await self.client.disconnect() + self.logger.info("Disconnected successfully", LogCategory.CONNECTION) + except Exception as e: + if e is EOFError: + self.logger.info("Disconnected successfully (EOF)", LogCategory.CONNECTION) + else: + self.logger.error(f"Disconnect error: {e}", LogCategory.CONNECTION) + finally: + self.client = None + + async def scan_devices(self, mac: str = None) -> Dict[str, Advertisement]: + ret = {} + self.logger.info("Starting scan for devices...", LogCategory.CONNECTION) + discovered = await BleakScanner.discover(return_adv=True) + for device, adv in discovered.values(): + if (mac is None or mac == device.address) and SERVICE_UUID in adv.service_uuids: + ret[device.address] = Advertisement( + name=device.name, + service_uuids=adv.service_uuids, + manufacturer_data=adv.manufacturer_data, + ) + self.logger.info(f"Found {len(discovered)} devices, {len(ret)} of which are compatible", LogCategory.CONNECTION) + return ret \ No newline at end of file diff --git a/ble/interface.py b/ble/interface.py new file mode 100644 index 0000000..892bcec --- /dev/null +++ b/ble/interface.py @@ -0,0 +1,45 @@ +import dataclasses +from abc import ABC, abstractmethod +from typing import Callable, Awaitable, TypedDict, Dict + +@dataclasses.dataclass +class Advertisement: + """ + BLE advertisement data. + """ + name: str + manufacturer_data: {int: bytes} + service_uuids: list[str] + +class BLEInterface(ABC): + @abstractmethod + async def connect(self, mac: str): + pass + + @abstractmethod + async def write(self, characteristic: str, data: bytes, response: bool = False): + pass + + @abstractmethod + async def start_notify(self, characteristic: str, callback: Callable[[bytes], Awaitable[None]]): + pass + + @abstractmethod + async def disconnect(self): + pass + + @abstractmethod + async def scan_devices(self, mac: str = None) -> Dict[str, Advertisement]: + """ + Scan for compatible devices and get their BLE advertisement data. A device is considered compatible when it + advertises the service UUID specified in core.protocol.SERVICE_UUID. + This function uses BLE advertisements, which are rarely available when the device is connected! + + Args: + mac: Optional MAC address to filter results + + Returns: + A dictionary with MAC address as key, advertisement data as value. Only compatible devices matching the + MAC (if specified) are returned. + """ + pass diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/advertisement.py b/core/advertisement.py new file mode 100644 index 0000000..a1c057f --- /dev/null +++ b/core/advertisement.py @@ -0,0 +1,33 @@ +import dataclasses + +from ble.interface import Advertisement +from image.conversion import DEVICE_SPECS, DeviceSpec, ModelId + + +@dataclasses.dataclass +class DeviceData: + name: str + model: ModelId + firmware: int + hardware: int + battery: float + voltage: float + +def parse_advertisement(adv: Advertisement) -> DeviceData: + + data = adv.manufacturer_data[0x5053] + device_id = data[0] + volt = data[1] / 10 + firmware = (data[2] << 8) + data[3] + hardware = (data[0] << 8) + data[4] + + device = DEVICE_SPECS[device_id] + + return DeviceData( + name=adv.name, + model=device_id, + firmware=firmware, + hardware=hardware, + battery=(volt - device.min_voltage) * 100 / (device.max_voltage - device.min_voltage), + voltage=volt + ) \ No newline at end of file diff --git a/core/protocol.py b/core/protocol.py new file mode 100644 index 0000000..da692ff --- /dev/null +++ b/core/protocol.py @@ -0,0 +1,86 @@ +import logging +import sys, asyncio, struct, time +from typing import Optional + +from ble.interface import BLEInterface, Advertisement +from logger.logger import GiciskyLogger, LogCategory + +# Default UUIDs for Gicisky tags +BASE_SERVICE = 0xFEF0 +SERVICE_UUID = f"0000{BASE_SERVICE:04x}-0000-1000-8000-00805f9b34fb" +CHAR_CMD_UUID = f"0000{BASE_SERVICE+1:04x}-0000-1000-8000-00805f9b34fb" +CHAR_IMG_UUID = f"0000{BASE_SERVICE+2:04x}-0000-1000-8000-00805f9b34fb" + +# Constants +CHUNK_SIZE = 240 # 480 hex chars (as seen in the JS uploader) +MTU_WAIT = 0.03 # delay between chunks (adjustable) +DEBUG = True + +def log(msg): + if DEBUG: + print(f"[LOG] {msg}", flush=True) + +class GiciskyProtocol: + + def __init__(self, ble: BLEInterface, logger: Optional[logging.Logger] = None): + self.ble = ble + self.packet_index = 0 + self.ready_to_send = False + self.img_hex = b"" + self.upload_done = False + self.logger = GiciskyLogger(logger) + + async def handle_notification(self, data): + hx = data.hex() + self.logger.debug(f"Notify: {hx}", LogCategory.NOTIFICATION) + if hx.startswith("01f400"): + # Step 2: ready to accept image size + self.logger.info("Device ready to accept image size", LogCategory.PROTOCOL) + await self.send_command(2, struct.pack(" bytes: + """ + Process image according to Gicisky device specifications. + Usage with image.optimize is recommended. + + Args: + img: PIL Image to process + model: Device model identifier + lum_threshold: Luminance threshold for black/white conversion + red_threshold: Threshold for red color detection + + Returns: + bytes: Processed image data in Gicisky format + """ + # Get device specifications + specs = DEVICE_SPECS.get(model) + if not specs: + raise ValueError("Unknown model") + + width, height = specs.size + + # Resize image to device dimensions + img = img.convert("RGB").resize((width, height), Image.Resampling.LANCZOS) + + # Apply TFT transformation if enabled + if specs.tft: + # Resize to half width and double height + img = img.resize((width // 2, height * 2), Image.Resampling.LANCZOS) + width, height = img.size + + # Apply mirroring if enabled + if specs.mirror: + img = img.transpose(Image.FLIP_TOP_BOTTOM) + + # Convert to numpy array + arr = np.array(img) + + # Process pixels - column-major order + byte_data, red_byte_data = [], [] + current_byte, current_red_byte = 0, 0 + bit_position = 7 + + for x in range(width): + for y in range(height): + r, g, b = arr[y, x] # Note: numpy uses [y, x] indexing + luminance = 0.2126 * r + 0.7152 * g + 0.0722 * b + + # Apply thresholding based on compression setting + if specs.compression: + # When compression is enabled, dark pixels set bits + if luminance < lum_threshold: + current_byte |= (1 << bit_position) + else: + # When compression is disabled, light pixels set bits + if luminance > lum_threshold: + current_byte |= (1 << bit_position) + + # Red color detection + if r > red_threshold and g < red_threshold: + current_red_byte |= (1 << bit_position) + + bit_position -= 1 + if bit_position < 0: + byte_data.append(current_byte) + red_byte_data.append(current_red_byte) + current_byte, current_red_byte = 0, 0 + bit_position = 7 + + # Handle remaining bits + if bit_position != 7: + byte_data.append(current_byte) + red_byte_data.append(current_red_byte) + + # Apply compression if enabled + if specs.compression: + byte_data_compressed = _apply_compression(byte_data, red_byte_data, width, height, specs.second_color) + else: + # Simple concatenation when compression is disabled + byte_data_compressed = byte_data[:] + if specs.second_color: + byte_data_compressed.extend(red_byte_data) + + return bytes(byte_data_compressed) + + +def _apply_compression(byte_data: List[int], red_byte_data: List[int], + width: int, height: int, second_color: bool) -> List[int]: + """ + Apply compression algorithm + + Args: + byte_data: Black/white pixel data + red_byte_data: Red pixel data + width: Image width + height: Image height + second_color: Whether to include second color (red) data + + Returns: + List[int]: Compressed byte data + """ + byte_data_compressed = [0x00, 0x00, 0x00, 0x00] # Header + byte_per_line = height // 8 + current_pos = 0 + + # Process black/white data + for i in range(width): + # Add line header + byte_data_compressed.extend([ + 0x75, + byte_per_line + 7, + byte_per_line, + 0x00, + 0x00, + 0x00, + 0x00 + ]) + + # Add pixel data for this line + for b in range(byte_per_line): + if current_pos < len(byte_data): + byte_data_compressed.append(byte_data[current_pos]) + else: + byte_data_compressed.append(0x00) # Padding + current_pos += 1 + + # Process red data if enabled + if second_color: + current_pos = 0 + for i in range(width): + # Add line header + byte_data_compressed.extend([ + 0x75, + byte_per_line + 7, + byte_per_line, + 0x00, + 0x00, + 0x00, + 0x00 + ]) + + # Add pixel data for this line + for b in range(byte_per_line): + if current_pos < len(red_byte_data): + byte_data_compressed.append(red_byte_data[current_pos]) + else: + byte_data_compressed.append(0x00) # Padding + current_pos += 1 + + # Update header with total length + total_length = len(byte_data_compressed) + byte_data_compressed[0] = total_length & 0xff + byte_data_compressed[1] = (total_length >> 8) & 0xff + byte_data_compressed[2] = (total_length >> 16) & 0xff + byte_data_compressed[3] = (total_length >> 24) & 0xff + + return byte_data_compressed diff --git a/image/optimizer.py b/image/optimizer.py new file mode 100644 index 0000000..3302695 --- /dev/null +++ b/image/optimizer.py @@ -0,0 +1,59 @@ +from PIL import Image +import numpy as np +from typing import Tuple +from .conversion import DEVICE_SPECS, ModelId, DeviceSpec + + +def optimize(img: Image.Image, model: ModelId) -> Image.Image: + """ + Optimize an image for a specific Gicisky device model. + + Args: + img: Input PIL Image + model: Device model identifier + + Returns: + Image.Image: Optimized image + """ + specs: DeviceSpec = DEVICE_SPECS.get(model) + if not specs: + raise ValueError(f"Unknown model: {model}") + target_width, target_height = specs.size + + canvas = Image.new("RGB", (target_width, target_height), color="white") + img_width, img_height = img.size + scale = min(target_width / img_width, target_height / img_height) + new_width = int(img_width * scale) + new_height = int(img_height * scale) + + resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) + x_offset = (target_width - new_width) // 2 + y_offset = (target_height - new_height) // 2 + + canvas.paste(resized_img, (x_offset, y_offset)) + + palette = Image.new("P", (1, 1)) + colors = 3 + + # Apply appropriate color conversion + if specs.second_color: + palette.putpalette([ + 0, 0, 0, # Black + 255, 255, 255, # White + 255, 0, 0 # Red + ]) + else: + palette.putpalette([ + 0, 0, 0, # Black + 255, 255, 255, # White + ]) + colors = 2 + + processed_img = canvas.quantize(method=Image.MEDIANCUT, + colors=colors, + kmeans=0, + palette=palette) + processed_img = processed_img.convert("RGB") + + return processed_img + diff --git a/logger/__init__.py b/logger/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/logger/logger.py b/logger/logger.py new file mode 100644 index 0000000..2dcb5b7 --- /dev/null +++ b/logger/logger.py @@ -0,0 +1,42 @@ +import logging +from typing import Optional +from enum import Enum + +class LogCategory(Enum): + CONNECTION = "connection" + PROTOCOL = "protocol" + DATA_TRANSFER = "data_transfer" + NOTIFICATION = "notification" + COMMAND = "command" + +class GiciskyLogger: + def __init__(self, logger: Optional[logging.Logger] = None): + if logger: + self.logger = logger + else: + self.logger = logging.getLogger("gicisky") + if not self.logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('[%(name)s] %(levelname)s: %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + def log(self, level: int, message: str, category: LogCategory = None, **kwargs): + extra_info = "" + if category: + extra_info = f"[{category.value}] " + + self.logger.log(level, f"{extra_info}{message}", **kwargs) + + def debug(self, message: str, category: LogCategory = None): + self.log(logging.DEBUG, message, category) + + def info(self, message: str, category: LogCategory = None): + self.log(logging.INFO, message, category) + + def warning(self, message: str, category: LogCategory = None): + self.log(logging.WARNING, message, category) + + def error(self, message: str, category: LogCategory = None): + self.log(logging.ERROR, message, category) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..15f2a3f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +bleak>=1.1.1 +numpy>=2.2.6 +Pillow>=12.0.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ff7c3eb --- /dev/null +++ b/setup.py @@ -0,0 +1,33 @@ +from setuptools import setup, find_packages + +with open("README.md", "r", encoding="utf-8") as fh: + long_description = fh.read() + +with open("requirements.txt", "r", encoding="utf-8") as fh: + requirements = [line.strip() for line in fh if line.strip() and not line.startswith("#")] + +setup( + name="gicisky", + version="0.1.0", + author="MassiveBox", + author_email="box@massive.box", + description="A Python library for interacting with Gicisky E-Ink display tags via Bluetooth Low Energy", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://git.massive.box/massivebox/gicisky", + packages=find_packages(), + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + ], + python_requires=">=3.7", + install_requires=requirements, +) \ No newline at end of file