Created
July 18, 2020 13:57
-
-
Save niansa/15770adee78ab4633037bebf36d694cc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, io | |
from base64 import b64encode, b64decode | |
class itypes: | |
FILE = 1 | |
DIRECTORY = 0 | |
# READABLE WRITABLE APPENDS CREATES | |
FREAD_ONLY = "r" # YES NO NO NO | |
FREAD_WRITE = "r+" # YES YES NO NO | |
FWRITE_ONLY = "w" # NO YES NO YES | |
FWRITE_READ = "w+" # YES YES NO YES | |
FAPPEND = "a" # NO YES YES YES | |
FAPPEND_READ = "a+" # YES YES YES YES | |
FREADABLE = [FREAD_ONLY,FREAD_WRITE,FWRITE_READ,FAPPEND_READ] | |
FWRITABLE = [FREAD_WRITE,FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ] | |
FAPPENDS = [FAPPEND,FAPPEND_READ] | |
FCREATES = [FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ] | |
FALL = [FREAD_ONLY,FREAD_WRITE,FWRITE_ONLY,FWRITE_READ,FAPPEND,FAPPEND_READ] | |
class vfs_class: | |
def __init__(self, filesystem=None): | |
# If no filesystem given; create one | |
if not filesystem: | |
self.filesystem = {} | |
# Else; use existing one | |
else: | |
self.filesystem = filesystem | |
# Default current working directory to / | |
self.cwd = "/" | |
# No default update event | |
self._update_event = False | |
def _def_update_event(self, definition): | |
self._update_event = definition | |
def _takeover_fs(self): | |
for function in dir(os): | |
if function in dir(self) and not function.startswith("__"): | |
exec(f'os.{function} = self.{function}') | |
for function in dir(os): | |
if function in dir(self) and not function.startswith("__"): | |
exec(f'os.path.{function} = self.{function}') | |
#open = self.open | |
def _get_path_realsplit(self, path): | |
# Make sure path is not an empty string | |
if path == "": | |
raise FileNotFoundError("[Errno 2] No such file or directory: ''") | |
# Make sure path begins with CWD if it does not begin with either no / or ./ | |
if not path.startswith("/"): | |
if path.startswith("./") or path == ".": | |
path = path[2:] | |
path = self.cwd + path | |
# Do the magic | |
path_split = path.split('/') | |
path_depth = 0 | |
path_realsplit = [] | |
for filename in path_split: | |
if filename != "": | |
if filename == ".": | |
continue | |
elif filename == "..": | |
try: | |
del path_realsplit[path_depth-1] | |
path_depth -= 1 | |
except IndexError: pass | |
else: | |
path_realsplit.append(filename) | |
path_depth += 1 | |
return path_realsplit | |
def _get_inode_data(self, path, firsterr=True): | |
# Get path in list type | |
path_realsplit = self._get_path_realsplit(path) | |
# Dig through the filesystem | |
fsdata = self.filesystem | |
try: | |
for filename in path_realsplit: | |
fsdata = fsdata[filename] | |
# No such file or directory | |
except: | |
if firsterr == False: | |
firsterr = True | |
else: | |
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'") | |
# Return the inode | |
return fsdata | |
def _set_inode(self, path, value): | |
# Trigger event | |
if self._update_event: | |
self._update_event(0, self.realpath(path)) | |
# Get path list from string | |
path_realsplit = self._get_path_realsplit(path) | |
# Convert the list into a string | |
dstring = "self.filesystem" | |
for filename in path_realsplit: | |
dstring += f'["{filename}"]' | |
# Apply changes to the filesystem | |
try: | |
exec(dstring + value) # I know, I have to find a better way | |
except KeyError: | |
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'") | |
# Trigger event | |
if self._update_event: | |
self._update_event(1, self.realpath(path)) | |
def _get_inode_type(self, inode): | |
# folder | |
if type(inode) == type({}): | |
return itypes.DIRECTORY | |
# file | |
elif type(inode) == type(""): | |
return itypes.FILE | |
# exception | |
else: | |
return False | |
def _remove_inode(self, path): | |
# Remove trailing / | |
if path[-1:] == "/": | |
path = path[:-1] | |
# Get important details | |
basepath = os.path.split(path)[0] | |
filename = os.path.basename(os.path.normpath(path)) | |
# Set basepath if empty | |
if basepath == "": | |
basepath = "/" | |
# Undefine inode, then redefine it | |
self._set_inode(basepath, f'.pop("{filename}")') | |
# os.path functions | |
def realpath(self, path): | |
# Get real pathsplit | |
path_realsplit = self._get_path_realsplit(path) | |
# Get real path | |
path_real = "" | |
for filename in path_realsplit: | |
path_real += f'/{filename}' | |
if len(path_real) == 0: | |
path_real = "/" | |
return path_real | |
def isabs(self, path): | |
if path.startswith("./") or not path.startswith("/"): | |
return False | |
else: | |
return True | |
def isdir(self, path): | |
inode = self._get_inode_data(path) | |
return self._get_inode_type(inode) == itypes.DIRECTORY | |
def isfile(self, path): | |
inode = self._get_inode_data(path) | |
return self._get_inode_type(inode) == itypes.FILE | |
# Dummy functions | |
def access(self, path, mode): raise OSError(95, "Operation not supported") | |
def link(self, src): raise OSError(95, "Operation not supported") | |
def unlink(self, path): raise OSError(95, "Operation not supported") | |
def readlink(self, path): raise OSError(95, "Operation not supported") | |
def mknod(self, path): raise OSError(95, "Operation not supported") | |
def statvfs(self, path): raise OSError(95, "Operation not supported") | |
def utime(self, path): raise OSError(95, "Operation not supported") | |
def chmod(self, path, mode): raise OSError(95, "Operation not supported") | |
### DIRECTORY FUNCTIONS | |
def chdir(self, path): | |
# Get absolute path | |
path = self.realpath(path) | |
# Load inode | |
inode = self._get_inode_data(path) | |
# Check if path is a directory | |
if self._get_inode_type(inode) != itypes.DIRECTORY: | |
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'") | |
# Add trailing / if required | |
if path[-1:] != "/": | |
path = f"{path}/" | |
# Set the current working directory to path | |
self.cwd = path | |
def getcwd(self): | |
if len(self.cwd) != 1: | |
return self.cwd[:-1] | |
else: | |
return self.cwd | |
def listdir(self, path): | |
inode = self._get_inode_data(path) | |
# Check if inode is a directory | |
if self._get_inode_type(inode) != itypes.DIRECTORY: | |
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'") | |
# Create list | |
dirents = [] | |
for sinode in inode.keys(): | |
dirents.append(sinode) | |
# Sort the list | |
dirents_dirs = [] | |
dirents_files = [] | |
for file in dirents: | |
if self.isdir(file): | |
dirents_dirs.append(file) | |
if self.isfile(file): | |
dirents_files.append(file) | |
dirents_dirs.sort() | |
dirents_files.sort() | |
# Return the list | |
return dirents_dirs + dirents_files | |
def mkdir(self, path): | |
# Remove trailing / | |
if path[-1:] == "/": | |
path = path[:-1] | |
# Get absolute path | |
path = self.realpath(path) | |
# Check if inode exists already | |
nonexistent = False | |
try: | |
inode = self._get_inode_data(path) | |
except FileNotFoundError: | |
nonexistent = True | |
if not nonexistent: | |
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'") | |
# Check if target path is a directory | |
if self._get_inode_type(self._get_inode_data(os.path.split(path)[0], firsterr=False)) != itypes.DIRECTORY: | |
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'") | |
inode = self._get_inode_data(path, firsterr=False) | |
# Get filename | |
filename = os.path.basename(os.path.normpath(path)) | |
# Define inode | |
self._set_inode(path, ' = {}') | |
def rmdir(self, path): | |
inode = self._get_inode_data(path) | |
# Check if inode is a directory | |
if self._get_inode_type(inode) != itypes.DIRECTORY: | |
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'") | |
# Check if directory is empty | |
if inode != {}: | |
raise OSError(39, f"Directory not empty: '{path}'") | |
# Remove the inode | |
self._remove_inode(path) | |
### FILE FUNCTIONS | |
def touch(self, path): | |
self.open(path, mode="w").close() | |
def remove(self, path): | |
inode = self._get_inode_data(path) | |
# Check weather inode is a file or a directory | |
if self._get_inode_type(inode) != itypes.FILE: | |
raise IsADirectoryError | |
self._remove_inode(path) | |
def rename(self, old, new): | |
inode = self._get_inode_data(old) | |
# Get important details for new inode | |
basepath = os.path.split(new)[0] | |
filename = os.path.basename(os.path.normpath(new)) | |
# Create new inode | |
if self._get_inode_type(inode) == itypes.FILE: | |
inode = f'"{inode}"' | |
self._set_inode(new, f' = {inode}') | |
# Remove old inode | |
self._remove_inode(old) | |
def open(self, path, mode="r"): | |
# Check if mode is bytes | |
if 'b' in mode: | |
bytesfd = True | |
mode = mode.replace("b", "") | |
else: | |
bytesfd = False | |
# Check if mode is valid | |
if not mode in FALL: | |
raise ValueError(f"invalid mode: '{mode}'") | |
# Try to get the inode. If it does not exist and write mode is enabled; use a empty one | |
try: | |
inode = self._get_inode_data(path) | |
except FileNotFoundError: | |
if mode in FCREATES: | |
inode = "" | |
else: | |
raise FileNotFoundError(f"[Errno 2] No such file or directory: '{path}'") | |
# Check weather inode is a file or a directory | |
if self._get_inode_type(inode) != itypes.FILE: | |
raise IsADirectoryError | |
# Check for trailing / | |
elif path[-1:] == "/": | |
raise NotADirectoryError(f"[Errno 20] Not a directory: '{path}'") | |
# Decode base64 | |
inode = b64decode(inode.encode("utf-8")) | |
# Create IO | |
if bytesfd: | |
fd = io.BytesIO(inode) | |
else: | |
fd = io.StringIO(inode.decode("utf-8")) | |
# Add functions to force read() and seek() | |
fd._fread = fd.read | |
fd._fseek = fd.seek | |
# Restrict write() | |
if not mode in FWRITABLE: | |
def fdwrite(data): | |
raise io.UnsupportedOperation("not writable") | |
fd.write = fdwrite | |
# Restrict read() | |
if not mode in FREADABLE: | |
def fdread(): | |
raise io.UnsupportedOperation("not readable") | |
fd.read = fdread | |
# Append path to fd | |
fd.name = path | |
fd.mode = mode | |
fd.bytesfd = bytesfd | |
# Implement append mode if required | |
if mode in FAPPENDS: | |
inode_legenth = len(inode) | |
fd._fseek(inode_legenth) | |
def fdseek(position): | |
nonlocal inode_legenth | |
nonlocal fd | |
return fd._fseek(inode_legenth + position) | |
fd.seek = fdseek | |
# Write back data to inode on close() | |
def fdclose(*args): | |
nonlocal fd | |
nonlocal self | |
if mode in FWRITABLE: | |
fd._fseek(0) | |
inode = fd._fread() | |
# Convert to bytes object | |
if not fd.bytesfd: | |
inode = inode.encode('utf-8') | |
# Convert to base64 | |
b64inode = b64encode(inode).decode('utf-8') | |
# Write to filesystem | |
self._set_inode(fd.name, f' = "{b64inode}"') | |
# Invalidate fd | |
fd = None | |
fd.close = fdclose | |
fd.__exit__ = fdclose | |
# Return created fd | |
return fd |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment