Skip to content

Fix BGRA mode for Turing 8.8" depending on ROM version #770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 85 additions & 60 deletions library/lcd/lcd_comm_rev_c.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from serial.tools.list_ports import comports

from library.lcd.lcd_comm import Orientation, LcdComm
from library.lcd.serialize import image_to_BGRA, image_to_BGR, chunked
from library.lcd.serialize import image_to_BGRA, image_to_compressed_BGRA, chunked
from library.log import logger


Expand Down Expand Up @@ -92,17 +92,11 @@ class Command(Enum):
NO_FLIP = bytearray((0x00,))
SEND_PAYLOAD = bytearray((0xFF,))

def __init__(self, command):
self.command = command


class Padding(Enum):
NULL = bytearray([0x00])
START_DISPLAY_BITMAP = bytearray([0x2c])

def __init__(self, command):
self.command = command


class SleepInterval(Enum):
OFF = bytearray((0x00,))
Expand All @@ -117,19 +111,13 @@ class SleepInterval(Enum):
NINE = bytearray((0x09,))
TEN = bytearray((0x0a,))

def __init__(self, command):
self.command = command


class SubRevision(Enum):
UNKNOWN = ""
REV_2INCH = "chs_21inch"
REV_5INCH = "chs_5inch"
REV_8INCH = "chs_88inch"

def __init__(self, command):
self.command = command


# This class is for Turing Smart Screen 2.1" / 5" / 8" screens
class LcdCommRevC(LcdComm):
Expand All @@ -146,7 +134,20 @@ def __del__(self):
def auto_detect_com_port() -> Optional[str]:
com_ports = comports()

# Try to find awake device through serial number or vid/pid
# First, try to find sleeping device and wake it up
for com_port in com_ports:
if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT21INCH':
LcdCommRevC._wake_up_device(com_port)
return LcdCommRevC.auto_detect_com_port()
if com_port.vid == 0x1a86 and com_port.pid == 0xca21:
LcdCommRevC._wake_up_device(com_port)
return LcdCommRevC.auto_detect_com_port()

return LcdCommRevC._get_awake_com_port(com_ports)

@staticmethod
def _get_awake_com_port(com_ports) -> Optional[str]:
# Then try to find awake device through serial number or vid/pid
for com_port in com_ports:
if com_port.serial_number == '20080411':
return com_port.device
Expand All @@ -155,25 +156,24 @@ def auto_detect_com_port() -> Optional[str]:
if com_port.vid == 0x1d6b and (com_port.pid == 0x0121 or com_port.pid == 0x0106):
return com_port.device

# Try to find sleeping device and wake it up
for com_port in com_ports:
if com_port.serial_number == 'USB7INCH' or com_port.serial_number == 'CT21INCH':
LcdCommRevC._connect_to_reset_device_name(com_port)
return LcdCommRevC.auto_detect_com_port()
if com_port.serial_number == '20080411':
return com_port.device

return None

@staticmethod
def _connect_to_reset_device_name(com_port):
def _wake_up_device(com_port):
# this device enumerates differently when off, we need to connect once to reset it to correct COM device
try:
logger.debug(f"Waiting for device {com_port} to be turned ON...")
serial.Serial(com_port.device, 115200, timeout=1, rtscts=True)
except serial.SerialException:
pass
time.sleep(10)
logger.debug(f"Waiting for device {com_port} to be turned ON...")

for i in range(15):
try:
# Try to connect every second, since it takes sometimes multiple connect to wake up the device
serial.Serial(com_port.device, 115200, timeout=1, rtscts=True)
except serial.SerialException:
pass

if LcdCommRevC._get_awake_com_port(comports()) is not None:
time.sleep(1)
return
time.sleep(1)

def _send_command(self, cmd: Command, payload: Optional[bytearray] = None, padding: Optional[Padding] = None,
bypass_queue: bool = False, readsize: Optional[int] = None):
Expand Down Expand Up @@ -210,29 +210,23 @@ def _send_command(self, cmd: Command, payload: Optional[bytearray] = None, paddi
def _hello(self):
# This command reads LCD answer on serial link, so it bypasses the queue
self.sub_revision = SubRevision.UNKNOWN
self.serial_flush_input()
self._send_command(Command.HELLO, bypass_queue=True)
response = str(self.serial_read(23).decode(errors="ignore"))
response = ''.join(
filter(lambda x: x in set(string.printable), str(self.serial_read(23).decode(errors="ignore"))))
self.serial_flush_input()
logger.debug("HW sub-revision returned: %s" % ''.join(filter(lambda x: x in set(string.printable), response)))

# Note: sub-revisions returned by display are not reliable e.g. 2.1" displays return "chs_5inch"
# if response.startswith(SubRevision.REV_5INCH.value):
# self.sub_revision = SubRevision.REV_5INCH
# self.display_width = 480
# self.display_height = 800
# elif response.startswith(SubRevision.REV_2INCH.value):
# self.sub_revision = SubRevision.REV_2INCH
# self.display_width = 480
# self.display_height = 480
# elif response.startswith(SubRevision.REV_8INCH.value):
# self.sub_revision = SubRevision.REV_8INCH
# self.display_width = 480
# self.display_height = 1920
# else:
# logger.warning("Display returned unknown sub-revision on Hello answer (%s)" % str(response))
# logger.debug("HW sub-revision detected: %s" % (str(self.sub_revision)))

# Relay on width/height for sub-revision detection
logger.debug("Display ID returned: %s" % response)
while not response.startswith("chs_"):
logger.warning("Display returned invalid or unsupported ID, try again in 1 second")
time.sleep(1)
self._send_command(Command.HELLO, bypass_queue=True)
response = ''.join(
filter(lambda x: x in set(string.printable), str(self.serial_read(23).decode(errors="ignore"))))
self.serial_flush_input()
logger.debug("Display ID returned: %s" % response)

# Note: ID returned by display are not reliable for some models e.g. 2.1" displays return "chs_5inch"
# Rely on width/height for sub-revision detection
if self.display_width == 480 and self.display_height == 480:
self.sub_revision = SubRevision.REV_2INCH
elif self.display_width == 480 and self.display_height == 800:
Expand All @@ -242,6 +236,18 @@ def _hello(self):
else:
logger.error(f"Unsupported resolution {self.display_width}x{self.display_height} for revision C")

# Detect ROM version
try:
self.rom_version = int(response.split(".")[2])
if self.rom_version < 80 or self.rom_version > 100:
logger.warning("ROM version %d may be invalid, use default ROM version 87" % self.rom_version)
self.rom_version = 87
except:
logger.warning("Display returned invalid or unsupported ID, use default ROM version 87")
self.rom_version = 87

logger.debug("HW sub-revision detected: %s, ROM version: %d" % ((str(self.sub_revision)), self.rom_version))

def InitializeComm(self):
self._hello()

Expand All @@ -250,8 +256,15 @@ def Reset(self):
# Reset command bypasses queue because it is run when queue threads are not yet started
self._send_command(Command.RESTART, bypass_queue=True)
self.closeSerial()
# Wait for display reset then reconnect
time.sleep(15)
# Wait for disconnection (max. 15 seconds)
for i in range(15):
if LcdCommRevC._get_awake_com_port(comports()) is not None:
time.sleep(1)
# Wait for reconnection (max. 15 seconds)
for i in range(15):
if LcdCommRevC._get_awake_com_port(comports()) is None:
time.sleep(1)
# Reconnect to device
self.openSerial()

def Clear(self):
Expand All @@ -267,13 +280,13 @@ def Clear(self):
self.SetOrientation(orientation=backup_orientation)

def ScreenOff(self):
logger.info("Calling ScreenOff")
# logger.info("Calling ScreenOff")
self._send_command(Command.STOP_VIDEO)
self._send_command(Command.STOP_MEDIA, readsize=1024)
self._send_command(Command.TURNOFF)

def ScreenOn(self):
logger.info("Calling ScreenOn")
# logger.info("Calling ScreenOn")
self._send_command(Command.STOP_VIDEO)
self._send_command(Command.STOP_MEDIA, readsize=1024)
# self._send_command(Command.SET_BRIGHTNESS, payload=bytearray([255]))
Expand All @@ -293,8 +306,8 @@ def SetOrientation(self, orientation: Orientation = Orientation.PORTRAIT):
# logger.info(f"Call SetOrientation to: {self.orientation.name}")

# if self.orientation == Orientation.REVERSE_LANDSCAPE or self.orientation == Orientation.REVERSE_PORTRAIT:
# b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.FLIP_180.value + SleepInterval.OFF.value
# self._send_command(Command.OPTIONS, payload=b)
# b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.FLIP_180.value + SleepInterval.OFF.value
# self._send_command(Command.OPTIONS, payload=b)
# else:
b = Command.STARTMODE_DEFAULT.value + Padding.NULL.value + Command.NO_FLIP.value + SleepInterval.OFF.value
self._send_command(Command.OPTIONS, payload=b)
Expand Down Expand Up @@ -339,7 +352,8 @@ def DisplayPILImage(
display_bmp_cmd = Command.DISPLAY_BITMAP_8INCH

self._send_command(display_bmp_cmd,
payload=bytearray(int(self.display_width * self.display_width / 64).to_bytes(2, "big")))
payload=bytearray(
int(self.display_width * self.display_width / 64).to_bytes(2, "big")))
self._send_command(Command.SEND_PAYLOAD,
payload=bytearray(self._generate_full_image(image)),
readsize=1024)
Expand All @@ -354,6 +368,7 @@ def DisplayPILImage(

def _generate_full_image(self, image: Image.Image) -> bytes:
if self.sub_revision == SubRevision.REV_8INCH:
# Switch landscape/portrait mode for 8"
if self.orientation == Orientation.LANDSCAPE:
image = image.rotate(270, expand=True)
elif self.orientation == Orientation.REVERSE_LANDSCAPE:
Expand All @@ -370,7 +385,7 @@ def _generate_full_image(self, image: Image.Image) -> bytes:
elif self.orientation == Orientation.REVERSE_LANDSCAPE:
image = image.rotate(180)

bgra_data = image_to_BGRA(image)
bgra_data, pixel_size = image_to_BGRA(image)

return b'\x00'.join(chunked(bgra_data, 249))

Expand All @@ -379,6 +394,7 @@ def _generate_update_image(
) -> Tuple[bytearray, bytearray]:
x0, y0 = x, y
if self.sub_revision == SubRevision.REV_8INCH:
# Switch landscape/portrait mode for 8"
if self.orientation == Orientation.LANDSCAPE:
image = image.rotate(270, expand=True)
y0 = self.get_height() - y - image.width
Expand Down Expand Up @@ -408,9 +424,18 @@ def _generate_update_image(
y0 = x

img_raw_data = bytearray()
bgr_data = image_to_BGR(image)
for h, line in enumerate(chunked(bgr_data, image.width * 3)):

# Some screens require different RGBA encoding
if self.rom_version > 88:
# BGRA mode on 4 bytes : [B, G, R, A]
img_data, pixel_size = image_to_BGRA(image)
else:
# BGRA mode on 3 bytes: [6-bit B + 2-bit A, 6-bit G + 2-bit A, 8-bit R]
img_data, pixel_size = image_to_compressed_BGRA(image)

for h, line in enumerate(chunked(img_data, image.width * pixel_size)):
if self.sub_revision == SubRevision.REV_8INCH:
# Switch landscape/portrait mode for 8"
img_raw_data += int(((x0 + h) * self.display_width) + y0).to_bytes(3, "big")
else:
img_raw_data += int(((x0 + h) * self.display_height) + y0).to_bytes(3, "big")
Expand Down
25 changes: 20 additions & 5 deletions library/lcd/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

def chunked(data: bytes, chunk_size: int) -> Iterator[bytes]:
for i in range(0, len(data), chunk_size):
yield data[i : i + chunk_size]
yield data[i: i + chunk_size]


def image_to_RGB565(image: Image.Image, endianness: Literal["big", "little"]) -> bytes:
Expand Down Expand Up @@ -39,20 +39,35 @@ def image_to_RGB565(image: Image.Image, endianness: Literal["big", "little"]) ->
return rgb565.astype(typ).tobytes()


def image_to_BGR(image: Image.Image) -> bytes:
def image_to_BGR(image: Image.Image) -> (bytes, int):
if image.mode not in ["RGB", "RGBA"]:
# we need the first 3 channels to be R, G and B
image = image.convert("RGB")
rgb = np.asarray(image)
# same as rgb[:, :, [2, 1, 0]] but faster
bgr = np.take(rgb, (2, 1, 0), axis=-1)
return bgr.tobytes()
return bgr.tobytes(), 3


def image_to_BGRA(image: Image.Image) -> bytes:
def image_to_BGRA(image: Image.Image) -> (bytes, int):
if image.mode != "RGBA":
image = image.convert("RGBA")
rgba = np.asarray(image)
# same as rgba[:, :, [2, 1, 0, 3]] but faster
bgra = np.take(rgba, (2, 1, 0, 3), axis=-1)
return bgra.tobytes()
return bgra.tobytes(), 4


# FIXME: to optimize like other functions above
def image_to_compressed_BGRA(image: Image.Image) -> (bytes, int):
compressed_bgra = bytearray()
image_data = image.convert("RGBA").load()
for h in range(image.height):
for w in range(image.width):
# r = pixel[0], g = pixel[1], b = pixel[2], a = pixel[3]
pixel = image_data[w, h]
a = pixel[3] >> 4
compressed_bgra.append(pixel[2] & 0xFC | a >> 2)
compressed_bgra.append(pixel[1] & 0xFC | a & 2)
compressed_bgra.append(pixel[0])
return bytes(compressed_bgra), 3