В прошлый раз мы говорили про интеграцию Kafka с системами Big Data. Сегодня рассмотрим про платформу KSQL, которая позволяет вести потоковую обработку данных в Apache Kafka с помощью SQL-запросов. Читайте далее, что позволяет Apache Kafka анализировать потоки больших данных в понятной и привычной форме SQL-запросов без программирования на Python или Java.
Структуры данных в KSQL
Kafka SQL — это платформа для потоковой обработки больших данных в Apache Kafka с помощью структурированных SQL-запросов. Структуры данных в Kafka SQL — это программные единицы, которые способны хранить и обрабатывать множество данных, связанных логикой KSQL. В KSQL структуры данных представлены в виде специальных коллекций, которые отвечают за хранение и создание последовательности событий (events). KafkaSQL поддерживает 2 вида коллекций данных:
- потоки;
- таблицы.
Каждый из этих видов мы подробнее рассмотрим далее [1].
Потоки в KSQL
Потоки KSQL (Kafka SQL Streams) — это неизменяемые коллекции данных, которые предназначены для добавления данных в топики Kafka. Потоки в KSQL также являются потоками фактов, где каждый факт представляет собой уникальное и неизменяемое событие (event). Данные потоков хранятся в виде структуры ключ-значение (key-value). Следующий код на диалекте KSQL отвечает за создание потока и сохранение его в формате JSON (JavaScript Object Notation):
CREATE STREAM pageviews ( page_id BIGINT, viewtime BIGINT, user_id VARCHAR) WITH ( KAFKA_TOPIC = 'keyless-pageviews-topic', VALUE_FORMAT = 'JSON');
Как видно из кода, структура создания потока очень напоминает создание обычной SQL-таблицы с указанием данных и их типов. Параметр KAFKA_TOPIC указывает на соответствующий топик в Apache Kafka. Параметр VALUE_FORMAT
указывает на формат хранения значений потока (например, CSV, JSON, AVRO). Значения потока могут иметь значение NULL
, а также один общий ключ (очень распространено в JSON-формате) [2].
Таблицы в KSQL
Таблицы — это изменяемые коллекции данных, которые служат для хранения данных, хранящихся в топиках или для создания новых топиков в Kafka. Следующий код на диалекте KSQL отвечает за создание таблицы по существующему топику:
CREATE TABLE users ( userid VARCHAR PRIMARY KEY, registertime BIGINT, gender VARCHAR, regionid VARCHAR) WITH ( KAFKA_TOPIC = 'users', VALUE_FORMAT='JSON');
Из кода видно, что таблицы, также, как и потоки, используют различные форматы для хранения значений (за это отвечает параметр VALUE_FORMAT
). В Kafka SQL также существует возможность одновременно создавать таблицу, которая регистрирует новый топик. Для того, чтобы сделать это, необходимо в скрипт добавить параметр PARTITIONS
(отвечает за создание разделов в новом топике) и REPLICAS
(создание реплик). Следующий фрагмент кода отвечает за создание таблицы для регистрации нового топика [3]:
CREATE TABLE users ( userid VARCHAR PRIMARY KEY, registertime BIGINT, gender VARCHAR, regionid VARCHAR) WITH ( KAFKA_TOPIC = 'users', VALUE_FORMAT='JSON', PARTITIONS=4, REPLICAS=3);
Таким образом, благодаря платформе KSQL, у Apache Kafka есть возможность обработки, создания и хранения потоков Big Data с помощью простых и понятных SQL-запросов. Это позволяет аналитикам и разработчикам активно использовать эту Big Data платформу потоковой обработки событий в задачах Data Science. В следующей статье мы поговорим про топологию потоковой передачи в Kafka.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Kafka Streams для разработчиков
- Интеграция Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- https://www.oreilly.com/content/big-fast-easy-data-with-ksql/
- https://docs.ksqldb.io/en/latest/
- https://docs.ksqldb.io/en/latest/developer-guide/create-a-table/