Что такое перебалансировка и почему она так важна для Kafka

курсы администраторов, обучение kafka, курсы kafka, курсы администраторов, kafka для начинающих, курсы администрирования kafka

В прошлый раз мы говорили про фиксации смещений в брокере Apache Kafka. Сегодня поговорим про перебалансировку в Kafka. Читайте далее про особенности перебалансировки, благодаря которой Kafka может легко удалять и добавлять новые топики, а также распределять подписчиков (consumer) по конкретным разделам в этих топиках.

Как работает перебалансировка Big Data в Apache Kafka: особенности распределения подписчиков по топикам

Перебалансировка (rebalance) — это процесс передачи раздела топика от одного (неактивного) подписчика другому подписчику, который является свободным (не принадлежит ни к одному из топиков) в данный момент времени [1]. Подписчики считаются активными (alive) до тех пор, пока они отправляют на сервер Kafka периодические контрольные сигналы (heartbeats), которые представляют собой потоки сообщений, идущих в заданный разработчиком момент времени (timeout). Если контрольный сигнал от подписчика не был получен сервером Kafka в заданный момент времени, Kafka автоматически генерирует исключение (TimeOutException) и инициирует перебалансировку. Во время процесса перебалансировки подписчики не могут получать сообщения из топиков, что делает их недоступными. Кроме того, при перебалансировке подписчики утрачивают свое текущее состояние [1]. Например, если некоторые данные кэшировались (сохранялись в промежуточном буфере для быстрого доступа к ним) подписчиком, то необходимо будет обновить все кэши, что значительно замедлит приложение на время восстановления исходного состояния подписчика.

 

Подготовка данных к перебалансировке: несколько практических примеров

При перебалансировке существует риск потери несохраненных данных. Поэтому необходимо сохранять необходимую информацию до того, как начнется процесс перебалансировки данных. Для этого необходимо определить класс, который будет использовать интерфейс ConsumerRebalanceListener, отвечающий за настраиваемые действия при перебалансировке. Следующий код на языке Java отвечает за создание класса, который инициирует действия, происходящие при перебалансировке данных [1]:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
}

Как видно из кода, в классе HandleRebalance определены 2 метода [1]:

  • onPartititonsAssigned — метод, отвечающий за действия после перебалансировки (например, восстановление соединения с базами данных или возобновление получения сообщений подписчиками). В описанном выше фрагменте кода этот метод отвечает за возобновление процесса получения подписчиками сообщений из топика;
  • onPartitionsRevoked — это метод, который отвечает за действия, совершаемые до перебалансировки данных (например, фиксация записей или сохранение информации о подключенных базах данных или хранилищах). В вышерассмотренном фрагменте кода в этом методе происходит синхронная фиксация смещений записей для обязательного их сохранения (фиксация продолжается до тех пор, пока все записи не зафиксируются).

Для того, чтобы в случае перебалансировки управлять действиями по сохранению данных, необходимо передать экземпляр класса HandleRebalance в метод subscribe(), который вызывается потребителем для получения данных из разделов топика [1]:

try {
//Передается экземпляр HandleRebalance для управления действиями
//в случае перебалансировки
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1, "no metadata"));}
consumer.commitAsync(currentOffsets, null); }
} catch (WakeupException e) {
}

Таким образом, благодаря поддержке механизма перебалансировки данных, брокер Kafka обеспечивает высокую масштабируемость, позволяя легко удалять и добавлять целые топики или конкретные разделы Big Data без потерь данных, а также весьма быстро распределять всех активных подписчиков по топикам. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про цикл опроса в Kafka.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных

Добавить комментарий

Поиск по сайту