Skip to content

Instantly share code, notes, and snippets.

@tamasgal
Last active February 10, 2020 19:38
Show Gist options
  • Save tamasgal/222e88a006c8a889c07c37bc7a3c7909 to your computer and use it in GitHub Desktop.
Save tamasgal/222e88a006c8a889c07c37bc7a3c7909 to your computer and use it in GitHub Desktop.
ThePipe example for gcn
#!/usr/bin/env python
import os
import threading
from queue import Queue, Empty

import coloredlogs, logging
import gcn
import telegram
import thepipe
import yaml
from grbfunk import handler
class BlobPrinter(thepipe.Module):
    """Pass-through module that echoes every blob to stdout."""

    def process(self, blob):
        """Print ``blob`` and hand it on unchanged."""
        print(blob)
        return blob
class Bot(thepipe.Module):
    """Telegram bot wrapper that offers a ``send_message`` service."""

    def configure(self):
        """Read the access file, create the bot and expose the service.

        The YAML file location is taken from the ``config_path`` parameter
        and falls back to ``~/.grbfunk/access.yaml``. The file has to
        provide the keys ``token`` and ``chat_id``.
        """
        fallback_path = os.path.join(
            os.path.expanduser("~"), ".grbfunk", "access.yaml"
        )
        config_file = self.get("config_path", default=fallback_path)

        with open(config_file) as stream:
            credentials = yaml.load(stream, Loader=yaml.SafeLoader)

        bot_token = credentials["token"]
        self.chat_id = credentials["chat_id"]
        self._bot = telegram.Bot(token=bot_token)

        self.expose(self.send_message, "send_message")
        self.log.info("Bot ready!")

    def send_message(self, text):
        """Send ``text`` to the chat identified by ``self.chat_id``."""
        self._bot.send_message(text=text, chat_id=self.chat_id)
class GCNListener(thepipe.Module):
    """Pipeline module that turns incoming GCN payloads into blobs.

    ``gcn.listen`` runs in a daemon thread and calls ``_handler`` for each
    payload; the handler puts a blob on an internal queue. ``process``
    takes one blob off that queue per call.
    """

    def configure(self):
        """Read the ``timeout`` parameter and set up the queue and thread.

        ``timeout`` is the maximum time in seconds that ``process`` waits
        for a new queue item; it defaults to one day.
        """
        self.timeout = self.get("timeout", default=60 * 60 * 24)  # [s]
        self.log.info("SPIN UP")
        self.queue = Queue()
        self.thread = threading.Thread(target=self._run, args=())
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.thread.daemon = True

    def prepare(self):
        """Called once after configure and right before the first process-call"""
        self.services['send_message']('GRB Funk has started blasting events to your earholes')
        self.thread.start()

    def process(self, blob):
        """Called in each iteration; block until the next queued blob arrives.

        The incoming ``blob`` is discarded and replaced by the one taken
        from the queue.

        Raises:
            StopIteration: if no blob arrives within ``self.timeout`` seconds.
        """
        # Use the module's own logger; there is no module-level ``log``.
        self.log.debug("Waiting for queue items.")
        try:
            blob = self.queue.get(timeout=self.timeout)
        except Empty:
            raise StopIteration("Timeout reached.") from None
        self.cprint("Got a new GCN event!")
        return blob

    def _run(self):
        """Thread target: hand control to ``gcn.listen`` with our handler."""
        gcn.listen(handler=self._handler)

    def _handler(self, payload, root):
        """Called from gcn.listen and produces a blob which holds the data"""
        # process payload, extract data
        # put it into the `blob`
        blob = thepipe.Blob()
        blob['data'] = 123  # placeholder value; payload/root are not used yet
        self.queue.put(blob)
if __name__ == "__main__":
    # Build the pipeline: Bot, then GCNListener, then BlobPrinter.
    pipeline = thepipe.Pipeline()
    for module in (Bot, GCNListener, BlobPrinter):
        pipeline.attach(module)
    pipeline.drain()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment