Created
June 14, 2021 17:45
-
-
Save cbess/c03fad3469ddae04ec430731605d1aeb to your computer and use it in GitHub Desktop.
python-digitalocean server provisioning Py3 module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# the server provisioning module | |
# uses: python-digitalocean==1.16.0 | |
# ref: https://developers.digitalocean.com/documentation/v2/#droplets | |
import digitalocean | |
import os | |
class ProvisionServerException(Exception): | |
pass | |
class ProvisionServer: | |
'''Represents a server provisioning tool. | |
''' | |
cloud_init_path = os.getenv('CLOUD_INIT_PATH', 'cloud-init.yml') | |
firewall_id = os.getenv('DIGITALOCEAN_FIREWALL_ID') | |
def __init__(self): | |
DO_TOKEN: str = os.getenv('DIGITALOCEAN_TOKEN') | |
self.manager = digitalocean.Manager(token=DO_TOKEN) | |
def get_all_servers(self): | |
severs = self.manager.get_all_droplets() | |
return severs | |
def create_server(self, **kwargs) -> digitalocean.Droplet: | |
'''Creates a server using the specified options. | |
@param kwargs: name, region, size_slug | |
''' | |
# get contents of cloud-init config | |
user_data = '' | |
with open(self.cloud_init_path) as f: | |
user_data = f.read() | |
if not user_data: | |
raise ProvisionServerException('no user data') | |
ssh_keys = self.manager.get_all_sshkeys() | |
droplet = digitalocean.Droplet( | |
token=self.manager.token, | |
name=kwargs.get('name', 'general'), # hostname | |
region=kwargs.get('region') or 'nyc3', | |
image='ubuntu-18-04-x64', # Ubuntu 18.04 x64 (LTS) - supported until April 2023 | |
size_slug=kwargs.get('size_slug') or 's-1vcpu-2gb', # s-1vcpu-1gb, s-1vcpu-2gb*, s-2vcpu-2gb | |
ssh_keys=ssh_keys, | |
user_data=user_data, | |
backups=False, | |
monitoring=False, # False, to allow the server to provision faster | |
) | |
droplet.create() | |
droplets = [droplet.id] | |
# add custom tag to the droplet | |
if kwargs.get('tag_name'): | |
self.add_tag(droplet, kwargs['tag_name']) | |
# add category tag | |
self.add_tag(droplet, 'temp') | |
# add firewall to droplet, if firewall id is given | |
if self.firewall_id: | |
fw = digitalocean.Firewall(id=self.firewall_id, token=self.manager.token) | |
fw.add_droplets(droplets) | |
return droplet | |
def add_tag(self, droplets, tag_name: str): | |
# tags may contain letters, numbers, colons, dashes, and underscores | |
name = tag_name.replace('.', '-').replace(' ', '-') | |
tag = digitalocean.Tag(token=self.manager.token, name=name) | |
tag.create() | |
tag.add_droplets(droplets) | |
def get_server(self, sid: str) -> digitalocean.Droplet: | |
if not sid: | |
return None | |
try: | |
return self.manager.get_droplet(sid) | |
except digitalocean.baseapi.NotFoundError: | |
pass | |
return None | |
def print_droplet_status(self, droplet_id: str): | |
droplet = self.get_server(droplet_id) | |
print('ip:', droplet.ip_address) | |
actions = droplet.get_actions() | |
for action in actions: | |
action.load() | |
# 'complete' = droplet is up and running | |
print('status:', action.status) | |
def destroy_server(self, sid: str): | |
droplet = self.get_server(sid) | |
if not droplet: | |
return | |
droplet.destroy() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment