Created
November 19, 2023 02:59
-
-
Save manueldeprada/8dce4a0a02067190715e92a4d69ff75d to your computer and use it in GitHub Desktop.
modified scg screen cast from python for screen resolution fetching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import re | |
import signal | |
import dbus | |
from gi.repository import GLib | |
from dbus.mainloop.glib import DBusGMainLoop | |
import gi | |
gi.require_version('Gst', '1.0') | |
from gi.repository import GObject, Gst | |
# D-Bus interface names: Request carries the 'Response' signal, ScreenCast
# carries the method calls made through screen_cast_call().
request_iface = 'org.freedesktop.portal.Request'
screen_cast_iface = 'org.freedesktop.portal.ScreenCast'

# Make the GLib main loop dbus-python's default before the session bus
# connection is created, so signal receivers are dispatched by `loop`.
DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()

Gst.init(None)
loop = GLib.MainLoop()

# Checked by terminate(); starts out unset.
pipeline = None
def terminate():
    """Shut the program down.

    If the module-level ``pipeline`` holds a GStreamer pipeline it is set to
    the NULL state first; the GLib main loop is then asked to quit.
    """
    if pipeline is not None:
        # Use the module-level pipeline that was just checked. There is no
        # `self` in this module-level function, so `self.player` would raise
        # NameError and prevent loop.quit() from ever running.
        pipeline.set_state(Gst.State.NULL)
    loop.quit()
# Monotonic counters used to mint unique handle tokens ('u1', 'u2', ...).
request_token_counter = 0
session_token_counter = 0

# Our unique bus name with the first character dropped and every '.' turned
# into '_'; used as a path component by new_request_path()/new_session_path().
sender_name = bus.get_unique_name()[1:].replace('.', '_')
def new_request_path():
    """Mint a fresh request token and the request object path built from it.

    Increments the module-level ``request_token_counter`` on every call.

    Returns:
        A ``(path, token)`` tuple, where ``token`` is ``'u<counter>'`` and
        ``path`` embeds ``sender_name`` and that token.
    """
    global request_token_counter
    request_token_counter += 1
    handle_token = 'u%d' % request_token_counter
    object_path = '/org/freedesktop/portal/desktop/request/%s/%s' % (
        sender_name, handle_token)
    return object_path, handle_token
def new_session_path():
    """Mint a fresh session token and the session object path built from it.

    Increments the module-level ``session_token_counter`` on every call.

    Returns:
        A ``(path, token)`` tuple, where ``token`` is ``'u<counter>'`` and
        ``path`` embeds ``sender_name`` and that token.
    """
    global session_token_counter
    session_token_counter += 1
    handle_token = 'u%d' % session_token_counter
    object_path = '/org/freedesktop/portal/desktop/session/%s/%s' % (
        sender_name, handle_token)
    return object_path, handle_token
def screen_cast_call(method, callback, *args, options=None):
    """Invoke a ScreenCast portal method and route its Response to *callback*.

    A new request path/token pair is generated, *callback* is registered for
    the ``Response`` signal on that request path, and *method* is then called
    with ``*args`` followed by the options dictionary.

    Args:
        method: Callable proxy method (e.g. ``portal.CreateSession``).
        callback: Receiver for the ``Response`` signal.
        *args: Positional arguments placed before the options dictionary.
        options: Optional mapping of extra options. It is copied, so neither
            the caller's dict nor a shared default is modified.
    """
    request_path, request_token = new_request_path()
    # Subscribe before issuing the call so the Response cannot be missed.
    bus.add_signal_receiver(callback,
                            'Response',
                            request_iface,
                            'org.freedesktop.portal.Desktop',
                            request_path)

    # Work on a private copy: a mutable default (or the caller's dict) would
    # otherwise accumulate 'handle_token' across calls.
    call_options = dict(options) if options else {}
    call_options['handle_token'] = request_token
    method(*args, call_options, dbus_interface=screen_cast_iface)
def on_gst_message(bus, message):
    """Bus 'message' handler: terminate on end-of-stream or error messages.

    Args:
        bus: The bus that delivered the message (unused).
        message: The message whose ``type`` is inspected.
    """
    stop_types = (Gst.MessageType.EOS, Gst.MessageType.ERROR)
    if message.type in stop_types:
        terminate()
def print_stream_resolution(element):
    """Print the width and height found in the current caps of *element*'s
    "src" pad.

    A diagnostic line is printed instead when no element was supplied, when
    the element has no static "src" pad, or when that pad has no current caps.

    Args:
        element: Object exposing ``get_static_pad`` (may be ``None``).
    """
    if element is None:
        # Callers may pass the result of a failed element lookup.
        print("No element to query for stream resolution.")
        return

    pad = element.get_static_pad("src")
    if not pad:
        print("No source pad found.")
        return

    caps = pad.get_current_caps()
    if not caps:
        # Say so explicitly rather than printing nothing at all.
        print("No caps available on source pad.")
        return

    structure = caps.get_structure(0)
    width = structure.get_value('width')
    height = structure.get_value('height')
    print(f"Stream resolution: {width}x{height}")
def play_pipewire_stream(node_id):
    """Build and start a GStreamer pipeline that displays one PipeWire stream.

    Calls ``OpenPipeWireRemote`` on the portal for the current ``session``,
    feeds the returned file descriptor and *node_id* to a
    ``pipewiresrc ! videoconvert ! xvimagesink`` pipeline, sets it to PLAYING
    and hooks up bus handlers: one that prints the stream resolution when the
    pipeline reaches PLAYING, and ``on_gst_message`` for all messages.

    Args:
        node_id: PipeWire node id, interpolated into ``path=`` of pipewiresrc.
    """
    empty_dict = dbus.Dictionary(signature="sv")
    fd_object = portal.OpenPipeWireRemote(session, empty_dict,
                                          dbus_interface=screen_cast_iface)
    fd = fd_object.take()

    # NOTE(review): `pipeline` is local to this function (there is no
    # `global` declaration), so the module-level `pipeline` is not updated
    # by this assignment.
    pipeline = Gst.parse_launch(
        'pipewiresrc name=pipewiresrc0 fd=%d path=%u ! videoconvert ! xvimagesink'
        % (fd, node_id))
    pipewiresrc_element = pipeline.get_by_name('pipewiresrc0')

    def on_state_changed(bus, message):
        # Only react to the pipeline's own transitions, not its children's.
        if message.src == pipeline and message.type == Gst.MessageType.STATE_CHANGED:
            _old_state, new_state, _pending_state = message.parse_state_changed()
            if new_state == Gst.State.PLAYING:
                print_stream_resolution(pipewiresrc_element)

    if not pipewiresrc_element:
        print("Error: pipewiresrc element not found in pipeline")

    pipeline.set_state(Gst.State.PLAYING)

    # Named gst_bus so it is not confused with the module-level D-Bus `bus`.
    gst_bus = pipeline.get_bus()
    gst_bus.add_signal_watch()
    if pipewiresrc_element:
        # Without the element there is nothing to query, so skip the handler
        # instead of letting it dereference a missing element later.
        gst_bus.connect("message::state-changed", on_state_changed)
    gst_bus.connect('message', on_gst_message)
def on_start_response(response, results):
    """Handle the Response to ``Start``: list the streams and play each one.

    Args:
        response: Response code; any non-zero value is treated as failure.
        results: Mapping whose ``'streams'`` entry yields
            ``(node_id, properties)`` pairs.
    """
    failed = response != 0
    if failed:
        print("Failed to start: %s" % response)
        terminate()
        return

    print("streams:")
    for stream in results['streams']:
        node_id, stream_properties = stream
        print("stream {}".format(node_id))
        print(" properties:")
        for prop_name, prop_value in stream_properties.items():
            print(" %s: %s" % (prop_name, prop_value))
        play_pipewire_stream(node_id)
def on_select_sources_response(response, results):
    """Handle the Response to ``SelectSources`` and issue ``Start``.

    Args:
        response: Response code; any non-zero value is treated as failure.
        results: Result dictionary, printed as-is.
    """
    if response != 0:
        print("Failed to select sources: %d" % response)
        terminate()
        return

    for item in ("select_sources response & result:",
                 response,
                 results,
                 "sources selected"):
        print(item)

    # `session` is only read here, so the module-level value is used directly.
    screen_cast_call(portal.Start, on_start_response, session, '')
def on_create_session_response(response, results):
    """Handle the Response to ``CreateSession`` and issue ``SelectSources``.

    On success the session handle from *results* is stored in the module-level
    ``session`` for later calls.

    Args:
        response: Response code; any non-zero value is treated as failure.
        results: Mapping containing ``'session_handle'``.
    """
    global session

    if response != 0:
        print("Failed to create session: %d" % response)
        terminate()
        return

    session = results['session_handle']
    print("session %s created" % session)
    print("on_create_session_response result:")
    print(results)

    source_options = {
        'multiple': False,
        'types': dbus.UInt32(1 | 2),
    }
    screen_cast_call(portal.SelectSources, on_select_sources_response,
                     session,
                     options=source_options)
# Proxy for the desktop portal object; the response callbacks above use it.
portal = bus.get_object('org.freedesktop.portal.Desktop',
                        '/org/freedesktop/portal/desktop')

# Kick off the chain: CreateSession -> SelectSources -> Start -> playback.
session_path, session_token = new_session_path()
create_session_options = {'session_handle_token': session_token}
screen_cast_call(portal.CreateSession, on_create_session_response,
                 options=create_session_options)

# Run until terminate() quits the loop; Ctrl-C also goes through terminate().
try:
    loop.run()
except KeyboardInterrupt:
    terminate()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment