Skip to content

Instantly share code, notes, and snippets.

@pbk20191
Last active October 11, 2024 10:47
Show Gist options
  • Save pbk20191/2a02ea12c3686d076eee5f33f9be66f1 to your computer and use it in GitHub Desktop.
Save pbk20191/2a02ea12c3686d076eee5f33f9be66f1 to your computer and use it in GitHub Desktop.
tkinter+asyncio
"""
An event loop policy that integrates Windows message loop with IOCP-based async processing.
"""
# SPDX-License-Identifier: MIT
# Names exported by a star-import of this module.
__all__ = (
    'TkinterProactorEventLoopPolicy',
    'TclIocpProactor',
)
import asyncio
import asyncio.events
import ctypes
import logging, _tkinter
import tkinter
import math
# import sys
import traceback
# `sys` must be bound before the check below; the module's other active
# `import sys` statement only runs further down the file.
import sys

# Refuse to import anywhere but Windows: the rest of this module loads
# USER32 and uses the IOCP proactor.
if sys.platform != 'win32':
    raise ImportError('{} is not available on your platform'.format(__name__))
from _winapi import INFINITE
from ctypes.wintypes import HANDLE, MSG, BOOL, LPMSG, HWND, UINT, LPARAM, DWORD, LPHANDLE
from typing import Callable
import sys, _tkinter
# Numeric Win32 constants. Only WAIT_FAILED, MWMO_ALERTABLE,
# MWMO_INPUTAVAILABLE and QS_ALLINPUT are referenced by the code visible in
# this module; the rest are defined but unused here.
# NOTE(review): values are hard-coded; confirm against the Windows SDK
# headers before changing any of them.
WAIT_OBJECT_0 = 0
WAIT_IO_COMPLETION = 192
WAIT_TIMEOUT = 258
WAIT_FAILED = 4294967295  # 0xFFFFFFFF
MWMO_ALERTABLE = 2
MWMO_INPUTAVAILABLE = 4
PM_REMOVE = 1
QS_ALLINPUT = 1279  # 0x04FF
WS_OVERLAPPED = 0
WM_QUIT = 18
# LRESULT reuses the ctypes type that wintypes exposes as LPARAM.
LRESULT = LPARAM
def _winfn(lib, name, *args):
    """Bind the exported function *name* of *lib* as a module-level global.

    Args:
        lib: A loaded ctypes library object that exports *name*.
        name: Name of the exported function; the resulting foreign function
            is stored in this module's globals under the same name.
        *args: Prototype passed to ``ctypes.WINFUNCTYPE``: the result type
            followed by the argument types.

    Raises:
        AttributeError: If *lib* has no attribute called *name*.
    """
    # Resolve the symbol up front so a missing export fails before the
    # prototype is built and before any global is touched.
    getattr(lib, name)
    prototype = ctypes.WINFUNCTYPE(*args)
    # Instantiating a prototype with ``(name, lib)`` yields the foreign
    # function itself with the declared result/argument types applied.
    # Instantiating it with the resolved function object would instead treat
    # that object as a Python callable and wrap it in a callback thunk, so
    # every call would detour through an inner call with no declared types.
    globals()[name] = prototype((name, lib))
# Tcl runtime, loaded by a hard-coded DLL file name.
# NOTE(review): presumably this has to be the same Tcl build that _tkinter
# is linked against — confirm before relying on it with other Tcl versions.
_tcl = ctypes.CDLL('tcl86t.dll')
_user32 = ctypes.WinDLL("USER32")
# Publishes MsgWaitForMultipleObjectsEx as a module global with a DWORD
# result and (DWORD, LPHANDLE, DWORD, DWORD, DWORD) arguments.
_winfn(_user32, 'MsgWaitForMultipleObjectsEx', DWORD,
       DWORD, LPHANDLE, DWORD, DWORD, DWORD)
# Declare the C signatures of the two Tcl entry points that
# TclIocpProactor calls.
_tcl.Tcl_SetServiceMode.argtypes = [ctypes.c_int]
_tcl.Tcl_SetServiceMode.restype = ctypes.c_int
_tcl.Tcl_ServiceAll.argtypes = []
_tcl.Tcl_ServiceAll.restype = None
logger = logging.getLogger(__name__)
def debug_stack(limit=None):
    """Log the current call stack at DEBUG level, one record per frame.

    Args:
        limit: Forwarded to ``traceback.extract_stack`` to cap how many
            frames are captured; ``None`` applies no cap here.
    """
    frames = traceback.extract_stack(limit=limit)
    summary = traceback.StackSummary.from_list(frames)
    for rendered_frame in summary.format():
        logger.debug(rendered_frame)
class TclIocpProactor(asyncio.IocpProactor):
    """IOCP proactor whose poll step also services window messages and Tcl.

    Each ``_poll`` waits on the completion port and the thread's input queue
    together, then drains IOCP completions through the base class and lets
    Tcl process its pending events.
    """

    def __init__(self, *args, **kwargs):
        """Create the proactor and a Tcl interpreter used to pump events."""
        super().__init__(*args, **kwargs)
        # The inner call sets the service mode to True; whatever it returns
        # is passed straight back in by the outer call.
        # NOTE(review): presumably Tcl_SetServiceMode returns the previous
        # mode, which would make this pair a net no-op — confirm.
        _tcl.Tcl_SetServiceMode(_tcl.Tcl_SetServiceMode(True))
        self._tk = tkinter.Tcl()
        # Dump the Python call stack once, on the first poll only.
        self._dumpstack: bool = True
        # Number of waits performed so far; used for debug logging only.
        self._counter = 0

    def _poll(self, timeout=None):
        """Wait for IOCP or message-queue activity, then service both.

        Args:
            timeout: Maximum time to wait in seconds, or ``None`` to wait
                without a time limit.

        Raises:
            ValueError: If *timeout* is negative, or so large that it does
                not fit below the ``INFINITE`` sentinel in milliseconds.
        """
        logger.debug("_poll enter")
        handles = (HANDLE * 1)()
        handles[0] = self._iocp
        if self._dumpstack:
            debug_stack(limit=6)
            self._dumpstack = False
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # MsgWaitForMultipleObjectsEx() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        rv = MsgWaitForMultipleObjectsEx(
            1, handles, ms, QS_ALLINPUT, MWMO_ALERTABLE | MWMO_INPUTAVAILABLE)
        logger.debug("* %s", self._counter)
        self._counter += 1
        if rv == WAIT_FAILED:
            # Make the failure visible, but keep going: the non-blocking
            # drain below still makes progress, and this method never spins
            # on a wait that keeps failing.
            logger.warning("MsgWaitForMultipleObjectsEx WAIT_FAILED")

        # The blocking part is over; collect completions without waiting.
        super()._poll(timeout=0)
        self._tk.update()
        # Service mode is switched on only around the explicit
        # Tcl_ServiceAll() call and switched off again afterwards.
        _tcl.Tcl_SetServiceMode(True)
        _tcl.Tcl_ServiceAll()
        _tcl.Tcl_SetServiceMode(False)
        logger.debug("_poll exit")
class TkinterProactorEventLoopPolicy(
        asyncio.events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loops are driven by a ``TclIocpProactor``."""

    def _loop_factory(self):
        """Return a new proactor event loop built on a fresh TclIocpProactor."""
        loop_cls = asyncio.ProactorEventLoop
        return loop_cls(proactor=TclIocpProactor())
# Demo: show a small Tk window while an asyncio coroutine runs on a loop
# created through TkinterProactorEventLoopPolicy.
if __name__ == '__main__':
    from tkinter import Tk, Label, Button, _tkinter
    import logging
    import random
    logging.basicConfig(level=logging.INFO)
    # Install the policy so that asyncio.run() below builds its loop from it.
    asyncio.set_event_loop_policy(TkinterProactorEventLoopPolicy())
    root = Tk()
    text = "This is Tcl/Tk %s" % root.globalgetvar('tk_patchLevel')
    text += "\nThis should be a cedilla: \xe7"
    label = Label(root, text=text)
    label.pack()
    def soo():
        # Button callback: schedule a print on the running asyncio loop one
        # second from now.
        loop = asyncio.get_running_loop()
        loop.call_later(1, lambda : print("later"))
        pass
    loop2 = Button(root, text="Asyncio", command=soo)
    loop2.pack()
    # Each click wraps the button's current label in another pair of brackets.
    test = Button(root, text="Click me!",
                  command=lambda root=root: root.test.configure(
                      text="[%s]" % root.test['text']))
    test.pack()
    # Stored on root so the lambda above can reach the button as root.test.
    root.test = test
    quit = Button(root, text="QUIT", command=root.destroy)
    quit.pack()
    root.iconify()
    root.update()
    root.deiconify()
    async def foo():
        await asyncio.sleep(10)
    # The loop lives for as long as foo() sleeps (10 seconds).
    asyncio.run(foo(), debug=True)
import asyncio
import selectors
import tkinter
import sys
# Names exported by a star-import of this module; the selector is listed
# explicitly because its leading underscore would otherwise hide it.
__all__ = (
    'TkinterEventLoopPolicy',
    '_TkinterSelector',
)
# This selector-based variant refuses to import on Windows.
if sys.platform == 'win32':
    raise ImportError('{} is not available on your platform'.format(__name__))
import _tkinter
class _TkinterSelector(selectors._BaseSelectorImpl):
    """Selector that waits for file events through the Tcl event loop.

    Registered file descriptors are handed to Tcl as file handlers, and
    ``select`` runs one Tcl event-loop step and reports the handlers that
    fired during it.
    """

    def __init__(self):
        """Create the selector together with its own Tcl interpreter."""
        super().__init__()
        self._tk = tkinter.Tcl()
        # (key, events) pairs collected by file handlers since the last
        # call to select().
        self._ready = []

    def register(self, fileobj, events, data=None):
        """Register *fileobj* and install a matching Tcl file handler.

        Args:
            fileobj: File object or descriptor to monitor.
            events: Bitmask of ``selectors.EVENT_READ`` / ``EVENT_WRITE``.
            data: Opaque value stored on the returned key.

        Returns:
            The ``SelectorKey`` created by the base implementation.
        """
        key = super().register(fileobj, events, data)
        tcl_mask = 0
        if events & selectors.EVENT_READ:
            tcl_mask |= tkinter.READABLE
        if events & selectors.EVENT_WRITE:
            tcl_mask |= tkinter.WRITABLE

        def ready(fd, mask):
            # Translate the Tcl readiness mask back into selector events
            # and queue the result for the select() call in progress.
            assert key.fd == fd
            fired = 0
            if mask & tkinter.READABLE:
                fired |= selectors.EVENT_READ
            if mask & tkinter.WRITABLE:
                fired |= selectors.EVENT_WRITE
            self._ready.append((key, fired))

        self._tk.createfilehandler(key.fd, tcl_mask, ready)
        return key

    def unregister(self, fileobj):
        """Unregister *fileobj* and remove its Tcl file handler.

        Returns:
            The ``SelectorKey`` that was associated with *fileobj*.
        """
        key = super().unregister(fileobj)
        self._tk.deletefilehandler(key.fd)
        return key

    def select(self, timeout=None):
        """Run one Tcl event-loop step and return the handlers that fired.

        Args:
            timeout: ``None`` blocks until Tcl processes an event; a value
                less than or equal to zero polls without blocking; a
                positive value bounds the wait to that many seconds.

        Returns:
            A list of ``(key, events)`` tuples; it may be empty.
        """
        import math  # local import: this module does not import math itself

        self._ready = []
        if timeout is None:
            self._tk.dooneevent()
            return self._ready
        if timeout <= 0:
            # Non-blocking poll; no timer is armed, so nothing is left
            # behind to fire later.
            self._tk.dooneevent(_tkinter.DONT_WAIT)
            return self._ready
        # Round up so a positive sub-millisecond timeout still waits rather
        # than collapsing to a zero-length timer.
        token = self._tk.createtimerhandler(
            math.ceil(timeout * 1000), lambda: True)
        try:
            self._tk.dooneevent()
        finally:
            # Cancel the wake-up timer even if event processing raised.
            token.deletetimerhandler()
        return self._ready
class TkinterEventLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loops select through ``_TkinterSelector``."""

    def _loop_factory(self):
        """Return a new selector event loop backed by a fresh _TkinterSelector."""
        loop_cls = asyncio.SelectorEventLoop
        return loop_cls(selector=_TkinterSelector())
@pbk20191
Copy link
Author

pbk20191 commented Oct 8, 2024

inspired by windows, unix

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment