В прошлый раз мы говорили про особенности создания и работы продюсера Kafka. Сегодня поговорим про механизм отправки сообщений в Kafka. Читайте далее про отправку Big Data сообщений в распределенном брокере Kafka, включая виды отправки и их особенности.
Какие виды отправки сообщений существуют в Kafka: особенности работы с записями Big Data
После того, как выполнено создание продюсера, можно приступать к отправке сообщений. В распределенном кластере Kafka существует 2 вида отправки сообщений:
- синхронная отправка сообщения — отправка, при которой идет организация ожидания и проверка того, что сообщение отправлено успешно. Обычно при возникновении ошибочной ситуации отправка повторяется автоматически, что существенно замедляет работу приложения, так как все его функции блокируются до тех пор, пока брокер Kafka не подтвердит успешную отправку;
- асинхронная отправка сообщения — выполняется независимо (параллельно) от выполнения остальных функций приложения и не требует обязательного подтверждения факта успешной отправки сообщения. Асинхронная отправка сообщения, в случае возникновения ошибочной ситуации (например, истечение времени ожидания или временный сбой Kafka-сервера) не станет повторять отправку текущего сообщения, а сразу перейдет к отправке следующего.
Синхронная и асинхронная отправки сообщений: несколько практических примеров
При синхронной отправке сообщений помимо основного метода send()
, также используется метод get()
для ожидания ответа (или подтверждения того, что сообщение было успешно отправлено) от сервера. Следующий код на языке Java отвечает за синхронную отправку сообщений [1]:
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record).get(); } catch (Exception e) { e.printStackTrace(); }
При успешном выполнении (отсутствии ошибок) метода get()
возвращается объект (экземпляр) класса RecordMetaData
, содержащий смещение, соответствующее отправленному сообщению. Из кода также видно, что метод get()
выполняется в блоке try-catch
для «отлова» ошибок и возможности настройки дальнейших действий разработчиком. При синхронной отправке сообщения может возникнуть ошибка, которую можно исправить, отправив сообщение повторно (retriable). Например, если при отправке возникла ошибка из-за потери соединения, то приложение будет пытаться заново отправить сообщение, когда соединение восстановится. Однако некоторые ошибки исправить невозможно (например, если размер сообщения превышает допустимый предел). В таком случае приложение не станет исправлять возникшую ошибку и вернет исключение.
При асинхронной отправке сообщения необходимо написать класс, реализующий интерфейс Callback
c функцией onCompletion()
, отвечающий за настройку дополнительных действий при ошибочной ситуации (например, может понадобится сохранять отчет по ошибкам для возможности написания нового сценария, исключающего ошибочные ситуации). Следующий код на языке Java отвечает за асинхронную отправку сообщений [1]:
private class DemoProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace();}}} ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); producer.send(record, new DemoProducerCallback());
Структура хранения сообщений в Kafka подразумевает хранение батчей (batch, куча), отдельные сообщения ни прочитать, ни записать брокер не может, так как нет технической возможности. Формированием батчей занимается механизм, который работает «под капотом» (реализация скрыта от глаз обычного пользователя), который запускается в виде бесконечного цикла (while
с условием true
) в бесконечном потоке при создании экземпляров потребителя или продюсера.
К вышесказанному также хотелось бы добавить следующие особенности отправки сообщений в брокере Kafka:
- брокер Kafka хранит сообщения в батчах ;
- продюсер передает брокеру батч (несколько сообщений сразу);
- потребитель (consumer) также получает от брокера батч сообщений.
Таким образом, благодаря возможности самостоятельной настройки механизма отправки сообщений в брокере Kafka, разработчик может без проблем написать приложение, позволяющее работать с Big Data и при этом не занимающее большое количество программных и аппаратных ресурсов. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про настройку получаталей в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных