Apache Kafka — это платформа для обработки и передачи данных в реальном времени. Одной из ключевых концепций в Kafka является возможность объединения потоков данных. Объединение потоков данных в Kafka — это процесс объединения и обработки данных из различных топиков (потоков) с целью создания нового потока данных или выполнения какой-либо аналитики. Это позволяет эффективно управлять потоками данных и обеспечивать гибкость в архитектуре обработки данных, а также агрегировать, обрабатывать и анализировать данные из различных источников и направлять их в нужные места.
Объединение потоков: несколько практических примеров
Для работы с потоками в Kafka используется класс KStream
библиотеки Kafka Streams. В следующем примере мы объединим данные из двух топиков — topicA
и topicB
— и направим их в новый топик mergedTopic
:
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class MergeTopicsExample { public static void main(String[] args) { Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "merge-group"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Arrays.asList("topicA", "topicB")); Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { ProducerRecord<String, String> newRecord = new ProducerRecord<>("mergedTopic", record.key(), record.value()); producer.send(newRecord); } } } }
В данном фрагменте кода мы создаем Kafka Consumer, подписываем его на topicA
и topicB
, затем создаем Kafka Producer и отправляем данные в mergedTopic
.
В следующем примере мы объединим данные из топика userActions и выполним агрегацию суммы действий по каждому пользователю, направляя результат в userActionsAggregated
:
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.Topology; import java.util.Properties; public class AggregateStreamsExample { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-streams"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> userActionsStream = builder.stream("userActions"); KStream<String, String> aggregatedStream = userActionsStream .groupByKey() .aggregate( () -> "0", // Initial value for aggregation (key, value, aggregate) -> String.valueOf(Integer.parseInt(aggregate) + 1), Materialized.as("userActionsAggregated") ).toStream(); aggregatedStream.to("userActionsAggregated", Produced.with(Serdes.String(), Serdes.String())); Topology topology = builder.build(); } }
В данном примере мы используем Kafka Streams API для создания агрегированного потока данных. Мы группируем данные по ключу (пользователю) и применяем агрегатор, который увеличивает счетчик действий пользователя. Результат агрегации направляется в userActionsAggregated
топик.
Таким образом, объединение потоков данных в Apache Kafka является мощным инструментом для агрегации, анализа и перенаправления данных в реальном времени. Это позволяет системе Kafka обеспечивать высокую доступность, надежность и масштабируемость при обработке данных в реальном времени.
Это делает Apache Kafka надежным и универсальным средством для хранения и обмена большими потоками данных, что активно применяется в задачах Data Science и разработке распределенных приложений.
Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:
- Администрирование кластера Kafka
- Apache Kafka для разработчиков
- Администрирование кластера Arenadata Streaming Kafka
Источники
- https://kafka.apache.org/documentation/
- Н.Нархид, Г.Шапира, Т.Палино. Apache Kafka. Потоковая обработка и анализ данных