-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPiicoDev_Unified.py
179 lines (152 loc) · 6.99 KB
/
PiicoDev_Unified.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
'''
PiicoDev.py: Unifies I2C drivers for different builds of MicroPython
Changelog:
- 2021 M.Ruppe - Initial Unified Driver
- 2022-10-13 P.Johnston - Add helptext to run i2csetup script on Raspberry Pi
- 2022-10-14 M.Ruppe - Explicitly set default I2C initialisation parameters for machine-class (Raspberry Pi Pico + W)
- 2023-01-31 L.Howell - Add minimal support for ESP32
- 2023-05-17 M.Ruppe - Make I2CUnifiedMachine() more flexible on initialisation. Frequency is optional.
'''
import os
_SYSNAME = os.uname().sysname
compat_ind = 1
i2c_err_str = 'PiicoDev could not communicate with module at address 0x{:02X}, check wiring'
setupi2c_str = ', run "sudo curl -L https://piico.dev/i2csetup | bash". Suppress this warning by setting suppress_warnings=True'
if _SYSNAME == 'microbit':
from microbit import i2c
from utime import sleep_ms
elif _SYSNAME == 'Linux':
from smbus2 import SMBus, i2c_msg
from time import sleep
from math import ceil
def sleep_ms(t):
sleep(t/1000)
else:
from machine import I2C, Pin
from utime import sleep_ms
class I2CBase:
def writeto_mem(self, addr, memaddr, buf, *, addrsize=8):
raise NotImplementedError('writeto_mem')
def readfrom_mem(self, addr, memaddr, nbytes, *, addrsize=8):
raise NotImplementedError('readfrom_mem')
def write8(self, addr, buf, stop=True):
raise NotImplementedError('write')
def read16(self, addr, nbytes, stop=True):
raise NotImplementedError('read')
def __init__(self, bus=None, freq=None, sda=None, scl=None):
raise NotImplementedError('__init__')
class I2CUnifiedMachine(I2CBase):
def __init__(self, bus=None, freq=None, sda=None, scl=None):
if _SYSNAME == 'esp32' and (bus is None or sda is None or scl is None):
raise Exception('Please input bus, machine.pin SDA, and SCL objects to use ESP32')
if freq is None: freq = 400_000
if not isinstance(freq, (int)):
raise ValueError("freq must be an Int")
if freq < 400_000: print("\033[91mWarning: minimum freq 400kHz is recommended if using OLED module.\033[0m")
if bus is not None and sda is not None and scl is not None:
print('Using supplied bus, sda, and scl to create machine.I2C() with freq: {} Hz'.format(freq))
self.i2c = I2C(bus, freq=freq, sda=sda, scl=scl)
elif bus is None and sda is None and scl is None:
self.i2c = I2C(0, scl=Pin(9), sda=Pin(8), freq=freq) # RPi Pico in Expansion Board
else:
raise Exception("Please provide at least bus, sda, and scl")
self.writeto_mem = self.i2c.writeto_mem
self.readfrom_mem = self.i2c.readfrom_mem
def write8(self, addr, reg, data):
if reg is None:
self.i2c.writeto(addr, data)
else:
self.i2c.writeto(addr, reg + data)
def read16(self, addr, reg):
self.i2c.writeto(addr, reg, False)
return self.i2c.readfrom(addr, 2)
class I2CUnifiedMicroBit(I2CBase):
def __init__(self, freq=None):
if freq is not None:
print('Initialising I2C freq to {}'.format(freq))
microbit.i2c.init(freq=freq)
def writeto_mem(self, addr, memaddr, buf, *, addrsize=8):
ad = memaddr.to_bytes(addrsize // 8, 'big') # pad address for eg. 16 bit
i2c.write(addr, ad + buf)
def readfrom_mem(self, addr, memaddr, nbytes, *, addrsize=8):
ad = memaddr.to_bytes(addrsize // 8, 'big') # pad address for eg. 16 bit
i2c.write(addr, ad, repeat=True)
return i2c.read(addr, nbytes)
def write8(self, addr, reg, data):
if reg is None:
i2c.write(addr, data)
else:
i2c.write(addr, reg + data)
def read16(self, addr, reg):
i2c.write(addr, reg, repeat=True)
return i2c.read(addr, 2)
class I2CUnifiedLinux(I2CBase):
def __init__(self, bus=None, suppress_warnings=True):
if suppress_warnings == False:
with open('/boot/config.txt') as config_file:
if 'dtparam=i2c_arm=on' in config_file.read():
pass
else:
print('I2C is not enabled. To enable' + setupi2c_str)
config_file.close()
with open('/boot/config.txt') as config_file:
if 'dtparam=i2c_arm_baudrate=400000' in config_file.read():
pass
else:
print('Slow baudrate detected. If glitching occurs' + setupi2c_str)
config_file.close()
if bus is None:
bus = 1
self.i2c = SMBus(bus)
def readfrom_mem(self, addr, memaddr, nbytes, *, addrsize=8):
data = [None] * nbytes # initialise empty list
self.smbus_i2c_read(addr, memaddr, data, nbytes, addrsize=addrsize)
return data
def writeto_mem(self, addr, memaddr, buf, *, addrsize=8):
self.smbus_i2c_write(addr, memaddr, buf, len(buf), addrsize=addrsize)
def smbus_i2c_write(self, address, reg, data_p, length, addrsize=8):
ret_val = 0
data = []
for index in range(length):
data.append(data_p[index])
if addrsize == 8:
msg_w = i2c_msg.write(address, [reg] + data)
elif addrsize == 16:
msg_w = i2c_msg.write(address, [reg >> 8, reg & 0xff] + data)
else:
raise Exception('address must be 8 or 16 bits long only')
self.i2c.i2c_rdwr(msg_w)
return ret_val
def smbus_i2c_read(self, address, reg, data_p, length, addrsize=8):
ret_val = 0
if addrsize == 8:
msg_w = i2c_msg.write(address, [reg]) # warning this is set up for 16-bit addresses
elif addrsize == 16:
msg_w = i2c_msg.write(address, [reg >> 8, reg & 0xff]) # warning this is set up for 16-bit addresses
else:
raise Exception('address must be 8 or 16 bits long only')
msg_r = i2c_msg.read(address, length)
self.i2c.i2c_rdwr(msg_w, msg_r)
if ret_val == 0:
for index in range(length):
data_p[index] = ord(msg_r.buf[index])
return ret_val
def write8(self, addr, reg, data):
if reg is None:
d = int.from_bytes(data, 'big')
self.i2c.write_byte(addr, d)
else:
r = int.from_bytes(reg, 'big')
d = int.from_bytes(data, 'big')
self.i2c.write_byte_data(addr, r, d)
def read16(self, addr, reg):
regInt = int.from_bytes(reg, 'big')
return self.i2c.read_word_data(addr, regInt).to_bytes(2, byteorder='little', signed=False)
def create_unified_i2c(bus=None, freq=None, sda=None, scl=None, suppress_warnings=True):
if _SYSNAME == 'microbit':
i2c = I2CUnifiedMicroBit(freq=freq)
elif _SYSNAME == 'Linux':
i2c = I2CUnifiedLinux(bus=bus, suppress_warnings=suppress_warnings)
else:
i2c = I2CUnifiedMachine(bus=bus, freq=freq, sda=sda, scl=scl)
return i2c