Что такое топология потоковой передачи и почему она так важна для Kafka

обучение kafka, курсы kafka, курсы администраторов, kafka для начинающих, курсы администрирования kafka

В прошлый раз мы говорили про основы работы с KSQL. Сегодня поговорим про топологию потоковой передачи в брокере Apache Kafka. Читайте далее про особенности топологии потоковой передачи, благодаря которой брокер Kafka способен обрабатывать несколько входящих потоков Big Data одновременно.

 

Как работает топология потоковой передачи в Apache Kafka: особенности обработки потоков Big Data

Топология потоковой передачи (ТПП) представляет собой направленный ациклический граф (Directed acyclic graph, DAG), который не содержит циклов, то есть состоит только из вершин и ребер. Причем каждое ребро направлено от одной вершины к другой так, что следование этим направлениям никогда не образует замкнутого цикла. В качестве вершин этого графа в ТПП выступают рабочие узлы, которые соединены между собой потоками, представляющими собой ребра графа.

обучение kafka, курсы kafka, курсы администраторов, kafka для начинающих, курсы администрирования kafka
Структура топологии потоковой передачи

Работа с топологией потоковой передачи в Kafka: несколько практических примеров

Для того, чтобы начать работу с ТПП в Kafka, необходимо настроить базовую конфигурацию, подгрузив необходимые библиотеки. В качестве сборщика проекта можно использовать Apache Maven, так как он имеет возможность самостоятельно подгружать необходимые библиотеки из нужных репозиториев (источников), что освобождает разработчика от необходимости «ручных» установок и тем самым значительно экономит его время. В первую очередь необходимо установить maven-зависимости для Kafka (чтобы подгрузить актуальную версию Kafka-клиента). Следующий код на языке разметки XML отвечает за настройку Kafka-зависимостей в Maven-файле pom.xml (файл зависимостей, который создается автоматически при сборке проекта с помощью Maven) [1]:

<dependency>
<groupId> org.apache.kafka </groupId>
<artifactId> kafka-streams </artifactId>
<version> 2.6.0 </version>
</dependency>
<dependency>
<groupId> org.apache.kafka </groupId>
<artifactId> kafka-clients </artifactId>
<version> 2.6.0 </version>
</dependency>

Далее необходимо настроить конфигурацию для использования клиента Kafka. Следующий код на языке Java отвечает за создание класса для базовой конфигурации Kafka-клиента [2]:

public class KafkaClient{
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCATION_OF_KAFKA_BROKERS);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyStreamingApp");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonObject> inputStream = builder.stream(inputTopic);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}

Для создания топологии потоковой передачи необходимо создать ветви (ребра), задав необходимые условия с помощью класса Predicate. Следующий код на языке Java отвечает за создание потоковой топологии с несколькими ветвями [3]:

public class KafkaTopology{
public static final double SOME_CONSTANT = 100;
// задание необходимых условий для каждой из ветвей топологии
Predicate<String, JsonObject> greaterThan = (String key, JsonObject value) -> {
double dValue = value.get("my_double_value").getAsDouble();
return dValue > SOME_CONSTANT;
};
Predicate<String, JsonObject> lessThan = (String key, JsonObject value) -> {
double dValue = value.get("my_double_value").getAsDouble();
return dValue < SOME_CONSTANT;
};
Predicate<String, JsonObject> equalTo = (String key, JsonObject value) -> {
double dValue = value.get("my_double_value").getAsDouble();
return Math.abs(dValue - SOME_CONSTANT) < epsilon;
};
Predicate<String, JsonObject>[] conditions = new Predicate<>[] { greaterThan, lessThan, equalTo };
KStream<String, JsonElement>[] branches = inputStream.branch(conditions);
branches[0].to("greater-than-topic");
branches[1].to("less-than-topic");
branches[2].to("equal-to-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}

Как видно из кода, созданная топология разбивает входящий топик на 3 топика в зависимости от значений поля «my_double_value» в каждом из сообщений следующим образом:

  • топик со значением поля больше заданного значения;
  • топик со значением поля меньше заданного значения;
  • топик со значением поля равным заданному значению.

Разбиение топика происходит с помощью утилиты Kafka Streams, которая создает три параллельно выполняющихся потока для одновременного создания трех топиков на Kafka-сервере.

Таким образом, благодаря топологии потоковой передачи, брокер Kafka может эффективно и достаточно быстро обрабатывать большие массивы данных, разбивая их на несколько параллельных потоков выполнения. Это делает Apache Kafka универсальным средством для хранения и обмена большими потоками данных, что позволяет активно использовать этот фреймворк в задачах Data Science и разработке распределенных приложений. В следующей статье мы поговорим про обогащение потока событий в Kafka.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. https://itnext.io/creating-a-streaming-data-pipeline-with-kafka-streams-898fb352a7b7
  2. https://gist.github.com/jsnouffer/29fd0748ec1629d3f94074cba4a406e7#file-kstreams-simple-java
  3. https://gist.github.com/jsnouffer/7a8496dfb7dd7663742a6be554955e50#file-kstreams-simple-branches-java

 

Добавить комментарий

Поиск по сайту