Skip to content

Commit

Permalink
Compiled releasable version of code snippets into real library
Browse files Browse the repository at this point in the history
  • Loading branch information
MSeal committed Nov 26, 2017
1 parent b2d5cd2 commit 38edec8
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 0 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2017 Matthew Seal

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# htu21df Sensor Library
A lightweight library for talking to an htu21df sensor over i2c. Supports python 2 or 3.

## Installation
Use pip:

pip install htu21df

Or from any download, install with setup:

python setup.py install

## Why?
It turns out that the expected interface with the htu21df sensor is a little off the standard i2c library interfaces. To get it to work on a raspberry pi took a lot of headbanging and soul searching. So instead of having others repeat the pain, I made this library which makes (semi) direct calls to the i2c io which match the sensor perfectly. I also added in a bunch of internal patches for python2 vs python3 so both can work without rewriting all the byte-level interfaces.

## Author
Author(s): Matthew Seal

## License
MIT
11 changes: 11 additions & 0 deletions htu21/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .htu21 import (
HTU21,
HTU21DF_I2C_ADDR,
HTU21DF_READ_TEMP_HOLD,
HTU21DF_READ_HUM_HOLD,
HTU21DF_READ_TEMP_NO_HOLD,
HTU21DF_READ_HUM_NO_HOLD,
HTU21DF_WRITE_REG,
HTU21DF_READ_REG,
HTU21DF_RESET,
HTU21DF_RESET_REG_VALUE)
51 changes: 51 additions & 0 deletions htu21/htu21.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# https://cdn-shop.adafruit.com/datasheets/1899_HTU21D.pdf is useful for determining expected behavior
import time
from .retry import retry_immediately
from .rawi2c import I2C, CRC8Error

HTU21DF_I2C_ADDR = 0x40
HTU21DF_READ_TEMP_HOLD = 0xE3
HTU21DF_READ_HUM_HOLD = 0xE5
HTU21DF_READ_TEMP_NO_HOLD = 0xF3
HTU21DF_READ_HUM_NO_HOLD = 0xF5
HTU21DF_WRITE_REG = 0xE6
HTU21DF_READ_REG = 0xE7
HTU21DF_RESET = 0xFE
HTU21DF_RESET_REG_VALUE = 0x02

class HTU21(object):
data_retry_warpper = retry_immediately(CRC8Error, 3, lambda s, _e: s.reset())

def __init__(self, start_immediately=True):
self.i2c = I2C(HTU21DF_I2C_ADDR)
if start_immediately:
self.start_sensor()

def start_sensor(self):
self.reset()

def reset(self):
self.i2c.write(HTU21DF_RESET)
time.sleep(0.015)
self.i2c.write(HTU21DF_READ_REG)
if not self.i2c.read_int() == HTU21DF_RESET_REG_VALUE:
raise IOError("HTU21D-F device reset failed")

@data_retry_warpper
def read_temperature(self):
self.i2c.write(HTU21DF_READ_TEMP_NO_HOLD)
time.sleep(0.05)
t = float(self.i2c.read_int(2, crc8=True))
return (t * 175.72 / 65536) - 46.85

@data_retry_warpper
def read_humidity(self):
self.i2c.write(HTU21DF_READ_HUM_NO_HOLD)
time.sleep(0.016)
h = float(self.i2c.read_int(2, crc8=True))
return (h * 125 / 65536) - 6

if __name__ == '__main__':
htu = HTU21()
print(htu.read_temperature())
print(htu.read_humidity())
103 changes: 103 additions & 0 deletions htu21/rawi2c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Parts transcribed from https://www.raspberrypi.org/forums/viewtopic.php?t=76688
import array, fcntl, os, sys

python_2 = sys.version_info[0] == 2

def bus_path(bus):
return "/dev/i2c-{}".format(bus)

def available_bus():
for bus in range(5):
if os.access(bus_path(bus), os.F_OK):
return bus
raise IOError("No detectable i2c bus available")

def array_to_int_be(byte_array):
outcome = 0
for val in byte_array:
outcome = (outcome << 8) + val
return outcome

def array_to_int_le(byte_array):
return array_to_int_be(reversed(list(byte_array)))

def array_to_int(byte_array, big_endian=True):
return array_to_int_be(byte_array) if big_endian else array_to_int_le(byte_array)

def crc8check(values, big_endian=True):
# Ported and refactored from Sparkfun Arduino HTU21D Library: https://github.com/sparkfun/HTU21D_Breakout
remainder = array_to_int(values, big_endian)

# POLYNOMIAL = 0x0131 = x^8 + x^5 + x^4 + 1
# divsor = 0x988000 is the 0x0131 polynomial shifted to farthest left of three bytes
divsor = 0x988000

for i in range(0, 16):
if remainder & 1 << (23 - i):
remainder ^= divsor
divsor = divsor >> 1

return remainder == 0

def try_ord(val):
if isinstance(val, int):
return val
return ord(val)

def any_py_bytes(bytes_str):
if python_2 and not isinstance(bytes_str, basestring):
bytes_str = chr(bytes_str)
elif not python_2 and not isinstance(bytes_str, (str, bytes)):
bytes_str = chr(bytes_str)
if not python_2 and isinstance(bytes_str, str):
bytes_str = bytes_str.encode('charmap')
return bytes_str

class CRC8Error(IOError): pass

class I2C(object):
I2C_SLAVE=0x0703

def __init__(self, addr, bus=None):
if bus is None:
bus = available_bus()
self._fd = open(bus_path(bus), 'rb+', 0)
self.set_address(addr)

def set_address(self, addr):
fcntl.ioctl(self._fd, self.I2C_SLAVE, addr)
self.addr = addr

def write(self, byte):
self._fd.write(any_py_bytes(byte))

def read(self):
return any_py_bytes(self._fd.read(1))

def read_many(self, num_bytes, big_endian=True, crc8=False):
if crc8:
num_bytes += 1
result = list(any_py_bytes(self._fd.read(num_bytes)))
if crc8:
if not crc8check(map(try_ord, result), big_endian):
raise CRC8Error("Bad i2c checksum")
result = result[:-1] if big_endian else result[1:]
return result

def read_int(self, num_bytes=1, big_endian=True, crc8=False):
results = map(try_ord, self.read_many(num_bytes, big_endian, crc8))
return array_to_int(results, big_endian)

def close(self):
if self._fd is not None:
self._fd.close()
self._fd = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def __del__(self):
self.close()
20 changes: 20 additions & 0 deletions htu21/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import wrapt

def retry_immediately(PossibleExceptions, max_retries=1, failure_callback=None):
try:
iter(PossibleExceptions)
except TypeError:
PossibleExceptions = [PossibleExceptions]

@wrapt.decorator
def wrapped_retry(wrapped, instance, args, kwargs):
retries = max_retries
while retries >= 0:
try:
return wrapped(*args, **kwargs)
except tuple(PossibleExceptions) as e:
if failure_callback:
failure_callback(instance, e)
retries -= 1

return wrapped_retry
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
wrapt>=1.0.0,<2.0a0
mock
34 changes: 34 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import sys
from subprocess import call
from setuptools import setup

python_2 = sys.version_info[0] == 2
def read(fname):
with open(fname, 'rU' if python_2 else 'r') as fhandle:
return fhandle.read()

version = '0.1.3'
required = [req.strip() for req in read('requirements.txt').splitlines() if req.strip()]
setup(
name='htu21df',
version=version,
author='Matthew Seal',
author_email='[email protected]',
description='A simple program for controlling an htu21d-f sensor from a Raspberry Pi',
install_requires=required,
license='MIT',
packages=['htu21'],
test_suite='tests',
zip_safe=False,
url='https://github.com/MSeal/htu21df_sensor',
download_url='https://github.com/MSeal/htu21df_sensor/tarball/v' + version,
keywords=['sensors', 'raspberry_pi', 'adafruit', 'scripting'],
classifiers=[
'Development Status :: 4 - Beta Development Status',
'Topic :: Utilities',
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
'Programming Language :: Python'
]
)
Empty file added tests/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions tests/fakebus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from htu21.rawi2c import any_py_bytes

class FakeBus(object):
def __init__(self, *args, **kwargs):
self.open()
self.clear_buffers()

def clear_buffers(self):
self.write_array = []
self.read_array = []

def add_read_byte(self, value):
self.read_array.append(any_py_bytes(value))

def add_many_read_bytes(self, values):
for val in values:
self.add_read_byte(val)

def write(self, byte):
if not self.connected:
raise IOError('Device is disconnected')
self.write_array.append(byte)

def read_byte(self):
if not self.connected:
raise IOError('Device is disconnected')
if not self.read_array:
raise IOError('No readable values available')
return self.read_array.pop(0)

def read(self, num_bytes=1):
return ''.join(self.read_byte().decode('charmap') for _ in range(num_bytes))

def close(self):
self.connected = False

def open(self):
self.connected = True
7 changes: 7 additions & 0 deletions tests/parentpath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import sys
import os

# Add parent import capabilities
parentdir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if parentdir not in sys.path:
sys.path.insert(1, parentdir)
75 changes: 75 additions & 0 deletions tests/test_htu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# This import fixes sys.path issues
from . import parentpath

import unittest
import sys
import time
from mock import patch, MagicMock
from .fakebus import FakeBus
from htu21 import (
HTU21,
HTU21DF_READ_TEMP_NO_HOLD,
HTU21DF_READ_HUM_NO_HOLD,
HTU21DF_READ_REG,
HTU21DF_RESET,
HTU21DF_RESET_REG_VALUE
)

python_2 = sys.version_info[0] == 2

@patch('time.sleep', return_value=None)
class HTUTest(unittest.TestCase):
def decoded_values(self, values):
return [chr(v) if python_2 else chr(v).encode('charmap') for v in values]

@patch('fcntl.ioctl')
@patch('htu21.rawi2c.open', FakeBus)
@patch('os.access', return_value=True)
def setUp(self, *mocks):
self.htu = HTU21(False)
self.fake_bus = self.htu.i2c._fd
self.fake_bus.add_read_byte(HTU21DF_RESET_REG_VALUE)
self.htu.start_sensor()
self.fake_bus.clear_buffers()

def test_reset(self, patched_sleep):
self.fake_bus.add_read_byte(HTU21DF_RESET_REG_VALUE)
self.htu.reset()
self.assertEqual(self.fake_bus.write_array, self.decoded_values([HTU21DF_RESET, HTU21DF_READ_REG]))
patched_sleep.assert_called_once_with(0.015)

def test_start_sensor(self, patched_sleep):
self.htu.reset = MagicMock()
self.htu.start_sensor()
self.htu.reset.assert_called_once()
self.assertEqual(self.fake_bus.write_array, [])

def test_read_temperature(self, patched_sleep):
self.fake_bus.add_many_read_bytes([104, 112, 154])
t = self.htu.read_temperature()
self.assertEqual(self.fake_bus.write_array, self.decoded_values([HTU21DF_READ_TEMP_NO_HOLD]))
self.assertAlmostEqual(t, 24.84, 2)
patched_sleep.assert_called_once_with(0.05)

def test_read_temperature_chechsum_failure(self, patched_sleep):
self.fake_bus.add_many_read_bytes([104, 112, 255]) # Bad bytes
self.fake_bus.add_read_byte(HTU21DF_RESET_REG_VALUE) # Reset on bad read
self.fake_bus.add_many_read_bytes([104, 112, 154]) # Good bytes
t = self.htu.read_temperature()
self.assertEqual(self.fake_bus.write_array, self.decoded_values([HTU21DF_READ_TEMP_NO_HOLD, HTU21DF_RESET, HTU21DF_READ_REG, HTU21DF_READ_TEMP_NO_HOLD]))
self.assertAlmostEqual(t, 24.84, 2)

def test_humidity(self, patched_sleep):
self.fake_bus.add_many_read_bytes([126, 106, 54])
h = self.htu.read_humidity()
self.assertEqual(self.fake_bus.write_array, self.decoded_values([HTU21DF_READ_HUM_NO_HOLD]))
self.assertAlmostEqual(h, 55.73, 2)
patched_sleep.assert_called_once_with(0.016)

def test_read_temperature_chechsum_failure(self, patched_sleep):
self.fake_bus.add_many_read_bytes([126, 106, 255]) # Bad bytes
self.fake_bus.add_read_byte(HTU21DF_RESET_REG_VALUE) # Reset on bad read
self.fake_bus.add_many_read_bytes([126, 106, 54]) # Good bytes
h = self.htu.read_humidity()
self.assertEqual(self.fake_bus.write_array, self.decoded_values([HTU21DF_READ_HUM_NO_HOLD, HTU21DF_RESET, HTU21DF_READ_REG, HTU21DF_READ_HUM_NO_HOLD]))
self.assertAlmostEqual(h, 55.73, 2)
Loading

0 comments on commit 38edec8

Please sign in to comment.