Created
March 18, 2016 22:57
-
-
Save mdcollins05/8c639696a9608a76b755 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import requests | |
import sys | |
import json | |
import pprint | |
from time import sleep | |
# --- Account and credentials -------------------------------------------
# PagerDuty account subdomain (the "<subdomain>.pagerduty.com" part).
subdomain = 'PD_SUBDOMAIN'
# API key sent in the Authorization header of every request.
auth_token = 'API_KEY_HERE'

# --- Who is swapping shifts ---------------------------------------------
# ID of the user going on vacation.
vacationing_user = 'PR2D0IM'
# ID of the user taking over the vacationing user's shifts.
replacement_user = 'PX2FDC3'

# --- Which schedules to process -----------------------------------------
# Leave empty to operate on every schedule in the account.
#   single schedule:     ["SCHEDULEID"]
#   multiple schedules:  ["SCHEDULEID1", "SCHEDULEID2"]
schedules = []

# --- Date window whose shifts are overridden ----------------------------
since = '2016-04-15'
until = '2016-04-25'

# Maps schedule ID -> schedule name; filled lazily by get_schedule_name().
schedule_id_name_cache = {}
def get_schedule_name(subdomain, auth_token, schedule):
    """Return the name of the schedule with ID ``schedule``.

    Names are memoized in the module-level ``schedule_id_name_cache`` so each
    schedule is requested at most once per run.

    Returns the name on success. On a failed request, prints the response
    status code and body and returns ``False``.
    """
    # Serve from the cache when this ID has been resolved before.
    try:
        return schedule_id_name_cache[schedule]
    except KeyError:
        pass

    url = 'https://{0}.pagerduty.com/api/v1/schedules/{1}'.format(subdomain, schedule)
    ok, payload = do_get_request(url, make_headers(auth_token), None)
    if not ok:
        # On failure ``payload`` is the raw response object.
        print("{0}: {1}".format(payload.status_code, payload.text))
        return False

    name = payload['schedule']['name']
    schedule_id_name_cache[schedule] = name
    return name
def get_schedules(subdomain, auth_token, offset, limit):
    """Fetch one page of schedules from the account.

    ``offset`` and ``limit`` are forwarded to the API as paging parameters.

    Returns the value under the response's ``'schedules'`` key on success.
    On a failed request, prints the status code and body and returns
    ``False``.
    """
    endpoint = 'https://{0}.pagerduty.com/api/v1/schedules'.format(subdomain)
    paging = dict(offset=offset, limit=limit)
    ok, body = do_get_request(endpoint, make_headers(auth_token), paging)
    if ok:
        return body['schedules']

    # On failure ``body`` is the raw response object.
    print("{0}: {1}".format(body.status_code, body.text))
    return False
def get_schedules_details(subdomain, auth_token):
    """Fetch paging metadata for the account's schedule list.

    Issues an unparameterized request to the schedules endpoint and extracts
    the ``'total'`` and ``'limit'`` fields from the response.

    Returns ``{'total': ..., 'limit': ...}`` on success. On a failed request,
    prints the status code and body and returns ``False``.
    """
    endpoint = 'https://{0}.pagerduty.com/api/v1/schedules'.format(subdomain)
    ok, body = do_get_request(endpoint, make_headers(auth_token), None)
    if not ok:
        # On failure ``body`` is the raw response object.
        print("{0}: {1}".format(body.status_code, body.text))
        return False

    total = body['total']
    page_limit = body['limit']
    return {'total': total, 'limit': page_limit}
def get_schedule_entries(subdomain, auth_token, schedule, user, since, until):
    """Fetch the entries of ``schedule`` for ``user`` between two dates.

    ``since``, ``until`` and ``user`` (sent as ``user_id``) are forwarded to
    the schedule's ``entries`` endpoint as request parameters.

    Returns the value under the response's ``'entries'`` key on success. On a
    failed request, prints the status code and body and returns ``False``.
    """
    endpoint = 'https://{0}.pagerduty.com/api/v1/schedules/{1}/entries'.format(subdomain, schedule)
    query = dict(since=since, until=until, user_id=user)
    ok, body = do_get_request(endpoint, make_headers(auth_token), query)
    if ok:
        return body['entries']

    # On failure ``body`` is the raw response object.
    print("{0}: {1}".format(body.status_code, body.text))
    return False
def create_schedule_override(subdomain, auth_token, schedule, user, start, end):
    """Create an override on ``schedule`` assigning ``user`` from ``start`` to ``end``.

    Posts an ``{'override': {...}}`` payload to the schedule's ``overrides``
    endpoint.

    Returns ``True`` when the request succeeds. Otherwise prints the status
    code and body and returns ``False``.
    """
    override = {'user_id': user, 'start': start, 'end': end}
    endpoint = 'https://{0}.pagerduty.com/api/v1/schedules/{1}/overrides'.format(subdomain, schedule)
    created, response = do_post_request(endpoint, make_headers(auth_token), {'override': override})
    if created:
        return True

    print("{0}: {1}".format(response.status_code, response.text))
    return False
def return_entries(subdomain, auth_token, schedule_name, schedule_id, vacationing_user, since, until):
    """Return the vacationing user's entries on one schedule.

    When ``schedule_name`` is ``None`` or empty, the name is looked up from
    ``schedule_id`` first (it is only used for the progress message).

    Always returns an iterable. On any failure (the name lookup or the
    entries request) an empty list is returned, so callers can loop over the
    result directly. Previously the failure paths returned ``None`` or
    ``False``, which raised ``TypeError`` as soon as a caller iterated them.
    """
    if schedule_name is None or schedule_name == '':
        schedule_name = get_schedule_name(subdomain, auth_token, schedule_id)
        if schedule_name is False:
            print("Couldn't retrieve schedule name. Validate the schedule ID is correct: {0}".format(schedule_id))
            return []

    print("Fetching entries for schedule {0}".format(schedule_name))
    entries = get_schedule_entries(subdomain, auth_token, schedule_id, vacationing_user, since, until)
    # get_schedule_entries reports failure as False (after printing the
    # error); normalize that to "no entries".
    if entries is False:
        return []
    return entries
def schedule_override_for_vacationing_user(subdomain, auth_token, schedule_name, schedule_id, vacationing_user, replacement_user, entry):
    """Create an override for one schedule entry if it belongs to the vacationing user.

    ``entry`` is a mapping with ``'user'`` (holding ``'id'`` and ``'name'``),
    ``'start'`` and ``'end'`` keys. When ``schedule_name`` is ``None`` or
    empty it is looked up from ``schedule_id``; if that lookup fails a
    message is printed and nothing else happens.

    Returns ``None`` in all cases.
    """
    name_missing = schedule_name is None or schedule_name == ''
    if name_missing:
        schedule_name = get_schedule_name(subdomain, auth_token, schedule_id)
        if schedule_name is False:
            print("Couldn't retrieve schedule name. Validate the schedule ID is correct: {0}".format(schedule_id))
            return

    shift_owner = entry['user']
    if shift_owner['id'] != vacationing_user:
        # Entry belongs to someone else; nothing to override.
        return

    shift_start = entry['start']
    shift_end = entry['end']
    print("Creating override for {0} on schedule {1}, shift: {2} to {3}".format(shift_owner['name'], schedule_name, shift_start, shift_end))
    create_schedule_override(subdomain, auth_token, schedule_id, replacement_user, shift_start, shift_end)
    # Short pause after each override request.
    sleep(.1)
def make_headers(auth_token):
    """Build the HTTP headers sent with every API request.

    Returns a new dict with a token-style ``Authorization`` header built from
    ``auth_token`` and a JSON ``Content-type`` header.
    """
    headers = {}
    headers['Authorization'] = 'Token token={0}'.format(auth_token)
    headers['Content-type'] = 'application/json'
    return headers
def do_get_request(url, headers, params, max_retries=6):
    """Issue a GET request, retrying with exponential backoff on HTTP 403.

    Args:
        url: Full URL to request.
        headers: Dict of HTTP headers.
        params: Request parameters, JSON-encoded into the request body; a
            falsy value sends no body.
        max_retries: Maximum number of retries after a 403 before giving up.
            The wait starts at 3 seconds and doubles on each retry.

    Returns:
        ``(True, decoded_json)`` on HTTP 200, otherwise ``(False, response)``
        where ``response`` is the raw response object. A 403 that persists
        after ``max_retries`` retries is returned as a failure instead of
        looping forever.
    """
    # NOTE(review): parameters travel as a JSON body on a GET rather than as
    # a query string -- confirm the target API accepts this form.
    body = json.dumps(params) if params else None
    sleep_timer = 1.5
    retries = 0
    while True:
        get = requests.get(
            url,
            headers=headers,
            data=body
        )
        # 403 is treated as retryable (presumably rate limiting -- confirm),
        # but only a bounded number of times so a permanent 403 cannot hang
        # the script.
        if get.status_code == 403 and retries < max_retries:
            retries += 1
            sleep_timer = sleep_timer * 2
            print("Recieved a 403; retrying in {0} seconds...".format(sleep_timer))
            sleep(sleep_timer)
            continue
        if get.status_code == 200:
            return True, get.json()
        return False, get
def do_post_request(url, headers, params, max_retries=6):
    """Issue a POST request, retrying with exponential backoff on HTTP 403.

    Args:
        url: Full URL to post to.
        headers: Dict of HTTP headers.
        params: Payload, JSON-encoded into the request body.
        max_retries: Maximum number of retries after a 403 before giving up.
            The wait starts at 3 seconds and doubles on each retry.

    Returns:
        ``(True, response)`` on HTTP 201, otherwise ``(False, response)``;
        ``response`` is the raw response object in both cases. A 403 that
        persists after ``max_retries`` retries is returned as a failure
        instead of looping forever.
    """
    body = json.dumps(params)
    sleep_timer = 1.5
    retries = 0
    while True:
        post = requests.post(
            url,
            headers=headers,
            data=body
        )
        # 403 is treated as retryable (presumably rate limiting -- confirm),
        # but only a bounded number of times so a permanent 403 cannot hang
        # the script.
        if post.status_code == 403 and retries < max_retries:
            retries += 1
            sleep_timer = sleep_timer * 2
            print("Recieved a 403; retrying in {0} seconds...".format(sleep_timer))
            sleep(sleep_timer)
            continue
        if post.status_code == 201:
            return True, post
        return False, post
def main(schedules):
    """Create overrides replacing the vacationing user on the given schedules.

    Args:
        schedules: List of schedule IDs to process. When empty, every
            schedule in the account is fetched page by page and processed.

    Returns:
        ``None`` on normal completion, or ``1`` when the schedule listing
        cannot be retrieved, so the process exits with a non-zero status.

    Uses the module-level ``subdomain``, ``auth_token``, ``vacationing_user``,
    ``replacement_user``, ``since`` and ``until`` settings.
    """
    if not schedules:
        schedule_details = get_schedules_details(subdomain, auth_token)
        if not schedule_details:
            # The helper has already printed the HTTP error.
            return 1

        total = schedule_details['total']
        page_size = schedule_details['limit']
        print("Total of {0} schedules".format(total))

        # Step straight from page to page instead of visiting every offset.
        for offset in range(0, total, page_size):
            print("Fetching up to the next {0} schedules".format(page_size))
            # Request exactly one page: passing the total as the limit made
            # consecutive pages overlap, so the same shifts could be
            # overridden more than once.
            page = get_schedules(subdomain, auth_token, offset, page_size)
            for schedule in page or []:  # a failed page fetch yields False
                # A failed entries lookup yields None/False; treat it as empty.
                for entry in return_entries(subdomain, auth_token, schedule['name'], schedule['id'], vacationing_user, since, until) or []:
                    schedule_override_for_vacationing_user(subdomain, auth_token, schedule['name'], schedule['id'], vacationing_user, replacement_user, entry)
    else:
        for schedule in schedules:  # each item is a schedule ID
            # No name is known here, so the helpers look it up from the ID.
            for entry in return_entries(subdomain, auth_token, None, schedule, vacationing_user, since, until) or []:
                schedule_override_for_vacationing_user(subdomain, auth_token, None, schedule, vacationing_user, replacement_user, entry)
if __name__ == '__main__':
    # Run against the configured schedules; main()'s return value becomes
    # the process exit status.
    exit_status = main(schedules)
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment