Skip to content

Instantly share code, notes, and snippets.

@vikulin
Created May 14, 2025 17:33
Show Gist options
  • Save vikulin/cb8666c78820b1d8f4e5d44b853b62b9 to your computer and use it in GitHub Desktop.
Save vikulin/cb8666c78820b1d8f4e5d44b853b62b9 to your computer and use it in GitHub Desktop.
BT example with UUID during advertising
import os
import sys
import json
import bluetooth, struct, time
from micropython import const
from utime import sleep, sleep_ms, sleep_us, ticks_us, ticks_diff
from machine import ADC, Pin, time_pulse_us, disable_irq, enable_irq
# ---------------------------------------------------------------------------
# Module-level state shared with the measurement functions further down
# (they access these names through `global`).
# ---------------------------------------------------------------------------
i = 0          # main-loop pass counter
t = 0          # time.time() of the last pass on which no RPM pulses were seen
csum = 0       # running sum of battery-charge samples between BLE reports
ts = 0         # engine-run total: stored total plus the current run
timestamp = 0  # engine-run total restored from 'eh.json'

rpm = Pin(21, Pin.IN)      # pulse input timed by measure_frequency()
vin = ADC(Pin(35))         # analog input sampled by read_battery_voltage()
vin.atten(ADC.ATTN_11DB)

is_ble_connected = False   # maintained by ESP32_BLE.irq()

# BLE event codes dispatched on in ESP32_BLE.irq().
IRQ_CENTRAL_CONNECT = const(1)
IRQ_CENTRAL_DISCONNECT = const(2)
IRQ_GATTS_WRITE = const(3)

# Restore the persisted engine-run total. The file is rewritten periodically
# by measure_frequency(), so it may be missing (first boot) or truncated
# (power lost mid-write); neither case may stop the script from starting.
try:
    with open('eh.json', 'r') as eh:
        timestamp = json.load(eh)
except (OSError, ValueError):
    # OSError: the file does not exist yet. ValueError: malformed JSON.
    timestamp = 0
if not isinstance(timestamp, (int, float)):
    # Valid JSON but not a number - ignore it rather than fail later on
    # the arithmetic in measure_frequency().
    timestamp = 0
ts = timestamp
class MAX6675:
    """
    Bit-banged reader for a MAX6675 thermocouple converter.

    The clock and chip-select pins are driven, and the data pin is sampled,
    directly through ``Pin.value()``; no hardware SPI peripheral is used.
    """

    # Time allowed for one conversion, in milliseconds. `ready` reports
    # False until this much time has passed since CS was last released.
    MEASUREMENT_PERIOD_MS = 220

    def __init__(self, sck, cs, so):
        """
        Creates new object for controlling MAX6675
        :param sck: SCK (clock) pin, must be configured as Pin.OUT
        :param cs: CS (select) pin, must be configured as Pin.OUT
        :param so: SO (data) pin, must be configured as Pin.IN
        """
        # Thermocouple
        self._sck = sck
        self._sck.value(0)
        self._cs = cs
        self._cs.value(1)
        # SO is an input and is only ever read, so nothing is written to it.
        self._so = so
        # CS has just been driven high, so time the first conversion from
        # now. A real tick value is needed here because `ready` compares
        # with ticks_diff().
        self._last_measurement_start = time.ticks_ms()
        self._last_read_temp = 0
        self._error = 0

    def _cycle_sck(self):
        """Emit one clock pulse (high, then low) on SCK."""
        self._sck.value(1)
        time.sleep_us(1)
        self._sck.value(0)
        time.sleep_us(1)

    def refresh(self):
        """
        Start a new measurement.
        """
        # Pulse CS low and release it; the conversion timer restarts at
        # the moment CS goes back high.
        self._cs.value(0)
        time.sleep_us(10)
        self._cs.value(1)
        self._last_measurement_start = time.ticks_ms()

    def ready(self):
        """
        Signals if measurement is finished.
        :return: True if measurement is ready for reading.
        """
        # ticks_ms() wraps around, so the elapsed time must be computed
        # with ticks_diff() rather than plain subtraction.
        elapsed = time.ticks_diff(time.ticks_ms(), self._last_measurement_start)
        return elapsed > MAX6675.MEASUREMENT_PERIOD_MS

    def error(self):
        """
        Returns error bit of last reading. If this bit is set (=1), there's problem with the
        thermocouple - it can be damaged or loosely connected
        :return: Error bit value
        """
        return self._error

    def read(self):
        """
        Reads last measurement and starts a new one. If new measurement is not ready yet, returns last value.
        Note: The last measurement can be quite old (e.g. since last call to `read`).
        To refresh measurement, call `refresh` and wait for `ready` to become True before reading.
        :return: Measured temperature (raw 12-bit reading multiplied by 0.25)
        """
        # Check if new reading is available
        if self.ready():
            # Bring CS pin low to start protocol for reading result of
            # the conversion process. Forcing the pin down outputs
            # first (dummy) sign bit 15.
            self._cs.value(0)
            time.sleep_us(10)

            # Read temperature bits 14-3 from MAX6675, most significant
            # bit first.
            value = 0
            for _ in range(12):
                # SCK should resemble clock signal and new SO value
                # is presented at falling edge
                self._cycle_sck()
                value = (value << 1) | self._so.value()

            # Read the TC Input pin to check if the input is open
            self._cycle_sck()
            self._error = self._so.value()

            # Read the last two bits to complete protocol
            for _ in range(2):
                self._cycle_sck()

            # Finish protocol and start new measurement
            self._cs.value(1)
            self._last_measurement_start = time.ticks_ms()
            self._last_read_temp = value * 0.25
        return self._last_read_temp
class ESP32_BLE():
    """
    BLE peripheral exposing one GATT service with a single
    read/write/notify characteristic, advertised under a given name.

    Connection state is mirrored into the module-level
    ``is_ble_connected`` flag by `irq`.
    """

    def __init__(self, name):
        """
        Activates the BLE radio, installs the event handler and starts
        advertising.
        :param name: device name placed in the advertising payload
        """
        self.ble = bluetooth.BLE()
        self.name = name
        self._conn_handle = None
        self.ble.active(True)
        self.ble.irq(self.irq)
        self.register()

    def irq(self, event, data):
        """
        BLE event handler.
        :param event: event code (compared with the IRQ_* constants)
        :param data: event-specific tuple supplied by the BLE stack
        """
        global is_ble_connected
        if event == IRQ_CENTRAL_CONNECT:
            self._conn_handle, _, _ = data
            is_ble_connected = True
            print("connected")
        elif event == IRQ_CENTRAL_DISCONNECT:
            self._conn_handle = None
            # The GATT database registered in `register` is still in
            # place; only advertising has to be restarted so a central
            # can connect again.
            self._advertise()
            is_ble_connected = False
            print("disconnected")
        elif event == IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            self.receive(self.ble.gatts_read(value_handle))

    def receive(self, data):
        """
        Called with the value a central wrote to the characteristic.
        :param data: value read back from the written handle
        """
        print("RX:", data)

    def register(self):
        """
        Registers the GATT service, builds the advertising payload and
        starts advertising.
        """
        # The leading groups of the service UUID identify the sensors:
        #rpm measurement sensor 459e
        #temperature measurement sensor 2a1c
        #fuel level measurement sensor 8fcc
        #engine hours ed11
        NUS_UUID = '459e2a1c-8fcc-ed11-ffff-ffffffffffff'
        TX_UUID = 'beb5483e-36e1-4688-b7f5-ea07361b26a8'
        BLE_NUS = bluetooth.UUID(NUS_UUID)
        #R/W/N  (0x1a = 0x02 read | 0x08 write | 0x10 notify)
        BLE_TX = (bluetooth.UUID(TX_UUID), 0x1a)
        BLE_UART = (BLE_NUS, (BLE_TX,),)
        SERVICES = (BLE_UART,)
        ((self.tx,),) = self.ble.gatts_register_services(SERVICES)
        self.ble.gatts_set_buffer(self.tx, 20, True)

        # build advertise payload
        self._payload = bytes()
        self.adv_append(0x01, struct.pack("B", 0x06))  # flags
        self.adv_append(0x09, self.name)               # complete local name
        self.adv_append(0x07, bytes(BLE_NUS))          # 128-bit service UUID
        self._advertise()

    def _advertise(self):
        """Starts advertising the payload prepared by `register`."""
        self.ble.gap_advertise(100000, adv_data=self._payload)

    def adv_append(self, adv_type, value):
        """
        Appends one length/type/value field to the advertising payload.
        :param adv_type: advertising data type code
        :param value: field content, bytes-like or str
        """
        if isinstance(value, str):
            # The length byte must count encoded bytes, not characters.
            value = value.encode('utf-8')
        self._payload += struct.pack("BB", len(value) + 1, adv_type) + value

    def send(self, data):
        """
        Notifies the connected central with `data`. Does nothing when no
        central is connected.
        :param data: payload, bytes-like or str; anything beyond the first
            20 bytes is dropped
        """
        handle = self._conn_handle
        if handle is None:
            # Not connected (or the link dropped since the caller checked).
            return
        if isinstance(data, str):
            # Apply the size limit to encoded bytes, not characters.
            data = data.encode('utf-8')
        # BLE can send max 20 bytes per message
        chunk = data if len(data) <= 20 else data[:20]
        self.ble.gatts_notify(handle, self.tx, chunk)
        time.sleep(0.01)  # sending too fast results in out of memory
def measure_frequency():
    """
    Times pulses on the `rpm` pin, reports RPM and the engine-run total
    over BLE, and periodically saves the total to 'eh.json'.

    Reads the module globals `i`, `rpm`, `ble` and `is_ble_connected`;
    updates `t`, `ts` and `timestamp`.
    """
    global t
    global ts
    global timestamp
    print("f")
    #sleep_ms(200)
    pulse_timeout_us = 100000
    loops = 15
    total_us = 0   # summed width of all successfully timed pulses
    samples = 0    # number of pulses contributing to total_us

    for _ in range(loops // 2):
        # One high pulse and one low pulse per pass, i.e. one full period.
        for level in (1, 0):
            # NOTE(review): interrupts are presumably disabled so that
            # interrupt handling cannot distort the timing - confirm.
            state = disable_irq()
            try:
                # The first call may start mid-pulse; discard it and time
                # the next complete pulse.
                time_pulse_us(rpm, level, pulse_timeout_us)
                width = time_pulse_us(rpm, level, pulse_timeout_us)
            finally:
                enable_irq(state)
            # time_pulse_us returns a negative code on timeout; such
            # values are not durations and must not be accumulated.
            if width > 0:
                total_us += width
                samples += 1

    if samples == 0:
        print("0")
        print("is_ble_connected:"+str(is_ble_connected))
        # The engine is not running: fold the finished run into the stored
        # total so the next run continues from it instead of restarting
        # from the value loaded at boot.
        timestamp = ts
        #remember latest time before rpm>0
        t = time.time()
        if is_ble_connected:
            ble.send("{0:0}")
            ble.send("{3:"+str(ts)+"}")
    else:
        # Each sample is half a period, so the frequency is
        # (samples / 2) / total seconds = 500000 * samples / total_us.
        sr = str(int(60 * 500000 * samples / total_us))
        print(sr)
        #do save engine hours
        ts = timestamp + time.time() - t
        if is_ble_connected:
            ble.send("{0:"+sr+"}")
            ble.send("{3:"+str(ts)+"}")
        if i % 10 == 0:
            with open('eh.json', 'w') as eh:
                json.dump(ts, eh)
def measure_temperature():
    """
    Reads the thermocouple and reports the temperature over BLE.

    Always waits 200 ms; the sensor is read only while a central is
    connected. Sends "{1:null}" when the sensor reports an error,
    otherwise "{1:<integer temperature>}".
    """
    print("t")
    sleep_ms(200)
    if not is_ble_connected:
        return
    tr = temp.read()
    if temp.error():
        ble.send("{1:null}")
    else:
        temperature = str(int(tr))
        # Log the value being sent (not the unrelated global `t`).
        print(temperature)
        ble.send("{1:"+temperature+"}")
def read_battery_voltage():
    """
    Samples the battery input and reports an averaged charge percentage
    over BLE.

    On passes where the global `i` is a multiple of 5 the sum collected on
    the preceding passes is divided by 4, sent as "{2:<percent>}" if a
    central is connected, and reset; the sample taken on that pass is not
    used. On all other passes the sample is added to the sum.
    """
    global csum
    raw = vin.read()
    # Scale the raw reading (4095 corresponds to 3.3 V) and double it.
    # NOTE(review): the factor 2 presumably compensates a 1:2 voltage
    # divider in front of the ADC pin - confirm against the hardware.
    v = 2*raw*3.3/4095
    c = 100*(v-3.27+0.27)/(4.0-3.27)
    # Keep the percentage inside 0..100; without the lower bound a low or
    # missing reading would drag the average negative.
    if c > 100:
        c = 100
    elif c < 0:
        c = 0
    if i % 5 == 0:
        cl = str(int(csum/4))
        csum = 0
        if is_ble_connected:
            ble.send("{2:"+cl+"}")
    else:
        csum += c
#delta=4.2-3.27
#100-0
#dx=100*(vin-3.27+0.345)/(4.0-3.27)
#4095-3.3V
#vin-xV
#xV=vin*3.3/4095
#1.86,3.77,23%
#2.0.2,4.0.3,50%
# Bring up the BLE peripheral; construction also starts advertising.
ble = ESP32_BLE("ESP32")

# Thermocouple converter wiring: data in on 22, clock on 23, select on 19.
so = Pin(22, Pin.IN)
sck = Pin(23, Pin.OUT)
cs = Pin(19, Pin.OUT)
temp = MAX6675(sck=sck, cs=cs, so=so)

# Run the three measurement tasks back to back forever, counting passes
# in `i` (the tasks use it to pace their periodic work).
tasks = (measure_frequency, measure_temperature, read_battery_voltage)
while True:
    for task in tasks:
        task()
    i += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment