Created
April 29, 2021 05:06
-
-
Save pingud98/0f16f733e2c5a1a2ee1afd3879acd546 to your computer and use it in GitHub Desktop.
Indoor Air Quality Monitor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Based on three things:
# 1 - CCS811_RPi class usage example
#     by Petr Lukas, V1
# 2 - I2C_LCD_driver
#     Compiled, mashed and generally mutilated 2014-2015 by Denis Pleic
# 3 - BME680
#     Pimoroni library, installed via pip: https://shop.pimoroni.com/products/bme680
from datetime import datetime | |
import requests | |
import time | |
import uuid | |
import bme680 | |
from CCS811_RPi import CCS811_RPi | |
import I2C_LCD_driver | |
import paho.mqtt.client as paho | |
# --- Network configuration ---------------------------------------------
# Host and TCP port of the MQTT broker the measurements are published to.
broker = "cosmicpidata.mooo.com"
port = 1883

# Publishing switch: 1 enables MQTT connect/publish, 0 disables it.
# Starts disabled; the start-up connectivity probe decides the final value.
mqtt_ok = 0

# URL fetched once at start-up to probe for internet connectivity, and the
# timeout (in seconds) handed to requests.get() for that probe.
url = "http://www.google.com"
timeout = 600
def on_publish(client, userdata, result):
    """MQTT publish callback: report on stdout that a message was published.

    All three arguments are accepted only to satisfy the callback signature
    and are not used.
    """
    print("Data published to MQTT server")
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: print whether the connection attempt succeeded.

    Only ``rc`` is inspected: a value of 0 is reported as success, anything
    else as a failed connection. The other arguments are unused.
    """
    outcome = "connected ok" if rc == 0 else "mqtt connection failed"
    print(outcome)
def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback: print the reason code of the disconnect.

    ``client`` and ``userdata`` are unused; ``rc`` is printed as text.
    """
    print("disconnecting reason {!s}".format(rc))
# --- MQTT client ---------------------------------------------------------
# The node's hardware address (uuid.getnode()) doubles as the MQTT client id.
mqttidentstring = str(uuid.getnode())
client1 = paho.Client(mqttidentstring)

# NOTE(security): the broker credentials are hard-coded in the source.
# Consider loading them from the environment or a config file that is not
# checked in; they are left unchanged here so existing deployments keep working.
client1.username_pw_set(username="cosmicpi", password="MuonsFROMSp8ce")

# Bind all three callbacks. on_disconnect was defined but never registered,
# so disconnects went unreported.
client1.on_publish = on_publish
client1.on_connect = on_connect
client1.on_disconnect = on_disconnect

# --- Connectivity probe --------------------------------------------------
# MQTT publishing is only enabled when the probe URL can be fetched.
try:
    requests.get(url, timeout=timeout)
except (requests.ConnectionError, requests.Timeout):
    print("internet fail")
    mqtt_ok = 0
else:
    print("internet ok")
    mqtt_ok = 1

# --- LCD -------------------------------------------------------------------
mylcd = I2C_LCD_driver.lcd()
mylcd.lcd_display_string("Warming up...", 1)
#import urllib2 # comment this line if you don't need ThinkSpeak connection | |
#import SDL_Pi_HDC1000 # comment this line if you don't use HDC sensor | |
# Open the BME680 on its primary I2C address; if that raises IOError, retry
# on the secondary address.
try:
    sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
except IOError:
    sensor = bme680.BME680(bme680.I2C_ADDR_SECONDARY)

# Splash screen: current date on LCD row 1, time on row 2, held for 5 seconds.
now = datetime.now()
date1 = now.strftime("%m/%d/%Y")
date2 = now.strftime("%H:%M")
for row, text in enumerate(("Date: " + date1, "Time: " + date2), start=1):
    mylcd.lcd_display_string(text, row)
time.sleep(5)
ccs811 = CCS811_RPi()

# Baseline to pre-load into the CCS811. Set a value to preset the sensor
# baseline, or leave it False to skip the preset.
INITIALBASELINE = False

# Compensation note: the integrated temperature meter of a CJMCU-8118 board
# can be used to compensate temp/RH. Without compensation the sensor's
# pre-set values are 25 C and 50 % RH. Manual compensation is done with
# ccs811.setCompensation(temperature, humidity).

# MEAS MODE REGISTER AND DRIVE MODE CONFIGURATION
#   0b0        Idle (measurements are disabled in this mode)
#   0b10000    Constant power mode, IAQ measurement every second
#   0b100000   Pulse heating mode, IAQ measurement every 10 seconds
#   0b110000   Low power pulse heating mode, IAQ measurement every 60
#   0b1000000  Constant power mode, sensor measurement every 250ms

# MEAS_MODE value (measurement interval) written to the sensor.
configuration = 0b100000

# Seconds to wait before retrying when no measurement could be retrieved.
pause = 60

print('Checking hardware ID...')
hwid = ccs811.checkHWID()
# hex(129) == '0x81', the ID string the check expects from checkHWID().
if hwid != hex(129):
    print('Incorrect hardware ID ',hwid, ', should be 0x81')
else:
    print('Hardware ID is correct')
# --- BME680 setup --------------------------------------------------------
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_pressure_oversample(bme680.OS_4X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_filter(bme680.FILTER_SIZE_3)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)
sensor.set_gas_heater_temperature(320)
sensor.set_gas_heater_duration(150)
sensor.select_gas_heater_profile(0)

# --- Burn-in ---------------------------------------------------------------
# Collect gas resistance readings for burn_in_time seconds, then average the
# most recent ones (at most 50) to obtain gas_baseline, the reference the
# air-quality score is computed against.
start_time = time.time()
curr_time = time.time()
burn_in_time = 300
burn_in_data = []

print('Collecting gas resistance burn-in data for 5 mins\n')
while curr_time - start_time < burn_in_time:
    curr_time = time.time()
    if sensor.get_sensor_data() and sensor.data.heat_stable:
        gas = sensor.data.gas_resistance
        burn_in_data.append(gas)
        print('Gas: {0} Ohms'.format(gas))
        outputwarm = '{0:.2f} Ohms'.format(gas)
        mylcd.lcd_display_string(outputwarm, 2)
    # NOTE(review): the original indentation was lost; the sleep is placed at
    # loop level so a failed read is retried once per second rather than in a
    # tight loop. Confirm against the intended behaviour.
    time.sleep(1)

# Average over the samples actually collected. Dividing by a fixed 50 skewed
# the baseline low whenever fewer than 50 heat-stable readings were available.
recent_samples = burn_in_data[-50:]
if recent_samples:
    gas_baseline = sum(recent_samples) / float(len(recent_samples))
else:
    # No heat-stable reading at all: keep the previous result (0.0) but say so.
    print('Warning: no stable gas readings during burn-in, gas baseline is 0')
    gas_baseline = 0.0

# Set the humidity baseline to 40%, an optimal indoor humidity.
hum_baseline = 40.0

# This sets the balance between humidity and gas reading in the
# calculation of air_quality_score (25:75, humidity:gas)
hum_weighting = 0.25

print('Gas baseline: {0} Ohms, humidity baseline: {1:.2f} %RH\n'.format(
    gas_baseline,
    hum_baseline))
time.sleep(1)

# --- First reading ---------------------------------------------------------
# hum/temp seed the CCS811 compensation in the main loop. Give them defined
# values up front so a failed first read cannot leave them unbound:
# 25 C / 50 %RH are the CCS811's documented preset compensation values.
hum = 50.0
temp = 25.0
press = None
if sensor.get_sensor_data():
    # Switch to the heater settings used for regular measurements.
    sensor.set_gas_heater_temperature(300)
    sensor.set_gas_heater_duration(100)
    sensor.select_gas_heater_profile(0)
    hum = sensor.data.humidity
    temp = sensor.data.temperature
    press = sensor.data.pressure
# --- CCS811 start ----------------------------------------------------------
# Write the chosen MEAS_MODE, then print it back along with the status byte.
ccs811.configureSensor(configuration)
print('MEAS_MODE:',ccs811.readMeasMode())
print('STATUS: ',bin(ccs811.readStatus()))
print('---------------------------------')

# Pre-set and read back the sensor baseline when one was configured.
if INITIALBASELINE > 0:
    ccs811.setBaseline(INITIALBASELINE)
    print(ccs811.readBaseline())

# --- MQTT connection -------------------------------------------------------
print("MQTT connection")
if mqtt_ok == 1:
    try:
        client1.connect(broker, port)
    except OSError as exc:
        # MQTT is optional for this monitor: a broker that cannot be reached
        # must not stop local measurement and display, so disable publishing.
        print("mqtt connection failed: {}".format(exc))
        mqtt_ok = 0
    else:
        # Start the client's network loop only after a successful connect.
        client1.loop_start()
# Seconds each LCD page stays visible. The six pages are shown twice per
# measurement: first with the longer dwell times, then with 4 seconds each.
_FIRST_PASS_DWELL = (6, 6, 6, 6, 5, 5)
_SECOND_PASS_DWELL = (4, 4, 4, 4, 4, 4)


def _clock_text():
    """Return the current local date and time as 'MM/DD/YYYY HH:MM'."""
    return datetime.now().strftime("%m/%d/%Y %H:%M")


def _air_quality_score(gas, humidity, gas_ref, hum_ref, hum_weight):
    """Combine gas resistance and humidity into one air-quality score.

    The humidity part contributes up to ``hum_weight * 100`` points and
    shrinks with the distance of ``humidity`` from ``hum_ref``. The gas part
    contributes up to ``100 - hum_weight * 100`` points and shrinks when
    ``gas`` falls below ``gas_ref``.
    """
    hum_points = hum_weight * 100
    gas_points = 100 - hum_points

    hum_offset = humidity - hum_ref
    if hum_offset > 0:
        hum_score = (100 - hum_ref - hum_offset) / (100 - hum_ref) * hum_points
    else:
        hum_score = (hum_ref + hum_offset) / hum_ref * hum_points

    # gas_ref > gas is required for the division, so gas_ref is never 0 here
    # for non-negative resistances.
    if gas_ref - gas > 0:
        gas_score = (gas / gas_ref) * gas_points
    else:
        gas_score = gas_points

    return hum_score + gas_score


def _show_pages(pages, dwell_seconds):
    """Show each (row 1, row 2) text pair on the LCD for its dwell time."""
    for (top, bottom), seconds in zip(pages, dwell_seconds):
        mylcd.lcd_clear()
        mylcd.lcd_display_string(top, 1)
        mylcd.lcd_display_string(bottom, 2)
        time.sleep(seconds)


# Main measurement loop: read both sensors, publish the sample over MQTT
# (when enabled) and cycle the readings on the LCD.
while True:
    # Feed the latest BME680 temperature/humidity to the CCS811.
    ccs811.setCompensation(temp, hum)

    statusbyte = ccs811.readStatus()
    print('STATUS: ', bin(statusbyte))

    error = ccs811.checkError(statusbyte)
    if error:
        # Reuse the value already read instead of querying the sensor again.
        print('ERROR:', error)

    if not ccs811.checkDataReady(statusbyte):
        time.sleep(pause)
        continue

    result = ccs811.readAlg()
    if not result:
        time.sleep(pause)
        continue

    # Value is currently unused; the read is kept so the sequence of sensor
    # transactions stays as before.
    baseline = ccs811.readBaseline()

    # Without a fresh, heat-stable BME680 sample there is nothing valid to
    # score or display. Previously this path hit undefined (first pass) or
    # stale values; now it waits and retries like the guards above.
    if not (sensor.get_sensor_data() and sensor.data.heat_stable):
        time.sleep(pause)
        continue

    gas = sensor.data.gas_resistance
    hum = sensor.data.humidity
    # Refresh the temperature too: it was only read once at start-up, so the
    # CCS811 compensation never followed the actual room temperature.
    temp = sensor.data.temperature

    air_quality_score = _air_quality_score(
        gas, hum, gas_baseline, hum_baseline, hum_weighting)

    outputCO2 = ('eCO2: {}'.format(result['eCO2']) + ' ppm')
    outputVOC = ('TVOC: {}'.format(result['TVOC']) + ' ppb')
    outputT = '{0:.2f}C'.format(sensor.data.temperature)
    outputP = '{0:.2f}hPa'.format(sensor.data.pressure)
    outputH = '{0:.1f}%RH'.format(sensor.data.humidity)
    outputG = '{0:.0f} Ohms'.format(sensor.data.gas_resistance)
    outputIAQ = 'IAQ:{0:.1f}'.format(air_quality_score)

    # One line per sample: measurement name, device id tag, fields, timestamp.
    data = [
        "{measurement},id={DeviceID} Temp={temp},Press={press},Hum={hum},Gas={gas},IAQ={iaq},eCO2={eco2},TVOC={tvoc} {timestamp}"
        .format(measurement='IAQ0.1',
                DeviceID=uuid.getnode(),
                temp=sensor.data.temperature,
                press=sensor.data.pressure,
                hum=sensor.data.humidity,
                gas=sensor.data.gas_resistance,
                iaq=air_quality_score,
                eco2=result['eCO2'],
                tvoc=result['TVOC'],
                timestamp=int(time.time()))
    ]
    print(data)
    if mqtt_ok == 1:
        client1.publish("iaq/0.1", str(data))

    # LCD output: a date/time + gas page followed by five reading pages, in
    # which each page's second row becomes the next page's first row.
    reading_pages = [
        (outputT, outputIAQ),
        (outputIAQ, outputCO2),
        (outputCO2, outputVOC),
        (outputVOC, outputP),
        (outputP, outputH),
    ]
    for dwell_seconds in (_FIRST_PASS_DWELL, _SECOND_PASS_DWELL):
        # The clock text is rebuilt for each pass so the shown time is current.
        pages = [(_clock_text(), outputG)] + reading_pages
        _show_pages(pages, dwell_seconds)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment