Currency Pipeline

- 3 mins read

The application leverages Airflow and Docker to automate the retrieval of currency exchange rates for the past four weeks, using the Malaysian Ringgit (MYR) as the base currency. The pipeline generates a report with visualizations and automatically sends users a weekly email with the report files attached.

Configuration

The pipeline runs on multiple Docker containers and consists of three main components: Webserver, PostgreSQL, and MySQL.

  • Webserver: Hosts Apache Airflow’s web UI for managing DAGs. Instead of using a pre-built image, it was built from a custom Dockerfile. The webserver mounts multiple volumes for DAGs and stored files. Notably, the store_files directory is accessible by both the webserver and MySQL, enabling file sharing and data cleaning within the MySQL database. Additionally, the webserver has an SMTP (Simple Mail Transfer Protocol) setup, allowing Airflow to send automated emails via external mail servers (e.g., Gmail).
  • MySQL: Stores the generated reports and performs data preprocessing within DAGs.
  • PostgreSQL: Serves as Airflow’s metadata database, storing DAGs, task statuses, and logs.

PostgreSQL and MySQL are initialized from pre-built images pulled from Docker Hub. The webserver is exposed on port 8080 and depends on both databases, so it does not start until they are fully operational. The configuration details can be found in docker-compose-LocalExecutor.yml and the Dockerfile.

The data pipeline is illustrated below:

[Pipeline architecture diagram]

Pipeline Process

  1. Retrieving Exchange Rates
    A Python script (currency_rates.py) retrieves the latest exchange rates via an API, then generates a CSV file and a line plot to visualize recent trends. These files are saved in the store_files directory, allowing seamless file sharing between Airflow and MySQL. File sensors in the DAG ensure the files exist before the pipeline proceeds to data insertion (a sketch of this script appears after this list).

  2. Data Processing in MySQL
    The script (mysql_queries.py) establishes a connection to the MySQL database using predefined credentials. Once connected, it executes SQL queries to create the target table, insert the retrieved data, and sort it. The cleaned table is stored in MySQL, and the CSV file is renamed with a timestamp (e.g., store_files/currency_rates_20250203_0933.csv) for easy historical lookup (see the sketch after this list).

  3. Automated Email Notification
    The final step involves using the Airflow EmailOperator to send an automated email with the CSV file and line plot attached. The email is scheduled to be sent weekly, summarizing exchange rate trends over the past four weeks.
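
The two scripts above can be sketched roughly as follows. These are minimal, illustrative versions: the API endpoint, file paths, credentials, column layout, and table schema are assumptions, not the exact code from the repository.

```python
# currency_rates.py -- illustrative sketch (endpoint, paths, and schema are assumptions)
import datetime as dt

import matplotlib
matplotlib.use("Agg")  # headless container: render plots without a display
import matplotlib.pyplot as plt
import pandas as pd
import requests

STORE_DIR = "/usr/local/airflow/store_files"  # volume shared with the MySQL container (assumed path)


def fetch_rates():
    end = dt.date.today()
    start = end - dt.timedelta(weeks=4)
    # Hypothetical exchange-rate API queried with MYR as the base currency
    resp = requests.get(
        "https://api.exchangerate.host/timeseries",
        params={"base": "MYR", "start_date": start.isoformat(), "end_date": end.isoformat()},
        timeout=30,
    )
    resp.raise_for_status()
    rates = resp.json()["rates"]  # {date: {currency: rate}}

    # One row per date, one column per currency
    df = pd.DataFrame(rates).T.sort_index()
    df.to_csv(f"{STORE_DIR}/currency_rates.csv", index_label="date")

    # Line plot of the last four weeks of rates against MYR
    df.plot(figsize=(10, 5), title="Exchange rates vs MYR (last 4 weeks)")
    plt.tight_layout()
    plt.savefig(f"{STORE_DIR}/currency_rates.png")


if __name__ == "__main__":
    fetch_rates()
```

The MySQL step could then look something like this, again with assumed credentials and table layout:

```python
# mysql_queries.py -- illustrative sketch (credentials, schema, and table name are assumptions)
import csv
import datetime as dt
import os

import mysql.connector

STORE_DIR = "/usr/local/airflow/store_files"


def load_into_mysql():
    conn = mysql.connector.connect(
        host="mysql", user="airflow", password="airflow", database="currency"  # assumed credentials
    )
    cur = conn.cursor()
    cur.execute(
        """CREATE TABLE IF NOT EXISTS currency_rates (
               rate_date DATE,
               currency  VARCHAR(8),
               rate      DECIMAL(18, 6)
           )"""
    )

    # Insert one row per (date, currency) pair from the CSV produced in step 1
    with open(f"{STORE_DIR}/currency_rates.csv", newline="") as f:
        for row in csv.DictReader(f):
            for currency, rate in row.items():
                if currency == "date":
                    continue
                cur.execute(
                    "INSERT INTO currency_rates (rate_date, currency, rate) VALUES (%s, %s, %s)",
                    (row["date"], currency, rate),
                )
    conn.commit()
    cur.close()
    conn.close()

    # Rename the CSV with a timestamp so past runs remain available for lookup
    stamp = dt.datetime.now().strftime("%Y%m%d_%H%M")
    os.rename(f"{STORE_DIR}/currency_rates.csv",
              f"{STORE_DIR}/currency_rates_{stamp}.csv")
```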

Implementation in Airflow DAGs

The workflow is orchestrated using Airflow DAGs, with the PythonOperator executing Python scripts for data retrieval, preprocessing, and file handling. Each task is defined as a separate operator, ensuring modular execution and easier debugging.
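
A condensed sketch of how such a DAG could be wired together is shown below. It assumes an Airflow 1.10-style package layout (in line with docker-compose-LocalExecutor.yml) along with hypothetical task IDs, file paths, and recipient; the actual DAG in the repository may differ.

```python
# dags/currency_dag.py -- illustrative sketch of the orchestration; task IDs, paths,
# recipient, and the Airflow 1.10-style import paths are assumptions
from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.email_operator import EmailOperator
from airflow.operators.python_operator import PythonOperator

from currency_rates import fetch_rates        # hypothetical module locations
from mysql_queries import load_into_mysql

default_args = {"owner": "airflow", "start_date": datetime(2025, 1, 1)}

with DAG(
    "currency_pipeline",
    default_args=default_args,
    schedule_interval="@weekly",  # one run (and one email) per week
    catchup=False,
) as dag:

    fetch = PythonOperator(task_id="fetch_rates", python_callable=fetch_rates)

    # Wait until the CSV produced by step 1 exists in the shared volume
    wait_for_csv = FileSensor(
        task_id="wait_for_csv",
        filepath="/usr/local/airflow/store_files/currency_rates.csv",
        poke_interval=30,
        timeout=600,
    )

    load = PythonOperator(task_id="load_into_mysql", python_callable=load_into_mysql)

    # Weekly summary email; attaching the timestamped CSV would require resolving
    # its runtime name (e.g. via XCom), which is omitted from this sketch
    notify = EmailOperator(
        task_id="send_report",
        to="user@example.com",
        subject="Weekly MYR exchange rate report",
        html_content="Exchange rate trends for the past four weeks are attached.",
        files=["/usr/local/airflow/store_files/currency_rates.png"],
    )

    fetch >> wait_for_csv >> load >> notify
```

Defining each stage as its own operator keeps the chain easy to rerun from the point of failure, and the file sensor guards the MySQL load against running before the CSV exists.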

This project showcases the creation of a data pipeline, enhancing my understanding of Docker and Airflow. Potential improvements include deploying the pipeline to the cloud for full automation and integrating Metabase for more flexible and interactive visualizations. The project can be found on GitHub.