Created
April 2, 2020 11:15
-
-
Save Ivlyth/70802e768589cfecee165054cd17ba83 to your computer and use it in GitHub Desktop.
self-defined simple BPF syntax parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf8 -*- | |
""" | |
Author : Myth | |
Date : 2020/3/16 | |
Email : email4myth at gmail.com | |
""" | |
from __future__ import unicode_literals | |
import sys | |
import os | |
import subprocess | |
import base64 | |
import ipaddress | |
import re | |
# Base64 of a tiny capture file used only to let tcpdump compile a filter.
# It decodes to 24 bytes starting with the pcap magic d4 c3 b2 a1, i.e. it
# looks like a bare pcap global header with no packets -- TODO confirm.
TEST_PCAP_DATA = b'1MOyoQIABAAAAAAAAAAAAAAABABxAAAA'
TEST_PCAP_FILE = os.path.join(os.path.dirname(__file__), 'test-bpf.pcap')

# Materialise the capture next to this module on first import.  The ``with``
# block guarantees the handle is flushed and closed before tcpdump reads it.
if not os.path.exists(TEST_PCAP_FILE):
    with open(TEST_PCAP_FILE, 'wb') as pcap_file:
        pcap_file.write(base64.b64decode(TEST_PCAP_DATA))
class CommandRet(object):
    """Outcome of a finished command: exit code plus captured output.

    The instance is truthy exactly when the exit code is zero.  ``stdout``
    and ``stderr`` are stored stripped and are decoded to text lazily.
    """

    def __init__(self, retcode, stdout, stderr):
        self.retcode = retcode
        self._stdout = stdout.strip()
        self._stderr = stderr.strip()

    @staticmethod
    def _as_text(raw):
        """Decode ``bytes`` as UTF-8 (dropping bad bytes); pass text through."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8', errors='ignore')
        return raw

    @property
    def success(self):
        """True when the command exited with status 0."""
        return self.retcode == 0

    def __bool__(self):
        return self.success

    # Python 2 spelling of the truthiness hook.
    __nonzero__ = __bool__

    @property
    def stdout(self):
        """Captured standard output as text."""
        return self._as_text(self._stdout)

    @property
    def stderr(self):
        """Captured standard error as text."""
        return self._as_text(self._stderr)
def run_command(cmd, keep_stdout=True):
    """Run ``cmd`` through the shell and return a ``CommandRet``.

    :param cmd: shell command line (a single string).
    :param keep_stdout: when false, the command's output is redirected to
        ``/dev/null`` inside the shell so nothing is captured.
    :return: ``CommandRet`` with the exit code and captured stdout/stderr.
    """
    if not keep_stdout:
        # POSIX-portable redirect; ``&>`` is a bash extension that plain
        # ``sh`` parses as "run in background, then redirect".
        cmd += ' > /dev/null 2>&1'
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting.  Calling wait() first
    # can deadlock once the child fills an OS pipe buffer.
    stdout, stderr = proc.communicate()
    return CommandRet(proc.returncode, stdout, stderr)
def test_bpf(bpf):
    """Ask tcpdump to compile ``bpf`` against the bundled test capture.

    :param bpf: BPF filter expression to validate.
    :return: ``None`` when tcpdump accepts the filter, otherwise a one-line
        error description (the last line tcpdump wrote to stderr).
    """
    ret = run_command("tcpdump -r %s -w /tmp/test-bpf.pcap -W 1 -G 0.1 '%s'" % (TEST_PCAP_FILE, bpf))
    if ret.success:
        return None
    lines = ret.stderr.splitlines()
    if lines:
        return lines[-1]
    # A failure with nothing on stderr would otherwise raise IndexError here.
    return 'tcpdump exited with code %s' % ret.retcode
def is_valid_ipv4(host):
    """Return True if ``host`` is accepted by ``ipaddress.IPv4Address``."""
    try:
        ipaddress.IPv4Address(host)
    except (ValueError, TypeError):
        # AddressValueError is a ValueError subclass.  Catching only these
        # keeps KeyboardInterrupt/SystemExit from being swallowed.
        return False
    return True
def is_valid_ipv4_net(net):
    """Return True if ``net`` is accepted by ``ipaddress.IPv4Network``.

    The network is built in the default strict mode, so an address with
    host bits set (e.g. ``10.0.81.1/24``) is rejected.
    """
    try:
        ipaddress.IPv4Network(net)
    except (ValueError, TypeError):
        # Address/netmask errors are ValueError subclasses.  A bare except
        # would also hide KeyboardInterrupt/SystemExit.
        return False
    return True
def is_valid_ipv6(host):
    """Return True if ``host`` is accepted by ``ipaddress.IPv6Address``."""
    try:
        ipaddress.IPv6Address(host)
    except (ValueError, TypeError):
        # AddressValueError is a ValueError subclass.  Catching only these
        # keeps KeyboardInterrupt/SystemExit from being swallowed.
        return False
    return True
def is_valid_ipv6_net(net):
    """Return True if ``net`` is accepted by ``ipaddress.IPv6Network``.

    The network is built in the default strict mode, so an address with
    host bits set is reported as invalid.
    """
    try:
        ipaddress.IPv6Network(net)
        return True
    except Exception:
        return False
class Protocol(object):
    """A protocol keyword of the simplified syntax and its BPF expansion."""

    def __init__(self, name, bpf):
        # name: keyword as written by the user; bpf: filter text it expands to
        self.name, self.bpf = name, bpf
class ProtocolExpr(object):
    """Expression node for a protocol keyword such as ``tcp`` or ``udp6``."""

    def __init__(self, token, protocol, negative=False):
        self.token = token
        self.protocol = protocol
        self.negative = negative

    def to_bpf(self):
        """Return the protocol's BPF text, prefixed with ``not`` if negated."""
        base = self.protocol.bpf
        return 'not %s' % base if self.negative else base
class AddressInfo(object):
    """Parsed address operand: optional host/network plus optional ports.

    ``ports`` holds ``int`` items for single ports and ``str`` items of the
    form ``"start-end"`` for port ranges.
    """

    def __init__(self, host_or_net, net_mask='', is_ipv6=False, ports=()):
        self.host_or_net = host_or_net
        self.net_mask = net_mask
        self.is_ipv6 = is_ipv6
        self.ports = ports

    def to_bpf(self):
        """Render the operand as a BPF fragment (may be an empty string)."""
        # Address part: a mask selects ``net``, a bare address ``host``.
        if self.net_mask:
            address_clause = 'net %s/%s' % (self.host_or_net, self.net_mask)
        elif self.host_or_net:
            address_clause = 'host %s' % self.host_or_net
        else:
            address_clause = ''

        # Port part: ints become ``port``, anything else ``portrange``.
        port_clauses = [
            ('port %s' if isinstance(item, int) else 'portrange %s') % item
            for item in self.ports
        ]
        port_clause = ' or '.join(port_clauses)
        if len(port_clauses) > 1:
            port_clause = '(%s)' % port_clause

        if not address_clause:
            return port_clause
        if not port_clause:
            return address_clause
        return '(%s and %s)' % (address_clause, port_clause)
class AddressExpr(object):
    """Expression node for an address operand.

    The operand may be a host or network only, a host or network followed by
    ``:ports``, or ports only (``:80,443,1000-2000``).  IPv6 addresses must
    be wrapped in square brackets.  The token is parsed on construction and
    the result is stored in ``addr_info``.
    """

    def __init__(self, token, negative=False):
        self.token = token
        self.negative = negative
        self.addr_info = self.parse()

    def _error(self, message):
        """Build a BPFTokenError spanning the whole token."""
        return BPFTokenError(message, self.token.start, self.token.end)

    def _parse_ports(self, port_expr):
        """Parse a comma separated list of ports and ``start-end`` ranges.

        :return: list with ``int`` items for single ports (a range whose
            ends are equal collapses to an int) and the original
            ``"start-end"`` string for real ranges.
        :raises BPFTokenError: on a malformed or out-of-range port.
        """
        ports = []
        for port in port_expr.split(','):
            if '-' in port:  # range
                start, _, end = port.partition('-')
                if not start:
                    raise self._error('错误的端口范围: 缺少开始端口')
                if not start.isdigit():
                    raise self._error('错误的端口范围: 端口必须为数字')
                if not end:
                    raise self._error('错误的端口范围: 缺少结束端口')
                if not end.isdigit():
                    raise self._error('错误的端口范围: 端口必须为数字')
                start = int(start)
                end = int(end)
                if start < 0 or start > 65535:
                    raise self._error('错误的端口范围: 开始端口超出合法范围')
                if end < 0 or end > 65535:
                    raise self._error('错误的端口范围: 结束端口超出合法范围')
                if end < start:
                    raise self._error('错误的端口范围: 结束端口不得小于开始端口')
                if end == start:
                    ports.append(start)  # int
                else:
                    ports.append(port)  # str
            else:
                if not port.isdigit():
                    raise self._error('端口必须为数字')
                port = int(port)
                if port < 0 or port > 65535:
                    raise self._error('端口超出合法范围')
                ports.append(port)
        return ports

    def parse(self):
        """Split the token into host/network, mask and ports, validating each.

        :return: ``AddressInfo`` describing the operand.
        :raises BPFTokenError: when any part of the operand is invalid.
        """
        expr = self.token.value
        last_middle_bracket = expr.rfind(']')
        last_colon = expr.rfind(':')
        if last_colon == len(expr) - 1:
            raise BPFTokenError('冒号后缺少端口信息', self.token.start + last_colon, self.token.end)

        # A colon after the last ']' (or any colon when there is no bracket)
        # separates the address from the port list; colons inside the
        # brackets belong to the IPv6 address itself.
        has_ports = last_colon > last_middle_bracket
        ports = self._parse_ports(expr[last_colon + 1:]) if has_ports else []
        host_or_net = expr[:last_colon] if has_ports else expr

        net_mask = ''
        is_ipv6 = False
        if host_or_net:
            host_or_net, sep, net_mask = host_or_net.partition('/')
            if sep:  # must have mask
                if not host_or_net:
                    raise self._error('错误的 IP 地址')
                if not net_mask:
                    raise self._error('缺少网络掩码')
                if not net_mask.isdigit():
                    raise self._error('网络掩码应为数字')
            if host_or_net[0] == '[':  # treat as ipv6
                if not is_valid_ipv6(host_or_net[1: -1]):
                    raise self._error('错误的 IPv6 地址')
                if net_mask and not is_valid_ipv6_net('%s/%s' % (host_or_net[1: -1], net_mask)):
                    raise self._error('错误的网络段: %s/%s, 主机位不得被设置' % (host_or_net, net_mask))
                host_or_net = host_or_net[1: -1]
                is_ipv6 = True
            else:  # treat as ipv4
                if not is_valid_ipv4(host_or_net):
                    # is ipv6 without bracket ?
                    if is_valid_ipv6(host_or_net):
                        # Carry the token position like every other error so
                        # callers can highlight the offending operand.
                        raise self._error('IPv6 应该被包裹在中括号内')
                    raise self._error('错误的 IPv4 地址')
                if net_mask and not is_valid_ipv4_net('%s/%s' % (host_or_net, net_mask)):
                    raise self._error('错误的网络段: %s/%s, 主机位不得被设置' % (host_or_net, net_mask))
        # else: a ports-only operand is valid
        return AddressInfo(host_or_net, net_mask, is_ipv6, ports)

    def to_bpf(self):
        """Return the BPF fragment for the parsed operand."""
        return self.addr_info.to_bpf()
class OPExpr(object):
    """Expression node for the unary ``not`` keyword.

    ``create_expr_from_token`` builds this node for ``not`` tokens (the
    tokenizer also rewrites ``!`` to ``not``).
    """

    def __init__(self, token):
        self.token = token

    def to_bpf(self):
        """Return the lower-cased keyword to emit into the BPF string."""
        keyword = self.token.ivalue
        return keyword
class LogicExpr(object):
    """Expression node for a binary connective.

    ``create_expr_from_token`` builds this node for ``and`` and ``or``
    tokens.
    """

    def __init__(self, token):
        self.token = token

    def to_bpf(self):
        """Return the lower-cased keyword to emit into the BPF string."""
        keyword = self.token.ivalue
        return keyword
class GroupExpr(object):
    """Expression node for a parenthesis token and, for ``(``, its members."""

    def __init__(self, token):
        self.token = token
        self.exprs = []

    def add(self, expr):
        """Append ``expr`` to the members of this group."""
        self.exprs.append(expr)

    def to_bpf(self):
        """Join the members' BPF; parenthesise when there is more than one."""
        parts = [member.to_bpf() for member in self.exprs]
        joined = ' '.join(parts)
        if len(parts) > 1:
            return '(%s)' % joined
        return joined

    def is_open_bracket(self):
        """Return True when this node was built from a ``(`` token."""
        return self.token.value == '('
PROTOCOLS = [
    # protocol_name, bpf
    # 'ip' covers both address families.  A packet is never IPv4 and IPv6
    # at once, so the two primitives must be OR-ed: 'ip and ip6' matches
    # nothing.
    ('ip', '(ip or ip6)'),
    ('ip4', 'ip'),
    ('ip6', 'ip6'),
    ('tcp', 'tcp'),
    ('tcp4', '(ip and tcp)'),
    ('tcp6', '(ip6 and tcp)'),
    ('udp', 'udp'),
    ('udp4', '(ip and udp)'),
    ('udp6', '(ip6 and udp)'),
    ('icmp', 'icmp'),
    ('icmp4', '(ip and icmp)'),
    # ICMPv6 is its own protocol with a dedicated pcap-filter keyword;
    # 'ip6 and icmp' would look for ICMPv4 carried inside IPv6.
    ('icmp6', 'icmp6'),
]

# Lookup table: lower-case protocol keyword -> Protocol.
PROTOCOL_MAP = {name: Protocol(name, bpf) for name, bpf in PROTOCOLS}

# Raw strings keep the regex escapes (\s, \d, \-) out of Python's own
# string-escape processing.
VALID_EXPR = re.compile(r'[()a-z\-,\s\d!]+', re.IGNORECASE)
BLANK = re.compile(r'\s+')
class BPFTokenError(Exception):
    """Raised when an expression cannot be tokenised, parsed or compiled.

    :param message: human readable reason.
    :param start: index in the source expression where the problem begins.
    :param end: index where it ends (exclusive); ``-1`` means no specific
        region is associated with the error.
    """

    def __init__(self, message, start=0, end=-1):
        super(BPFTokenError, self).__init__(message)
        # Python 3 exceptions have no implicit ``message`` attribute, yet
        # __str__ below and callers of this module read it.
        self.message = message
        self.start = start
        self.end = end

    def __str__(self):
        return '%s (from %s to %s)' % (self.message, self.start, self.end)
class Token(object):
    """A piece of the source expression together with its position.

    ``value`` is the text as produced by the tokenizer and ``ivalue`` is the
    same text lower-cased for case-insensitive keyword matching.
    """

    def __init__(self, start, end, value):
        self.value = value
        self.ivalue = value.lower()
        self.start, self.end = start, end
class Tokenizer(object):
    """Split an expression string into ``Token`` objects.

    Whitespace separates tokens, ``(`` and ``)`` are tokens of their own and
    ``!`` is emitted as the keyword ``not``.  Every token satisfies
    ``expr[token.start:token.end] == token.value`` except the rewritten
    ``!``, whose span covers the ``!`` character.
    """

    def __init__(self, expr):
        self.expr = expr

    def __iter__(self):
        pending = ''  # characters of the token being accumulated
        start = 0     # index where ``pending`` began
        for i, c in enumerate(self.expr):
            if BLANK.match(c):
                if pending:
                    # ``end`` is exclusive: stop before the delimiter so the
                    # span does not swallow the whitespace.
                    yield Token(start, i, pending)
                    pending = ''
            elif c in '()':
                if pending:
                    # Same here: the bracket is not part of the token.
                    yield Token(start, i, pending)
                    pending = ''
                yield Token(i, i + 1, c)
            elif c == '!':
                if pending:  # could be tolerated, but being strict is safer
                    raise BPFTokenError('期待空格, 但是遇到: "!"', i, i + 1)
                # Normalise the user's ``!`` to ``not`` so later stages deal
                # with one spelling only.
                yield Token(i, i + 1, 'not')
            else:
                if not pending:  # just the beginning
                    start = i
                pending += c
        if pending:
            yield Token(start, len(self.expr), pending)
class Stack(object):
    """Minimal LIFO container that returns ``None`` instead of raising."""

    def __init__(self):
        self._stack = []

    def push(self, item):
        """Place ``item`` on top of the stack."""
        self._stack.append(item)

    def pop(self):
        """Remove and return the top item, or ``None`` when empty."""
        if self.is_empty():
            return None
        return self._stack.pop()

    def is_empty(self):
        """Return True when the stack holds no items."""
        return not self._stack

    def at_the_top(self):
        """Return the top item without removing it, or ``None`` when empty."""
        if self.is_empty():
            return None
        return self._stack[-1]
def _expected_after(pre_expr):
    """Return ``(allowed_types, message_template)`` for the node after ``pre_expr``.

    ``message_template`` takes the offending token text via ``%``.
    """
    # Group tokens are accepted here without telling '(' from ')';
    # bracket pairing is checked separately in parse().
    starters = (OPExpr, ProtocolExpr, AddressExpr, GroupExpr)
    if pre_expr is None:
        return starters, '期待得到 "not", "协议名称" 或者 "地址信息", 但是得到了: "%s"'
    if isinstance(pre_expr, LogicExpr):
        return starters, '期待得到 "not", "协议名称" 或者 "地址信息" 或者 "组", 但是得到了: "%s"'
    if isinstance(pre_expr, OPExpr):
        return (ProtocolExpr, AddressExpr), '期待得到 "协议名称" 或者 "地址信息", 但是得到了: "%s"'
    if isinstance(pre_expr, (ProtocolExpr, AddressExpr)):
        return (LogicExpr, GroupExpr), '期待得到 "and" 或者 "or" 或者 "组", 但是得到了: "%s"'
    if isinstance(pre_expr, GroupExpr):
        if pre_expr.is_open_bracket():
            return starters, '期待得到 "not" 或者 "协议名称" 或者 "地址信息" 或者 "组", 但是得到了: "%s"'
        return (LogicExpr, GroupExpr), '期待得到 "and" 或者 "or" 或者 "结束组", 但是得到了: "%s"'
    return (object,), '%s'  # unknown predecessor: no restriction


def parse(expr):
    """Translate an expression in the simplified syntax into a BPF filter.

    :param expr: expression such as ``tcp4 and !10.0.0.0/24:80``.
    :return: the BPF filter string, or ``''`` for a blank expression.
    :raises BPFTokenError: when the expression is malformed or tcpdump
        rejects the generated filter.
    """
    expr = expr.strip()
    if not expr:
        return ''  # empty expr

    exprs = []       # top-level expression nodes, in order
    pre_expr = None  # node produced by the previous token
    stack = Stack()  # currently open groups, innermost on top
    for token in Tokenizer(expr):
        node = create_expr_from_token(token)
        allowed, message = _expected_after(pre_expr)
        if not isinstance(node, allowed):
            raise BPFTokenError(message % node.token.value, node.token.start, node.token.end)
        pre_expr = node

        if isinstance(node, GroupExpr) and not node.is_open_bracket():
            # ')' closes the innermost open group and is not itself emitted.
            if stack.is_empty():
                raise BPFTokenError("单独的关闭组", token.start, token.end)
            group_expr = stack.pop()
            if not group_expr.exprs:
                raise BPFTokenError("无内容的组", group_expr.token.start, node.token.end)
            continue

        # Attach the node to the innermost open group, or to the top level
        # when no group is open.  This covers '(' too, so a nested group
        # becomes a member of its parent instead of landing at top level.
        if stack.is_empty():
            exprs.append(node)
        else:
            stack.at_the_top().add(node)
        if isinstance(node, GroupExpr):  # '(' opens a new group
            stack.push(node)

    if not stack.is_empty():
        group_expr = stack.pop()
        raise BPFTokenError("未成对的组", group_expr.token.start, pre_expr.token.end)

    last_expr = exprs[-1]
    if not isinstance(last_expr, (GroupExpr, ProtocolExpr, AddressExpr)):
        raise BPFTokenError('期待得到 "组", "协议名称" 或者 "地址信息", 但是得到了: "%s"' % last_expr.token.value,
                            last_expr.token.start, last_expr.token.end)

    bpf = ' '.join(node.to_bpf() for node in exprs)
    # Let tcpdump have the final word on whether the filter compiles.
    err = test_bpf(bpf)
    if err:
        if err.endswith('expression rejects all packets'):
            raise BPFTokenError('错误的表达式, 过滤掉了所有数据')
        raise BPFTokenError('错误的表达式: %s' % err)
    return bpf
def create_expr_from_token(token):
    """Build the expression node matching ``token``.

    :param token: ``Token`` produced by the tokenizer.
    :return: ``GroupExpr`` for a parenthesis, ``LogicExpr`` for
        ``and``/``or``, ``OPExpr`` for ``not``, ``ProtocolExpr`` for a known
        protocol keyword, otherwise ``AddressExpr``.
    :raises BPFTokenError: from ``AddressExpr`` when the token is not a
        valid address operand.
    """
    # group start or end -- compare against the two exact tokens; a
    # substring test on '()' would also accept '' and '()'.
    if token.ivalue in ('(', ')'):
        return GroupExpr(token)
    # and / or
    if token.ivalue in ('and', 'or'):
        return LogicExpr(token)
    # not
    if token.ivalue == 'not':
        return OPExpr(token)
    # protocols
    if token.ivalue in PROTOCOL_MAP:
        return ProtocolExpr(token, PROTOCOL_MAP[token.ivalue])
    # should be addresses
    return AddressExpr(token)
def main():
    """Parse the expression given on the command line and print the BPF.

    Falls back to a built-in sample expression when no arguments are given.
    On a parse failure the error and the offending slice of the expression
    are printed, then the exception is re-raised.
    """
    expr = ' '.join(sys.argv[1:])
    if not expr:
        expr = 'tcp4 or ( tcp or udp6 and icmp4)and udp6 and !10.0.81.0/24:80-10000,20000-30000,48888,58888 and 10.0.81.48:11111'
    # Single-argument print(...) calls behave the same on Python 2 and 3.
    print('expr is "%s"' % expr)
    try:
        bpf = parse(expr)
        print('bpf is "%s"' % bpf)
    except BPFTokenError as e:
        print('============== ERROR ==============')
        print(e)
        print(expr[e.start: e.end])
        print('============== ERROR ==============')
        raise
# Sample expressions fed to parse() by test().  The list mixes protocol
# keywords, negation with '!', host/network operands, port lists and ranges,
# a bracketed IPv6 network and groups.  Some entries are expected to be
# rejected, e.g. a port above 65535 ('80-99999'), a descending range
# ('80-78') and a network with host bits set ('10.0.81.1/24').
test_rules = [
    'TcP or udp or (10.0.81.0/24:80-10000,20000-30000,48888,58888)',
    'ip4 or tcp6',
    'tcp and udp',
    'tcp4',
    '!tcp6',
    '!tcp4 and !udp',
    '10.0.81.1/24:80',
    '10.0.81.0/24:80-99999',
    '10.0.81.0/24:80-78',
    '10.0.81.0/24:80 and 192.168.0.0/16:80',
    '10.0.81.0/24:80 and 192.168.0.0/16:80,90',
    '10.0.81.0/24:80 and 192.168.0.0/16:80,90,100',
    '10.0.81.9 and 10.0.81.10 or 10.0.81.11:80-80',
    '10.0.81.9 and 10.0.81.10 and 10.0.81.11',
    ':80,443,10000-11000',
    ':80,8080,3306',
    '!:80,8080,3306',
    '!10.0.81.0/24:443',
    'udp and :8080',
    'udp or [240e:e1:f300:1:3::]/120:80-99,101',
    'tcp and (10.0.81.0/24 or :8080 or 192.168.1.234)'
]
def test():
    """Run parse() over every entry of ``test_rules`` and print the outcome.

    For each rule either the generated BPF or the error, together with the
    offending slice of the rule, is printed.
    """
    for rule in test_rules:
        # Single-argument print(...) calls behave the same on Python 2 and 3.
        print('================================================================')
        print("rule: %s" % rule)
        try:
            bpf = parse(rule)
            print("parsed bpf: %s" % bpf)
        except BPFTokenError as e:
            # args[0] is the message passed to the constructor; unlike
            # ``e.message`` it is available on every Python version.
            print("Error: %s (%s - %s: '%s')" % (e.args[0], e.start, e.end, rule[e.start: e.end]))
        print('')
if __name__ == '__main__':
    # Running the module directly exercises the sample rules via test().
    # The commented-out call below would instead parse the expression given
    # on the command line.
    # main()
    test()
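'''
Usage: call parse() with an expression string. On success it returns the
resulting BPF filter string; on failure it raises BPFTokenError, whose
message attribute holds the reason and whose start and end attributes mark
where in the expression the error occurred.

Handling the error position:
if end == -1, only the error message needs to be shown and nothing has to be
highlighted; otherwise the region from start to end can be highlighted.
'''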
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment