Фильтры событий Lambda SQS могут удалить ваши сообщения, если вы не будете осторожны

В одном из недавних сообщений я писал о фильтрах событий Lambda и их преимуществах. Это было продолжение моего выступления на эту тему на tecRacer. Во время этого выступления мой коллега Себастьян Мён задал интересный вопрос о фильтрах, который я рассмотрю сегодня. Он хотел узнать, что происходит с сообщениями SQS, которые не соответствуют фильтрам.

Мы перейдем к этому немного позже, но сначала давайте сделаем шаг назад и кратко расскажем о фильтрах событий Lambda. Они были введены в конце 2021 года и позволяют фильтровать события из SQS, Kinesis или DynamoDB до того, как они вызовут вашу функцию Lambda. Ранее все события из этих источников запускали Lambda, и вам приходилось фильтровать их в собственном коде, что добавляло сложности и затрат, поскольку функция вызывалась даже для событий, которые ее не волновали. С фильтрами событий ситуация меняется. Теперь мы можем разгрузить недифференцированную тяжелую работу по фильтрации событий на AWS без дополнительных затрат.

Это интересный вопрос, как система реагирует на сообщения от SQS, которые не соответствуют фильтру, потому что SQS отличается от двух других источников событий, которые поддерживают фильтр. DynamoDB Streams и Kinesis работают с потоковыми данными. Частью этой парадигмы является то, что несколько сторон могут читать одну и ту же точку данных из потока. Это означает, что для других клиентов не имеет значения, если один из клиентов не заинтересован во всех сообщениях.

SQS, с другой стороны, реализует модель производитель-потребитель. Это означает, что один или несколько производителей создают сообщения, которые могут быть обработаны одним или несколькими потребителями. Сообщения обычно обрабатываются только одним потребителем в пуле потребителей, поэтому в большинстве случаев это ведет себя как обмен сообщениями один на один. Кроме того, у потребителей нет прямого способа выбора сообщений, с которыми они хотят работать, они должны обрабатывать все, что дает им SQS.

Это имеет последствия для фильтров событий между SQS-Queues и Lambda-функциями. Если фильтр отклоняет входящее сообщение от SQS, может произойти одно из двух событий:

  1. Сообщение считается обработанным и удаляется из очереди, не позволяя другим потребителям прочитать его.
  2. Сообщение считается необработанным и не удаляется из очереди. Оно снова становится доступным для (других) потребителей после истечения таймаута видимости.

В первом случае это означает, что сообщения удаляются из очереди, даже если другие потребители могут их обработать. Это означает, что один потребитель Lambda в очереди, которому не нужно сообщение, имеет побочные эффекты для всех остальных потребителей. В зависимости от вашей архитектуры это может быть проблемой.

Второй случай может привести к внутренним проблемам в AWS, если мы предположим, что Lambda под капотом использует тот же SQS-API, который также находится в открытом доступе. Чтобы понять почему, необходимо знать, как Lambda интегрируется с SQS. Когда вы устанавливаете SQS-триггер для функции Lambda, служба Lambda будет периодически опрашивать очередь на предмет работы и вызывать ваш код всякий раз, когда будут сообщения для обработки. Это происходит в фоновом режиме, и вам не нужно беспокоиться об этом.

Если предположить, что Lambda использует те же API, что и мы, то у службы нет возможности пропускать уже просмотренные и проигнорированные сообщения. Поэтому если в очереди только один потребитель Lambda, а отброшенные сообщения остаются в очереди, то со временем мы можем столкнуться с перегрузкой. Это было бы менее чем идеальным дизайном, поэтому я предполагаю, что в действительности служба Lambda действительно удаляет сообщения, которые не соответствуют фильтру.

Давайте посмотрим, что по этому поводу говорит документация:

Вы можете определить до пяти различных фильтров для одного источника событий. Если событие удовлетворяет любому из этих пяти фильтров, Lambda отправляет событие в вашу функцию. В противном случае Lambda отбрасывает событие.

— Фильтрация событий в Lambda, 15.01.2022

Я не носитель языка, но для меня слово «отбрасывает» немного двусмысленно. Это может означать либо то, что Lambda игнорирует сообщение и оно возвращается в очередь, либо то, что служба также удаляет событие из очереди. Я не большой поклонник двусмысленности в технической документации. Давайте посмотрим сами, что произойдет.

Мы будем использовать следующую тестовую установку, код доступен на Github. CDK-приложение развертывает SQS-Queue и Lambda-функцию, которая запускается сообщениями в Queue. Источник событий будет отфильтрован, чтобы принимать только те сообщения, которые имеют атрибут process_with_lambda со значением 1. Затем лямбда будет увеличивать счетчик в таблице DynamoDB, который подсчитывает, сколько сообщений было обработано.

У нас также будет скрипт, который будет посылать поочередно сообщения в очередь (job_generator.py). Первое сообщение будет содержать атрибут process_with_lambda, а второе — нет. Этот скрипт отправит в очередь по 20 сообщений каждого вида, после чего мы подождем несколько секунд. Если обработка сработала, то в таблице мы должны увидеть значение счетчика 20. Затем мы проверим, сколько сообщений все еще находится в очереди.

Если это число равно 0, мы знаем, что Lambda действует в соответствии с пунктом 1), а «discarded» означает, что сообщения удалены из очереди. Если в очереди ненулевое число, Lambda отправляет сообщения обратно в очередь для дальнейшей обработки. Давайте запустим код:

$ python job_generator.py
Removing summary item from table
Purging Queue
Purging queues can take up to 60 seconds, waiting...
Sending Message Group 1 with 2 messages
Sending Message Group 2 with 2 messages
Sending Message Group 3 with 2 messages
[...]
Sending Message Group 18 with 2 messages
Sending Message Group 19 with 2 messages
Sending Message Group 20 with 2 messages
Вход в полноэкранный режим Выход из полноэкранного режима

Сначала скрипт сбрасывает счетчик в таблице и удаляет уже существующие сообщения из очереди. Поскольку это может занять до 60 секунд, я добавил здесь sleep на одну минуту. После этого мы отправляем в очередь в общей сложности 40 сообщений. Затем нужно подождать несколько секунд, чтобы дать время Lambda обработать сообщения асинхронно. После этого мы можем запустить сценарий, чтобы получить результаты:

$ python get_result.py   
Lambda processed 20 records
The Queue contained 0 messages
Войти в полноэкранный режим Выйти из полноэкранного режима

Как и ожидалось, функция Lambda получила сообщения в соответствии с фильтром. Оказывается, вариант 1) — это то, что реализовано AWS, когда речь идет о работе с сообщениями, не соответствующими фильтру. Сообщения, которые не соответствуют ему, просто удаляются из очереди.

Заключение

Мы узнали, что Lambda Event Filter удаляет сообщения из очереди, если они не соответствуют критериям фильтра. Это не совсем ясно из документации, но это разумная реализация, учитывая ограничения, накладываемые SQS API.

Эта реализация имеет последствия для очередей, которые имеют несколько потребителей сообщений. Если вы добавите в набор потребителей функцию Lambda с фильтром событий, то может произойти потеря данных, если другие подписчики смогут обрабатывать сообщения, не предназначенные для Lambda. В этом случае вы потеряете сообщения без четкого указания причины.

В качестве обходного пути можно использовать тему SNS перед очередью, чтобы рассылать сообщения нескольким заинтересованным сторонам, создавая очередь для каждой стороны. Однако это повлечет за собой дополнительные расходы.

Было бы здорово, если бы AWS устранил здесь двусмысленность, а в идеале добавил предупреждение в консоль при добавлении фильтров. Если вы знаете об этом, вы можете планировать это — если вы не знаете, вы столкнетесь с неожиданным поведением. #awswishlist

Надеюсь, вам понравилась эта статья, и если у вас есть отзывы, вопросы или проблемы, не стесняйтесь обращаться к нам через каналы социальных сетей, указанные в моей биографии.

Оставьте комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *