Skip to content

Instantly share code, notes, and snippets.

@ourway
Created December 28, 2024 04:46
Show Gist options
  • Save ourway/2946335e22f48404a5c3b9487685ed4c to your computer and use it in GitHub Desktop.
Save ourway/2946335e22f48404a5c3b9487685ed4c to your computer and use it in GitHub Desktop.
import random
class EndToEndEncryption:
    """
    A simplified End-to-End Encryption example with Diffie-Hellman key exchange and a
    basic keystream-based XOR encryption.

    This class includes:
    1. Prime number generation for Diffie-Hellman parameters.
    2. Diffie-Hellman key exchange to establish a shared secret.
    3. A basic keystream-based XOR encryption using a linear congruential generator (LCG).

    WARNING:
    This code is solely for DEMO/educational purposes and is NOT suitable for real-world usage.
    Real cryptographic systems require secure and proven libraries.

    Known limitations (kept on purpose so existing ciphertexts stay readable):
    - The keystream is restarted from the same seed for every message, so two
      messages encrypted under one shared secret reuse the same keystream.
    - Keystream words are below 2^31, so the top bit of every 4-byte block is
      never altered by the XOR.
    - Padding uses NUL bytes that are stripped on decryption, so NUL characters
      at the very end of a plaintext are lost.

    Usage Outline:
    1. Generate DH parameters (prime, generator).
    2. Initialize two instances (or more), each using the same prime & generator.
    3. Each instance calls generate_keypair() to create a private_key/public_key pair.
    4. Exchange public keys, and each instance calls establish_shared_secret() with
       the other's public key.
    5. Now both sides share an identical secret for encryption/decryption.
    6. Use encrypt() to create ciphertext, and decrypt() to recover the original message.
    """

    def __init__(self, prime=None, generator=None):
        """
        Initialize the encryption system.

        Args:
            prime: Shared Diffie-Hellman modulus (must be the same for both parties).
            generator: Shared Diffie-Hellman generator (must be the same for both parties).
        """
        # Final shared secret derived via DH; set by establish_shared_secret().
        self.shared_secret_key = None
        # Public Diffie-Hellman parameters; typically these are agreed upon beforehand.
        self.prime = prime
        self.generator = generator
        # Private key (random integer) for Diffie-Hellman; set by generate_keypair().
        self.private_key = None
        # Public key = generator^private_key mod prime; set by generate_keypair().
        self.public_key = None

    @staticmethod
    def _is_prime(num, test_count=5):
        """
        Test if a number is probably prime using a simplified Miller-Rabin primality test.

        Args:
            num: The number to test for primality.
            test_count: The number of random tests (the higher, the more accurate).

        Returns:
            Boolean indicating whether `num` is probably prime.
        """
        if num < 2:
            return False
        # 2 and 3 are prime. This must come before the even-number check,
        # otherwise 2 would be rejected as "even".
        if num < 4:
            return True
        if num % 2 == 0:
            return False

        # Write num - 1 as 2^s * r with r odd.
        s = 0
        r = num - 1
        while r % 2 == 0:
            s += 1
            r //= 2

        for _ in range(test_count):
            # Random witness in [2, num - 2]; the range is non-empty because num >= 5 here.
            a = random.randrange(2, num - 1)
            x = pow(a, r, num)
            # 1 or num-1 means this witness cannot prove compositeness.
            if x in (1, num - 1):
                continue
            # Square up to s-1 times looking for num-1.
            for _ in range(s - 1):
                x = pow(x, 2, num)
                if x == num - 1:
                    break
            else:
                # Never reached num-1: `a` is a witness that num is composite.
                return False
        return True  # Probably prime: no witness found.

    @classmethod
    def generate_dh_parameters(cls, bit_length=32):
        """
        Generate and return (prime, generator) suitable for Diffie-Hellman key exchange.

        Args:
            bit_length: The bit-length of the random prime to generate (e.g., 32, 64).
                The generator search factors prime-1 by trial division, so large
                values are slow.

        Returns:
            A tuple (prime, generator) to be used by both parties for Diffie-Hellman.

        Raises:
            ValueError: If bit_length is smaller than 2.
        """
        prime = cls._generate_prime(bit_length)
        generator = cls._find_primitive_root(prime)
        return prime, generator

    @classmethod
    def _generate_prime(cls, bit_length):
        """
        Generate a random probable prime with exactly `bit_length` bits.

        Args:
            bit_length: The bit-length of the prime to produce (at least 2).

        Returns:
            A probable prime p with p.bit_length() == bit_length.

        Raises:
            ValueError: If bit_length is smaller than 2 (no odd prime fits, and
                the search would never terminate).
        """
        if bit_length < 2:
            raise ValueError("bit_length must be at least 2 to contain a prime.")
        top_bit = 1 << (bit_length - 1)
        while True:
            # Force the top bit so the candidate really has the requested size,
            # and the low bit so no time is wasted on even candidates.
            num = random.getrandbits(bit_length) | top_bit | 1
            if cls._is_prime(num):
                return num

    @classmethod
    def _find_primitive_root(cls, prime):
        """
        Find the smallest primitive root modulo `prime`.

        A candidate g is a primitive root when, for every prime factor q of
        prime-1, g^((prime-1)/q) mod prime != 1.

        This is a simplified approach for demonstration: prime-1 is factored by
        trial division, which is only practical for small primes.

        Args:
            prime: The prime number for which we want a primitive root.

        Returns:
            The smallest integer >= 2 that is a primitive root modulo `prime`
            (1 when prime == 2), or None if no candidate passes the check,
            which can only happen when `prime` is not actually prime.
        """
        if prime == 2:
            return 1  # The only unit modulo 2.

        order = prime - 1  # phi(prime), since `prime` is prime.

        # Collect the distinct prime factors of phi(prime) by trial division.
        prime_factors = set()
        remaining = order
        d = 2
        while d * d <= remaining:
            while remaining % d == 0:
                prime_factors.add(d)
                remaining //= d
            d += 1
        if remaining > 1:
            prime_factors.add(remaining)

        for candidate in range(2, prime):
            if all(pow(candidate, order // factor, prime) != 1 for factor in prime_factors):
                return candidate
        return None

    def generate_keypair(self):
        """
        Generate a private key and public key for Diffie-Hellman.

        Steps:
        1. private_key is a random integer in [2, prime-2].
        2. public_key = (generator^private_key) mod prime.

        Raises:
            ValueError: If prime/generator are not set, or the prime is too
                small to leave any value in [2, prime-2].
        """
        if self.prime is None or self.generator is None:
            raise ValueError("prime and generator must be set before generating keypair.")
        if self.prime < 4:
            raise ValueError("prime is too small: no private key exists in [2, prime-2].")
        # The private key is the secret of the exchange, so draw it from the
        # OS entropy source instead of the predictable Mersenne Twister.
        self.private_key = random.SystemRandom().randrange(2, self.prime - 1)
        self.public_key = pow(self.generator, self.private_key, self.prime)

    def establish_shared_secret(self, other_public_key):
        """
        Establish a shared secret using the other party's public key.

        The shared secret is:
            (other_public_key ^ private_key) mod prime
        This should match the other party's shared secret:
            (my_public_key ^ other_private_key) mod prime

        Args:
            other_public_key: The public key received from the other party.

        Raises:
            ValueError: If the prime is not set or generate_keypair() has not
                been called yet.
        """
        if self.prime is None:
            raise ValueError("No prime set. Make sure prime and generator are correctly set.")
        if self.private_key is None:
            raise ValueError("No private key set. Call generate_keypair() first.")
        self.shared_secret_key = pow(other_public_key, self.private_key, self.prime)

    @staticmethod
    def _int_to_bytes(num):
        """
        Convert a 32-bit integer to 4 bytes in big-endian byte order.

        Args:
            num: An integer (0 <= num < 2^32).

        Returns:
            A 4-byte representation of `num`.
        """
        return num.to_bytes(4, byteorder='big')

    @staticmethod
    def _bytes_to_int(byte_data):
        """
        Convert 4 bytes to a 32-bit integer in big-endian byte order.

        Args:
            byte_data: A 4-byte bytes object.

        Returns:
            The 32-bit integer decoded from the byte_data.
        """
        return int.from_bytes(byte_data, byteorder='big')

    @staticmethod
    def _pad_message(message):
        """
        Pad the message with NUL bytes so that its length is a multiple of 4 bytes.

        A message whose length is already a multiple of 4 (including the empty
        message) is returned unchanged.

        Args:
            message: The bytes of the original message.

        Returns:
            The padded message as bytes.
        """
        # (-n) % 4 is 0 for aligned input; `4 - n % 4` would add a whole spare block.
        padding_needed = -len(message) % 4
        return message + b'\x00' * padding_needed

    @staticmethod
    def _unpad_message(message):
        """
        Remove trailing NUL bytes from the message.

        Note that this also removes NUL bytes that were part of the original
        message's tail; the padding scheme cannot tell them apart.

        Args:
            message: The bytes (possibly with trailing NULs).

        Returns:
            The message bytes without trailing NUL bytes.
        """
        return message.rstrip(b'\x00')

    def _generate_keystream(self, length):
        """
        Generate a pseudo-random keystream using a simple Linear Congruential
        Generator (LCG) seeded by the shared secret.

        The LCG formula is:
            X_{n+1} = (a * X_n + c) mod M
        where a = 1103515245, c = 12345, M = 2^31.

        Args:
            length: The number of bytes for which we need a keystream.
                length // 4 words are produced.

        Returns:
            A list of integers, each in [0, 2^31 - 1], one per 4-byte block.

        Raises:
            ValueError: If the shared secret has not been established.
        """
        if self.shared_secret_key is None:
            raise ValueError("Shared secret key not established. Call establish_shared_secret().")
        modulus = 2 ** 31
        # Reduce the shared secret into the LCG's state space to form the seed.
        seed = self.shared_secret_key % modulus
        keystream = []
        for _ in range(length // 4):
            seed = (1103515245 * seed + 12345) % modulus
            keystream.append(seed)
        return keystream

    def _xor_with_keystream(self, data):
        """XOR each 4-byte block of `data` (length a multiple of 4) with its keystream word."""
        keystream = self._generate_keystream(len(data))
        return b''.join(
            self._int_to_bytes(self._bytes_to_int(data[4 * index:4 * index + 4]) ^ word)
            for index, word in enumerate(keystream)
        )

    def encrypt(self, plaintext):
        """
        Encrypt a plaintext string using XOR with a generated keystream.

        Steps:
        1. Convert plaintext to bytes (UTF-8).
        2. Pad it so length is a multiple of 4.
        3. XOR each 4-byte block with one keystream word.
        4. Concatenate into ciphertext bytes.

        Args:
            plaintext: The original message as a string.

        Returns:
            The ciphertext bytes (length is a multiple of 4).

        Raises:
            ValueError: If the shared secret has not been established.
        """
        if self.shared_secret_key is None:
            raise ValueError("Shared secret has not been established. Call establish_shared_secret() first.")
        padded_plaintext = self._pad_message(plaintext.encode('utf-8'))
        return self._xor_with_keystream(padded_plaintext)

    def decrypt(self, ciphertext):
        """
        Decrypt a ciphertext (bytes) using the shared secret-based keystream XOR.

        Steps:
        1. XOR each 4-byte block of ciphertext with the matching keystream word.
        2. Strip the trailing NUL padding.
        3. Decode from UTF-8 to get the original string.

        Args:
            ciphertext: The encrypted bytes, as produced by encrypt().

        Returns:
            The decrypted plaintext string.

        Raises:
            ValueError: If the shared secret has not been established, or the
                ciphertext length is not a multiple of 4. A UnicodeDecodeError
                (a ValueError subclass) is raised when the bytes do not decode,
                which typically means the two sides hold different secrets.
        """
        if self.shared_secret_key is None:
            raise ValueError("Shared secret has not been established. Call establish_shared_secret() first.")
        if len(ciphertext) % 4 != 0:
            # Without this check the block loop would run past the keystream.
            raise ValueError("Ciphertext length must be a multiple of 4 bytes.")
        decrypted_bytes = self._xor_with_keystream(ciphertext)
        return self._unpad_message(decrypted_bytes).decode('utf-8')
# ============================
# DEMO USAGE
# ============================
if __name__ == "__main__":
    try:
        # Both parties must agree on one (prime, generator) pair up front.
        # 32 bits keeps the demo fast; real deployments need 2048 bits or more.
        dh_params = EndToEndEncryption.generate_dh_parameters(bit_length=32)
        person_a = EndToEndEncryption(*dh_params)
        person_b = EndToEndEncryption(*dh_params)

        # Each side draws its own private key and derives the matching public key.
        for party in (person_a, person_b):
            party.generate_keypair()

        # Swap public keys; each side combines the peer's key with its own private key.
        person_a.establish_shared_secret(person_b.public_key)
        person_b.establish_shared_secret(person_a.public_key)

        # Show that both sides arrived at the same secret.
        print("Shared Secret Check:")
        print(f" Person A's Shared Secret: {person_a.shared_secret_key}")
        print(f" Person B's Shared Secret: {person_b.shared_secret_key}")
        print(" (They should match!)\n")

        # A -> B: encrypt on A's side, decrypt on B's side.
        ciphertext_a = person_a.encrypt("Hello Person B! This is a secret message.")
        print(f"Person A sends (encrypted, hex): {ciphertext_a.hex()}")
        decrypted_from_a = person_b.decrypt(ciphertext_a)
        print(f"Person B receives (decrypted): {decrypted_from_a}\n")

        # B -> A: the reply travels the opposite direction with the same secret.
        ciphertext_b = person_b.encrypt("Hi Person A! I got your secret message.")
        print(f"Person B sends (encrypted, hex): {ciphertext_b.hex()}")
        decrypted_from_b = person_a.decrypt(ciphertext_b)
        print(f"Person A receives (decrypted): {decrypted_from_b}")
    except UnicodeDecodeError as e:
        # Garbage after XOR usually fails UTF-8 decoding, which points at a key mismatch.
        print(f"\nError: UnicodeDecodeError occurred - {e}")
        print("This typically indicates the shared secret is not the same on both sides.")
    except Exception as e:
        # Script boundary: report anything else instead of dumping a traceback.
        print(f"\nAn unexpected error occurred: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment