Что такое объединение потоков в Kafka

курс kafka spark, курс kafka spark, курсы администрирования kafka, курс kafka spark, apache kafka для начинающих, kafka это, ksql, kafka streams, обучение kafka, курсы потоковой обработки kafka, курс kafka spark, Big Data, курсы kafka rest, apache kafka для начинающих, kafka это, big data курсы, kafka streams, курс kafka spark, курсы по kafka, курсы big data москва, курс kafka spark, apache kafka для начинающих, apache kafka, курсы администраторов spark, apache kafka для начинающих, Big Data, Data Science, kafka streaming, Kafka, брокер kafka, avro

Apache Kafka — это платформа для обработки и передачи данных в реальном времени. Одной из ключевых концепций в Kafka является возможность объединения потоков данных. Объединение потоков данных в Kafka — это процесс объединения и обработки данных из различных топиков (потоков) с целью создания нового потока данных или выполнения какой-либо аналитики. Это позволяет эффективно управлять потоками данных и обеспечивать гибкость в архитектуре обработки данных, а также агрегировать, обрабатывать и анализировать данные из различных источников и направлять их в нужные места.

Объединение потоков: несколько практических примеров

Для работы с потоками в Kafka используется класс KStream библиотеки Kafka Streams. В следующем примере мы объединим данные из двух топиков — topicA и topicB — и направим их в новый топик mergedTopic:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class MergeTopicsExample {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "merge-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Arrays.asList("topicA", "topicB"));


        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);


        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                ProducerRecord<String, String> newRecord = new ProducerRecord<>("mergedTopic", record.key(), record.value());
                producer.send(newRecord);
            }
        }
    }
}

В данном фрагменте кода мы создаем Kafka Consumer, подписываем его на topicA и topicB, затем создаем Kafka Producer и отправляем данные в mergedTopic.
В следующем примере мы объединим данные из топика userActions и выполним агрегацию суммы действий по каждому пользователю, направляя результат в userActionsAggregated:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;


import java.util.Properties;


public class AggregateStreamsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());


        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> userActionsStream = builder.stream("userActions");

       
        KStream<String, String> aggregatedStream = userActionsStream
            .groupByKey()
            .aggregate(
                () -> "0",  // Initial value for aggregation
                (key, value, aggregate) -> String.valueOf(Integer.parseInt(aggregate) + 1),
                Materialized.as("userActionsAggregated")
            ).toStream();


        aggregatedStream.to("userActionsAggregated", Produced.with(Serdes.String(), Serdes.String()));



        Topology topology = builder.build();

    }
}

В данном примере мы используем Kafka Streams API для создания агрегированного потока данных. Мы группируем данные по ключу (пользователю) и применяем агрегатор, который увеличивает счетчик действий пользователя. Результат агрегации направляется в userActionsAggregated топик.

Таким образом, объединение потоков данных в Apache Kafka является мощным инструментом для агрегации, анализа и перенаправления данных в реальном времени. Это позволяет системе Kafka обеспечивать высокую доступность, надежность и масштабируемость при обработке данных в реальном времени.

Это делает Apache Kafka надежным и универсальным средством для хранения и обмена большими потоками данных, что активно применяется в задачах Data Science и разработке распределенных приложений.

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Записаться на курс

Смотреть раcписание

Источники

  1. https://kafka.apache.org/documentation/
  2. Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных

Добавить комментарий

Поиск по сайту