Как использовать Avro-сериализацию в распределенном брокере Kafka

apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro

В прошлый раз мы говорили про работу с потребителем в Kafka. Сегодня поговорим про работу с Big Data записями, сериализованными с помощью системы Avro. Читайте далее про особенности механизма работы с Avro в распределенной среде Kafka, благодаря которому этот распределенный брокер может легко сериализовать записи для работы в среде потоковой передачи данных.

Что такое Avro и как он работает с брокером Kafka

Avro — это система сериализации (превращение объектов в массив байтов) данных, которая используется для работы с объектами в потоковой (распределенной) среде. Avro является кроссплатформенной (не зависит от операционной системы и вида аппаратных ресурсов) системой, которая не зависит от языка программирования. Авро использует систему, основанную на схемах входных данных. Схемы могут включать в себя тип данных, структуру данных, формат записи данных, а также параметры их передачи (например, URL, порт и т.д.). Схемы Avro описываются c помощью JSON-формата (JavaScript Object Notation), что обеспечивает Авро независимость от языковой реализации. Для работы с Kafka используется специальный реестр схем (Schema Registry), который хранит в себе все схемы записей, использующихся брокером в данный момент времени [1].

apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro
Структура Avro

Особенности сериализации данных в Kafka с помощью Avro: несколько практических примеров

Для того, чтобы начать работу с сериализатором Avro, необходимо настроить базовую конфигурацию. Следующий код на языке Java отвечает за создание коллекции свойств (также как для потребителей или продюсеров) для Avro в брокере Kafka [2]:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

В качестве объектов коллекции свойств Properties для Avro используются следующие элементы [2]:

  • servers.bootstrap — список брокеров для соединения с распределенной средой Kafka;
  • serializer/value.serializer — класс-сериализатор, применяемый для сериализации ключей/значений записей. В качестве сериализатора для работы с Avro в Kafka используется класс KafkaAvroSerializer;
  • registry.url — это параметр, указывающий на местоположение реестра (хранилища) использующихся схем.

Далее для того, чтобы сериализовать данные, необходимо указать схему сериализуемых данных:

String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\",
\"type\": [\"null\", \"string\"],
\"default\":\"null\" }" +
"]}";

Для того, чтобы данные отправлялись продюсером (который в данном случае является сериализатором) потребителю, необходимо создать экземпляр KafkaProducer со значениями типа GenericRecord. Класс GenericRecord представляет общий тип данных и применяется, когда необходимо сформировать запись на основе заданной схемы [2]:

Producer<String, GenericRecord> producer =
new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
//парсинг по значениям каждого из ключей
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com";
GenericRecord customer = new GenericData.Record(schema);
customer.put("id", nCustomers);
customer.put("name", name);
customer.put("email", email);

Вышерассмотренный фрагмент кода отвечает за сериализацию конкретных записей (с ключами id, name, email). Для того, чтобы выделить конкретные ключи, необходимо распарсить (извлечь данные) каждый из ключей схемы. За парсинг схемы отвечает метод parse().

После формирования и сериализации необходимой записи, ее можно отправлять потербителю, который при получении сразу десериализует (восстановит объект из байтового массива) данные. Следующий код на языке Java отвечает за отправку сериализованной записи потребителю, который в данном случае будет являться десериализатором [2]:

ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer); producer.send(data);

Таким образом, благодаря поддержке механизма работы с Avro-сериализатором, Kafka может гарантировать отправку Big Data записей без потерь структуры и времени. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про пользовательские сериализаторы в Kafka.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. https://docs.microsoft.com/ru-ru/azure/databricks/spark/latest/structured-streaming/avro-dataframe
  2. Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных

Добавить комментарий

Поиск по сайту