Last active
February 10, 2020 19:38
-
-
Save tamasgal/222e88a006c8a889c07c37bc7a3c7909 to your computer and use it in GitHub Desktop.
ThePipe example for gcn
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging
import os
import threading
from queue import Queue, Empty

import coloredlogs
import gcn
import telegram
import thepipe
import yaml
from grbfunk import handler
class BlobPrinter(thepipe.Module):
    """Pipeline module that echoes every blob it receives to stdout."""

    def process(self, blob):
        """Print ``blob`` and pass the very same object on unchanged."""
        print(blob)
        return blob
class Bot(thepipe.Module):
    """Pipeline module wrapping a Telegram bot and offering ``send_message``."""

    def configure(self):
        """Load the access file, create the Telegram client, expose the service.

        The YAML file location is taken from the ``config_path`` parameter
        and falls back to ``~/.grbfunk/access.yaml``. The file has to provide
        the keys ``token`` and ``chat_id``.
        """
        fallback_path = os.path.join(
            os.path.expanduser("~"), ".grbfunk", "access.yaml"
        )
        config_file = self.get("config_path", default=fallback_path)

        with open(config_file) as stream:
            credentials = yaml.safe_load(stream)

        bot_token = credentials["token"]
        self.chat_id = credentials["chat_id"]
        self._bot = telegram.Bot(token=bot_token)

        # Register the bound method under the name "send_message".
        self.expose(self.send_message, "send_message")
        self.log.info("Bot ready!")

    def send_message(self, text):
        """Send ``text`` to the chat identified by ``self.chat_id``."""
        self._bot.send_message(chat_id=self.chat_id, text=text)
class GCNListener(thepipe.Module):
    """Pipeline module that hands out one blob per received GCN notice.

    ``gcn.listen`` runs in a background daemon thread and its handler pushes
    blobs onto an internal queue; ``process`` pops them one at a time.
    """

    def configure(self):
        """Read the ``timeout`` parameter and create the queue and thread.

        The thread is only created here; it is started in ``prepare``.
        """
        self.timeout = self.get("timeout", default=60 * 60 * 24)  # [s]
        self.log.info("SPIN UP")
        self.queue = Queue()
        # Daemon thread: a blocking gcn.listen() must not keep the
        # interpreter alive once the main thread has finished.
        self.thread = threading.Thread(target=self._run, daemon=True)

    def prepare(self):
        """Called once after configure and right before the first process-call"""
        self.services['send_message']('GRB Funk has started blasting events to your earholes')
        self.thread.start()

    def process(self, blob):
        """Called in each iteration; wait for the next queued blob.

        The incoming ``blob`` is discarded and replaced by the next item
        taken from the queue.

        Returns:
            The blob put on the queue by ``_handler``.

        Raises:
            StopIteration: if no item arrives within ``self.timeout`` seconds.
        """
        # Use the module's own logger; there is no module-level ``log``.
        self.log.debug("Waiting for queue items.")
        try:
            blob = self.queue.get(timeout=self.timeout)
        except Empty:
            raise StopIteration("Timeout reached.")
        self.cprint("Got a new GCN event!")
        return blob

    def _run(self):
        """Thread target: run ``gcn.listen`` with ``_handler`` as callback."""
        gcn.listen(handler=self._handler)

    def _handler(self, payload, root):
        """Called from gcn.listen and produces a blob which holds the data"""
        # process payload, extract data
        # put it into the `blob`
        blob = thepipe.Blob()
        blob['data'] = 123  # placeholder value; payload/root are not used yet
        self.queue.put(blob)
if __name__ == "__main__":
    pipe = thepipe.Pipeline()
    # Bot is attached first; presumably its "send_message" service has to
    # be available before GCNListener requests it -- confirm against thepipe.
    for module in (Bot, GCNListener, BlobPrinter):
        pipe.attach(module)
    pipe.drain()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment