В прошлый раз мы говорили про работу с потребителем в Kafka. Сегодня поговорим про работу с Big Data записями, сериализованными с помощью системы Avro. Читайте далее про особенности механизма работы с Avro в распределенной среде Kafka, благодаря которому этот распределенный брокер может легко сериализовать записи для работы в среде потоковой передачи данных.
Что такое Avro и как он работает с брокером Kafka
Avro — это система сериализации (превращение объектов в массив байтов) данных, которая используется для работы с объектами в потоковой (распределенной) среде. Avro является кроссплатформенной (не зависит от операционной системы и вида аппаратных ресурсов) системой, которая не зависит от языка программирования. Авро использует систему, основанную на схемах входных данных. Схемы могут включать в себя тип данных, структуру данных, формат записи данных, а также параметры их передачи (например, URL, порт и т.д.). Схемы Avro описываются c помощью JSON-формата (JavaScript Object Notation), что обеспечивает Авро независимость от языковой реализации. Для работы с Kafka используется специальный реестр схем (Schema Registry), который хранит в себе все схемы записей, использующихся брокером в данный момент времени [1].
Особенности сериализации данных в Kafka с помощью Avro: несколько практических примеров
Для того, чтобы начать работу с сериализатором Avro, необходимо настроить базовую конфигурацию. Следующий код на языке Java отвечает за создание коллекции свойств (также как для потребителей или продюсеров) для Avro в брокере Kafka [2]:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url);
В качестве объектов коллекции свойств Properties
для Avro используются следующие элементы [2]:
servers.bootstrap
— список брокеров для соединения с распределенной средой Kafka;serializer/value.serializer
— класс-сериализатор, применяемый для сериализации ключей/значений записей. В качестве сериализатора для работы с Avro в Kafka используется классKafkaAvroSerializer
;registry.url
— это параметр, указывающий на местоположение реестра (хранилища) использующихся схем.
Далее для того, чтобы сериализовать данные, необходимо указать схему сериализуемых данных:
String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " + "\"name\": \"Customer\"," + "\"fields\": [" + "{\"name\": \"id\", \"type\": \"int\"}," + "{\"name\": \"name\", \"type\": \"string\"}," + "{\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\":\"null\" }" + "]}";
Для того, чтобы данные отправлялись продюсером (который в данном случае является сериализатором) потребителю, необходимо создать экземпляр KafkaProducer
со значениями типа GenericRecord
. Класс GenericRecord
представляет общий тип данных и применяется, когда необходимо сформировать запись на основе заданной схемы [2]:
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props); Schema.Parser parser = new Schema.Parser(); //парсинг по значениям каждого из ключей Schema schema = parser.parse(schemaString); for (int nCustomers = 0; nCustomers < customers; nCustomers++) { String name = "exampleCustomer" + nCustomers; String email = "example " + nCustomers + "@example.com"; GenericRecord customer = new GenericData.Record(schema); customer.put("id", nCustomers); customer.put("name", name); customer.put("email", email);
Вышерассмотренный фрагмент кода отвечает за сериализацию конкретных записей (с ключами id
, name
, email
). Для того, чтобы выделить конкретные ключи, необходимо распарсить (извлечь данные) каждый из ключей схемы. За парсинг схемы отвечает метод parse()
.
После формирования и сериализации необходимой записи, ее можно отправлять потербителю, который при получении сразу десериализует (восстановит объект из байтового массива) данные. Следующий код на языке Java отвечает за отправку сериализованной записи потребителю, который в данном случае будет являться десериализатором [2]:
ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer); producer.send(data);
Таким образом, благодаря поддержке механизма работы с Avro-сериализатором, Kafka может гарантировать отправку Big Data записей без потерь структуры и времени. Это делает Apache Kafka универсальным и надежным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот брокер сообщений в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про пользовательские сериализаторы в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- https://docs.microsoft.com/ru-ru/azure/databricks/spark/latest/structured-streaming/avro-dataframe
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных