Created
April 26, 2016 10:31
-
-
Save SAPikachu/c515af0596bcaa6ba60c4dc81ea2848a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import time | |
import math | |
from queue import Queue, Empty | |
from threading import Thread | |
import logging | |
from influxdb import InfluxDBClient | |
from influxdb.exceptions import InfluxDBClientError | |
from rg_etcd import RgRedis, RgEtcd | |
# Multiplier applied to the snapped second count in TimestampSnapper.snap().
# Presumably "nanoseconds per second" — confirm against the InfluxDB write
# precision in use.
NS = 1000000000


class TimestampSnapper(object):
    """Align timestamps to whole-unit offsets from the first one seen.

    The first timestamp passed to :meth:`snap` becomes the epoch. Every
    later timestamp is replaced by ``int(epoch) + round(ts - epoch)``, so
    all results sit on the same integer grid anchored at the first sample.
    """

    def __init__(self):
        # None means "no timestamp seen yet". A numeric 0 is a valid epoch.
        self.epoch = None

    def snap(self, ts):
        """Return *ts* snapped to the epoch grid, multiplied by ``NS``.

        Args:
            ts: Numeric timestamp (int or float).

        Returns:
            An int: ``(int(epoch) + round(ts - epoch)) * NS``.
        """
        # Compare against None explicitly: testing truthiness would treat
        # an epoch of 0 as unset and re-anchor the grid on every call.
        if self.epoch is None:
            self.epoch = ts
            diff = 0
        else:
            diff = int(round(ts - self.epoch))
        return (int(self.epoch) + diff) * NS
class VpnElectorInfluxDbWriter(object):
    """Queue metric groups and upload them to InfluxDB in batches.

    Settings come from ``VM_INFLUXDB_*`` environment variables (see
    :meth:`config_from_env`). :meth:`run` starts a background thread that
    drains the queue filled by :meth:`write`, converts each group into
    points and sends them with ``InfluxDBClient.write_points``.
    :meth:`shutdown` asks the thread to stop and waits for it.
    """

    def __init__(self):
        # The defaults double as a schema: _handle_config() coerces every
        # incoming value to the type of the default stored here.
        self._config = {
            "host": "",
            "port": 8086,
            "username": "",
            "password": "",
            "database": "vpnelector",
            "prefix": "",
            "batchtime": 1,
            "timeout": 5,
            "retryinterval": 5,
        }
        self._queue = Queue()
        self._snappers = {}
        self._thread = None
        # Unique sentinel pushed onto the queue to tell the worker to exit.
        self._close_signal = object()
        self._running = False
        self._client = None
        self._write_token = None
        self._create_loggers()

    def _upload_thread(self):
        """Worker loop: collect a batch, upload it, repeat until told to stop."""
        while True:
            points = self._collect_batch()
            if points is None:
                return
            if not self._upload_batch(points):
                return

    def _collect_batch(self):
        """Block until a batch of points is ready.

        Waits indefinitely for the first points, then keeps collecting for
        at most ``batchtime`` seconds before handing the batch over.

        Returns:
            A non-empty list of points, or ``None`` when the close signal
            was received while no points were pending.
        """
        points = []
        deadline = None
        while True:
            timeout = None
            if points and deadline is None:
                # time.monotonic() is a wall-clock-style timer that never
                # goes backwards; time.clock() was removed in Python 3.8.
                deadline = time.monotonic() + self._config["batchtime"]
            if deadline is not None:
                timeout = deadline - time.monotonic()
                if timeout <= 0:
                    break
            try:
                next_object = self._queue.get(timeout=timeout)
            except Empty:
                break
            if next_object is self._close_signal:
                if points:
                    # Upload queued points before exiting: put the signal
                    # back so the next collection round sees it again.
                    self._queue.put(next_object)
                    break
                return None
            points.extend(self._build_points(next_object))
        # Every path that leaves the loop requires points to be non-empty.
        assert points
        return points

    def _upload_batch(self, points):
        """Send *points*, retrying on server-side or unexpected errors.

        Returns:
            ``True`` when the worker should continue with the next batch,
            ``False`` when the writer is no longer running and the worker
            must exit.
        """
        while True:
            retry = False
            msg = None
            try:
                self._client.write_points(points)
            except InfluxDBClientError as e:
                # The error code may be missing or non-numeric; treat that
                # as "not retryable" instead of crashing the worker.
                try:
                    status = int(e.code)
                except (TypeError, ValueError):
                    status = None
                retry = status is not None and 500 <= status <= 599
                msg = (
                    "InfluxDBClientError: %s, %s" %
                    (e.code, e.content,)
                )
            except Exception as e:
                retry = True
                msg = (
                    "Error while uploading points: %s " %
                    (e,)
                )
            if msg:
                if retry and self._running:
                    msg += " (will retry after %s seconds)" % (
                        self._config["retryinterval"],
                    )
                self.error(msg)
            if not self._running:
                return False
            if retry:
                time.sleep(self._config["retryinterval"])
                continue
            return True

    def _create_loggers(self):
        """Expose ``error``/``warning``/``info``/``debug`` as instance methods.

        Each one is bound to the ``"vpnmetrics"`` logger.
        """
        log = logging.getLogger("vpnmetrics")
        for x in ("error", "warning", "info", "debug"):
            setattr(self, x, getattr(log, x))

    def _handle_config(self, key, value):
        """Store one config entry, coerced to the type of its default.

        Unknown keys are logged as warnings and ignored. Values that cannot
        be converted are logged as errors and the previous value is kept.
        """
        sanitized_key = key.lower()
        if sanitized_key not in self._config:
            self.warning("Unrecognized config entry: %s", key)
            return
        value_class = self._config[sanitized_key].__class__
        try:
            value = value_class(value)
        except (TypeError, ValueError):
            self.error("Invalid config item: %s = %s", key, value)
            # Do not store the unconverted value: later code (for example
            # the port range check) relies on the default's type.
            return
        self._config[sanitized_key] = value
        self.debug("Config: %s = %s", sanitized_key, value)

    def _validate_config(self):
        """Return ``True`` if host, database and port are usable.

        Logs an error describing the first problem found.
        """
        if not self._config["host"]:
            self.error("Config error: Host is not set")
            return False
        if not self._config["database"]:
            self.error("Config error: Database is empty")
            return False
        if self._config["port"] < 1 or self._config["port"] > 65535:
            self.error("Config error: Port is out of range")
            return False
        return True

    def _snap_timestamp(self, ts):
        """Snap *ts* using the single shared ``"default"`` snapper."""
        key = "default"
        if key not in self._snappers:
            self._snappers[key] = TimestampSnapper()
        return self._snappers[key].snap(ts)

    def _build_points(self, group):
        """Convert one metrics group into a list of InfluxDB points.

        Args:
            group: Mapping with a ``"vpns"`` mapping (VPN name -> stats
                mapping) and a ``"time_unix"`` timestamp. The stats keys
                read are ``tx_speed``, ``rx_speed``, ``score``,
                ``ping_latest`` and ``packet_loss`` (with ``10s`` and
                ``1min`` entries).

        Returns:
            A list of point dicts. Values that are missing, NaN or
            infinite are skipped.
        """
        points = []
        for name, stats in group["vpns"].items():
            # "<route>-<transport>"; names without a dash get "unknown".
            route, dash, transport = name.partition("-")
            if not dash:
                transport = "unknown"
            tags = {
                "name": name,
                "route": route,
                "transport": transport,
            }
            # A missing packet_loss section is treated like any other
            # missing metric rather than raising in the worker thread.
            packet_loss = stats.get("packet_loss") or {}
            metrics = (
                ("tx_speed", stats.get("tx_speed")),
                ("rx_speed", stats.get("rx_speed")),
                ("score", stats.get("score")),
                ("ping", stats.get("ping_latest")),
                ("packet_loss.10s", packet_loss.get("10s")),
                ("packet_loss.1min", packet_loss.get("1min")),
            )
            for measurement, value in metrics:
                if value is None or not math.isfinite(value):
                    continue
                points.append(self._build_one_point(
                    measurement, group["time_unix"], value, tags,
                ))
        return points

    def _build_one_point(self, measurement, time, value, tags):
        """Build a single point dict for ``write_points``.

        The point's ``time`` is the snapped timestamp; the unmodified
        timestamp is kept in the ``original_timestamp`` field.

        Note: the ``time`` parameter shadows the ``time`` module inside
        this method only.
        """
        point = {
            "measurement": measurement,
            "time": self._snap_timestamp(time),
            "fields": {
                "value": value,
                "original_timestamp": time,
            },
            "tags": tags,
        }
        return point

    def config_from_env(self):
        """Load ``VM_INFLUXDB_*`` environment variables into the config.

        Returns:
            The result of :meth:`_validate_config`.
        """
        CONFIG_PREFIX = "VM_INFLUXDB_"
        for k, v in os.environ.items():
            if not k.startswith(CONFIG_PREFIX):
                continue
            self._handle_config(k[len(CONFIG_PREFIX):], v.strip())
        return self._validate_config()

    def run(self):
        """Read the configuration and start the upload thread.

        Returns:
            ``True`` when the thread was started, ``False`` when the
            configuration is invalid and nothing was started.
        """
        if not self.config_from_env():
            return False
        # Check before touching any state so a misuse does not replace the
        # client of a writer that is already running.
        assert not self._thread
        client_keys = (
            "host", "port", "username", "password", "database",
            "timeout",
        )
        self._client = InfluxDBClient(**{
            k: self._config[k] for k in client_keys
        })
        self._running = True
        self._queue = Queue()
        self._snappers = {}
        self._thread = Thread(target=self._upload_thread)
        self._thread.start()
        return True

    def shutdown(self):
        """Stop the upload thread (if any) and wait for it to finish."""
        if self._thread:
            self._running = False
            self._queue.put(self._close_signal)
            self._thread.join()
            # Forget the finished thread so run() can be called again.
            self._thread = None
        self._client = None

    def write(self, group):
        """Queue one metrics group for upload; requires a running writer."""
        assert self._running
        self._queue.put(group)
def setup_signals():
    """Make SIGTERM raise KeyboardInterrupt, the same way SIGINT does."""
    import signal

    def _raise_interrupt(*_args, **_kwargs):
        # Whatever arguments the handler receives are ignored.
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, _raise_interrupt)
def main():
    """Forward subscribed metric groups to InfluxDB until interrupted.

    Configures logging from ``VM_LOG_LEVEL`` (default ``INFO``), installs
    the SIGTERM handler, starts a ``VpnElectorInfluxDbWriter`` and passes
    every item yielded by the ``metrics.vpnelector`` node subscription to
    it. The writer is always shut down on exit.
    """
    log_level = os.environ.get("VM_LOG_LEVEL", "INFO")
    logging.basicConfig(level=log_level)
    setup_signals()

    metrics_node = RgRedis(RgEtcd()).root_node.metrics.vpnelector
    uploader = VpnElectorInfluxDbWriter()
    uploader.run()
    try:
        for metrics_group in metrics_node.subscribe():
            uploader.write(metrics_group)
    except KeyboardInterrupt:
        # Raised by Ctrl-C, or by SIGTERM once setup_signals() has run.
        logging.info("Exiting normally")
    finally:
        uploader.shutdown()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment