Skip to content
This repository was archived by the owner on Sep 4, 2021. It is now read-only.

Tuyadevice #69

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 64 additions & 92 deletions pytuya/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@


import base64
from hashlib import md5
import binascii
import colorsys
import json
import logging
import socket
import sys
import time
import colorsys
import binascii
from hashlib import md5
from typing import Union

try:
#raise ImportError
import Crypto
from Crypto.Cipher import AES # PyCrypto
except ImportError:
Expand All @@ -29,10 +29,7 @@

from pytuya.const import __version__


log = logging.getLogger(__name__)
#logging.basicConfig() # TODO include function name/line numbers in log
#log.setLevel(level=logging.DEBUG) # Debug hack!

log.info('%s version %s', __name__, __version__)
log.info('Python %s on %s', sys.version, sys.platform)
Expand All @@ -51,12 +48,14 @@

IS_PY2 = sys.version_info[0] == 2


class AESCipher(object):
def __init__(self, key):
#self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/
# self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/
self.bs = 16
self.key = key
def encrypt(self, raw, use_base64 = True):

def encrypt(self, raw, use_base64=True):
if Crypto:
raw = self._pad(raw)
cipher = AES.new(self.key, mode=AES.MODE_ECB)
Expand All @@ -66,8 +65,6 @@ def encrypt(self, raw, use_base64 = True):
cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16
crypted_text = cipher.feed(raw)
crypted_text += cipher.feed() # flush final block
#print('crypted_text %r' % crypted_text)
#print('crypted_text (%d) %r' % (len(crypted_text), crypted_text))
if use_base64:
return base64.b64encode(crypted_text)
else:
Expand All @@ -76,27 +73,23 @@ def encrypt(self, raw, use_base64 = True):
def decrypt(self, enc, use_base64=True):
if use_base64:
enc = base64.b64decode(enc)
#print('enc (%d) %r' % (len(enc), enc))
#enc = self._unpad(enc)
#enc = self._pad(enc)
#print('upadenc (%d) %r' % (len(enc), enc))
if Crypto:
cipher = AES.new(self.key, AES.MODE_ECB)
raw = cipher.decrypt(enc)
#print('raw (%d) %r' % (len(raw), raw))
return self._unpad(raw).decode('utf-8')
#return self._unpad(cipher.decrypt(enc)).decode('utf-8')
else:
cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16
plain_text = cipher.feed(enc)
plain_text += cipher.feed() # flush final block
return plain_text

def _pad(self, s):
padnum = self.bs - len(s) % self.bs
return s + padnum * chr(padnum).encode()

@staticmethod
def _unpad(s):
return s[:-ord(s[len(s)-1:])]
return s[:-ord(s[len(s) - 1:])]


def bin2hex(x, pretty=False):
Expand All @@ -117,6 +110,7 @@ def hex2bin(x):
else:
return bytes.fromhex(x)


# This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi
payload_dict = {
"device": {
Expand All @@ -133,6 +127,7 @@ def hex2bin(x):
}
}


class XenonDevice(object):
def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10):
"""
Expand Down Expand Up @@ -207,7 +202,6 @@ def generate_payload(self, command, data=None):

# Create byte buffer from hex data
json_payload = json.dumps(json_data)
#print(json_payload)
json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond!
json_payload = json_payload.encode('utf-8')
log.debug('json_payload=%r', json_payload)
Expand All @@ -221,55 +215,32 @@ def generate_payload(self, command, data=None):
json_payload = PROTOCOL_VERSION_BYTES_33 + b"\0\0\0\0\0\0\0\0\0\0\0\0" + json_payload
elif command == SET:
# need to encrypt
#print('json_payload %r' % json_payload)
self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new
json_payload = self.cipher.encrypt(json_payload)
#print('crypted json_payload %r' % json_payload)
preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES_31 + b'||' + self.local_key
#print('preMd5String %r' % preMd5String)
m = md5()
m.update(preMd5String)
#print(repr(m.digest()))
hexdigest = m.hexdigest()
#print(hexdigest)
#print(hexdigest[8:][:16])
json_payload = PROTOCOL_VERSION_BYTES_31 + hexdigest[8:][:16].encode('latin1') + json_payload
#print('data_to_send')
#print(json_payload)
#print('crypted json_payload (%d) %r' % (len(json_payload), json_payload))
#print('json_payload %r' % repr(json_payload))
#print('json_payload len %r' % len(json_payload))
#print(bin2hex(json_payload))
self.cipher = None # expect to connect and then disconnect to set new


postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix'])
#print('postfix_payload %r' % postfix_payload)
#print('postfix_payload %r' % len(postfix_payload))
#print('postfix_payload %x' % len(postfix_payload))
#print('postfix_payload %r' % hex(len(postfix_payload)))
assert len(postfix_payload) <= 0xff
postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff)
buffer = hex2bin( payload_dict[self.dev_type]['prefix'] +
payload_dict[self.dev_type][command]['hexByte'] +
'000000' +
postfix_payload_hex_len ) + postfix_payload
buffer = hex2bin(payload_dict[self.dev_type]['prefix'] +
payload_dict[self.dev_type][command]['hexByte'] +
'000000' +
postfix_payload_hex_len) + postfix_payload

# calc the CRC of everything except where the CRC goes and the suffix
hex_crc = format(binascii.crc32(buffer[:-8]) & 0xffffffff, '08X')
buffer = buffer[:-8] + hex2bin(hex_crc) + buffer[-4:]
#print('command', command)
#print('prefix')
#print(payload_dict[self.dev_type][command]['prefix'])
#print(repr(buffer))
#print(bin2hex(buffer, pretty=True))
#print(bin2hex(buffer, pretty=False))
#print('full buffer(%d) %r' % (len(buffer), " ".join("{:02x}".format(ord(c)) for c in buffer)))
return buffer

class Device(XenonDevice):
def __init__(self, dev_id, address, local_key=None, dev_type=None):
super(Device, self).__init__(dev_id, address, local_key, dev_type)

class TuyaDevice(XenonDevice):
def __init__(self, dev_id, address, local_key=None, dev_type="device"):
super(TuyaDevice, self).__init__(dev_id, address, local_key, dev_type)

def status(self):
log.debug('status() entry')
Expand All @@ -281,8 +252,7 @@ def status(self):

result = data[20:-8] # hard coded offsets
log.debug('result=%r', result)
#result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer
#print('result %r' % result)

if result.startswith(b'{'):
# this is the regular expected code path
if not isinstance(result, str):
Expand All @@ -293,7 +263,8 @@ def status(self):
# expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM}
# NOTE dps.2 may or may not be present
result = result[len(PROTOCOL_VERSION_BYTES_31):] # remove version header
result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload
result = result[
16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload
cipher = AESCipher(self.local_key)
result = cipher.decrypt(result)
log.debug('decrypted result=%r', result)
Expand All @@ -312,41 +283,41 @@ def status(self):

return result

def set_status(self, on, switch=1):
def set_value(self, index: Union[str, int], value: Union[bool, int, float, str]):
"""
Set status of the device to 'on' or 'off'.
Set int value of any index.

Args:
on(bool): True for 'on', False for 'off'.
switch(int): The switch to set
index: index to set
value: new value for the index
"""
# open device, send request, then close connection
if isinstance(switch, int):
switch = str(switch) # index and payload is a string
payload = self.generate_payload(SET, {switch:on})
#print('payload %r' % payload)
if isinstance(index, int):
index = str(index) # index and payload is a string

data = self._send_receive(payload)
log.debug('set_status received data=%r', data)
payload = self.generate_payload(SET, {index: value})
return self._send_receive(payload)

return data

def set_value(self, index, value):
class Device(TuyaDevice):
def __init__(self, dev_id, address, local_key=None, dev_type=None):
super(Device, self).__init__(dev_id, address, local_key, dev_type)

def set_status(self, on, switch=1):
"""
Set int value of any index.
Set status of the device to 'on' or 'off'.

Args:
index(int): index to set
value(int): new value for the index
on(bool): True for 'on', False for 'off'.
switch(int): The switch to set
"""
# open device, send request, then close connection
if isinstance(index, int):
index = str(index) # index and payload is a string

payload = self.generate_payload(SET, {
index: value})
if isinstance(switch, int):
switch = str(switch) # index and payload is a string
payload = self.generate_payload(SET, {switch: on})

data = self._send_receive(payload)
log.debug('set_status received data=%r', data)

return data

Expand Down Expand Up @@ -374,35 +345,37 @@ def set_timer(self, num_secs):
devices_numbers.sort()
dps_id = devices_numbers[-1]

payload = self.generate_payload(SET, {dps_id:num_secs})
payload = self.generate_payload(SET, {dps_id: num_secs})

data = self._send_receive(payload)
log.debug('set_timer received data=%r', data)
return data


class OutletDevice(Device):
def __init__(self, dev_id, address, local_key=None):
dev_type = 'device'
super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type)


class BulbDevice(Device):
DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2'
DPS_INDEX_ON = '1'
DPS_INDEX_MODE = '2'
DPS_INDEX_BRIGHTNESS = '3'
DPS_INDEX_COLOURTEMP = '4'
DPS_INDEX_COLOUR = '5'
DPS_INDEX_COLOUR = '5'

DPS = 'dps'
DPS = 'dps'
DPS_MODE_COLOUR = 'colour'
DPS_MODE_WHITE = 'white'
DPS_MODE_WHITE = 'white'

DPS_2_STATE = {
'1':'is_on',
'2':'mode',
'3':'brightness',
'4':'colourtemp',
'5':'colour',
}
'1': 'is_on',
'2': 'mode',
'3': 'brightness',
'4': 'colourtemp',
'5': 'colour',
}

def __init__(self, dev_id, address, local_key=None):
dev_type = 'device'
Expand All @@ -425,20 +398,20 @@ def _rgb_to_hexvalue(r, g, b):
g(int): Value for the colour green as int from 0-255.
b(int): Value for the colour blue as int from 0-255.
"""
rgb = [r,g,b]
hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255)
rgb = [r, g, b]
hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255)

hexvalue = ""
for value in rgb:
temp = str(hex(int(value))).replace("0x","")
temp = str(hex(int(value))).replace("0x", "")
if len(temp) == 1:
temp = "0" + temp
hexvalue = hexvalue + temp

hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)]
hexvalue_hsv = ""
for value in hsvarray:
temp = str(hex(int(value))).replace("0x","")
temp = str(hex(int(value))).replace("0x", "")
if len(temp) == 1:
temp = "0" + temp
hexvalue_hsv = hexvalue_hsv + temp
Expand Down Expand Up @@ -495,7 +468,6 @@ def set_colour(self, r, g, b):
if not 0 <= b <= 255:
raise ValueError("The value for blue needs to be between 0 and 255.")

#print(BulbDevice)
hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b)

payload = self.generate_payload(SET, {
Expand Down Expand Up @@ -576,7 +548,7 @@ def state(self):
state = {}

for key in status[self.DPS].keys():
if(int(key)<=5):
state[self.DPS_2_STATE[key]]=status[self.DPS][key]
if (int(key) <= 5):
state[self.DPS_2_STATE[key]] = status[self.DPS][key]

return state
9 changes: 5 additions & 4 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ def compare_json_strings(json1, json2, ignoring_keys=None):

def check_data_frame(data, expected_prefix, encrypted=True):
prefix = data[:15]
suffix = data[-8:]

crc = data[-8:-4]
suffix = data[-4:]

if encrypted:
payload_len = struct.unpack(">B",data[15:16])[0] # big-endian, unsigned char
version = data[16:19]
Expand All @@ -51,10 +52,10 @@ def check_data_frame(data, expected_prefix, encrypted=True):
frame_ok = True
if prefix != pytuya.hex2bin(expected_prefix):
frame_ok = False
elif suffix != pytuya.hex2bin("000000000000aa55"):
elif suffix != pytuya.hex2bin("0000aa55"):
frame_ok = False
elif encrypted:
if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(suffix):
if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(crc) + len(suffix):
frame_ok = False
elif version != b"3.1":
frame_ok = False
Expand Down