Ifremer

Migration des chaînes
du CERSAT vers Airflow

Modernisation de l'infrastructure de production — Ifremer / Sismer

Équipe CERSAT

Contexte

CERSAT est le centre de distribution et de traitement des données satellitaires d'Ifremer. Objectif : fournir des données et outils pour l'étude de l'état des océans et des interactions air-océan.

  • LOPS — expertise pour le développement des produits et traitements
  • Sismer — mise en production et distribution des données
  • En collaboration avec : Copernicus, ESA, CNES, Eumetsat, membre d'ODATIS
Ifremer ESA CNES Copernicus

L'équipe CERSAT du Sismer assure déploiement, suivi de production et distribution

1–2
opérateurs

2
ingénieurs

1
manager

Métier : qu'est-ce qu'un traitement ?

Un traitement est un enchaînement d'étapes qui transforme des données brutes en produits exploitables :

📡 Récupération
Données bas niveau

⚙️ Enrichissement
Fusion, correction, format

📤 Distribution
FTP, catalogue

Distribution directe (FTP, catalogue) ou indirecte (vers des organismes pour leurs propres produits)

Enjeux

Service de production contractuel (SLA)

  • Exemple SMOS Wind : production des données en moins de 4H (30 min de traitement chez Ifremer)
  • Des besoins de service pour l'infrastructure qui impactent chercheurs et utilisateurs

🔁 Besoins SLA

  • Haute disponibilité
  • Temps de traitement garanti
  • Redondance des services

🎯 Impact

  • Qualité de service pour les chercheurs
  • Respect des engagements contractuels
  • Visibilité sur la production

Données

Format NetCDF4 — format standard pour données scientifiques et satellitaires

XX TB
Archives
YY TB
Workspace
ZZ Go/j
Volume quotidien

(à compléter avec les chiffres réels de l'infrastructure CERSAT)

Architecture de production

Avant la migration

flowchart LR T["📥 Downloader"] --> CS["⚙️ ChainScheduler"] CS --> PBS["📋 PBS"] CS --> SP["📊 suivi_production"] SP --> ES[("🗄️ Elasticsearch")] ES --> K["📈 Kibana"]
flowchart LR cron["⏰ cron"] --> scripts["📜 scripts shell"]

Ordonnancement via ChainScheduler + PBS • Suivi historique via Elasticsearch/Kibana • Scripts automatisés via cron

Pourquoi la migration ?

❌ ChainScheduler

  • Non maintenu
  • Visualisation des états uniquement via logs
  • Tourne sur machines indépendantes (ISI) sans SLA
  • Hors conditions de production
  • Utilise PBS (décommissionné à la rentrée)

✅ Airflow

  • Orchestrateur moderne et maintenu
  • UI intégrée → visibilité temps réel
  • Supporte SLURM (HPC Ifremer)
  • Architecture scalable & résiliente
  • Event-driven & DAGs chaînables

Architecture de production

Après la migration

flowchart LR T["📥 Downloader"] --> AF["☁️ Airflow"] AF --> SLURM["📋 SLURM
(HPC Ifremer)"] AF --> MSP["📊 Meilleur suivi
de production"] MSP --> DASH["📈 Dashboard
+ Alertes"]
📉 Moins de silos 🔄 Automatisation complète 🔔 Alertes proactives 📋 SLA traçable

Passer les traitements
sur Airflow

De la chaîne legacy au DAG moderne

Anatomie d'une chaîne

flowchart LR A["🚀 Exécution"] --> B["📡 Récupération
de la donnée"] B --> C["⚙️ Traitement"] C --> D["📦 Déplacement
de la donnée"] D --> E["✅ Vérifications"]

📋 En terme Airflow

DAG = chaîne
Task = étape élémentaire

Les tasks peuvent être n'importe quoi : Python, Bash, SLURM, Kubernetes…

✨ Ce que permet Airflow

  • Arborescence de tasks libre
  • Event-driven (sensors)
  • Chaînage de DAGs
  • Retry automatique

Fonctionnalités clés d'Airflow

🧩 Arborescence

Construisez des graphes de tâches complexes avec des dépendances paramétrables.

🔔 Event-Driven

Les sensors déclenchent des tâches à l'arrivée de fichiers, à une heure donnée, ou sur condition.

🔗 Chaînage de DAGs

Un DAG peut déclencher un autre via TriggerDagRunOperator.

🔄 Retry automatique 📧 Alertes email/Slack 📊 Logs centralisés ⏱ Planning cron natif

Exemple de DAG


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'cersat',
    'start_date': datetime(2025, 1, 1),
}

with DAG(
    'exemple_chaine_cersat',
    default_args=default_args,
    schedule=None,
    catchup=False,
) as dag:

    def initialiser(**context):
        params = context['params']
        return f"Traitement de {params['fichier']}"

    init = PythonOperator(
        task_id='initialisation',
        python_callable=initialiser,
        params={'fichier': 'S3A_OL_1_*.nc'},
    )

    traitement = BashOperator(
        task_id='traitement_slurm',
        bash_command='sbatch /opt/scripts/traiter.sh',
    )

    transfert = BashOperator(
        task_id='transfert_fichiers',
        bash_command='cp /data/workspace/{{ dag_run.id }} /data/archive/',
    )

    verification = BashOperator(
        task_id='verification',
        bash_command='python /opt/scripts/verifier_produit.py',
    )

    init >> traitement >> transfert >> verification
      

Nos outils Airflow

📋 Slurm Operator

Lance les jobs via SLURM sur hpc.ifremer.fr

Besoins

  • Jeton d'utilisation
  • Utilisateur airflow autorisé sur le cluster

Avantages

  • Calcul distribué haute performance
  • Ressources mutualisées Ifremer

Convertible Path

La problématique des accès disques

⚙️ Problème

Airflow est containerisé → l'accès aux disques doit être géré dans le DAG

  • KubernetesOperator : container créé pour la tâche, montages de volumes à spécifier
  • CeleryExecutor : container toujours en exécution, réduit le délai pour tâches rapides

🔀 Solution

Les chemins de montage diffèrent selon la machine :

Datarmor CeleryExecutor HPC (SLURM)

ConvertiblePath → classe utilitaire pour convertir les chemins facilement

WorkdirManager

Gestion des dossiers temporaires des chaînes de traitement

✅ Fonctionnalités

  • Création automatique de dossiers pour chaque task
  • Génération de dossiers pour les tasks dynamiques
  • Partage des chemins d'une tâche à l'autre
  • Nettoyage à la fin du DAG

🧹 Politique de nettoyage

  • Succès → suppression automatique
  • Erreur → conservation pour débogage

# Exemple de configuration
with WorkdirManager(root="/data/workspace") as wm:
    input_dir = wm.make_dirs("input")
    output_dir = wm.make_dirs("output")
    # Les dossiers seront nettoyés
    # automatiquement en cas de succès
      

Suivi de la production

Besoins

  1. Vision temps-réel + historique de ce qui a été produit/téléchargé
  2. Vision des fichiers non produits/téléchargés
  3. Vision des sources utilisées pour quels produits
  4. Vision des états (actif/inactif) des services, infra, DAGs
  5. Alertes pour les points 2 et 4

Aujourd'hui

  • ✅ Elasticsearch pour production & téléchargement (cron BDD) → 1,2 mais pas temps-réel
  • 🟡 UI Airflow → partiellement 4
  • ❌ Pour tout le reste : fouiller les logs

Objectif

  • 📊 Dashboard temps réel
  • 🔔 Alerting automatisé
  • 📈 Métriques de production
  • 🔄 Traçabilité de bout en bout

Merci

Questions / Discussion

Équipe CERSAT — Sismer / Ifremer