Created
May 29, 2017 12:43
-
-
Save igreenfield/3beb40316681d675a80fe38e22548522 to your computer and use it in GitHub Desktop.
Create shovel configuration for 2 clusters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httplib
import json
import time
import urllib
from string import Template
# HTTP Basic credentials sent with every management API call
# (the base64 payload decodes to "admin:admin").
get_headers = {"Authorization": "Basic YWRtaW46YWRtaW4="}

# Requests that carry a JSON body use the same credentials plus a content type.
put_headers = dict(get_headers)
put_headers["Content-type"] = "application/json"

# Hosts of the source and destination brokers; both hold the same value here.
rabbitmq_src_ip = "10.10.10.10"
rabbitmq_dst_ip = "10.10.10.10"
# Step 1: copy every user-defined exchange from the source broker to the
# destination broker through the management HTTP API (port 15672).
print("Start moving all exchanges...")

# Names collected while copying; the shovel-creation steps further down
# iterate over these two lists.
all_exchanges_names = []
all_queues_names = []

src_conn = httplib.HTTPConnection(rabbitmq_src_ip, 15672)
try:
    src_conn.request("GET", "/api/exchanges", '', get_headers)
    all_exchanges_src = src_conn.getresponse()
    # Read the body before closing: once the connection is closed the
    # response may no longer be readable.
    data = all_exchanges_src.read()
finally:
    src_conn.close()

if all_exchanges_src.status == 200:
    response_as_json = json.loads(data)
    for exchange in response_as_json:
        exchange_name = exchange['name']
        # Skip internal exchanges, the unnamed default exchange and every
        # exchange whose name starts with "amq".
        if exchange['internal'] or exchange_name == "" or exchange_name.startswith("amq"):
            continue
        all_exchanges_names.append(exchange_name)
        print("check exchange: " + exchange_name + " exists!")

        # Percent-encode the name so characters such as "/", "?" or spaces
        # cannot change the meaning of the URL path.
        # NOTE(review): the vhost is hard-coded to %2F ("/") although the
        # listing above is not restricted to one vhost -- confirm intended.
        exchange_path = "/api/exchanges/%2F/" + urllib.quote(exchange_name.encode("utf-8"), safe="")

        dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
        try:
            dst_conn.request("GET", exchange_path, '', get_headers)
            lookup = dst_conn.getresponse()
            lookup.read()
        finally:
            dst_conn.close()
        if lookup.status != 404:
            # Anything other than "not found" is left untouched.
            continue

        print(exchange_name + " does not ")
        # Runtime statistics are not part of an exchange definition.
        exchange.pop('message_stats', None)
        # json.dumps produces valid JSON for booleans, None, unicode strings
        # and embedded quotes, which str(dict) plus text replacement does not.
        replace = json.dumps(exchange)
        print('create exchange: ' + replace)

        dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
        try:
            dst_conn.request("PUT", exchange_path, replace, put_headers)
            request = dst_conn.getresponse()
            request.read()
        finally:
            dst_conn.close()
        # NOTE(review): 204 is what the original checked for; 201 Created is
        # believed to be returned by newer brokers for a new resource --
        # confirm against the target RabbitMQ version.
        if request.status not in (201, 204):
            print("failed to create exchange: " + exchange_name)
# Step 2: copy every user-defined queue from the source broker to the
# destination broker through the management HTTP API.
print("Start moving all queues...")
src_conn = httplib.HTTPConnection(rabbitmq_src_ip, 15672)
try:
    src_conn.request("GET", "/api/queues", '', get_headers)
    all_queues_src = src_conn.getresponse()
    # Read the body before closing: once the connection is closed the
    # response may no longer be readable.
    data = all_queues_src.read()
finally:
    src_conn.close()

if all_queues_src.status == 200:
    response_as_json = json.loads(data)
    for queue in response_as_json:
        queue_name = queue['name']
        # Compare by value: an identity test against "" is never a reliable
        # emptiness check.
        if queue_name == "" or queue_name.startswith("amq"):
            continue
        all_queues_names.append(queue_name)
        print("checking queue: " + queue_name)

        # Percent-encode the name so it is safe inside the URL path.
        queue_path = "/api/queues/%2F/" + urllib.quote(queue_name.encode("utf-8"), safe="")

        dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
        try:
            # To force re-creation, a DELETE on the same path could be issued
            # here first; the original script kept that call disabled.
            dst_conn.request("GET", queue_path, '', get_headers)
            queue_details = dst_conn.getresponse()
            queue_details.read()
        finally:
            dst_conn.close()
        if queue_details.status != 404:
            # Anything other than "not found" is left untouched.
            continue

        # Only the definition fields are forwarded, not the runtime statistics.
        body = dict(
            (field, queue[field])
            for field in ('name', 'auto_delete', 'durable', 'arguments', 'policy', 'vhost')
        )
        # json.dumps handles None, booleans, unicode strings and quotes,
        # which str(dict) plus text replacement does not.
        replace = json.dumps(body)

        dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
        try:
            dst_conn.request("PUT", queue_path, replace, put_headers)
            response = dst_conn.getresponse()
            response_text = response.read()
        finally:
            dst_conn.close()
        # NOTE(review): 204 is what the original checked for; 201 Created is
        # believed to be returned by newer brokers for a new resource --
        # confirm against the target RabbitMQ version.
        if response.status not in (201, 204):
            print("request body: " + replace)
            print("response body: " + response_text)
# JSON bodies for dynamic-shovel parameters. The "$name" placeholders use
# string.Template syntax and are filled in when the shovels are created.

# Shovel that forwards from an exchange on the source to an exchange on the
# destination.
exchange_request_template = "\n".join([
    '{',
    '"value":{',
    '"src-uri": "amqp://guest:guest@$source",',
    '"src-exchange": "$exchange_src",',
    '"dest-uri": "amqp://guest:guest@$destination",',
    '"dest-exchange": "$exchange_dst"',
    '}',
    '}',
])

# Shovel that forwards from a queue on the source to a queue on the
# destination.
queue_request_template = "\n".join([
    '{',
    '"value":{',
    '"src-uri": "amqp://guest:guest@$source",',
    '"src-queue": "$queue_src",',
    '"dest-uri": "amqp://guest:guest@$destination",',
    '"dest-queue": "$queue_dst"',
    '}',
    '}',
])
# Step 3: (re)create one dynamic shovel per copied exchange on the
# destination broker; the shovel is named after the exchange.
for exchange_name in all_exchanges_names:
    # Compare by value: an identity test against "" is never a reliable
    # emptiness check.
    if exchange_name == "" or exchange_name.startswith('amq'):
        continue
    print("create shoval for exchange: " + exchange_name)
    request_body = Template(exchange_request_template).substitute(
        source=rabbitmq_src_ip,
        destination=rabbitmq_dst_ip,
        exchange_src=exchange_name,
        exchange_dst=exchange_name,
    )
    # Percent-encode the name so it is safe inside the URL path.
    shovel_path = "/api/parameters/shovel/%2f/" + urllib.quote(exchange_name.encode("utf-8"), safe="")

    # Drop any existing shovel of the same name. The response is drained and
    # the connection closed so the request completes and no socket is leaked;
    # the status is deliberately ignored (the shovel may not exist yet).
    dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
    try:
        dst_conn.request("DELETE", shovel_path, '', get_headers)
        dst_conn.getresponse().read()
    finally:
        dst_conn.close()
    # Pause kept from the original script between delete and re-create.
    time.sleep(1)

    dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
    try:
        dst_conn.request("PUT", shovel_path, request_body, put_headers)
        response = dst_conn.getresponse()
        # Read the body before closing so the error text is available below.
        response_text = response.read()
    finally:
        dst_conn.close()
    # NOTE(review): 204 is what the original checked for; 201 Created is
    # believed to be returned by newer brokers for a new resource --
    # confirm against the target RabbitMQ version.
    if response.status not in (201, 204):
        print("request body: " + request_body)
        print("response body: " + str(response_text))
# Step 4: (re)create one dynamic shovel per copied queue on the destination
# broker; the shovel is named after the queue.
# NOTE(review): the shovel path is built from the bare name, so a queue that
# shares its name with an exchange deletes and replaces that exchange's
# shovel -- confirm whether distinct shovel names are wanted.
for queue_name in all_queues_names:
    # Compare by value: an identity test against "" is never a reliable
    # emptiness check.
    if queue_name == "" or queue_name.startswith('amq'):
        continue
    print("create shoval for queue: " + queue_name)
    request_body = Template(queue_request_template).substitute(
        source=rabbitmq_src_ip,
        destination=rabbitmq_dst_ip,
        queue_src=queue_name,
        queue_dst=queue_name,
    )
    # Percent-encode the name so it is safe inside the URL path.
    shovel_path = "/api/parameters/shovel/%2f/" + urllib.quote(queue_name.encode("utf-8"), safe="")

    # Drop any existing shovel of the same name. The response is drained and
    # the connection closed so the request completes and no socket is leaked;
    # the status is deliberately ignored (the shovel may not exist yet).
    dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
    try:
        dst_conn.request("DELETE", shovel_path, '', get_headers)
        dst_conn.getresponse().read()
    finally:
        dst_conn.close()
    # Pause kept from the original script between delete and re-create.
    time.sleep(1)

    dst_conn = httplib.HTTPConnection(rabbitmq_dst_ip, 15672)
    try:
        dst_conn.request("PUT", shovel_path, request_body, put_headers)
        response = dst_conn.getresponse()
        # Read the body before closing so the error text is available below.
        response_text = response.read()
    finally:
        dst_conn.close()
    # NOTE(review): 204 is what the original checked for; 201 Created is
    # believed to be returned by newer brokers for a new resource --
    # confirm against the target RabbitMQ version.
    if response.status not in (201, 204):
        print("request body: " + request_body)
        print("response body: " + str(response_text))
# Step 5: list the consumers attached to the "/" vhost of the source broker
# and print the details of the connection behind each of them.
# NOTE(review): the pause presumably gives the new shovels time to start --
# confirm.
time.sleep(15)
print("disconnect consumers...")
src_conn = httplib.HTTPConnection(rabbitmq_src_ip, 15672)
try:
    src_conn.request("GET", "/api/consumers/%2f", '', get_headers)
    consumers_response = src_conn.getresponse()
    # Read the body before closing: once the connection is closed the
    # response may no longer be readable and json.loads would then fail.
    data = consumers_response.read()
finally:
    src_conn.close()

if consumers_response.status == 200:
    response_as_json = json.loads(data)
    for consumer in response_as_json:
        connection_name = consumer['channel_details']['connection_name']
        print(connection_name)
        # The name is a URL path segment, so spaces must become %20;
        # quote_plus would emit "+", which is a literal plus sign in a path.
        url = "/api/connections/" + urllib.quote(connection_name.encode("utf-8"), safe="")
        src_conn = httplib.HTTPConnection(rabbitmq_src_ip, 15672)
        try:
            # Only inspects the connection. The original script kept the
            # call that would actually close it disabled:
            #   src_conn.request("DELETE", url, '', get_headers)
            src_conn.request("GET", url, '', get_headers)
            consumer_response = src_conn.getresponse()
            con_data = consumer_response.read()
        finally:
            src_conn.close()
        try:
            con_json = json.loads(con_data)
        except ValueError:
            # Not JSON (for example an empty error body): show it verbatim.
            con_json = con_data
        print(con_json)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment