Stream processing with Apache Kafka and Azure Databricks

  • Added 27 Jul 2024
  • This video describes how to use Apache Kafka as a source for Structured Streaming workloads on Azure Databricks. It walks through setting up an Apache Kafka service and connecting to it from Azure Databricks.
    Timestamps
    00:00 Create Resource Group
    01:16 Deploy Virtual Network
    06:09 Deploy Azure Databricks
    12:30 Deploy Virtual Machine
    15:17 Install and Configure Kafka
    21:20 Start Zookeeper and Kafka Broker
    30:00 Read Kafka message from Databricks
    Start Zookeeper
    C:\kafka\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties (see the -daemon option for running in the background)
    Connect Zookeeper Shell
    bin\windows\zookeeper-shell.bat localhost:2181
    bin\windows\zookeeper-shell.bat localhost:2181 -zk-tls-config-file <tls-config-file> (for TLS-enabled ZooKeeper)
    ls /brokers/ids (lists the IDs of the registered brokers)
    Start Kafka
    C:\kafka\bin\windows\kafka-server-start.bat .\config\server.properties
    Create Topic
    C:\kafka\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TestTopic
    C:\kafka\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
    Console Producer
    bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic TestTopic (--broker-list is the deprecated equivalent on older Kafka versions)
    Console Consumer
    bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic TestTopic --from-beginning
    Batch Read
    df = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "10.0.0.4:9092")
        .option("subscribe", "TestTopic")
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .load()
    )
    display(df)
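    The Kafka source returns key and value as binary columns; a minimal sketch for viewing them as strings (the casts and the remaining column names follow the standard Kafka source schema):
    from pyspark.sql.functions import col
    decoded_df = df.select(
        col("key").cast("string"),
        col("value").cast("string"),
        "topic", "partition", "offset", "timestamp"
    )
    display(decoded_df)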
    Stream Read
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "10.0.0.4:9092")
        .option("subscribe", "TestTopic")
        .option("startingOffsets", "latest")
        .load()
    )
    display(df)
    from pyspark.sql.functions import col, decode
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "10.0.0.4:9092")
        .option("subscribe", "TestTopic")
        .option("startingOffsets", "latest")
        .load()
        .select(decode(col("value"), "UTF-8").alias("decoded_value"))
    )
    display(df)
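    display(df) only renders the stream in the notebook; to persist it instead, a minimal writeStream sketch (the checkpoint path and target table name are hypothetical):
    query = (df.writeStream
        .format("delta")
        .option("checkpointLocation", "/tmp/checkpoints/TestTopic")  # hypothetical checkpoint path
        .outputMode("append")
        .toTable("test_topic_bronze")  # hypothetical target table name
    )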
    Databricks Structured Streaming
    Databricks Streaming
    Databricks with Kafka
    Kafka Streaming with Azure Databricks

Comments • 2

  • @yogeshpal4275 • 7 months ago

    How do you load data into Unity Catalog schema tables from Azure Storage Gen2 using Spark in Azure Databricks?

    • @HadoopEngineering • 1 month ago

      Create an external location in Databricks (which lets you connect to ADLS), then create an external table using that location; see the sketch below.
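      A minimal sketch of that answer, assuming Unity Catalog is enabled; the external location name, storage URL, credential, and table name below are all hypothetical:
      spark.sql("""
        CREATE EXTERNAL LOCATION IF NOT EXISTS adls_raw       -- hypothetical location name
        URL 'abfss://raw@mystorage.dfs.core.windows.net/'     -- hypothetical container/account
        WITH (STORAGE CREDENTIAL my_credential)               -- hypothetical credential
      """)
      spark.sql("""
        CREATE TABLE IF NOT EXISTS main.default.events        -- hypothetical catalog.schema.table
        USING DELTA
        LOCATION 'abfss://raw@mystorage.dfs.core.windows.net/events'
      """)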