Last active
December 1, 2020 18:33
-
-
Save CrashLaker/0dc1b96ec684f830f888af4058e7d6fa to your computer and use it in GitHub Desktop.
Grafana alert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json

import falcon
class CORSComponent(object):
    """Middleware component that marks every processed response as CORS-open."""

    def process_response(self, req, resp, resource, req_succeeded):
        """Add a wildcard ``Access-Control-Allow-Origin`` header to *resp*.

        Only ``resp`` is used; ``req``, ``resource`` and ``req_succeeded``
        are accepted but ignored.
        """
        header_name = 'Access-Control-Allow-Origin'
        allowed_origin = '*'
        resp.set_header(header_name, allowed_origin)
class AlertHistory(object):
    """In-memory buffer of alert payloads that is emptied each time it is read."""

    def __init__(self):
        # Per-instance storage. A class-level ``alerts = []`` would be one
        # list object shared by every AlertHistory, so alerts pushed through
        # one instance would show up in (and linger for) all the others.
        self.alerts = []

    def aq(self):
        """No-op placeholder.

        NOTE(review): the name suggests "acquire" (e.g. a lock) -- confirm
        the intent before relying on it; it currently does nothing.
        """
        pass

    def rel(self):
        """No-op placeholder.

        NOTE(review): the name suggests "release", paired with ``aq`` --
        confirm the intent; it currently does nothing.
        """
        pass

    def push(self, data):
        """Append one alert payload *data* to the buffer."""
        self.alerts.append(data)

    def list(self):
        """Return all buffered alerts in insertion order and clear the buffer.

        The returned list is detached from the buffer: later ``push`` calls
        do not modify it.
        """
        drained = self.alerts
        # Rebind rather than clear in place so the caller keeps its list.
        self.alerts = []
        return drained
class ReceiveAlert(object):
    """Resource that accepts alert payloads and stores them in a history."""

    def __init__(self, hist):
        """Keep *hist*, the object whose ``push`` method receives each payload."""
        self.hist = hist
        print("receive init")

    def on_get(self, req, resp):
        """Answer GET with status 200 and the fixed body ``"receive"``."""
        resp.status = falcon.HTTP_200
        # NOTE(review): ``resp.body`` is deprecated in favour of ``resp.text``
        # in newer Falcon releases -- confirm the targeted Falcon version.
        resp.body = "receive"

    def on_post(self, req, resp):
        """Answer POST with 200/``"receive"`` and push the request media to the history."""
        resp.status = falcon.HTTP_200
        resp.body = "receive"
        # Read ``req.media`` once and hand it straight to the history; the
        # previous unused local copy has been dropped.
        self.hist.push(req.media)
class SendAlert(object):
    """Resource that hands out the alerts currently held by a history."""

    def __init__(self, hist):
        """Keep *hist*, the object whose ``list`` method supplies the alerts."""
        self.hist = hist
        print("sender init")

    def on_get(self, req, resp):
        """Answer GET with status 200 and the history's alerts serialized as JSON."""
        resp.status = falcon.HTTP_200
        pending_alerts = self.hist.list()
        serialized = json.dumps(pending_alerts)
        resp.body = serialized
# Enable a simple CORS policy for all responses by registering the
# CORS middleware on the application.
# ``falcon.App`` is the current application class; ``falcon.API`` is its
# deprecated alias (since Falcon 3.0). Prefer ``App`` when it exists and fall
# back to ``API`` so older Falcon releases keep working.
_app_class = getattr(falcon, "App", None) or falcon.API
api = _app_class(middleware=[
    CORSComponent(),
])

# One history object is shared by both resources: /recv pushes alerts into
# it and /send reads them back out.
alert_history = AlertHistory()

receiver = ReceiveAlert(alert_history)
api.add_route('/recv', receiver)

sender = SendAlert(alert_history)
api.add_route('/send', sender)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment