How to build and automate your Python ETL pipeline with Airflow | Data pipeline | Python

  • Published 26. 07. 2024
  • In this video, we will cover how to automate your Python ETL (Extract, Transform, Load) pipeline with Apache Airflow. In this session, we will use the TaskFlow API introduced in Airflow 2.0. The TaskFlow API makes it much easier to author clean ETL code without extra boilerplate, by using the @task decorator. Airflow organizes your workflows as Directed Acyclic Graphs (DAGs) composed of tasks.
    In this tutorial, we will see how to design an ETL pipeline with Python. We will use SQL Server’s AdventureWorks database as a source and load data into PostgreSQL with Python. We will focus on the product hierarchy and enhance our initial data pipeline to give you a complete overview of the extract, transform, and load process. A minimal sketch of the TaskFlow pattern follows below.
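    A minimal sketch of the TaskFlow pattern covered in the video (the connection IDs, table names, query, and DSN are illustrative placeholders, not the video's exact code):
    from datetime import datetime
    import pandas as pd
    from airflow.decorators import dag, task

    @dag(schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False)
    def product_etl():
        @task()
        def extract() -> list:
            # Pull source rows from SQL Server (assumes a "sqlserver" connection in Airflow)
            from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
            df = MsSqlHook(mssql_conn_id="sqlserver").get_pandas_df("SELECT * FROM Production.Product")
            return df.to_dict(orient="records")  # XCom-friendly payload

        @task()
        def transform(rows: list) -> list:
            df = pd.DataFrame(rows)
            df.columns = [c.lower() for c in df.columns]  # example transformation
            return df.to_dict(orient="records")

        @task()
        def load(rows: list) -> None:
            from sqlalchemy import create_engine
            engine = create_engine("postgresql://etl:etl@localhost:5432/dw")  # placeholder DSN
            pd.DataFrame(rows).to_sql("stg_product", engine, if_exists="replace", index=False)

        load(transform(extract()))  # extract >> transform >> load

    product_etl()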
    Link to medium article on this topic: / how-to-automate-etl-pi...
    Link to previous video: • How to build an ETL pi...
    Link to Pandas video: • Python Pandas Data Sci...
    Link to GitHub repo: github.com/hnawaz007/pythonda...
    Link to Cron Expressions: docs.oracle.com/cd/E12058_01/...
    Subscribe to our channel:
    / haqnawaz
    ---------------------------------------------
    Follow me on social media!
    GitHub: github.com/hnawaz007
    Instagram: / bi_insights_inc
    LinkedIn: / haq-nawaz
    ---------------------------------------------
    #ETL #Python #Airflow
    Topics covered in this video:
    0:00 - Introduction to Airflow
    2:49 - The Setup
    3:40 - Script ETL pipeline: Extract
    5:52 - Transform
    7:39 - Load
    8:00 - Define the Directed Acyclic Graph (DAG)
    9:36 - Airflow UI: DAG enable & run
    10:09 - DAG Overview
    10:29 - Test ETL Pipeline
  • Science & Technology

Comments • 98

  • @BiInsightsInc
    @BiInsightsInc  2 years ago +8

    Videos in this series:
    Build ETL pipeline: czcams.com/video/dfouoh9QdUw/video.html&t
    ETL Load Reference Data: czcams.com/video/W-8tEFAWD5A/video.html
    ETL Incremental Data Load (Source Change Detection): czcams.com/video/32ErvH_m_no/video.html&t
    ETL Incremental Data Load (Destination Change Comparison): czcams.com/video/a_T8xRaCO60/video.html
    How to install Apache Airflow: czcams.com/video/t4h4vsULwFE/video.html

    • @beastmaroc7585
      @beastmaroc7585 10 months ago

      How do you load big tables, e.g. 30M rows, from SQL Server to Postgres? I'm trying to solve load speed and memory usage.

    • @BiInsightsInc
      @BiInsightsInc  10 months ago

      @@beastmaroc7585 You can horizontally scale your setup to a cluster if your data exceeds the resources of a single machine. This will allow you to process large datasets, as cluster nodes offer more compute and memory resources. You can also use a distributed engine like Spark and/or Kafka to process large datasets.
      I have discussed the Airflow execution and cluster-based approach here. Feel free to check it out.
      czcams.com/video/In7zwp0FDX4/video.html&ab_channel=BIInsightsInc

  • @tevintemu105
    @tevintemu105 1 year ago +2

    I want to thank you for posting this content. It is helpful in many ways.

  • @sharanchandrasekaran8399

    Excellent video sir, thank you.

  • @franciscmoldovan2153
    @franciscmoldovan2153 1 year ago +1

    Very nice videos and blog! Keep up the good work!

  • @thalesdefaria
    @thalesdefaria 2 years ago +2

    Great content, so pragmatic!

  • @flyingdutchman9804
    @flyingdutchman9804 1 year ago +1

    Fantastic presentation!

  • @demohub
    @demohub 1 year ago +1

    This video was a great resource. Thanks for the tutelage and your take on it.

  • @letsplaionline
    @letsplaionline 2 years ago

    Great explanation! Thanks!

  • @lasopainstantaneasuprema6861
    @lasopainstantaneasuprema6861 4 months ago +1

    simple and godlike understandable 10/10

  • @bandedecodeurs
    @bandedecodeurs 1 year ago

    Very good video. Thank you !

  • @Levy957
    @Levy957 2 years ago

    Amazing

  • @rajansonvane488
    @rajansonvane488 11 months ago

    Thanks !!❤

  • @rjrmatias
    @rjrmatias 10 months ago

    Excellent tutorial, thank you! It would be great if you could split the tasks into several files; I need to learn how to do this.

  • @agussyarif8429
    @agussyarif8429 2 years ago

    This is powerful knowledge.

  • @fashiuddin7911
    @fashiuddin7911 11 months ago

    Great great great

  • @crisybelkis
    @crisybelkis 1 year ago

    Thanks a lot. I'm trying to understand how to create a pipeline. I want to be an expert on this and be a good data engineering professional.

  • @GiovanniDeCillis
    @GiovanniDeCillis 2 years ago +4

    Hey, this is really helpful. It would be even more insightful if you provided or suggested ways to run this process (along with those described in this recent series of tutorials) in the cloud or in a serverless environment. Thanks!

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +3

      Hey Giovanni thanks for stopping by. We've covered AWS data lake in the last few videos and will cover ETL topics going forward. I will be sure to include Airflow and how to process data from S3 to Redshift with it. Stay tuned.

    • @GiovanniDeCillis
      @GiovanniDeCillis 2 years ago +1

      @@BiInsightsInc thank you indeed. I completely missed the data lake video. Thanks that is really helpful!

  • @tahirrana786
    @tahirrana786 2 years ago

    👍

  • @Obtemus
    @Obtemus 2 years ago +3

    Great playlist. Your method of building the videos is very practical and lovable.
    One question: how can you perform the "paste" line by line in the recording? Is it ctrl+y after so many ctrl+z ops?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +4

      Glad you find it helpful. Yes, you're right. I write the code and then it's ctrl+y and ctrl+z as I am going through the code. It takes practice and sometimes a few retakes to get it right.

    • @Obtemus
      @Obtemus 2 years ago +2

      @@BiInsightsInc That's so nice. I asked because I found it a very useful way to build the code during the video.

  • @ulhasbhagwat4942
    @ulhasbhagwat4942 2 years ago

    In load_src_data, can we not use the Postgres connection object (conn) to load the data instead of using create_engine? Because we need to know all the same connection details again, which we used to create the connection ID in Airflow.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Ulhas, when we use df.to_sql, we need to pass in a SQLAlchemy engine and not a standard connection object (and not a cursor either). If you try it on your end it will throw an error. Hope this helps.
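      A short sketch of the point above (the DSN and table are placeholders):
      import pandas as pd
      from sqlalchemy import create_engine
      # df.to_sql accepts a SQLAlchemy engine, not a raw DB-API connection or cursor
      engine = create_engine("postgresql://etl:etl@localhost:5432/adventureworks")
      df = pd.DataFrame({"product_id": [1, 2], "name": ["Bike", "Helmet"]})
      df.to_sql("stg_product", engine, if_exists="replace", index=False)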

  • @parikshitchavan2211
    @parikshitchavan2211 1 year ago

    Hello Sir, thanks for such a great tutorial, everything you made is smooth like butter. Just one question: whenever we make a new DAG, will we have to add docker-compose-CeleryExecutor, docker-compose-LocalExecutor, and Config for that particular DAG?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Hi Parikshit, if you have mounted the host folder “dags” to the Docker container, just as shown in the tutorial, then DAGs created in this folder will automatically appear in the Airflow UI and will be copied over to the appropriate containers.

    • @parikshitchavan2211
      @parikshitchavan2211 1 year ago

      Thanks sir for clearing up the confusion, please keep uploading videos 😀🙏

  • @abnormalirfan
    @abnormalirfan 2 years ago

    Hi, how about an Airflow DAG from MSSQL to GCS, please? I want to build a data warehouse in BigQuery.

  • @saadlechhb3702
    @saadlechhb3702 1 month ago +1

    Thank you, I have a question: if the task is scheduled to run daily and new data has been inserted into the source since the last transfer, will just the new data get transferred on the next run, or all the data again?

    • @BiInsightsInc
      @BiInsightsInc  1 month ago

      This would bring in all the data; it is a truncate-and-load approach. If you need to bring in only newly inserted data then you want to look into the incremental data load approaches. I have covered those in the following videos:
      czcams.com/video/a_T8xRaCO60/video.html
      czcams.com/video/32ErvH_m_no/video.html
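      For reference, one common incremental pattern is a watermark query (a sketch under assumed table/column names and placeholder DSNs, not necessarily the exact approach in the linked videos):
      import pandas as pd
      from sqlalchemy import create_engine, text
      source = create_engine("mssql+pyodbc://...")                      # placeholder DSN
      target = create_engine("postgresql://etl:etl@localhost:5432/dw")  # placeholder DSN
      # Highest ModifiedDate already loaded (None if the target table is empty)
      watermark = pd.read_sql("SELECT MAX(modified_date) AS wm FROM stg_product", target)["wm"][0]
      # Pull only rows newer than the watermark, then append them to the target
      new_rows = pd.read_sql(
          text("SELECT * FROM Production.Product WHERE ModifiedDate > :wm"),
          source,
          params={"wm": watermark},
      )
      new_rows.to_sql("stg_product", target, if_exists="append", index=False)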

    • @saadlechhb3702
      @saadlechhb3702 1 month ago

      @@BiInsightsInc Can I make a script that combines the incremental load with Airflow?

  • @shrutibangad8112
    @shrutibangad8112 1 year ago

    Question: do you have a video on building a pipeline to move data from Postgres to SQL Server?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Hi Shruti, I think we can reverse the order of the source and destination and it will give you the code to move from Postgres to SQL Server. Hope this helps!

  • @babakbarzegari9228
    @babakbarzegari9228 1 year ago

    It was awesome, I had a blast. Can a video be created for ETL of SQL Server tables to Azure Blobs with Airflow? You've never worked with Azure Blob.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago +1

      Thanks Babak! I will try and explore Azure.

    • @babakbarzegari9228
      @babakbarzegari9228 1 year ago

      @@BiInsightsInc Thank you so much! Looking forward to it with great anticipation. Please ping me here when it's done.

  • @isbakhullail6693
    @isbakhullail6693 1 year ago

    Do you use Docker for Airflow or not? Is there an installation video?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      I have Airflow running on Docker. Here is the link to Airflow installation video:
      czcams.com/video/t4h4vsULwFE/video.html

  • @hellowillow_YT
    @hellowillow_YT 6 months ago

    Hi, Sir! Thanks for the insightful video! However, I'd like to ask: do we need to place the ETL Python file in a particular folder for it to be recognized as a DAG by Airflow?

    • @BiInsightsInc
      @BiInsightsInc  6 months ago +1

      Yes, you need to put the DAG files in the dags folder. This folder gets mounted in the Docker image.

    • @hellowillow_YT
      @hellowillow_YT 6 months ago

      Understood. Thank you, Sir! @@BiInsightsInc

  • @alanamantinodasilva4362

    Can I ask you a question? I need to transfer data from Azure SQL Server to MSSQL Server (cloud to server). Can I do it directly from source to destination, or do I need to land the data in Postgres?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago +1

      Hi Alan, you can directly load the data from Azure SQL to SQL Server. There’s no need to load data to PostgreSQL.

    • @alanamantinodasilva4362
      @alanamantinodasilva4362 1 year ago

      @@BiInsightsInc Can you point me to content that explains this? I loved your video, but it's quite complex for me; I just need to transfer data from source to destination. (Sorry about my English.)

    • @alanamantinodasilva4362
      @alanamantinodasilva4362 1 year ago

      @@BiInsightsInc I tried the generic transfer, but it limits the transfer to 1000 rows per JSON file and takes a lot of time.

  • @dzakwandaffahidayatullah3125

    Do you have any suggestions for running a Python ETL from the command line on an Ubuntu server, to read a CSV file or connect to a pre-existing MySQL database?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Dzakwan, I have done a video on querying an existing MySQL database. You can follow along to establish the database connection and query the database. Once you write and test the script, save it as a Python file, i.e. etl.py. You can type: python etl.py in the terminal to execute the script. Hope this helps.
      czcams.com/video/mKnMY2GqaGI/video.html

  • @mrg807
    @mrg807 1 year ago

    Thanks for the video. One question: why didn't you use Airflow's built-in PostgresHook or PostgresOperator instead of SQLAlchemy and Pandas? I think this would simplify the code and make it more consistent with the way the SQL Server connection is established using MsSqlHook.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      This is a continuation of the Python ETL series. This session orchestrates what we developed previously, and the transformations (the T in ETL) are carried out with Pandas. I am not sure the PostgresHook offers the same capabilities as Pandas, therefore I went with Pandas. I could've used the PostgresHook to get a DataFrame from Postgres, but I'm not sure I could persist a DataFrame as easily as with SQLAlchemy. In addition, if I were to use the PostgresOperator I would have to switch to SQL as opposed to Python.
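      For what it's worth, a hook can hand back a DataFrame and also expose its SQLAlchemy engine, so a hybrid is possible (a sketch; the "postgres" connection ID and table names are assumptions):
      from airflow.providers.postgres.hooks.postgres import PostgresHook
      hook = PostgresHook(postgres_conn_id="postgres")
      df = hook.get_pandas_df("SELECT * FROM stg_product")  # query into Pandas
      # ... transform df with Pandas ...
      df.to_sql("dim_product", hook.get_sqlalchemy_engine(), if_exists="replace", index=False)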

    • @alonsom.donayre1992
      @alonsom.donayre1992 1 year ago

      @@BiInsightsInc I guess this is just an example of how to use the hooks, because Airflow is not a processing framework but an orchestrator. Transformation should be handled by an external engine like Spark, a database engine, etc.

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      @@alonsom.donayre1992 Yes, it is an orchestrator, however with the TaskFlow API 2.0, which is used here, they're claiming you can carry out the execution (transformation) within Airflow. I am not sold on it because it is designed for small to medium jobs, unless you are working with a cluster. I am waiting for Dagster to mature, as it can handle orchestration and processing. More to come on Dagster. So yeah, I would advise processing large datasets with an external database engine or distributed engines like Spark & Glue.
      airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

  • @varshachauhan5185
    @varshachauhan5185 2 years ago

    Thank you so much for the video. I have one question: suppose I have 1000 different source files, how do I compare these files with one table present in the database?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Hi Varsha, thanks for stopping by. If the files' structure is the same then you can combine them into a DataFrame, then query the database. From there it's just comparing the two DataFrames. Hope this helps.
      Also, if you are trying to do change detection for incremental load, I will be doing a video on how to perform incremental loads. Stay tuned.
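      A sketch of the combine-and-compare idea (the file paths, DSN, and key column are illustrative):
      import glob
      import pandas as pd
      from sqlalchemy import create_engine
      # Combine all source files with the same structure into one DataFrame
      files = pd.concat((pd.read_csv(f) for f in glob.glob("data/*.csv")), ignore_index=True)
      engine = create_engine("postgresql://etl:etl@localhost:5432/dw")  # placeholder DSN
      table = pd.read_sql("SELECT product_id FROM stg_product", engine)
      # Rows present in the files but missing from the table
      merged = files.merge(table, on="product_id", how="left", indicator=True)
      new_rows = merged[merged["_merge"] == "left_only"].drop(columns="_merge")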

    • @varshachauhan5185
      @varshachauhan5185 2 years ago

      @@BiInsightsInc Thank you for your reply. I will wait for your video, and I'd request you to make more videos on operations which can be performed using a Pandas DataFrame.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      @@varshachauhan5185 I have a video on the Pandas library with various tips and tricks. Feel free to check it out: czcams.com/video/-jerzfh2bS0/video.html

  • @ryanschraeder8681
    @ryanschraeder8681 28 days ago

    What happens if you kill the Airflow web server, or localhost? Will the DAG still run on the schedule you specified?

    • @BiInsightsInc
      @BiInsightsInc  28 days ago

      If the services are down then the DAG won't run. You want to make sure your server remains on for the DAG to execute on schedule.

  • @eunheechoi3745
    @eunheechoi3745 9 months ago

    I was able to import airflow.providers.oracle and the Oracle hook. However, when I use OracleHook, it keeps throwing an error saying ‘conn_id’ not found, even though the connection has been configured fine via the Airflow UI. Do you have any idea what could go wrong?

    • @BiInsightsInc
      @BiInsightsInc  9 months ago

      It's not able to find the connection configured in Airflow. You need to define the connection, test it, and try again.

  • @ChaimaHermi-zi8pq
    @ChaimaHermi-zi8pq 1 year ago

    Could you please provide more details on what you mean by "table fact of the datawarehouse"?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Can you please provide the timestamp in the video for context?

  • @eunheechoi3745
    @eunheechoi3745 9 months ago

    Is it better to extract the entire tables to staging and then build the final table with consolidated data? Why?
    Wouldn't you recommend querying the data first (extract) and then transforming and loading it into the final table?
    I am trying to understand why the first approach is better than the latter.

    • @BiInsightsInc
      @BiInsightsInc  9 months ago

      If you are referring to the technique showcased in the video, it follows the dimensional modeling approach: after staging we build a proper dimension table. With basic ETL you would simply extract, transform, and load the tables. I'd advise picking up a good book on ETL and dimensional modeling if you're curious. Hope this helps.

    • @eunheechoi3745
      @eunheechoi3745 9 months ago

      @@BiInsightsInc Thank you so much for the explanation and advice. Can you please recommend books/tutorials/courses which have practical hands-on projects, where I can delve into the data engineering space? I have worked in quant/data analytics for about 5+ years and am expanding my skills/knowledge in data engineering.

  • @CriticalThinker0890
    @CriticalThinker0890 1 year ago

    Why didn't you transform the data right after you extracted it from MSSQL, and then load the final data to PostgreSQL?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      It's better to break each operation into a different task so it is easier to manage and troubleshoot. This in turn results in a neater DAG structure: extract >> transform >> load. You could write the entire operation in a single function, but it would be hard to maintain and debug.

  • @beastmaroc7585
    @beastmaroc7585 10 months ago

    Can you share a tip for big tables, e.g. 30M rows? I have an issue with DataFrame memory usage.

    • @BiInsightsInc
      @BiInsightsInc  10 months ago

      I have done a video on this topic since a lot of folks were raising this. Also, look into big data processing frameworks designed for large data processing, i.e., Dask, Kafka, Spark, and Flink. I have covered Spark and Kafka on this channel.
      czcams.com/video/8Awk8CpfeGc/video.html&ab_channel=BIInsightsInc

  • @nabarajghimire610
    @nabarajghimire610 11 months ago

    Please make a video about the installation of Airflow. I faced many issues on Windows 10/11, like localhost:8080 not opening properly and lacking many GUI features.

    • @BiInsightsInc
      @BiInsightsInc  11 months ago

      I have covered Airflow's installation as a Docker container. You can follow the steps outlined in the following video. Docker takes the OS out of the equation and you will get the same Airflow functionality no matter what system you are running.
      czcams.com/video/t4h4vsULwFE/video.html

  • @eunheechoi3745
    @eunheechoi3745 9 months ago

    Hi, can you please make a video about a Python ETL pipeline with Airflow for extracting data from Oracle SQL and loading to Postgres?

    • @BiInsightsInc
      @BiInsightsInc  9 months ago

      I think all you need to do is connect to the Oracle database, and the rest of the process should be similar. Here is a link to the oracledb library that showcases how to establish a connection to an Oracle database.
      python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html
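      A minimal connection sketch with python-oracledb (the credentials, DSN, and table are placeholders):
      import oracledb
      import pandas as pd
      with oracledb.connect(user="etl", password="secret", dsn="dbhost.example.com/orclpdb1") as conn:
          with conn.cursor() as cur:
              cur.execute("SELECT product_id, name FROM products")  # extract step
              cols = [d[0] for d in cur.description]
              df = pd.DataFrame(cur.fetchall(), columns=cols)       # then transform/load as usual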

    • @eunheechoi3745
      @eunheechoi3745 9 months ago

      @@BiInsightsInc Thanks for the reply. I already know how to connect to an Oracle DB in Python itself, but was trying to connect Oracle directly to Airflow by using providers. No luck so far. Can you please make a video on how to install the Airflow provider for Oracle and how to extract data from Oracle and insert it into Postgres?

    • @BiInsightsInc
      @BiInsightsInc  9 months ago +1

      @@eunheechoi3745 I will cover the Oracle provider with Airflow soon.

    • @eunheechoi3745
      @eunheechoi3745 9 months ago

      @@BiInsightsInc Thank you so much! Actually, I've figured it out. It was just a matter of including airflow.providers.oracle in the docker-compose.yaml file... but I suppose this is not recommended for production. Can you show a better approach to installing the Oracle provider, like how to install cx_Oracle with the Instant Client, etc.? Or alternatives?
      Also, after the connection to Oracle in Airflow is established, I am getting a UnicodeDecodeError (UTF-8) when I try to get the entire table from Oracle... do you know how to fix it?

    • @BiInsightsInc
      @BiInsightsInc  9 months ago +1

      @@eunheechoi3745 You're on the right track for installing the Oracle provider. However, don't include it in the docker-compose file; rather, build a custom image with the additional libraries. I have covered how to build a custom image with additional libraries/providers here: czcams.com/video/In7zwp0FDX4/video.html&t

  • @mycousins896
    @mycousins896 11 months ago

    Why didn't you show how you connect to MySQL? Please make a video on that!

    • @BiInsightsInc
      @BiInsightsInc  11 months ago +1

      As you can see, I am using SQL Server as the source and Postgres as the target; therefore, MySQL is out of scope, which is why it is not covered. Here is a screenshot of how to connect to MySQL via Airflow:
      github.com/hnawaz007/pythondataanalysis/blob/main/AirflowSession2/airflow_mysql.png

    • @mycousins896
      @mycousins896 11 months ago

      @@BiInsightsInc Thanks, how about SQL Server? Can you make that as well? And how did you host your DB at that IP address in the picture?
      I'm asking because there really aren't any good tutorials out there for stuff like this. Many people underestimate how difficult it sometimes is to make connections work between these tools, while the actual development, on the other hand, is most of the time easy. That is a real gap you could fill by making some videos on how to make it all work between Docker, Microsoft SQL Server/MySQL, etc., and Airflow, and write back to the connected database.

    • @BiInsightsInc
      @BiInsightsInc  11 months ago

      @@mycousins896 Check out the Airflow installation with the SQL Server provider. I go over how to install the SQL Server provider and how to create a SQL Server connection.
      How to install Apache Airflow on Docker? | Build Custom Airflow Docker Image | Airflow | Docker
      czcams.com/video/t4h4vsULwFE/video.html

  • @irfanalfathramadhan4797
    @irfanalfathramadhan4797 2 years ago +1

    SQL Server to GCS with Airflow, please.

    • @BiInsightsInc
      @BiInsightsInc  2 years ago +2

      Will make a video on this topic soon. Thanks

    • @irfanalfathramadhan4797
      @irfanalfathramadhan4797 2 years ago

      ​@@BiInsightsInc
      I'm really waiting for that

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      @@irfanalfathramadhan4797 SQL Server to GCP video is up.
      czcams.com/video/Nq8Td8h_szk/video.html&t

  • @esmailpaltasingh9691
    @esmailpaltasingh9691 1 year ago

    How do I connect to a Sybase DB?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      You can use the Sybase library in Python to connect to a Sybase database. If you want to use SQLAlchemy, build a pyodbc connection string as shown below.
      # Sybase library
      import Sybase
      db = Sybase.connect('server', 'name', 'pass', 'database')
      c = db.cursor()
      c.execute("sql statement")
      list1 = c.fetchall()
      # SQLAlchemy (driver, server, database, port, user, and password are
      # assumed to be defined already)
      import urllib.parse
      from sqlalchemy import create_engine
      params = (
          "DRIVER=" + driver + ";"
          "SERVER=" + server + ";"
          "DATABASE=" + database + ";"
          "PORT=" + port + ";"
          "UID=" + user + ";"
          "PWD=" + password + ";"
      )
      params = urllib.parse.quote_plus(params)
      connection_string = 'sybase+pyodbc:///?odbc_connect=%s' % params
      engine = create_engine(connection_string)

  • @ardavanmoinzadeh801
    @ardavanmoinzadeh801 2 years ago

    Hi, can you share the source code?

    • @BiInsightsInc
      @BiInsightsInc  2 years ago

      Please check the description of the video. All the resources, including a link to the repo, are there. Thanks

    • @ardavanmoinzadeh801
      @ardavanmoinzadeh801 2 years ago

      @@BiInsightsInc yes.. I commented too fast :) found them and forgot to fix my comment!! thanks!

  • @CSE-AshikAhamedP
    @CSE-AshikAhamedP 1 year ago

    But why are you importing pandas?

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Pandas is used to query the database and carry out the transformation.

  • @bigdataon-premorcloud7866

    How do I add the "Conn Type" of mssql? czcams.com/video/eZfD6x9FJ4E/video.html

    • @BiInsightsInc
      @BiInsightsInc  1 year ago

      Check out the Airflow installation with the SQL Server provider. I go over how to create a SQL Server connection.
      How to install Apache Airflow on Docker? | Build Custom Airflow Docker Image | Airflow | Docker
      czcams.com/video/t4h4vsULwFE/video.html