12 Spark Streaming Writing data to Multiple Sinks | foreachBatch | Writing data to JDBC(Postgres)

  • Added 7 Sep 2024

Comments • 39

  • @priyachaturvedi1164
    @priyachaturvedi1164 2 months ago +1

    How does PySpark handle Kafka consumer groups and consumers? Generally, by switching the consumer group we can start consuming data from the beginning of a topic. How do we start consuming from the beginning here? Will setting startingOffsets to earliest always read from the start?

    • @easewithdata
      @easewithdata  2 months ago

      Spark Structured Streaming uses the checkpoint directory to track offsets, so it doesn't re-consume offsets that were already processed. You can find more details on setting the starting offset with Kafka at this link - spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
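
      Note that startingOffsets is only honoured on the very first run; once a checkpoint exists, the stored offsets win. A minimal sketch, assuming a broker at kafka:9092, a topic named device_data, and a local checkpoint path (all placeholders):

      # Read from Kafka; startingOffsets applies only when no checkpoint exists yet
      df = (
          spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "device_data")
          .option("startingOffsets", "earliest")
          .load()
      )

      # The checkpoint directory is what tracks consumed offsets across restarts
      query = (
          df.writeStream
          .format("console")
          .option("checkpointLocation", "checkpoint_dir_kafka")
          .start()
      )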

  • @ryan7ait
    @ryan7ait 6 months ago

    Thank you !

  • @hamedtamadon6520
    @hamedtamadon6520 2 months ago

    Hello, and thanks for sharing these useful videos.
    How do you handle writing to Delta tables? Best practice is that each Parquet file should be between 128 MB and 1 GB.
    How do you handle the situation where each micro-batch is far smaller than that?
    Or how do you accumulate a number of batches until the target size is reached and only then write to the Delta Lake?

    • @easewithdata
      @easewithdata  2 months ago +1

      Micro-batch execution in Spark usually writes many small files. This calls for a later stage that reads all those files and writes larger, compacted files (say, one set per day) to avoid the small-file issue. Your downstream systems can then read the compacted files instead.
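
      A rough compaction sketch, assuming the streaming output lands under output/device_data and a daily job rewrites it into fewer, larger files (paths and the partition count are placeholders):

      # Read the many small files written by the micro-batches for one day
      small_df = spark.read.format("parquet").load("output/device_data/date=2024-09-07")

      # Rewrite them as a handful of larger files; pick a partition count that
      # yields roughly 128 MB - 1 GB per file
      (
          small_df
          .repartition(4)
          .write
          .mode("overwrite")
          .format("parquet")
          .save("output/device_data_compacted/date=2024-09-07")
      )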

  • @luonghuy7154
    @luonghuy7154 27 days ago

    Hi! I have a problem when I do a readStream. I use load() and then this appears: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". Please help me, thank you ^^

    • @easewithdata
      @easewithdata  27 days ago

      Make sure the Kafka library is loaded into the Spark session before you use it in code.
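
      For example, the package can be pulled in when the session is created; this is a sketch, and the coordinate must match your Spark and Scala versions (3.5.0 and 2.12 here are assumptions):

      from pyspark.sql import SparkSession

      # Loads the Kafka source so format("kafka") can be resolved
      spark = (
          SparkSession.builder
          .appName("Streaming from Kafka")
          .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
          .getOrCreate()
      )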

  • @shivakant4698
    @shivakant4698 3 months ago

    Could you make a separate video on connecting to Postgres from a PySpark cluster?

    • @easewithdata
      @easewithdata  3 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/
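
      Once both containers are on the same network, the JDBC URL can use the Postgres service name as the host. A sketch, assuming the service is called postgres and a database sqldb with user/password postgres exists (all placeholders):

      # Write a micro-batch DataFrame to Postgres over the shared Docker network
      (
          batch_df.write
          .format("jdbc")
          .option("url", "jdbc:postgresql://postgres:5432/sqldb")
          .option("driver", "org.postgresql.Driver")
          .option("dbtable", "device_data")
          .option("user", "postgres")
          .option("password", "postgres")
          .mode("append")
          .save()
      )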

  • @shubhamgupta9375
    @shubhamgupta9375 3 months ago

    What will happen if Postgres is down? Data will still be written to Parquet, since that is on HDFS, and the checkpoint will already have acknowledged those records. Will we then have data loss in Postgres? If yes, how can we recover? A fault-tolerance video on this would be highly appreciated.
    Edited: I just saw the next video, glad to see that. I still have one more question: is there a way to make the error recovery automatic?

    • @easewithdata
      @easewithdata  3 months ago

      Hello Shubham,
      Hope the next video explained the first part of your doubt.
      Coming to the next part, yes, you can have a weekly or daily batch job that looks for error records and reprocesses them.
      If you like the content, please make sure to share it with your network on LinkedIn.
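
      One possible way to automate the recovery, sketched under the assumption that the stream is written with foreachBatch and that output/device_data and output/errors/device_data are hypothetical paths: catch the JDBC failure inside the batch function and park the failed micro-batch for the later reprocessing job.

      def write_to_sinks(batch_df, batch_id):
          # Cache, since the micro-batch is written to more than one sink
          batch_df.persist()
          batch_df.write.mode("append").format("parquet").save("output/device_data")
          try:
              (
                  batch_df.write
                  .format("jdbc")
                  .option("url", "jdbc:postgresql://postgres:5432/sqldb")
                  .option("driver", "org.postgresql.Driver")
                  .option("dbtable", "device_data")
                  .option("user", "postgres")
                  .option("password", "postgres")
                  .mode("append")
                  .save()
              )
          except Exception:
              # Postgres is down or rejected the write; keep the records aside
              # so the daily/weekly job can reprocess them
              batch_df.write.mode("append").format("parquet").save("output/errors/device_data")
          batch_df.unpersist()

      query = (
          df.writeStream
          .foreachBatch(write_to_sinks)
          .option("checkpointLocation", "checkpoint_dir_multi_sink")
          .start()
      )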

  • @deepanshuaggarwal7042
    @deepanshuaggarwal7042 2 months ago

    Hi, I have a doubt: how can we check from the Spark UI whether a stream has multiple sinks?

    • @easewithdata
      @easewithdata  2 months ago +1

      Allow me some time to find the exact screenshot for you.

  • @atonxment2868
    @atonxment2868 1 month ago

    Got the "scram authentication is not supported by this driver" error while trying to connect to postgres. This is driving me nuts.

    • @easewithdata
      @easewithdata  1 month ago

      Please make sure to use the correct driver version for the Postgres version you are using.
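
      The SCRAM message usually appears when the PostgreSQL JDBC driver is older than 42.2.x. A sketch of pulling in a recent driver (the version number is an assumption; pick the latest available):

      from pyspark.sql import SparkSession

      # SCRAM-SHA-256 authentication needs the postgresql driver 42.2.0 or newer
      spark = (
          SparkSession.builder
          .appName("Streaming to Postgres")
          .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
          .getOrCreate()
      )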

    • @atonxment2868
      @atonxment2868 1 month ago +1

      @easewithdata I solved this by setting up Postgres and Jupyter with the same compose file. Before, I was using a Docker network to connect the two, and it didn't work no matter what. Everything broke after I removed the network group, so I tried setting it up again.

  • @ye__in
    @ye__in 3 months ago

    Thank you so much for the streaming lectures! While studying, I ran into a question about using readStream with Kafka. Do I always have to pass the schema for the streaming data? If the number of tables coming from the source grows to more than 100, do I need to define a schema for each table and include them all in the code? Is there any way to infer the schema of incoming tables automatically?

    • @easewithdata
      @easewithdata  3 months ago

      Thanks, please share with your network 🛜
      Answer to the second part of your question - you can set the following conf in order to infer the schema at run time:
      spark.sql.streaming.schemaInference to true
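
      A minimal sketch for a file-based stream (the input path is a placeholder). Note that this config drives schema inference for file sources; a Kafka value column is binary, so you still parse it with from_json against a schema.

      # Let Spark infer the schema of streaming files at run time
      spark.conf.set("spark.sql.streaming.schemaInference", "true")

      df = (
          spark.readStream
          .format("json")
          .load("input/device_files/")
      )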

  • @RahulGhosh-yl7hl
    @RahulGhosh-yl7hl 3 months ago

    Hi, thanks very much for the video. While doing the implementation I am stuck at this error: java.lang.ClassNotFoundException: org.postgresql.Driver
    I have tried to add the Postgres driver manually but it is still not working. I even placed the exact jar file at the specified location /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar, yet I still get ClassNotFoundException while executing the writeStream part.
    Please help.

    • @easewithdata
      @easewithdata  3 months ago

      Please download the driver and keep it at the mentioned location
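
      For reference, one way to make that jar visible to the session is to point spark.jars at it before the session is created (a sketch; the path is the one from the comment above and must exist inside the container):

      from pyspark.sql import SparkSession

      # Adds the Postgres JDBC driver to the driver and executor classpaths
      spark = (
          SparkSession.builder
          .appName("Streaming to Postgres")
          .config("spark.jars", "/home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar")
          .getOrCreate()
      )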

    • @MuzicForSoul
      @MuzicForSoul 11 days ago

      Were you able to resolve this issue? I am stuck at the same place. Can you please share the solution?

  • @zheyan4704
    @zheyan4704 4 months ago

    I keep getting this error: ERROR MicroBatchExecution: Query [id = 570602f6-fc8e-41b5-b4b1-cc7a7c894a98, runId = e084fabe-f1b7-44e1-a274-b3c4be0959b9] terminated with error
    py4j.Py4JException: Error while obtaining a new communication channel. Any chance you know why?

    • @easewithdata
      @easewithdata  4 months ago

      Hello,
      Please kill all running queries and restart the application with a new checkpoint directory
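
      A quick way to stop every running query before restarting with a fresh checkpoint location:

      # Stop all active streaming queries attached to this session
      for q in spark.streams.active:
          print("Stopping query:", q.id)
          q.stop()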

    • @zheyan4704
      @zheyan4704 4 months ago +1

      @easewithdata Hi, thanks for the reply! I got it resolved after switching from PySpark to Scala.

  • @shivakant4698
    @shivakant4698 3 months ago

    Could you please demo how to create the table there? It is not working for me.

    • @easewithdata
      @easewithdata  3 months ago

      It's just a CREATE TABLE command with the columns shown in the table

  • @shivakant4698
    @shivakant4698 3 months ago

    Please give a step-by-step demo of establishing a connection with PostgreSQL; I am not able to get it working. Please.

    • @easewithdata
      @easewithdata  3 months ago

      You can install Postgres on your local machine and use it for the same examples

  • @babuganesh2000
    @babuganesh2000 6 months ago

    Would you be able to start a playlist on Databricks, Unity Catalog, and Delta Live Tables? I understand you are using Docker for learning purposes, but it would be really helpful if you could also create a playlist just on Databricks, with either Azure or AWS integration. Is that something on your wish list or to-do?

    • @easewithdata
      @easewithdata  6 months ago

      Hello,
      You can lift and shift the code that I am teaching on Jupyter to get started with Databricks. Currently it's on Docker so that anyone can set it up locally and get started.
      And yes, I am planning to start a course on complete integration with Azure. It's on my wishlist.

  • @ryan7ait
    @ryan7ait 6 months ago

    Can you give me the code to write into Cassandra?

  • @shivakant4698
    @shivakant4698 3 months ago

    Can anyone from the community give me an answer?

    • @easewithdata
      @easewithdata  3 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/

  • @shivakant4698
    @shivakant4698 3 months ago

    py4j.protocol.Py4JJavaError: An error occurred while calling o185.save.
    : java.lang.ClassNotFoundException: org.postgresql.Driver

    • @easewithdata
      @easewithdata  3 months ago

      Hello,
      Set up Postgres from Docker Hub using this command (docker compose up) after you download the following compose file
      github.com/subhamkharwal/docker-images/tree/master/postgres
      Once Postgres is up, you can use a Docker network bridge to communicate between the cluster and the Postgres container. Check out this example
      www.tutorialworks.com/container-networking/