24 Fix Skewness and Spillage with Salting in Spark

  • Published 9 Jul 2024
  • Video explains - What is Data Skewness? How does Data Skewness lead to Data Spillage? How to identify Skewness in Spark? What is the Salting Technique? How to implement the Salting Technique?
    Chapters
    00:00 - Introduction
    03:57 - What is Data Skewness?
    04:25 - Types of Spillage in Spark
    05:28 - How to identify Skewness?
    08:32 - How to fix Skewness?
    09:03 - What is Salting Technique?
    09:53 - How is Salting done?
    Medium Link for Salting - / pyspark-the-famous-sal...
    Local PySpark Jupyter Lab setup - • 03 Data Lakehouse | Da...
    Python Basics - www.learnpython.org/
    GitHub URL for code - github.com/subhamkharwal/pysp...
    The series provides a step-by-step guide to learning PySpark, a popular open-source distributed computing framework that is used for big data processing.
    New video every 3 days ❤️
    #spark #pyspark #python #dataengineering

Comments • 25

  • @dataworksstudio • 5 months ago

    Great video. Thank you bhaiya.

  • @akshaykadam1260 • 6 days ago

    great work

    • @easewithdata • 4 days ago

      Thank you for your feedback 💓 Please make sure to share it with your network on LinkedIn 👍

  • @sumeet6510 • 2 months ago

    Great Video👍

  • @at-cv9ky • 4 months ago

    Very nice explanation.

  • @user-qc9vi2jy6w • 3 months ago

    Mass solutions bruhh🎉

  • @Amarjeet-fb3lk • 10 days ago +1

    Why did you create 32 shuffle partitions if you have 8 cores?
    If each partition is processed on a single core, where do the remaining 24 partitions get cores from?

    • @easewithdata • 6 days ago

      The 8 cores will process all 32 partitions in 4 waves of 8 tasks each (8 × 4 = 32).
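      A quick way to check this in a PySpark session (a sketch; defaultParallelism reflects the cores available to the application, and the 8/32 numbers are taken from this thread):
      cores = spark.sparkContext.defaultParallelism                       # 8 cores in the video's setup
      partitions = int(spark.conf.get("spark.sql.shuffle.partitions"))    # 32 shuffle partitions here
      waves = partitions / cores                                          # 4 waves of 8 tasks each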

  • @ankitpehlajani9244 • 5 months ago

    This is exactly what I was looking for, such a great way to explain. Can you please explain why you chose 16 as the number of partitions?

    • @easewithdata • 5 months ago

      The number of partitions should be a multiple of the number of cores. subhamkharwal.medium.com/pyspark-the-factor-of-cores-e884b2d5af6c
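      A minimal sketch of that rule, assuming an 8-core application as in this thread (spark.sql.shuffle.partitions is the standard Spark shuffle setting):
      cores = spark.sparkContext.defaultParallelism               # e.g. 8
      # Pick a multiple of the core count so every wave keeps all cores busy
      spark.conf.set("spark.sql.shuffle.partitions", cores * 4)   # 8 cores -> 32 partitions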

  • @vishaljoshi1752 • 4 months ago +1

    Thank you for the explanation. One question about 4:41: is the 39.6 MB the data first loaded into memory, which then becomes 79 MB once deserialized, or is it more than that? In other words, is the deserialized data (79 MB) what can be processed at one time, with that 79 MB processed afterwards?

    • @easewithdata • 2 months ago +1

      Spillage shows the amount of data Spark is not able to fit in memory for processing, in both serialized and deserialized form. That data gets processed as soon as Spark can fit it in memory, once some of the other data has been processed.

    • @vishaljoshi1752 • 2 months ago

      @easewithdata Thanks. In a sort merge join, suppose we are joining two tables and, after shuffling, a shuffled partition of one table is so large that it cannot fully fit in memory. I have seen Spark still join the data with a sort merge join in such a scenario. How is that possible, given that both table partitions have to fit in memory for the join?

  • @user-dv1ry5cs7e • 2 months ago

    If we set the shuffle partitions to 32 and don't use the salting technique, and just do the join on the original department_id column, what will happen?

    • @easewithdata • 2 months ago

      The shuffle partition setting doesn't guarantee even data distribution among executors. To make sure the data is distributed properly, we use the salting technique.
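      A minimal sketch of the salting idea, assuming DataFrames named emp (large, skewed side) and dept (small side) joined on department_id as in the video, with a salt range of 32 chosen to match the shuffle partition count:
      from pyspark.sql.functions import lit, rand, floor, explode, array
      SALT = 32  # assumed salt range
      # Add a random salt to the large, skewed side
      emp_salted = emp.withColumn("salt", floor(rand() * SALT).cast("int"))
      # Explode the small side so every salt value has a matching row
      dept_salted = dept.withColumn("salt", explode(array(*[lit(i) for i in range(SALT)])))
      # Join on the original key plus the salt, spreading the hot keys across partitions
      joined = emp_salted.join(dept_salted, on=["department_id", "salt"], how="inner")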

  • @montypoddar10 • 3 months ago

    I couldn't find the emp_skewed_data in the repo. Can you please share it here as a link or on the Git repo?

    • @easewithdata • 3 months ago

      The dataset is too large to upload to GitHub. I am trying to find an external source to host it.

  • @Fullon2 • 6 months ago

    Amazing video.
    Why not repartition by department_id? Isn't that simpler?

    • @easewithdata • 6 months ago

      If we repartition the shuffle partitions after the join, that is an extra exchange step. And if we repartition before the shuffle, the data will again end up skewed in the shuffle partitions.
      So it's an optimisation choice that depends entirely on the scenario.

    • @easewithdata • 6 months ago

      Watch the AQE video for the simplest option to fix skewness.
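      For reference, the AQE route is mostly configuration; a minimal sketch with the standard Spark 3.x settings:
      # Enable Adaptive Query Execution and let it split skewed shuffle partitions during joins
      spark.conf.set("spark.sql.adaptive.enabled", True)
      spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)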

    • @Fullon2 • 6 months ago

      @easewithdata Thanks bro, I'm going to watch.

  • @user-nv6ho7uk8b • 3 months ago

    Hi Shubham,
    The skewed employee dataset is not available. Could you please push it to the Git repo?

    • @easewithdata • 3 months ago +1

      Hello,
      Unfortunately GitHub no longer allows me to push GB-sized files. I am trying to find another file-sharing service to host the larger files.

    • @pawansharma-pz1hz • 3 months ago +1

      @easewithdata Hi, please create the file at your end using the code below:
      # Read the base employee data
      _schema = "first_name string, last_name string, job_title string, dob string, email string, phone string, salary double, department_id int"
      emp = spark.read.format("csv").schema(_schema).option("header", True).load("data/employee_records.csv")

      from pyspark.sql.functions import lit, count

      # Current row count per department
      emp.groupBy("department_id").agg(count("first_name").alias("count")).show()

      # 40-row helper DataFrames for departments 3 and 7, used to multiply their rows and create skew
      dept_3 = spark.range(40).select(lit(3).alias("department_id_temp"))
      dept_7 = spark.range(40).select(lit(7).alias("department_id_temp"))

      # Left outer join inflates department 3 rows 40x; other departments keep a single row
      emp_inc_3 = emp.join(dept_3, emp["department_id"] == dept_3["department_id_temp"], "left_outer")
      emp_inc_3 = emp_inc_3.drop("department_id_temp")
      emp_inc_3.groupBy("department_id").agg(count("first_name").alias("count")).show()

      # Same for department 7
      emp_inc_7 = emp_inc_3.join(dept_7, emp_inc_3["department_id"] == dept_7["department_id_temp"], "left_outer")
      emp_inc_7 = emp_inc_7.drop("department_id_temp")
      emp_inc_7.groupBy("department_id").agg(count("first_name").alias("count")).show()

      # Write the skewed dataset out as a single CSV file
      emp_inc_7.repartition(1).write.format("csv").mode("overwrite").option("header", True).save("data/output/emp_record_skewed.csv")

    • @anjibabumakkena • 3 months ago

      Hi, how can we get the same data to practice with? @easewithdata