Pierwsza kolejka wiadomości w Azure
Chyba każdy programista miał do czynienia z kolejkami. A większość z tych, którzy studiowali informatykę, jakąś pewnie nawet zaimplementowała. Po co kolejki? Pozwalają nam na asynchroniczną komunikację pomiędzy róznymi apliakcjami, serwisami. Możemy odseparować poszczególne elementy naszego rozwiązania i uruchamiać je niezależnie. Każdy taki element może pracować w innym miejscu. Może to być chmura, nasz komputer, czy też serwer umieszczony w naszej firmie. Pozwala to na budowanie różnego rodzaju workflows, ułatwia skalowanie rozwiązań oraz uniezależnia je od siebie w pewnym stopniu. Ostatnio pisałem trochę o Amazon Web Services. A jak zrobić kolejkę w Azure? Zaraz się dowiecie.
Kolejka jest zasilana wiadomościami przez tak zwanego producenta, a pobiera i je obsługuje konsument.
Maksymalna wielkość wiadomości w Azure to 64kB. Mało? Mało. Cieszmy się z tego, kiedyś było to 8kB. Jeszcze gorzej to wygląda, jeżeli będziemy chcieli zapisać dane binarne zakodowane w base64. W takim przypadku możemy spodziewać się, że nasze dane zajmą nawet 33% więcej przestrzeni. Czyli zostaje nam jakieś 48kB. Amazon Web Services pozwala na przesłanie 256kB. Lepiej, ale ani w jednym, ani w drugim przypadku nie możemy pozwolić sobie na przesłanie dużej ilości danych w wiadomości. Musimy ograniczyć się do opisania zadania i ewentualnie, podania linka do danych. Danych zapisanych w innym miejscu.
Jak zrobić kolejkę w Azure.
Aby uruchomić kolejki w Azure będziemy potrzebowali konto storage.
Po utworzeniu konta, zanotujmy jego nazwę i klucz api. Są one dostępne w zakładce Access keys
.
Pobawimy się kolejkami korzystając z Pythona. Dostępne jest oczywiście SDK, nie bedziemy musieli więc korzystać z zabawy w wywoływanie połączeń REST.
Tworzymy na dysku katalog, w którym napiszemy nasz kod i dodajemy biblioteki Azure. Można je instalować indywidualnie, my jednak, dla ułatwienia, zainstalujemy sobie wszystko. Wchodzimy do naszego katalogu i uruchamiamy: pip install azure
Po chwili powinniśmy mieć zainstalowane wszystkie biblioteki.
Teraz tworzymy plik, w którym napiszemy nasz kod i zaczynamy zabawę. W każdym przypadku, czy to tworząc kolejkę, czy wysyłając lub pobierając wiadomości musimy utworzyć QueueService.
from azure.storage.queue import QueueService queue_service = QueueService(account_name='nazwa naszego konta', account_key='nasz klucz api')
W kodzie wpisujemy oczywiście nasze wartości :-).
Tworzymy kolejkę
Aby utworzyć kolejkę, skorzystamy z metody create_queue. Przyjmuje ona jako parametr nazwę tworzonej kolejki. Dodajemy więc kolejną linię do naszego programu:
queue_service.create_queue('testqueue')
Trzeba pamięać, że w nazwie kolejki mogą być tylko małe litery i cyfry.
I to wszystko czego potrzebujemy, żeby utworzyć naszą pierwszą kolejkę w Azure. Uruchamiamy program, wracamy na nasze konto Azure i po chwili powinniśmy zobaczyć naszą nową kolejkę w portalu.
Wysyłamy
Mamy kolejkę wiadomości, ale bez wiadomości. Czas coś do niej wysłać. I znowu wystarczy jedna linijka kodu. Skorzystamy z metody put_message. Metoda ta przyjmuje 4 parametry: nazwę kolejki, treść wiadomości, visibility timeout, ttl oraz timeout. wymagane są pierwsze dwa i nie wymagają one wyjasnień. Pozostałe to:
- visibility timeout – czas, wyrażony w sekundach, po którym wiadomość ma być widoczna.
- message ttl – time to live, wyrażony w sekundach, czas prez który wiadomość ma być dostępna do obsłużenia przez konsumenta. Po tym czasie znika, nawej jeżeli nie zostanie obsłużona; domyślna wartość to 7 dni
- timeout – timeut samego zapytania; domyślnie 30 sekund
Najlepiej będzie to wyjaśnić na przykładzie. Załóżmy, że wywołujemy metodę
queue_service.put_message('testqueue', 'Moja pierwsza wiadomość', 0, 30, 30)
23 lipca 2017 roku, o godzinie 15:52:28 (Nie bawimy się w strefy czasowe).
Wiadomość jest widoczna w systemie od razu, ale po 30 sekundach zniknie. Nawet jeżeli nie zostanie skasowana przez konsumenta.
Poprzednia wiadomość nam zniknęła, utworzymy sobie więc pięć nowych
from azure.storage.queue import QueueService queue_service = QueueService(account_name='nazwa konta', account_key='klucz api') queue_service.create_queue('testqueue') for i in range(1, 6): queue_service.put_message('testqueue', 'Message No {0}'.format(i))
Po wykonaniu w portalu powinniśmy mieć 5 nowych wiadomości.
Jak widać, będą one dostępne dla konsumentów przez domyślne 7 dni. My jednak nie będziemy tak długo czekali i zabierzemy się za ich odczytanie natychmiast.
Odczytujemy
Samo wysłanie wiadomości to już sukces. Ale nie pełen. Załóżmy, że zapisaliśmy zamówienie od klienta, ale nie zostało one wysłane do magazynu. Klient nie dostanie towaru i… No właśnie.
Do sprawdzenia co czeka dla nas w kolejce możemy wykorzystać dwie metody: peek_messages lub get_messages. Słowo 'lub’ nie jest może zbyt fortunne. Sprawdzamy metodą peek_messages, ale w celu obsłużenia wiadomości powinniśmy ją pobrać za pomocą get_messages. Metoda peek_messages nie „chowa” naszej wiadomości przed innymi konsumentami i poza nami mogłaby ona zostać obsłużona przez inny proces. W naszym hipotetycznym przypaku, może to spowodować wysłanie dwóch takich samych zamówień do naszego klienta. Przykładowy podgląd zawartości naszej kolejki mógłby jednak wyglądać tak:
messages = queue_service.peek_messages('testqueue', 20) for m in messages: print(m.content)
Drugim parametrem ustawiamy ilość wiadomości. Domyślnie metoda zwróci nam jedną.
Zabieramy się na poważnie za obsługę naszych wiadomości. Użyjemy metody get_messages, pobierzemy jedną wiadomość i ukryjemy ją przed innymi konsumentami.
messages = queue_service.get_messages('testqueue', 1)
Jeżeli teraz szybko (w ciągu 30 sekund) odświeżymy listę naszych wiadomości w portalu to zobaczymy, że dostepne są tylko 4 z nich. Nie widać pobranej przez nas wiadomości
Jeżeli poczekamy jeszcze chwilę i odświeżymy widok naszego portalu, to nasza wiadomość pojawi się w nim, i będzie ponownie dostępna dla wszystkich. Jeżeli więc obsłużyliśmy wiadomość i nie chcemy, żeby ktoś jeszcze raz mógł sie nią zająć, musimy ją usunąć z kolejki. Ale o tym za moment. W tej chwili skupimy się jeszcze na możliwościach samej metody get_messages. Poza podaniem nazwy kolejki, z której chcemy pobrać wiadomości, możemy także ustawić ilość pobieranych wiadomości, oraz czas, przez który wiadomość nie będzie widoczna dla innych konsumentów. Ustawienie tej drugiej wartości jest szczególnie ważne, jeżeli przetworzenie wiadomości może zająć dłużej, niż domyślne 30 sekund. Wywołanie metody np. w taki sposób
messages = queue_service.get_messages('testqueue', 2, 60)
zwróci nam dwie wiadomości, oraz sprawi, że nie będą one widoczne dla innych przez minutę (60 sekund).
Kasujemy
Wykonaliśmy pełną listę zadań związaną z wiadomością, nie było żadnych błędów. Trzeba więc wiadomośc skasować. Do tego celu służy metoda delete_message. Poniższy kod pobiera dwie wiadomości, „obsługuje je” i kasuje z kolejki
messages = queue_service.get_messages('testqueue', 2) for m in messages: print(m.content) queue_service.delete_message('testqueue', m.id, m.pop_receipt)
W tym momencie wiadomości znikaja na zawsze i nikt nie może ich już ponownie pobrać.
Co dalej
Jeżeli macie jakieś problemy z kodem, tutaj możecie go pobrać. Odkomentujcie to, co w danym momencie potrzebujecie i do dzieła.
Zastosowań kolejek jest msóstwo. Naprawdę. Teraz można ułatwić sobie życie i zamiast instalować np. RabbitMQ możemy skorzystać z rozwiązań, o które dbają inni. Zapominamy w tym momencie o aktualizacjach, patchowaniu itd. Koszty? Naprawdę znikome. Możecie skorzystać z kalkulatora i sprawdzić jak by to wyglądało w Waszym przypadku. Podobne rowiązania oferują także Amazon i Google.