Last active
April 17, 2024 20:22
-
-
Save Mahyar24/ff11ed7973bbe3a37b1caedef40ff850 to your computer and use it in GitHub Desktop.
Checking Abnormal V2Ray Users (Polars)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3.10
"""
This module checks for excessive use by V2Ray users.
Install Polars beforehand via: `pip install polars`
Sample Command:
$ journalctl -u v2ray --since "72 h ago" | grep email | cut -d " " -f 6,7,8,13 | python abnormal.py -
GitHub: https://github.com/Mahyar24/V2Conf
[email protected], Mon 28 Nov 2022
"""
import io | |
import ipaddress | |
import sys | |
import warnings | |
from functools import cache | |
import polars as pl | |
# Suppress every warning for the whole process: no category or module
# restriction is passed, so the "ignore" action applies to all of them.
warnings.filterwarnings("ignore")
@cache
def ip_to_network(ip: str, subnet: int = 16, subnet_v6: int = 48) -> str:
    """Return the network address containing ``ip``, as a string.

    Args:
        ip: Textual IPv4 or IPv6 address.
        subnet: Prefix length applied to IPv4 addresses.
        subnet_v6: Prefix length applied to IPv6 addresses.

    Returns:
        The network address of the enclosing network, e.g. ``"10.1.0.0"``
        for ``"10.1.2.3"`` with the default prefix.

    Raises:
        ValueError: If ``ip`` is not a valid address or the prefix length
            is not valid for the address family.
    """
    address = ipaddress.ip_address(ip)

    # An IPv4-mapped IPv6 address (``::ffff:a.b.c.d``) keeps its payload in
    # the lowest 32 bits; masking it with an IPv6 prefix would collapse every
    # such address into ``::``.  Treat it as the IPv4 address it represents.
    if address.version == 6 and address.ipv4_mapped is not None:
        address = address.ipv4_mapped

    prefix = subnet_v6 if address.version == 6 else subnet
    # strict=False: host bits are expected to be set and are masked off.
    network = ipaddress.ip_network(f"{address}/{prefix}", strict=False)
    return str(network.network_address)
def make_df(input_data, subnet: int = 16) -> pl.LazyFrame:
    """Parse space-separated log fields into a lazy frame of (dt, user, ip).

    The input is read as a header-less, space-separated table with the four
    columns ``date``, ``time``, ``ip`` and ``username``.

    Args:
        input_data: Source handed to ``pl.read_csv``.
        subnet: IPv4 prefix length forwarded to ``ip_to_network``.

    Returns:
        A lazy frame with three columns:
        ``dt`` (datetime parsed from ``date`` + ``time``),
        ``user`` (the part of ``username`` before the first ``@``, categorical)
        and ``ip`` (network address of the client, as a string).
        ``dt`` is flagged as sorted; the rows are not actually re-sorted, so
        the input is assumed to be in chronological order.
    """
    raw = pl.read_csv(
        input_data,
        separator=" ",
        has_header=False,
        new_columns=["date", "time", "ip", "username"],
    )

    timestamp = (
        pl.concat_str([pl.col("date"), pl.lit(" "), pl.col("time")])
        .alias("dt")
        .str.strptime(pl.Datetime, "%Y/%m/%d %H:%M:%S")
    )

    user = (
        pl.col("username")
        .str.split("@")
        .list.first()
        .cast(pl.Categorical)
        .alias("user")
    )

    network = (
        pl.col("ip")
        # Drop the trailing ":<port>" (everything after the last colon).
        .str.extract(r"(.*):[^:]*$")
        # Remove the literal protocol prefix.  strip_prefix is used rather
        # than strip_chars_start because the latter treats its argument as a
        # set of characters and could eat leading characters of the address.
        .str.strip_prefix("tcp:")
        .str.strip_prefix("udp:")
        # IPv6 addresses are wrapped in square brackets.
        .str.strip_chars("[]")
        .map_elements(
            lambda x: ip_to_network(x, subnet=subnet),
            return_dtype=pl.String,
        )
    )

    return raw.lazy().select(timestamp, user, network).set_sorted("dt")
def analyze_df(df: pl.DataFrame | pl.LazyFrame, conn: int = 2) -> pl.DataFrame:
    """Summarise, per user, the 30-minute windows with too many networks.

    Steps:
      1. Per user and 30-second bucket, count the distinct ``ip`` values.
      2. Per user and 30-minute window, sum those bucket counts.
      3. Keep the windows whose sum exceeds ``conn * 60``.
      4. Per user, report how many windows remained (``No``) and by what
         fraction their mean sum exceeds the threshold (``Excessing``).

    Args:
        df: Frame with ``dt``, ``user`` and ``ip`` columns.  Either an eager
            or a lazy frame is accepted.
        conn: Allowed number of distinct networks per 30-second bucket.

    Returns:
        An eager frame with columns ``user``, ``No`` and ``Excessing``,
        sorted by ``No`` then ``Excessing``, both descending.
    """
    # A 30-minute window contains 60 thirty-second buckets, so the per-window
    # allowance is the per-bucket allowance times 60.
    threshold = conn * 60

    per_bucket = (
        # .lazy() is a no-op on a LazyFrame and lets eager frames reach the
        # final .collect() as well.
        df.lazy()
        .group_by_dynamic("dt", every="30s", group_by=pl.col("user"), label="left")
        .agg(pl.col("ip").n_unique())
    )

    per_window = per_bucket.group_by_dynamic(
        "dt", every="30m", group_by=pl.col("user"), label="left"
    ).agg(pl.col("ip").sum())

    summary = (
        per_window.filter(pl.col("ip") > threshold)
        .group_by(pl.col("user"))
        .agg(
            [
                pl.col("ip").count().alias("No"),
                pl.col("ip")
                .mean()
                .sub(threshold)
                .truediv(threshold)
                .alias("Excessing"),
            ]
        )
        .sort(pl.col("No"), pl.col("Excessing"), descending=True)
    )

    return summary.collect()
def main() -> None:
    """Read log fields from stdin and print notable and abnormal users.

    All users returned by ``analyze_df`` are printed as "notable"; those with
    ``No >= 5`` and ``Excessing >= 0.2`` are then printed as "abnormal".
    """
    print("Notable Users:\n")

    # Polars is given a file-like object, so stdin is buffered in memory.
    stdin_buffer = io.StringIO(sys.stdin.read())
    notable = analyze_df(make_df(stdin_buffer, subnet=16))
    print(notable)

    print("---------------------------")

    is_abnormal = (pl.col("No") >= 5) & (pl.col("Excessing") >= 0.2)
    abnormal = notable.filter(is_abnormal)
    if not abnormal.is_empty():
        print(f"Abnormal Users:\n\n{abnormal}")
        return
    print("Abnormal Users: None!")
# Run the analysis only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment