Skip to content

Instantly share code, notes, and snippets.

@anaselli
Created January 20, 2025 18:08
Show Gist options
  • Save anaselli/336bb4f63f591b0d675c738942a504f7 to your computer and use it in GitHub Desktop.
Save anaselli/336bb4f63f591b0d675c738942a504f7 to your computer and use it in GitHub Desktop.
Test do_transaction (btanks)
#!/usr/bin/env python3
import select
import os
import pty
import dbus
import sys
import json
from datetime import datetime
DNFDAEMON_BUS_NAME = 'org.rpm.dnf.v0'
DNFDAEMON_OBJECT_PATH = '/' + DNFDAEMON_BUS_NAME.replace('.', '/')
IFACE_SESSION_MANAGER = '{}.SessionManager'.format(DNFDAEMON_BUS_NAME)
IFACE_BASE = '{}.Base'.format(DNFDAEMON_BUS_NAME)
IFACE_REPO = '{}.rpm.Repo'.format(DNFDAEMON_BUS_NAME)
IFACE_REPOCONF = '{}.rpm.RepoConf'.format(DNFDAEMON_BUS_NAME)
IFACE_RPM = '{}.rpm.Rpm'.format(DNFDAEMON_BUS_NAME)
IFACE_GOAL = '{}.Goal'.format(DNFDAEMON_BUS_NAME)
IFACE_ADVISORY = '{}.Advisory'.format(DNFDAEMON_BUS_NAME)
def unpack_dbus(data):
'''
convert dbus data types to python native data types
'''
if (isinstance(data, dbus.String) or
isinstance(data, dbus.ObjectPath) or
isinstance(data, dbus.Signature)):
data = str(data)
elif isinstance(data, dbus.Boolean):
data = bool(data)
elif (isinstance(data, dbus.Int64) or
isinstance(data, dbus.UInt64) or
isinstance(data, dbus.Int32) or
isinstance(data, dbus.UInt32) or
isinstance(data, dbus.Int16) or
isinstance(data, dbus.UInt16) or
isinstance(data, dbus.Byte)):
data = int(data)
elif isinstance(data, dbus.Double):
data = float(data)
elif isinstance(data, dbus.Array):
data = [unpack_dbus(value) for value in data]
elif isinstance(data, dbus.Struct):
data = [unpack_dbus(value) for value in data]
elif isinstance(data, dbus.Dictionary):
new_data = dict()
for key in data.keys():
new_data[unpack_dbus(key)] = unpack_dbus(data[key])
data = new_data
return data
def test():
bus = dbus.SystemBus()
iface_session = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, DNFDAEMON_OBJECT_PATH),
dbus_interface=IFACE_SESSION_MANAGER)
session = iface_session.open_session({})
print(f"Open Dnf5Daemon session: {session}")
iface_repo = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, session),
dbus_interface=IFACE_REPO)
iface_rpm = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, session),
dbus_interface=IFACE_RPM)
iface_goal = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, session),
dbus_interface=IFACE_GOAL)
iface_repoconf = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, session),
dbus_interface=IFACE_REPOCONF)
iface_advisory = dbus.Interface(
bus.get_object(DNFDAEMON_BUS_NAME, session),
dbus_interface=IFACE_ADVISORY)
iface_rpm.install(["btanks"], {})
resolved, result = iface_goal.resolve({})
print (unpack_dbus(resolved))
if result == 0:
# execute the transaction offline (durint the next reboot)
print(datetime.now())
iface_goal.do_transaction({}, timeout=600)
#iface_goal.do_transaction({})
print(datetime.now())
print ("End of ransaction")
else:
errors = iface_goal.get_transaction_problems_string()
print("Errors while resolving the transaction:")
for error in errors:
print(error)
session_path = iface_session.close_session(session)
if __name__ == '__main__':
print("START test")
test()
print("END test")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment