Airflow Monitoring and Alerting

Our Airflow project includes a monitoring system that combines Elasticsearch, for centralized storage of error logs, with Microsoft Teams notifications sent via Apprise for real-time alerts.

This system was built to make crawling errors easier to identify. Instead of manually checking Airflow task logs, the system automatically sends a notification to a dedicated Teams channel whenever an error occurs during a crawl.
At the same time, all error details are stored in Elasticsearch, where a dashboard provides a consolidated view of issues along with contextual information to help diagnose and resolve problems quickly.

Error logs are generated automatically whenever a failure is detected during the crawling process. For each error, a structured document is created based on the event type and is sent simultaneously to both Elasticsearch and the Teams channel.

We have two primary event types:

Event Type      Description                                   Index Example
login_failed    Driver or API login/authentication failed     airflow-domain-errors, airflow-social-media-errors
job_failed      Job-level failure during execution            Same as above

Document Structure in Elasticsearch

Each log document is structured as a JSON object.

Login Failure

Document structure for the domain part

  • event_type: The category of the event (login_failed)
  • platform: The driver name
  • domain_identifier: Identifier of the domain
  • message: The error message
  • timestamp: The exact time the event occurred
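
For instance, a domain login failure produces a document along these lines (all values below are illustrative):

{
    "event_type": "login_failed",
    "platform": "driver_x",
    "domain_identifier": "example.com",
    "message": "Authentication failed: invalid credentials",
    "timestamp": "2025-01-15T08:30:00Z"
}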

Document structure for the social media part

  • event_type: The category of the event (login_failed)
  • platform: The driver name
  • email: The account email used for the login attempt
  • account_id: The ID of the account in dauthenticator
  • message: The error message
  • timestamp: The exact time the event occurred
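
And a social media login failure produces a document such as (illustrative values):

{
    "event_type": "login_failed",
    "platform": "driver_y",
    "email": "crawler@example.com",
    "account_id": "acc_123",
    "message": "Login blocked: checkpoint required",
    "timestamp": "2025-01-15T08:30:00Z"
}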

Example

[Screenshot: example login failure document]

Job Failure

  • event_type: The category of the event (job_failed)
  • driver_name: The name of the driver
  • job_id: Mongo ObjectId of the failed job
  • dag_id: The Airflow DAG in which the failure occurred
  • log_url: Direct link to the Airflow logs
  • message: The error message
  • timestamp: The exact time the event occurred
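
Such a document might look like the following (illustrative values):

{
    "event_type": "job_failed",
    "driver_name": "driver_x",
    "job_id": "65a1f0c2e4b0a1b2c3d4e5f6",
    "dag_id": "crawl_domains",
    "log_url": "https://your-airflow-url/...",
    "message": "Job execution failed: timeout while fetching page",
    "timestamp": "2025-01-15T08:30:00Z"
}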

Example

[Screenshot: example job failure document]

These documents are pushed to Elasticsearch using the log_to_elasticsearch function.
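
A minimal sketch of what log_to_elasticsearch can look like, assuming the official elasticsearch 8.x Python client and the environment variables described below; the project's actual implementation may differ:

import os
from elasticsearch import Elasticsearch

def log_to_elasticsearch(index: str, doc: dict) -> None:
    # Connect with the credentials from the environment (see the .env section)
    es = Elasticsearch(
        os.environ["ELASTICSEARCH_URL"],
        basic_auth=(
            os.environ["ELASTICSEARCH_USERNAME"],
            os.environ["ELASTICSEARCH_PASSWORD"],
        ),
    )
    # Index the structured error document into the target index
    es.index(index=index, document=doc)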


Environment Configuration (.env)

To enable monitoring, configure your environment variables as follows:

# Elasticsearch
ELASTICSEARCH_URL=https://your-elasticsearch-server:port
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=yourpassword

# Used in log_url to point to Airflow UI
AIRFLOW_URL=https://your-airflow-url

Make sure these are loaded in your environment before starting Airflow.
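
For local development, one convenient way to load them is python-dotenv (an assumption; any environment loader works):

from dotenv import load_dotenv

load_dotenv()  # reads the .env file from the current working directory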


Microsoft Teams Notifications via Apprise

Step 1: Create Teams Webhook

  • Go to your Teams channel → Connectors → Incoming Webhook
  • Name your webhook
  • Copy the generated Webhook URL

For more details, you can check this document.

Step 2: Create Airflow Connection

In the Airflow UI, create a new connection:

Field        Value
Conn ID      teams_notification
Conn Type    HTTP
Extra JSON   { "url": "<WEBHOOK_URL>" }

[Screenshot: Airflow connection form]
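
Equivalently, the connection can be created from the Airflow CLI (substitute your webhook URL):

airflow connections add teams_notification \
    --conn-type http \
    --conn-extra '{"url": "<WEBHOOK_URL>"}'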

This connection will be used by the Apprise provider to send alerts.


Teams Notification Content

Notifications sent to Teams summarize the failure with Markdown formatting:

  • event_type: job_failed
  • driver_name: driver_name
  • job_id: job_id
  • dag_id: dag_id
  • message: error_message
  • View Logs: a link that opens the Airflow log URL

Example

[Screenshot: example Teams notification]
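
As a rough sketch, such a message can be assembled and delivered as below. This version posts directly to the webhook stored in the teams_notification connection via requests, standing in for the Apprise call used in the project; the function body is an assumption:

import requests
from airflow.hooks.base import BaseHook

def send_teams_notification(doc: dict, context: dict, apprise_conn_id: str = "teams_notification") -> None:
    # Pull the webhook URL from the connection's Extra JSON (see Step 2)
    conn = BaseHook.get_connection(apprise_conn_id)
    webhook_url = conn.extra_dejson["url"]
    # Summarize the failure in Markdown, mirroring the fields listed above;
    # context (the Airflow task context) is kept for parity with the real signature
    body = (
        f"**event_type**: {doc['event_type']}  \n"
        f"**driver_name**: {doc.get('driver_name', 'n/a')}  \n"
        f"**job_id**: {doc.get('job_id', 'n/a')}  \n"
        f"**dag_id**: {doc.get('dag_id', 'n/a')}  \n"
        f"**message**: {doc['message']}  \n"
        f"[View Logs]({doc.get('log_url', '')})"
    )
    # Teams incoming webhooks accept a simple JSON payload with a "text" field
    requests.post(webhook_url, json={"text": body}, timeout=10)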

Function Used

To centralize the logic for both logging and alerting, we use the send_logs_and_notifications function:

def send_logs_and_notifications(doc: dict, index: str, context: dict, apprise_conn_id="teams_notification"):
    # Store the structured error document in Elasticsearch
    log_to_elasticsearch(index, doc)
    # Mirror the same failure to the Teams channel
    send_teams_notification(doc, context, apprise_conn_id)

This function is called from:

  • DomainDriverManager (login failures for domain part)
  • DriverManage (login failures for social media part)
  • _process_jobs() (failed jobs)
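
For example, reporting a failed job from _process_jobs() could look like this (the wrapper function, driver name, and index name are illustrative, not the project's actual code):

from datetime import datetime, timezone

def report_job_failure(job_id: str, exc: Exception, context: dict) -> None:
    # Build the job_failed document described above
    doc = {
        "event_type": "job_failed",
        "driver_name": "driver_x",                    # illustrative
        "job_id": job_id,                             # Mongo ObjectId as a string
        "dag_id": context["dag"].dag_id,
        "log_url": context["task_instance"].log_url,  # Airflow exposes this per task run
        "message": str(exc),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    send_logs_and_notifications(doc, index="airflow-domain-errors", context=context)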

Dashboard

The dashboard is available here.

[Screenshot: Kibana dashboard]