How do pyspark handles consumer group and consumer of kafka , generally by converting consumer group we can start consuming data from starting of topic , how to start consuming from beginning, startingoffset as earliest , will always read from starting .
Spark Streaming utilizes checkpoint directory to handle offsets, so that it doesn't consume the offset which is already consumed. You can find more details to set the starting offset with Kafka at this link - spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Hello, and thanks for the sharing of these useful videos. How to handle the writing in delta tables: Because the best practice is that the size of each parquet file should be between 128 MB to 1 GB. How to handle this situation while each batch has very less than the size that is mentioned? or how to handle to collect the number of batches and to reach the mentioned size and finally to write in deltalake.
Usually microbatch execution in Spark can write multiple small files. This requires a later stage to read all those files and write a compacted file (say for each day) of bigger size to avoid small file issue. You can use this compacted file to read data in your downstream systems.
Hi! I have problem when i do a ReadStream. i use load() and then this appears: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". please help me thank you ^^
Hello, Set up postgres from docker hub using this command (docker compose up) after you download the following compose file github.com/subhamkharwal/docker-images/tree/master/postgres Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example www.tutorialworks.com/container-networking/
What will happen if postgres is down, data will still be written to parquet as that is on hdfs and checkpoint will still have the ack done for those records.will be having data loss in that case in the postgres. If yes how can we recover. Some fault tolerance video on this will be highly appreciated. Edited: I just the next video glad to see that, still I have one more question is there a way to make the error recovery auto handled.
Hello Shubham, Hope the next video explained the first part of your doubt. Coming to the next part, yes you can have a weekly of daily batch job to look for error records and reprocess those. If you like the content, please make sure to share with your network over LinkedIn.
@@easewithdata I solved this by setting up the Postgres and the Jupyter all with the same compose file. Before I was using a docker network to connect the two, didn't work no matter what. Everything breaks after I removed the network group so I tried setting it up again.
Thank you so much for the Streaming Lecture! While Studying, I got some question when using readStream from kafka. Do I always have to pass the schema for the streaming data? Even if the number of tables coming from the source increases to more than 100, do I need to define all schemas for each table and include them in the code? Is there any way to automatically define the schema of incoming tables?
Thanks, please share with your network 🛜 Answer to second part of your question - You can set following conf in order to read schema in run time spark.sql.streaming.schemaInference to true
Hi, Thanks very much for the video. While doing the implementation I am stuck at this error: java.lang.ClassNotFoundException: org.postgresql.Driver I have tried to add manually the postgres driver still it is not working. I have added the exact jar file as well in the specified location /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar still I am getting ClassNotFoundException while executing the writestream part. Please help.
I keep having ERROR : ERROR MicroBatchExecution: Query [id = 570602f6-fc8e-41b5-b4b1-cc7a7c894a98, runId = e084fabe-f1b7-44e1-a274-b3c4be0959b9] terminated with error py4j.Py4JException: Error while obtaining a new communication channel. Any chance you knew why ?
Would you be able to start a play list on Databricks, Unity Catalog, Delta Live Tables. I am sure for the understanding purpose you are using docker. But in real time it will be really helpful if you can create a play list just with Databricks with either Azure/AWS integration is fine. Is that something in your wish list or to do ?
Hello, You can lift and shift the code the I am teaching on Jupyter to get started with Databricks. Currently its on Docker so that anyone can setup in local and get started. And yes, I am planning to start a course on complete integration with Azure. Its in my wishlist.
Hello, Set up postgres from docker hub using this command (docker compose up) after you download the following compose file github.com/subhamkharwal/docker-images/tree/master/postgres Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example www.tutorialworks.com/container-networking/
Hello, Set up postgres from docker hub using this command (docker compose up) after you download the following compose file github.com/subhamkharwal/docker-images/tree/master/postgres Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example www.tutorialworks.com/container-networking/
How do pyspark handles consumer group and consumer of kafka , generally by converting consumer group we can start consuming data from starting of topic , how to start consuming from beginning, startingoffset as earliest , will always read from starting .
Spark Streaming utilizes checkpoint directory to handle offsets, so that it doesn't consume the offset which is already consumed. You can find more details to set the starting offset with Kafka at this link - spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Thank you !
Hello, and thanks for the sharing of these useful videos.
How to handle the writing in delta tables:
Because the best practice is that the size of each parquet file should be between 128 MB to 1 GB.
How to handle this situation while each batch has very less than the size that is mentioned?
or how to handle to collect the number of batches and to reach the mentioned size and finally to write in deltalake.
Usually microbatch execution in Spark can write multiple small files. This requires a later stage to read all those files and write a compacted file (say for each day) of bigger size to avoid small file issue. You can use this compacted file to read data in your downstream systems.
Hi! I have problem when i do a ReadStream. i use load() and then this appears: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide". please help me thank you ^^
MKe sure the Kafka library is loaded in the Spark Session before you use it in code.
A separate video for connecting postgres from cluster pyspark how?
Hello,
Set up postgres from docker hub using this command (docker compose up) after you download the following compose file
github.com/subhamkharwal/docker-images/tree/master/postgres
Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example
www.tutorialworks.com/container-networking/
What will happen if postgres is down, data will still be written to parquet as that is on hdfs and checkpoint will still have the ack done for those records.will be having data loss in that case in the postgres. If yes how can we recover. Some fault tolerance video on this will be highly appreciated.
Edited: I just the next video glad to see that, still I have one more question is there a way to make the error recovery auto handled.
Hello Shubham,
Hope the next video explained the first part of your doubt.
Coming to the next part, yes you can have a weekly of daily batch job to look for error records and reprocess those.
If you like the content, please make sure to share with your network over LinkedIn.
Hi, I have a doubt: How can we check if a stream has multiple sink from spark UI?
Allow me sometime to search the exact screenshot for you.
Got the "scram authentication is not supported by this driver" error while trying to connect to postgres. This is driving me nuts.
Please make sure to use the correct driver version for the postgres you are using
@@easewithdata I solved this by setting up the Postgres and the Jupyter all with the same compose file. Before I was using a docker network to connect the two, didn't work no matter what. Everything breaks after I removed the network group so I tried setting it up again.
Thank you so much for the Streaming Lecture! While Studying, I got some question when using readStream from kafka. Do I always have to pass the schema for the streaming data? Even if the number of tables coming from the source increases to more than 100, do I need to define all schemas for each table and include them in the code? Is there any way to automatically define the schema of incoming tables?
Thanks, please share with your network 🛜
Answer to second part of your question - You can set following conf in order to read schema in run time
spark.sql.streaming.schemaInference to true
Hi, Thanks very much for the video. While doing the implementation I am stuck at this error: java.lang.ClassNotFoundException: org.postgresql.Driver
I have tried to add manually the postgres driver still it is not working. I have added the exact jar file as well in the specified location /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar still I am getting ClassNotFoundException while executing the writestream part.
Please help.
Please download the driver and keep it at the mentioned loaction
were you able to resolve this issue? I am stuck at same place, can you please share the solution
I keep having ERROR : ERROR MicroBatchExecution: Query [id = 570602f6-fc8e-41b5-b4b1-cc7a7c894a98, runId = e084fabe-f1b7-44e1-a274-b3c4be0959b9] terminated with error
py4j.Py4JException: Error while obtaining a new communication channel. Any chance you knew why ?
Hello,
Please kill all queries running and restart the application with new checkpoint
@@easewithdata hi thx for the reply! i got it resolved after switching from PySpark to Scala
please demo to create table there how can be done not being done?
its just create table command with the columns shown in the table
Please step by step give demo to establish connection with postgresq it is not being done. please
You can install postgres in your local machine and use it for the same examples
Would you be able to start a play list on Databricks, Unity Catalog, Delta Live Tables. I am sure for the understanding purpose you are using docker. But in real time it will be really helpful if you can create a play list just with Databricks with either Azure/AWS integration is fine. Is that something in your wish list or to do ?
Hello,
You can lift and shift the code the I am teaching on Jupyter to get started with Databricks. Currently its on Docker so that anyone can setup in local and get started.
And yes, I am planning to start a course on complete integration with Azure. Its in my wishlist.
can u give me the code to write into cassandra?
I dont have it.
Any one from community give me answar.
Hello,
Set up postgres from docker hub using this command (docker compose up) after you download the following compose file
github.com/subhamkharwal/docker-images/tree/master/postgres
Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example
www.tutorialworks.com/container-networking/
py4j.protocol.Py4JJavaError: An error occurred while calling o185.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
Hello,
Set up postgres from docker hub using this command (docker compose up) after you download the following compose file
github.com/subhamkharwal/docker-images/tree/master/postgres
Once postgres is up, You can use docker network bridge to communicate between cluster and postgres container. checkout this example
www.tutorialworks.com/container-networking/