AWS Lambda i idempotentność

AWS Lambda i idempotentność

Czym jest idempotentność? Według Wikipedii to właściwość pewnych operacji, która pozwala na ich wielokrotne stosowanie bez zmiany wyniku. Jaki to ma związek z nami? W tym artykule pokażę jak mają się do siebie AWS Lambda i idempotentność.

W świecie aplikacji, szczególnie tych opartych o mikrosierwisy może wystąpić sytuacja, że np. jakaś wiadomość z kolejki zostanie przekazana do serwisu więcej niż jeden raz. To na nas spoczywa odpowiedzialność, żeby takie wielokrotne wywołanie np. funkcji Lambda poprzez takie samo zdarzenie przynosiło za każdym razem taki sam efekt. Nie chcemy przecież na przykład dwa razy obciążyć konta naszego klienta przy okazji jednego zakupu. Każde takie wywołanie musi być bezpieczne.

Idempotentne funkcje za każdym razem zwrócą taki sam rezultat. Bez znaczenia będzie to, ile razy zostanie on wywołana.

Ciekawy artykuł o tym jak AWS radzi sobie z tym problemem można przeczytać w The Amazon Builders’ Library. Także na stronach AWS można przeczytać artykuł o tym, jak podejść do tego problemu w przypadku funkcji Lambda. Sam pisałem już kilka razy o połączeniu SQS i funkcji Lambda. Ostatnio w przypadku Filtrowania zdarzeń wyzwalających Lambdę, a dawno temu o kolejkach FIFO.

AWS Lambda i idempotentność

Na szczęście nie zawsze będziemy musieli implementować wszystko samemu. Pokażę jak wykorzystać do rozwiązania tego problemu AWS Lambda Powertools.

Deployment

Do deploymentu wykorzystamy Serverless Framework. Ale nic nie stoi na przeszkodzie, aby podobne rozwiazanie wykorzystać np. w przypadku Serverless Application Model.

Na początku utworzymy funkcję Lambda wywoływaną przez wiadomości z kolejki SQS.

service: powertools-idempotency

frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.8
  region: eu-central-1

functions:
  consumer:
    memorySize: 128
    timeout: 3
    handler: handler.handler
    events:
    - sqs:
        arn: !GetAtt EventQueue.Arn

resources:
  Transform: AWS::Serverless-2016-10-31
  Resources:
    EventQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: !Sub '${AWS::Region}-${AWS::StackName}-EventQueue'
  
  Outputs:
    EventsQueue:
      Description: URL of Events Queue
      Value: !Ref EventQueue

Sama funkcja nie robi nic poza zrzuceniem paylodu w logów.

Robimy więc deployment sls deploy

sis-output

i na wyjściu powinniśmy dostać między innymi URL do naszej kolejki.

Spróbujmy więc coś do tej kolejki wysłać aws sqs send-message --queue-url <QUEUE-URL> --message-body "Testujemy kolejkę" --region eu-central-1

idempotency-mesage-sent

I jeżeli wszystko przebiegło pomyślnie, sprawdźmy logi funkcji sls logs -f consumer

idempotency-lambda-logs-1

Jeżeli wszystko zadziałało, powinniście zobaczyć log z wywołania funkcji. Na pewno się domyslacie, że każda nowa wiadomość w kolejce uruchomi funkcję, a ta zapisze dane do logów. My chcemy osiągnąć to, aby funkcja przetwarzała wiadomość z określonym body tylko jeden raz. Oczywiście w ramach konkretnego okna czasowego, które domyslnie ustawione jest na 3600 sekund, czyli 1 godzinę.
Taką funkcjinalność można oprogramować samemu, można też sięgnąć po gotowe rozwiązania. My wykorzystamy AWS Lambda Powertools.

Dodajemy AWS Lambda Powertools

Lambda Powertools dodamy do funkcji jako layer. Ale tą warstwę najpierw sobie zbudujemy. Dodajemy więc do naszego szablonu kolejny zasób:

AwsLambdaPowertoolsPythonLayer:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:eu-west-1:057560766410:applications/aws-lambda-powertools-python-layer
      SemanticVersion: 1.25.5

W chwili gdy to piszę, najnowsza wersja to 1.25.5. Można to zawsze sprawdzić tutaj:  github.com/awslabs/aws-lambda-powertools-python/releases i ewentualnie zmienić SemanticVersion. Do warstwy wrócimy jednak dopiero za chwilę. Swoją drogą, wstyd się przyznać, ale gdy zaczynałem pisać ten artykuł, to akutalą wersją było 1.24.1. 🙁

Konfiguracja

Domyślacie się, że musimy gdzieś przechowywać stan dla poszczególnych danych, które uruchomiły funkcję. Na chwilę obecną AWS Power Tools potrafi skorzystać tylko z tabeli DynamoDB. Dodamy więc taką tabelę do naszego template.

IdempotencyTable:
  Type: AWS::DynamoDB::Table
  Properties:
    AttributeDefinitions:
      -   AttributeName: id
          AttributeType: S
    KeySchema:
      -   AttributeName: id
          KeyType: HASH
    TimeToLiveSpecification:
      AttributeName: expiration
      Enabled: true
    BillingMode: PAY_PER_REQUEST

Powyżej wykorzystana jest domyślna konfiguracja tabeli. Nie stoi jednak nic na przeszkodzie, abyśmy zmienili na przykład nazwy kluczy. Możliwe jest także oczywiście „obsługiwanie” wielu funkcji Lambda za pomoca jednej tabeli.

W kolejnym kroku musimy poinformować funkcję Lambda o konieczności skorzystania z warstwy oraz dodać informację o tabeli, którą przekażemy w kodzie do Power Tools.

environment:
  IDEMPOTENCY_TABLE: !Ref IdempotencyTable
layers:
  - !GetAtt AwsLambdaPowertoolsPythonLayer.Outputs.LayerVersionArn

Utworzymy także konfigurację, która wskaże na jaki element wiadomości chcemy zwrócić uwagę. W naszym przypadku będzie to body wiadomości.

config =  IdempotencyConfig(
    event_key_jmespath="body"
)

W konfiguracji możemy zmienić między innymi okno czasowe, w którym takie same wiadomości będą traktowane jako pojedyńcze. Ustawiamy to za pomocą expires_after_seconds.

Funkcja Lambda i idempotentność

W samej funkcji mamy tylko dwie metody. handler jako handler 🙂 i wywoływany z niego synchronicznie message_consumer. Tą drugą metodę oznaczymy dekoratorem @idempotent_function, który zadba o idempotentność. W przypadku tego dekoratora musimy podać za pomocą data_keyword_argument, który parametr metody zawiera nasze dane. Przekażemy także konfigurację i magazyn danych.

Obie metody są bardzo proste:

@idempotent_function(data_keyword_argument="message",config=config, persistence_store=dynamodb)
def message_consumer(message):
    logger.info(message)

def handler(event, context):
    for m in event['Records']:
        message_consumer(message=m)

a kod całej funkcji wygląda tak:

import logging
import os
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

TABLE_NAME = os.environ['IDEMPOTENCY_TABLE']

dynamodb = DynamoDBPersistenceLayer(table_name=TABLE_NAME)
config =  IdempotencyConfig(
    event_key_jmespath="body"
)

@idempotent_function(data_keyword_argument="message",config=config, persistence_store=dynamodb)
def message_consumer(message):
    logger.info(message)

def handler(event, context):
    for m in event['Records']:
        message_consumer(message=m)

Całość dostępna jest oczywiście w GitHub.

Działanie

Sprobujmy teraz przesłać do kolejki kilka ponumerowanych wiadomości:

aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test1"
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test2"
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test3"
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test4"
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test1"
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test2"

Zwróćcie uwagę, że ostatnie dwa polecenia wysyłają do kolejki zduplikowane wiadomości 1 i 2.

Sprawdźmy jeszcze raz logi zapisane przez Lambdę. Przypominam, że logi zapisywane są w metodzie message_consumer, nie powinnismy więc zobaczyć duplikatów.

Tak też się dzieje. Nasza metoda zadziałała dla każdego payloadu tylko jeden raz.

Sprawdźmy jeszcze jak wyglądają dane zapisane w tabeli DynamoDB:

Jak widać, mamy zapisane poszczególne wywołania, nie ma jednak danych. Spróbujmy więc coś z naszej metody zwrócić.
Zmieńmy wielkość liter na duze i zwróćmy je z metody

@idempotent_function(data_keyword_argument="message",config=config, persistence_store=dynamodb)
def message_consumer(message):
    logger.info(message)
    m = message['body'].upper()
    return m

a w handlerze zapiszmy zwróconą wartość do CloudWatcha

def handler(event, context):
    for m in event['Records']:
        ret = message_consumer(message=m)
        logger.info("Consumer returned: {0}".format(ret))

Zobaczny co się teraz wydarzy. Na początek wyslijmy kilka nowych wiadomości do kolejki:

aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test danych 1" 
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test danych 2" 
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test danych 3"

Zawartości logów w CloudWatch się pewnie takie, jakich się spodziewacie

Funkcja zostala uruchomiona trzy razy i za każdym razem pracowały obie metody.

Jak teraz wyglądają dane w DynamoDB? Otóż tak:

Oprócz samych wywołań mamy zapisane także zwrócone dane. Wyślijmy więc jeszcze raz pierwsze dwie wiadomości:

aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test danych 1" 
aws sqs send-message --queue-url <QUEUE-URL> --region eu-central-1 --message-body "Test danych 2"

i sprawdźmy logi

Jak widać, ponowne uruchomienie funkcji nie spowodowało zadziałania metody message_consumer, ale zostały zwrócone poprawne dane. O to chodziło. AWS Lambda i idempotentność? Załatwione.

Wyjątki

Ważne jest zrozumienie jak zachowają się AWS Power Tools w przypadku gdy wystąpią jakieś wyjątki.

Jeżeli wyjątek będzie miał miejsce w metodzie, która nie jest oznaczona dekoratorem idempotencji i została ona wykonana później to dane pozostaną zapisane i powtórzona wiadomość nie zostanie przetworzona.

Natomiast gdy nie przechwycimy wyjątku w metodzie objętej idempotentnością to dane zostaną skasowane. Jeżeli zależy nam na zachowaniu danych, to należy wyjącek przechwycić i zwrócić z metody poprawne dane.

Podsumowanie

Według dokumentacji AWS „Standard queues provide at-least-once delivery, which means that each message is delivered at least once”. Czyli mamy gwarancję, że  konsument otrzyma wiadomość ze standardowej kolejki CO NAJMNIEJ jeden raz. Co najmniej, czyli SQS może dostarczyć konkretną wiadomość więcej niż jeden raz.

Czasami nie będzie to miało dużego znaczenia, ale może to też być krytyczne dla naszego systemu. Możemy sami próbować poradzić sobie z tym problemem, możemy także wspomóc się działającym, uzywanym przez wiele aplikacji AWS Lambda Powertools, które pomoże w załatwieniu problemu AWS Lambdy i idempotentności.

Powertoolsy potrafią zresztą o wiele więcej. Nawet odnośnie implementacji idempotentności. Poszczegóły odsyłam do dokumentacji, ale mam nadzieję, że zainteresowałem tych z Was, którzy jeszcze tego rozwiązania nie używali.

Przypominam, że całość kodu dostępna jest na moim GitHubie.

Comments are closed.