Data Skew Drama? Not Anymore With Broadcast Joins & AQE

  • Published July 9, 2024
  • Spark Performance Tuning
    Welcome back to another engaging Apache Spark tutorial! In this hands-on Apache Spark performance optimization tutorial, we dive deep into techniques for fixing data skew, focusing on Adaptive Query Execution (AQE) and broadcast joins. AQE, a feature introduced in Spark 3.0, uses runtime statistics to select the most efficient query plan, optimizing shuffle partitions, joins, and skewed joins. We discuss how Spark coalesces partitions, converts sort-merge joins into broadcast joins, and splits larger partitions into smaller ones to optimize skewed joins.
    We walk through the Spark documentation to understand the properties that need to be set to true for Spark to dynamically handle skew in a sort-merge join. Then we look at an example joining two datasets, transactions and customers, to analyze how the join looks with and without AQE. By the end of this video, you will have a solid understanding of AQE, how to optimize skewed joins, and how to set up a Spark session to handle data skew (see the sketch after the key takeaways below).
    Key Takeaways:
    Understanding Adaptive Query Execution (AQE) and its benefits.
    How to optimize shuffle partitions and joins using AQE.
    Setting up a Spark session and properties to handle data skew dynamically.
    Analyzing the distribution of data and identifying skewed partitions.
    Comparing the performance of sort merge join with and without AQE.
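
    To make the setup concrete, here is a minimal PySpark sketch of a session configured for AQE skew handling. The config keys are real Spark 3.x properties; the dataset paths and column names are illustrative assumptions, not the exact ones from the video's notebook:

    ```python
    from pyspark.sql import SparkSession

    # Session with AQE and its skew-join optimization enabled.
    spark = (
        SparkSession.builder
        .appName("aqe-skew-demo")
        .config("spark.sql.adaptive.enabled", "true")           # turn AQE on
        .config("spark.sql.adaptive.skewJoin.enabled", "true")  # let AQE split skewed partitions
        .getOrCreate()
    )

    # Illustrative datasets mirroring the video's example (paths/columns assumed).
    transactions = spark.read.parquet("data/transactions")  # big table, skewed on cust_id
    customers = spark.read.parquet("data/customers")        # smaller dimension table

    # With AQE on, a skewed sort-merge join can be re-planned at runtime.
    joined = transactions.join(customers, transactions["cust_id"] == customers["id"])
    joined.write.mode("overwrite").parquet("output/joined")
    ```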
    📄 Complete Code on GitHub: github.com/afaqueahmad7117/sp...
    🎥 Full Spark Performance Tuning Playlist: • Apache Spark Performan...
    🔗 LinkedIn: / afaque-ahmad-5a5847129
    Chapters:
    00:00 Introduction
    00:35 What is AQE?
    04:25 Sort-Merge-Join of Customer & Transaction Dataset
    06:00 Spark UI showing Data Skew
    06:41 Join of Customer & Transaction Dataset (AQE enabled)
    07:04 Code + Spark UI - Comparing Join Performance (with & without AQE)
    10:52 Broadcast Join
    11:18 Internal Working of Sort Merge Join
    13:12 Concept of Hash Partitioning
    14:47 Sort Merge Join example
    17:12 Broadcast Join example
    19:44 Code for Broadcast join fixing Data Skew
    #DataEngineering #AdaptiveQueryExecution #DataSkew #BroadcastJoin #Spark #ApacheSpark #SparkPerformanceTuning

Comments • 27

  • @Fullon2
    @Fullon2 10 months ago +2

    Incredible series, thank you Afaque Ahmad. Looking forward to the next videos.😃

    • @afaqueahmad7117
      @afaqueahmad7117 10 months ago +1

      Many thanks @Fullon2 for the kind words, really appreciate it! :)

  • @CoolGuy
    @CoolGuy 8 months ago

    Learnt about AQE today. Thanks for the video.

  • @roksig3823
    @roksig3823 7 months ago

    Great explanation! I can understand SMJ and BCJ much better now. Thanks heaps!

  • @RohanKumar-mh3pt
    @RohanKumar-mh3pt 10 months ago

    Amazing, you explain everything in great depth in every video.

  • @OmairaParveen-uy7qt
    @OmairaParveen-uy7qt 10 months ago

    Amazing content!! Explained so well!

  • @miguelruiz9772
    @miguelruiz9772 9 months ago +3

    Hi Afaque, great video and content :). It may be worth noting in the video the limitations of broadcast joins: the broadcasted dataset needs to fit in the driver & executor memory, and if you have many executors, the broadcast may take longer than a sort-merge join; it could in fact time out.
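
    For reference, the limits mentioned above map to real Spark settings; the values shown below are the stock defaults:

    ```python
    # Spark only auto-broadcasts tables below this size (default 10MB);
    # the broadcast itself must finish within this timeout (default 300s).
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
    spark.conf.set("spark.sql.broadcastTimeout", "300")
    ```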

    • @afaqueahmad7117
      @afaqueahmad7117 9 months ago

      Thanks @miguelruiz9772 for the kind words and for the feedback, makes sense! :)

  • @ManaviVideos
    @ManaviVideos 10 months ago

    Thank you, Afaque Ahmad! 👍

  • @vamsikrishnabhadragiri402
    @vamsikrishnabhadragiri402 3 months ago +1

    Hello Afaque, thanks for the informative video! What does "partition by join key" mean at 18:38?

    • @afaqueahmad7117
      @afaqueahmad7117 3 months ago

      Hey @vamsikrishnabhadragiri402, I'm referring to partitioning by `Customers.id`, basically doing a `.partitionBy("id")`. If you were to partition by `Customers.id`, there could be data skew because some customers have more transactions than others; some `Customers.id` partition files will have many rows, while others will have hardly any.
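
      As a hypothetical illustration of that write pattern (dataset, column, and path names are assumed):

      ```python
      # Writing transactions into one directory per customer id.
      # A customer with millions of transactions produces a huge partition,
      # while an inactive customer produces a near-empty one - that's the skew.
      (transactions
          .write
          .partitionBy("cust_id")  # the join key from the reply
          .mode("overwrite")
          .parquet("output/transactions_by_customer"))
      ```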

  • @anandchandrashekhar2933
    @anandchandrashekhar2933 1 month ago

    Really great content, all of your videos. Thank you!! Just had a question out of curiosity: does AQE only coalesce shuffle partitions, or can it also, depending on the need, increase the shuffle partitions beyond 200?

    • @afaqueahmad7117
      @afaqueahmad7117 1 month ago +1

      Hey @anandchandrashekhar2933, appreciate the kind words. Yes, AQE can do both - increase (split) and decrease (coalesce) the number of shuffle partitions. A clear example is in the Spark DAGs video, where 1 partition was skewed and got split into 12. Refer here: czcams.com/video/O_45zAz1OGk/video.html
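
      Both behaviours are controlled by real Spark 3.x properties; the values below are the defaults:

      ```python
      # Coalescing merges many small shuffle partitions into fewer, bigger ones.
      spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
      # Skew-join splitting breaks an oversized partition into several tasks.
      spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
      # A partition counts as skewed only if it exceeds BOTH thresholds:
      spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")  # > 5x median partition size
      spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
      ```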

    • @anandchandrashekhar2933
      @anandchandrashekhar2933 1 month ago

      @@afaqueahmad7117 Ah, thank you, that really made it very clear. For some reason, I couldn't replicate the same when I ran your notebook on Databricks; even though I disabled broadcast hash join, it still ended up using broadcast instead of the AQE coalesce followed by sort merge. Maybe it's something specific to the Spark version I am currently on. But that's all right. Thank you again :)
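
      One possible explanation, offered as an assumption rather than a verified diagnosis of this case: since Spark 3.2, AQE has its own runtime broadcast threshold, separate from the static one, so fully disabling the broadcast conversion can require setting both to -1:

      ```python
      # Static (plan-time) broadcast threshold.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      # AQE's runtime threshold (Spark 3.2+); some runtimes set it independently.
      spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
      ```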

  • @retenim28
    @retenim28 1 month ago

    Hi sir, great content as always. Just a question on the last part of the video: if I understood correctly, you said to `repartition(3)` the big table so that rows are evenly distributed across the 3 executors, and then apply the broadcast join. But in the code you only performed a broadcast join, without `repartition(3)`. Why is that? I'm a little confused about that part. Thanks a lot.

    • @afaqueahmad7117
      @afaqueahmad7117 1 month ago

      Hey @retenim28, thank you, appreciate it.
      On the question - you're correct that I mentioned doing a `repartition(3)` when the table is big so that the rows get evenly partitioned. The reason I don't do a `repartition(3)` in the code is that the sample transactions table I'm using (supposedly the bigger table) isn't very big, hence repartitioning to even out the data isn't needed. Hope that clarifies :)
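
      A hypothetical side-by-side of the two variants discussed (table and column names are assumed):

      ```python
      from pyspark.sql.functions import broadcast

      # Variant shown in the video: broadcast only. No shuffle of the big
      # table, but any pre-existing imbalance in its partitions carries over.
      joined = transactions.join(broadcast(customers),
                                 transactions["cust_id"] == customers["id"])

      # Variant mentioned for genuinely big tables: even out the rows first
      # (a full shuffle), then broadcast the small side.
      evened = transactions.repartition(3)
      joined_even = evened.join(broadcast(customers),
                                evened["cust_id"] == customers["id"])
      ```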

    • @retenim28
      @retenim28 1 month ago

      @@afaqueahmad7117 this clarifies a lot, thank you. Another question: `repartition(3)` involves a shuffle, so theoretically it would be better to avoid it and only use the broadcast join, as you did in the video. So it seems to me there are two possible situations:
      1. do `repartition(3)` and then the broadcast join: this involves a shuffle (bad) of the big table, but the data skew problem is solved, so each core will process the same amount of data;
      2. avoid `repartition(3)` and just do the broadcast join: there is no shuffle (good) of the big table, but one specific core is forced to work with a huge amount of data compared to the remaining two.
      Which is the best path?
      In your code I tried both options and it looks like it's better to avoid `repartition(3)`. Am I missing something here? Sorry about the long question.

  • @adityeshchaturvedi6553
    @adityeshchaturvedi6553 8 months ago

    Hi Afaque, loving your videos, great content. Just one doubt: isn't AQE automatically enabled from Spark 3.x? If yes, why do we explicitly need to set the two mentioned properties to true?

    • @afaqueahmad7117
      @afaqueahmad7117 8 months ago +2

      Hey @adityeshchaturvedi6553, thanks for the kind words. To answer your question, AQE (`spark.sql.adaptive.enabled`) defaults to false in Spark 3.0.0 (reference here: spark.apache.org/docs/3.0.0/sql-performance-tuning.html#adaptive-query-execution), while it defaults to true from 3.2.0 onwards.
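
      A quick way to check what your own cluster defaults to:

      ```python
      # Returns "false" on stock Spark 3.0.x, "true" from 3.2.0 onwards.
      print(spark.conf.get("spark.sql.adaptive.enabled"))
      ```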

    • @adityeshchaturvedi6553
      @adityeshchaturvedi6553 8 months ago

      @@afaqueahmad7117 thanks a lot.
      Really appreciate your efforts!

  • @suruchijha3914
    @suruchijha3914 8 months ago

    Hi @afaque, could you please let me know what you mean by 15 distinct keys in a join?

    • @afaqueahmad7117
      @afaqueahmad7117 8 months ago

      Hey @suruchijha3914, by 15 distinct keys in a join, I'm referring to 15 unique values in the join column. For example, say you're joining sales data with product promotions data on `product_id` and there are only 15 unique products / product_ids - that means only 15 distinct keys in the join.
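
      A quick way to check the cardinality of a join key (table and column names here are assumed):

      ```python
      # Number of distinct join keys on the smaller side; 15 in the example above.
      n_keys = promotions.select("product_id").distinct().count()
      print(n_keys)
      ```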

  • @miguelruiz9772
    @miguelruiz9772 9 months ago

    Also, have you actually seen real-life performance improvements from AQE in a pipeline? I always end up setting it to false to avoid unpredictable behavior.

    • @afaqueahmad7117
      @afaqueahmad7117 9 months ago

      Well, some of its functionality works pretty well, like converting a sort-merge join to a broadcast join and coalescing the number of partitions, but I do agree that other parts, like optimizing skewed joins, are hard to predict and understand.

  • @gohyuchen7465
    @gohyuchen7465 10 months ago

    Just one piece of feedback: at 11:10, your eyes keep darting from your script to the camera. Since this is a recorded video, it's perfectly fine to keep looking at your script throughout; having your eyes constantly changing focus is slightly distracting.

    • @afaqueahmad7117
      @afaqueahmad7117 10 months ago +1

      I love your attention to detail; however, there's no script, it's just beginner me trying to adjust to the camera. Appreciate your feedback :)