Skip to content
malak.cloud
  • Kontakt
  • O mnie
  • Search Icon

malak.cloud

Cloud Native na co dzień

Amazon Web Services – Kolejki FIFO

Amazon Web Services – Kolejki FIFO

30 lipca 2017

Pisałem ostatnio o kolejkach w Azure. Dziś trochę o AWS.  Amazon udostępnił w kilku regionach (N. Virginia, Ohio, Oregon, Irlandia) kolejki FIFO. First In First Out. W przypadku zwykłej kolejki, wiadomości mogą być odczytywane przez wielu konsumentów i przetwarzane równolegle. Czy korzystając z kolejki FIFO możemy myśleć o skalowaniu aplikacji? Tak, ale…

Kolejka FIFO po udostępnieniu wiadomości „ukrywa” pozostałe do momentu, aż poprzednia nie zostanie obsłużona lub skończy się tak zwany Visibility Timeout. Korzystanie z większej ilości procesów obsługujących odczyt wiadomości z danej kolejki mija się więc z celem. Ma to sens, przecież kolejki FIFO mają zapewnić mam odpowiednią kolejność wiadomości oraz to, że każda wiadomość zostanie obsłużona dokładnie jeden raz.

Tylko co zrobić, jeżeli mamy do obsłużenia wiele wątków, spraw. Mamy np. internetowy sklep z narzędziami i kilku klientów chce w tym samym momencie kupić u nas młotki. Przecież nie będziemy czekać z obsługą drugiego klienta, aż pierwszy zakończy swoją płatność. Tworzymy drugą kolejkę? Nie. Ale po kolei.

Tworzymy kolejki FIFO

Cztery linijki Pythona i w naszej konsoli AWS powinna pojawić się kolejka FIFO. Zakładam, że mamy wcześniej skonfigurowane klucze, zaimportowane biblioteki itd.

import boto3
sqs = boto3.resource('sqs', region_name='eu-west-1')
queue_name = 'FifoQueue.fifo'
queue_create = sqs.create_queue(QueueName=queue_name, Attributes={'FifoQueue':'true', 'ContentBasedDeduplication':'true'})

Na co trzeba zwrócić uwagę?

  • tworzymy kolejkę w jednym z regionów, które je udostępniają
  • nazwa kolejki musi kończyć się suffixem .fifo
  • tworzymy kolejkę FIFO 🙂 ’FifoQueue’:’true’
  • deduplikację na podstawie zawartości wiadomości ustawiamy na true, wrócimy do tego jeszcze ’ContentBasedDeduplication’:’true’

Po zalogowaniu się w konsoli AWS powinniśmy zobaczyć co udało nam się stworzyć

Wysyłamy wiadomości

Mamy nową, pustą kolejkę. Trzeba coś do niej wysłać.

queue = sqs.get_queue_by_name(QueueName=queue_name)
message_content = str({'title': 'Message 1'})
queue.send_message(MessageBody=message_content)

Tym razem tak łatwo nie będzie. Dostaniemy w twarz wyjątkiem An error occurred (MissingParameter) when calling the SendMessage operation: The request must contain the parameter MessageGroupId.

I tu dochodzimy do sedna sprawy. AWS umożliwia nam grupowanie wiadomości w kolejkach FIFO. Tworzymy coś na kształt kolejki w kolejce. Spróbujemy przesłać do kolejki coś takiego

Grup nie musimy w żaden sposób tworzyć. Wysyłając wiadomość do grupy ustawiamy po prostu wartośc dla atrybutu MessageGroupId. Obsługując naszych klientów chętnych zakupić młotki, możemy dla każdej sesji web utworzyć nowe id, w ten sposób stworzyć grupę i zapewnić poprawną kolejność zdarzeń dla każdego z naszych klientów. Nie wyślemy nikomu towaru, zanim za niego nie zapłaci 🙂

import boto3
sqs = boto3.resource('sqs', region_name='eu-west-1')
queue_name = 'FifoQueue.fifo'
queue_create = sqs.create_queue(QueueName=queue_name, Attributes={'FifoQueue':'true', 'ContentBasedDeduplication':'true'})

queue = sqs.get_queue_by_name(QueueName=queue_name)

for i in range(1,5):
    message_content = 'G1_{0}'.format(i)
    queue.send_message(MessageBody=message_content, MessageGroupId='Group1')

for i in range(1,3):
    message_content = 'G2_{0}'.format(i)
    queue.send_message(MessageBody=message_content, MessageGroupId='Group2')

Wszystko powinno się udać, a w naszej kolejce powinno czekać na nas 6 wiadomości. Spróbujemy coś poczytać.

Pobieramy wiadomości

Wywołamy dwukrotnie metodę receive_messages. Za każdym razem poprosimy o maksymalnie dwie wiadomości

print('First request')
for message in queue.receive_messages(MaxNumberOfMessages=2):
    print('{0}'.format(message.body))

print('Second request')
for message in queue.receive_messages(MaxNumberOfMessages=2):
    print('{0}'.format(message.body))

AWS nie wie, że pyta go jeden klient i za każdym razem zwraca wiadomości z innej grupy. Jeżeli dodalibyśmy trzeci request, wywoływany od razu, przed upływem Visibility Timeout, SQS nie swróci nam w nim żadnej wiadomości.

Spróbujemy teraz jedną z otrzymanych wiadomości „obsłużyć” i skasujemy ją z kolejki.

for message in queue.receive_messages(MaxNumberOfMessages=1):
    print('{0}'.format(message.body))
    print('Deleting message')
    message.delete()

Po uruchomieniu, w terminalu powinniśmy zobaczyć coś podobnego do

W Waszym przypadku może to być inna wiadomość, pobrana z innej grupy.

Nasze procesy nadal starają się obsłużyć kolejkę. Wowołujemy ponownie odczyt

for message in queue.receive_messages():
    print('{0}'.format(message.body))

for message in queue.receive_messages():
    print('{0}'.format(message.body))

i dostajemy kolejne wiadomości. Z grupy, którą wcześniej już obsłużyliśmy (G2), jest to następna wiadomość (G2_2).

AWS i duplikaty wiadomości

Przy tworzeniu kolejki ustawiliśmy parametr ContentBasedDeduplication na true. Takie ustawienie powoduje, że AWS sam dba o to, abyśmy nie wstawili do kolejki wiadomości o tej samej treści. Spróbujcie wykonać następuący kod, który próbuje wstawić dwa razy takie same wiadomości

for i in range(1,5):
    message_content = '{0}'.format(i)
    queue.send_message(MessageBody=message_content, MessageGroupId='Group1')

for i in range(1,5):
    message_content = '{0}'.format(i)
    queue.send_message(MessageBody=message_content, MessageGroupId='Group2')

Zauważcie, że próbujemy wstawiać te wiadomości do dwóch róznych grup, w każdym requeście jest inne MessageGroupId. I co? AWS przyjmie tylko pierwszą „porcję” wiadomości. Możemy to sprawdzić na przykład w konsoli AWS

Wygodne? Wygodne. Ale i niebiezpieczne. Nasza aplikacja nawet nie dowie się (albo ja nie wiem jak to zrobić), że próbowała wstawić duplikat wiadomości. Ponownie próbujemy wysłać dwie wiadomości o tej samej treści, sprawdzimy sobie jednak tym razem status odpowiedzi z serwera AWS.

message_content = 'Duplication test'
response = queue.send_message(MessageBody=message_content, MessageGroupId='Group1')
print('First request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))
response = queue.send_message(MessageBody=message_content, MessageGroupId='Group2')
print('Second request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))

I wynik

Jak widać, oba żądania zakończyły się sukcesem…

Można także podczas tworzenia kolejki ustawić parametr ContentBasedDeduplication na false i samemu zająć się obsługą duplikatów. No, prawie samemu.

Sami dbamy o porządek

Tworzymy kolejki FIFO bez automatycznej obsługi duplikatów

queue_create = sqs.create_queue(QueueName=queue_name,
 Attributes={'FifoQueue':'true', 'ContentBasedDeduplication': 'false'})

Teraz, wysyłając wiadomości, musimy ustawić atrybut MessageDeduplicationId dla każdej wiadomości. Nie ustawiajmy tam jednak czegoś zupełnie przypadkowego. Pamiętajmy, że to na nas spoczywa teraz obowiązek zapewnienia, aby wiadomości się nie powtarzały. Identyfikatorem może być na przykład md5 z połączenia treści wiadomości i identyfikatora grupy. Spróbujmy tak zrobić.

message_content = 'Duplication test'
message_group_id = 'Group1'
message_deduplication_id = '{0}{1}'.format(message_group_id, message_content)
response = queue.send_message(MessageBody=message_content, MessageGroupId='Group1', MessageDeduplicationId=hashlib.md5(message_deduplication_id.encode('utf-8')).hexdigest())
print('First request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))
message_group_id = 'Group2'
message_deduplication_id = '{0}{1}'.format(message_group_id, message_content)
response = queue.send_message(MessageBody=message_content, MessageGroupId='Group2', MessageDeduplicationId=hashlib.md5(message_deduplication_id.encode('utf-8')).hexdigest())
print('Second request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))

W ten sposób zapewnimy sobie możliwość zapisu „takich samych” wiadomości do różnych grup, ale jednocześnie możemy być pewni, że tak naprawdę te wiadomości dotyczą czegoś innego.

Kończymy

Dla niecierpliwych kod dostępny na GitHub. Nie zapomnijcie skasować wszystkich kolejek po skończeniu zabawy.


AWS, CloudNative, DEV
AWS, Dev, serverless

Post navigation

PREVIOUS
Pierwsza kolejka wiadomości w Azure
NEXT
Czyścimy Dockera
Comments are closed.
Cześć. Nazywam się Przemek Malak. Dzięki za wizytę. Mam nadzieję, że to o czym piszę Cię zainteresowało. Jeżeli chcesz ze mną pogadać, najłatwiej będzie przez LinkedIn.

Losowe wpisy

  • Jak skasować pliki w S3 przy usuwaniu stacka Cloudformation

    18 stycznia 2022
  • Czym jest dla mnie Cloud Native

    20 listopada 2020
  • AWS Lambda i idempotentność

    21 marca 2022
  • AWS Lambda – nowy edytor

    1 grudnia 2017
  • Jak przesłać plik do S3 za pomocą API Gateway

    16 listopada 2022
  • Apps
  • AWS
  • CloudNative
  • Cookbook
  • Data
  • DEV
  • GCP
  • IoT
  • Istio
  • k8s
  • Security
  • Social
  • GitHub
  • LinkedIn
© 2023   All Rights Reserved.