Last active
November 28, 2019 20:26
-
-
Save m33x/9632d4c47483e5a6768865a485e71f40 to your computer and use it in GitHub Desktop.
Markov model to predict next char(s) based on a given prefix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env pypy | |
# -*- coding: utf-8 -*- | |
''' This script loads the training data and predicts the most likely next passwords
:author: Maximilian Golla | |
:contact: [email protected] | |
:version: 0.0.1, 2019-11-28 | |
''' | |
# Load external modules | |
from configs.configure import * | |
def worker(length):
    """Load the training data for one password length from disk.

    Builds an ``NGramCreator`` from the global ``CONFIG`` for the given
    ``length``, loads its ``ip_list``, ``cp_list`` and ``ep_list`` tables and
    appends the creator to the module-level ``MARKOV_MODELS`` list.
    """
    settings = {
        "name": CONFIG.NAME,
        "alphabet": CONFIG.ALPHABET,
        "ngram_size": CONFIG.NGRAM_SIZE,
        "training_file": "input/"+CONFIG.TRAINING_FILE,
        "length": length,
        "progress_bar": CONFIG.PROGRESS_BAR,
    }
    creator = NGramCreator(settings)

    # Each table is announced in the debug log right before it is loaded.
    load_steps = (
        ("ip_list", "Thread: {} - ip_list load() ..."),
        ("cp_list", "Thread: {} - cp_list load() ..."),
        ("ep_list", "Thread: {} - ep_list load() ..."),
    )
    for table_name, message in load_steps:
        logging.debug(message.format(length))
        creator.load(table_name)

    logging.debug("Thread: {} - Loading done ...".format(length))
    MARKOV_MODELS.append(creator)
''' Every length has its own model, we select the correct model for every password ''' | |
def _select_correct_markov_model(pw_length, markov_models): | |
result = markov_models[0] # Fallback solution, if there is no model for the selected length | |
for model in markov_models: | |
if model.length == pw_length: | |
result = model | |
return result | |
def _password_probability(ngram_creator, line):
    """Return the Markov probability of the password ``line``.

    The result is the product of the initial-prefix (IP) probability, the
    end-suffix (EP) probability and the conditional probability (CP) of every
    n-gram window of ``line``, all looked up in ``ngram_creator``'s tables.
    """
    ngram_size = ngram_creator.ngram_size
    context_size = ngram_size - 1

    ip = line[:context_size]
    ip_prob = ngram_creator.ip_list[ngram_creator._n2iIP(ip)]

    # Sliced from len(line) - context_size (not with a negative index) so a
    # context size of 0 yields an empty suffix instead of the whole string.
    ep = line[len(line) - context_size:]
    ep_prob = ngram_creator.ep_list[ngram_creator._n2iIP(ep)]

    pw_prob = ip_prob * ep_prob
    # Slide an n-gram window over the password, one character at a time.
    for start in range(len(line) - ngram_size + 1):
        cp = line[start:start + ngram_size]
        pw_prob = pw_prob * ngram_creator.cp_list[ngram_creator._n2iCP(cp)]
    return pw_prob


def predict(pw_prefix):
    """Predict the next likely passwords, given a prefix.

    Loads one Markov model per length in ``CONFIG.LENGTHS`` from disk (the
    module-level ``MARKOV_MODELS`` list is reset on every call), selects the
    model for ``len(pw_prefix) + 1``, scores ``pw_prefix`` extended by each
    character of the model's alphabet and prints the best candidates as
    ``(password, probability)`` tuples, most likely first.

    At most five candidates are printed; fewer if fewer could be scored.
    Calls ``sys.exit(-1)`` if the selected model does not match the
    candidate length.
    """
    # ngram creator
    global MARKOV_MODELS
    MARKOV_MODELS = []
    threads = []
    for length in CONFIG.LENGTHS:
        # Using threads is not beneficial, because it's a disk intensive task
        thread = Thread(target=worker, args=(length,))
        thread.start()
        threads.append(thread)
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    logging.debug("Training loaded from disk ...")
    logging.debug("Number of Markov models: "+str(len(MARKOV_MODELS)))

    # Determine correct model (+1 for the predicted character)
    ngram_creator = _select_correct_markov_model(len(pw_prefix) + 1, MARKOV_MODELS)

    results = dict()  # Maintain all probs, and sort them at the end
    # Generate all possible next passwords
    for c in ngram_creator.alphabet:
        line = pw_prefix + c
        # Important to prevent generating "passwor", or "iloveyo", or "babygir":
        # the fallback model may have been trained for a different length.
        if len(line) != ngram_creator.length:
            sys.stderr.write("\x1b[1;%dm" % (31) + "Info: No Markov model for this length: {} {}\n".format(len(line),line) + "\x1b[0m")
            sys.exit(-1)
        if ngram_creator._is_in_alphabet(line):  # Filter non-printable
            results[line] = _password_probability(ngram_creator, line)

    # Sort all possible probs
    results_sorted = sorted(results.items(), key=lambda kv: kv[1], reverse=True)

    # Output
    print("Given '{}' the top 5 most likely predictions are:".format(pw_prefix))
    # Slice instead of indexing 0..4 so that fewer than five scored candidates
    # no longer raise an IndexError.
    for entry in results_sorted[:5]:
        print(entry)
def main():
    """Create the global configuration and run a sample prediction.

    Sets the module-level ``CONFIG`` and calls ``predict`` for the prefix
    ``"passwor"``. A keyboard interrupt or any other exception ends the
    process with exit status 1 after reporting it.
    """
    global CONFIG
    try:
        CONFIG = Configure({"name":"My Config"})
        sample_prefix = "passwor"
        ## NGRAM_SIZE = 4
        # PASS: passwor?
        # INTR: ^passwor?$
        #
        # IP:  pas
        # CP1: pass
        # CP2: assw
        # CP3: sswo
        # CP4: swor
        # CP5: wor?
        # EP:  or?
        #
        # Sample output:
        #   Given 'passwor' the top 5 most likely predictions are:
        #   ('password', 5.217937750136535e-07)
        #   ('passwork', 5.0743563645302264e-09)
        #   ('passwort', 1.311252190776853e-09)
        #   ('passworm', 2.1270776175931523e-10)
        #   ('passwore', 1.196025357549521e-10)
        predict(sample_prefix)
    except KeyboardInterrupt:
        print('User canceled')
        sys.exit(1)
    except Exception as err:
        # Report the error in bold red on stderr, then reset the terminal colour.
        red = "\x1b[1;%dm" % (31)
        reset = "\x1b[0m"
        sys.stderr.write(red + "Error: {}\n".format(err) + reset)
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point: print a start timestamp, run main(), then print a
    # completion timestamp. The "Done" line is only reached if main() returns.
    start_banner = "{0}: {1:%Y-%m-%d %H:%M:%S}\n".format("Start", datetime.datetime.now())
    print(start_banner)
    print("Press Ctrl+C to shutdown")
    main()
    done_banner = "{0}: {1:%Y-%m-%d %H:%M:%S}".format("Done", datetime.datetime.now())
    print(done_banner)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment