Last active
August 18, 2018 19:36
-
-
Save matejc/0d8e5f14d8b0611f2834f3e5613ae9db to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
import tty | |
import termios | |
import argparse | |
import pychromecast | |
import threading | |
from pychromecast.controllers.youtube import YouTubeController | |
from urllib.parse import urlparse | |
from urllib.parse import parse_qs | |
class ThreadJob(threading.Thread):
    """Background thread that invokes a callback at a fixed interval.

    The callback runs every ``interval`` seconds until it returns a falsy
    value or :meth:`stop` is called.
    """

    def __init__(self, callback, interval):
        """Store the job parameters.

        Args:
            callback: Zero-argument callable; a falsy return value ends the job.
            interval: Seconds to wait between two callback invocations.
        """
        # Initialise the Thread machinery first so the base class can never
        # overwrite the attributes assigned below.
        super().__init__()
        self.callback = callback
        self.event = threading.Event()
        self.interval = interval
        self._stopped = False

    def run(self):
        """Call the callback once per interval until asked to stop."""
        # Event.wait() returns False on timeout (time to run the callback)
        # and True as soon as stop() has set the event.
        while not self.event.wait(self.interval):
            keep_going = self.callback()
            if not keep_going or self._stopped:
                break

    def stop(self):
        """Request termination and wake the thread immediately."""
        self._stopped = True
        # Setting the event interrupts the pending wait(); without it the
        # thread would only notice the request after a full interval.
        self.event.set()
def cast_run(videoId, host=None, name=None):
    """Start playing a YouTube video on a Chromecast.

    Device selection:
      * ``host`` given: it is handed unchanged to
        ``pychromecast._get_chromecast_from_host`` (a private pychromecast
        helper, judging by its leading underscore).
      * ``name`` given: the first discovered device whose friendly name
        equals ``name`` is used.
      * neither given: the first discovered device is used.

    Args:
        videoId: YouTube video id passed to ``YouTubeController.play_video``.
        host: Optional host description for a direct connection.
        name: Optional friendly name of the device to look up.

    Returns:
        Tuple ``(cast, yt, job)``: the chromecast object, the registered
        YouTube controller and the already started ``ThreadJob`` that polls
        the media status once per second.

    Raises:
        LookupError: no discovered device carries the requested ``name``.
        IndexError: no device was discovered at all.
    """
    if host is not None:
        cast = pychromecast._get_chromecast_from_host(host)
    else:
        chromecasts = pychromecast.get_chromecasts()
        if name is not None:
            # A bare next() would leak StopIteration out of this function;
            # use a default and report the problem explicitly instead.
            cast = next(
                (cc for cc in chromecasts if cc.device.friendly_name == name),
                None)
            if cast is None:
                raise LookupError(
                    'no chromecast named {!r} found'.format(name))
        else:
            if not chromecasts:
                raise IndexError('no chromecast found')
            cast = chromecasts[0]

    cast.wait()
    yt = YouTubeController()
    cast.register_handler(yt)
    yt.play_video(videoId)

    def cleanup():
        """Quit the app once an idle reason is reported; False ends the job."""
        # NOTE(review): presumably idle_reason becomes non-None when playback
        # has finished or was cancelled - confirm against pychromecast.
        if cast.media_controller.status.idle_reason is not None:
            cast.quit_app()
            return False
        return True

    job = ThreadJob(cleanup, 1.0)
    job.start()
    return cast, yt, job
class ReadChar():
    """Context manager that reads a single character from stdin in raw mode.

    Unlike most context managers, ``__enter__`` returns the character that
    was read rather than the manager itself, so
    ``with ReadChar() as ch:`` binds ``ch`` to a one-character string.
    The previous terminal settings are restored on exit.
    """

    def __enter__(self):
        """Switch stdin to raw mode, read one character and return it."""
        self.fd = sys.stdin.fileno()
        self.old_settings = termios.tcgetattr(self.fd)
        try:
            tty.setraw(self.fd)
            return sys.stdin.read(1)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so restore the
            # terminal here; otherwise it would be left in raw mode.
            termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)
            raise

    def __exit__(self, type, value, traceback):
        """Restore the terminal settings saved by ``__enter__``."""
        termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_settings)
def set_seek(cast, by):
    """Seek the current media relative to its reported position.

    Args:
        cast: Object exposing ``media_controller`` with ``status``,
            ``update_status()`` and ``seek(position)``.
        by: Offset in seconds; positive seeks forward, negative backward.

    The target is clamped to ``[0, duration]``. When the status reports no
    duration (``None``) only the lower bound is applied. Nothing happens when
    the status reports ``supports_seek`` as ``False``.
    """
    controller = cast.media_controller
    if controller.status.supports_seek is False:
        return
    controller.update_status()

    status = controller.status
    position = status.current_time + by
    duration = status.duration
    # Clamp to the end of the media. The previous "duration - by" fallback
    # moved playback backwards on a forward seek and could even produce a
    # negative position when the media is shorter than the step.
    if duration is not None and position > duration:
        position = duration
    if position < 0:
        position = 0
    controller.seek(position)
def set_volume(cast, by):
    """Change the device volume by ``by``.

    The new level is ``cast.status.volume_level + by``. Raising the volume
    caps the result at 1.0, lowering it floors the result at 0.0. The
    result is passed to ``cast.set_volume``.
    """
    target = cast.status.volume_level + by
    if by >= 0:
        # Going up: only the upper bound can be exceeded.
        target = min(target, 1.0)
    else:
        # Going down: only the lower bound can be undershot.
        target = max(target, 0.0)
    cast.set_volume(target)
def toggle_play(cast):
    """Pause the media when it is playing, otherwise start playback.

    Does nothing when the media status reports ``supports_pause`` as
    ``False``.
    """
    controller = cast.media_controller
    if controller.status.supports_pause is False:
        return
    is_playing = controller.status.player_state == "PLAYING"
    action = controller.pause if is_playing else controller.play
    action()
def parse_video_id(value):
    """Extract the YouTube video id from a URL.

    Based on https://stackoverflow.com/a/7936523

    Examples:
    - http://youtu.be/SA2iWivDJiE
    - http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
    - http://www.youtube.com/embed/SA2iWivDJiE
    - http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
    - https://www.youtube.com/shorts/SA2iWivDJiE

    Args:
        value: URL string to inspect.

    Returns:
        The video id, or ``None`` when the URL is not a recognised YouTube
        URL or carries no id.
    """
    parsed = urlparse(value)
    host = parsed.hostname
    path = parsed.path
    if host == 'youtu.be':
        return path[1:] or None
    if host in ('www.youtube.com', 'youtube.com', 'm.youtube.com'):
        if path == '/watch':
            # .get() so that a watch URL without "v" yields None instead of
            # raising KeyError.
            ids = parse_qs(parsed.query).get('v')
            return ids[0] if ids else None
        if path.startswith(('/embed/', '/v/', '/shorts/')):
            # path is "/<kind>/<id>[/...]", so the id is the third component.
            return path.split('/')[2] or None
    # Unrecognised host or path.
    return None
def main(args):
    """Cast the requested video and optionally control it from the keyboard.

    Args:
        args: Parsed command line namespace providing ``url``, ``address``,
            ``port``, ``name`` and ``control``.

    Keys in control mode: ``q`` or Ctrl-C quit, ``0``/``9`` raise/lower the
    volume, space toggles play/pause, right/left arrows seek by 5 seconds
    and up/down arrows seek by 60 seconds.
    """
    video_id = parse_video_id(args.url)
    if video_id is None:
        # Fail early with a clear message instead of casting "None".
        sys.exit('could not extract a youtube video id from: {}'.format(
            args.url))

    if args.address is not None:
        # argparse delivers a user supplied port as a string; normalise it.
        host = (args.address, int(args.port), '1-2', 'dev12', "Some TV")
        cast, yt, job = cast_run(video_id, host=host)
    elif args.name is not None:
        cast, yt, job = cast_run(video_id, name=args.name)
    else:
        cast, yt, job = cast_run(video_id)
    print('casting ...')

    if args.control is False:
        return
    print('use keys to control')

    # Single-character commands, keyed by the character read.
    key_actions = {
        "0": lambda: set_volume(cast, 0.1),
        "9": lambda: set_volume(cast, -0.1),
        " ": lambda: toggle_play(cast),
    }
    # Arrow keys arrive as the three-byte sequence ESC [ <letter>.
    seek_steps = {
        (27, 91, 67): 5.0,    # right
        (27, 91, 68): -5.0,   # left
        (27, 91, 65): 60.0,   # up
        (27, 91, 66): -60.0,  # down
    }

    lastOrdChars = []
    while True:
        with ReadChar() as rc:
            char = rc
        # An empty read means end of input; ord('') would raise, so treat
        # it like a quit request. Code 3 is Ctrl-C in raw mode.
        if not char or ord(char) == 3 or char == "q":
            job.stop()
            cast.quit_app()
            sys.exit()

        # Keep only the three most recent key codes to spot arrow keys.
        lastOrdChars = (lastOrdChars + [ord(char)])[-3:]

        action = key_actions.get(char)
        if action is not None:
            action()
            continue
        step = seek_steps.get(tuple(lastOrdChars))
        if step is not None:
            set_seek(cast, step)
# Script entry point: parse the command line and hand over to main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('url', help='youtube url')
    parser.add_argument('--control', help='control by keyboard',
                        action='store_true')
    parser.add_argument('--name', help='friendly chromecast name')
    parser.add_argument('--address', help='ip address of chromecast')
    # type=int keeps a user supplied port consistent with the int default.
    parser.add_argument('--port', help='port of chromecast', type=int,
                        default=8009)
    args = parser.parse_args()
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment