Кафка что это такое
Apache Kafka: обзор
Сегодня мы предлагаем вам сравнительно краткую, но при этом толковую и информативную статью об устройстве и вариантах применения Apache Kafka. Рассчитываем перевести и выпустить книгу Нии Нархид (Neha Narkhede) et. al до конца лета.
Приятного чтения!
Введение
Сегодня много говорят о Kafka. Многие ведущие айтишные компании уже активно и успешно пользуются этим инструментом. Но что же такое Kafka?
Kafka был разработан в компании LinkedIn в 2011 году и с тех пор значительно усовершенствовался. Сегодня Kafka – это целая платформа, обеспечивающая избыточность, достаточную для хранения абсурдно огромных объемов данных. Здесь предоставляется шина сообщений с колоссальной пропускной способностью, на которой можно в реальном времени обрабатывать абсолютно все проходящие через нее данные.
Все это классно, однако, если свести Kafka к сухому остатку, то получится распределенный горизонтально масштабируемый отказоустойчивый журнал коммитов.
Мудрено звучит. Давайте рассмотрим каждый из этих терминов и разберемся, что он означает. А затем подробно исследуем, как все это работает.
Распределенной называется такая система, которая в сегментированном виде работает сразу на множестве машин, образующих цельный кластер; поэтому для конечного пользователя они выглядят как единый узел. Распределенность Kafka заключается в том, что хранение, получение и рассылка сообщений у него организовано на разных узлах (так называемых «брокерах»).
Важнейшие плюсы такого подхода – высокодоступность и отказоустойчивость.
Давайте сначала определимся с тем, что такое вертикальная масштабируемость. Допустим, у нас есть традиционный сервер базы данных, и он постепенно перестает справляться с нарастающей нагрузкой. Чтобы справиться с этой проблемой, можно просто нарастить ресурсы (CPU, RAM, SSD) на сервере. Это и есть вертикальное масштабирование – на машину навешиваются дополнительные ресурсы. При таком «масштабировании вверх» возникает два серьезных недостатка:
После определенного порога горизонтальное масштабирование становится гораздо дешевле вертикального
Для нераспределенных систем характерно наличие так называемой единой точки отказа. Если единственный сервер вашей базы данных по какой-то причине откажет – вы попали.
Распределенные системы проектируются таким образом, чтобы их конфигурацию можно было корректировать, подстраиваясь под отказы. Кластер Kafka из пяти узлов остается работоспособным, даже если два узла лягут. Необходимо отметить, что для обеспечения отказоустойчивости обязательно приходится частично жертвовать производительностью, поскольку чем лучше ваша система переносит отказы, тем ниже ее производительность.
Журнал коммитов (также именуемый «журнал опережающей записи», «журнал транзакций») – это долговременная упорядоченная структура данных, причем, данные в такую структуру можно только добавлять. Записи из этого журнала нельзя ни изменять, ни удалять. Информация считывается слева направо; таким образом гарантируется правильный порядок элементов.
Схема журнала коммитов
— Вы имеете в виду, что структура данных в Kafka настолько проста?
Во многих отношениях — да. Эта структура образует самую сердцевину Kafka и абсолютно бесценна, поскольку обеспечивает упорядоченность, а упорядоченность – детерминированную обработку. Обе эти проблемы в распределенных системах решаются с трудом.
В сущности, Kafka хранит все свои сообщения на диске (подробнее об этом ниже), а при упорядочивании сообщений в виде вышеописанной структуры можно пользоваться последовательным считыванием с диска.
Как все это работает?
Приложения (генераторы) посылают сообщения (записи) на узел Kafka (брокер), и указанные сообщения обрабатываются другими приложениями, так называемыми потребителями. Указанные сообщения сохраняются в теме, a потребители подписываются на тему для получения новых сообщений.
Темы могут разрастаться, поэтому крупные темы подразделяются на более мелкие секции для улучшения производительности и масштабируемости. (пример: допустим, вы сохраняли пользовательские запросы на вход в систему; в таком случае можно распределить их по первому символу в имени пользователя)
Kafka гарантирует, что все сообщения в пределах секции будут упорядочены именно в той последовательности, в которой поступили. Конкретное сообщение можно найти по его смещению, которое можно считать обычным индексом в массиве, порядковым номером, который увеличивается на единицу для каждого нового сообщения в данной секции.
В Kafka соблюдается принцип «тупой брокер – умный потребитель». Таким образом, Kafka не отслеживает, какие записи считываются потребителем и после этого удаляются, а просто хранит их в течение заданного периода времени (например, суток), либо до тех пор, пока не будет достигнут некоторый порог. Потребители сами опрашивают Kafka, не появилось ли у него новых сообщений, и указывают, какие записи им нужно прочесть. Таким образом, они могут увеличивать или уменьшать смещение, переходя к нужной записи; при этом события могут переигрываться или повторно обрабатываться.
Следует отметить, что на самом деле речь идет не об одиночных потребителях, а о группах, в каждой из которых – один или более процессов-потребителей. Чтобы не допустить ситуации, когда два процесса могли бы дважды прочесть одно и то же сообщение, каждая секция привязывается лишь к одному процессу-потребителю в пределах группы.
Так устроен поток данных
Долговременное хранение на диске
Как упоминалось выше, Kafka на самом деле хранит свои записи на диске и ничего не держит в оперативной памяти. Да, возможен вопрос, есть ли в этом хоть капля смысла. Но в Kafka действует множество оптимизаций, благодаря которым такое становится осуществимым:
Распределение и репликация данных
Теперь давайте обсудим, как в Kafka достигается отказоустойчивость, и как он распределяет данные между узлами.
Данные с сегмента реплицируются на множестве брокеров, чтобы данные сохранились, если один из брокеров откажет.
В любом случае, один из брокеров всегда “владеет” секцией: этот брокер — именно тот, на котором приложения выполняют операции считывания и записи в секцию. Этот брокер называется «ведущим секции». Он реплицирует получаемые данные на N других брокеров, так называемых ведомыми. На ведомых также хранятся данные, и любой из них может быть выбран в качестве ведущего, если актуальный ведущий откажет.
Так можно сконфигурировать гарантии, обеспечивающие, что любое сообщение, которое будет успешно опубликовано, не потеряется. Когда есть возможность изменить коэффициент репликации, можно частично пожертвовать производительностью ради повышенной защиты и долговечности данных (в зависимости от того, насколько они критичны).
Таким образом, если один из ведущих когда-нибудь откажет, его место может занять ведомый.
Однако, логично спросить:
— Как генератор/потребитель узнает, какой брокер – ведущий данной секции?
Чтобы генератор/потребитель мог записывать/считывать информацию в данной секции, приложению нужно знать, какой из брокеров здесь ведущий, верно? Эту информацию нужно где-то взять.
Для хранения таких метаданных в Kafka используется сервис под названием Zookeeper.
Что такое Zookeeper?
Zookeeper – это распределенное хранилище ключей и значений. Оно сильно оптимизировано для считывания, но записи в нем происходят медленнее. Чаще всего Zookeeper применяется для хранения метаданных и обработки механизмов кластеризации (пульс, распределенные операции обновления/конфигурации, т.д.).
Таким образом, клиенты этого сервиса (брокеры Kafka) могут на него подписываться – и будут получать информацию о любых изменениях, которые могут произойти. Именно так брокеры узнают, когда ведущий в секции меняется. Zookeeper исключительно отказоустойчив (как и должно быть), поскольку Kafka сильно от него зависит.
Он используется для хранения всевозможных метаданных, в частности:
Ранее Генератор и Потребители непосредственно подключались к Zookeeper и узнавали у него эту (а также другую) информацию. Теперь Kafka уходит от такой связки и, начиная, соответственно, с версий 0.8 и 0.9 клиенты, во-первых, выбирают метаданные непосредственно у брокеров Kafka, а брокеры обращаются к Zookeeper.
Потоковый процессор в Kafka отвечает за всю следующую работу: принимает непрерывные потоки данных от входных тем, каким-то образом обрабатывает этот ввод и подает поток данных на выходные темы (либо на внешние сервисы, базы данных, в корзину, да куда угодно…)
Простую обработку можно выполнять непосредственно на API генераторов/потребителей, однако, более сложные преобразования – например, объединение потоков, в Kafka выполняется при помощи интегрированной библиотеки Streams API.
Этот API предназначен для использования в рамках вашей собственной базы кода, на брокере он не работает. Функционально он подобен API потребителя, облегчает горизонтальное масштабирование обработки потоков и распределение его между несколькими приложениями (подобными группам потребителей).
Обработка без сохранения состояния
Обработка без сохранения состояния — это поток детерминированной обработки, не зависящий ни от каких внешних факторов. В качестве примера рассмотрим вот такое простое преобразование данных: прикрепляем информацию к строке
Важно понимать, что потоки и таблицы – это, в сущности, одно и то же. Поток можно интерпретировать как таблицу, а таблицу – как поток.
Если обратить внимание, как выполняется синхронная репликация базы данных, то очевидно, что речь идет о потоковой репликации, где любые изменения в таблицах отправляются на сервер копий (реплику). Поток Kafka можно интерпретировать точно так же – как поток обновлений для данных, которые агрегируются и дают конечный результат, фигурирующий в таблице. Такие потоки сохраняются в локальной RocksDB (по умолчанию) и называются KTable.
Таблицу можно считать мгновенным снимком, отражающим последнее значение для каждого ключа в потоке. Аналогично, из потоковых записей можно составить таблицу, а из обновлений таблицы — сформировать поток с логом изменений.
При каждом обновлении можно сделать мгновенный снимок потока (запись)
Обработка с сохранением состояния
Проблема с поддержанием состояния в потоковых процессорах заключается в том, что эти процессоры иногда отказывают! Где же хранить это состояние, чтобы обеспечить отказоустойчивость?
Упрощенный подход – просто хранить все состояние в удаленной базе данных и подключаться к этому хранилищу по сети. Проблема в том, что тогда теряется локальность данных, а сами данные многократно перегоняются по сети – оба фактора существенно тормозят ваше приложение. Более тонкая, но важная проблема заключается в том, что активность вашего задания потоковой обработки будет жестко зависеть от удаленной базы данных – то есть, это задание будет несамодостаточным (вся ваша обработка может рухнуть, если другая команда внесет в базу данных какие-то изменения).
Итак, какой же подход лучше?
Вновь вспомним о дуализме таблиц и потоков. Именно благодаря этому свойству потоки можно преобразовывать в таблицы, расположенные именно там, где происходит обработка. Также при этом получаем механизм, обеспечивающий отказоустойчивость – мы храним потоки на брокере Kafka.
Потоковый процессор может сохранять свое состояние в локальной таблице (например, в RocksDB), которую будет обновлять входной поток (возможно, после каких-либо произвольных преобразований). Если этот процесс сорвется, то мы сможем восстановить соответствующие данные, повторно воспроизведя поток.
Можно добиться даже того, чтобы удаленная база данных генерировала поток и, фактически, широковещательно передавала лог изменений, на основании которого вы будете перестраивать таблицу на локальной машине.
Обработка с сохранением состояния, соединение KStream с KTable
Как правило, код для обработки потоков приходится писать на одном из языков для JVM, поскольку именно с ней работает единственный официальный клиент Kafka Streams API.
Образец установки KSQL
KSQL – это новая фича, позволяющая писать простые потоковые задания на знакомом языке, напоминающем SQL.
Настраиваем сервер KSQL и интерактивно запрашиваем его через CLI для управления обработкой. Он работает точно с теми же абстракциями (KStream и KTable), гарантирует те же преимущества, что и Streams API (масштабируемость, отказоустойчивость) и значительно упрощает работу с потоками.
Возможно, все это звучит не вдохновляюще, но на практике очень полезно для тестирования материала. Более того, такая модель позволяет приобщиться к потоковой обработке даже тем, кто не участвует в разработке как таковой (например, владельцам продукта). Рекомендую посмотреть небольшое вводное видео – сами убедитесь, насколько здесь все просто.
Альтернативны потоковой обработке
Потоки Kafka – идеальное сочетание силы и простоты. Пожалуй, Kafka – лучший инструмент для выполнения потоковых заданий, имеющихся на рынке, а интегрироваться с Kafka гораздо проще, чем с альтернативными инструментами для потоковой обработки (Storm, Samza, Spark, Wallaroo).
Проблема с большинством других инструментов потоковой обработки заключается в том, что их сложно развертывать (и с ними тяжело обращаться). Фреймворк для пакетной обработки, например, Spark, требует:
Kafka Streams позволяет вам сформулировать собственную стратегию развертывания, когда вам это потребуется, причем, работать с инструментом на ваш вкус: Kubernetes, Mesos, Nomad, Docker Swarm и пр.
Kafka Streams предназначен прежде всего для того, чтобы вы могли организовать в своем приложении потоковую обработку, однако, без эксплуатационных сложностей, связанных с поддержкой очередного кластера. Единственный потенциальный недостаток такого инструмента – его тесная связь с Kafka, однако, в нынешней реальности, когда потоковая обработка в основном выполняется именно при помощи Kafka, этот небольшой недостаток не так страшен.
Когда стоит использовать Kafka?
Как уже говорилось выше, Kafka позволяет пропускать через централизованную среду огромное количество сообщений, а затем хранить их, не беспокоясь о производительности и не опасаясь, что данные будут потеряны.
Таким образом, Kafka отлично устроится в самом центре вашей системы и будет работать как связующее звено, обеспечивающее взаимодействие всех ваших приложений. Kafka может быть центральным элементом событийно-ориентированной архитектуры, что позволит вам как следует отцепить приложения друг от друга.
Kafka позволяет с легкостью разграничить коммуникацию между различными (микро)сервисами. Работая Streams API, стало как никогда просто писать бизнес-логику, обогащающую данные из темы Kafka перед тем, как их станут потреблять сервисы. Здесь открываются широчайшие возможности – поэтому настоятельно рекомендую изучить, как Kafka применяется в разных компаниях.
Apache Kafka – это распределенная потоковая платформа, позволяющая обрабатывать триллионы событий в день. Kafka гарантирует минимальные задержки, высокую пропускную способность, предоставляет отказоустойчивые конвейеры, работающие по принципу «публикация/подписка» и позволяет обрабатывать потоки событий.
В этой статье мы познакомились с базовой семантикой Kafka (узнали, что такое генератор, брокер, потребитель, тема), узнали о некоторых вариантах оптимизации (страничный кэш), узнали, какую отказоустойчивость гарантирует Kafka при репликации данных и вкратце обсудили его мощные потоковые возможности.
Apache Kafka: основы технологии
У Kafka есть множество способов применения, и у каждого способа есть свои особенности. В этой статье разберём, чем Kafka отличается от популярных систем обмена сообщениями; рассмотрим, как Kafka хранит данные и обеспечивает гарантию сохранности; поймём, как записываются и читаются данные.
Статья подготовлена на основе открытого занятия из видеокурса по Apache Kafka. Авторы — Анатолий Солдатов, Lead Engineer в Авито, и Александр Миронов, Infrastructure Engineer в Stripe. Базовые темы курса доступны на Youtube.
Kafka и классические сервисы очередей
Для первого погружения в технологию сравним Kafka и классические сервисы очередей, такие как RabbitMQ и Amazon SQS.
Системы очередей обычно состоят из трёх базовых компонентов:
1) сервер,
2) продюсеры, которые отправляют сообщения в некую именованную очередь, заранее сконфигурированную администратором на сервере,
3) консьюмеры, которые считывают те же самые сообщения по мере их появления.
Базовые компоненты классической системы очередей
В веб-приложениях очереди часто используются для отложенной обработки событий или в качестве временного буфера между другими сервисами, тем самым защищая их от всплесков нагрузки.
Консьюмеры получают данные с сервера, используя две разные модели запросов: pull или push.
pull-модель — консьюмеры сами отправляют запрос раз в n секунд на сервер для получения новой порции сообщений. При таком подходе клиенты могут эффективно контролировать собственную нагрузку. Кроме того, pull-модель позволяет группировать сообщения в батчи, таким образом достигая лучшей пропускной способности. К минусам модели можно отнести потенциальную разбалансированность нагрузки между разными консьюмерами, а также более высокую задержку обработки данных.
push-модель — сервер делает запрос к клиенту, посылая ему новую порцию данных. По такой модели, например, работает RabbitMQ. Она снижает задержку обработки сообщений и позволяет эффективно балансировать распределение сообщений по консьюмерам. Но для предотвращения перегрузки консьюмеров в случае с RabbitMQ клиентам приходится использовать функционал QS, выставляя лимиты.
Как правило, приложение пишет и читает из очереди с помощью нескольких инстансов продюсеров и консьюмеров. Это позволяет эффективно распределить нагрузку.
Типичный жизненный цикл сообщений в системах очередей:
Типичный жизненный цикл сообщений в системах очередей
С базовыми принципами работы очередей разобрались, теперь перейдём к Kafka. Рассмотрим её фундаментальные отличия.
Как и сервисы обработки очередей, Kafka условно состоит из трёх компонентов:
1) сервер (по-другому ещё называется брокер),
2) продюсеры — они отправляют сообщения брокеру,
3) консьюмеры — считывают эти сообщения, используя модель pull.
Базовые компоненты Kafka
Пожалуй, фундаментальное отличие Kafka от очередей состоит в том, как сообщения хранятся на брокере и как потребляются консьюмерами.
В этом кроется главная мощь и главное отличие Kafka от традиционных систем обмена сообщениями.
Теперь давайте посмотрим, как Kafka и системы очередей решают одну и ту же задачу. Начнём с системы очередей.
Представим, что есть некий сайт, на котором происходит регистрация пользователя. Для каждой регистрации мы должны:
1) отправить письмо пользователю,
2) пересчитать дневную статистику регистраций.
В случае с RabbitMQ или Amazon SQS функционал может помочь нам доставить сообщения всем сервисам одновременно. Но при необходимости подключения нового сервиса придётся конфигурировать новую очередь.
Kafka упрощает задачу. Достаточно послать сообщения всего один раз, а консьюмеры сервиса отправки сообщений и консьюмеры статистики сами считают его по мере необходимости.
Kafka также позволяет тривиально подключать новые сервисы к стриму регистрации. Например, сервис архивирования всех регистраций в S3 для последующей обработки с помощью Spark или Redshift можно добавить без дополнительного конфигурирования сервера или создания дополнительных очередей.
Кроме того, раз Kafka не удаляет данные после обработки консьюмерами, эти данные могут обрабатываться заново, как бы отматывая время назад сколько угодно раз. Это оказывается невероятно полезно для восстановления после сбоев и, например, верификации кода новых консьюмеров. В случае с RabbitMQ пришлось бы записывать все данные заново, при этом, скорее всего, в отдельную очередь, чтобы не сломать уже имеющихся клиентов.
Структура данных
Наверняка возникает вопрос: «Раз сообщения не удаляются, то как тогда гарантировать, что консьюмер не будет читать одни и те же сообщения (например, при перезапуске)?».
Для ответа на этот вопрос разберёмся, какова внутренняя структура Kafka и как в ней хранятся сообщения.
Каждое сообщение (event или message) в Kafka состоит из ключа, значения, таймстампа и опционального набора метаданных (так называемых хедеров).
Сообщения в Kafka организованы и хранятся в именованных топиках (Topics), каждый топик состоит из одной и более партиций (Partition), распределённых между брокерами внутри одного кластера. Подобная распределённость важна для горизонтального масштабирования кластера, так как она позволяет клиентам писать и читать сообщения с нескольких брокеров одновременно.
Когда новое сообщение добавляется в топик, на самом деле оно записывается в одну из партиций этого топика. Сообщения с одинаковыми ключами всегда записываются в одну и ту же партицию, тем самым гарантируя очередность или порядок записи и чтения.
Для гарантии сохранности данных каждая партиция в Kafka может быть реплицирована n раз, где n — replication factor. Таким образом гарантируется наличие нескольких копий сообщения, хранящихся на разных брокерах.
У каждой партиции есть «лидер» (Leader) — брокер, который работает с клиентами. Именно лидер работает с продюсерами и в общем случае отдаёт сообщения консьюмерам. К лидеру осуществляют запросы фолловеры (Follower) — брокеры, которые хранят реплику всех данных партиций. Сообщения всегда отправляются лидеру и, в общем случае, читаются с лидера.
Чтобы понять, кто является лидером партиции, перед записью и чтением клиенты делают запрос метаданных от брокера. Причём они могут подключаться к любому брокеру в кластере.
Основная структура данных в Kafka — это распределённый, реплицируемый лог. Каждая партиция — это и есть тот самый реплицируемый лог, который хранится на диске. Каждое новое сообщение, отправленное продюсером в партицию, сохраняется в «голову» этого лога и получает свой уникальный, монотонно возрастающий offset (64-битное число, которое назначается самим брокером).
Как мы уже выяснили, сообщения не удаляются из лога после передачи консьюмерам и могут быть вычитаны сколько угодно раз.
Время гарантированного хранения данных на брокере можно контролировать с помощью специальных настроек. Длительность хранения сообщений при этом не влияет на общую производительность системы. Поэтому совершенно нормально хранить сообщения в Kafka днями, неделями, месяцами или даже годами.
Consumer Groups
Теперь давайте перейдём к консьюмерам и рассмотрим их принципы работы в Kafka. Каждый консьюмер Kafka обычно является частью какой-нибудь консьюмер-группы.
Каждая группа имеет уникальное название и регистрируется брокерами в кластере Kafka. Данные из одного и того же топика могут считываться множеством консьюмер-групп одновременно. Когда несколько консьюмеров читают данные из Kafka и являются членами одной и той же группы, то каждый из них получает сообщения из разных партиций топика, таким образом распределяя нагрузку.
Вернёмся к нашему примеру с топиком сервиса регистрации и представим, что у сервиса отправки писем есть своя собственная консьюмер-группа с одним консьюмером c1 внутри. Значит, этот консьюмер будет получать сообщения из всех партиций топика.
Если мы добавим ещё одного консьюмера в группу, то партиции автоматически распределятся между ними, и c1 теперь будет читать сообщения из первой и второй партиции, а c2 — из третьей. Добавив ещё одного консьюмера (c3), мы добьёмся идеального распределения нагрузки, и каждый из консьюмеров в этой группе будет читать данные из одной партиции.
А вот если мы добавим в группу ещё одного консьюмера (c4), то он не будет задействован в обработке сообщений вообще.
Важно понять: внутри одной консьюмер-группы партиции назначаются консьюмерам уникально, чтобы избежать повторной обработки.
Если консьюмеры не справляются с текущим объёмом данных, то следует добавить новую партицию в топик. Только после этого консьюмер c4 начнёт свою работу.
Механизм партиционирования является нашим основным инструментом масштабирования Kafka. Группы являются инструментом отказоустойчивости.
Кстати, как вы думаете, что будет, если один из консьюмеров в группе упадёт? Совершенно верно: партиции автоматически распределятся между оставшимися консьюмерами в этой группе.
Добавлять партиции в Kafka можно на лету, без перезапуска клиентов или брокеров. Клиенты автоматически обнаружат новую партицию благодаря встроенному механизму обновления метаданных. Однако, нужно помнить две важные вещи:
И ещё неочевидный момент: если вы добавляете новую партицию на проде, то есть в тот момент, когда в топик пишут сообщения продюсеры, то важно помнить про настройку auto.offset.reset=earliest в консьюмере, иначе у вас есть шанс потерять или просто не обработать кусок данных, записавшихся в новую партицию до того, как консьюмеры обновили метаданные по топику и начали читать данные из этой партиции.
Помимо этого, механизм групп позволяет иметь несколько несвязанных между собой приложений, обрабатывающих сообщения.
Как мы обсуждали ранее, можно добавить новую группу консьюмеров к тому же самому топику, например, для обработки и статистики регистраций. Эти две группы будут читать одни и те же сообщения из топика тех самых ивентов регистраций — в своём темпе, со своей внутренней логикой.
А теперь, зная внутреннее устройство консьюмеров в Kafka, давайте вернёмся к изначальному вопросу: «Каким образом мы можем обозначить сообщения в партиции, как обработанные?».
Для этого Kafka предоставляет механизм консьюмер-офсетов. Как мы помним, каждое сообщение партиции имеет свой собственный, уникальный, монотонно возрастающий офсет. Именно этот офсет и используется консьюмерами для сохранения партиций.
Консьюмер делает специальный запрос к брокеру, так называемый offset-commit с указанием своей группы, идентификатора топик-партиции и, собственно, офсета, который должен быть отмечен как обработанный. Брокер сохраняет эту информацию в своём собственном специальном топике. При рестарте консьюмер запрашивает у сервера последний закоммиченный офсет для нужной топик-партиции, и просто продолжает чтение сообщений с этой позиции.
В примере консьюмер в группе email-service-group, читающий партицию p1 в топике registrations, успешно обработал три сообщения с офсетами 0, 1 и 2. Для сохранения позиций консьюмер делает запрос к брокеру, коммитя офсет 3. В случае рестарта консьюмер запросит свою последнюю закоммиченную позицию у брокера и получит в ответе 3. После чего начнёт читать данные с этого офсета.
Консьюмеры вольны коммитить совершенно любой офсет (валидный, который действительно существует в этой топик-партиции) и могут начинать читать данные с любого офсета, двигаясь вперёд и назад во времени, пропуская участки лога или обрабатывая их заново.
Ключевой для понимания факт: в момент времени может быть только один закоммиченный офсет для топик-партиции в консьюмер-группе. Иными словами, мы не можем закоммитить несколько офсетов для одной и той же топик-партиции, эмулируя каким-то образом выборочный acknowledgment (как это делалось в системах очередей).
Представим, что обработка сообщения с офсетом 1 завершилась с ошибкой. Однако мы продолжили выполнение нашей программы в консьюмере и запроцессили сообщение с офсетом 2 успешно. В таком случае перед нами будет стоять выбор: какой офсет закоммитить — 1 или 3. В настоящей системе мы бы рекомендовали закоммитить офсет 3, добавив при этом функционал, отправляющий ошибочное сообщение в отдельный топик для повторной обработки (ручной или автоматической). Подобные алгоритмы называются Dead letter queue.
Разумеется, консьюмеры, находящиеся в разных группах, могут иметь совершенно разные закоммиченные офсеты для одной и той же топик-партиции.
Apache ZooKeeper
В заключение нужно упомянуть об ещё одном важном компоненте кластера Kafka — Apache ZooKeeper.
ZooKeeper выполняет роль консистентного хранилища метаданных и распределённого сервиса логов. Именно он способен сказать, живы ли ваши брокеры, какой из брокеров является контроллером (то есть брокером, отвечающим за выбор лидеров партиций), и в каком состоянии находятся лидеры партиций и их реплики.
В случае падения брокера именно в ZooKeeper контроллером будет записана информация о новых лидерах партиций. Причём с версии 1.1.0 это будет сделано асинхронно, и это важно с точки зрения скорости восстановления кластера. Самый простой способ превратить данные в тыкву — потеря информации в ZooKeeper. Тогда понять, что и откуда нужно читать, будет очень сложно.
В настоящее время ведутся активные работы по избавлению Kafka от зависимости в виде ZooKeeper, но пока он всё ещё с нами (если интересно, посмотрите на Kafka improvement proposal 500, там подробно расписан план избавления от ZooKeeper).
Важно помнить, что ZooKeeper по факту является ещё одной распределённой системой хранения данных, за которой необходимо следить, поддерживать и обновлять по мере необходимости.
Традиционно ZooKeeper раскатывается отдельно от брокеров Kafka, чтобы разделить границы возможных отказов. Помните, что падение ZooKeeper — это практически падение всего кластера Kafka. К счастью, нагрузка на ZooKeeper при нормальной работе кластера минимальна. Клиенты Kafka никогда не коннектятся к ZooKeeper напрямую.