Last active
July 31, 2019 15:08
-
-
Save cannium/fa0e2d772e388cc48d9d17191075f752 to your computer and use it in GitHub Desktop.
Files for performance testing seaweedfs-cannyls
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"math/rand" | |
"mime/multipart" | |
"net/http" | |
"os" | |
"time" | |
"github.com/chrislusf/seaweedfs/weed/operation" | |
"github.com/chrislusf/seaweedfs/weed/wdclient" | |
"google.golang.org/grpc" | |
) | |
// ObjectSizeLimit is the maximum number of bytes uploaded per object
// (30 MiB); the limit is introduced by cannyls.
const ObjectSizeLimit = 30 << 20

// alphaNumericTable is the static alphabet (digits followed by upper-case
// letters) used for generating unique request ids.
var alphaNumericTable = []byte("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ")

// GenerateRandomId returns a fresh 16-byte slice in which every byte is
// picked from alphaNumericTable using the package-level math/rand source.
func GenerateRandomId() []byte {
	const idLength = 16

	id := make([]byte, idLength)
	for pos := range id {
		id[pos] = alphaNumericTable[rand.Intn(len(alphaNumericTable))]
	}
	return id
}
// ReadJsonBody reads body until EOF, closes it, and decodes the bytes read
// as JSON into out. out must be a POINTER, as json.Unmarshal requires.
//
// Read and decode errors are returned; the error from Close is deliberately
// discarded.
func ReadJsonBody(body io.ReadCloser, out interface{}) (err error) {
	defer func() { _ = body.Close() }()

	raw, readErr := ioutil.ReadAll(body)
	if readErr != nil {
		return readErr
	}
	return json.Unmarshal(raw, out)
}
// uploadResult is the JSON document decoded from the response to an upload
// request (see Put). All keys are optional.
type uploadResult struct {
	// Name is read from the "name" key.
	Name string `json:"name,omitempty"`
	// Size is read from the "size" key.
	Size uint32 `json:"size,omitempty"`
	// Error is read from the "error" key; Put treats a non-empty value as failure.
	Error string `json:"error,omitempty"`
	// ETag is read from the "eTag" key.
	ETag string `json:"eTag,omitempty"`
}
// Storage implements yig.storage.backend | |
type Storage struct { | |
masters []string | |
seaweedClient *wdclient.MasterClient | |
httpClient *http.Client | |
} | |
func NewSeaweedStorage() Storage { | |
clientId := fmt.Sprintf("YIG-%s", string(GenerateRandomId())) | |
seaweedClient := wdclient.NewMasterClient(context.Background(), | |
grpc.WithInsecure(), clientId, []string{"10.254.128.51:2233"}) | |
go seaweedClient.KeepConnectedToMaster() | |
seaweedClient.WaitUntilConnected() // FIXME some kind of timeout? | |
fmt.Println("Seaweedfs client initialized") | |
return Storage{ | |
seaweedClient: seaweedClient, | |
httpClient: &http.Client{ | |
Timeout: 5 * time.Minute, | |
Transport: &http.Transport{ | |
MaxIdleConnsPerHost: 65535, | |
}, | |
}, | |
} | |
} | |
func (s Storage) assignObject(poolName string) (result operation.AssignResult, err error) { | |
masterAddress := s.seaweedClient.GetMaster() | |
assignRequest := &operation.VolumeAssignRequest{ | |
// TODO read from config | |
Count: 1, | |
Replication: "000", | |
Collection: poolName, | |
Ttl: "", | |
DataCenter: "", | |
} | |
assignResult, err := operation.Assign(masterAddress, nil, | |
assignRequest) | |
if err != nil { | |
return operation.AssignResult{}, err | |
} | |
if assignResult.Error != "" { | |
return operation.AssignResult{}, errors.New(assignResult.Error) | |
} | |
return *assignResult, nil | |
} | |
func (s Storage) Put(poolName string, object io.Reader) (objectUrl string, | |
bytesWritten uint64, err error) { | |
assigned, err := s.assignObject(poolName) | |
if err != nil { | |
fmt.Println("assignObject error:", err) | |
return "", 0, err | |
} | |
url := fmt.Sprintf("http://%s/%s", assigned.Url, assigned.Fid) | |
// limit object size because of cannlys | |
object = io.LimitReader(object, ObjectSizeLimit) | |
body := &bytes.Buffer{} | |
writer := multipart.NewWriter(body) | |
part, err := writer.CreateFormFile("file", assigned.Fid) | |
if err != nil { | |
fmt.Println("CreateFormFile error:", err) | |
return "", 0, err | |
} | |
n, err := io.Copy(part, object) | |
if err != nil { | |
fmt.Println("io.Copy error:", err) | |
return "", 0, err | |
} | |
err = writer.Close() | |
if err != nil { | |
fmt.Println("writer.Close error:", err) | |
return "", 0, err | |
} | |
req, err := http.NewRequest("POST", url, body) | |
if err != nil { | |
fmt.Println("http.NewRequest error:", err) | |
return "", 0, err | |
} | |
req.Header.Set("Content-Type", writer.FormDataContentType()) | |
resp, err := s.httpClient.Do(req) | |
if err != nil { | |
fmt.Println("s.httpClient.Do error:", err) | |
return "", 0, err | |
} | |
var result uploadResult | |
err = ReadJsonBody(resp.Body, &result) | |
if err != nil { | |
fmt.Println("ReadJsonBody error:", err) | |
return "", 0, err | |
} | |
if result.Error != "" { | |
return "", 0, errors.New(result.Error) | |
} | |
return url, uint64(n), nil | |
} | |
func (s Storage) GetReader(poolName, objectName string, | |
offset int64, length uint64) (reader io.ReadCloser, err error) { | |
// TODO offset and length | |
url, err := s.seaweedClient.LookupFileId(objectName) | |
if err != nil { | |
fmt.Println("seaweedClient.LookupFileId error:", err) | |
return nil, err | |
} | |
resp, err := s.httpClient.Get(url) | |
if err != nil { | |
fmt.Println("httpClient.Get error:", err) | |
return nil, err | |
} | |
return resp.Body, nil | |
} | |
func (s Storage) Remove(url string) (err error) { | |
req, err := http.NewRequest("DELETE", url, nil) | |
if err != nil { | |
fmt.Println("http.NewRequest error:", err) | |
return err | |
} | |
resp, err := s.httpClient.Do(req) | |
if err != nil { | |
fmt.Println("httpClient.Get error:", err) | |
return err | |
} | |
var result map[string]interface{} | |
err = ReadJsonBody(resp.Body, &result) | |
if err != nil { | |
fmt.Println("ReadJsonBody error:", err) | |
return err | |
} | |
if resp.StatusCode == http.StatusAccepted { | |
return nil | |
} | |
return errors.New(fmt.Sprintln(result["error"])) | |
} | |
// randomly remove delete_ratio * len(urls) | |
func batchRemove(client Storage, urls []string, ratio float64) { | |
rand.Shuffle(len(urls), func(i, j int) { | |
urls[i], urls[j] = urls[j], urls[i] | |
}) | |
urls = urls[:int(float64(len(urls))*ratio)] | |
for _, url := range urls { | |
client.Remove(url) | |
} | |
} | |
func main() { | |
file := flag.String("file", "", "file to upload") | |
concurrency := flag.Int("concurrency", 10, "go routine number") | |
deleteRatio := flag.Float64("delete_ratio", 0, | |
"delete objects randomly, ratio between 0 and 1") | |
flag.Parse() | |
f, err := os.Open(*file) | |
if err != nil { | |
fmt.Println("cannot open", *file) | |
return | |
} | |
content, err := ioutil.ReadAll(f) | |
if err != nil { | |
fmt.Println("ReadAll error:", err) | |
return | |
} | |
var size, count, lastSize, lastCount int64 | |
countChannel := make(chan int64, *concurrency) | |
sizeChannel := make(chan int64, *concurrency) | |
urls := make([]string, 0, 10000) | |
var deleteChannel chan string | |
if *deleteRatio > 0 { | |
deleteChannel = make(chan string, *concurrency) | |
} | |
client := NewSeaweedStorage() | |
for i := 0; i < *concurrency; i++ { | |
go upload(content, client, | |
countChannel, sizeChannel, deleteChannel) | |
} | |
ticker := time.NewTicker(10 * time.Second) | |
lastTime := time.Now() | |
for { | |
select { | |
case s := <-sizeChannel: | |
size += s | |
case n := <-countChannel: | |
count += n | |
case t := <-ticker.C: | |
dt := t.Sub(lastTime) | |
ds := size - lastSize | |
dn := count - lastCount | |
fmt.Println("Objects per second:", float64(dn)/dt.Seconds(), | |
"Bytes per second:", float64(ds)/dt.Seconds()) | |
lastSize, lastCount, lastTime = size, count, t | |
case url := <-deleteChannel: | |
urls = append(urls, url) | |
if len(urls) >= 10000 { | |
go batchRemove(client, urls, *deleteRatio) | |
urls = make([]string, 0, 10000) | |
} | |
} | |
} | |
} | |
func upload(content []byte, client Storage, | |
countChannel, sizeChannel chan int64, | |
deleteChannel chan string) { | |
reader := bytes.NewReader(content) | |
for { | |
reader.Seek(0, 0) | |
url, n, err := client.Put("", reader) | |
if err != nil { | |
fmt.Println("PUT", url, n, err) | |
} else { | |
countChannel <- 1 | |
sizeChannel <- int64(n) | |
if deleteChannel != nil { | |
deleteChannel <- url | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Benchmark driver.
#
# For every combination of object size file, delete ratio and weed build
# ("vanilla" / "cannyls") this script copies the chosen weed binary into
# place on the test host, starts master and volume servers there, grows
# 3500 volumes, runs ten ./pressure clients in the background for 30
# minutes, and then runs the stop/clean scripts before the next round.
# Client output goes to <version>/<size>-<concurrency>-<ratio>-<i>.log.
set -x

# Test host; the same machine is addressed by every ssh and curl call below.
host="10.254.128.51"
remote="root@${host}"

bash stop.sh
ssh -t "$remote" "cd /root/test; bash stop_all.sh"
ssh -t "$remote" "cd /root/test; bash clean.sh"

#for sizeFile in "1K" "4K" "10K" "100K" "1M" "10M" "30M"
for sizeFile in "1K" "100K" "30M"
do
    # Concurrency per object size; 100 is the default for sizes not listed.
    case "$sizeFile" in
        "1K")   concurrency=1000 ;;
        "100K") concurrency=10 ;;
        "30M")  concurrency=2 ;;
        *)      concurrency=100 ;;
    esac

    #for deleteRatio in "0" "0.05" "0.1" "0.15" "0.2" "0.3" "0.5"
    for deleteRatio in "0" "0.05" "0.5"
    do
        for version in "vanilla" "cannyls"
        do
            ssh -t "$remote" "cd /root/test; cp weed.$version weed"
            ssh "$host" -l root "cd /root/test; bash start_master.bash"
            sleep 5
            ssh "$host" -l root "cd /root/test; bash start_volume.bash"
            sleep 5
            curl "http://${host}:2233/vol/grow?count=3500&replication=000"
            sleep 60

            # The redirections below fail if the log directory is missing.
            mkdir -p "$version"
            for i in {1..10}
            do
                ./pressure -file "$sizeFile" -concurrency "$concurrency" -delete_ratio "$deleteRatio" > "$version/$sizeFile-$concurrency-$deleteRatio-$i.log" &
            done
            sleep 1800 # 30min

            bash stop.sh
            ssh -t "$remote" "cd /root/test; bash stop_all.sh"
            sleep 60
            ssh -t "$remote" "cd /root/test; bash clean.sh"
        done
    done
done
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start the weed master in the background, detached from the terminal via
# nohup; stdout and stderr are appended to master.log.
nohup ./weed master \
    -mdir=. \
    -ip=10.254.128.51 \
    -port=2233 \
    -defaultReplication=000 \
    &>> master.log &
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start two weed volume servers in the background. Every -dir entry is a
# mounted disk, and each disk is capped at 100 volumes through the matching
# -max entry. Output is appended to volume1.log / volume2.log.

# Volume server 1: 17 disks (b..r), port 2999.
./weed volume \
    -dir=b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r \
    -max=100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100 \
    -mserver=127.0.0.1:2233 \
    -ip=10.254.128.51 \
    -port=2999 \
    &>> volume1.log &

# Volume server 2: 18 disks (s..z, aa..aj), port 3000.
./weed volume \
    -dir=s,t,u,v,w,x,y,z,aa,ab,ac,ad,ae,af,ag,ah,ai,aj \
    -max=100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100 \
    -mserver=10.254.128.51:2233 \
    -ip=10.254.128.51 \
    -port=3000 \
    &>> volume2.log &
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# summary test results | |
import sys | |
import os | |
from parse import parse | |
# dir path -> [log file name]
def walk(dirName):
    """Return the bare file names of every '.log' file under dirName.

    Subdirectories are searched as well; only the file name (without its
    directory) is returned for each match.
    """
    return [name
            for _, _, names in os.walk(dirName)
            for name in names
            if name.endswith('.log')]
# dir name, file name -> [object size, concurrency, delete ratio, i,
#                         objects per sec, bytes per sec, errorCount]
def singleFileAverage(dirName, fileName):
    """Average the throughput samples of one client log.

    fileName must look like '<size>-<concurrency>-<ratio>-<i>.log'. Every
    'Objects per second: X Bytes per second: Y' line in the file is one
    sample. Any other line, except the 'Seaweedfs client initialized'
    banner, is counted as an error.

    Samples are accumulated as floats so that rates below 1 object/s are
    not truncated to 0. A log without any sample yields averages of 0.0
    instead of raising ZeroDivisionError.
    """
    objectSize, concurrency, deleteRatio, i = parse('{}-{}-{}-{}.log', fileName)

    objectsTotal = 0.0
    bytesTotal = 0.0
    sampleCount = 0
    errorCount = 0
    with open(dirName + '/' + fileName) as f:
        for line in f:
            sample = parse('Objects per second: {} Bytes per second: {}', line)
            if sample is None:
                if 'Seaweedfs client initialized' in line:
                    continue
                errorCount += 1
                continue
            sampleCount += 1
            objectsTotal += float(sample[0])
            bytesTotal += float(sample[1])

    if sampleCount:
        objectsAverage = objectsTotal / sampleCount
        bytesAverage = bytesTotal / sampleCount
    else:
        objectsAverage = 0.0
        bytesAverage = 0.0

    return [
        objectSize,
        int(concurrency),
        float(deleteRatio),
        int(i),
        objectsAverage,
        bytesAverage,
        errorCount,
    ]
if __name__ == "__main__":
    # Usage: <script> <log directory>
    if len(sys.argv) < 2:
        sys.stderr.write('usage: %s <log directory>\n' % sys.argv[0])
        sys.exit(1)
    dirName = sys.argv[1]

    # (object size, concurrency, delete ratio) ->
    #     [objects per sec, bytes per sec, error count]
    # Values are summed over all client logs sharing the same key.
    memo = {}
    for f in walk(dirName):
        entry = singleFileAverage(dirName, f)
        key = (entry[0], entry[1], entry[2])
        totals = memo.setdefault(key, [0, 0, 0])
        memo[key] = [
            totals[0] + entry[4],
            totals[1] + entry[5],
            totals[2] + entry[6],
        ]

    for k in sorted(memo.keys()):
        # A single pre-formatted string prints identically on Python 2 and 3.
        print('%s %s' % (k, memo[k]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment