Last active
April 30, 2024 06:45
-
-
Save SodaDev/8da27a2599a3d2ba51f641003657e821 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package handler | |
import ( | |
"context" | |
"fmt" | |
"github.com/Ryanair/gofrlib/log" | |
"github.com/aws/aws-sdk-go-v2/aws" | |
"github.com/aws/aws-sdk-go-v2/service/lambda" | |
"github.com/aws/aws-sdk-go-v2/service/lambda/types" | |
"github.com/pkg/errors" | |
"go.opentelemetry.io/otel/attribute" | |
"go.opentelemetry.io/otel/trace" | |
"os" | |
"otel-tracing/CbEventSourceManager/cmd/schema/aws/cloudwatch/cloudwatchalarmstatechange" | |
"strconv" | |
) | |
type LambdaHandler struct { | |
loggerConfig log.Configuration | |
lambdaClient *lambda.Client | |
} | |
func New(loggerConfig log.Configuration, client *lambda.Client) *LambdaHandler { | |
return &LambdaHandler{ | |
loggerConfig: loggerConfig, | |
lambdaClient: client, | |
} | |
} | |
func (lh *LambdaHandler) Handle(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent) error { | |
log.Init(lh.loggerConfig) | |
log.Debug("Got event: %s", log.ToString(event)) | |
mappings, err := lh.getEventSourceMappings(ctx) | |
if err != nil { | |
log.Error("Error getting event source mappings: %s", err) | |
return err | |
} | |
if event.Detail.State.Value == "ALARM" { | |
err := lh.openCircuit(ctx, event, &mappings.EventSourceMappings[0]) | |
if err != nil { | |
return errors.Wrapf(err, "Error opening circuit") | |
} | |
return nil | |
} else if event.Detail.State.Value == "OK" { | |
err := lh.closeCircuit(ctx, event, &mappings.EventSourceMappings[0]) | |
if err != nil { | |
return errors.Wrapf(err, "Error closing circuit") | |
} | |
return nil | |
} else if event.Detail.State.Value == "INSUFFICIENT_DATA" && event.Detail.PreviousState.Value == "ALARM" { | |
err := lh.halfOpen(ctx, event, &mappings.EventSourceMappings[0]) | |
if err != nil { | |
return errors.Wrapf(err, "Error half-opening circuit") | |
} | |
return nil | |
} | |
return nil | |
} | |
func (lh *LambdaHandler) getEventSourceMappings(ctx context.Context) (*lambda.ListEventSourceMappingsOutput, error) { | |
mappings, err := lh.lambdaClient.ListEventSourceMappings(ctx, &lambda.ListEventSourceMappingsInput{ | |
EventSourceArn: aws.String(os.Getenv("QUEUE_ARN")), | |
FunctionName: aws.String(os.Getenv("FUNCTION_ARN")), | |
}) | |
if err != nil { | |
return nil, errors.Wrapf(err, "Error listing event source mappings") | |
} | |
if len(mappings.EventSourceMappings) == 0 { | |
return nil, errors.New("No event source mappings found") | |
} | |
if len(mappings.EventSourceMappings) > 1 { | |
return nil, errors.New(fmt.Sprintf("Multiple event source mappings found: %s", log.ToString(mappings))) | |
} | |
return mappings, nil | |
} | |
func (lh *LambdaHandler) openCircuit(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error { | |
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{ | |
UUID: mapping.UUID, | |
Enabled: aws.Bool(false), | |
}) | |
if err != nil { | |
return errors.Wrapf(err, "Error updating event source mapping") | |
} | |
instrumentCircuitStatusUpdate(ctx, "OPEN", event) | |
log.Debug("Updated mapping: %s", log.ToString(mappingOutput)) | |
return nil | |
} | |
func (lh *LambdaHandler) closeCircuit(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error { | |
closedConcurrencyString := os.Getenv("CLOSED_CONCURRENCY") | |
closedConcurrency, err := strconv.Atoi(closedConcurrencyString) | |
if err != nil { | |
return errors.Wrapf(err, "Error converting closed concurrency to int") | |
} | |
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{ | |
UUID: mapping.UUID, | |
Enabled: aws.Bool(true), | |
ScalingConfig: &types.ScalingConfig{ | |
MaximumConcurrency: aws.Int32(int32(closedConcurrency)), | |
}, | |
}) | |
if err != nil { | |
return errors.Wrapf(err, "Error updating event source mapping") | |
} | |
instrumentCircuitStatusUpdate(ctx, "CLOSE", event) | |
log.Debug("Updated mapping: %s", log.ToString(mappingOutput)) | |
return nil | |
} | |
func (lh *LambdaHandler) halfOpen(ctx context.Context, event cloudwatchalarmstatechange.AWSEvent, mapping *types.EventSourceMappingConfiguration) error { | |
mappingOutput, err := lh.lambdaClient.UpdateEventSourceMapping(ctx, &lambda.UpdateEventSourceMappingInput{ | |
UUID: mapping.UUID, | |
Enabled: aws.Bool(true), | |
ScalingConfig: &types.ScalingConfig{ | |
MaximumConcurrency: aws.Int32(2), | |
}, | |
}) | |
if err != nil { | |
return errors.Wrapf(err, "Error updating event source mapping") | |
} | |
instrumentCircuitStatusUpdate(ctx, "HALF_OPEN", event) | |
log.Debug("Updated mapping: %s", log.ToString(mappingOutput)) | |
return nil | |
} | |
func instrumentCircuitStatusUpdate(ctx context.Context, circuitState string, event cloudwatchalarmstatechange.AWSEvent) { | |
trace.SpanFromContext(ctx).AddEvent("circuitStateUpdate", | |
trace.WithAttributes(attribute.String("state", circuitState)), | |
trace.WithAttributes(attribute.String("alarmEvent", log.ToString(event.Detail))), | |
) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment