Data Validation with PySpark || Schema Comparison || Dynamically || Real-Time Scenario

  • Published 22 Dec 2023
  • In this video we cover how to perform quick data validation, such as a schema comparison between source and target; a minimal sketch of the idea appears below the tags. In the next video we will look into date/timestamp format checks and duplicate count checks.
    Column Comparison link:
    • Data Validation with P...
    #dataanalytics #dataengineeringessentials #azuredatabricks
    #dataanalysis
    #pyspark
    #pythonprogramming
    #sql
    #databricks #PySpark #Spark #DatabricksNotebook #PySparkLogic
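
    As a minimal, self-contained sketch of the idea (the file paths, DataFrame names, and column values below are illustrative placeholders, not taken from the video), the core comparison boils down to diffing the two schemas' name/type pairs:

    # Minimal sketch: compare the schemas of a source and a target DataFrame.
    # Paths and names below are illustrative placeholders.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("SchemaComparison").getOrCreate()

    source_df = spark.read.option("header", True).option("inferSchema", True).csv("/path/to/source.csv")
    target_df = spark.read.option("header", True).option("inferSchema", True).csv("/path/to/target.csv")

    # Map each column name to its simple type string, e.g. {"row_id": "int"}
    src_schema = {f.name.lower(): f.dataType.simpleString() for f in source_df.schema.fields}
    tgt_schema = {f.name.lower(): f.dataType.simpleString() for f in target_df.schema.fields}

    # Columns that exist on only one side
    print("Only in source:", src_schema.keys() - tgt_schema.keys())
    print("Only in target:", tgt_schema.keys() - src_schema.keys())

    # Columns present on both sides but with different types
    mismatches = {
        name: (src_schema[name], tgt_schema[name])
        for name in src_schema.keys() & tgt_schema.keys()
        if src_schema[name] != tgt_schema[name]
    }
    print("Type mismatches:", mismatches)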

Comments • 10

  • @vamshimerugu6184 • 3 months ago +1

    I think schema comparison is an important topic in PySpark. Great explanation, sir ❤

  • @saibhargavreddy5992 • 3 months ago

    I found this very useful as I had a similar issue with data validations. It helped a lot while completing my project.

  • @skateforlife3679 • 7 months ago +2

    Thank you for your work!!! It would be amazing if you could enhance the video with "chapters" to add more context to what you explain in the different sections of the video :)!

  • @avinash7003 • 6 months ago

    Is the code on GitHub?

    • @DataSpark45 • 6 months ago +1

      Here is the link, bro: drive.google.com/drive/folders/1I6rqtiKh1ChM_dkLJwxfxyxcHWPgyiKZ?usp=sharing

  • @amandoshi5803 • 1 month ago

    Source code?

    • @DataSpark45 • 1 month ago +1

      from pyspark.sql.functions import col

      def SchemaComparision(controldf, spsession, refdf):
          try:
              # Iterate over the control table to get each file name and file path
              for x in controldf.collect():
                  filename = x['filename']
                  filepath = x['filepath']
                  # Define a DataFrame from the file path
                  print("Data frame is being created for {} ({})".format(filepath, filename))
                  dfs = spsession.read.format('csv').option('header', True).option('inferSchema', True).load(filepath)
                  print("DF created for {} ({})".format(filepath, filename))
                  # Look up the reference schema row for this file
                  ref_filter = refdf.filter(col('SrcFileName') == filename)
                  for ref in ref_filter.collect():
                      columnNames = ref['SrcColumns']
                      refTypes = ref['SrcColumnType']
                      columnNamesList = [c.strip().lower() for c in columnNames.split(",")]
                      refTypesList = [t.strip().lower() for t in refTypes.split(",")]
                      # Map each DataFrame column to its simple type string,
                      # e.g. StringType() -> 'string', IntegerType() -> 'int'
                      dfsTypesMap = {f.name.strip().lower(): f.dataType.simpleString() for f in dfs.schema.fields}
                      dfsTypesList = [dfsTypesMap.get(c, 'missing') for c in columnNamesList]
                      # Collect columns whose actual type differs from the reference type,
                      # e.g. columnName: Row id, DataFrameType: string, referenceType: int
                      mismatchedColumns = [
                          (col_name, df_type, ref_type)
                          for (col_name, df_type, ref_type) in zip(columnNamesList, dfsTypesList, refTypesList)
                          if df_type != ref_type
                      ]
                      if mismatchedColumns:
                          print("Schema comparison failed (mismatch) for {}".format(filename))
                          for col_name, df_type, ref_type in mismatchedColumns:
                              print(f"columnName: {col_name}, DataFrameType: {df_type}, referenceType: {ref_type}")
                      else:
                          print("Schema comparison is done and successful for {}".format(filename))
          except Exception as e:
              print("An error occurred: ", str(e))
              return False
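
      For context, here is a hypothetical invocation of the function above. The control/reference table layouts are inferred from the function body; the SparkSession name spark and all sample values are invented:

      # Hypothetical driver tables; column names match what the function reads.
      control_df = spark.createDataFrame(
          [("sales.csv", "/mnt/raw/sales.csv")],
          ["filename", "filepath"],
      )
      ref_df = spark.createDataFrame(
          [("sales.csv", "Row id, Amount", "int, double")],
          ["SrcFileName", "SrcColumns", "SrcColumnType"],
      )
      SchemaComparision(control_df, spark, ref_df)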