Last active
February 4, 2024 17:32
-
-
Save HQJaTu/7ddf24b0e6151f8831aa1d41f1d89c8e to your computer and use it in GitHub Desktop.
OpenSuse wicked 6RD tunnel setup
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python | |
import os | |
import sys | |
import argparse | |
import ipaddress | |
import netifaces | |
from lxml import etree as ET | |
DEFAULT_DHCPV4_FILE = r'/var/lib/wicked/lease-{0}-dhcp-ipv4.xml' | |
DEFAULT_TUNNEL_INTERFACE = "tun6" | |
def parse_wicked_dhcpv4_lease_file(iface: str) -> str: | |
lease_filename = '/var/lib/wicked/lease-{}-dhcp-ipv4.xml'.format(iface) | |
if not os.path.exists(lease_filename): | |
raise ValueError("Lease file for interface '{}' doesn't exist! Cannot continue.".format(iface)) | |
with open(lease_filename, 'r') as file: | |
lease_xml = file.read() | |
# We really want to use lxml for its safety. | |
# However: With lxml namespaces need to be there in the XML before parsing. | |
improved_xml = lease_xml.replace('<lease>', | |
'<?xml version="1.0" ?>\n<lease' | |
' xmlns="http://www.example.com/lease"' | |
' xmlns:ipv4="http://www.example.com/ipv4">' | |
) | |
root = ET.XML(improved_xml) | |
# Also: namespaces MUST be used while searching | |
e = root.xpath('/l:lease/ip:dhcp/l:options/l:unknown-212/l:data', | |
namespaces={'l': 'http://www.example.com/lease', | |
'ip': 'http://www.example.com/ipv4'} | |
) | |
if not e: | |
raise ValueError("Wicked DHCPv4 lease for interface '{}' doesn't contain 6RD information in it! Telco doesn't support 6RD!") | |
return e[0].text | |
def get_current_ip_from_interface(iface: str) -> ipaddress.IPv4Address: | |
""" | |
Query the IPv4-address for given interface | |
:param iface: | |
:return: | |
""" | |
if iface not in netifaces.interfaces(): | |
sys.stderr.write("Error: Interface {} not known. Cannot continue!".format(iface)) | |
exit(1) | |
ips = netifaces.ifaddresses(iface) | |
if not ips[netifaces.AF_INET]: | |
sys.stderr.write("Error: Interface %s has no IPv4-addresses. Cannot continue!" % iface) | |
exit(1) | |
if len(ips[netifaces.AF_INET]) > 1: | |
sys.stderr.write("Error: Interface %s has multiple IPv4-addresses. Cannot continue!" % iface) | |
exit(1) | |
# Example from shell: | |
# ip route get 8.8.8.8 | awk '/ src / {print $7;}' | |
my_ipv4 = ipaddress.IPv4Address(ips[netifaces.AF_INET][0]['addr']) | |
# Return the only IPv4-address there is. | |
return my_ipv4 | |
def parse_option_212(option: str) -> tuple[ipaddress.IPv6Network, int, ipaddress.IPv4Address]: | |
""" | |
Spec: https://www.rfc-editor.org/rfc/rfc5969 | |
:param option: Option 212 value as string | |
:return: parsed values in a tuple: IPv6 prefix, IPv4 mask length, border relay IPv4 address | |
""" | |
result = bytes.fromhex(option.replace(':', '')) | |
ipv4_mask_len = result[0] | |
prefix_len = result[1] | |
prefix_bytes = result[2:18] | |
border_relay_bytes = result[18:22] | |
prefix_addr = ipaddress.IPv6Address(prefix_bytes) | |
prefix = ipaddress.IPv6Network((prefix_addr, prefix_len), strict=True) | |
border_relay_ipv4 = ipaddress.IPv4Address(border_relay_bytes) | |
return prefix, ipv4_mask_len, border_relay_ipv4 | |
def combine_prefix_and_ipv4(prefix: ipaddress.IPv6Network, | |
my_ipv4: ipaddress.IPv4Address, | |
ipv4_mask_len: int) -> tuple[ipaddress.IPv6Address, int, ipaddress.IPv6Address]: | |
""" | |
Do some IP-address bit tweaking. | |
Combine given 6RD prefix with meaningful bits of IPv4 address | |
:param prefix: IPv6 prefix from DHCPv4 option-212 | |
:param my_ipv4: IPv4 address to be converted into a 6RD IPv6 address | |
:param ipv4_mask_len: IPv4 mask length from DHCPv4 option-212 | |
:return: tuple: calculated 6RD IPv6 prefix, calculated IPv6 mask length, calculated 6RD IPv6 address | |
""" | |
# Sanity: IPv4 mask needs to be sensible in context of IPv4 address | |
if ipv4_mask_len < 0 or ipv4_mask_len > 32: | |
raise ValueError("IPv4 mask length must be 0-32 bits!") | |
ipv4_prefix_len = 32 - ipv4_mask_len | |
# Calculated IPv6 will have bits from given IPv6 prefix and meaningful (unmasked) bits from IPv4 address | |
prefix_len = prefix.prefixlen + ipv4_prefix_len | |
# 1: Most significant bits from IPv6 prefix | |
prefix_bits = "{0:b}".format(prefix.network_address)[0:prefix.prefixlen] | |
# 2: Unmasked bits from IPv4 address | |
ipv4_effective_bits = "{0:b}".format(my_ipv4)[ipv4_mask_len:32] | |
# 3: Combine 1) and 2) into a null-padded IPv6-address | |
ipv6_address_bits = prefix_bits + ipv4_effective_bits + "0" * (128 - prefix_len) | |
# Convert resulting bits into an actual IPv6 address | |
my_ipv6_prefix = ipaddress.IPv6Address(int(ipv6_address_bits, 2)) | |
# Take prefix IPv6-address and change the last bit into 1 to form a host IPv6 address | |
my_ipv6_bytes = my_ipv6_prefix.packed[0:15] + bytes([1]) | |
my_ipv6 = ipaddress.IPv6Address(my_ipv6_bytes) | |
return my_ipv6_prefix, prefix_len, my_ipv6 | |
def get_prefix_32(prefix: ipaddress.IPv6Network) -> ipaddress.IPv6Network: | |
""" | |
Make sure 6RD prefix is /32 or less. | |
Linux will emit "ioctl 89f9 failed: Invalid argument" if using bigger prefixes than /32. | |
:param prefix: IPv6 network prefix as received via DHCPv4 option-212 | |
:return: IPv6 network prefix with /32 or less | |
""" | |
if prefix.prefixlen <= 32: | |
return prefix | |
prefix32 = ipaddress.IPv6Network((prefix.network_address, 32), strict=False) | |
return prefix32 | |
def main(): | |
parser = argparse.ArgumentParser(description='Wicked 6RD tunnel setup - RFC 5969') | |
parser.add_argument('interface', | |
help='Network interface to read DHCPv4 data for') | |
parser.add_argument('--tunnel-interface', default=DEFAULT_TUNNEL_INTERFACE, | |
help="Interface to create as tunnel. Default: {}".format(DEFAULT_TUNNEL_INTERFACE)) | |
parser.add_argument('--print-terse', action='store_true', | |
help="Print only commands, nothing else") | |
parser.add_argument('--print-dhcp', action='store_true', | |
help="Print DHVPv4 information") | |
parser.add_argument('--print-update-existing', action='store_true', | |
help="Add tunnel up/down. Skip outputting 'ip tunnel add'. Used when updating pre-existing tunnels.") | |
args = parser.parse_args() | |
# Sanity: | |
my_ipv4 = get_current_ip_from_interface(args.interface) | |
# Parse DHCPv4 lease | |
option_212_data = parse_wicked_dhcpv4_lease_file(args.interface) | |
# 6RD-math: | |
prefix, ipv4_mask_len, border_relay = parse_option_212(option_212_data) | |
my_ipv6_prefix, my_ipv6_prefix_len, my_ipv6 = combine_prefix_and_ipv4(prefix, my_ipv4, ipv4_mask_len) | |
prefix_32 = get_prefix_32(prefix) | |
if not args.print_terse: | |
print("6RD setup") | |
print() | |
if args.print_dhcp: | |
print("My IPv4 is: {}".format(my_ipv4)) | |
print() | |
print("DHCPv4 option-212 information:") | |
print("6RD IPv4 mask length: {}".format(ipv4_mask_len)) | |
print("6RD prefix length: {}".format(prefix.prefixlen)) | |
print("6RD prefix: {}".format(prefix)) | |
print("6RD border relay: {}".format(border_relay)) | |
print() | |
if not args.print_terse: | |
print("Commands to create a 6RD tunnel:") | |
if args.print_update_existing: | |
print("ifdown {}".format(args.tunnel_interface)) | |
print("ip tunnel add name {} mode sit local {} ttl 64".format(args.tunnel_interface, my_ipv4)) | |
if not args.print_update_existing: | |
#print("ip addr add {}/64 dev {}".format(my_ipv6, args.interface)) | |
print("ip tunnel add name {} mode sit local {} ttl 64".format(args.tunnel_interface, my_ipv4)) | |
print("ip tunnel 6rd dev {} 6rd-prefix {}".format(args.tunnel_interface, prefix_32)) | |
print("ip addr add {}/{} dev {}".format(my_ipv6, my_ipv6_prefix_len, args.tunnel_interface)) | |
print("ip link set {} up".format(args.tunnel_interface)) | |
print("ip addr flush dev {}".format(args.tunnel_interface)) | |
print("ip route add ::/0 via ::{} dev {} # IPv6 default route".format(border_relay, args.tunnel_interface)) | |
if args.print_update_existing: | |
print("ifup {}".format(args.tunnel_interface)) | |
print("# done") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment