Amazon Web Services – Kolejki FIFO
Pisałem ostatnio o kolejkach w Azure. Dziś trochę o AWS. Amazon udostępnił w kilku regionach (N. Virginia, Ohio, Oregon, Irlandia) kolejki FIFO. First In First Out. W przypadku zwykłej kolejki, wiadomości mogą być odczytywane przez wielu konsumentów i przetwarzane równolegle. Czy korzystając z kolejki FIFO możemy myśleć o skalowaniu aplikacji? Tak, ale…
Kolejka FIFO po udostępnieniu wiadomości „ukrywa” pozostałe do momentu, aż poprzednia nie zostanie obsłużona lub skończy się tak zwany Visibility Timeout. Korzystanie z większej ilości procesów obsługujących odczyt wiadomości z danej kolejki mija się więc z celem. Ma to sens, przecież kolejki FIFO mają zapewnić mam odpowiednią kolejność wiadomości oraz to, że każda wiadomość zostanie obsłużona dokładnie jeden raz.
Tylko co zrobić, jeżeli mamy do obsłużenia wiele wątków, spraw. Mamy np. internetowy sklep z narzędziami i kilku klientów chce w tym samym momencie kupić u nas młotki. Przecież nie będziemy czekać z obsługą drugiego klienta, aż pierwszy zakończy swoją płatność. Tworzymy drugą kolejkę? Nie. Ale po kolei.
Tworzymy kolejki FIFO
Cztery linijki Pythona i w naszej konsoli AWS powinna pojawić się kolejka FIFO. Zakładam, że mamy wcześniej skonfigurowane klucze, zaimportowane biblioteki itd.
import boto3 sqs = boto3.resource('sqs', region_name='eu-west-1') queue_name = 'FifoQueue.fifo' queue_create = sqs.create_queue(QueueName=queue_name, Attributes={'FifoQueue':'true', 'ContentBasedDeduplication':'true'})
Na co trzeba zwrócić uwagę?
- tworzymy kolejkę w jednym z regionów, które je udostępniają
- nazwa kolejki musi kończyć się suffixem .fifo
- tworzymy kolejkę FIFO 🙂 ’FifoQueue’:’true’
- deduplikację na podstawie zawartości wiadomości ustawiamy na true, wrócimy do tego jeszcze ’ContentBasedDeduplication’:’true’
Po zalogowaniu się w konsoli AWS powinniśmy zobaczyć co udało nam się stworzyć
Wysyłamy wiadomości
Mamy nową, pustą kolejkę. Trzeba coś do niej wysłać.
queue = sqs.get_queue_by_name(QueueName=queue_name) message_content = str({'title': 'Message 1'}) queue.send_message(MessageBody=message_content)
Tym razem tak łatwo nie będzie. Dostaniemy w twarz wyjątkiem An error occurred (MissingParameter) when calling the SendMessage operation: The request must contain the parameter MessageGroupId.
I tu dochodzimy do sedna sprawy. AWS umożliwia nam grupowanie wiadomości w kolejkach FIFO. Tworzymy coś na kształt kolejki w kolejce. Spróbujemy przesłać do kolejki coś takiego
Grup nie musimy w żaden sposób tworzyć. Wysyłając wiadomość do grupy ustawiamy po prostu wartośc dla atrybutu MessageGroupId. Obsługując naszych klientów chętnych zakupić młotki, możemy dla każdej sesji web utworzyć nowe id, w ten sposób stworzyć grupę i zapewnić poprawną kolejność zdarzeń dla każdego z naszych klientów. Nie wyślemy nikomu towaru, zanim za niego nie zapłaci 🙂
import boto3 sqs = boto3.resource('sqs', region_name='eu-west-1') queue_name = 'FifoQueue.fifo' queue_create = sqs.create_queue(QueueName=queue_name, Attributes={'FifoQueue':'true', 'ContentBasedDeduplication':'true'}) queue = sqs.get_queue_by_name(QueueName=queue_name) for i in range(1,5): message_content = 'G1_{0}'.format(i) queue.send_message(MessageBody=message_content, MessageGroupId='Group1') for i in range(1,3): message_content = 'G2_{0}'.format(i) queue.send_message(MessageBody=message_content, MessageGroupId='Group2')
Wszystko powinno się udać, a w naszej kolejce powinno czekać na nas 6 wiadomości. Spróbujemy coś poczytać.
Pobieramy wiadomości
Wywołamy dwukrotnie metodę receive_messages. Za każdym razem poprosimy o maksymalnie dwie wiadomości
print('First request') for message in queue.receive_messages(MaxNumberOfMessages=2): print('{0}'.format(message.body)) print('Second request') for message in queue.receive_messages(MaxNumberOfMessages=2): print('{0}'.format(message.body))
AWS nie wie, że pyta go jeden klient i za każdym razem zwraca wiadomości z innej grupy. Jeżeli dodalibyśmy trzeci request, wywoływany od razu, przed upływem Visibility Timeout, SQS nie swróci nam w nim żadnej wiadomości.
Spróbujemy teraz jedną z otrzymanych wiadomości „obsłużyć” i skasujemy ją z kolejki.
for message in queue.receive_messages(MaxNumberOfMessages=1): print('{0}'.format(message.body)) print('Deleting message') message.delete()
Po uruchomieniu, w terminalu powinniśmy zobaczyć coś podobnego do
W Waszym przypadku może to być inna wiadomość, pobrana z innej grupy.
Nasze procesy nadal starają się obsłużyć kolejkę. Wowołujemy ponownie odczyt
for message in queue.receive_messages(): print('{0}'.format(message.body)) for message in queue.receive_messages(): print('{0}'.format(message.body))
i dostajemy kolejne wiadomości. Z grupy, którą wcześniej już obsłużyliśmy (G2), jest to następna wiadomość (G2_2).
AWS i duplikaty wiadomości
Przy tworzeniu kolejki ustawiliśmy parametr ContentBasedDeduplication na true. Takie ustawienie powoduje, że AWS sam dba o to, abyśmy nie wstawili do kolejki wiadomości o tej samej treści. Spróbujcie wykonać następuący kod, który próbuje wstawić dwa razy takie same wiadomości
for i in range(1,5): message_content = '{0}'.format(i) queue.send_message(MessageBody=message_content, MessageGroupId='Group1') for i in range(1,5): message_content = '{0}'.format(i) queue.send_message(MessageBody=message_content, MessageGroupId='Group2')
Zauważcie, że próbujemy wstawiać te wiadomości do dwóch róznych grup, w każdym requeście jest inne MessageGroupId. I co? AWS przyjmie tylko pierwszą „porcję” wiadomości. Możemy to sprawdzić na przykład w konsoli AWS
Wygodne? Wygodne. Ale i niebiezpieczne. Nasza aplikacja nawet nie dowie się (albo ja nie wiem jak to zrobić), że próbowała wstawić duplikat wiadomości. Ponownie próbujemy wysłać dwie wiadomości o tej samej treści, sprawdzimy sobie jednak tym razem status odpowiedzi z serwera AWS.
message_content = 'Duplication test' response = queue.send_message(MessageBody=message_content, MessageGroupId='Group1') print('First request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode'])) response = queue.send_message(MessageBody=message_content, MessageGroupId='Group2') print('Second request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))
I wynik
Jak widać, oba żądania zakończyły się sukcesem…
Można także podczas tworzenia kolejki ustawić parametr ContentBasedDeduplication na false i samemu zająć się obsługą duplikatów. No, prawie samemu.
Sami dbamy o porządek
Tworzymy kolejki FIFO bez automatycznej obsługi duplikatów
queue_create = sqs.create_queue(QueueName=queue_name, Attributes={'FifoQueue':'true', 'ContentBasedDeduplication': 'false'})
Teraz, wysyłając wiadomości, musimy ustawić atrybut MessageDeduplicationId dla każdej wiadomości. Nie ustawiajmy tam jednak czegoś zupełnie przypadkowego. Pamiętajmy, że to na nas spoczywa teraz obowiązek zapewnienia, aby wiadomości się nie powtarzały. Identyfikatorem może być na przykład md5 z połączenia treści wiadomości i identyfikatora grupy. Spróbujmy tak zrobić.
message_content = 'Duplication test' message_group_id = 'Group1' message_deduplication_id = '{0}{1}'.format(message_group_id, message_content) response = queue.send_message(MessageBody=message_content, MessageGroupId='Group1', MessageDeduplicationId=hashlib.md5(message_deduplication_id.encode('utf-8')).hexdigest()) print('First request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode'])) message_group_id = 'Group2' message_deduplication_id = '{0}{1}'.format(message_group_id, message_content) response = queue.send_message(MessageBody=message_content, MessageGroupId='Group2', MessageDeduplicationId=hashlib.md5(message_deduplication_id.encode('utf-8')).hexdigest()) print('Second request. Message Content: {0}, HTTP Status Code: {1}'.format(message_content, response['ResponseMetadata']['HTTPStatusCode']))
W ten sposób zapewnimy sobie możliwość zapisu „takich samych” wiadomości do różnych grup, ale jednocześnie możemy być pewni, że tak naprawdę te wiadomości dotyczą czegoś innego.
Kończymy
Dla niecierpliwych kod dostępny na GitHub. Nie zapomnijcie skasować wszystkich kolejek po skończeniu zabawy.