Last active
July 12, 2017 09:30
-
-
Save igreenfield/d5c3162c150cff924b4a1d10e4a3c371 to your computer and use it in GitHub Desktop.
Script to rebalance master node for rabbitmq cluster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httplib | |
import json | |
from string import Template | |
import time | |
request_template = '{"vhost": "/","name": "$name","pattern": "$pattern","apply-to": "all","definition": {"ha-mode": "nodes","ha-params": ["$node_name"],"ha-sync-mode": "automatic", "queue-master-locator": "min-masters"},"priority": 10}' | |
post_headers = {"Content-type": "application/json", | |
"Authorization": "Basic YWRtaW46YWRtaW4="} | |
get_headers = {"Authorization": "Basic YWRtaW46YWRtaW4="} | |
rabbitmq_ip = "10.10.41.56" | |
conn = httplib.HTTPConnection(rabbitmq_ip, 15672) | |
conn.request("GET", "/api/nodes", '', get_headers) | |
res = conn.getresponse() | |
nodes = None | |
queues = None | |
if res.status is 200: | |
data = res.read() | |
response_as_json = json.loads(data) | |
nodes = [item['name'] for item in response_as_json] | |
print 'found nodes: ' + str(nodes) | |
conn.close() | |
if nodes is not None: | |
conn = httplib.HTTPConnection(rabbitmq_ip, 15672) | |
conn.request("GET", "/api/queues", '', get_headers) | |
res = conn.getresponse() | |
if res.status is 200: | |
data = res.read() | |
response_as_json = json.loads(data) | |
queues = [item['name'] for item in response_as_json] | |
print 'found queues: ' + str(queues) | |
conn.close() | |
uris = [] | |
queueSpan = [] | |
if queues is not None: | |
index = 0 | |
for queue in queues: | |
conn = httplib.HTTPConnection(rabbitmq_ip, 15672) | |
node_name = nodes[index] | |
request_body = Template(request_template).substitute(name=queue, pattern=queue, node_name=node_name) | |
uri = "/api/policies/%2F/" + queue | |
queueSpan.append(queue + " " + node_name) | |
uris.append(uri) | |
conn.request("PUT", uri, request_body, post_headers) | |
res = conn.getresponse() | |
conn.close() | |
index += 1 | |
if index is len(nodes): | |
index = 0 | |
print "Waiting for new span to take affect..." | |
queueSpanSet = set(queueSpan) | |
count = 0 | |
while True and count < 60: | |
conn = httplib.HTTPConnection(rabbitmq_ip, 15672) | |
conn.request("GET", "/api/queues", '', get_headers) | |
res = conn.getresponse() | |
if res.status is 200: | |
data = res.read() | |
response_as_json = json.loads(data) | |
currentQueuesSpan = set([item['name'] + " " + item['node'] for item in response_as_json]) | |
if currentQueuesSpan == queueSpanSet: | |
break | |
else: | |
time.sleep(5) | |
count = count + 1 | |
print "Going to delete policies..." | |
for uri in uris: | |
conn = httplib.HTTPConnection(rabbitmq_ip, 15672) | |
print "DELETE " + uri | |
conn.request("DELETE", uri, '', get_headers) | |
res = conn.getresponse() | |
conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment