Created
August 27, 2024 13:02
-
-
Save Pymmdrza/667c0f05d7d6a973690a90f9f2fbc352 to your computer and use it in GitHub Desktop.
Read Block File From Bitcoin Core (.dat)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import hashlib | |
def read_block(file_path):
    """Print a summary of every block stored in a Bitcoin Core block file.

    Each record is ``magic (4 bytes) | size (4 bytes, little-endian) | block``.
    For every block the block hash and transaction count are printed, followed
    by each transaction's id and the input/output addresses reported by
    ``parse_transaction``.

    Reading stops at end of file, at the first record whose magic does not
    match, or at the first truncated record (reported as "Incomplete block.").

    Args:
        file_path: Path of the ``.dat`` block file to read.
    """
    with open(file_path, 'rb') as f:
        while True:
            magic = f.read(4)
            if len(magic) < 4:
                break
            if magic != b'\xf9\xbe\xb4\xd9':  # Bitcoin's magic number
                print("Magic number invalid.")
                break
            size_field = f.read(4)
            if len(size_field) < 4:
                # The file ends inside the length field; unpacking it would raise.
                print("Incomplete block.")
                break
            block_size = struct.unpack('<I', size_field)[0]
            block_data = f.read(block_size)
            # A block needs the 80-byte header plus at least one byte of tx count.
            if len(block_data) < block_size or block_size < 81:
                print("Incomplete block.")
                break
            block_header = block_data[:80]
            # The block hash is the byte-reversed double SHA-256 of the header.
            block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
            tx_count, varint_size = read_varint(block_data[80:])
            tx_offset = 80 + varint_size
            print(f"Block Hash: {block_hash}")
            print(f"Number of Transactions: {tx_count}")
            transactions = []
            for _ in range(tx_count):
                tx_data, tx_size = parse_transaction(block_data[tx_offset:])
                transactions.append(tx_data)
                tx_offset += tx_size
            for tx in transactions:
                print(f"Transaction Hash: {tx['txid']}")
                print("Input Addresses:")
                for addr in tx['input_addresses']:
                    print(f" {addr}")
                print("Output Addresses:")
                for addr in tx['output_addresses']:
                    print(f" {addr}")
def read_varint(data):
    """Decode the variable-length integer at the start of ``data``.

    Returns:
        ``(value, size)`` where ``size`` is the number of bytes the encoding
        occupies (1, 3, 5 or 9).
    """
    marker = data[0]
    if marker < 0xfd:
        # Small values are stored directly in the marker byte.
        return marker, 1
    # Otherwise the marker selects a little-endian 2-, 4- or 8-byte integer.
    fmt, width = {0xfd: ('<H', 2), 0xfe: ('<I', 4), 0xff: ('<Q', 8)}[marker]
    (value,) = struct.unpack(fmt, data[1:1 + width])
    return value, 1 + width
def parse_transaction(data):
    """Parse one serialized transaction from the start of ``data``.

    Both the legacy layout and the BIP144 extended (SegWit) layout are
    handled. In the extended layout a ``00 01`` marker/flag pair follows the
    version and one witness stack per input precedes the locktime; those bytes
    are consumed but left out of the txid preimage.

    Args:
        data: Bytes beginning at the transaction's version field. Anything
            after the end of the transaction is ignored.

    Returns:
        ``(tx_data, tx_size)``. ``tx_data`` holds ``version``, ``locktime``,
        ``txid``, ``input_addresses`` and ``output_addresses``; ``tx_size`` is
        the number of bytes the transaction occupies in ``data``.
    """
    tx_data = {}
    tx_start = 0
    tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
    tx_start += 4

    # A transaction inside a block always has at least one input, so a zero
    # byte in the input-count position can only be the SegWit marker.
    is_segwit = data[4:6] == b'\x00\x01'
    if is_segwit:
        tx_start += 2
    body_start = tx_start

    # read_varint looks at no more than nine bytes, so pass a bounded slice
    # instead of copying the rest of the buffer each time.
    tx_in_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    input_addresses = []
    for _ in range(tx_in_count):
        prev_txid = data[tx_start:tx_start + 32][::-1].hex()
        vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
        tx_start += 36
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_sig = data[tx_start:tx_start + script_length]
        tx_start += script_length
        address = extract_address_from_script_sig(script_sig)
        if address:
            input_addresses.append(address)
        else:
            input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
        tx_start += 4  # skip the sequence number

    tx_out_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    output_addresses = []
    for _ in range(tx_out_count):
        tx_start += 8  # skip the output value
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_pubkey = data[tx_start:tx_start + script_length]
        address = extract_address(script_pubkey)
        if address:
            output_addresses.append(address)
        tx_start += script_length
    body_end = tx_start

    if is_segwit:
        # One witness stack per input: an item count, then length-prefixed items.
        for _ in range(tx_in_count):
            item_count, varint_size = read_varint(data[tx_start:tx_start + 9])
            tx_start += varint_size
            for _ in range(item_count):
                item_length, varint_size = read_varint(data[tx_start:tx_start + 9])
                tx_start += varint_size + item_length

    locktime_bytes = data[tx_start:tx_start + 4]
    tx_data['locktime'] = struct.unpack('<I', locktime_bytes)[0]
    tx_start += 4

    # The txid always covers the legacy serialization: version, inputs,
    # outputs and locktime, without marker, flag or witness data.
    if is_segwit:
        preimage = data[:4] + data[body_start:body_end] + locktime_bytes
    else:
        preimage = data[:tx_start]
    tx_data['txid'] = hashlib.sha256(hashlib.sha256(preimage).digest()).digest()[::-1].hex()
    tx_data['input_addresses'] = input_addresses
    tx_data['output_addresses'] = output_addresses
    return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
    """Return the P2PKH address that a ``<signature> <pubkey>`` scriptSig spends from.

    The script is walked push by push instead of guessing from its trailing
    bytes, so coinbase data, bare signatures and redeem scripts are not
    mistaken for a public key.

    Args:
        script_sig: Raw scriptSig bytes of a transaction input.

    Returns:
        The Base58Check address of the pushed public key, or ``None`` when the
        script is not exactly two data pushes ending in a public key.
    """
    pushes = []
    pos = 0
    end = len(script_sig)
    while pos < end:
        opcode = script_sig[pos]
        pos += 1
        if 0x01 <= opcode <= 0x4b:
            # Direct push: the opcode itself is the payload length.
            size = opcode
        elif opcode in (0x4c, 0x4d, 0x4e):
            # OP_PUSHDATA1/2/4: a 1-, 2- or 4-byte little-endian length follows.
            width = 1 << (opcode - 0x4c)
            if pos + width > end:
                return None
            size = int.from_bytes(script_sig[pos:pos + width], 'little')
            pos += width
        else:
            # Anything else is not a plain data push.
            return None
        if pos + size > end:
            return None
        pushes.append(script_sig[pos:pos + size])
        pos += size
        if len(pushes) > 2:
            return None
    if len(pushes) != 2:
        return None
    pubkey = pushes[1]
    is_compressed = len(pubkey) == 33 and pubkey[0] in (0x02, 0x03)
    is_uncompressed = len(pubkey) == 65 and pubkey[0] == 0x04
    if not (is_compressed or is_uncompressed):
        return None
    pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
    return hash160_to_p2pkh_address(pubkey_hash.hex())
def extract_address(script):
    """Return the address encoded by an output script, or ``None`` if unrecognised.

    Recognises three templates and checks the push-length byte of each, so a
    script that merely has the right length and outer opcodes is not accepted:

    * P2PKH: ``76 a9 14 <20 bytes> 88 ac``
    * P2SH: ``a9 14 <20 bytes> 87``
    * Witness version 0: ``00 14 <20 bytes>`` or ``00 20 <32 bytes>``

    Args:
        script: Raw scriptPubKey bytes.
    """
    # P2PKH address
    if (len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9
            and script[2] == 0x14 and script[-2] == 0x88 and script[-1] == 0xac):
        pubkey_hash = script[3:-2]
        return hash160_to_p2pkh_address(pubkey_hash.hex())
    # P2SH address
    if len(script) == 23 and script[0] == 0xa9 and script[1] == 0x14 and script[-1] == 0x87:
        script_hash = script[2:-1]
        return hash160_to_p2sh_address(script_hash.hex())
    # Bech32 address: the push length must account for the whole program.
    if len(script) in (22, 34) and script[0] == 0x00 and script[1] == len(script) - 2:
        witness_hash = script[2:]
        return hash_to_bech32(witness_hash, len(witness_hash) == 20)
    return None
def hash160_to_p2pkh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x00."""
    payload = b'\x00' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def hash160_to_p2sh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x05."""
    payload = b'\x05' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def base58_encode_with_checksum(data):
    """Append the first four bytes of double SHA-256 of ``data`` and Base58-encode the result."""
    first_pass = hashlib.sha256(data).digest()
    checksum = hashlib.sha256(first_pass).digest()[:4]
    return base58_encode(data + checksum)
def base58_encode(data):
    """Base58-encode ``data``, emitting one ``'1'`` per leading zero byte."""
    alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    remaining = int.from_bytes(data, 'big')
    digits = []
    # Digits come out least-significant first; they are reversed when joined.
    while remaining > 0:
        remaining, index = divmod(remaining, 58)
        digits.append(alphabet[index])
    # Leading zero bytes vanish in the integer conversion, so restore them.
    leading_zeros = len(data) - len(data.lstrip(b'\x00'))
    return '1' * leading_zeros + ''.join(reversed(digits))
def hash_to_bech32(hash_data, is_p2wpkh):
    """Encode a version-0 witness program as a ``bc1...`` Bech32 address.

    Args:
        hash_data: The witness program bytes (20 for P2WPKH, 32 for P2WSH).
        is_p2wpkh: Kept for interface compatibility; both program kinds use
            witness version 0, so it does not affect the result.

    Returns:
        The Bech32 address string.
    """
    version = 0
    # Per BIP173 the witness version is its own 5-bit symbol; only the program
    # is regrouped from 8-bit bytes to 5-bit symbols. Regrouping the version
    # together with the program shifts every symbol and yields an invalid address.
    return bech32_encode("bc", [version] + convertbits(hash_data, 8, 5))
def bech32_polymod(values):
    """Fold a sequence of 5-bit values into the Bech32 checksum polynomial state."""
    generators = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    checksum = 1
    for value in values:
        # The bits shifted out of the 25-bit window select which generators to mix in.
        top = checksum >> 25
        checksum = ((checksum & 0x1ffffff) << 5) ^ value
        for bit, generator in enumerate(generators):
            if top & (1 << bit):
                checksum ^= generator
    return checksum
def bech32_hrp_expand(hrp):
    """Expand the human-readable part into high bits, a zero separator, then low bits."""
    codes = [ord(ch) for ch in hrp]
    high_bits = [code >> 5 for code in codes]
    low_bits = [code & 31 for code in codes]
    return high_bits + [0] + low_bits
def bech32_verify_checksum(hrp, data):
    """Return True when ``data`` (including its checksum symbols) is valid for ``hrp``."""
    expanded = bech32_hrp_expand(hrp)
    return bech32_polymod(expanded + data) == 1
def bech32_create_checksum(hrp, data):
    """Compute the six 5-bit checksum symbols for ``hrp`` and ``data``."""
    values = bech32_hrp_expand(hrp) + data
    polymod = bech32_polymod(values + [0] * 6) ^ 1
    # Split the 30-bit result into six symbols, most significant first.
    return [(polymod >> shift) & 31 for shift in range(25, -1, -5)]
def bech32_encode(hrp, data):
    """Build the Bech32 string ``hrp + '1' + symbols`` for 5-bit ``data`` plus its checksum."""
    charset = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l'
    symbols = data + bech32_create_checksum(hrp, data)
    encoded = ''.join(charset[symbol] for symbol in symbols)
    return hrp + '1' + encoded
def convertbits(data, frombits, tobits, pad=True):
    """Regroup a sequence of ``frombits``-wide integers into ``tobits``-wide integers.

    Args:
        data: Iterable of non-negative integers, each below ``2 ** frombits``.
        frombits: Width in bits of each input value.
        tobits: Width in bits of each output value.
        pad: When true, leftover bits are emitted as a final zero-padded group.
            When false, leftover bits must be zero and fewer than ``frombits``.

    Returns:
        The regrouped list, or ``None`` for an out-of-range input value or
        invalid leftover bits in non-padding mode.
    """
    accumulator = 0
    pending = 0
    out = []
    mask = (1 << tobits) - 1
    for value in data:
        if value < 0 or (value >> frombits) != 0:
            return None
        accumulator = (accumulator << frombits) | value
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            out.append((accumulator >> pending) & mask)
    # Bits that did not fill a whole output group, left-aligned in one group.
    leftover = (accumulator << (tobits - pending)) & mask
    if pad:
        if pending:
            out.append(leftover)
    elif pending >= frombits or leftover:
        return None
    return out
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import hashlib | |
import json | |
def read_block(file_path):
    """Parse every block stored in a Bitcoin Core block file.

    Each record is ``magic (4 bytes) | size (4 bytes, little-endian) | block``.
    Reading stops at end of file, at the first record whose magic does not
    match, or at the first truncated record (reported as "Incomplete block.").

    Args:
        file_path: Path of the ``.dat`` block file to read.

    Returns:
        A list with one dict per block, holding ``block_hash``, ``tx_count``
        and ``transactions`` (the dicts produced by ``parse_transaction``).
    """
    block_info = []
    with open(file_path, 'rb') as f:
        while True:
            magic = f.read(4)
            if len(magic) < 4:
                break
            if magic != b'\xf9\xbe\xb4\xd9':  # Bitcoin's magic number
                print("Magic number invalid.")
                break
            size_field = f.read(4)
            if len(size_field) < 4:
                # The file ends inside the length field; unpacking it would raise.
                print("Incomplete block.")
                break
            block_size = struct.unpack('<I', size_field)[0]
            block_data = f.read(block_size)
            # A block needs the 80-byte header plus at least one byte of tx count.
            if len(block_data) < block_size or block_size < 81:
                print("Incomplete block.")
                break
            block_header = block_data[:80]
            # The block hash is the byte-reversed double SHA-256 of the header.
            block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
            tx_count, varint_size = read_varint(block_data[80:])
            tx_offset = 80 + varint_size
            transactions = []
            for _ in range(tx_count):
                tx_data, tx_size = parse_transaction(block_data[tx_offset:])
                transactions.append(tx_data)
                tx_offset += tx_size
            block_info.append({
                'block_hash': block_hash,
                'tx_count': tx_count,
                'transactions': transactions,
            })
    return block_info
def read_varint(data):
    """Decode the variable-length integer at the start of ``data``.

    Returns:
        ``(value, size)`` where ``size`` is the number of bytes the encoding
        occupies (1, 3, 5 or 9).
    """
    marker = data[0]
    if marker < 0xfd:
        # Small values are stored directly in the marker byte.
        return marker, 1
    # Otherwise the marker selects a little-endian 2-, 4- or 8-byte integer.
    fmt, width = {0xfd: ('<H', 2), 0xfe: ('<I', 4), 0xff: ('<Q', 8)}[marker]
    (value,) = struct.unpack(fmt, data[1:1 + width])
    return value, 1 + width
def parse_transaction(data):
    """Parse one serialized transaction from the start of ``data``.

    Both the legacy layout and the BIP144 extended (SegWit) layout are
    handled. In the extended layout a ``00 01`` marker/flag pair follows the
    version and one witness stack per input precedes the locktime; those bytes
    are consumed but left out of the txid preimage.

    Args:
        data: Bytes beginning at the transaction's version field. Anything
            after the end of the transaction is ignored.

    Returns:
        ``(tx_data, tx_size)``. ``tx_data`` holds ``version``, ``locktime``,
        ``txid``, ``input_addresses``, ``output_addresses``, ``input_values``,
        ``output_values`` and ``total_output_value`` (in BTC, counting only
        outputs whose address could be decoded); ``tx_size`` is the number of
        bytes the transaction occupies in ``data``.
    """
    tx_data = {}
    tx_start = 0
    tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
    tx_start += 4

    # A transaction inside a block always has at least one input, so a zero
    # byte in the input-count position can only be the SegWit marker.
    is_segwit = data[4:6] == b'\x00\x01'
    if is_segwit:
        tx_start += 2
    body_start = tx_start

    # read_varint looks at no more than nine bytes, so pass a bounded slice
    # instead of copying the rest of the buffer each time.
    tx_in_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    input_addresses = []
    input_values = []
    for _ in range(tx_in_count):
        prev_txid = data[tx_start:tx_start + 32][::-1].hex()
        vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
        tx_start += 36
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_sig = data[tx_start:tx_start + script_length]
        tx_start += script_length
        address = extract_address_from_script_sig(script_sig)
        if address:
            input_addresses.append(address)
        else:
            input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
        # For simplicity the previous-output index is stored as the "value".
        # This is a known-incorrect placeholder: the real amount would have to
        # be looked up in the previous transaction.
        input_values.append({
            'address': address,
            'value': vout
        })
        tx_start += 4  # skip the sequence number

    tx_out_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    output_addresses = []
    output_values = []
    total_satoshis = 0
    for _ in range(tx_out_count):
        value = struct.unpack('<Q', data[tx_start:tx_start + 8])[0]
        tx_start += 8
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_pubkey = data[tx_start:tx_start + script_length]
        address = extract_address(script_pubkey)
        if address:
            output_addresses.append(address)
            output_values.append({
                'address': address,
                'value': value / 1e8  # convert satoshis to BTC
            })
            # Sum in integer satoshis so rounding error does not accumulate.
            total_satoshis += value
        tx_start += script_length
    body_end = tx_start

    if is_segwit:
        # One witness stack per input: an item count, then length-prefixed items.
        for _ in range(tx_in_count):
            item_count, varint_size = read_varint(data[tx_start:tx_start + 9])
            tx_start += varint_size
            for _ in range(item_count):
                item_length, varint_size = read_varint(data[tx_start:tx_start + 9])
                tx_start += varint_size + item_length

    locktime_bytes = data[tx_start:tx_start + 4]
    tx_data['locktime'] = struct.unpack('<I', locktime_bytes)[0]
    tx_start += 4

    # The txid always covers the legacy serialization: version, inputs,
    # outputs and locktime, without marker, flag or witness data.
    if is_segwit:
        preimage = data[:4] + data[body_start:body_end] + locktime_bytes
    else:
        preimage = data[:tx_start]
    tx_data['txid'] = hashlib.sha256(hashlib.sha256(preimage).digest()).digest()[::-1].hex()
    tx_data['input_addresses'] = input_addresses
    tx_data['output_addresses'] = output_addresses
    tx_data['input_values'] = input_values
    tx_data['output_values'] = output_values
    tx_data['total_output_value'] = total_satoshis / 1e8
    return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
    """Return the P2PKH address that a ``<signature> <pubkey>`` scriptSig spends from.

    The script is walked push by push instead of guessing from its trailing
    bytes, so coinbase data, bare signatures and redeem scripts are not
    mistaken for a public key.

    Args:
        script_sig: Raw scriptSig bytes of a transaction input.

    Returns:
        The Base58Check address of the pushed public key, or ``None`` when the
        script is not exactly two data pushes ending in a public key.
    """
    pushes = []
    pos = 0
    end = len(script_sig)
    while pos < end:
        opcode = script_sig[pos]
        pos += 1
        if 0x01 <= opcode <= 0x4b:
            # Direct push: the opcode itself is the payload length.
            size = opcode
        elif opcode in (0x4c, 0x4d, 0x4e):
            # OP_PUSHDATA1/2/4: a 1-, 2- or 4-byte little-endian length follows.
            width = 1 << (opcode - 0x4c)
            if pos + width > end:
                return None
            size = int.from_bytes(script_sig[pos:pos + width], 'little')
            pos += width
        else:
            # Anything else is not a plain data push.
            return None
        if pos + size > end:
            return None
        pushes.append(script_sig[pos:pos + size])
        pos += size
        if len(pushes) > 2:
            return None
    if len(pushes) != 2:
        return None
    pubkey = pushes[1]
    is_compressed = len(pubkey) == 33 and pubkey[0] in (0x02, 0x03)
    is_uncompressed = len(pubkey) == 65 and pubkey[0] == 0x04
    if not (is_compressed or is_uncompressed):
        return None
    pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
    return hash160_to_p2pkh_address(pubkey_hash.hex())
def extract_address(script):
    """Return the address encoded by an output script, or ``None`` if unrecognised.

    Recognises three templates and checks the push-length byte of each, so a
    script that merely has the right length and outer opcodes is not accepted:

    * P2PKH: ``76 a9 14 <20 bytes> 88 ac``
    * P2SH: ``a9 14 <20 bytes> 87``
    * Witness version 0: ``00 14 <20 bytes>`` or ``00 20 <32 bytes>``

    Args:
        script: Raw scriptPubKey bytes.
    """
    # P2PKH address
    if (len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9
            and script[2] == 0x14 and script[-2] == 0x88 and script[-1] == 0xac):
        pubkey_hash = script[3:-2]
        return hash160_to_p2pkh_address(pubkey_hash.hex())
    # P2SH address
    if len(script) == 23 and script[0] == 0xa9 and script[1] == 0x14 and script[-1] == 0x87:
        script_hash = script[2:-1]
        return hash160_to_p2sh_address(script_hash.hex())
    # Bech32 address: the push length must account for the whole program.
    if len(script) in (22, 34) and script[0] == 0x00 and script[1] == len(script) - 2:
        witness_hash = script[2:]
        return hash_to_bech32(witness_hash, len(witness_hash) == 20)
    return None
def hash160_to_p2pkh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x00."""
    payload = b'\x00' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def hash160_to_p2sh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x05."""
    payload = b'\x05' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def base58_encode_with_checksum(data):
    """Append the first four bytes of double SHA-256 of ``data`` and Base58-encode the result."""
    first_pass = hashlib.sha256(data).digest()
    checksum = hashlib.sha256(first_pass).digest()[:4]
    return base58_encode(data + checksum)
def base58_encode(data):
    """Base58-encode ``data``, emitting one ``'1'`` per leading zero byte."""
    alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    remaining = int.from_bytes(data, 'big')
    digits = []
    # Digits come out least-significant first; they are reversed when joined.
    while remaining > 0:
        remaining, index = divmod(remaining, 58)
        digits.append(alphabet[index])
    # Leading zero bytes vanish in the integer conversion, so restore them.
    leading_zeros = len(data) - len(data.lstrip(b'\x00'))
    return '1' * leading_zeros + ''.join(reversed(digits))
def hash_to_bech32(hash_data, is_p2wpkh):
    """Encode a version-0 witness program as a ``bc1...`` Bech32 address.

    Args:
        hash_data: The witness program bytes (20 for P2WPKH, 32 for P2WSH).
        is_p2wpkh: Kept for interface compatibility; both program kinds use
            witness version 0, so it does not affect the result.

    Returns:
        The Bech32 address string.
    """
    version = 0
    # Per BIP173 the witness version is its own 5-bit symbol; only the program
    # is regrouped from 8-bit bytes to 5-bit symbols. Regrouping the version
    # together with the program shifts every symbol and yields an invalid address.
    return bech32_encode("bc", [version] + convertbits(hash_data, 8, 5))
def bech32_polymod(values):
    """Fold a sequence of 5-bit values into the Bech32 checksum polynomial state."""
    generators = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    checksum = 1
    for value in values:
        # The bits shifted out of the 25-bit window select which generators to mix in.
        top = checksum >> 25
        checksum = ((checksum & 0x1ffffff) << 5) ^ value
        for bit, generator in enumerate(generators):
            if top & (1 << bit):
                checksum ^= generator
    return checksum
def bech32_hrp_expand(hrp):
    """Expand the human-readable part into high bits, a zero separator, then low bits."""
    codes = [ord(ch) for ch in hrp]
    high_bits = [code >> 5 for code in codes]
    low_bits = [code & 31 for code in codes]
    return high_bits + [0] + low_bits
def bech32_verify_checksum(hrp, data):
    """Return True when ``data`` (including its checksum symbols) is valid for ``hrp``."""
    expanded = bech32_hrp_expand(hrp)
    return bech32_polymod(expanded + data) == 1
def bech32_create_checksum(hrp, data):
    """Compute the six 5-bit checksum symbols for ``hrp`` and ``data``."""
    values = bech32_hrp_expand(hrp) + data
    polymod = bech32_polymod(values + [0] * 6) ^ 1
    # Split the 30-bit result into six symbols, most significant first.
    return [(polymod >> shift) & 31 for shift in range(25, -1, -5)]
def bech32_encode(hrp, data):
    """Build the Bech32 string ``hrp + '1' + symbols`` for 5-bit ``data`` plus its checksum."""
    charset = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l'
    symbols = data + bech32_create_checksum(hrp, data)
    encoded = ''.join(charset[symbol] for symbol in symbols)
    return hrp + '1' + encoded
def convertbits(data, frombits, tobits, pad=True):
    """Regroup a sequence of ``frombits``-wide integers into ``tobits``-wide integers.

    Args:
        data: Iterable of non-negative integers, each below ``2 ** frombits``.
        frombits: Width in bits of each input value.
        tobits: Width in bits of each output value.
        pad: When true, leftover bits are emitted as a final zero-padded group.
            When false, leftover bits must be zero and fewer than ``frombits``.

    Returns:
        The regrouped list, or ``None`` for an out-of-range input value or
        invalid leftover bits in non-padding mode.
    """
    accumulator = 0
    pending = 0
    out = []
    mask = (1 << tobits) - 1
    for value in data:
        if value < 0 or (value >> frombits) != 0:
            return None
        accumulator = (accumulator << frombits) | value
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            out.append((accumulator >> pending) & mask)
    # Bits that did not fill a whole output group, left-aligned in one group.
    leftover = (accumulator << (tobits - pending)) & mask
    if pad:
        if pending:
            out.append(leftover)
    elif pending >= frombits or leftover:
        return None
    return out
def write_json_block(block_info, filename):
    """Serialise ``block_info`` as JSON with four-space indentation into ``filename``."""
    with open(filename, 'w') as out:
        out.write(json.dumps(block_info, indent=4))
def block_info(filename: str):
    """Parse the block file ``filename`` and write the result next to it as JSON.

    The output path is ``filename`` with its final extension replaced by
    ``.json`` (``blk00001.dat`` becomes ``blk00001.json``).

    Args:
        filename: Path of the block file to convert.
    """
    import os

    # splitext only strips the last extension, so dots in directory names or a
    # leading "./" no longer truncate the output path.
    base, _ext = os.path.splitext(filename)
    jsonFile = f"{base}.json"
    block_db = read_block(filename)
    write_json_block(block_db, jsonFile)
if __name__ == '__main__':
    # Script entry point: convert the sample block file, then report completion.
    source_file = 'blk00001.dat'
    block_info(source_file)
    print("Done")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
import hashlib | |
def read_block(file_path):
    """Parse every block stored in a Bitcoin Core block file.

    Each record is ``magic (4 bytes) | size (4 bytes, little-endian) | block``.
    Reading stops at end of file, at the first record whose magic does not
    match, or at the first truncated record (reported as "Incomplete block.").

    Args:
        file_path: Path of the ``.dat`` block file to read.

    Returns:
        A list with one dict per block, holding ``block_hash``, ``tx_count``
        and ``transactions`` (the dicts produced by ``parse_transaction``).
    """
    block_info = []
    with open(file_path, 'rb') as f:
        while True:
            magic = f.read(4)
            if len(magic) < 4:
                break
            if magic != b'\xf9\xbe\xb4\xd9':  # Bitcoin's magic number
                print("Magic number invalid.")
                break
            size_field = f.read(4)
            if len(size_field) < 4:
                # The file ends inside the length field; unpacking it would raise.
                print("Incomplete block.")
                break
            block_size = struct.unpack('<I', size_field)[0]
            block_data = f.read(block_size)
            # A block needs the 80-byte header plus at least one byte of tx count.
            if len(block_data) < block_size or block_size < 81:
                print("Incomplete block.")
                break
            block_header = block_data[:80]
            # The block hash is the byte-reversed double SHA-256 of the header.
            block_hash = hashlib.sha256(hashlib.sha256(block_header).digest()).digest()[::-1].hex()
            tx_count, varint_size = read_varint(block_data[80:])
            tx_offset = 80 + varint_size
            transactions = []
            for _ in range(tx_count):
                tx_data, tx_size = parse_transaction(block_data[tx_offset:])
                transactions.append(tx_data)
                tx_offset += tx_size
            block_info.append({
                'block_hash': block_hash,
                'tx_count': tx_count,
                'transactions': transactions,
            })
    return block_info
def read_varint(data):
    """Decode the variable-length integer at the start of ``data``.

    Returns:
        ``(value, size)`` where ``size`` is the number of bytes the encoding
        occupies (1, 3, 5 or 9).
    """
    marker = data[0]
    if marker < 0xfd:
        # Small values are stored directly in the marker byte.
        return marker, 1
    # Otherwise the marker selects a little-endian 2-, 4- or 8-byte integer.
    fmt, width = {0xfd: ('<H', 2), 0xfe: ('<I', 4), 0xff: ('<Q', 8)}[marker]
    (value,) = struct.unpack(fmt, data[1:1 + width])
    return value, 1 + width
def parse_transaction(data):
    """Parse one serialized transaction from the start of ``data``.

    Both the legacy layout and the BIP144 extended (SegWit) layout are
    handled. In the extended layout a ``00 01`` marker/flag pair follows the
    version and one witness stack per input precedes the locktime; those bytes
    are consumed but left out of the txid preimage.

    Args:
        data: Bytes beginning at the transaction's version field. Anything
            after the end of the transaction is ignored.

    Returns:
        ``(tx_data, tx_size)``. ``tx_data`` holds ``version``, ``locktime``,
        ``txid``, ``input_addresses`` and ``output_addresses``; ``tx_size`` is
        the number of bytes the transaction occupies in ``data``.
    """
    tx_data = {}
    tx_start = 0
    tx_data['version'] = struct.unpack('<I', data[tx_start:tx_start + 4])[0]
    tx_start += 4

    # A transaction inside a block always has at least one input, so a zero
    # byte in the input-count position can only be the SegWit marker.
    is_segwit = data[4:6] == b'\x00\x01'
    if is_segwit:
        tx_start += 2
    body_start = tx_start

    # read_varint looks at no more than nine bytes, so pass a bounded slice
    # instead of copying the rest of the buffer each time.
    tx_in_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    input_addresses = []
    for _ in range(tx_in_count):
        prev_txid = data[tx_start:tx_start + 32][::-1].hex()
        vout = struct.unpack('<I', data[tx_start + 32:tx_start + 36])[0]
        tx_start += 36
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_sig = data[tx_start:tx_start + script_length]
        tx_start += script_length
        address = extract_address_from_script_sig(script_sig)
        if address:
            input_addresses.append(address)
        else:
            input_addresses.append(f"Could not parse address from scriptSig (txid: {prev_txid}, vout: {vout})")
        tx_start += 4  # skip the sequence number

    tx_out_count, varint_size = read_varint(data[tx_start:tx_start + 9])
    tx_start += varint_size
    output_addresses = []
    for _ in range(tx_out_count):
        tx_start += 8  # skip the output value
        script_length, varint_size = read_varint(data[tx_start:tx_start + 9])
        tx_start += varint_size
        script_pubkey = data[tx_start:tx_start + script_length]
        address = extract_address(script_pubkey)
        if address:
            output_addresses.append(address)
        tx_start += script_length
    body_end = tx_start

    if is_segwit:
        # One witness stack per input: an item count, then length-prefixed items.
        for _ in range(tx_in_count):
            item_count, varint_size = read_varint(data[tx_start:tx_start + 9])
            tx_start += varint_size
            for _ in range(item_count):
                item_length, varint_size = read_varint(data[tx_start:tx_start + 9])
                tx_start += varint_size + item_length

    locktime_bytes = data[tx_start:tx_start + 4]
    tx_data['locktime'] = struct.unpack('<I', locktime_bytes)[0]
    tx_start += 4

    # The txid always covers the legacy serialization: version, inputs,
    # outputs and locktime, without marker, flag or witness data.
    if is_segwit:
        preimage = data[:4] + data[body_start:body_end] + locktime_bytes
    else:
        preimage = data[:tx_start]
    tx_data['txid'] = hashlib.sha256(hashlib.sha256(preimage).digest()).digest()[::-1].hex()
    tx_data['input_addresses'] = input_addresses
    tx_data['output_addresses'] = output_addresses
    return tx_data, tx_start
def extract_address_from_script_sig(script_sig):
    """Return the P2PKH address that a ``<signature> <pubkey>`` scriptSig spends from.

    The script is walked push by push instead of guessing from its trailing
    bytes, so coinbase data, bare signatures and redeem scripts are not
    mistaken for a public key.

    Args:
        script_sig: Raw scriptSig bytes of a transaction input.

    Returns:
        The Base58Check address of the pushed public key, or ``None`` when the
        script is not exactly two data pushes ending in a public key.
    """
    pushes = []
    pos = 0
    end = len(script_sig)
    while pos < end:
        opcode = script_sig[pos]
        pos += 1
        if 0x01 <= opcode <= 0x4b:
            # Direct push: the opcode itself is the payload length.
            size = opcode
        elif opcode in (0x4c, 0x4d, 0x4e):
            # OP_PUSHDATA1/2/4: a 1-, 2- or 4-byte little-endian length follows.
            width = 1 << (opcode - 0x4c)
            if pos + width > end:
                return None
            size = int.from_bytes(script_sig[pos:pos + width], 'little')
            pos += width
        else:
            # Anything else is not a plain data push.
            return None
        if pos + size > end:
            return None
        pushes.append(script_sig[pos:pos + size])
        pos += size
        if len(pushes) > 2:
            return None
    if len(pushes) != 2:
        return None
    pubkey = pushes[1]
    is_compressed = len(pubkey) == 33 and pubkey[0] in (0x02, 0x03)
    is_uncompressed = len(pubkey) == 65 and pubkey[0] == 0x04
    if not (is_compressed or is_uncompressed):
        return None
    pubkey_hash = hashlib.new('ripemd160', hashlib.sha256(pubkey).digest()).digest()
    return hash160_to_p2pkh_address(pubkey_hash.hex())
def extract_address(script):
    """Return the address encoded by an output script, or ``None`` if unrecognised.

    Recognises three templates and checks the push-length byte of each, so a
    script that merely has the right length and outer opcodes is not accepted:

    * P2PKH: ``76 a9 14 <20 bytes> 88 ac``
    * P2SH: ``a9 14 <20 bytes> 87``
    * Witness version 0: ``00 14 <20 bytes>`` or ``00 20 <32 bytes>``

    Args:
        script: Raw scriptPubKey bytes.
    """
    # P2PKH address
    if (len(script) == 25 and script[0] == 0x76 and script[1] == 0xa9
            and script[2] == 0x14 and script[-2] == 0x88 and script[-1] == 0xac):
        pubkey_hash = script[3:-2]
        return hash160_to_p2pkh_address(pubkey_hash.hex())
    # P2SH address
    if len(script) == 23 and script[0] == 0xa9 and script[1] == 0x14 and script[-1] == 0x87:
        script_hash = script[2:-1]
        return hash160_to_p2sh_address(script_hash.hex())
    # Bech32 address: the push length must account for the whole program.
    if len(script) in (22, 34) and script[0] == 0x00 and script[1] == len(script) - 2:
        witness_hash = script[2:]
        return hash_to_bech32(witness_hash, len(witness_hash) == 20)
    return None
def hash160_to_p2pkh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x00."""
    payload = b'\x00' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def hash160_to_p2sh_address(hash160):
    """Encode a hex hash160 string as a Base58Check address with version byte 0x05."""
    payload = b'\x05' + bytes.fromhex(hash160)
    return base58_encode_with_checksum(payload)
def base58_encode_with_checksum(data):
    """Append the first four bytes of double SHA-256 of ``data`` and Base58-encode the result."""
    first_pass = hashlib.sha256(data).digest()
    checksum = hashlib.sha256(first_pass).digest()[:4]
    return base58_encode(data + checksum)
def base58_encode(data):
    """Base58-encode ``data``, emitting one ``'1'`` per leading zero byte."""
    alphabet = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
    remaining = int.from_bytes(data, 'big')
    digits = []
    # Digits come out least-significant first; they are reversed when joined.
    while remaining > 0:
        remaining, index = divmod(remaining, 58)
        digits.append(alphabet[index])
    # Leading zero bytes vanish in the integer conversion, so restore them.
    leading_zeros = len(data) - len(data.lstrip(b'\x00'))
    return '1' * leading_zeros + ''.join(reversed(digits))
def hash_to_bech32(hash_data, is_p2wpkh):
    """Encode a version-0 witness program as a ``bc1...`` Bech32 address.

    Args:
        hash_data: The witness program bytes (20 for P2WPKH, 32 for P2WSH).
        is_p2wpkh: Kept for interface compatibility; both program kinds use
            witness version 0, so it does not affect the result.

    Returns:
        The Bech32 address string.
    """
    version = 0
    # Per BIP173 the witness version is its own 5-bit symbol; only the program
    # is regrouped from 8-bit bytes to 5-bit symbols. Regrouping the version
    # together with the program shifts every symbol and yields an invalid address.
    return bech32_encode("bc", [version] + convertbits(hash_data, 8, 5))
def bech32_polymod(values):
    """Fold a sequence of 5-bit values into the Bech32 checksum polynomial state."""
    generators = (0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3)
    checksum = 1
    for value in values:
        # The bits shifted out of the 25-bit window select which generators to mix in.
        top = checksum >> 25
        checksum = ((checksum & 0x1ffffff) << 5) ^ value
        for bit, generator in enumerate(generators):
            if top & (1 << bit):
                checksum ^= generator
    return checksum
def bech32_hrp_expand(hrp):
    """Expand the human-readable part into high bits, a zero separator, then low bits."""
    codes = [ord(ch) for ch in hrp]
    high_bits = [code >> 5 for code in codes]
    low_bits = [code & 31 for code in codes]
    return high_bits + [0] + low_bits
def bech32_verify_checksum(hrp, data):
    """Return True when ``data`` (including its checksum symbols) is valid for ``hrp``."""
    expanded = bech32_hrp_expand(hrp)
    return bech32_polymod(expanded + data) == 1
def bech32_create_checksum(hrp, data):
    """Compute the six 5-bit checksum symbols for ``hrp`` and ``data``."""
    values = bech32_hrp_expand(hrp) + data
    polymod = bech32_polymod(values + [0] * 6) ^ 1
    # Split the 30-bit result into six symbols, most significant first.
    return [(polymod >> shift) & 31 for shift in range(25, -1, -5)]
def bech32_encode(hrp, data):
    """Build the Bech32 string ``hrp + '1' + symbols`` for 5-bit ``data`` plus its checksum."""
    charset = 'qpzry9x8gf2tvdw0s3jn54khce6mua7l'
    symbols = data + bech32_create_checksum(hrp, data)
    encoded = ''.join(charset[symbol] for symbol in symbols)
    return hrp + '1' + encoded
def convertbits(data, frombits, tobits, pad=True):
    """Regroup a sequence of ``frombits``-wide integers into ``tobits``-wide integers.

    Args:
        data: Iterable of non-negative integers, each below ``2 ** frombits``.
        frombits: Width in bits of each input value.
        tobits: Width in bits of each output value.
        pad: When true, leftover bits are emitted as a final zero-padded group.
            When false, leftover bits must be zero and fewer than ``frombits``.

    Returns:
        The regrouped list, or ``None`` for an out-of-range input value or
        invalid leftover bits in non-padding mode.
    """
    accumulator = 0
    pending = 0
    out = []
    mask = (1 << tobits) - 1
    for value in data:
        if value < 0 or (value >> frombits) != 0:
            return None
        accumulator = (accumulator << frombits) | value
        pending += frombits
        while pending >= tobits:
            pending -= tobits
            out.append((accumulator >> pending) & mask)
    # Bits that did not fill a whole output group, left-aligned in one group.
    leftover = (accumulator << (tobits - pending)) & mask
    if pad:
        if pending:
            out.append(leftover)
    elif pending >= frombits or leftover:
        return None
    return out
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment