diff --git a/library/lcd/lcd_comm_rev_c.py b/library/lcd/lcd_comm_rev_c.py index 91aeb7f3..fd44ab75 100644 --- a/library/lcd/lcd_comm_rev_c.py +++ b/library/lcd/lcd_comm_rev_c.py @@ -30,7 +30,7 @@ from serial.tools.list_ports import comports from library.lcd.lcd_comm import Orientation, LcdComm -from library.lcd.serialize import image_to_BGRA, image_to_BGR, chunked +from library.lcd.serialize import image_to_BGRA, image_to_compressed_BGRA, chunked from library.log import logger @@ -92,17 +92,11 @@ class Command(Enum): NO_FLIP = bytearray((0x00,)) SEND_PAYLOAD = bytearray((0xFF,)) - def __init__(self, command): - self.command = command - class Padding(Enum): NULL = bytearray([0x00]) START_DISPLAY_BITMAP = bytearray([0x2c]) - def __init__(self, command): - self.command = command - class SleepInterval(Enum): OFF = bytearray((0x00,)) @@ -117,9 +111,6 @@ class SleepInterval(Enum): NINE = bytearray((0x09,)) TEN = bytearray((0x0a,)) - def __init__(self, command): - self.command = command - class SubRevision(Enum): UNKNOWN = "" @@ -127,9 +118,6 @@ class SubRevision(Enum): REV_5INCH = "chs_5inch" REV_8INCH = "chs_88inch" - def __init__(self, command): - self.command = command - # This class is for Turing Smart Screen 2.1" / 5" / 8" screens class LcdCommRevC(LcdComm): @@ -146,7 +134,20 @@ def __del__(self): def auto_detect_com_port() -> Optional[str]: com_ports = comports() - # Try to find awake device through serial number or vid/pid + # First, try to find sleeping device and wake it up + for com_port in com_ports: + if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT21INCH': + LcdCommRevC._wake_up_device(com_port) + return LcdCommRevC.auto_detect_com_port() + if com_port.vid == 0x1a86 and com_port.pid == 0xca21: + LcdCommRevC._wake_up_device(com_port) + return LcdCommRevC.auto_detect_com_port() + + return LcdCommRevC._get_awake_com_port(com_ports) + + @staticmethod + def _get_awake_com_port(com_ports) -> Optional[str]: + # Then try to find awake device through serial number or vid/pid for com_port in com_ports: if com_port.serial_number == '20080411': return com_port.device @@ -155,25 +156,24 @@ def auto_detect_com_port() -> Optional[str]: if com_port.vid == 0x1d6b and (com_port.pid == 0x0121 or com_port.pid == 0x0106): return com_port.device - # Try to find sleeping device and wake it up - for com_port in com_ports: - if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT21INCH': - LcdCommRevC._connect_to_reset_device_name(com_port) - return LcdCommRevC.auto_detect_com_port() - if com_port.serial_number == '20080411': - return com_port.device - return None @staticmethod - def _connect_to_reset_device_name(com_port): + def _wake_up_device(com_port): # this device enumerates differently when off, we need to connect once to reset it to correct COM device - try: - logger.debug(f"Waiting for device {com_port} to be turned ON...") - serial.Serial(com_port.device, 115200, timeout=1, rtscts=True) - except serial.SerialException: - pass - time.sleep(10) + logger.debug(f"Waiting for device {com_port} to be turned ON...") + + for i in range(15): + try: + # Try to connect every second, since it takes sometimes multiple connect to wake up the device + serial.Serial(com_port.device, 115200, timeout=1, rtscts=True) + except serial.SerialException: + pass + + if LcdCommRevC._get_awake_com_port(comports()) is not None: + time.sleep(1) + return + time.sleep(1) def _send_command(self, cmd: Command, payload: Optional[bytearray] = None, padding: Optional[Padding] = None, bypass_queue: bool = False, readsize: Optional[int] = None): @@ -210,29 +210,23 @@ def _send_command(self, cmd: Command, payload: Optional[bytearray] = None, paddi def _hello(self): # This command reads LCD answer on serial link, so it bypasses the queue self.sub_revision = SubRevision.UNKNOWN + self.serial_flush_input() self._send_command(Command.HELLO, bypass_queue=True) - response = str(self.serial_read(23).decode(errors="ignore")) + response = ''.join( + filter(lambda x: x in set(string.printable), str(self.serial_read(23).decode(errors="ignore")))) self.serial_flush_input() - logger.debug("HW sub-revision returned: %s" % ''.join(filter(lambda x: x in set(string.printable), response))) - - # Note: sub-revisions returned by display are not reliable e.g. 2.1" displays return "chs_5inch" - # if response.startswith(SubRevision.REV_5INCH.value): - # self.sub_revision = SubRevision.REV_5INCH - # self.display_width = 480 - # self.display_height = 800 - # elif response.startswith(SubRevision.REV_2INCH.value): - # self.sub_revision = SubRevision.REV_2INCH - # self.display_width = 480 - # self.display_height = 480 - # elif response.startswith(SubRevision.REV_8INCH.value): - # self.sub_revision = SubRevision.REV_8INCH - # self.display_width = 480 - # self.display_height = 1920 - # else: - # logger.warning("Display returned unknown sub-revision on Hello answer (%s)" % str(response)) - # logger.debug("HW sub-revision detected: %s" % (str(self.sub_revision))) - - # Relay on width/height for sub-revision detection + logger.debug("Display ID returned: %s" % response) + while not response.startswith("chs_"): + logger.warning("Display returned invalid or unsupported ID, try again in 1 second") + time.sleep(1) + self._send_command(Command.HELLO, bypass_queue=True) + response = ''.join( + filter(lambda x: x in set(string.printable), str(self.serial_read(23).decode(errors="ignore")))) + self.serial_flush_input() + logger.debug("Display ID returned: %s" % response) + + # Note: ID returned by display are not reliable for some models e.g. 2.1" displays return "chs_5inch" + # Rely on width/height for sub-revision detection if self.display_width == 480 and self.display_height == 480: self.sub_revision = SubRevision.REV_2INCH elif self.display_width == 480 and self.display_height == 800: @@ -242,6 +236,18 @@ def _hello(self): else: logger.error(f"Unsupported resolution {self.display_width}x{self.display_height} for revision C") + # Detect ROM version + try: + self.rom_version = int(response.split(".")[2]) + if self.rom_version < 80 or self.rom_version > 100: + logger.warning("ROM version %d may be invalid, use default ROM version 87" % self.rom_version) + self.rom_version = 87 + except: + logger.warning("Display returned invalid or unsupported ID, use default ROM version 87") + self.rom_version = 87 + + logger.debug("HW sub-revision detected: %s, ROM version: %d" % ((str(self.sub_revision)), self.rom_version)) + def InitializeComm(self): self._hello() @@ -250,8 +256,15 @@ def Reset(self): # Reset command bypasses queue because it is run when queue threads are not yet started self._send_command(Command.RESTART, bypass_queue=True) self.closeSerial() - # Wait for display reset then reconnect - time.sleep(15) + # Wait for disconnection (max. 15 seconds) + for i in range(15): + if LcdCommRevC._get_awake_com_port(comports()) is not None: + time.sleep(1) + # Wait for reconnection (max. 15 seconds) + for i in range(15): + if LcdCommRevC._get_awake_com_port(comports()) is None: + time.sleep(1) + # Reconnect to device self.openSerial() def Clear(self): @@ -267,13 +280,13 @@ def Clear(self): self.SetOrientation(orientation=backup_orientation) def ScreenOff(self): - logger.info("Calling ScreenOff") + # logger.info("Calling ScreenOff") self._send_command(Command.STOP_VIDEO) self._send_command(Command.STOP_MEDIA, readsize=1024) self._send_command(Command.TURNOFF) def ScreenOn(self): - logger.info("Calling ScreenOn") + # logger.info("Calling ScreenOn") self._send_command(Command.STOP_VIDEO) self._send_command(Command.STOP_MEDIA, readsize=1024) # self._send_command(Command.SET_BRIGHTNESS, payload=bytearray([255])) @@ -293,8 +306,8 @@ def SetOrientation(self, orientation: Orientation = Orientation.PORTRAIT): # logger.info(f"Call SetOrientation to: {self.orientation.name}") # if self.orientation == Orientation.REVERSE_LANDSCAPE or self.orientation == Orientation.REVERSE_PORTRAIT: - # b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.FLIP_180.value + SleepInterval.OFF.value - # self._send_command(Command.OPTIONS, payload=b) + # b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.FLIP_180.value + SleepInterval.OFF.value + # self._send_command(Command.OPTIONS, payload=b) # else: b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.NO_FLIP.value + SleepInterval.OFF.value self._send_command(Command.OPTIONS, payload=b) @@ -339,7 +352,8 @@ def DisplayPILImage( display_bmp_cmd = Command.DISPLAY_BITMAP_8INCH self._send_command(display_bmp_cmd, - payload=bytearray(int(self.display_width * self.display_width / 64).to_bytes(2, "big"))) + payload=bytearray( + int(self.display_width * self.display_width / 64).to_bytes(2, "big"))) self._send_command(Command.SEND_PAYLOAD, payload=bytearray(self._generate_full_image(image)), readsize=1024) @@ -354,6 +368,7 @@ def DisplayPILImage( def _generate_full_image(self, image: Image.Image) -> bytes: if self.sub_revision == SubRevision.REV_8INCH: + # Switch landscape/portrait mode for 8" if self.orientation == Orientation.LANDSCAPE: image = image.rotate(270, expand=True) elif self.orientation == Orientation.REVERSE_LANDSCAPE: @@ -370,7 +385,7 @@ def _generate_full_image(self, image: Image.Image) -> bytes: elif self.orientation == Orientation.REVERSE_LANDSCAPE: image = image.rotate(180) - bgra_data = image_to_BGRA(image) + bgra_data, pixel_size = image_to_BGRA(image) return b'\x00'.join(chunked(bgra_data, 249)) @@ -379,6 +394,7 @@ def _generate_update_image( ) -> Tuple[bytearray, bytearray]: x0, y0 = x, y if self.sub_revision == SubRevision.REV_8INCH: + # Switch landscape/portrait mode for 8" if self.orientation == Orientation.LANDSCAPE: image = image.rotate(270, expand=True) y0 = self.get_height() - y - image.width @@ -408,9 +424,18 @@ def _generate_update_image( y0 = x img_raw_data = bytearray() - bgr_data = image_to_BGR(image) - for h, line in enumerate(chunked(bgr_data, image.width * 3)): + + # Some screens require different RGBA encoding + if self.rom_version > 88: + # BGRA mode on 4 bytes : [B, G, R, A] + img_data, pixel_size = image_to_BGRA(image) + else: + # BGRA mode on 3 bytes: [6-bit B + 2-bit A, 6-bit G + 2-bit A, 8-bit R] + img_data, pixel_size = image_to_compressed_BGRA(image) + + for h, line in enumerate(chunked(img_data, image.width * pixel_size)): if self.sub_revision == SubRevision.REV_8INCH: + # Switch landscape/portrait mode for 8" img_raw_data += int(((x0 + h) * self.display_width) + y0).to_bytes(3, "big") else: img_raw_data += int(((x0 + h) * self.display_height) + y0).to_bytes(3, "big") diff --git a/library/lcd/serialize.py b/library/lcd/serialize.py index 7f6f63d0..5909c185 100644 --- a/library/lcd/serialize.py +++ b/library/lcd/serialize.py @@ -6,7 +6,7 @@ def chunked(data: bytes, chunk_size: int) -> Iterator[bytes]: for i in range(0, len(data), chunk_size): - yield data[i : i + chunk_size] + yield data[i: i + chunk_size] def image_to_RGB565(image: Image.Image, endianness: Literal["big", "little"]) -> bytes: @@ -39,20 +39,35 @@ def image_to_RGB565(image: Image.Image, endianness: Literal["big", "little"]) -> return rgb565.astype(typ).tobytes() -def image_to_BGR(image: Image.Image) -> bytes: +def image_to_BGR(image: Image.Image) -> (bytes, int): if image.mode not in ["RGB", "RGBA"]: # we need the first 3 channels to be R, G and B image = image.convert("RGB") rgb = np.asarray(image) # same as rgb[:, :, [2, 1, 0]] but faster bgr = np.take(rgb, (2, 1, 0), axis=-1) - return bgr.tobytes() + return bgr.tobytes(), 3 -def image_to_BGRA(image: Image.Image) -> bytes: +def image_to_BGRA(image: Image.Image) -> (bytes, int): if image.mode != "RGBA": image = image.convert("RGBA") rgba = np.asarray(image) # same as rgba[:, :, [2, 1, 0, 3]] but faster bgra = np.take(rgba, (2, 1, 0, 3), axis=-1) - return bgra.tobytes() + return bgra.tobytes(), 4 + + +# FIXME: to optimize like other functions above +def image_to_compressed_BGRA(image: Image.Image) -> (bytes, int): + compressed_bgra = bytearray() + image_data = image.convert("RGBA").load() + for h in range(image.height): + for w in range(image.width): + # r = pixel[0], g = pixel[1], b = pixel[2], a = pixel[3] + pixel = image_data[w, h] + a = pixel[3] >> 4 + compressed_bgra.append(pixel[2] & 0xFC | a >> 2) + compressed_bgra.append(pixel[1] & 0xFC | a & 2) + compressed_bgra.append(pixel[0]) + return bytes(compressed_bgra), 3