Apache Kafka — это распределенная платформа обмена сообщениями, предназначенная для стриминга данных в реальном времени. Одной из ключевых концепций в Kafka является возможность передачи данных от одного компонента к другому. Однако данные, которые передаются через Kafka, могут быть разного типа и формата. В этой статье мы рассмотрим важные аспекты таких механизмов как сериализация и десериализация данных в Kafka с примерами кода.
Сериализация и десериализация в Kafka: несколько практических примеров
Сериализация — это процесс преобразования структурированных данных в байтовую последовательность, которая может быть передана или сохранена. В контексте Apache Kafka, данные, которые отправляются через топики (topics), должны быть сериализованы перед отправкой, чтобы их можно было представить в виде байтов.
Десериализация — это обратный процесс, при котором байтовая последовательность преобразуется обратно в исходные структурированные данные. В Kafka данные передаются в виде ключ-значение пар, где ключ может быть опциональным, а значение — это те данные, которые нужно сериализовать и передать. Kafka поддерживает различные форматы данных для сериализации и десериализации. Наиболее распространенными форматами являются JSON и Avro. JSON — это удобный человекочитаемый формат данных, в то время как Avro — это более компактный и эффективный бинарный формат. В данной статье мы рассмотрим сериализацию и десериализацию на примере этих форматов.
Рассмотрим пример использования JSON сериализации и десериализации в Kafka, используя библиотеку org.apache.kafka.connect.json.JsonSerializer
для сериализации и org.apache.kafka.connect.json.JsonDeserializer
для десериализации. Для начала необходимо настроить Maven-зависимости:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
Пример кода для JSON-сериализации:
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.json.JsonSerializer; import java.util.Map; public class MyJsonSerializer implements Serializer<MyDataClass> { private JsonSerializer serializer = new JsonSerializer(); @Override public byte[] serialize(String topic, MyDataClass data) { return serializer.serialize(topic, data); } @Override public void close() { serializer.close(); } }
Для сериализации используется интерфейс Deserializer
:
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.connect.json.JsonDeserializer; import java.util.Map; public class MyJsonDeserializer implements Deserializer<MyDataClass> { private JsonDeserializer deserializer = new JsonDeserializer(); @Override public void configure(Map<String, ?> configs, boolean isKey) { deserializer.configure(configs, isKey); } @Override public MyDataClass deserialize(String topic, byte[] data) { return (MyDataClass) deserializer.deserialize(topic, data); } @Override public void close() { deserializer.close(); } }
Здесь MyDataClass
— это пользовательский класс, который будет сериализоваться и десериализоваться.
Для использования Avro в Kafka, вы можете воспользоваться библиотекой Confluent Schema Registry
для управления схемами данных:
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>6.0.0</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>common-config</artifactId> <version>6.0.0</version> </dependency>
Следующий код на языке Java отвечает за Avro-сериализацию в Kafka:
import io.confluent.kafka.serializers.KafkaAvroSerializer; import org.apache.kafka.common.serialization.Serializer; import java.util.Map; public class MyAvroSerializer implements Serializer<MyDataClass> { private KafkaAvroSerializer serializer = new KafkaAvroSerializer(); @Override public void configure(Map<String, ?> configs, boolean isKey) { serializer.configure(configs, isKey); } @Override public byte[] serialize(String topic, MyDataClass data) { return serializer.serialize(topic, data); } @Override public void close() { serializer.close(); } }
Пример кода для Avro-десериализации:
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; public class MyAvroDeserializer implements Deserializer<MyDataClass> { private KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(); @Override public void configure(Map<String, ?> configs, boolean isKey) { deserializer.configure(configs, isKey); } @Override public MyDataClass deserialize(String topic, byte[] data) { return (MyDataClass) deserializer.deserialize(topic, data); } @Override public void close() { deserializer.close(); } }
Таким образом, сериализация и десериализация данных являются важными аспектами при работе с Apache Kafka. Выбор между различными форматами, такими как JSON и Avro, зависит от требований по производительности, эффективности и удобству чтения.
Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
8 ноября, 2023
Длительность обучения
24 ак.часов
Стоимость обучения
66 000 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве: