# Source: GitHub Gist 46cv8/5aa65ba54217ceb03556bc7b1eb4f689
# (GitHub page navigation text removed so that the file parses as Python.)
import os
import asyncio
import uuid
import xml.dom.minidom
import aiohttp
import bleak
import pywifi
import logging
# pip install bleak
# pip install pywifi
# pip install aiohttp
# mkdir /home/pi/images
# hciconfig requires non root access which can be set as follows. https://bbs.archlinux.org/viewtopic.php?id=215080
# sudo setcap 'cap_net_raw,cap_net_admin+eip' `which hciconfig`
#logger = logging.getLogger("pywifi")
#logger.setLevel(logging.DEBUG)
# --- User configuration: replace the placeholder values below before running. ---
# Bluetooth address of the camera; handed to main() and from there to bleak.BleakClient.
BLUETOOTH_ADDRESS = "nn:nn:nn:nn:nn:nn" # your campark t100 camera bluetooth address
# Characteristic specifier passed to client.write_gatt_char() to switch the camera's wifi on/off.
# NOTE(review): the original note says "0x2e 0xFFE9", but 45 decimal is 0x2d, not 0x2e --
# confirm whether 45 or 46 is the intended value (0xFFE9 is presumably the characteristic UUID).
BLUETOOTH_WIFI_CONTROL = 45 # 0x2e 0xFFE9
# SSID and password of the access point the camera exposes; used to build the pywifi profile.
WIFI_SSID = "Campark-T100-yourcamparkwifissid" # your campark t100 camera wifi ssid
WIFI_PASSWORD = "yourcamparkt100wifipassword" # your campark t100 camera wifi password
# Name of the local wireless interface that main() selects via pywifi.
WIFI_INTERFACE = "wlan0" # "wlan0" for pi, "wlp2s0" for pc
# Local directory the downloaded image is written into (must already exist).
LOCAL_IMAGE_FILE_PATH = "/home/pi/images"
# scp destination for the downloaded image.
WWW_IMAGE_FILE_PATH = "username@servername:/home/website/www/latestcameraimage.jpg" # the site to upload the image to via scp using shared key authentication
# How many times main() tries to open the Bluetooth connection before giving up.
BLUETOOTH_CONNECTION_RETRY_ATTEMPTS = 3
# Wi-Fi profile describing the camera's access point. It is built once at import
# time and registered on the selected interface by main().
profile = pywifi.Profile()

# Credentials come from the user configuration above.
profile.ssid = WIFI_SSID
profile.key = WIFI_PASSWORD

# Security settings: open authentication algorithm, WPA2-PSK key management,
# CCMP cipher (CIPHER_TYPE_UNKNOWN was tried earlier as an alternative).
profile.auth = pywifi.const.AUTH_ALG_OPEN
profile.cipher = pywifi.const.CIPHER_TYPE_CCMP
profile.akm.append(pywifi.const.AKM_TYPE_WPA2PSK)
# Base URL of the HTTP server the camera exposes on its own access point.
_CAMERA_BASE_URL = "http://192.168.8.120:80"
# Attempts at the GATT write per Bluetooth connection.
_GATT_WRITE_ATTEMPTS = 3
# Attempts at joining the camera's access point.
_WIFI_CONNECT_ATTEMPTS = 3
# One-second status polls per join attempt.
_WIFI_STATUS_POLLS = 20


async def _set_device_wifi(address, command, state):
    """Connect to the camera over Bluetooth and write the wifi on/off command.

    Args:
        address: Bluetooth address of the camera.
        command: Payload written to BLUETOOTH_WIFI_CONTROL (b"GPIO3" switches
            the camera's wifi on, b"GPIO2" switches it off).
        state: "on" or "off"; only used in the progress messages.

    Raises:
        bleak.exc.BleakDeviceNotFoundError: the camera was not found on the
            last connection attempt.
        Exception: whatever the final failed GATT write raised.
    """
    for connect_attempt in range(BLUETOOTH_CONNECTION_RETRY_ATTEMPTS):
        try:
            print(f"PC - BT Connecting (attempt {connect_attempt+1})...")
            async with bleak.BleakClient(address, timeout=20.0) as client:
                print(f"PC - BT Connected: {client.is_connected}")
                for write_attempt in range(_GATT_WRITE_ATTEMPTS):
                    try:
                        print(f"Device - Turning wifi {state}...")
                        await client.write_gatt_char(BLUETOOTH_WIFI_CONTROL, command)
                    except Exception:
                        # `except Exception` rather than a bare `except` so that
                        # task cancellation and Ctrl-C are never retried away.
                        print(f"Device - Failed turning wifi {state}...")
                        if write_attempt == _GATT_WRITE_ATTEMPTS - 1:
                            raise
                    else:
                        print(f"Device - Suceeded Turning wifi {state}...")
                        return
        except bleak.exc.BleakDeviceNotFoundError:
            # Only "device not found" is retried; give up after the last attempt.
            if connect_attempt == BLUETOOTH_CONNECTION_RETRY_ATTEMPTS - 1:
                raise


def _select_wifi_interface():
    """Return the pywifi interface whose name equals WIFI_INTERFACE.

    Raises:
        RuntimeError: no interface with that name exists.
    """
    matches = [
        candidate
        for candidate in pywifi.PyWiFi().interfaces()
        if candidate.name() == WIFI_INTERFACE
    ]
    if not matches:
        raise RuntimeError(f"Wifi interface {WIFI_INTERFACE!r} not found")
    return matches[0]


async def _join_camera_wifi(iface):
    """Drop any current wifi connection and join the camera's access point.

    Raises:
        RuntimeError: the interface did not reach a disconnected state.
        ConnectionError: the access point could not be joined.
    """
    idle_states = (pywifi.const.IFACE_DISCONNECTED, pywifi.const.IFACE_INACTIVE)
    iface.disconnect()
    await asyncio.sleep(1.0)
    if iface.status() not in idle_states:
        raise RuntimeError("Wifi interface did not disconnect from its previous network")

    # Start from a clean slate so only the camera profile can be selected.
    iface.remove_all_network_profiles()
    camera_profile = iface.add_network_profile(profile)

    for _ in range(_WIFI_CONNECT_ATTEMPTS):
        print("PC - Connecting to wifi access point...")
        iface.connect(camera_profile)
        await asyncio.sleep(1.0)
        for _ in range(_WIFI_STATUS_POLLS):
            if iface.status() == pywifi.const.IFACE_CONNECTED:
                print("PC - Connected to wifi access point...")
                await asyncio.sleep(20.0)  # delay to give dhcp time to get an ip address
                break
            await asyncio.sleep(1.0)
        if iface.status() == pywifi.const.IFACE_CONNECTED:
            return
        print("PC - Failed to Connect to wifi access point...")
    raise ConnectionError("Unable to connect to wifi access point")


async def _leave_camera_wifi(iface):
    """Disconnect from the camera's access point and report the outcome."""
    if iface.status() != pywifi.const.IFACE_CONNECTED:
        print("PC - Already disconnected from device wifi...")
        return
    iface.disconnect()
    await asyncio.sleep(1.0)
    if iface.status() in (pywifi.const.IFACE_DISCONNECTED, pywifi.const.IFACE_INACTIVE):
        print("PC - Disconnected from device wifi...")
    else:
        print("PC - Failed to disconnect from device wifi...")


async def _camera_command(session, query):
    """Send one `?custom=1&<query>` request to the camera and drain the reply."""
    async with session.get(f"{_CAMERA_BASE_URL}/?custom=1&{query}") as response:
        await response.text()


async def _capture_latest_image(session):
    """Take a photo, download the newest gallery image and return its local path.

    Raises:
        RuntimeError: the camera returned an empty gallery listing.
        aiohttp.ClientResponseError: the image download returned an HTTP error.
    """
    # The purpose of commands 3012, 3016, 3014 and 3031 is not documented in
    # this file; they are sent in the same order as before.
    # NOTE(review): an earlier revision also carried disabled requests
    # cmd=3005&str=YYYY-MM-DD and cmd=3006&str=HH:MM:SS (presumably set
    # date/time -- unverified).
    for query in ("cmd=3012", "cmd=3016", "cmd=3014", "cmd=3031&str="):
        await _camera_command(session, query)
    await asyncio.sleep(2.0)

    await _camera_command(session, "cmd=3001&par=0")  # switch to photo mode
    print("Device - Switched to photo mode...")
    await asyncio.sleep(2.0)

    for query in ("cmd=3014", "cmd=3031&str="):
        await _camera_command(session, query)
    await asyncio.sleep(2.0)

    await _camera_command(session, "cmd=1001")  # take photo
    print("Device - Took photo...")
    await asyncio.sleep(2.0)

    await _camera_command(session, "cmd=3001&par=2")  # switch to photo gallery
    await asyncio.sleep(2.0)
    print("Device - Switched to photo grallery")

    # Get the list of photos in the gallery; the last NAME entry is downloaded.
    async with session.get(f"{_CAMERA_BASE_URL}/?custom=1&cmd=3015") as response:
        listing = xml.dom.minidom.parseString(await response.read())
    names = [
        node.childNodes[0].data
        for node in listing.documentElement.getElementsByTagName("NAME")
    ]
    if not names:
        raise RuntimeError("Camera returned an empty photo gallery listing")
    latest_image_file_name = names[-1]
    print(f"Device - Got photo gallery list, retrieving latest image {latest_image_file_name}...")

    # The name comes from the camera: keep only its final component so the
    # file can never be written outside LOCAL_IMAGE_FILE_PATH.
    latest_image_file_path = os.path.join(
        LOCAL_IMAGE_FILE_PATH, os.path.basename(latest_image_file_name)
    )
    async with session.get(f"{_CAMERA_BASE_URL}/DCIM/PHOTO/{latest_image_file_name}") as response:
        # Without this an HTTP error page would be saved and uploaded as the image.
        response.raise_for_status()
        image_bytes = await response.read()
    with open(latest_image_file_path, "wb") as image_file:
        image_file.write(image_bytes)
    print("Device - Retrieved latest image...")
    return latest_image_file_path


async def _upload_image(image_path):
    """Copy the image to WWW_IMAGE_FILE_PATH with scp (best effort)."""
    print("PC - Uploading latest image...")
    try:
        # Argument list instead of a shell string: the file name originates
        # from the camera and must not be interpreted by a shell.
        scp = await asyncio.create_subprocess_exec("scp", image_path, WWW_IMAGE_FILE_PATH)
        uploaded = await scp.wait() == 0
    except OSError:
        # scp missing or not executable; report it like any other failed upload.
        uploaded = False
    if uploaded:
        print("PC - Uploaded latest image...")
    else:
        print("PC - Failed to upload latest image...")


async def main(address):
    """Wake the camera's wifi over Bluetooth, grab a fresh photo and upload it.

    Sequence: unblock Bluetooth, tell the camera to switch its wifi on, unblock
    and join the camera's access point, take and download a photo over HTTP,
    upload it with scp, then undo every step in reverse order (leave the access
    point, switch the camera's wifi off, block both radios) even on failure.

    Args:
        address: Bluetooth address of the camera.

    Raises:
        bleak.exc.BleakDeviceNotFoundError: the camera was not found over
            Bluetooth after BLUETOOTH_CONNECTION_RETRY_ATTEMPTS attempts.
        RuntimeError: WIFI_INTERFACE is missing, the interface would not
            disconnect, or the camera's gallery is empty.
        ConnectionError: the camera's access point could not be joined.
    """
    os.system("rfkill unblock bluetooth")
    # If several adapters are present, confirm which one you actually want to
    # use, e.g. "hciconfig hci1 down" / "hciconfig hci0 up".
    print("PC - Turned bluetooth on...")
    await asyncio.sleep(5.0)
    try:
        await _set_device_wifi(address, b"GPIO3", "on")
        try:
            await asyncio.sleep(10.0)
            os.system("rfkill unblock wifi")
            print("PC - Turned computer wifi on...")
            await asyncio.sleep(2.0)
            iface = _select_wifi_interface()
            await _join_camera_wifi(iface)
            try:
                async with aiohttp.ClientSession(trust_env=True) as session:
                    latest_image_file_path = await _capture_latest_image(session)
                await _upload_image(latest_image_file_path)
            finally:
                await _leave_camera_wifi(iface)
        finally:
            # Always ask the camera to switch its wifi back off.
            await _set_device_wifi(address, b"GPIO2", "off")
    finally:
        os.system("rfkill block wifi")
        print("PC - Turned wifi off...")
        os.system("rfkill block bluetooth")
        print("PC - Turned bluetooth off...")
# Script entry point: run one full capture-and-upload cycle for the configured camera.
if __name__ == "__main__":
    asyncio.run(main(BLUETOOTH_ADDRESS))
# End of gist (GitHub page footer text removed).