Last active
November 22, 2017 19:32
-
-
Save aneeshkp/a852bfa5a0465804acab4dc16d5378fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Create SQLite database and table before executing the script
sqlite3 gnocchi_test.db | |
create table results(id INTEGER primary key AUTOINCREMENT, batch_id int ,host varchar(250), start_time TEXT,end_time TEXT, collect_time_avg real, collect_time_max real,batch_time_avg real, batch_time_max real, total_gauges bigint,total_batch bigint,total_metrics bigint,interval int,flush_delay int); | |
for i in {1..100};do python testclient.py "$((i*2))" "gnocchi.test.redhat.com.$i" & done | |
#Killall instances | |
ps -aux | grep testclient.py | awk '{print $2}' | xargs kill SIGQUIT | |
#Compare | |
gnocchi resource list | |
count=0;for i in $(gnocchi measures show --resource-id a4b637a5-b48d-5096-acbe-3ccf35d1662d metric_94 | cut -d "|" -f 3 | grep 1.0); do echo "$((count++))";done | |
#database
-- Create the role and database used by the test setup.
-- Every statement needs its own terminating semicolon, otherwise psql
-- keeps reading and merges "create database" with the following GRANT.
create user gnocchi_user with password 'redhat';
create database gnocchi OWNER gnocchi_user;
GRANT ALL ON DATABASE gnocchi TO gnocchi_user;
psql -d gnocchi -U gnocchi_user | |
#Test connection
import psycopg2 | |
def postgres_test():
    """Return True if a connection to the local gnocchi database succeeds.

    Connects with the credentials created in the setup notes above and
    closes the connection again straight away.

    Returns:
        True when connect/close worked, False when psycopg2 reported an
        error.
    """
    try:
        conn = psycopg2.connect("dbname='gnocchi' user='gnocchi_user' host='localhost' password='redhat' connect_timeout=1 ")
        conn.close()
        return True
    except psycopg2.Error:
        # Only database errors mean "not reachable"; a bare except would
        # also hide KeyboardInterrupt and programming mistakes.
        return False
print postgres_test() | |
#running api | |
#uwsgi /etc/gnocchi/uwsgi.ini | |
metricd-api --config-file ~/gnocchi.config | |
#clean up redis | |
redis-cli FLUSHALL | |
redis-cli FLUSHDB | |
#print status | |
for i in {1..10};do redis-cli info | grep keys=;date;sleep 1;done | |
for i in {1..100};do gnocchi status;date;sleep 60;done | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import itertools | |
import uuid | |
try: | |
import asyncio | |
except ImportError: | |
import trollius as asyncio | |
import daiquiri | |
from oslo_config import cfg | |
import six | |
from collections import defaultdict | |
from gnocchi import incoming | |
from gnocchi import indexer | |
from gnocchi import service | |
from gnocchi import utils | |
import random | |
import signal | |
import datetime | |
import time | |
import sqlite3 | |
""" | |
daiquiri.setup( | |
level=logging.DEBUG, | |
outputs=( | |
daiquiri.output.File('gnocchi_performance_test.log', level=logging.ERROR), | |
daiquiri.output.TimedRotatingFile( | |
'everything.log', | |
level=logging.DEBUG, | |
interval=datetime.timedelta(hours=8)) | |
) | |
) | |
""" | |
LOG = daiquiri.getLogger(__name__) | |
class Stats(object):
    """Generate random measures for one host and push them to Gnocchi.

    On construction the resource type and the resource for ``host`` are
    created through the indexer when they do not exist yet, and the metrics
    already attached to that resource are loaded.  ``collect()`` appends one
    random measure per metric name to the pending batch; ``flush()`` hands
    the pending batch to the incoming driver.
    """

    def __init__(self, conf, host):
        """Prepare drivers, counters and the Gnocchi resource for ``host``.

        Args:
            conf: configuration object passed to the gnocchi driver factories.
            host: host name; stored as the ``host`` attribute of the resource.
        """
        # Tunables live on the instance because the script reads them as
        # stats.COLLECT_DELAY / stats.FLUSH_DELAY.
        self.METRICS_COUNT = 100
        self.COLLECT_DELAY = 1
        self.FLUSH_DELAY = 0
        self.total_metrics_count = 0
        self.conf = conf
        self.host = host
        self.metric_generated_counts = {}
        self.resource_type_name = "Resource_Performance_Test_high"
        self.incoming = incoming.get_driver(self.conf)
        self.indexer = indexer.get_driver(self.conf)
        self.metric_name_list = [
            "metric_" + str(index) for index in range(self.METRICS_COUNT)]
        # Number of measures generated so far, per metric name.
        self.metric_name_count = dict.fromkeys(self.metric_name_list, 0)
        self.batch_count = 0
        self.metrics_per_batch = 0
        self.batch_metrics = defaultdict(list)

        # Ensure the resource type exists.
        try:
            mgr = self.indexer.get_resource_type_schema()
            rtype = mgr.resource_type_from_dict(
                self.resource_type_name, {
                    "host": {"type": "string", "required": True,
                             "min_length": 0, "max_length": 255},
                }, "creating")
            self.indexer.create_resource_type(rtype)
        except indexer.ResourceTypeAlreadyExists:
            # Expected on every run but the first one, so not an error.
            LOG.info("Resource type %s already exists",
                     self.resource_type_name)

        # Ensure the resource for this host exists.
        host_id = self.resource_type_name + ":" + host.replace("/", "_")
        try:
            self.indexer.create_resource(
                self.resource_type_name,
                utils.ResourceUUID(host_id, "user_test"),
                "user_test",
                original_resource_id=host_id,
                host=host)
        except indexer.ResourceAlreadyExists:
            pass

        resources = self.indexer.list_resources(
            self.resource_type_name,
            attribute_filter={"=": {"host": host}})
        self.resource_id = resources[0].id
        # Metrics already attached to the resource, keyed by metric name.
        self.metrics = {
            metric.name: metric
            for metric
            in self.indexer.get_resource(
                self.resource_type_name,
                self.resource_id, with_metrics=True).metrics
        }

    def reset(self):
        """Drop the pending batch and start an empty one."""
        # Rebinding is enough: flush() hands a copy of the mapping to the
        # driver, so nothing else holds the old dict.
        self.batch_metrics = defaultdict(list)

    def collect(self):
        """Append one random measure per metric name to the pending batch.

        Metrics that do not exist yet are created with the "high" archive
        policy.  A failure for one metric is logged and does not stop the
        others.  When FLUSH_DELAY is below 1 the batch is flushed right
        away.
        """
        for metric_name in self.metric_name_list:
            try:
                metric = self.metrics.get(metric_name)
                if not metric:
                    ap_name = "high"  # policy name
                    metric = self.indexer.create_metric(
                        uuid.uuid4(),
                        "user_test",
                        archive_policy_name=ap_name,
                        name=metric_name,
                        resource_id=self.resource_id)
                    self.metrics[metric_name] = metric
                measure = incoming.Measure(
                    utils.dt_in_unix_ns(utils.utcnow()),
                    random.randint(1, 101))
                self.batch_metrics[metric].append(measure)
                self.metric_name_count[metric_name] += 1
                self.total_metrics_count += 1
            except Exception as e:
                LOG.error("Unable to collect measure %s: %s",
                          metric_name, e)
        # Flush immediately.  This must read the instance's own setting:
        # the original looked at a module-level `stats` variable, which
        # only exists when the file is run as a script.
        if self.FLUSH_DELAY < 1:
            self.flush()

    def flush(self):
        """Send the pending batch to the incoming driver, then reset it.

        Does nothing when no measure is pending.  A driver failure is
        logged; the batch is dropped either way.
        """
        if not self.batch_metrics:
            return
        self.batch_count += 1
        # Plain-dict copy of the pending batch; works on Python 2 and 3.
        metrics_to_write = dict(self.batch_metrics)
        try:
            self.incoming.add_measures_batch(metrics_to_write)
        except Exception as e:
            LOG.error("Unable to add measures batch: %s", e)
        self.reset()
# Script entry point: run one Stats client for a single host on an event
# loop until interrupted; on SIGTERM store a summary row in SQLite.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 3:
        print('Please pass host name as a argument . usage: script.py batch_id node1.redhat.com')
        sys.exit(1)
    host = sys.argv[2]
    batch_id = sys.argv[1]
    start = time.time()
    start_time = str(datetime.datetime.now())
    # Strip our own arguments before the gnocchi service setup runs
    # (presumably it parses sys.argv itself -- TODO confirm).
    del sys.argv[1:]
    LOG.info("****************** %s ***************************", host)
    LOG.info("Starting test client for host %s : batch id :%s", host, batch_id)
    conf = service.prepare_service()
    stats = Stats(conf, host)
    loop = asyncio.get_event_loop()

    def signal_term_handler(signum, frame):
        """On SIGTERM, write the run summary to SQLite and exit."""
        # The first parameter is not called `signal` so it cannot shadow
        # the signal module inside the handler.
        print("write to sqlite3")
        database = "/root/gnocchi_test.db"
        try:
            conn = sqlite3.connect(database)
            try:
                # `with conn` commits on success and rolls back on error.
                with conn:
                    cur = conn.cursor()
                    #create table results(id int primary key NOT NULL, batch_id int ,host varchar(250),
                    #start_time TEXT,end_time TEXT, run_time real,total_gauges int,total_batch int,
                    #total_metrics int,interval int,flush_delay int);
                    # NOTE(review): the column list below must match the
                    # results table that was actually created -- confirm.
                    results = [batch_id, host, start_time,
                               str(datetime.datetime.now()),
                               time.time() - start,
                               len(stats.metric_name_list),
                               stats.batch_count, stats.total_metrics_count,
                               stats.COLLECT_DELAY, stats.FLUSH_DELAY]
                    cur.execute(
                        "insert into results(batch_id,host, start_time,end_time,"
                        "run_time,total_gauges ,total_batch,total_metrics, interval ,flush_delay)"
                        " values (?,?,?,?,?,?,?,?,?,?)", results)
            finally:
                conn.close()
        except Exception as e:
            LOG.error(e)
        # Exit only after the `with` block is left: an exception raised
        # inside it (SystemExit included) would roll the insert back.
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_term_handler)

    def _collect():
        """Collect now and re-arm the timer for the next round."""
        loop.call_later(stats.COLLECT_DELAY, _collect)
        stats.collect()

    def _flush():
        """Flush now and re-arm the timer for the next flush."""
        loop.call_later(stats.FLUSH_DELAY, _flush)
        stats.flush()

    loop.call_later(stats.COLLECT_DELAY, _collect)
    # collect() flushes on its own only when FLUSH_DELAY < 1, so every
    # value >= 1 needs the timer.  The original `> 1` test left
    # FLUSH_DELAY == 1 never flushed.
    if stats.FLUSH_DELAY >= 1:
        loop.call_later(stats.FLUSH_DELAY, _flush)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("########################################")
        print("Total Gauges: %s" % len(stats.metric_name_list))
        print("-----------------------------------------")
        print("Total Batch: %s" % stats.batch_count)
        print("-----------------------------------------")
        for key, value in stats.metric_name_count.items():
            print("%s %s" % (key, value))
        print("-----------------------------------------")
    finally:
        loop.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import itertools | |
import uuid | |
try: | |
import asyncio | |
except ImportError: | |
import trollius as asyncio | |
import daiquiri | |
from oslo_config import cfg | |
import six | |
from collections import defaultdict | |
from gnocchi import incoming | |
from gnocchi import indexer | |
from gnocchi import service | |
from gnocchi import utils | |
import random | |
import signal | |
import datetime | |
import time | |
import sqlite3 | |
""" | |
daiquiri.setup( | |
level=logging.DEBUG, | |
outputs=( | |
daiquiri.output.File('gnocchi_performance_test.log', level=logging.ERROR), | |
daiquiri.output.TimedRotatingFile( | |
'everything.log', | |
level=logging.DEBUG, | |
interval=datetime.timedelta(hours=8)) | |
) | |
) | |
""" | |
LOG = daiquiri.getLogger(__name__) | |
class Const(object):
    """Fixed settings shared by the test client and its Stats objects."""

    # How many metric names ("metric_0", "metric_1", ...) each host gets.
    METRICS_COUNT = 100
    # Delay in seconds used when collect rounds are scheduled on the loop.
    COLLECT_DELAY = 1
    # Delay in seconds between scheduled flushes; below 1, collect()
    # flushes immediately instead.
    FLUSH_DELAY = 0
class Stats(object):
    """Generate random measures for one host and time the Gnocchi calls.

    On construction the resource type and the resource for ``host`` are
    created through the indexer when they do not exist yet, and the metrics
    already attached to that resource are loaded.  ``collect()`` appends one
    random measure per metric name to the pending batch; ``flush()`` hands
    the pending batch to the incoming driver.  Duration totals and maxima
    of both steps are kept on the instance.
    """

    def __init__(self, conf, host, incoming1, indexer1):
        """Set up counters and make sure the resource for ``host`` exists.

        Args:
            conf: configuration object (stored, not otherwise used here).
            host: host name; stored as the ``host`` attribute of the resource.
            incoming1: incoming driver used by flush().
            indexer1: indexer driver used to create resources and metrics.
        """
        self.total_metrics_count = 0
        self.conf = conf
        self.host = host
        self.metric_generated_counts = {}
        self.resource_type_name = "Resource_Performance_Test_high_delay"
        self.incoming = incoming1
        self.indexer = indexer1
        self.batch_count = 0
        self.collect_count = 0
        # The *_avg fields accumulate total seconds; the reporting code
        # divides them by the matching *_count.
        self.batch_time_avg = 0
        self.batch_time_max = 0
        self.collect_time_avg = 0
        self.collect_time_max = 0
        self.start_time = str(datetime.datetime.now())
        # Timestamp of the last successful flush; None until then.
        self.end_time = None
        self.const = Const()
        self.metric_name_list = [
            "metric_" + str(index)
            for index in range(self.const.METRICS_COUNT)]
        # Number of measures generated so far, per metric name.
        self.metric_name_count = dict.fromkeys(self.metric_name_list, 0)
        self.metrics_per_batch = 0
        self.batch_metrics = defaultdict(list)

        # Ensure the resource type exists.
        try:
            mgr = self.indexer.get_resource_type_schema()
            rtype = mgr.resource_type_from_dict(
                self.resource_type_name, {
                    "host": {"type": "string", "required": True,
                             "min_length": 0, "max_length": 255},
                }, "creating")
            self.indexer.create_resource_type(rtype)
        except indexer.ResourceTypeAlreadyExists:
            LOG.info("Resource type %s already exists",
                     self.resource_type_name)

        # Ensure the resource for this host exists.
        host_id = self.resource_type_name + ":" + host.replace("/", "_")
        try:
            self.indexer.create_resource(
                self.resource_type_name,
                utils.ResourceUUID(host_id, "user_test"),
                "user_test",
                original_resource_id=host_id,
                host=host)
        except indexer.ResourceAlreadyExists:
            pass

        resources = self.indexer.list_resources(
            self.resource_type_name,
            attribute_filter={"=": {"host": host}})
        self.resource_id = resources[0].id
        # Metrics already attached to the resource, keyed by metric name.
        self.metrics = {
            metric.name: metric
            for metric
            in self.indexer.get_resource(
                self.resource_type_name,
                self.resource_id, with_metrics=True).metrics
        }

    def reset(self):
        """Drop the pending batch and start an empty one."""
        # Rebinding is enough: flush() hands a copy of the mapping to the
        # driver, so nothing else holds the old dict.
        self.batch_metrics = defaultdict(list)

    def collect(self):
        """Append one random measure per metric name to the pending batch.

        Metrics that do not exist yet are created with the "high" archive
        policy.  A failure for one metric is logged and does not stop the
        others.  The elapsed time is added to the collect statistics, and
        when Const.FLUSH_DELAY is below 1 the batch is flushed right away.
        """
        start_collect = time.time()
        self.collect_count += 1
        for metric_name in self.metric_name_list:
            try:
                metric = self.metrics.get(metric_name)
                if not metric:
                    ap_name = "high"  # policy name
                    metric = self.indexer.create_metric(
                        uuid.uuid4(),
                        "user_test",
                        archive_policy_name=ap_name,
                        name=metric_name,
                        resource_id=self.resource_id)
                    self.metrics[metric_name] = metric
                measure = incoming.Measure(
                    utils.dt_in_unix_ns(utils.utcnow()),
                    random.randint(1, 101))
                self.batch_metrics[metric].append(measure)
                self.metric_name_count[metric_name] += 1
                self.total_metrics_count += 1
            except Exception as e:
                LOG.error("Unable to collect measure %s: %s",
                          metric_name, e)
        elapsed = time.time() - start_collect
        self.collect_time_max = max(self.collect_time_max, elapsed)
        self.collect_time_avg += elapsed
        # Flush immediately when no flush delay is configured.
        if self.const.FLUSH_DELAY < 1:
            self.flush()

    def flush(self):
        """Send the pending batch to the incoming driver, then reset it.

        Every call counts as one batch and its duration is added to the
        batch statistics.  ``end_time`` is updated only when the driver call
        succeeds; a failure is logged and the batch is dropped.
        """
        self.batch_count += 1
        start_batch = time.time()
        # Plain-dict copy of the pending batch; works on Python 2 and 3.
        metrics_to_write = dict(self.batch_metrics)
        try:
            self.incoming.add_measures_batch(metrics_to_write)
            self.end_time = str(datetime.datetime.now())
        except Exception as e:
            LOG.error("Unable to add measures batch: %s", e)
        elapsed = time.time() - start_batch
        self.batch_time_max = max(self.batch_time_max, elapsed)
        self.batch_time_avg += elapsed
        self.reset()
# Script entry point: create `no_of_hosts` Stats objects, run one collect
# round for each of them and store one summary row per host in SQLite.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 4:
        print('Please pass host name as a argument . usage: script.py batch_id node1.redhat.com no_of_hosts')
        sys.exit(1)
    # read command line arguments
    base_host = sys.argv[2]
    batch_id = sys.argv[1]
    no_of_host = max(int(sys.argv[3]), 1)
    # create const
    const = Const()
    hosts = []
    start = time.time()
    start_time = str(datetime.datetime.now())
    # clear command line args or else the service setup complains
    del sys.argv[1:]
    conf = service.prepare_service()

    def write_to_sqlite():
        """Insert one results row per host, then exit the process."""
        print("write to sqlite3")
        database = "/root/gnocchi_test.db"
        insert_sql = (
            "insert into results(batch_id,host, start_time,end_time,"
            "collect_time_avg,collect_time_max,batch_time_avg,batch_time_max,"
            "total_gauges,total_batch,total_metrics,interval ,flush_delay)"
            " values (?,?,?,?,?,?,?,?,?,?,?,?,?)")
        try:
            conn = sqlite3.connect(database)
            try:
                # `with conn` commits on success and rolls back on error.
                with conn:
                    cur = conn.cursor()
                    for stats in hosts:
                        # The *_avg fields hold totals; guard against a host
                        # that never collected or flushed.
                        collect_avg = (
                            stats.collect_time_avg / stats.collect_count
                            if stats.collect_count else 0.0)
                        batch_avg = (
                            stats.batch_time_avg / stats.batch_count
                            if stats.batch_count else 0.0)
                        results = [batch_id, stats.host, stats.start_time,
                                   stats.end_time,
                                   collect_avg, stats.collect_time_max,
                                   batch_avg, stats.batch_time_max,
                                   len(stats.metric_name_list),
                                   stats.batch_count,
                                   stats.total_metrics_count,
                                   stats.const.COLLECT_DELAY,
                                   stats.const.FLUSH_DELAY]
                        print(results)
                        cur.execute(insert_sql, results)
            finally:
                conn.close()
        except Exception as e:
            print(e)
            LOG.error(e)
        # Exit only after the `with` block is left: an exception raised
        # inside it (SystemExit included) would roll the inserts back.
        sys.exit(0)

    def signal_term_handler(signum, frame):
        """On SIGTERM, persist the results and exit."""
        write_to_sqlite()

    signal.signal(signal.SIGTERM, signal_term_handler)

    def _collect(stats):
        """Collect for `stats` now and re-arm the timer for the next round."""
        loop.call_later(const.COLLECT_DELAY, _collect, stats)
        stats.collect()

    def _flush(stats):
        """Flush `stats` now and re-arm the timer for the next flush."""
        loop.call_later(const.FLUSH_DELAY, _flush, stats)
        stats.flush()

    def prepare(loop, forever):
        """Create one Stats per host and start its collection.

        With forever == 1 each host collects exactly once, synchronously.
        Any other value schedules the periodic callbacks on `loop`; the
        caller then has to run the loop.
        """
        incoming1 = incoming.get_driver(conf)
        indexer1 = indexer.get_driver(conf)
        for index in range(no_of_host):
            new_host = base_host + "_" + str(index)
            stats = Stats(conf, new_host, incoming1, indexer1)
            hosts.append(stats)
            if forever == 1:
                # The original passed the *result* of collect() (None) to
                # call_soon; the collect itself already ran synchronously,
                # so call it directly and schedule nothing.
                stats.collect()
            else:
                # _collect/_flush take only the Stats object; passing
                # `forever` as well raised a TypeError when they fired.
                loop.call_later(const.COLLECT_DELAY, _collect, stats)
                # collect() flushes on its own only when FLUSH_DELAY < 1.
                if const.FLUSH_DELAY >= 1:
                    loop.call_later(const.FLUSH_DELAY, _flush, stats)

    #### MULTIPLE HOSTS gets created here
    loop = asyncio.get_event_loop()
    try:
        print("PREPARING FIRST CALL")
        ## run once; the loop is never started in this mode, so there is
        ## nothing to wait for afterwards
        prepare(loop, 1)
        print("loop ended")
        ## run once ends
        # Run forever
        print("All initlizalied")
        #prepare(loop,-1)
        #loop.run_forever()
    except KeyboardInterrupt:
        print("########################################")
    finally:
        loop.close()
    # Writes whatever was gathered and exits with status 0, on normal
    # completion as well as after Ctrl-C.
    write_to_sqlite()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
# implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import itertools | |
import uuid | |
import copy_reg | |
import types | |
try: | |
import asyncio | |
except ImportError: | |
import trollius as asyncio | |
import daiquiri | |
from oslo_config import cfg | |
import six | |
from collections import defaultdict | |
from gnocchi import incoming | |
from gnocchi import indexer | |
from gnocchi import service | |
from gnocchi import utils | |
import random | |
import signal | |
import datetime | |
import time | |
import sqlite3 | |
import sys | |
import math | |
from multiprocessing import Pool | |
""" | |
daiquiri.setup( | |
level=logging.DEBUG, | |
outputs=( | |
daiquiri.output.File('gnocchi_performance_test.log', level=logging.ERROR), | |
daiquiri.output.TimedRotatingFile( | |
'everything.log', | |
level=logging.DEBUG, | |
interval=datetime.timedelta(hours=8)) | |
) | |
) | |
""" | |
LOG = daiquiri.getLogger(__name__) | |
class Const(object):
    """Fixed settings shared by the worker processes' Stats objects."""

    # How many metric names ("metric_0", "metric_1", ...) each host gets.
    METRICS_COUNT = 100
    # Stored in the results row as `interval`.
    COLLECT_DELAY = 1
    # Stored in the results row as `flush_delay`; below 1, collect()
    # flushes immediately.
    FLUSH_DELAY = 0
class Stats(object):
    """Generate random measures for one host and time the Gnocchi calls.

    On construction the resource type and the resource for ``host`` are
    created through the indexer when they do not exist yet, and the metrics
    already attached to that resource are loaded.  ``collect()`` appends one
    random measure per metric name to the pending batch; ``flush()`` hands
    the pending batch to the incoming driver.  Duration totals and maxima
    of both steps are kept on the instance.
    """

    def __init__(self, conf, host, incoming1, indexer1):
        """Set up counters and make sure the resource for ``host`` exists.

        Args:
            conf: configuration object (stored, not otherwise used here).
            host: host name; stored as the ``host`` attribute of the resource.
            incoming1: incoming driver used by flush().
            indexer1: indexer driver used to create resources and metrics.
        """
        self.total_metrics_count = 0
        self.conf = conf
        self.host = host
        self.metric_generated_counts = {}
        self.resource_type_name = "Resource_Performance_Test_high_delay"
        self.incoming = incoming1
        self.indexer = indexer1
        self.batch_count = 0
        self.collect_count = 0
        # The *_avg fields accumulate total seconds; write_to_sqlite()
        # divides them by the matching *_count.
        self.batch_time_avg = 0
        self.batch_time_max = 0
        self.collect_time_avg = 0
        self.collect_time_max = 0
        self.start_time = str(datetime.datetime.now())
        # Timestamp of the last successful flush; None until then.
        self.end_time = None
        self.const = Const()
        self.metric_name_list = [
            "metric_" + str(index)
            for index in range(self.const.METRICS_COUNT)]
        # Number of measures generated so far, per metric name.
        self.metric_name_count = dict.fromkeys(self.metric_name_list, 0)
        self.metrics_per_batch = 0
        self.batch_metrics = defaultdict(list)

        # Ensure the resource type exists.
        try:
            mgr = self.indexer.get_resource_type_schema()
            rtype = mgr.resource_type_from_dict(
                self.resource_type_name, {
                    "host": {"type": "string", "required": True,
                             "min_length": 0, "max_length": 255},
                }, "creating")
            self.indexer.create_resource_type(rtype)
        except indexer.ResourceTypeAlreadyExists:
            LOG.info("Resource type %s already exists",
                     self.resource_type_name)

        # Ensure the resource for this host exists.
        host_id = self.resource_type_name + ":" + host.replace("/", "_")
        try:
            self.indexer.create_resource(
                self.resource_type_name,
                utils.ResourceUUID(host_id, "user_test"),
                "user_test",
                original_resource_id=host_id,
                host=host)
        except indexer.ResourceAlreadyExists:
            pass

        resources = self.indexer.list_resources(
            self.resource_type_name,
            attribute_filter={"=": {"host": host}})
        self.resource_id = resources[0].id
        # Metrics already attached to the resource, keyed by metric name.
        self.metrics = {
            metric.name: metric
            for metric
            in self.indexer.get_resource(
                self.resource_type_name,
                self.resource_id, with_metrics=True).metrics
        }

    def reset(self):
        """Drop the pending batch and start an empty one."""
        # Rebinding is enough: flush() hands a copy of the mapping to the
        # driver, so nothing else holds the old dict.
        self.batch_metrics = defaultdict(list)

    def collect(self):
        """Append one random measure per metric name to the pending batch.

        Metrics that do not exist yet are created with the "high" archive
        policy.  A failure for one metric is logged and does not stop the
        others.  The elapsed time is added to the collect statistics, and
        when Const.FLUSH_DELAY is below 1 the batch is flushed right away.
        """
        start_collect = time.time()
        self.collect_count += 1
        for metric_name in self.metric_name_list:
            try:
                metric = self.metrics.get(metric_name)
                if not metric:
                    ap_name = "high"  # policy name
                    metric = self.indexer.create_metric(
                        uuid.uuid4(),
                        "user_test",
                        archive_policy_name=ap_name,
                        name=metric_name,
                        resource_id=self.resource_id)
                    self.metrics[metric_name] = metric
                measure = incoming.Measure(
                    utils.dt_in_unix_ns(utils.utcnow()),
                    random.randint(1, 101))
                self.batch_metrics[metric].append(measure)
                self.metric_name_count[metric_name] += 1
                self.total_metrics_count += 1
            except Exception as e:
                LOG.error("Unable to collect measure %s: %s",
                          metric_name, e)
        elapsed = time.time() - start_collect
        self.collect_time_max = max(self.collect_time_max, elapsed)
        self.collect_time_avg += elapsed
        # Flush immediately when no flush delay is configured.
        if self.const.FLUSH_DELAY < 1:
            self.flush()

    def flush(self):
        """Send the pending batch to the incoming driver, then reset it.

        Every call counts as one batch and its duration is added to the
        batch statistics.  ``end_time`` is updated only when the driver call
        succeeds; a failure is logged and the batch is dropped.
        """
        self.batch_count += 1
        start_batch = time.time()
        # Plain-dict copy of the pending batch; works on Python 2 and 3.
        metrics_to_write = dict(self.batch_metrics)
        try:
            self.incoming.add_measures_batch(metrics_to_write)
            self.end_time = str(datetime.datetime.now())
        except Exception as e:
            LOG.error("Unable to add measures batch: %s", e)
        elapsed = time.time() - start_batch
        self.batch_time_max = max(self.batch_time_max, elapsed)
        self.batch_time_avg += elapsed
        self.reset()
def init_worker():
    """Pool initializer: make the calling process ignore SIGINT."""
    # With SIGINT ignored here, a Ctrl-C is left to the parent process,
    # which catches KeyboardInterrupt and terminates the pool.
    ignore_action = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_action)
def process_metrics(host):
    """Run one collect round for ``host`` and store its statistics.

    Pool worker entry point.  Builds its own service configuration and
    drivers, creates a Stats object for the host, collects once and writes
    the resulting row with write_to_sqlite().  Any exception is printed and
    swallowed so that one failing host does not abort the whole map.

    Args:
        host: name of the host to simulate.

    Returns:
        A short completion message containing the host name.
    """
    print("We are here")
    print(host)
    try:
        LOG.info(host)
        conf = service.prepare_service()
        incoming1 = incoming.get_driver(conf)
        indexer1 = indexer.get_driver(conf)
        stats = Stats(conf, host, incoming1, indexer1)
        stats.collect()
        write_to_sqlite(stats)
    except Exception as e:
        print(e)
    # Use this worker's own argument: the original concatenated the
    # module-level list `hosts`, which raises TypeError (str + list).
    return "following hosts done " + host
def write_to_sqlite(stats, database="/root/gnocchi_test.db"):
    """Insert one row summarising ``stats`` into the ``results`` table.

    The batch id is read from the module-level ``batch_id`` set by the
    script entry point.  Errors are printed and logged, never raised.

    Args:
        stats: object exposing the Stats counters (host, start/end time,
            collect/batch totals, counts and maxima, metric_name_list,
            total_metrics_count and const).
        database: path of the SQLite file holding the ``results`` table.
    """
    print("write to sqlite3")
    insert_sql = (
        "insert into results(batch_id,host, start_time,end_time,"
        "collect_time_avg,collect_time_max,batch_time_avg,batch_time_max,"
        "total_gauges,total_batch,total_metrics,interval ,flush_delay)"
        " values (?,?,?,?,?,?,?,?,?,?,?,?,?)")
    try:
        conn = sqlite3.connect(database)
        try:
            # `with conn` commits on success and rolls back on error; it
            # does not close the connection, hence the finally below.
            with conn:
                cur = conn.cursor()
                #create table results(id int primary key NOT NULL, batch_id int ,host varchar(250),
                #start_time TEXT,end_time TEXT, run_time real,total_gauges int,total_batch int,
                #total_metrics int,interval int,flush_delay int);
                # The *_avg fields hold totals; avoid dividing by zero when
                # nothing was collected or flushed.
                collect_avg = (
                    float(stats.collect_time_avg) / stats.collect_count
                    if stats.collect_count else 0.0)
                batch_avg = (
                    float(stats.batch_time_avg) / stats.batch_count
                    if stats.batch_count else 0.0)
                results = [batch_id, stats.host, stats.start_time,
                           stats.end_time,
                           collect_avg, stats.collect_time_max,
                           batch_avg, stats.batch_time_max,
                           len(stats.metric_name_list),
                           stats.batch_count, stats.total_metrics_count,
                           stats.const.COLLECT_DELAY,
                           stats.const.FLUSH_DELAY]
                print(results)
                cur.execute(insert_sql, results)
        finally:
            conn.close()
    except Exception as e:
        print(e)
        LOG.error(e)
# Script entry point: fan the simulated hosts out over a process pool;
# every worker runs process_metrics() for the hosts it is handed.
if __name__ == "__main__":
    if len(sys.argv) < 4:
        print('Please pass host name as a argument . usage: script.py batch_id node1.redhat.com no_of_hosts')
        sys.exit(1)
    # read command line arguments
    base_host = sys.argv[2]
    batch_id = sys.argv[1]
    no_of_host = max(int(sys.argv[3]), 1)
    # clear command line args before the service setup runs
    del sys.argv[1:]
    conf = service.prepare_service()

    # calculate pool_size and chunk_size: one worker per 100 hosts, at
    # most 100 workers.  chunk_size is derived before the cap is applied,
    # as in the original.  `//` keeps both values integers on Python 3.
    pool_size = max(no_of_host // 100, 1)
    chunk_size = no_of_host // pool_size if pool_size > 1 else 1
    pool_size = min(pool_size, 100)

    print("No of hosts : %s pool size : %s chunk_size : %s"
          % (no_of_host, pool_size, chunk_size))
    print("Continue ? Please respond with 'y' or 'n'")
    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3
    choice = read_line().lower()
    if choice == "n":
        # A user abort is not a programming error: exit without traceback.
        sys.exit("Your choice")

    # No drivers are opened here: each worker builds its own in
    # process_metrics(), and the parent never used the ones it created.
    hosts = [base_host + "_" + str(index) for index in range(no_of_host)]
    print("Initializng n workers %s" % len(hosts))
    pool = Pool(pool_size, init_worker)
    try:
        # Pass the chunk size that was just shown to the user; the
        # original computed it but always used 1.
        res = pool.map_async(process_metrics, hosts, chunk_size)
        print("Result")
        res.wait()
        print("Done")
    except KeyboardInterrupt:
        print("Keyboard interuppted")
        pool.terminate()
    else:
        pool.close()
    pool.join()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# For every resource matching $pattern and every metric name using the
# $policy archive policy, count the measure rows whose third "|" field
# matches 1.0 and print one line per resource/metric pair.
# Arrays are a bash feature, hence bash instead of sh in the shebang.
pattern="Resource_Performance_Test_high:gnocchi.test.redhat.com.12"
policy="high"

# resource: second "|" field of every listing row matching the pattern
arr_resource=()
for i in $(gnocchi resource list | grep "$pattern" | cut -d "|" -f 2)
do
    arr_resource+=("$i")
done

# metric: unique fourth "|" field of the metric_* rows using the policy
arr_metric=()
for i in $(gnocchi metric list | grep "$policy" | grep metric_ | cut -d "|" -f 4 | sort -n | uniq)
do
    arr_metric+=("$i")
done

#echo ${arr_resource[*]}
for resource_id in "${arr_resource[@]}"
do
    for metric in "${arr_metric[@]}"
    do
        # grep -c prints the number of matching rows (0 when none match)
        count=$(gnocchi measures show --resource-id "$resource_id" "$metric" | cut -d "|" -f 3 | grep -c 1.0)
        # the shell concatenates inside quotes; "." is not an operator
        echo "$resource_id - $metric - $count"
    done
    echo "****************NEXT RESOURCE ***************"
done
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment