Created
January 12, 2023 00:56
-
-
Save 46cv8/5aa65ba54217ceb03556bc7b1eb4f689 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import asyncio | |
import uuid | |
import xml.dom.minidom | |
import aiohttp | |
import bleak | |
import pywifi | |
import logging | |
# pip install bleak | |
# pip install pywifi | |
# pip install aiohttp | |
# mkdir /home/pi/images | |
# hciconfig requires non root access which can be set as follows. https://bbs.archlinux.org/viewtopic.php?id=215080 | |
# sudo setcap 'cap_net_raw,cap_net_admin+eip' `which hciconfig` | |
#logger = logging.getLogger("pywifi") | |
#logger.setLevel(logging.DEBUG) | |
BLUETOOTH_ADDRESS = "nn:nn:nn:nn:nn:nn" # your campark t100 camera bluetooth address | |
BLUETOOTH_WIFI_CONTROL = 45 # 0x2e 0xFFE9 | |
WIFI_SSID = "Campark-T100-yourcamparkwifissid" # your campark t100 camera wifi ssid | |
WIFI_PASSWORD = "yourcamparkt100wifipassword" # your campark t100 camera wifi password | |
WIFI_INTERFACE = "wlan0" # "wlan0" for pi, "wlp2s0" for pc | |
LOCAL_IMAGE_FILE_PATH = "/home/pi/images" | |
WWW_IMAGE_FILE_PATH = "username@servername:/home/website/www/latestcameraimage.jpg" # the site to upload the image too via scp using shared key authentication | |
BLUETOOTH_CONNECTION_RETRY_ATTEMPTS = 3 | |
profile = pywifi.Profile() | |
profile.ssid = WIFI_SSID | |
profile.auth = pywifi.const.AUTH_ALG_OPEN | |
profile.akm.append(pywifi.const.AKM_TYPE_WPA2PSK) | |
profile.cipher = pywifi.const.CIPHER_TYPE_CCMP | |
#profile.cipher = pywifi.const.CIPHER_TYPE_UNKNOWN | |
profile.key = WIFI_PASSWORD | |
async def main(address): | |
os.system("rfkill unblock bluetooth") | |
#os.system("hciconfig hci1 down") | |
#os.system("hciconfig hci0 up") # confirm which adapter is the one you actually want ot use | |
print("PC - Turned bluetooth on...") | |
await asyncio.sleep(5.0) | |
try: | |
bt_connected = False | |
for c in range(BLUETOOTH_CONNECTION_RETRY_ATTEMPTS): | |
try: | |
print(f"PC - BT Connecting (attempt {c+1})...") | |
async with bleak.BleakClient(address, timeout=20.0) as client: | |
print(f"PC - BT Connected: {client.is_connected}") | |
#services = await client.get_services() | |
#print(services.services) | |
#print(services.characteristics) | |
#print(services.descriptors) | |
#paired = await client.pair(protection_level=2) | |
#print(f"Paired: {paired}") | |
for i in range(3): | |
try: | |
print("Device - Turning wifi on...") | |
await client.write_gatt_char(BLUETOOTH_WIFI_CONTROL, b"GPIO3") | |
bt_connected = True | |
print("Device - Suceeded Turning wifi on...") | |
break | |
except: | |
print("Device - Failed turning wifi on...") | |
if i == 2: | |
raise | |
if bt_connected: | |
break | |
except bleak.exc.BleakDeviceNotFoundError: | |
if c == BLUETOOTH_CONNECTION_RETRY_ATTEMPTS-1: | |
raise | |
pass | |
try: | |
await asyncio.sleep(10.0) # 10 | |
os.system("rfkill unblock wifi") | |
print("PC - Turned computer wifi on...") | |
await asyncio.sleep(2.0) | |
wifi = pywifi.PyWiFi() | |
iface = wifi.interfaces()[0] | |
ifaces = [i for i in wifi.interfaces() if i.name() == WIFI_INTERFACE] | |
assert len(ifaces) > 0 | |
iface = ifaces[0] | |
iface.disconnect() | |
await asyncio.sleep(1.0) # 10 | |
assert iface.status() in [pywifi.const.IFACE_DISCONNECTED, pywifi.const.IFACE_INACTIVE] | |
iface.remove_all_network_profiles() | |
tmp_profile = iface.add_network_profile(profile) | |
for i in range(3): | |
print("PC - Connecting to wifi access point...") | |
#iface.scan() | |
#await asyncio.sleep(10.0) | |
#iface.scan_results() | |
iface.connect(tmp_profile) | |
await asyncio.sleep(1.0) | |
for _ in range(20): | |
if iface.status() == pywifi.const.IFACE_CONNECTED: | |
print("PC - Connected to wifi access point...") | |
await asyncio.sleep(20.0) # delay to give dhcp time to get an ip address | |
break | |
await asyncio.sleep(1.0) | |
if iface.status() == pywifi.const.IFACE_CONNECTED: | |
break | |
print("PC - Failed to Connect to wifi access point...") | |
if i == 2: | |
raise Exception("Unable to connect to wifi access point") | |
try: | |
async with aiohttp.ClientSession(trust_env=True) as session: | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3012") as response: | |
await response.text() | |
#async with session.get("http://192.168.8.120:80/?custom=1&cmd=3005&str=2023-01-09") as response: | |
# await response.text() | |
#async with session.get("http://192.168.8.120:80/?custom=1&cmd=3006&str=11:12:38") as response: | |
# await response.text() | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3016") as response: | |
await response.text() | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3014") as response: | |
await response.text() | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3031&str=") as response: | |
await response.text() | |
await asyncio.sleep(2.0) | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3001&par=0") as response: | |
await response.text() # switch to photo mode | |
print("Device - Switched to photo mode...") | |
await asyncio.sleep(2.0) | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3014") as response: | |
await response.text() | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3031&str=") as response: | |
await response.text() | |
await asyncio.sleep(2.0) | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=1001") as response: | |
await response.text() # take photo | |
print("Device - Took photo...") | |
await asyncio.sleep(2.0) | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3001&par=2") as response: | |
await response.text() # switch to photo gallery | |
await asyncio.sleep(2.0) | |
print("Device - Switched to photo grallery") | |
async with session.get("http://192.168.8.120:80/?custom=1&cmd=3015") as response: # get list of photos in gallery | |
latest_image_file_name = [n.childNodes[0].data for n in xml.dom.minidom.parseString(await response.read()).documentElement.getElementsByTagName("NAME")][-1] | |
print(f"Device - Got photo gallery list, retrieving latest image {latest_image_file_name}...") | |
latest_image_file_path = None | |
async with session.get(f"http://192.168.8.120:80/DCIM/PHOTO/{latest_image_file_name}") as response: # download desired photo | |
latest_image_file_path = f"{LOCAL_IMAGE_FILE_PATH}/{latest_image_file_name}" | |
latest_image_file = open(latest_image_file_path, 'wb') | |
latest_image_file.write(await response.read()) | |
latest_image_file.close() | |
print("Device - Retrieved latest image...") | |
if latest_image_file_path is not None: | |
print("PC - Uploading latest image...") | |
result = os.system(f"scp {latest_image_file_path} {WWW_IMAGE_FILE_PATH}") | |
if result == 0: | |
print("PC - Uploaded latest image...") | |
else: | |
print("PC - Failed to upload latest image...") | |
finally: | |
if iface.status() == pywifi.const.IFACE_CONNECTED: | |
iface.disconnect() | |
await asyncio.sleep(1.0) | |
if iface.status() in [pywifi.const.IFACE_DISCONNECTED, pywifi.const.IFACE_INACTIVE]: | |
print("PC - Disconnected from device wifi...") | |
else: | |
print("PC - Failed to disconnect from device wifi...") | |
else: | |
print("PC - Already disconnected from device wifi...") | |
finally: | |
bt_connected = False | |
for c in range(BLUETOOTH_CONNECTION_RETRY_ATTEMPTS): | |
try: | |
print(f"PC - BT Connecting (attempt {c+1})...") | |
async with bleak.BleakClient(address, timeout=20.0) as client: | |
print(f"PC - BT Connected: {client.is_connected}") | |
for i in range(3): | |
try: | |
print("Device - Turning wifi off...") | |
await client.write_gatt_char(BLUETOOTH_WIFI_CONTROL, b"GPIO2") | |
bt_connected = True | |
print("Device - Suceeded Turning wifi off...") | |
break | |
except: | |
print("Device - Failed turning wifi off...") | |
if i == 2: | |
raise | |
if bt_connected: | |
break | |
except bleak.exc.BleakDeviceNotFoundError: | |
if c == BLUETOOTH_CONNECTION_RETRY_ATTEMPTS-1: | |
raise | |
pass | |
finally: | |
os.system("rfkill block wifi") | |
print("PC - Turned wifi off...") | |
os.system("rfkill block bluetooth") | |
print("PC - Turned bluetooth off...") | |
if __name__ == "__main__": | |
asyncio.run(main(BLUETOOTH_ADDRESS)) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment