Shuffle Partition Spark Optimization: 10x Faster!

  • Published 9 Jul 2024
  • Welcome to our comprehensive guide on understanding and optimising shuffle operations in Apache Spark! In this deep-dive video, we uncover the complexities of shuffle partitions and how shuffling works in Spark, providing you with the knowledge to enhance your big data processing tasks. Whether you're a beginner or an experienced Spark developer, this video is designed to elevate your skills and understanding of Spark's internal mechanisms.
    🔹 What you'll learn:
    1. Shuffling in Spark: Uncover the mechanics behind shuffling, why it's necessary, and how it impacts the performance of your data processing jobs.
    2. Shuffle Partitions: Discover what shuffle partitions are and their role in distributing data across nodes in a Spark cluster.
    3. When Does Shuffling Occur?: Learn about the specific scenarios and operations that trigger shuffling in Spark, particularly focusing on wide transformations.
    4. Shuffle Partition Size Considerations: Explore real-world scenarios where the shuffle partition size is significantly larger or smaller than the data per shuffle partition, and understand the implications on performance and resource utilisation.
    5. Tuning Shuffle Partitions: Dive into strategies and best practices for tuning the number of shuffle partitions based on the size and nature of your data, ensuring optimal performance and efficiency.
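    A minimal PySpark sketch of the tuning knob discussed in point 5 (the value 1500 is only an illustrative figure, not a recommendation):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle-tuning-demo").getOrCreate()

# spark.sql.shuffle.partitions controls how many partitions Spark produces
# after a wide transformation (join, groupBy, ...). The default is 200.
spark.conf.set("spark.sql.shuffle.partitions", 1500)

# Wide transformations now target 1500 shuffle partitions
# (AQE, if enabled, may coalesce them further).
df = spark.range(10_000_000)
df.groupBy((df.id % 100).alias("bucket")).count().explain()
```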
    📘 Chapters:
    00:00 Introduction
    00:14 What is shuffling & shuffle partitions?
    05:07 Why are shuffle partitions important?
    07:45 Scenario based question 1 (data per shuffle partition is large)
    12:29 Scenario based question 2 (data per shuffle partition is small)
    17:29 How to tune slow running jobs?
    18:53 Thank you
    📘 Resources:
    🎥 Full Spark Performance Tuning Playlist: • Apache Spark Performan...
    📄 Complete Code on GitHub: github.com/afaqueahmad7117/sp...
    🔗 LinkedIn: / afaque-ahmad-5a5847129
    #ApacheSpark, #DataEngineering, #ShufflePartitions, #BigData, #PerformanceTuning, #pyspark, #sql, #python

Comments • 54

  • @rgv5966
    @rgv5966 5 days ago

    I don't think these kinds of videos on Spark are available anywhere else. Great work Afaque!

  • @showbhik9700
    @showbhik9700 21 days ago

    Lovely!

  • @akshayshinde3703
    @akshayshinde3703 6 months ago +1

    Really good explanation Afaque. Thank you for making such in-depth videos.😊

  • @vinothvk2711
    @vinothvk2711 5 months ago

    Great Explanation!

  • @tahiliani22
    @tahiliani22 6 months ago

    Commenting so that you continue making such informative videos. Great work!

  • @ashokreddyavutala8684
    @ashokreddyavutala8684 6 months ago

    thanks a bunch for the great content again....

  • @Wonderscope1
    @Wonderscope1 6 months ago

    Thanks for the video, very informative

  • @ravikrish006
    @ravikrish006 6 months ago

    Thank you, it is very useful

  • @2412_Sujoy_Das
    @2412_Sujoy_Das 6 months ago

    Great share sir, the optimal shuffle size..... Please bring more scenario-based questions as well as best production-based practices!!!!

  • @dileepkumar-nd1fo
    @dileepkumar-nd1fo 2 months ago

    Have watched all your videos. Seriously Gold content. Requesting not to stop making videos.

    • @afaqueahmad7117
      @afaqueahmad7117 2 months ago

      Thank you @dileepkumar-nd1fo for the kind words, it means a lot to me :)

  • @Momofrayyudoodle
    @Momofrayyudoodle 2 months ago

    Thank you so much. Keep up the good work. Looking forward to more such videos to learn Spark

    • @afaqueahmad7117
      @afaqueahmad7117 2 months ago

      Thank you @Momofrayyu-pw9ki, really appreciate it :)

  • @iamkiri_
    @iamkiri_ 5 months ago

    Nice explanation :)

  • @purnimasharma9734
    @purnimasharma9734 1 month ago

    Very nice explanation! Thank you for making this video.

  • @anandchandrashekhar2933
    @anandchandrashekhar2933 1 month ago

    Wow! Thank you Afaque, this is incredible content and very helpful!

  • @asokanramasamy2087
    @asokanramasamy2087 3 months ago

    Clearly explained!!

  • @Rafian1924
    @Rafian1924 3 months ago

    Learning from masters❤❤❤

  • @sureshpatchigolla3438
    @sureshpatchigolla3438 3 months ago

    Great work brother......... Thank you for explaining concepts in detail ❤❤

  • @puneetgupta003
    @puneetgupta003 1 month ago

    Nice content Afaque !

  • @abdulwahiddalvi7119
    @abdulwahiddalvi7119 17 days ago

    @Afaque thank you for making these videos. Very helpful. I have a question: how do we estimate the data size? We run our batches/jobs on Spark, and each batch could be processing a varying amount of data; some batches could be dealing with 300 GB and some with 300 MB. How do we calculate the optimal number of shuffle partitions?

  • @user-er4qt3pc5s
    @user-er4qt3pc5s 5 months ago

    @afaque a shuffle partition will consist of both the shuffled data (keys that were not originally present in the executor and were shuffled to the partition) and the non-shuffled data (keys that were already present in the executor and were not shuffled). So, the size of the shuffle partition cannot be directly calculated from the shuffle write data alone, as it also depends on the distribution of the data across the partitions?

  • @crepantherx
    @crepantherx 6 months ago

    Can you please cover bucketing hands-on in ADB (hands-on with file view)? In your last video it works in your IDE but not in Databricks (Delta bucketing is not allowed).

  • @vikastangudu712
    @vikastangudu712 2 months ago

    Thanks for the explanation. But isn't the parameter (spark.sql.shuffle.partitions) in no way dependent on the cardinality of the group by / join column?

  • @tandaibhanukiran4828
    @tandaibhanukiran4828 4 months ago

    Thank you for explaining in detail. You are the best guy around. Can you also please explain whether there is a way to dynamically update the shuffle partitions with the help of dynamic calculations of data size and the number of cores in the cluster (in case the cluster is altered in future)?
    Thanks in advance.

    • @afaqueahmad7117
      @afaqueahmad7117 4 months ago +1

      @tandaibhanukiran4828 I believe it's challenging to dynamically configure shuffle partitions knowing only the size of your data and the cluster configuration. The most important input is the "Shuffle Write". Estimating shuffle write is not very clear-cut, as it depends on several factors (skew, transformation complexity, i.e. joins, aggregations, dynamic execution plans, etc.).
      If you have historical data, or similar jobs (using the same/similar datasets) with similar operations, i.e. joins and aggregations, you could use those "Shuffle Partition" numbers and apply the logic (as demonstrated in the scenarios) to dynamically get the number of shuffle partitions.
      However, I would stress using this approach with caution.
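      As a rough sketch of that "apply the logic" step (the helper name is hypothetical; the ~200 MB-per-partition target and the 20-core figure come from the scenarios discussed in the video and comments, and an existing SparkSession named "spark" is assumed):

```python
import math

# Hypothetical helper: turn a historical / estimated "Shuffle Write" figure into a
# shuffle partition count, targeting ~200 MB per partition and rounding up to a
# multiple of the core count so the final wave of tasks keeps every core busy.
def suggest_shuffle_partitions(shuffle_write_mb: float,
                               total_cores: int = 20,
                               target_partition_mb: float = 200.0) -> int:
    raw = math.ceil(shuffle_write_mb / target_partition_mb)
    return max(total_cores, math.ceil(raw / total_cores) * total_cores)

# Example: ~300,000 MB (300 GB) of shuffle write on a 20-core cluster -> 1500
spark.conf.set("spark.sql.shuffle.partitions", suggest_shuffle_partitions(300_000))
```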

    • @tandaibhanukiran4828
      @tandaibhanukiran4828 4 months ago

      Thank you very much, @afaqueahmad7117.

  • @Kiedi7
    @Kiedi7 4 months ago

    Hey man, awesome series so far. I noticed in your videos that you share your Mac screen but use an Apple Pencil on your iPad to annotate? Can you describe that setup, i.e. how you're able to annotate on your Jupyter Notebook (presented on the Mac) but from an iPad instead? Thanks in advance, appreciate it.

    • @afaqueahmad7117
      @afaqueahmad7117 4 months ago

      Thank you! I use Ecamm Live, so adding both the iPad and the Mac as screens helps me navigate easily between the two.
      For the ones where I've annotated on notebooks, I've used Zoom to share my screen and annotate in Zoom.

  • @tsaha100
    @tsaha100 5 months ago +1

    @Afaque: Very good video. So in real life, for varying workload sizes (shuffle write of 50 MB - 300 GB), do you have to change the shuffle partition count programmatically? How do you figure out, in code, the shuffle write that you find in the Spark UI? Is there any solution?

    • @afaqueahmad7117
      @afaqueahmad7117 5 months ago

      Thanks for the kind words @tsaha100. I don't think there's a clear way to statically estimate, in code, the shuffle write that is shown on the Spark UI, because of the dynamic nature of Spark's execution.
      If you would like to log the shuffle write metrics when your tasks complete, you could try attaching a SparkListener to your SparkContext and overriding the onTaskEnd method to capture them, but I believe it's just easier to run the job and refer to the Spark UI.
      You can refer to: books.japila.pl/apache-spark-internals/SparkListenerInterface/#ontaskgettingresult
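      If you do want those numbers programmatically rather than from the UI, one alternative route (not from the video; it assumes the running application's UI is reachable on the default localhost:4040) is to read the same stage-level metrics from Spark's monitoring REST API:

```python
import requests

# Sketch: pull per-stage shuffle write sizes from the Spark UI's monitoring REST API.
# Assumes the application UI is reachable on the default port 4040.
base = "http://localhost:4040/api/v1"
app_id = requests.get(f"{base}/applications").json()[0]["id"]

for stage in requests.get(f"{base}/applications/{app_id}/stages").json():
    print(stage["stageId"], stage["name"], stage.get("shuffleWriteBytes", 0))
```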

  • @arunkindra832
    @arunkindra832 5 months ago +1

    Hi Afaque, in the 1st case you have configured 1500 shuffle partitions, but initially you said 1000 cores are available in the cluster, and you also mentioned one partition per core. So where do the remaining 500 partitions come from?
    Another doubt: do we need to configure the number of cores consumed by a job according to the shuffle partitions we provide?
    Also, please explain a case where we don't have enough cores available in the cluster.

    • @afaqueahmad7117
      @afaqueahmad7117 5 months ago +1

      Hey @arunkindra832, in scenario 1, referring to the diagram, there are 20 cores in the cluster (5 executors * 4 cores each). The 1500 shuffle partitions are going to be processed by a 20-core cluster. Assuming each core takes the same amount of time to process a shuffle partition and the distribution of shuffle partitions is uniform, there are going to be approximately 1500/20 = 75 rounds. In one round, 20 shuffle partitions are processed.
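      The arithmetic from that reply, written out with the scenario's numbers:

```python
executors = 5
cores_per_executor = 4
total_cores = executors * cores_per_executor   # 20 cores in the cluster

shuffle_partitions = 1500
waves = shuffle_partitions / total_cores        # 75 rounds ("waves") of tasks
partitions_per_wave = total_cores               # 20 shuffle partitions processed per round
```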

  • @kaushikghosh-po1ew
    @kaushikghosh-po1ew 2 months ago

    I have a question on this. Let's say the data volume I am processing varies on a daily basis, i.e. some days it can be 50 GB and some days 10 GB. Keeping in mind the 200 MB per shuffle partition guideline, the optimal number of shuffle partitions should change on each run in that case. But it's not practically possible to change the code every time to get a proper shuffle partition count. How should this scenario be handled? I read about a parameter, spark.sql.files.maxPartitionBytes, which defaults to 128 MB. Should I change this to 200 MB and let the number of shuffle partitions be calculated automatically? In that case, will the value under spark.sql.shuffle.partitions be ignored?

  • @ramvel814
    @ramvel814 4 months ago

    Can you explain how to resolve "Python worker exited unexpectedly (crashed)"?

  • @atifiu
    @atifiu 5 months ago +1

    @afaque how can I calculate the total shuffle data size without executing the code beforehand? Also, the size on disk is not the same as in memory, as in-memory data is uncompressed.

    • @afaqueahmad7117
      @afaqueahmad7117 5 months ago

      Hey @atifiu, calculating the total shuffle data size without executing the code can be challenging due to the dynamic nature of Spark's execution. Several things come into the picture, for example data distribution, skew, and the nature of the transformations (wide vs narrow), depending on which you may or may not get an accurate shuffle data size.

  • @akshaybaura
    @akshaybaura 6 months ago +1

    In scenario 1, how exactly is reducing the size of partitions beneficial?
    Correct me if I'm wrong: if we let a core process 1.5 GB, most of the data will spill to disk during computation, which will increase I/O and hence the time taken for completion. However, if we reduce the partition size, we increase the number of partitions as a result, which again would increase the time taken for job completion.

    • @kartikthakur6461
      @kartikthakur6461 5 months ago +1

      I am assuming disk computation will take much longer to complete.

    • @afaqueahmad7117
      @afaqueahmad7117 5 months ago +2

      Hey @akshaybaura, @kartikthakur6461, you're on the right track. Reducing each partition from 1.5 GB to 200 MB does increase the number of partitions, but it is beneficial for the following important reasons:
      1. Reduced memory pressure: when a core processes a large partition (1.5 GB), it's more likely to run out of memory and start spilling to disk. This spill-over causes increased I/O, which in turn significantly slows down processing. However, I would still emphasize that spill depends on the memory per core: if the memory per core is > 1.5 GB, spills won't happen, but processing is still going to be significantly slower.
      2. Better resource utilisation: increasing the number of partitions allows better distribution of the workload across the cores.
      3. Balanced workload: smaller partitions help in achieving a more balanced workload across cores. The importance is in ensuring that each core is given a "manageable" amount of data to process.
      However, the goal is to strike a balance - partition sizes should be small enough to avoid excessive memory usage and I/O spillover, but large enough that the overhead of managing many partitions doesn't outweigh the benefits.
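      A very rough back-of-the-envelope version of point 1 (the executor memory figure is an assumption for illustration only; Spark's actual execution-memory accounting is more involved):

```python
# Assumed example numbers, not taken from the video.
executor_memory_gb = 8
cores_per_executor = 4
memory_per_core_gb = executor_memory_gb / cores_per_executor   # 2.0 GB per core

partition_gb = 1.5
# If a partition is larger than the memory available to the core processing it,
# spilling to disk (and the extra I/O that comes with it) becomes much more likely.
spill_likely = partition_gb > memory_per_core_gb                # False with these numbers
```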

  • @user-dx9qw3cl8w
    @user-dx9qw3cl8w 6 months ago

    Thanks again for the topic. I was worried that you had stopped... because this depth of knowledge can't even be gotten from colleagues.

  • @Revnge7Fold
    @Revnge7Fold 17 days ago +1

    I think it's a bit dumb for Spark to keep this value static... why not rather have a "target shuffle size (MB/GB)" config in Spark? I wish the Spark planner were a bit more sophisticated.

    • @afaqueahmad7117
      @afaqueahmad7117 8 days ago

      You could get a similar effect by turning on AQE and setting "spark.sql.adaptive.advisoryPartitionSizeInBytes" to your desired size. Documentation here: spark.apache.org/docs/latest/sql-performance-tuning.html
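      In PySpark, that suggestion looks roughly like the following (the "200m" target is only an example value, and an existing SparkSession named "spark" is assumed):

```python
# Sketch: let AQE coalesce shuffle partitions toward a target size instead of
# hand-tuning spark.sql.shuffle.partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "200m")
```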

    • @Revnge7Fold
      @Revnge7Fold 8 days ago

      @afaqueahmad7117 Awesome! Thanks for the advice! Your videos have been really helpful!

  • @vinothvk2711
    @vinothvk2711 5 months ago

    In scenario 2, we end up distributing 4.2 MB to each core. In this case, what is spark.sql.shuffle.partitions? Is it 12?

    • @afaqueahmad7117
      @afaqueahmad7117 5 months ago

      Yes @vinothvk2711, it's 12. As explained, this is going to ensure 100% utilisation of the cluster.