Apache Kafka — это распределенная платформа обмена сообщениями, предназначенная для стриминга данных в реальном времени. Одной из ключевых концепций в Kafka является возможность передачи данных от одного компонента к другому. Однако данные, которые передаются через Kafka, могут быть разного типа и формата. В этой статье мы рассмотрим важные аспекты таких механизмов как сериализация и десериализация данных в Kafka с примерами кода.
Сериализация и десериализация в Kafka: несколько практических примеров
Сериализация — это процесс преобразования структурированных данных в байтовую последовательность, которая может быть передана или сохранена. В контексте Apache Kafka, данные, которые отправляются через топики (topics), должны быть сериализованы перед отправкой, чтобы их можно было представить в виде байтов.
Десериализация — это обратный процесс, при котором байтовая последовательность преобразуется обратно в исходные структурированные данные. В Kafka данные передаются в виде ключ-значение пар, где ключ может быть опциональным, а значение — это те данные, которые нужно сериализовать и передать. Kafka поддерживает различные форматы данных для сериализации и десериализации. Наиболее распространенными форматами являются JSON и Avro. JSON — это удобный человекочитаемый формат данных, в то время как Avro — это более компактный и эффективный бинарный формат. В данной статье мы рассмотрим сериализацию и десериализацию на примере этих форматов.
Рассмотрим пример использования JSON сериализации и десериализации в Kafka, используя библиотеку org.apache.kafka.connect.json.JsonSerializer для сериализации и org.apache.kafka.connect.json.JsonDeserializer для десериализации. Для начала необходимо настроить Maven-зависимости:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
Пример кода для JSON-сериализации:
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonSerializer;
import java.util.Map;
public class MyJsonSerializer implements Serializer<MyDataClass> {
private JsonSerializer serializer = new JsonSerializer();
@Override
public byte[] serialize(String topic, MyDataClass data) {
return serializer.serialize(topic, data);
}
@Override
public void close() {
serializer.close();
}
}
Для сериализации используется интерфейс Deserializer:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import java.util.Map;
public class MyJsonDeserializer implements Deserializer<MyDataClass> {
private JsonDeserializer deserializer = new JsonDeserializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
deserializer.configure(configs, isKey);
}
@Override
public MyDataClass deserialize(String topic, byte[] data) {
return (MyDataClass) deserializer.deserialize(topic, data);
}
@Override
public void close() {
deserializer.close();
}
}
Здесь MyDataClass — это пользовательский класс, который будет сериализоваться и десериализоваться.
Для использования Avro в Kafka, вы можете воспользоваться библиотекой Confluent Schema Registry для управления схемами данных:
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>6.0.0</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>common-config</artifactId> <version>6.0.0</version> </dependency>
Следующий код на языке Java отвечает за Avro-сериализацию в Kafka:
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class MyAvroSerializer implements Serializer<MyDataClass> {
private KafkaAvroSerializer serializer = new KafkaAvroSerializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
serializer.configure(configs, isKey);
}
@Override
public byte[] serialize(String topic, MyDataClass data) {
return serializer.serialize(topic, data);
}
@Override
public void close() {
serializer.close();
}
}
Пример кода для Avro-десериализации:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class MyAvroDeserializer implements Deserializer<MyDataClass> {
private KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
deserializer.configure(configs, isKey);
}
@Override
public MyDataClass deserialize(String topic, byte[] data) {
return (MyDataClass) deserializer.deserialize(topic, data);
}
@Override
public void close() {
deserializer.close();
}
}
Таким образом, сериализация и десериализация данных являются важными аспектами при работе с Apache Kafka. Выбор между различными форматами, такими как JSON и Avro, зависит от требований по производительности, эффективности и удобству чтения.
Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
2 февраля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800 руб.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:



