Skip to content

Instantly share code, notes, and snippets.

@Pymmdrza
Created August 27, 2024 13:02
Show Gist options
  • Save Pymmdrza/667c0f05d7d6a973690a90f9f2fbc352 to your computer and use it in GitHub Desktop.
Save Pymmdrza/667c0f05d7d6a973690a90f9f2fbc352 to your computer and use it in GitHub Desktop.
Read Block File From Bitcoin Core (.dat)
import struct
import hashlib
def read_block(file_path):
with open(file_path, 'rb') as f:
while True:
magic = f.read(4)
if len(magic) < 4:
break
if magic != b'\xf9\xbe\xb4\xd9': # Bitcoin's magic number
print("Magic number invalid.")
break
block_size = struct.unpack('<I', f.read(4))[0]
block_data = f.read(block_size)
if len(block_data) < block_size:
print("Incomplete block.")
break
block_header = block_data[:80]
version, prev_hash, merkle_root, timestamp, bits, nonce = struct.unpack('<L32s32sLLL', block_header)
block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
tx_count, varint_size = read_varint(block_data[80:])
tx_offset = 80 + varint_size
print(f"Block Hash: {block_hash}")
print(f"Number of Transactions: {tx_count}")
transactions = []
for _ in range(tx_count):
tx_data, tx_size = parse_transaction(block_data[tx_offset:])
transactions.append(tx_data)
tx_offset += tx_size
for tx in transactions:
print(f"Transaction Hash: {tx['txid']}")
print("Input Addresses:")
for addr in tx['input_addresses']:
print(f" {addr}")
print("Output Addresses:")
for addr in tx['output_addresses']:
print(f" {addr}")
def read_varint(data):
prefix = data[0]
if prefix < 0xfd:
return prefix, 1
elif prefix == 0xfd:
return struct.unpack('<H', data[1:3])[0], 3
elif prefix == 0xfe:
return struct.unpack('<I', data[1:5])[0], 5
elif prefix == 0xff:
return struct.unpack('<Q', data[1:9])[0], 9
def parse_transaction(data):
tx_data = {}
tx_start = 0
tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_in_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
input_addresses = []
for _ in range(tx_in_count):
prev_txid = data[tx_start:tx_start + 32][::-1].hex()
vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
tx_start += 36
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_sig = data[tx_start:tx_start + script_length]
tx_start += script_length
address = extract_address_from_script_sig(script_sig)
if address:
input_addresses.append(address)
else:
input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
tx_start += 4
tx_out_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
output_addresses = []
for _ in range(tx_out_count):
tx_start += 8
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_pubkey = data[tx_start:tx_start + script_length]
address = extract_address(script_pubkey)
if address:
output_addresses.append(address)
tx_start += script_length
tx_data['locktime'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_data['txid'] = hashlib.sha256(hashlib.sha256(data[:tx_start]).digest()).digest()[::-1].hex()
tx_data['input_addresses'] = input_addresses
tx_data['output_addresses'] = output_addresses
return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
# Ensure the script_sig is long enough to be a valid public key
if len(script_sig) >= 33:
# The last part of scriptSig is usually the public key in P2PKH
pubkey = script_sig[-33:] if script_sig[-33] in (0x02, 0x03) else script_sig[-65:]
if len(pubkey) in (33, 65): # Ensure the public key length is correct
pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
return hash160_to_p2pkh_address(pubkey_hash.hex())
return None
def extract_address(script):
# P2PKH address
if len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9 and script[-2] == 0x88 and script[-1] == 0xac:
pubkey_hash = script[3:-2]
return hash160_to_p2pkh_address(pubkey_hash.hex())
# P2SH address
elif len(script) == 23 and script[0] == 0xa9 and script[-1] == 0x87:
script_hash = script[2:-1]
return hash160_to_p2sh_address(script_hash.hex())
# Bech32 address
elif len(script) >= 22 and script[0] == 0x00 and (script[1] == 0x14 or script[1] == 0x20):
witness_hash = script[2:]
return hash_to_bech32(witness_hash, len(witness_hash) == 20)
return None
def hash160_to_p2pkh_address(hash160):
prefix = b'\x00'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def hash160_to_p2sh_address(hash160):
prefix = b'\x05'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def base58_encode_with_checksum(data):
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]
return base58_encode(data + checksum)
def base58_encode(data):
alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
num = int.from_bytes(data, 'big')
encoded = ''
while num > 0:
num, rem = divmod(num, 58)
encoded = alphabet[rem] + encoded
for byte in data:
if byte == 0:
encoded = '1' + encoded
else:
break
return encoded
def hash_to_bech32(hash_data, is_p2wpkh):
version = 0 if is_p2wpkh else 0
return bech32_encode("bc", convertbits([version] + list(hash_data), 8, 5))
def bech32_polymod(values):
gen = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
chk = 1
for v in values:
b = (chk >> 25)
chk = (chk & 0x1ffffff) << 5 ^ v
for i in range(5):
if (b >> i) & 1:
chk ^= gen[i]
return chk
def bech32_hrp_expand(hrp):
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_verify_checksum(hrp, data):
return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1
def bech32_create_checksum(hrp, data):
values = bech32_hrp_expand(hrp) + data
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_encode(hrp, data):
combined = data + bech32_create_checksum(hrp, data)
return hrp + '1' + ''.join(['qpzry9x8gf2tvdw0s3jn54khce6mua7l'[(x)] for x in combined])
def convertbits(data, frombits, tobits, pad=True):
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
for value in data:
if value < 0 or value >> frombits:
return None
acc = (acc << frombits) | value
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret
import struct
import hashlib
import json
def read_block(file_path):
block_info = []
with open(file_path, 'rb') as f:
while True:
magic = f.read(4)
if len(magic) < 4:
break
if magic != b'\xf9\xbe\xb4\xd9': # Bitcoin's magic number
print("Magic number invalid.")
break
block_size = struct.unpack('<I', f.read(4))[0]
block_data = f.read(block_size)
if len(block_data) < block_size:
print("Incomplete block.")
break
block_header = block_data[:80]
version, prev_hash, merkle_root, timestamp, bits, nonce = struct.unpack('<L32s32sLLL', block_header)
block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
tx_count, varint_size = read_varint(block_data[80:])
tx_offset = 80 + varint_size
block_info.append({
'block_hash': block_hash,
'tx_count': tx_count,
'transactions': []
})
transactions = []
for _ in range(tx_count):
tx_data, tx_size = parse_transaction(block_data[tx_offset:])
transactions.append(tx_data)
tx_offset += tx_size
block_info[-1]['transactions'] = transactions
return block_info
def read_varint(data):
prefix = data[0]
if prefix < 0xfd:
return prefix, 1
elif prefix == 0xfd:
return struct.unpack('<H', data[1:3])[0], 3
elif prefix == 0xfe:
return struct.unpack('<I', data[1:5])[0], 5
elif prefix == 0xff:
return struct.unpack('<Q', data[1:9])[0], 9
def parse_transaction(data):
tx_data = {}
tx_start = 0
tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_in_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
input_addresses = []
input_values = []
for _ in range(tx_in_count):
prev_txid = data[tx_start:tx_start + 32][::-1].hex()
vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
tx_start += 36
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_sig = data[tx_start:tx_start + script_length]
tx_start += script_length
address = extract_address_from_script_sig(script_sig)
if address:
input_addresses.append(address)
else:
input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
# برای سادگی، فرض می‌کنیم مقدار بیت‌کوین برابر با vout (فرض نادرست)
input_values.append({
'address': address,
'value': vout # مقدار واقعی نیاز به استخراج از تراکنش قبلی دارد
})
tx_start += 4
tx_out_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
output_addresses = []
output_values = []
total_output_value = 0
for _ in range(tx_out_count):
value = struct.unpack('<Q', data[tx_start:tx_start + 8])[0]
tx_start += 8
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_pubkey = data[tx_start:tx_start + script_length]
address = extract_address(script_pubkey)
if address:
output_addresses.append(address)
output_values.append({
'address': address,
'value': value / 1e8 # تبدیل به بیت‌کوین
})
total_output_value += value / 1e8
tx_start += script_length
tx_data['locktime'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_data['txid'] = hashlib.sha256(hashlib.sha256(data[:tx_start]).digest()).digest()[::-1].hex()
tx_data['input_addresses'] = input_addresses
tx_data['output_addresses'] = output_addresses
tx_data['input_values'] = input_values
tx_data['output_values'] = output_values
tx_data['total_output_value'] = total_output_value
return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
if len(script_sig) >= 33:
pubkey = script_sig[-33:] if script_sig[-33] in (0x02, 0x03) else script_sig[-65:]
if len(pubkey) in (33, 65):
pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
return hash160_to_p2pkh_address(pubkey_hash.hex())
return None
def extract_address(script):
if len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9 and script[-2] == 0x88 and script[-1] == 0xac:
pubkey_hash = script[3:-2]
return hash160_to_p2pkh_address(pubkey_hash.hex())
elif len(script) == 23 and script[0] == 0xa9 and script[-1] == 0x87:
script_hash = script[2:-1]
return hash160_to_p2sh_address(script_hash.hex())
elif len(script) >= 22 and script[0] == 0x00 and (script[1] == 0x14 or script[1] == 0x20):
witness_hash = script[2:]
return hash_to_bech32(witness_hash, len(witness_hash) == 20)
return None
def hash160_to_p2pkh_address(hash160):
prefix = b'\x00'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def hash160_to_p2sh_address(hash160):
prefix = b'\x05'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def base58_encode_with_checksum(data):
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]
return base58_encode(data + checksum)
def base58_encode(data):
alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
num = int.from_bytes(data, 'big')
encoded = ''
while num > 0:
num, rem = divmod(num, 58)
encoded = alphabet[rem] + encoded
for byte in data:
if byte == 0:
encoded = '1' + encoded
else:
break
return encoded
def hash_to_bech32(hash_data, is_p2wpkh):
version = 0 if is_p2wpkh else 0
return bech32_encode("bc", convertbits([version] + list(hash_data), 8, 5))
def bech32_polymod(values):
gen = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
chk = 1
for v in values:
b = (chk >> 25)
chk = (chk & 0x1ffffff) << 5 ^ v
for i in range(5):
if (b >> i) & 1:
chk ^= gen[i]
return chk
def bech32_hrp_expand(hrp):
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_verify_checksum(hrp, data):
return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1
def bech32_create_checksum(hrp, data):
values = bech32_hrp_expand(hrp) + data
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_encode(hrp, data):
combined = data + bech32_create_checksum(hrp, data)
return hrp + '1' + ''.join(['qpzry9x8gf2tvdw0s3jn54khce6mua7l'[(x)] for x in combined])
def convertbits(data, frombits, tobits, pad=True):
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
for value in data:
if value < 0 or value >> frombits:
return None
acc = (acc << frombits) | value
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret
def write_json_block(block_info, filename):
with open(filename, 'w') as f:
json.dump(block_info, f, indent=4)
def block_info(filename: str):
nf = filename.split('.')[0]
jsonFile = f"{nf}.json"
block_db = read_block(filename)
write_json_block(block_db, jsonFile)
if __name__ == '__main__':
block_info('blk00001.dat')
print("Done")
import struct
import hashlib
def read_block(file_path):
block_info = []
with open(file_path, 'rb') as f:
while True:
magic = f.read(4)
if len(magic) < 4:
break
if magic != b'\xf9\xbe\xb4\xd9': # Bitcoin's magic number
print("Magic number invalid.")
break
block_size = struct.unpack('<I', f.read(4))[0]
block_data = f.read(block_size)
if len(block_data) < block_size:
print("Incomplete block.")
break
block_header = block_data[:80]
version, prev_hash, merkle_root, timestamp, bits, nonce = struct.unpack('<L32s32sLLL', block_header)
block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
tx_count, varint_size = read_varint(block_data[80:])
tx_offset = 80 + varint_size
block_info.append({
'block_hash': block_hash,
'tx_count': tx_count,
'transactions': []
})
transactions = []
for _ in range(tx_count):
tx_data, tx_size = parse_transaction(block_data[tx_offset:])
transactions.append(tx_data)
tx_offset += tx_size
block_info[-1]['transactions'] = transactions
return block_info
def read_varint(data):
prefix = data[0]
if prefix < 0xfd:
return prefix, 1
elif prefix == 0xfd:
return struct.unpack('<H', data[1:3])[0], 3
elif prefix == 0xfe:
return struct.unpack('<I', data[1:5])[0], 5
elif prefix == 0xff:
return struct.unpack('<Q', data[1:9])[0], 9
def parse_transaction(data):
tx_data = {}
tx_start = 0
tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_in_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
input_addresses = []
for _ in range(tx_in_count):
prev_txid = data[tx_start:tx_start + 32][::-1].hex()
vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
tx_start += 36
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_sig = data[tx_start:tx_start + script_length]
tx_start += script_length
address = extract_address_from_script_sig(script_sig)
if address:
input_addresses.append(address)
else:
input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
tx_start += 4
tx_out_count, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
output_addresses = []
for _ in range(tx_out_count):
tx_start += 8
script_length, varint_size = read_varint(data[tx_start:])
tx_start += varint_size
script_pubkey = data[tx_start:tx_start + script_length]
address = extract_address(script_pubkey)
if address:
output_addresses.append(address)
tx_start += script_length
tx_data['locktime'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
tx_start += 4
tx_data['txid'] = hashlib.sha256(hashlib.sha256(data[:tx_start]).digest()).digest()[::-1].hex()
tx_data['input_addresses'] = input_addresses
tx_data['output_addresses'] = output_addresses
return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
if len(script_sig) >= 33:
pubkey = script_sig[-33:] if script_sig[-33] in (0x02, 0x03) else script_sig[-65:]
if len(pubkey) in (33, 65):
pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
return hash160_to_p2pkh_address(pubkey_hash.hex())
return None
def extract_address(script):
if len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9 and script[-2] == 0x88 and script[-1] == 0xac:
pubkey_hash = script[3:-2]
return hash160_to_p2pkh_address(pubkey_hash.hex())
elif len(script) == 23 and script[0] == 0xa9 and script[-1] == 0x87:
script_hash = script[2:-1]
return hash160_to_p2sh_address(script_hash.hex())
elif len(script) >= 22 and script[0] == 0x00 and (script[1] == 0x14 or script[1] == 0x20):
witness_hash = script[2:]
return hash_to_bech32(witness_hash, len(witness_hash) == 20)
return None
def hash160_to_p2pkh_address(hash160):
prefix = b'\x00'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def hash160_to_p2sh_address(hash160):
prefix = b'\x05'
return base58_encode_with_checksum(prefix + bytes.fromhex(hash160))
def base58_encode_with_checksum(data):
checksum = hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]
return base58_encode(data + checksum)
def base58_encode(data):
alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
num = int.from_bytes(data, 'big')
encoded = ''
while num > 0:
num, rem = divmod(num, 58)
encoded = alphabet[rem] + encoded
for byte in data:
if byte == 0:
encoded = '1' + encoded
else:
break
return encoded
def hash_to_bech32(hash_data, is_p2wpkh):
version = 0 if is_p2wpkh else 0
return bech32_encode("bc", convertbits([version] + list(hash_data), 8, 5))
def bech32_polymod(values):
gen = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3]
chk = 1
for v in values:
b = (chk >> 25)
chk = (chk & 0x1ffffff) << 5 ^ v
for i in range(5):
if (b >> i) & 1:
chk ^= gen[i]
return chk
def bech32_hrp_expand(hrp):
return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]
def bech32_verify_checksum(hrp, data):
return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1
def bech32_create_checksum(hrp, data):
values = bech32_hrp_expand(hrp) + data
polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]
def bech32_encode(hrp, data):
combined = data + bech32_create_checksum(hrp, data)
return hrp + '1' + ''.join(['qpzry9x8gf2tvdw0s3jn54khce6mua7l'[(x)] for x in combined])
def convertbits(data, frombits, tobits, pad=True):
acc = 0
bits = 0
ret = []
maxv = (1 << tobits) - 1
for value in data:
if value < 0 or value >> frombits:
return None
acc = (acc << frombits) | value
bits += frombits
while bits >= tobits:
bits -= tobits
ret.append((acc >> bits) & maxv)
if pad:
if bits:
ret.append((acc << (tobits - bits)) & maxv)
elif bits >= frombits or ((acc << (tobits - bits)) & maxv):
return None
return ret
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment