В прошлый раз мы говорили про основные элементы для потоковой обработки в брокере Kafka. Сегодня поговорим про утилиту для зеркального копирования Mirror Maker, а также рассмотрим ее основные особенности. Читайте далее про Mirror Maker в Kafka, благодаря которой в Kafka есть возможность надежного и оптимального копирования Big Data.
Для чего нужна утилита Mirror Maker в Kafka
Mirror Maker — это утилита Apache Kafka, которая позволяет осуществлять зеркальное копирование данных в данном брокере. Зеркальное копирование в Kafka — это обращение к записям из разделов основного кластера с целью создания локальной копии на дополнительном (целевом) кластере. Mirror Maker представляет собой набор потребителей (consumer), которые состоят в одной группе и читают данные из выбранного для копирования набора тем. При зеркальном копировании Mirror Maker запускает для каждого потребителя поток выполнения, который считывает данные из нужных топиков и разделов исходного кластера. Затем создается производитель (producer) для отправки считанных данных на целевой кластер Каждые 60 секунд потребители уведомляют производителя о необходимости отправки информации об имеющихся данных. Затем потребители обращаются к исходному кластеру Kafka для фиксации смещения этих данных [1].
Особенности работы с утилитой Mirror Maker: несколько практических примеров
Для того, чтобы начать копирование, необходимо получить список топиков с данными, которые нужно скопировать из исходного кластера. Далее рассмотрим пример кода, который формирует список топиков с данными на исходном кластере для их копирования на целевой кластер под другими именами:
import java.util.Collections; import java.util.HashMap; import java.util.List; import kafka.consumer.BaseConsumerRecord; import kafka.tools.MirrorMaker; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; public class RenameTopicHandler implements MirrorMaker.MirrorMakerMessageHandler { private final HashMap<String, String> topicMap = new HashMap<String, String>(); public RenameTopicHandler(String topicList) { String[] topicAssignments = topicList.split(";"); for (String topicAssignment : topicAssignments) { String[] topicsArray = topicAssignment.split(","); if (topicsArray.length == 2) { topicMap.put(topicsArray[0], topicsArray[1]); } } } public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) { String targetTopic = null; if (topicMap.containsKey(record.topic())) { targetTopic = topicMap.get(record.topic()); } else { targetTopic = record.topic(); } Long timestamp = record.timestamp() == ConsumerRecord.NO_TIMESTAMP ? null : record.timestamp(); return Collections.singletonList(new ProducerRecord<byte[], byte[]>(targetTopic, null, timestamp, record.key(), record.value(), record.headers())); } }
Для зеркального копирования также немаловажно сохранять полную структуру исходных данных для их корректного «отражения». Следующий код «зеркально отражает» структуру исходных данных для копирования их на целевой кластер [1]:
import java.util.Collections; import java.util.List; import kafka.consumer.BaseConsumerRecord; import kafka.tools.MirrorMaker; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.record.RecordBatch; public class ExactMessageHandler implements MirrorMaker.MirrorMakerMessageHandler { public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) { Long timestamp = record.timestamp() == RecordBatch.NO_TIMESTAMP ? null : record.timestamp(); return Collections.singletonList(new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), timestamp, record.key(), record.value(), record.headers())); } }
Таким образом, благодаря механизму зеркального копирования Apache Kafka является весьма надежным и отказоустойчивым брокером для хранения и обмена больших потоков данных, что делает ее полезным средством для каждого специалиста в области анализа и обработки больших данных, от Data Scientist’а до разработчика распределенных приложений. В следующей статье мы поговорим про соединение потоков в Kafka.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве: