Last active
October 24, 2018 19:51
-
-
Save emmatyping/c88752c7b1605fe04596c1c4848c88f3 to your computer and use it in GitHub Desktop.
An experiment writing nice wrappers around namedpipes using the _winapi module.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import sys | |
from namedpipe import * | |
# Demo driver: run with "s" to act as the pipe server or "c" to act as the
# pipe client. Both ends use the pipe name 'testFoo'.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("need s or c as argument")
    elif sys.argv[1] == "s":
        with NamedPipeServer('testFoo') as server:
            # Send the countdown 10, 9, ..., 1, one message per second.
            for count in range(10, 0, -1):
                server.write(str(count))
                time.sleep(1)
    elif sys.argv[1] == "c":
        with NamedPipeClient('testFoo') as client:
            try:
                while True:
                    # Show each message instead of discarding it.
                    print(client.read())
            except BrokenPipeError:
                # The other end closed the pipe: nothing more to read, so
                # finish quietly rather than dying with a traceback.
                pass
    else:
        print(f"no can do: {sys.argv[1]}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A clean wrapper around Windows APIs allowing interprocess communication over named pipes.

Here is a clear, simple explanation of the incantation needed to do this correctly:

* From the client, spawn the server.
* The server should call CreateNamedPipe, then ConnectNamedPipe.
* The client must poll for the pipe, then open it with CreateFile.
* Then each side can read from/write to the pipe as needed.
* Finally, the client should close the pipe.
"""
import io | |
import time | |
from types import TracebackType | |
from typing import Optional, Type | |
import _winapi | |
import time | |
from contextlib import ContextDecorator | |
class NamedPipeBase(io.StringIO):
    """Shared read/write/close logic for one end of a Windows named pipe.

    Subclasses are responsible for storing an open pipe handle in
    ``handle``. While ``handle`` is ``_winapi.NULL`` the pipe is considered
    closed and ``read``/``write`` raise ``WindowsError``.
    """

    # Win32 handle of the pipe; _winapi.NULL means "not open".
    handle = _winapi.NULL
    # Number of bytes requested by read() when no explicit size is given.
    buffer_size = 2**16

    def __init__(self, name: str) -> None:
        """Store the full pipe path (``\\\\.\\pipe\\<name>``) built from *name*."""
        super().__init__()
        self.name = r'\\.\pipe\{}'.format(name)

    def read(self, size: Optional[int] = None) -> str:
        """Read from the pipe and return the decoded text.

        Args:
            size: Maximum number of bytes to request; ``None`` or ``0``
                falls back to ``buffer_size``.

        Raises:
            WindowsError: If the pipe handle is not open.
        """
        if self.handle == _winapi.NULL:
            raise WindowsError("Cannot read pipe")
        data, _status = _winapi.ReadFile(self.handle, size if size else self.buffer_size)
        return data.decode()

    def write(self, msg: str) -> int:
        """Encode *msg* and write it to the pipe.

        Returns:
            The number of bytes written, as reported by ``WriteFile``.

        Raises:
            WindowsError: If the pipe handle is not open.
        """
        if self.handle == _winapi.NULL:
            raise WindowsError("Cannot write pipe")
        # Only the handle and the buffer are passed: the third positional
        # argument of _winapi.WriteFile is the "overlapped" flag, not a length.
        written, _status = _winapi.WriteFile(self.handle, msg.encode())
        return written

    def __exit__(self,
                 exc_ty: Optional[Type[BaseException]] = None,
                 exc_val: Optional[BaseException] = None,
                 exc_tb: Optional[TracebackType] = None,
                 ) -> bool:
        """Close the pipe handle, if open; never suppresses exceptions."""
        if self.handle != _winapi.NULL:
            _winapi.CloseHandle(self.handle)
            # Forget the closed handle so it cannot be closed or used twice.
            self.handle = _winapi.NULL
        return False
class NamedPipeServer(NamedPipeBase):
    """Server end of a named pipe.

    The pipe is created in the constructor; entering the context manager
    waits for a client to connect.
    """

    def __init__(self, name: str) -> None:
        """Create a single-instance, duplex, message-mode pipe called *name*.

        Raises:
            WindowsError: If the last-error value is non-zero after the
                pipe has been created.
        """
        super().__init__(name)
        self.handle = _winapi.CreateNamedPipe(
            self.name,
            _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE,
            _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_WAIT,
            1,  # one instance
            self.buffer_size,
            self.buffer_size,
            0,
            _winapi.NULL,
        )
        # Capture the error code exactly once, before any other API call
        # (such as CloseHandle below) can overwrite it.
        err = _winapi.GetLastError()
        if err != 0:
            # Do not leak the handle when reporting the failure.
            _winapi.CloseHandle(self.handle)
            self.handle = _winapi.NULL
            raise WindowsError(f'Error creating pipe: {err}')

    def __enter__(self) -> 'NamedPipeServer':
        """Wait for a client connection and return this server."""
        try:
            _winapi.ConnectNamedPipe(self.handle, _winapi.NULL)
        except WindowsError as e:
            # A client that attached between pipe creation and this call is
            # reported as ERROR_PIPE_CONNECTED; the connection is usable, so
            # only other errors are propagated.
            if e.winerror != _winapi.ERROR_PIPE_CONNECTED:
                raise
        return self
class NamedPipeClient(NamedPipeBase):
    """Client end of a named pipe; the connection is opened in the constructor."""

    def __init__(self, name: str) -> None:
        """Open the pipe called *name* and switch it to message read mode.

        While the pipe is busy (``ERROR_PIPE_BUSY`` / ``ERROR_SEM_TIMEOUT``)
        the open is retried once per second.

        Raises:
            WindowsError: If the pipe does not exist, or if opening or
                configuring it fails for any reason other than being busy.
        """
        super().__init__(name)
        # Sadly we need to try several times until this works
        # thanks Windows
        retryable = (_winapi.ERROR_SEM_TIMEOUT, _winapi.ERROR_PIPE_BUSY)
        while True:
            try:
                handle = _winapi.CreateFile(
                    self.name,
                    _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                    0,
                    _winapi.NULL,
                    _winapi.OPEN_EXISTING,
                    0,
                    _winapi.NULL,
                )
            except FileNotFoundError as err:
                raise WindowsError(
                    "Unable to open connection to pipe at {}".format(self.name)
                ) from err
            except WindowsError as e:
                if e.winerror not in retryable:
                    # Unexpected failure: surface it instead of silently
                    # leaving the client without a handle.
                    raise
                time.sleep(1)
            else:
                break

        try:
            # None (not 0) means "leave this setting unchanged" for the
            # collection-count and timeout arguments.
            _winapi.SetNamedPipeHandleState(handle,
                                            _winapi.PIPE_READMODE_MESSAGE,
                                            None,
                                            None)
        except WindowsError:
            # Do not leak the freshly opened handle if configuration fails.
            _winapi.CloseHandle(handle)
            raise
        self.handle = handle

    def __enter__(self) -> 'NamedPipeClient':
        """Return this client; the connection was already opened in ``__init__``."""
        return self
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from namedpipe import NamedPipeServer | |
from shared import PIPE_NAME | |
import time | |
import sys | |
# Serve the shared pipe: once inside the context, pause for one second and
# then send a single greeting before the pipe is closed on exit.
with NamedPipeServer(PIPE_NAME) as pipe_server:
    time.sleep(1)
    pipe_server.write('Hello from the server side.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment