Last active
March 8, 2023 18:30
-
-
Save chrisboyle/64d4037ee909b21b0f5b185f501cc8dd to your computer and use it in GitHub Desktop.
Tells Home Assistant whether my webcam is in use. The inotify one is unreliable, alas, as Discord leaks webcam file descriptors e.g. every time you test video :-(
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from pathlib import Path | |
from asyncinotify import Inotify, Mask | |
from collections import Counter | |
from homeassistant_api import Client | |
import asyncio | |
import glob | |
import os | |
# Home Assistant input_boolean entity that is switched on/off by this script.
ENTITY_ID = 'input_boolean.webcam_in_use'
# sysfs directory listing the video devices; each entry name is mapped to /dev/<name>.
V4L_DIR = '/sys/class/video4linux'
# Seconds to wait for an inotify event before evaluating (and possibly publishing) the state.
DELAY_SEC = 0.5
async def main():
    """Track open handles on the video devices via inotify and mirror the result to Home Assistant.

    Watches V4L_DIR for devices appearing/disappearing and every matching
    /dev/<name> node for open/close events, keeping a per-device count of
    open handles. Whenever no event arrives for DELAY_SEC seconds, the
    aggregated state (any count > 0) is compared with the last published
    one and, if different, pushed to ENTITY_ID through the Home Assistant
    API. The API URL and token are read from the HASS_API_URL and
    HASS_TOKEN environment variables.

    Runs until cancelled or interrupted; never returns normally.
    """
    with Inotify() as inotify:
        # Last state pushed to Home Assistant; None guarantees an initial push.
        was_on = None
        # Open-handle count keyed by device node path (e.g. '/dev/video0').
        counter = Counter()

        inotify.add_watch(V4L_DIR, Mask.CREATE | Mask.DELETE | Mask.ONLYDIR)
        cams = {'/dev/' + os.path.basename(link) for link in glob.glob(V4L_DIR + '/*')}
        for cam in cams:
            inotify.add_watch(cam, Mask.OPEN | Mask.CLOSE)

        # Seed the counts with handles that were already open before we started watching.
        for fd in glob.glob('/proc/*/fd/*'):
            try:
                opened = os.readlink(fd)
            except OSError:
                # The descriptor vanished or is not readable by us; skip it.
                continue
            if opened in cams:
                counter[opened] += 1

        while True:
            try:
                event = await asyncio.wait_for(inotify.get(), DELAY_SEC)
            except asyncio.TimeoutError:
                # No event within DELAY_SEC: evaluate the settled state.
                print(counter.total())
                is_on = counter.total() > 0
                if is_on != was_on:
                    print(f"Update: {is_on}")
                    async with Client(os.getenv('HASS_API_URL'), os.getenv('HASS_TOKEN'), use_async=True) as client:
                        await client.async_trigger_service('input_boolean', 'turn_on' if is_on else 'turn_off', entity_id=ENTITY_ID)
                    was_on = is_on
                continue

            p = str(event.path)
            if event.mask & Mask.OPEN:
                print(f"open {p}")
                counter[p] += 1
            elif event.mask & Mask.CLOSE:
                print(f"close {p}")
                # Clamp at zero: a close whose open we never saw must not push
                # the count negative and mask a later, real open.
                counter[p] = max(0, counter[p] - 1)
            elif event.mask & Mask.CREATE:
                print(f"create {p}")
                # p is the sysfs entry; watch the matching /dev node with the
                # same mask used for the devices present at startup.
                inotify.add_watch('/dev/' + os.path.basename(p), Mask.OPEN | Mask.CLOSE)
            elif event.mask & Mask.DELETE:
                print(f"delete {p}")
                # Counts are keyed by /dev/<name>, not by the sysfs path in p.
                counter['/dev/' + os.path.basename(p)] = 0
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the watcher; exit without a traceback.
        pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from homeassistant_api import Client | |
import asyncio | |
import aiofiles | |
import os | |
import re | |
# Home Assistant input_boolean entity that is switched on/off by this script.
ENTITY_ID = 'input_boolean.bifrost_webcam'
# Matches a "uvcvideo <digits> <n>" line whose third field starts with a
# non-zero digit, anywhere in a multi-line text.
IS_IN_USE = re.compile(r"^uvcvideo \d+ [1-9]", flags=re.MULTILINE)
# Seconds to sleep between two polls.
DELAY_SEC = 5
async def main():
    """Poll /proc/modules and mirror the uvcvideo in-use state to Home Assistant.

    Every DELAY_SEC seconds the module list is read and tested against
    IS_IN_USE (per proc(5), the field it inspects is the module's use
    count). When the result differs from the last published state, the
    ENTITY_ID input_boolean is turned on or off through the Home Assistant
    API, using the HASS_API_URL and HASS_TOKEN environment variables.

    Runs until cancelled or interrupted; never returns normally.
    """
    # Last state pushed to Home Assistant; None guarantees an initial push.
    was_on = None
    while True:
        async with aiofiles.open('/proc/modules', mode='r') as proc:
            modules = await proc.read()
        is_on = IS_IN_USE.search(modules) is not None

        if is_on != was_on:
            async with Client(os.getenv('HASS_API_URL'), os.getenv('HASS_TOKEN'), use_async=True) as client:
                await client.async_trigger_service('input_boolean', 'turn_on' if is_on else 'turn_off', entity_id=ENTITY_ID)
            # Only remember the state once the service call has gone through.
            was_on = is_on

        await asyncio.sleep(DELAY_SEC)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the poller; exit without a traceback.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment