Last active
March 12, 2024 06:33
-
-
Save nv1t/3856d1f23c7d7696bdaf6ba0d4d1c17b to your computer and use it in GitHub Desktop.
Walking Pad Logger
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import sys | |
import os | |
from datetime import datetime | |
def convert_seconds_to_hms(seconds):
    """Render a duration in seconds as "<H> hours <M> minutes <S> seconds".

    Args:
        seconds: Duration in seconds (int or float). Fractions are dropped
            from each component via ``int()``.

    Returns:
        A string of the form ``"1 hours 2 minutes 3 seconds"``. Hours are not
        capped at 24, so long durations simply show a larger hour count.
    """
    # Split off whole hours first; what is left is the part within the hour.
    whole_hours, within_hour = divmod(seconds, 3600)
    whole_minutes = within_hour // 60
    leftover = seconds % 60

    hours, minutes, secs = int(whole_hours), int(whole_minutes), int(leftover)
    return f"{hours} hours {minutes} minutes {secs} seconds"
def convert_unix_timestamp(timestamp):
    """Format a Unix timestamp as a ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        timestamp: Seconds since the Unix epoch (int or float).

    Returns:
        The formatted date and time. ``datetime.fromtimestamp`` is called
        without a timezone, so the result is in the machine's local time.
    """
    moment = datetime.fromtimestamp(timestamp)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
# ---------------------------------------------------------------------------
# Summarise today's walking-pad log.
#
# Reads <script dir>/logs/<YYYY-MM-DD>.log.csv, whose data rows are
#   workout id, unix timestamp, speed, distance (m), elapsed time (s)
# and prints, per workout, the highest distance and time seen, followed by
# the totals over all workouts of the day.
# ---------------------------------------------------------------------------
starting_timestamp = None  # timestamp of the first valid data row
workout_data = {}  # workout id -> {"distance": max metres, "time": max seconds}

# Locate the log file for the current date next to this script
current_dir = os.path.dirname(os.path.abspath(__file__))
current_date = datetime.now().strftime("%Y-%m-%d")
file_name = os.path.join(current_dir, "logs", f"{current_date}.log.csv")

# Nothing was logged today: exit quietly
if not os.path.exists(file_name):
    sys.exit()

with open(file_name, newline="") as csvfile:
    reader = csv.reader(csvfile)
    # Skip the header line; the default keeps a completely empty file from
    # raising StopIteration.
    next(reader, None)

    for line_number, row in enumerate(reader, start=2):
        if not row:
            # Blank line
            continue

        # A row may be truncated or corrupted; report it and keep going
        # instead of aborting the whole summary.
        try:
            workout, timestamp, _speed, distance, elapsed = row
            timestamp = int(timestamp)
            distance = int(distance)
            elapsed = int(elapsed)
        except ValueError:
            print(
                f"Skipping malformed row {line_number} in {file_name}",
                file=sys.stderr,
            )
            continue

        if starting_timestamp is None:
            starting_timestamp = timestamp

        # The summary keeps the highest distance and time seen per workout
        totals = workout_data.setdefault(workout, {"distance": 0, "time": 0})
        totals["distance"] = max(totals["distance"], distance)
        totals["time"] = max(totals["time"], elapsed)

# A log with a header but no usable rows has nothing to summarise; without
# this guard the timestamp conversion below would fail on None.
if starting_timestamp is None:
    sys.exit()

print("Final Workout Data:")
for workout, data in workout_data.items():
    print(f"\tWorkout: {workout}")
    print(f"\t\tDistance: {data['distance']} meters")
    print(f"\t\tTime: {data['time']} seconds")

# Combine all workouts of the day; distance is converted from metres to km
total_distance = sum(data["distance"] for data in workout_data.values()) / 1000
total_time = sum(data["time"] for data in workout_data.values())

starting_time = convert_unix_timestamp(starting_timestamp)
total_distance_km = f"{total_distance:.2f} km"  # Format to 2 decimal places
total_time_hms = convert_seconds_to_hms(total_time)

print("Combined Workout Data:")
print(f"\tStarting Time: {starting_time}")
print(f"\tTotal Distance: {total_distance_km}")
print(f"\tTotal Time: {total_time_hms}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script connects to a specified Bluetooth Low Energy (BLE) device and logs data related | |
to speed, distance, and time intervals from notifications. It uses the bleak library for | |
BLE communication, handles SIGINT signals for graceful shutdown, and logs data to a CSV file | |
with a daily log rotation. It is loosely based on FTMS (the Bluetooth Fitness Machine Service) and
may work with other kinds of walking pads, but this has not been verified.
## Prerequisites | |
- Python 3.6+ | |
- `bleak` library (install using `pip install bleak`) | |
## Usage | |
1. Clone the repository or download the script. | |
2. Install the required dependencies using `pip install -r requirements.txt`. | |
3. Run the script by executing `python ble_device_data_logger.py`. | |
## Configuration | |
- You may need to change the device address passed to `main` (at the bottom of the script) to match the address of your BLE device.
""" | |
import argparse | |
import asyncio | |
import logging | |
import signal | |
import sys | |
import time | |
import os | |
import datetime | |
import struct | |
import uuid | |
from bleak import BleakClient, BleakScanner | |
from bleak.backends.characteristic import BleakGATTCharacteristic | |
logger = logging.getLogger(__name__)


async def main(addr):
    """
    Connect to the BLE device at ``addr`` and log its notifications to a daily CSV file.

    The device is looked up by address, a few informational characteristics are
    read and logged, and notifications on characteristic
    00002acd-0000-1000-8000-00805f9b34fb are appended to
    ``<script dir>/logs/<YYYY-MM-DD>.log.csv`` until Ctrl+C is pressed.

    Args:
        addr: Address of the BLE device to connect to.

    Returns:
        None. Returns early (after logging an error) if the device is not found.
    """
    stopped = False  # Set by the SIGINT handler to end the main loop
    workout_id = uuid.uuid4()  # Identifies all rows written during this run

    # Keep the logs next to this script (not in the current working directory)
    # so the location does not depend on where the script is started from, and
    # create the directory up front so the first write cannot fail on it.
    log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs")
    os.makedirs(log_dir, exist_ok=True)

    def signal_handler(sig, frame):
        """
        Signal handler function to handle SIGINT (Ctrl+C) signal.

        The first Ctrl+C asks the main loop to finish so notifications are
        stopped and the connection is closed cleanly; a second Ctrl+C exits
        immediately.

        Args:
            sig: The signal number
            frame: The current stack frame
        """
        # Without ``nonlocal`` the assignment below would only create a local
        # variable and the main loop would never see the request to stop.
        nonlocal stopped
        if stopped:
            print('Forcing exit')
            sys.exit()
        print('You pressed Ctrl+C!')
        print('Stopping, press Ctrl+C again to force exit')
        # No sleeping here: the handler runs on the event loop's thread, so
        # blocking would only delay the shutdown it is waiting for.
        stopped = True

    signal.signal(signal.SIGINT, signal_handler)

    def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
        """
        Handle notifications by logging the received data and appending it to a CSV file.

        Args:
            characteristic (BleakGATTCharacteristic): The characteristic the data is received from.
            data (bytearray): The data received.

        Returns:
            None
        """
        # Log the received data
        logger.info("%s: %r", characteristic.description, data.hex())

        # The fields read below reach up to byte 14; skip anything shorter
        # rather than raising inside the notification callback.
        if len(data) < 15:
            logger.warning("ignoring short notification (%d bytes)", len(data))
            return

        # Extract speed, distance, and timedelta from the data. GATT values
        # are little-endian and these quantities are never negative, so read
        # them as explicit little-endian unsigned 16-bit integers instead of
        # native-order signed shorts.
        # NOTE(review): the byte offsets are specific to this device's payload
        # layout; FTMS defines total distance as 24 bits - confirm if
        # distances above 65535 m matter.
        speed, distance = struct.unpack_from("<HH", data, 2)
        (timedelta,) = struct.unpack_from("<H", data, 13)

        # One log file per calendar day
        current_date = datetime.datetime.now().strftime("%Y-%m-%d")
        file_name = os.path.join(log_dir, f"{current_date}.log.csv")
        is_new_file = not os.path.exists(file_name)

        # Append the data to the log file, writing the header first if the
        # file is new. The header names all five columns of a data row.
        with open(file_name, "a") as f:
            if is_new_file:
                f.write(
                    "Workout (UUID), Timestamp (unix seconds), Speed (n/100 km/h), "
                    "Distance (meter), Time (seconds)\n"
                )
            f.write(f"{workout_id},{int(time.time())},{speed},{distance},{timedelta}\n")

    logger.info("starting scan...")

    # Find the BLE device by address
    device = await BleakScanner.find_device_by_address(
        addr, cb=dict(use_bdaddr=False)
    )
    if device is None:
        # Report the address that was actually searched for
        logger.error("could not find device with address '%s'", addr)
        return

    logger.info("connecting to device...")

    # Connect to the device
    async with BleakClient(device) as client:
        logger.info("Connected")

        # Read device characteristics
        manufacturer = await client.read_gatt_char("00002a29-0000-1000-8000-00805f9b34fb")
        model = await client.read_gatt_char("00002a24-0000-1000-8000-00805f9b34fb")
        hardware_revision = await client.read_gatt_char("00002a27-0000-1000-8000-00805f9b34fb")
        firmware_revision = await client.read_gatt_char("00002a26-0000-1000-8000-00805f9b34fb")
        fitness_machine_feature = await client.read_gatt_char("00002acc-0000-1000-8000-00805f9b34fb")
        supported_speed_range = await client.read_gatt_char("00002ad4-0000-1000-8000-00805f9b34fb")

        # Log the device characteristics
        logger.info("Manufacturer: %s", str(manufacturer, "utf-8"))
        logger.info("Model: %s", str(model, "utf-8"))
        logger.info("Hardware Revision: %s", str(hardware_revision, "utf-8"))
        logger.info("Firmware Revision: %s", str(firmware_revision, 'utf-8'))
        logger.info("Supported Speed Range: %s", supported_speed_range.hex())
        logger.info("Fitness Machine Feature: %s", fitness_machine_feature.hex())

        # Start notification for a specific characteristic and handle notifications in a separate function
        await client.start_notify("00002acd-0000-1000-8000-00805f9b34fb", notification_handler)

        # Main loop to keep the program running until stopped
        while not stopped:
            await asyncio.sleep(1)

        # Stop notification for a specific characteristic
        await client.stop_notify("00002acd-0000-1000-8000-00805f9b34fb")
if __name__ == "__main__":
    # The device address can be given on the command line; with no argument
    # the script uses the same address as before.
    parser = argparse.ArgumentParser(
        description="Log walking pad data received over Bluetooth Low Energy to a daily CSV file."
    )
    parser.add_argument(
        "address",
        nargs="?",
        default="69:82:20:D9:27:90",
        help="address of the BLE device to connect to (default: %(default)s)",
    )
    args = parser.parse_args()

    log_level = logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
    )

    asyncio.run(main(args.address))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment