Skip to content

Instantly share code, notes, and snippets.

@KubaO
Last active January 1, 2025 03:44
Show Gist options
  • Save KubaO/8c2395cd539777151caa8454f0596854 to your computer and use it in GitHub Desktop.
Save KubaO/8c2395cd539777151caa8454f0596854 to your computer and use it in GitHub Desktop.
import builtins, pickle, struct
from collections.abc import Iterable
from dataclasses import dataclass, field, InitVar
from typing import Any
from itertools import batched
# Names exported by `from <this module> import *`.
__all__ = ("asciionly", "dump", "Memory")
# version 7
def passthrough(x: object) -> object:
    """Identity function: hand the argument back unchanged.

    Useful wherever a conversion callable is required but the value needs
    no conversion.
    """
    return x
def asciionly(byte):
    """Return the printable ASCII character for *byte*, or "." as a placeholder.

    Printable means 0x20 (space) through 0x7E ("~"). 0x7F (DEL) is a control
    character, so it is rendered as "." like every other non-printable value.

    Args:
        byte: Integer byte value, typically one element of a bytes-like object.

    Returns:
        A one-character string.
    """
    # Upper bound is exclusive: 127 (DEL) must not be emitted verbatim.
    if byte in range(32, 127):
        return chr(byte)
    return "."
def dump(buf, doprint=True):
    """Render *buf* as a hexadecimal dump, 16 bytes per row.

    Each row shows the decimal offset of its first byte, the bytes in hex
    (grouped by four), and the same bytes as text (grouped by eight, with
    non-printable bytes shown via ``asciionly``). The hex column is padded to
    a fixed width so the text column stays aligned on a short final row.

    Args:
        buf: Iterable of integer byte values (e.g. ``bytes`` or ``bytearray``).
        doprint: When true, also print the dump to standard output.

    Returns:
        The complete dump as a string, starting with a header line and ending
        with a newline.
    """
    width = 16
    # Two hex digits per byte plus one separator between each group of four.
    hexwidth = 2 * width + (width - 1) // 4
    lines = ["Hexadecimal dump:"]
    for index, row in enumerate(batched(buf, width)):
        hexpart = " ".join(
            "".join(f"{byte:02x}" for byte in group) for group in batched(row, 4)
        )
        textpart = " ".join(
            "".join(asciionly(byte) for byte in group) for group in batched(row, 8)
        )
        lines.append(f"{index * width:4}: {hexpart:<{hexwidth}} {textpart}")
    # Joining once avoids the repeated string copies of building with "+=".
    output = "\n".join(lines) + "\n"
    if doprint:
        builtins.print(output)
    return output
def intformat(value: int) -> bytes:
    """Choose the struct format character used to store the integer *value*.

    Non-negative values map to the unsigned codes ``H``, ``I`` or ``Q``;
    negative values map to the signed codes ``h``, ``i`` or ``q``. The first
    code whose range bound (16-bit, then 32-bit) admits the value is chosen,
    and the widest code is returned for everything beyond that.
    """
    if value < 0:
        if value >= -0x8000:
            return b"h"
        if value >= -0x8000_0000:
            return b"i"
        return b"q"
    if value <= 0xFFFF:
        return b"H"
    if value <= 0xFFFF_FFFF:
        return b"I"
    return b"Q"
@dataclass(slots=True)
class BaseBlockData:
offset: int
_bformat: bytes | None = field(default=None, init=False, repr=False) # format stored in the block
size: int = 0
bname: bytes = b""
valformat: bytes = b"" # actual format of the value used by struct
valsize: int = 0 # size of the value stored in this block - may be less than valspace
value: int | float | bytes | str | None = None
rawvalue: int | float | bytes | None = None
bformat: InitVar[bytes | None] = None
def __post_init__(self, bformat):
self._bformat = bformat
class BlockData(BaseBlockData):
    """Block record extended with derived properties.

    A block occupies ``size`` bytes: a 3-byte header, the name, and the space
    reserved for the value. Instances can be used in a ``with`` statement,
    which simply yields the block itself and does nothing on exit.
    """

    def trimsize(self):
        """Trims the size of the block to only as large as needed for valsize"""
        self.size -= self.valspace - self.valsize

    @property
    def name(self) -> str:
        """The variable name decoded to text."""
        return self.bname.decode()

    @property
    def valspace(self):
        """The space available in the block for a value, may be bigger than valsize"""
        # 3 bytes of header precede the name.
        return self.size - 3 - len(self.bname)

    @property
    def isempty(self):
        """True when the block holds no value at all.

        A zero-length value (an empty string has struct format ``0s`` and
        therefore valsize 0) still carries a value format, so it is NOT
        empty; only a block with neither value bytes nor a value format is.
        """
        return self.valsize == 0 and not self.valformat

    @property
    def bformat(self) -> bytes:
        """Format stored in the block: the explicit one, else the last byte of valformat."""
        return self._bformat or self.valformat[-1:]

    @bformat.setter
    def bformat(self, value: bytes):
        self._bformat = value

    @property
    def end(self) -> int:
        """End of the range of addresses spanned by the block."""
        return self.offset + self.size

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Nothing to release; exceptions propagate unchanged.
        pass
class Memory:
    """Variable store that keeps named values inside one ``bytearray``.

    Variables are accessed as attributes. Attribute names starting with
    ``_`` or ``m_`` are ordinary Python attributes/methods of this object;
    every other name is read from, written to, or deleted from the buffer.
    Reading a variable that does not exist yields ``None``.

    Args:
        size: Initial size of the buffer in bytes.
        allowexpansion: When false, running out of space raises
            ``MemoryError`` instead of growing the buffer.
    """

    def __init__(self, size=128, allowexpansion=True):
        self._allowexpansion = allowexpansion
        self._mem = bytearray(size)
        self._memsize = size

    def m_dump(self, *args):
        """Hex-dump the buffer; extra arguments are forwarded to ``dump``."""
        return dump(self._mem, *args)

    # Each variable value is stored like this:
    #
    # format, length of name, maximum length of value, name, value
    #
    # Subsequent variables are stored immediately after each other.
    #
    # If a variable exists and can fit the new value, it will be overwritten.
    # Otherwise, the variable block is zeroed out, and a new block is found etc.
    #
    # Once the memory is full, memory compaction is performed to reclaim fragmented
    # empty space.

    # Keyed by Python type:   (block format or None, rawvalue -> struct format, value -> rawvalue)
    # Keyed by block format:  (valsize -> struct format, rawvalue -> value)
    # @formatter:off
    TYPES = {
        str:   (None, lambda value: b"%ds" % len(value), str.encode),
        bytes: (None, lambda value: b"%dp" % (len(value)+1), passthrough),
        int:   (None, intformat, passthrough),
        float: (None, lambda value: b"d", passthrough),
        b"s":  (lambda valsize: b"%ds" % valsize, lambda value: value.rstrip(b"\0").decode()),
        b"p":  (lambda valsize: b"%dp" % valsize, passthrough),
    }
    # @formatter:on

    def _types_fallback(self, key) -> tuple | None:
        """Type info for keys missing from TYPES; raises KeyError if there is none."""
        if isinstance(key, type):
            # pickle all types not in TYPES
            return b"o", lambda value: b"%dp" % (len(value) + 1), pickle.dumps
        elif isinstance(key, bytes):
            if key == b"o":
                # unpickle all objects
                # NOTE(review): pickle.loads must only ever see bytes this class
                # wrote itself - never load a buffer from an untrusted source.
                return lambda valsize: b"%dp" % valsize, pickle.loads
        raise KeyError(key)

    def _get_typeinfo(self, key, default=None) -> tuple | None:
        """Look up type info in TYPES, then the fallback, then *default*.

        Raises:
            TypeError: if nothing matches and no default was supplied.
        """
        try:
            return Memory.TYPES[key]
        except KeyError:
            try:
                return self._types_fallback(key)
            except KeyError:
                if default is not None:
                    return default
                raise TypeError(f"Unhandled type: {key}")

    def _emptyblocksize(self, offset: int) -> int:
        """Finds the size of the empty block at given offset. It may be zero if the block is not empty"""
        end = offset
        while end < self._memsize and self._mem[end] == 0:
            end += 1
        return end - offset

    def _zeroblock(self, block: BlockData):
        """Zero out a given variable, turning it into an empty block"""
        struct.pack_into(f"{block.size}x", self._mem, block.offset)

    def _formatblock(self, offset: int, name: str, value: Any) -> BlockData:
        """Build (without writing) the block that would store *value* under *name*."""
        bformat, formatter, encoder = self._get_typeinfo(type(value))
        rawvalue = encoder(value)
        valformat = formatter(rawvalue)
        valsize = struct.calcsize(valformat)
        bname = name.encode()
        # Header is: format byte, name length, value space; then name and value.
        blockformat = b">cBB%ds%s" % (len(bname), valformat)
        blocksize = struct.calcsize(blockformat)
        return BlockData(offset,
                         size=blocksize, bformat=bformat, bname=bname, valformat=valformat,
                         valsize=valsize, value=value, rawvalue=rawvalue)

    def _writeblock(self, block: BlockData):
        """Serialize *block* into the buffer at its offset, zero-padding unused value space."""
        padding = max(0, block.valspace - block.valsize)
        blockformat = b">cBB%ds%s%dx" % (len(block.bname), block.valformat, padding)
        struct.pack_into(blockformat, self._mem, block.offset,
                         block.bformat, len(block.bname), block.valspace, block.bname, block.rawvalue)

    def _readblock(self, offset: int) -> BlockData:
        """Decode the block starting at *offset*; a zero byte there means an empty block."""
        if self._mem[offset] == 0:
            return BlockData(offset, size=self._emptyblocksize(offset))
        tag, namesize, valsize = struct.unpack_from(b">cBB", self._mem, offset)
        # Tags without an entry are used directly as the struct format of the value.
        formatter, decoder = self._get_typeinfo(tag, default=(lambda _: tag, passthrough))
        valformat = formatter(valsize)
        assert valsize >= struct.calcsize(valformat)
        bname, rawvalue = struct.unpack_from(b">%ds%s" % (namesize, valformat), self._mem, offset + 3)
        value = decoder(rawvalue)
        blocksize = 3 + namesize + valsize
        # Keep the header tag on the block: the tag cannot always be derived
        # from valformat (pickled objects use tag "o" with a "p" value format),
        # so dropping it would corrupt such blocks when they are rewritten.
        return BlockData(offset, size=blocksize, bformat=tag, bname=bname, valformat=valformat,
                         valsize=valsize, value=value, rawvalue=rawvalue)

    def _blocks(self) -> Iterable[BlockData]:
        """Yield every block, empty or not, in address order."""
        offset = 0
        while offset < self._memsize:
            block = self._readblock(offset)
            blocksize = block.size  # save the size since block may be modified during yield
            yield block
            offset += blocksize

    def _compact(self) -> int:
        """Compacts all empty space to the end of the memory, returns the size of empty memory left"""
        writeoffset = 0
        for block in self._blocks():
            if not block.isempty:
                block.trimsize()
                block.offset = writeoffset
                self._writeblock(block)
                writeoffset += block.size
        block = BlockData(writeoffset, size=self._memsize - writeoffset)
        self._zeroblock(block)
        return block.size

    def _findvar(self, varname: str) -> BlockData | None:
        """Return the first block whose name equals *varname*, or None."""
        for block in self._blocks():
            if block.name == varname:
                return block
        return None

    def _expandby(self, by: int):
        """Append *by* zero bytes to the buffer.

        Raises:
            MemoryError: if expansion was disabled at construction.
        """
        if not self._allowexpansion:
            raise MemoryError("memory is full and expansion is disabled")
        self._mem += b"\0" * by
        self._memsize += by

    def _findemptyblock(self, size: int) -> BlockData | None:
        """Return the first empty block of at least *size* bytes, cut down to *size*, or None."""
        for block in self._blocks():
            if block.isempty and block.size >= size:
                block.size = size
                return block
        return None

    def _findemptyblockgc(self, size: int) -> BlockData:
        """Like ``_findemptyblock``, but compacts and, if needed, grows the buffer first."""
        block = self._findemptyblock(size)
        if block is None:
            free = self._compact()
            if free < size:
                # Grow by at least the current size (doubling), and by more
                # when even doubling would not cover the shortfall.
                self._expandby(max(self._memsize, size - free))
            block = self._findemptyblock(size)
        assert block is not None
        return block

    def m_readvar(self, varname: str) -> int | float | str | bytes | None:
        """Return the value of *varname*, or None if there is no such variable."""
        var = self._findvar(varname)
        return var and var.value

    def m_deletevar(self, varname: str):
        """Remove *varname* if it exists; do nothing otherwise."""
        var = self._findvar(varname)
        if var:
            self._zeroblock(var)

    def m_writevar(self, varname: str, value: int | float | str | bytes) -> BlockData:
        """Store *value* under *varname* and return the block that was written.

        An existing block is reused when the new value fits in its value
        space; otherwise it is zeroed and a fresh block is allocated.
        """
        newblock = self._formatblock(0, varname, value)
        target = self._findvar(varname)
        if target and target.valspace < newblock.valsize:
            self._zeroblock(target)
            target = None
        if not target:
            target = self._findemptyblockgc(newblock.size)
        newblock.offset, newblock.size = target.offset, target.size
        self._writeblock(newblock)
        return newblock

    def __getattribute__(self, name):
        if name.startswith("_") or name.startswith("m_"):
            return object.__getattribute__(self, name)
        else:
            return self.m_readvar(name)

    def __setattr__(self, name, value):
        if name.startswith("_") or name.startswith("m_"):
            object.__setattr__(self, name, value)
        else:
            self.m_writevar(name, value)

    def __delattr__(self, name):
        if name.startswith("_") or name.startswith("m_"):
            object.__delattr__(self, name)
        else:
            self.m_deletevar(name)
def test_Memory():
    """Walk a 64-byte Memory through allocation, deletion, slot reuse and compaction."""
    print()
    mem = Memory(64)

    # One variable: it sits at the start, followed by a single empty block.
    aval = ":" + " " * 30 + ":"
    mem.a = aval
    layout = list(mem._blocks())
    assert len(layout) == 2
    with layout[0] as first:
        assert first.offset == 0 and first.value == aval
    with layout[1] as rest:
        assert rest.offset == layout[0].end and rest.isempty

    # A second variable is placed directly after the first.
    bval = ":" + " " * 10 + ":"
    mem.b = bval
    layout = list(mem._blocks())
    assert len(layout) == 3
    with layout[1] as second:
        assert second.offset == layout[0].end and second.value == bval
    with layout[2] as rest:
        assert rest.offset == layout[1].end and rest.isempty
    boffset = layout[1].offset

    # Deleting the first variable leaves a hole; the second does not move.
    del mem.a
    layout = list(mem._blocks())
    assert len(layout) == 3
    with layout[0] as hole:
        assert hole.isempty
    with layout[1] as second:
        assert second.offset == layout[0].end == boffset and second.value == bval
    with layout[2] as rest:
        assert rest.offset == layout[1].end and rest.isempty

    # Writing the same value again reuses the hole at the start.
    mem.a = aval
    layout = list(mem._blocks())
    assert len(layout) == 3
    with layout[0] as first:
        assert first.offset == 0 and first.value == aval
    with layout[1] as second:
        assert second.offset == layout[0].end == boffset and second.value == bval
    with layout[2] as rest:
        assert rest.offset == layout[1].end and rest.isempty

    del mem.a
    cval = ":" + " " * 31 + ":"
    # cval is larger than can fit into any empty block - this will cause a compaction
    mem.c = cval
    layout = list(mem._blocks())
    assert len(layout) == 3
    with layout[0] as first:
        # b value was compacted to the beginning of memory
        assert first.offset == 0 and first.value == bval
    with layout[1] as second:
        # c value goes right after it
        assert second.offset == layout[0].end and second.value == cval
    with layout[2] as rest:
        assert rest.offset == layout[1].end and rest.isempty
def test_pickling():
    """A list value written to a Memory variable reads back equal to what was stored."""
    store = Memory(64)
    store.a = [1, 2]
    readback = store.a
    assert readback == [1, 2]
def main():
    """Demo: rewrite one variable with values of varying type and size, dumping memory each time."""
    mem = Memory()

    # Each assignment is followed by a read-back and a hex dump of the buffer.
    for sample in (1, "abc", 0x1234, "efghijkl", 0x5678, 0x12345678):
        mem.n1 = sample
        print(mem.n1)
        mem.m_dump()

    # In-place arithmetic works through the attribute interface as well.
    mem.n1 = 1
    while True:
        mem.n1 += 1
        print(mem.n1)
        if mem.n1 == 5:
            break

    print(mem.m_readvar("n1"))
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment