Last active
July 19, 2017 01:46
-
-
Save michael-erasmus/618ab13a3493d4471b00c63337a7e2f6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import grpc | |
import json | |
from rsdf import redshift | |
from buda.entities.funnel_pb2 import Funnel | |
from buda.entities.funnel_event_pb2 import FunnelEvent | |
from buda.entities.link_pb2 import Link | |
from buda.entities.uuid_pb2 import Uuid | |
import buda.services.events_collector_pb2_grpc as collector_grpc | |
def cursor_iter(cursor, chunk_size=500):
    """Yield the rows of ``cursor`` one by one, fetching them in batches.

    Rows are pulled with ``cursor.fetchmany(chunk_size)``; iteration stops
    as soon as a fetch returns an empty (falsy) batch.
    """
    batch = cursor.fetchmany(chunk_size)
    while batch:
        yield from batch
        batch = cursor.fetchmany(chunk_size)
def make_funnel(record):
    """Build a ``Funnel`` message from one row of the funnels query.

    ``record`` is indexed by column name and must provide ``id``,
    ``user_id``, ``name``, ``created_at`` and ``tags``. A falsy ``name``
    is replaced by ``''``. When ``tags`` is non-empty it is decoded as
    JSON and every key/value pair is copied into the message's ``tags``
    map, with falsy values replaced by ``''``.
    """
    funnel = Funnel(
        id=Uuid(id=record['id']),
        user_id=Uuid(id=record['user_id']),
        name=record['name'] or '',
    )
    funnel.created_at.FromDatetime(record['created_at'])

    encoded_tags = record['tags']
    if not encoded_tags:
        return funnel

    for key, value in json.loads(encoded_tags).items():
        funnel.tags[key] = value or ''
    return funnel
def make_funnel_event(record):
    """Build a ``FunnelEvent`` message from one row of the funnel-events query.

    ``record`` is indexed by column name and must provide ``id``,
    ``funnel_id``, ``funnel_step_id``, ``user_id``, ``funnel_end``,
    ``created_at`` and ``tags``. When ``tags`` is non-empty it is decoded
    as JSON and every key/value pair is copied into the message's ``tags``
    map, with falsy values replaced by ``''``.
    """
    message = FunnelEvent(
        id=Uuid(id=record['id']),
        funnel_id=Uuid(id=record['funnel_id']),
        funnel_step_id=Uuid(id=record['funnel_step_id']),
        user_id=Uuid(id=record['user_id']),
        funnel_end=record['funnel_end'],
    )
    message.created_at.FromDatetime(record['created_at'])

    encoded_tags = record['tags']
    if not encoded_tags:
        return message

    for key, value in json.loads(encoded_tags).items():
        message.tags[key] = value or ''
    return message
# Host of the events-collector gRPC service. Falls back to the
# 'events-collector' name unless EVENTS_COLLECTOR_HOSTNAME is set.
ip = os.environ.get('EVENTS_COLLECTOR_HOSTNAME', 'events-collector')
channel = grpc.insecure_channel(ip + ':50051')
stub = collector_grpc.EventsCollectorStub(channel)

# The 2000 most recent 'funnel_created' events, shaped for make_funnel().
FUNNELS_QUERY = """
select
  json_extract_path_text(value, 'id') as id
  , date as created_at
  , user_visitor_id as user_id
  , json_extract_path_text(value, 'name') as name
  , json_extract_path_text(value, 'tags') as tags
from event_data
where type = 'funnel_created'
order by date desc
limit 2000
"""

# The 2000 most recent well-formed 'funnel_event' events, shaped for
# make_funnel_event().
# NOTE(review): subscription_event_id is selected here but is not copied
# into the FunnelEvent message by make_funnel_event() - confirm whether it
# should be.
FUNNEL_EVENTS_QUERY = """
select
  date as created_at
  , user_visitor_id as user_id
  , md5(json_extract_path_text(value, 'funnel_id') || date_text) id
  , json_extract_path_text(value, 'funnel_id') as funnel_id
  , json_extract_path_text(value, 'funnel_step_id') as funnel_step_id
  , ( case nullif(json_extract_path_text(value, 'funnel_end'), '')
      when 'true' then 1
      else 0
      end
    )::bool funnel_end
  , nullif(json_extract_path_text(value, 'stripe_event_id'),'') as subscription_event_id
  , json_extract_path_text(value, 'tags') as tags
from event_data
where type like 'funnel_event'
  -- check for malformed data
  and nullif(json_extract_path_text(value, 'state'), '') in ('0', '1', '2', '3', '4', '5')
  and len(json_extract_path_text(value, 'funnel_id')) < 41
  and len(json_extract_path_text(value, 'funnel_step_id')) < 41
order by date desc
limit 2000
"""


def _backfill(engine, query, make_message, collect):
    """Run ``query`` and send one message per returned row.

    Each row is converted with ``make_message`` and the result is passed
    to ``collect``. The connection opened for the query is always closed,
    even when building or sending a message raises.
    """
    connection = engine.connect()
    try:
        for record in cursor_iter(connection.execute(query)):
            collect(make_message(record))
    finally:
        connection.close()


# One engine is enough for every pass; each pass opens and closes its own
# connection.
engine = redshift.get_engine()

# funnels
_backfill(engine, FUNNELS_QUERY, make_funnel, stub.CollectFunnel)

# funnel events
_backfill(engine, FUNNEL_EVENTS_QUERY, make_funnel_event, stub.CollectFunnelEvent)

# funnels (again)
# NOTE(review): this pass repeats the first funnels pass verbatim. It is
# kept so the script sends exactly what it always has - confirm whether it
# is intentional or a copy-paste leftover that can be dropped.
_backfill(engine, FUNNELS_QUERY, make_funnel, stub.CollectFunnel)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image that runs the funnel backfill script (client.py).
FROM python:3.6

# Single pin shared by grpcio and grpcio-tools so both install at the same
# version.
ENV GRPC_PYTHON_VERSION=1.4.0

# --no-cache-dir keeps pip's download cache out of the image layers.
RUN python -m pip install --no-cache-dir --upgrade pip
RUN pip install --no-cache-dir grpcio==${GRPC_PYTHON_VERSION} grpcio-tools==${GRPC_PYTHON_VERSION}

# Install the remaining dependencies before copying the script, so editing
# client.py does not invalidate the dependency layers.
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt

WORKDIR /usr/src/app
COPY client.py /usr/src/app/client.py

CMD ["python", "client.py"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tag shared by the build and run targets.
IMAGE_NAME := bufferapp/events-collector:backfill-funnels

.PHONY: all run build

# Default target: run the backfill container.
all: run

# Run the backfill against a collector on localhost, using the host
# network and the variables from .env.
run:
	docker run -e EVENTS_COLLECTOR_HOSTNAME=localhost --env-file .env --net=host $(IMAGE_NAME)

# Build the image from the Dockerfile in the current directory.
build:
	docker build . -t $(IMAGE_NAME)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
https://github.com/bufferapp/buda-protobufs/releases/download/0.1.1/buda-python-0.1.1.tar.gz | |
git+https://github.com/bufferapp/rsdf |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment