Migration des chaînes
du CERSAT vers Airflow
Modernisation de l'infrastructure de production — Ifremer / Sismer
• Équipe CERSAT •
Contexte
CERSAT est le centre de distribution et de traitement des données satellitaires d'Ifremer. Objectif : fournir des données et outils pour l'étude de l'état des océans et des interactions air-océan.
- LOPS — expertise pour le développement des produits et traitements
- Sismer — mise en production et distribution des données
- En collaboration avec : Copernicus, ESA, CNES, Eumetsat, membre d'ODATIS
L'équipe CERSAT du Sismer assure déploiement, suivi de production et distribution
Métier : qu'est-ce qu'un traitement ?
Un traitement est un enchaînement d'étapes qui transforme des données brutes en produits exploitables :
📡 Récupération
Données bas niveau
→
⚙️ Enrichissement
Fusion, correction, format
→
📤 Distribution
FTP, catalogue
Distribution directe (FTP, catalogue) ou indirecte (vers des organismes pour leurs propres produits)
Enjeux
Service de production contractuel (SLA)
- Exemple SMOS Wind : production des données en moins de 4H (30 min de traitement chez Ifremer)
- Des besoins de service pour l'infrastructure qui impactent chercheurs et utilisateurs
🔁 Besoins SLA
- Haute disponibilité
- Temps de traitement garanti
- Redondance des services
🎯 Impact
- Qualité de service pour les chercheurs
- Respect des engagements contractuels
- Visibilité sur la production
Données
Format NetCDF4 — format standard pour données scientifiques et satellitaires
(à compléter avec les chiffres réels de l'infrastructure CERSAT)
Architecture de production
Avant la migration
flowchart LR
T["📥 Downloader"] --> CS["⚙️ ChainScheduler"]
CS --> PBS["📋 PBS"]
CS --> SP["📊 suivi_production"]
SP --> ES[("🗄️ Elasticsearch")]
ES --> K["📈 Kibana"]
flowchart LR
cron["⏰ cron"] --> scripts["📜 scripts shell"]
Ordonnancement via ChainScheduler + PBS • Suivi historique via Elasticsearch/Kibana • Scripts automatisés via cron
Pourquoi la migration ?
❌ ChainScheduler
- Non maintenu
- Visualisation des états uniquement via logs
- Tourne sur machines indépendantes (ISI) sans SLA
- Hors conditions de production
- Utilise PBS (décommissionné à la rentrée)
✅ Airflow
- Orchestrateur moderne et maintenu
- UI intégrée → visibilité temps réel
- Supporte SLURM (HPC Ifremer)
- Architecture scalable & résiliente
- Event-driven & DAGs chaînables
Architecture de production
Après la migration
flowchart LR
T["📥 Downloader"] --> AF["☁️ Airflow"]
AF --> SLURM["📋 SLURM
(HPC Ifremer)"]
AF --> MSP["📊 Meilleur suivi
de production"]
MSP --> DASH["📈 Dashboard
+ Alertes"]
📉 Moins de silos
🔄 Automatisation complète
🔔 Alertes proactives
📋 SLA traçable
Passer les traitements
sur Airflow
De la chaîne legacy au DAG moderne
Anatomie d'une chaîne
flowchart LR
A["🚀 Exécution"] --> B["📡 Récupération
de la donnée"]
B --> C["⚙️ Traitement"]
C --> D["📦 Déplacement
de la donnée"]
D --> E["✅ Vérifications"]
📋 En terme Airflow
DAG = chaîne
Task = étape élémentaire
Les tasks peuvent être n'importe quoi : Python, Bash, SLURM, Kubernetes…
✨ Ce que permet Airflow
- Arborescence de tasks libre
- Event-driven (sensors)
- Chaînage de DAGs
- Retry automatique
Fonctionnalités clés d'Airflow
🧩 Arborescence
Construisez des graphes de tâches complexes avec des dépendances paramétrables.
🔔 Event-Driven
Les sensors déclenchent des tâches à l'arrivée de fichiers, à une heure donnée, ou sur condition.
🔗 Chaînage de DAGs
Un DAG peut déclencher un autre via TriggerDagRunOperator.
🔄 Retry automatique
📧 Alertes email/Slack
📊 Logs centralisés
⏱ Planning cron natif
Exemple de DAG
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
default_args = {
'owner': 'cersat',
'start_date': datetime(2025, 1, 1),
}
with DAG(
'exemple_chaine_cersat',
default_args=default_args,
schedule=None,
catchup=False,
) as dag:
def initialiser(**context):
params = context['params']
return f"Traitement de {params['fichier']}"
init = PythonOperator(
task_id='initialisation',
python_callable=initialiser,
params={'fichier': 'S3A_OL_1_*.nc'},
)
traitement = BashOperator(
task_id='traitement_slurm',
bash_command='sbatch /opt/scripts/traiter.sh',
)
transfert = BashOperator(
task_id='transfert_fichiers',
bash_command='cp /data/workspace/{{ dag_run.id }} /data/archive/',
)
verification = BashOperator(
task_id='verification',
bash_command='python /opt/scripts/verifier_produit.py',
)
init >> traitement >> transfert >> verification
Nos outils Airflow
📋 Slurm Operator
Lance les jobs via SLURM sur hpc.ifremer.fr
Besoins
- Jeton d'utilisation
- Utilisateur airflow autorisé sur le cluster
Avantages
- Calcul distribué haute performance
- Ressources mutualisées Ifremer
Convertible Path
La problématique des accès disques
⚙️ Problème
Airflow est containerisé → l'accès aux disques doit être géré dans le DAG
- KubernetesOperator : container créé pour la tâche, montages de volumes à spécifier
- CeleryExecutor : container toujours en exécution, réduit le délai pour tâches rapides
🔀 Solution
Les chemins de montage diffèrent selon la machine :
Datarmor
CeleryExecutor
HPC (SLURM)
ConvertiblePath → classe utilitaire pour convertir les chemins facilement
WorkdirManager
Gestion des dossiers temporaires des chaînes de traitement
✅ Fonctionnalités
- Création automatique de dossiers pour chaque task
- Génération de dossiers pour les tasks dynamiques
- Partage des chemins d'une tâche à l'autre
- Nettoyage à la fin du DAG
🧹 Politique de nettoyage
- Succès → suppression automatique
- Erreur → conservation pour débogage
# Exemple de configuration
with WorkdirManager(root="/data/workspace") as wm:
input_dir = wm.make_dirs("input")
output_dir = wm.make_dirs("output")
# Les dossiers seront nettoyés
# automatiquement en cas de succès
Suivi de la production
Besoins
- Vision temps-réel + historique de ce qui a été produit/téléchargé
- Vision des fichiers non produits/téléchargés
- Vision des sources utilisées pour quels produits
- Vision des états (actif/inactif) des services, infra, DAGs
- Alertes pour les points 2 et 4
Aujourd'hui
- ✅ Elasticsearch pour production & téléchargement (cron BDD) → 1,2 mais pas temps-réel
- 🟡 UI Airflow → partiellement 4
- ❌ Pour tout le reste : fouiller les logs
Objectif
- 📊 Dashboard temps réel
- 🔔 Alerting automatisé
- 📈 Métriques de production
- 🔄 Traçabilité de bout en bout
Merci
Questions / Discussion
Équipe CERSAT — Sismer / Ifremer