Created
May 14, 2025 17:33
-
-
Save vikulin/cb8666c78820b1d8f4e5d44b853b62b9 to your computer and use it in GitHub Desktop.
BT example with UUID during advertising
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
import bluetooth, struct, time | |
from micropython import const | |
from utime import sleep, sleep_ms, sleep_us, ticks_us, ticks_diff | |
from machine import ADC, Pin, time_pulse_us, disable_irq, enable_irq | |
i=0 | |
t=0 | |
csum=0 | |
ts=0 | |
timestamp=0 | |
rpm=Pin(21, Pin.IN) | |
vin=ADC(Pin(35)) | |
vin.atten(ADC.ATTN_11DB) | |
is_ble_connected = False | |
IRQ_CENTRAL_CONNECT = const(1) | |
IRQ_CENTRAL_DISCONNECT = const(2) | |
IRQ_GATTS_WRITE = const(3) | |
if 'eh.json' in os.listdir(): | |
with open('eh.json', 'r') as eh: | |
timestamp = json.load(eh) | |
ts=timestamp | |
class MAX6675: | |
MEASUREMENT_PERIOD_MS = 220 | |
def __init__(self, sck, cs, so): | |
""" | |
Creates new object for controlling MAX6675 | |
:param sck: SCK (clock) pin, must be configured as Pin.OUT | |
:param cs: CS (select) pin, must be configured as Pin.OUT | |
:param so: SO (data) pin, must be configured as Pin.IN | |
""" | |
# Thermocouple | |
self._sck = sck | |
self._sck.value(0) | |
self._cs = cs | |
self._cs.value(1) | |
self._so = so | |
self._so.value(0) | |
self._last_measurement_start = 0 | |
self._last_read_temp = 0 | |
self._error = 0 | |
def _cycle_sck(self): | |
self._sck.value(1) | |
time.sleep_us(1) | |
self._sck.value(0) | |
time.sleep_us(1) | |
def refresh(self): | |
""" | |
Start a new measurement. | |
""" | |
self._cs.value(0) | |
time.sleep_us(10) | |
self._cs.value(1) | |
self._last_measurement_start = time.ticks_ms() | |
def ready(self): | |
""" | |
Signals if measurement is finished. | |
:return: True if measurement is ready for reading. | |
""" | |
return time.ticks_ms() - self._last_measurement_start > MAX6675.MEASUREMENT_PERIOD_MS | |
def error(self): | |
""" | |
Returns error bit of last reading. If this bit is set (=1), there's problem with the | |
thermocouple - it can be damaged or loosely connected | |
:return: Error bit value | |
""" | |
return self._error | |
def read(self): | |
""" | |
Reads last measurement and starts a new one. If new measurement is not ready yet, returns last value. | |
Note: The last measurement can be quite old (e.g. since last call to `read`). | |
To refresh measurement, call `refresh` and wait for `ready` to become True before reading. | |
:return: Measured temperature | |
""" | |
# Check if new reading is available | |
if self.ready(): | |
# Bring CS pin low to start protocol for reading result of | |
# the conversion process. Forcing the pin down outputs | |
# first (dummy) sign bit 15. | |
self._cs.value(0) | |
time.sleep_us(10) | |
# Read temperature bits 14-3 from MAX6675. | |
value = 0 | |
for i in range(12): | |
# SCK should resemble clock signal and new SO value | |
# is presented at falling edge | |
self._cycle_sck() | |
value += self._so.value() << (11 - i) | |
# Read the TC Input pin to check if the input is open | |
self._cycle_sck() | |
self._error = self._so.value() | |
# Read the last two bits to complete protocol | |
for i in range(2): | |
self._cycle_sck() | |
# Finish protocol and start new measurement | |
self._cs.value(1) | |
self._last_measurement_start = time.ticks_ms() | |
self._last_read_temp = value * 0.25 | |
return self._last_read_temp | |
class ESP32_BLE(): | |
def __init__(self, name): | |
self.ble = bluetooth.BLE() | |
self.name = name | |
self._conn_handle = None | |
self.ble.active(True) | |
self.ble.irq(self.irq) | |
self.register() | |
def irq(self, event, data): | |
global is_ble_connected | |
if event == IRQ_CENTRAL_CONNECT: | |
self._conn_handle,_,_ = data | |
is_ble_connected=True | |
print("connected") | |
elif event == IRQ_CENTRAL_DISCONNECT: | |
self._conn_handle = None | |
self.register() | |
is_ble_connected=False | |
print("disconnected") | |
elif event == IRQ_GATTS_WRITE: | |
conn_handle, value_handle = data | |
self.receive(self.ble.gatts_read(value_handle)) | |
def receive(self, data): | |
print("RX:", data) | |
def register(self): | |
#rpm measurement sensor 459e | |
#temperature measurement sensor 2a1c | |
#fuel level measurement sensor 8fcc | |
#engine hours ed11 | |
NUS_UUID = '459e2a1c-8fcc-ed11-ffff-ffffffffffff' | |
TX_UUID = 'beb5483e-36e1-4688-b7f5-ea07361b26a8' | |
BLE_NUS = bluetooth.UUID(NUS_UUID) | |
#R/W/N | |
BLE_TX = (bluetooth.UUID(TX_UUID), 0x1a) | |
BLE_UART = (BLE_NUS, (BLE_TX,),) | |
SERVICES = (BLE_UART,) | |
((self.tx,),) = self.ble.gatts_register_services(SERVICES) | |
self.ble.gatts_set_buffer(self.tx, 20, True) | |
self._payload = bytes() | |
self.adv_append(0x01, struct.pack("B", 0x06)) | |
self.adv_append(0x09, self.name) | |
self.adv_append(0x07, bytes(BLE_NUS)) | |
self.ble.gap_advertise(100000, adv_data=self._payload) | |
# build advertise payload | |
def adv_append(self, adv_type, value): | |
self._payload += struct.pack("BB", len(value) + 1, adv_type) + value | |
def send(self, data): | |
# BLE can send max 20 bytes per message | |
chunk = data if len(data) <= 20 else data[:20] | |
try: | |
self.ble.gatts_notify(self._conn_handle, self.tx, chunk) | |
except TypeError: | |
pass | |
time.sleep(0.01) # sending too fast results in out of memory | |
def measure_frequency(): | |
global i | |
global t | |
global ts | |
global timestamp | |
print("f") | |
#sleep_ms(200) | |
sum = 0 | |
loops = 15 | |
for _ in range(loops//2): | |
state= disable_irq() | |
time_pulse_us(rpm, 1, 100000) | |
sum += time_pulse_us(rpm, 1, 100000) | |
enable_irq(state) | |
state= disable_irq() | |
time_pulse_us(rpm, 0, 100000) | |
sum += time_pulse_us(rpm, 0, 100000) | |
enable_irq(state) | |
if sum <= 0: | |
print("0") | |
print("is_ble_connected:"+str(is_ble_connected)) | |
#remember latest time before rpm>0 | |
t=time.time() | |
if is_ble_connected: | |
ble.send("{0:0}") | |
ble.send("{3:"+str(ts)+"}") | |
else: | |
sr=str(int(60*500000*loops/sum)) | |
print(sr) | |
#do save engine hours | |
ts = timestamp + time.time() - t | |
if is_ble_connected: | |
ble.send("{0:"+sr+"}") | |
ble.send("{3:"+str(ts)+"}") | |
if i % 10 == 0: | |
with open('eh.json', 'w') as eh: | |
json.dump(ts, eh) | |
def measure_temperature(): | |
print("t") | |
sleep_ms(200) | |
if is_ble_connected: | |
tr = temp.read() | |
if temp.error(): | |
ble.send("{1:null}") | |
else: | |
temperature = str(int(tr)) | |
print(t) | |
ble.send("{1:"+temperature+"}") | |
def read_battery_voltage(): | |
global i | |
global csum | |
voltage=vin.read() | |
v=2*voltage*3.3/4095 | |
c=100*(v-3.27+0.27)/(4.0-3.27) | |
if c>100: | |
c=100 | |
if i % 5 == 0: | |
cl=str(int(csum/4)) | |
csum=0 | |
if is_ble_connected: | |
ble.send("{2:"+cl+"}") | |
else: | |
csum+=c | |
#delta=4.2-3.27 | |
#100-0 | |
#dx=100*(vin-3.27+0.345)/(4.0-3.27) | |
#4095-3.3V | |
#vin-xV | |
#xV=vin*3.3/4095 | |
#1.86,3.77,23% | |
#2.0.2,4.0.3,50% | |
ble = ESP32_BLE("ESP32") | |
so = Pin(22, Pin.IN) | |
sck = Pin(23, Pin.OUT) | |
cs = Pin(19, Pin.OUT) | |
temp = MAX6675(sck, cs , so) | |
#create looped tasks | |
while True: | |
measure_frequency() | |
measure_temperature() | |
read_battery_voltage() | |
i+=1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment