Last active
May 29, 2019 18:12
-
-
Save dyusupov/725d083d6f043d53abf8d61507c7bba3 to your computer and use it in GitHub Desktop.
AWS S3 and EdgeFS synchronization via Lambda function
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var assert = require('assert');
var dns = require('dns');
var https = require('https');
var qs = require('querystring');
var url = require('url');
var AWS = require('aws-sdk');
// ======= config section =======
// NOTE(review): the values below are placeholders. Avoid committing real
// credentials; consider supplying them through the Lambda configuration.
var AWS_ACCESS = "aws_key";                  // AWS access key used to read the source bucket; also stored as "cloud-key" metadata
var AWS_SECRET = "aws_secret";               // AWS secret key used to read the source bucket
var EDGEFS_PROVIDER = "aws-s3";              // Has to match EdgeFS supported Cloud Provider name
var EDGEFS_ACCESS = "efs_key";               // EdgeFS tenant user S3 access key
var EDGEFS_SECRET = "efs_secret";            // EdgeFS tenant user S3 secret key
var EDGEFS_REGION = "cltest";                // EdgeFS cluster namespace or configured S3 origin
var EDGEFS_ENDPOINT = "http://IPADDR:9982";  // EdgeFS S3 service endpoint
var EDGEFS_BUCKET = "bk1";                   // EdgeFS configured tenant's bucket
var EDGEFS_MDONLY = "true";                  // If enabled then fetch data on demand
// ==============================
/**
 * Start an upload into the EdgeFS bucket and return the stream that feeds it.
 *
 * The caller pipes the source object's data into the returned PassThrough
 * stream, which is used as the Body of the upload. The Lambda invocation is
 * finished from the upload callback via context.fail / context.succeed.
 *
 * @param {Object} s3e       S3 client used for the upload to EDGEFS_BUCKET.
 * @param {string} awsRegion Region of the source bucket, stored as "cloud-region".
 * @param {string} awsBucket Source bucket name, used to build "cloud-origin".
 * @param {string} key       Object key to write.
 * @param {Object} headData  headObject() result of the source object; its
 *                           ContentType and Metadata fields are read.
 * @param {Object} context   Lambda context object.
 * @returns {Object} PassThrough stream to write the object body into.
 */
function uploadFromStream(s3e, awsRegion, awsBucket, key, headData, context) {
  var stream = require('stream');
  var pass = new stream.PassThrough();

  // Metadata sent along with the object to describe where it came from.
  var meta = {
    "cloud-provider": EDGEFS_PROVIDER,
    "cloud-region": awsRegion,
    "cloud-origin": "https://" + awsBucket + ".s3.amazonaws.com",
    "cloud-key": AWS_ACCESS,
    "cloud-mdonly": EDGEFS_MDONLY
  };

  // merge in custom metadata if any (custom entries win on a name clash)
  var custom = headData["Metadata"] || {};
  Object.keys(custom).forEach(function (name) {
    meta[name] = custom[name];
  });

  var params = {
    Bucket: EDGEFS_BUCKET,
    Key: key,
    Body: pass,
    ContentType: headData["ContentType"],
    Metadata: meta
  };

  // NOTE(review): partSize is 5 GiB, presumably so that objects are sent as a
  // single PUT instead of a multipart upload - confirm this is intended.
  var options = {partSize: 5 * 1024 * 1024 * 1024};

  s3e.upload(params, options, function(err, data) {
    if (err)
      return context.fail(err);
    context.succeed('OK');
  });

  return pass;
}
/**
 * Copy a newly created object from the AWS S3 bucket to the EdgeFS bucket.
 *
 * Reads the object's head data first (content type and user metadata), then
 * streams the object body into uploadFromStream(). The Lambda invocation is
 * finished through context.fail / context.succeed.
 *
 * @param {string} region  AWS region of the source bucket.
 * @param {string} bucket  Source bucket name.
 * @param {string} key     Object key (already URL-decoded).
 * @param {Object} context Lambda context object.
 */
function syncCreate(region, bucket, key, context) {
  // Read from AWS S3 bucket
  var s3 = new AWS.S3({
    apiVersion: '2006-03-01',
    accessKeyId: AWS_ACCESS,
    secretAccessKey: AWS_SECRET,
    region: region,
    computeChecksums: false
  });

  // Write to EdgeFS S3 endpoint. The options are given to the constructor so
  // that the custom endpoint is in effect from the moment the client exists.
  var edgefsOptions = {
    apiVersion: '2006-03-01',
    endpoint: new AWS.Endpoint(EDGEFS_ENDPOINT),
    s3BucketEndpoint: false,
    s3ForcePathStyle: true,
    accessKeyId: EDGEFS_ACCESS,
    secretAccessKey: EDGEFS_SECRET,
    region: EDGEFS_REGION
  };
  // An https.Agent may only be used for https:// endpoints; Node rejects a
  // request whose protocol does not match the agent's.
  // SECURITY: rejectUnauthorized:false disables TLS certificate verification.
  if (/^https:/i.test(EDGEFS_ENDPOINT)) {
    edgefsOptions.httpOptions = { agent: new https.Agent({ rejectUnauthorized: false }) };
  }
  var s3e = new AWS.S3(edgefsOptions);

  var params = {Bucket: bucket, Key: key};
  s3.headObject(params, function(err, headData) {
    if (err)
      return context.fail(err);

    var readStream = s3.getObject(params).createReadStream();
    // Without this handler a failed download would leave the upload waiting
    // for data until the Lambda times out.
    readStream.on('error', function(readErr) {
      context.fail(readErr);
    });
    readStream.pipe(uploadFromStream(s3e, region, bucket, key, headData, context));
  });
}
/**
 * Delete an object from the EdgeFS bucket after it was removed from AWS S3.
 *
 * The delete request carries an "x-cloud-delete" header set to
 * EDGEFS_PROVIDER. The Lambda invocation is finished through
 * context.fail / context.succeed.
 *
 * @param {string} key     Object key (already URL-decoded).
 * @param {Object} context Lambda context object.
 */
function syncRemove(key, context) {
  // Remove via EdgeFS S3 endpoint
  var edgefsOptions = {
    apiVersion: '2006-03-01',
    endpoint: new AWS.Endpoint(EDGEFS_ENDPOINT),
    s3BucketEndpoint: false,
    s3ForcePathStyle: true,
    accessKeyId: EDGEFS_ACCESS,
    secretAccessKey: EDGEFS_SECRET,
    region: EDGEFS_REGION
  };
  // An https.Agent may only be used for https:// endpoints; Node rejects a
  // request whose protocol does not match the agent's.
  // SECURITY: rejectUnauthorized:false disables TLS certificate verification.
  if (/^https:/i.test(EDGEFS_ENDPOINT)) {
    edgefsOptions.httpOptions = { agent: new https.Agent({ rejectUnauthorized: false }) };
  }
  var s3e = new AWS.S3(edgefsOptions);

  var req = s3e.deleteObject({Bucket: EDGEFS_BUCKET, Key: key});
  // Add the custom header while the HTTP request is being built.
  req.on('build', function () {
    req.httpRequest.headers["x-cloud-delete"] = EDGEFS_PROVIDER;
  });
  req.send(function(err, data) {
    if (err)
      return context.fail(err);
    context.succeed('OK');
  });
}
exports.handler = function(event, context) { | |
console.log("event %j", event); | |
// Get the object from the event and show its content type | |
var bucket = event.Records[0].s3.bucket.name; | |
// see https://forums.aws.amazon.com/thread.jspa?threadID=215813 | |
var key = Object.keys(qs.decode(event.Records[0].s3.object.key))[0]; | |
// sanity checks | |
assert(bucket !== undefined, "bucket not present in s3 event"); | |
assert(key !== undefined, "key not present in s3 event"); | |
var sourceIp, endpointHostname; | |
try { | |
sourceIp = event.Records[0].requestParameters.sourceIPAddress; | |
endpointHostname = (new url.URL(EDGEFS_ENDPOINT)).hostname; | |
} catch (e) { | |
return context.fail(e); | |
} | |
dns.lookup(endpointHostname, function(err, result) { | |
if (err) | |
return context.fail(err); | |
if (sourceIp == result.address) { | |
console.log("Skip synchronization due to same source update detected"); | |
return context.succeed('OK'); | |
} | |
console.log("syncing event", event.Records[0].eventName, bucket + "/" + key, "to", EDGEFS_ENDPOINT, EDGEFS_BUCKET); | |
if (event.Records[0].eventName.match(/ObjectCreated/)) { | |
var region = event.Records[0].awsRegion; | |
syncCreate(region, bucket, key, context); | |
} else if (event.Records[0].eventName.match(/ObjectRemoved/)) { | |
syncRemove(key, context); | |
} else { | |
console.log("Not handling event", event.Records[0].eventName); | |
return context.succeed('OK'); | |
} | |
}) | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment