Last active
January 1, 2025 03:44
-
-
Save KubaO/8c2395cd539777151caa8454f0596854 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import builtins, pickle, struct | |
from collections.abc import Iterable | |
from dataclasses import dataclass, field, InitVar | |
from typing import Any | |
from itertools import batched | |
# Public names exported by a star-import of this module.
__all__ = ("asciionly", "dump", "Memory")
# version 7
def passthrough(x):
    """Return *x* unchanged.

    Serves as the do-nothing encoder/decoder for values that need no
    conversion before packing or after unpacking.
    """
    return x
def asciionly(byte):
    """Return the printable ASCII character for *byte*, or ``"."`` otherwise.

    Printable means 0x20 (space) through 0x7E (``~``). 0x7F (DEL) is a
    control character and is therefore replaced as well, so that it cannot
    disturb the layout of a dump.
    """
    return chr(byte) if 32 <= byte < 127 else "."
def dump(buf, doprint=True): | |
width = 16 | |
output = "Hexadecimal dump:\n" | |
pieces = batched(buf, width) | |
for i, piece in enumerate(pieces): | |
output += f"{(i * width):4}: " | |
for j, byte in enumerate(piece): | |
if j and j % 4 == 0: output += " " | |
output += f"{byte:02x}" | |
output += " " | |
for j, byte in enumerate(piece): | |
if j and j % 8 == 0: output += " " | |
output += asciionly(byte) | |
output += "\n" | |
if doprint: | |
builtins.print(output) | |
return output | |
def intformat(value: int) -> bytes:
    """Choose the ``struct`` integer code used to store *value*.

    Non-negative values get the narrowest unsigned code out of 16, 32 and
    64 bits (``H``, ``I``, ``Q``); negative values get the narrowest signed
    one (``h``, ``i``, ``q``). Anything beyond the 32-bit range is given the
    64-bit code.
    """
    if value < 0:
        if value >= -0x8000:
            return b"h"
        if value >= -0x8000_0000:
            return b"i"
        return b"q"
    if value <= 0xFFFF:
        return b"H"
    if value <= 0xFFFF_FFFF:
        return b"I"
    return b"Q"
@dataclass(slots=True) | |
class BaseBlockData: | |
offset: int | |
_bformat: bytes | None = field(default=None, init=False, repr=False) # format stored in the block | |
size: int = 0 | |
bname: bytes = b"" | |
valformat: bytes = b"" # actual format of the value used by struct | |
valsize: int = 0 # size of the value stored in this block - may be less than valspace | |
value: int | float | bytes | str | None = None | |
rawvalue: int | float | bytes | None = None | |
bformat: InitVar[bytes | None] = None | |
def __post_init__(self, bformat): | |
self._bformat = bformat | |
class BlockData(BaseBlockData):
    """A block record plus the derived quantities used to place and resize it.

    A block spans ``size`` bytes starting at ``offset``: three header bytes,
    the name, and the space reserved for the value.
    """

    def trimsize(self):
        """Trims the size of the block to only as large as needed for valsize"""
        self.size -= self.valspace - self.valsize

    @property
    def name(self) -> str:
        """The variable name decoded to ``str``."""
        return self.bname.decode()

    @property
    def valspace(self):
        """The space available in the block for a value, may be bigger than valsize"""
        return self.size - 3 - len(self.bname)

    @property
    def isempty(self):
        """True when the block is free space rather than a stored variable.

        A stored empty string also has ``valsize == 0``, but unlike free
        space it carries a value format. Checking the format as well keeps
        such a variable from being mistaken for free space and overwritten
        or discarded.
        """
        return self.valsize == 0 and not self.valformat

    @property
    def bformat(self) -> bytes:
        """The format stored in the block.

        Falls back to the last character of ``valformat`` (the struct type
        code without its repeat count) when no explicit format was given.
        """
        return self._bformat or self.valformat[-1:]

    @bformat.setter
    def bformat(self, value: bytes):
        self._bformat = value

    @property
    def end(self) -> int:
        """End of the range of addresses spanned by the block."""
        return self.offset + self.size

    def __enter__(self):
        """Allow ``with block as blk:`` as a scoping aid; nothing is acquired."""
        return self

    def __exit__(self, *args):
        """Nothing to release; exceptions are not suppressed."""
        pass
class Memory:
    """A small variable store packed into a flat ``bytearray``.

    Attribute access is redirected to the store: ``mem.x = 1`` writes a
    variable, ``mem.x`` reads it back (``None`` when there is no such
    variable) and ``del mem.x`` removes it. Names starting with ``_`` or
    ``m_`` are ordinary Python attributes and methods.
    """

    def __init__(self, size=128, allowexpansion=True):
        """Create a zero-filled memory of *size* bytes.

        When *allowexpansion* is false, running out of space raises
        ``MemoryError`` instead of growing the buffer.
        """
        self._allowexpansion = allowexpansion
        self._mem = bytearray(size)
        self._memsize = size

    def m_dump(self, *args):
        """Hex-dump the raw memory; *args* are forwarded to ``dump``."""
        return dump(self._mem, *args)

    # Each variable value is stored like this:
    #
    #   format, length of name, maximum length of value, name, value
    #
    # Subsequent variables are stored immediately after each other; a run of
    # zero bytes is an empty block.
    #
    # If a variable exists and can fit the new value, it will be overwritten.
    # Otherwise, the variable block is zeroed out, and a new block is found.
    #
    # Once no empty block is large enough, memory compaction is performed to
    # reclaim fragmented empty space.
    #
    # TYPES maps a Python type to
    #   (format byte to store or None, raw value -> struct format, encoder)
    # and a stored format byte to
    #   (value space -> struct format, decoder).

    # @formatter:off
    TYPES = {
        str:   (None, lambda value: b"%ds" % len(value),     str.encode),
        bytes: (None, lambda value: b"%dp" % (len(value)+1), passthrough),
        int:   (None, intformat,                             passthrough),
        float: (None, lambda value: b"d",                    passthrough),
        b"s":  (lambda valsize: b"%ds" % valsize, lambda value: value.rstrip(b"\0").decode()),
        b"p":  (lambda valsize: b"%dp" % valsize, passthrough),
    }
    # @formatter:on

    def _types_fallback(self, key) -> tuple | None:
        """Return type info for keys that are missing from ``TYPES``.

        Every other Python type is stored pickled under the format byte
        ``b"o"``. Raises ``KeyError`` for an unknown format byte.
        """
        if isinstance(key, type):
            # pickle all types not in TYPES
            return b"o", lambda value: b"%dp" % (len(value) + 1), pickle.dumps
        elif isinstance(key, bytes):
            if key == b"o":
                # unpickle all objects
                # NOTE(security): pickle.loads runs arbitrary code on crafted
                # input; this is only safe while the buffer is written
                # exclusively by this class.
                return lambda valsize: b"%dp" % valsize, pickle.loads
        raise KeyError(key)

    def _get_typeinfo(self, key, default=None) -> tuple | None:
        """Look up *key* in ``TYPES``, then in the fallback, then use *default*.

        Raises ``TypeError`` when nothing matches and no default was given.
        """
        try:
            return Memory.TYPES[key]
        except KeyError:
            try:
                return self._types_fallback(key)
            except KeyError:
                if default:
                    return default
                raise TypeError(f"Unhandled type: {key}")

    def _emptyblocksize(self, offset: int) -> int:
        """Finds the size of the empty block at given offset. It may be zero if the block is not empty"""
        end = offset
        while end < self._memsize and self._mem[end] == 0:
            end += 1
        return end - offset

    def _zeroblock(self, block: BlockData):
        """Zero out a given variable, turning it into an empty block"""
        struct.pack_into(f"{block.size}x", self._mem, block.offset)

    def _formatblock(self, offset: int, name: str, value: Any) -> BlockData:
        """Describe the block needed to store *value* under *name*.

        Nothing is written; the returned block has the minimal size for the
        encoded value.
        """
        bformat, formatter, encoder = self._get_typeinfo(type(value))
        rawvalue = encoder(value)
        valformat = formatter(rawvalue)
        valsize = struct.calcsize(valformat)
        bname = name.encode()
        blockformat = b">cBB%ds%s" % (len(bname), valformat)
        blocksize = struct.calcsize(blockformat)
        return BlockData(offset,
                         size=blocksize, bformat=bformat, bname=bname, valformat=valformat, valsize=valsize,
                         value=value, rawvalue=rawvalue)

    def _writeblock(self, block: BlockData):
        """Pack *block* into memory at its offset, zero-padding unused value space."""
        padding = max(0, block.valspace - block.valsize)
        blockformat = b">cBB%ds%s%dx" % (len(block.bname), block.valformat, padding)
        struct.pack_into(blockformat, self._mem, block.offset,
                         block.bformat, len(block.bname), block.valspace, block.bname, block.rawvalue)

    def _readblock(self, offset: int) -> BlockData:
        """Parse the block starting at *offset*.

        A zero byte at *offset* denotes an empty block reaching up to the
        next non-zero byte. For a stored variable, ``valsize`` of the result
        is the value space recorded in the header.
        """
        if self._mem[offset] == 0:
            return BlockData(offset, size=self._emptyblocksize(offset))
        bformat, namesize, valsize = struct.unpack_from(b">cBB", self._mem, offset)
        # Format bytes without an entry (the integer and float codes) are
        # used directly as the struct format and need no decoding.
        formatter, decoder = self._get_typeinfo(bformat, default=(lambda _: bformat, passthrough))
        valformat = formatter(valsize)
        assert valsize >= struct.calcsize(valformat)
        bname, rawvalue = struct.unpack_from(b">%ds%s" % (namesize, valformat), self._mem, offset + 3)
        value = decoder(rawvalue)
        blocksize = 3 + namesize + valsize
        return BlockData(offset, size=blocksize, bname=bname, valformat=valformat, valsize=valsize, value=value,
                         rawvalue=rawvalue)

    def _blocks(self) -> Iterable[BlockData]:
        """Yield every block, stored and empty, in address order."""
        offset = 0
        while offset < self._memsize:
            block = self._readblock(offset)
            blocksize = block.size  # save the size since block may be modified during yield
            yield block
            offset += blocksize

    def _compact(self) -> int:
        """Compacts all empty space to the end of the memory, returns the size of empty memory left"""
        writeoffset = 0
        for block in self._blocks():
            if not block.isempty:
                block.trimsize()
                block.offset = writeoffset
                self._writeblock(block)
                writeoffset += block.size
        block = BlockData(writeoffset, size=self._memsize - writeoffset)
        self._zeroblock(block)
        return block.size

    def _findvar(self, varname: str) -> BlockData | None:
        """Return the block whose name is *varname*, or ``None``."""
        for block in self._blocks():
            if block.name == varname:
                return block
        return None

    def _expandby(self, by: int):
        """Append *by* zero bytes; raises ``MemoryError`` if expansion is disabled."""
        if not self._allowexpansion:
            raise MemoryError
        self._mem += b"\0" * by
        self._memsize += by

    def _findemptyblock(self, size: int) -> BlockData | None:
        """Return the first empty block of at least *size* bytes, cut down to *size*."""
        for block in self._blocks():
            if block.isempty and block.size >= size:
                block.size = size
                return block
        return None

    def _findemptyblockgc(self, size: int) -> BlockData:
        """Like ``_findemptyblock``, but compacts and grows the memory as needed."""
        block = self._findemptyblock(size)
        if block is None:
            free = self._compact()
            if free < size:
                # Grow by the shortfall, and by at least the current size so
                # that the memory at least doubles.
                self._expandby(max(self._memsize, size - free))
            block = self._findemptyblock(size)
        assert block is not None
        return block

    def m_readvar(self, varname: str) -> int | float | str | bytes | None:
        """Return the value stored under *varname*, or ``None`` if there is none."""
        var = self._findvar(varname)
        return var and var.value

    def m_deletevar(self, varname: str):
        """Remove *varname* by zeroing its block; does nothing if it is absent."""
        var = self._findvar(varname)
        if var:
            self._zeroblock(var)

    def m_writevar(self, varname: str, value: int | float | str | bytes) -> BlockData:
        """Store *value* under *varname* and return the block that was written.

        An existing block is reused when the new value fits into its value
        space; otherwise it is zeroed and a fresh block is allocated.
        """
        newblock = self._formatblock(0, varname, value)
        target = self._findvar(varname)
        if target and target.valspace < newblock.valsize:
            self._zeroblock(target)
            target = None
        if not target:
            target = self._findemptyblockgc(newblock.size)
        newblock.offset, newblock.size = target.offset, target.size
        self._writeblock(newblock)
        return newblock

    def __getattribute__(self, name):
        """Read ``_``/``m_`` names normally; everything else comes from the store."""
        if name.startswith(("_", "m_")):
            return object.__getattribute__(self, name)
        else:
            return self.m_readvar(name)

    def __setattr__(self, name, value):
        """Set ``_``/``m_`` names normally; everything else goes into the store."""
        if name.startswith(("_", "m_")):
            object.__setattr__(self, name, value)
        else:
            self.m_writevar(name, value)

    def __delattr__(self, name):
        """Delete ``_``/``m_`` names normally; everything else is removed from the store."""
        if name.startswith(("_", "m_")):
            object.__delattr__(self, name)
        else:
            self.m_deletevar(name)
def test_Memory():
    """Walk a 64-byte memory through allocation, deletion, reuse and compaction."""
    print()
    mem = Memory(64)
    aval = ":" + " " * 30 + ":"
    bval = ":" + " " * 10 + ":"
    cval = ":" + " " * 31 + ":"

    # The first variable goes to the start; the rest is one empty block.
    mem.a = aval
    blks = list(mem._blocks())
    assert len(blks) == 2
    head, tail = blks
    assert head.offset == 0 and head.value == aval
    assert tail.offset == head.end and tail.isempty

    # The second variable follows directly after the first.
    mem.b = bval
    blks = list(mem._blocks())
    assert len(blks) == 3
    head, middle, tail = blks
    assert middle.offset == head.end and middle.value == bval
    assert tail.offset == middle.end and tail.isempty
    boffset = middle.offset

    # Deleting a leaves an empty block in front of b, which stays in place.
    del mem.a
    blks = list(mem._blocks())
    assert len(blks) == 3
    head, middle, tail = blks
    assert head.isempty
    assert middle.offset == head.end == boffset and middle.value == bval
    assert tail.offset == middle.end and tail.isempty

    # Writing a again reuses the empty block at the start.
    mem.a = aval
    blks = list(mem._blocks())
    assert len(blks) == 3
    head, middle, tail = blks
    assert head.offset == 0 and head.value == aval
    assert middle.offset == head.end == boffset and middle.value == bval
    assert tail.offset == middle.end and tail.isempty

    # cval is larger than can fit into any empty block - this will cause a compaction
    del mem.a
    mem.c = cval
    blks = list(mem._blocks())
    assert len(blks) == 3
    head, middle, tail = blks
    # b value was compacted to the beginning of memory
    assert head.offset == 0 and head.value == bval
    # c value goes right after it
    assert middle.offset == head.end and middle.value == cval
    assert tail.offset == middle.end and tail.isempty
def test_pickling():
    """A value of a type without its own entry (here a list) survives a round trip."""
    mem = Memory(64)
    payload = [1, 2]
    mem.a = payload
    assert mem.a == [1, 2]
def main():
    """Demo: rewrite one variable with values of varying type and size.

    After each write the value is read back and printed, and the memory is
    dumped. A final loop increments the variable through attribute access.
    """
    mem = Memory()
    for value in (1, "abc", 0x1234, "efghijkl", 0x5678, 0x12345678):
        mem.n1 = value
        print(mem.n1)
        mem.m_dump()

    mem.n1 = 1
    while True:
        mem.n1 += 1
        print(mem.n1)
        if mem.n1 == 5:
            break
    print(mem.m_readvar("n1"))
# Run the demo only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment