Consistency Regularization-based Models

Les modèles basés sur la régularisation par consistance (Consistency Regularization) sont des approches semi-supervisées qui exploitent la consistance des prédictions sur des transformations ou perturbations des données d’entrée. L’idée principale est qu’un modèle devrait produire des prédictions similaires, même après avoir appliqué des transformations ou du bruit sur les données d’entrée, à condition que ces transformations ne modifient pas fondamentalement les caractéristiques de l’exemple.

Dans un cadre semi-supervisé, les données non étiquetées sont utilisées pour renforcer l’entraînement du modèle. En ajoutant des régularisations sur les prédictions faites par le modèle pour des versions modifiées (ou bruitées) des données non étiquetées, le modèle apprend à être plus robuste et à généraliser mieux.

Les méthodes populaires qui utilisent la régularisation par consistance incluent Mean Teacher, FixMatch, et Pseudo-Labeling.

Fonctions :

  • Exemple avec TensorFlow

    L'approche Consistency Regularization dans TensorFlow est utilisée pour améliorer l'entraînement des modèles en utilisant des données non étiquetées en leur imposant une régularisation sur la consistance des prédictions. Voici un exemple d'implémentation d'un modèle basé sur la régularisation par consistance en utilisant TensorFlow et Keras.

    Importation :

    import tensorflow as tf
    from tensorflow.keras import layers, models
    import numpy as np
    import matplotlib.pyplot as plt
    from tensorflow.keras.datasets import cifar10

    Exemple de code :

    import tensorflow as tf
    from tensorflow.keras import layers, models
    import numpy as np
    import matplotlib.pyplot as plt
    from tensorflow.keras.datasets import cifar10
    
    (x_train, y_train), (x_test, y_test) = cifar10.load_data()
    
    x_train, x_test = x_train / 255.0, x_test / 255.0
    
    # Forcer float32 pour éviter l’erreur dtype
    x_train = x_train.astype(np.float32)
    x_test = x_test.astype(np.float32)
    
    y_train = y_train.flatten()
    y_test = y_test.flatten()
    
    # Séparation des données étiquetées et non étiquetées
    labeled_data = x_train[:1000]  # 1000 images étiquetées
    labeled_labels = y_train[:1000]
    unlabeled_data = x_train[1000:]  # reste non étiqueté
    
    # Modèle CNN simple
    def create_model():
        model = models.Sequential([
            layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
            layers.MaxPooling2D((2, 2)),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.MaxPooling2D((2, 2)),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.Flatten(),
            layers.Dense(64, activation='relu'),
            layers.Dense(10, activation='softmax')  # sortie en probas
        ])
        return model
    
    # Fonction d'augmentation des données
    def data_augmentation(images):
        images = tf.convert_to_tensor(images, dtype=tf.float32)
        aug_images = tf.image.random_flip_left_right(images)
        aug_images = tf.image.random_flip_up_down(aug_images)
        aug_images = tf.image.random_contrast(aug_images, 0.8, 1.2)  # valeurs plus réalistes
        return aug_images
    
    # Perte de consistance (MSE entre prédictions)
    def consistency_loss(logits1, logits2):
        return tf.reduce_mean(tf.square(logits1 - logits2))
    
    @tf.function
    def train_step(model, optimizer, loss_fn, x_labeled, y_labeled, x_unlabeled, lambda_consistency):
        with tf.GradientTape() as tape:
            logits_labeled = model(x_labeled, training=True)
            supervised_loss = loss_fn(y_labeled, logits_labeled)
    
            logits_unlabeled1 = model(x_unlabeled, training=True)
            augmented_unlabeled = data_augmentation(x_unlabeled)
            logits_unlabeled2 = model(augmented_unlabeled, training=True)
    
            cons_loss = consistency_loss(logits_unlabeled1, logits_unlabeled2)
            total_loss = supervised_loss + lambda_consistency * cons_loss
    
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        return total_loss
    
    # Entraînement avec régularisation par consistance
    def train_model(model, labeled_data, labeled_labels, unlabeled_data, epochs=10, batch_size=32, lambda_consistency=1.0):
        optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    
        n_batches = max(len(labeled_data), len(unlabeled_data)) // batch_size + 1
    
        for epoch in range(epochs):
            print(f'Epoch {epoch + 1}/{epochs}')
            total_loss = 0.0
            for i in range(n_batches):
                # Batches étiquetés
                start_l = i * batch_size
                end_l = start_l + batch_size
                x_batch_l = labeled_data[start_l:end_l]
                y_batch_l = labeled_labels[start_l:end_l]
    
                # Batches non étiquetés
                start_u = i * batch_size
                end_u = start_u + batch_size
                x_batch_u = unlabeled_data[start_u:end_u]
    
                if len(x_batch_l) == 0 or len(x_batch_u) == 0:
                    continue  # sauter s’il manque des données
    
                loss = train_step(model, optimizer, loss_fn, x_batch_l, y_batch_l, x_batch_u, lambda_consistency)
                total_loss += loss.numpy() * len(x_batch_l)
    
            avg_loss = total_loss / len(labeled_data)
            print(f"Loss moyenne Epoch {epoch + 1}: {avg_loss:.4f}")
    
    # Création et entraînement du modèle
    model = create_model()
    train_model(model, labeled_data, labeled_labels, unlabeled_data, epochs=10, batch_size=64, lambda_consistency=1.0)
    
    # Évaluation sur les données de test
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    
    logits = model(x_test, training=False)
    test_loss = loss_fn(y_test, logits).numpy()
    predictions = tf.argmax(logits, axis=1).numpy()
    accuracy = np.mean(predictions == y_test)
    
    print(f"Test loss: {test_loss:.4f}, Test accuracy: {accuracy:.4f}")

    Explication du code :

    import tensorflow as tf importe la bibliothèque TensorFlow, un framework puissant pour construire et entraîner des modèles d’apprentissage profond.
    
    from tensorflow.keras import layers, models importe les modules layers et models de Keras, une API haute-niveau intégrée à TensorFlow pour créer facilement des réseaux de neurones.
    
    import numpy as np importe la bibliothèque NumPy, essentielle pour la manipulation efficace des tableaux et les opérations numériques.
    
    import matplotlib.pyplot as plt importe le module pyplot de Matplotlib, utilisé pour afficher graphiquement des données (non utilisé explicitement ici, mais souvent utile).
    
    from tensorflow.keras.datasets import cifar10 importe le jeu de données CIFAR-10, un classique en vision par ordinateur contenant 60 000 images colorées réparties en 10 classes.
    
    
    1. Chargement et préparation des données
    (x_train, y_train), (x_test, y_test) = cifar10.load_data() charge les images d’entraînement et de test ainsi que leurs étiquettes respectives. x_train, x_test = x_train / 255.0, x_test / 255.0 normalise les pixels des images dans l’intervalle [0,1] en divisant par 255 pour faciliter l’entraînement. x_train = x_train.astype(np.float32) force le type des données d’images à float32, compatible avec TensorFlow et évitant les erreurs de type. y_train = y_train.flatten() aplatie les labels qui étaient en forme (N,1) en un vecteur 1D simple, plus pratique pour l’entraînement.
    2. Séparation entre données étiquetées et non étiquetées
    labeled_data = x_train[:1000] sélectionne les 1000 premières images comme données étiquetées, un petit sous-ensemble pour l’apprentissage supervisé. labeled_labels = y_train[:1000] récupère les étiquettes correspondantes à ces 1000 images. unlabeled_data = x_train[1000:] considère le reste des images d’entraînement comme données non étiquetées, simulant un scénario semi-supervisé.
    3. Construction du modèle CNN simple
    def create_model(): ... définit une fonction qui construit un modèle séquentiel CNN avec : Trois couches convolutives avec activations ReLU et max-pooling pour extraire les caractéristiques spatiales, Une couche d’aplatissement, Une couche dense de 64 neurones ReLU, Une couche de sortie dense avec 10 neurones et activation softmax, donnant des probabilités de classes.
    4. Fonction d’augmentation des données
    def data_augmentation(images): ... applique des transformations aléatoires sur un lot d’images non étiquetées, incluant : Flip horizontal et vertical aléatoire, Modification aléatoire du contraste. Cela améliore la robustesse du modèle aux variations d’images.
    5. Définition de la perte de consistance
    def consistency_loss(logits1, logits2): calcule la moyenne de la différence quadratique entre deux ensembles de prédictions (logits), ce qui encourage la stabilité du modèle face aux images augmentées.
    6. Une étape d’entraînement (train_step)
    @tf.function compile la fonction pour optimiser ses performances. def train_step(...): effectue : La prédiction sur les images étiquetées et calcul de la perte supervisée classique (cross-entropy), La prédiction sur les images non étiquetées originales et leurs versions augmentées, Le calcul de la perte de consistance entre ces deux prédictions, La somme des deux pertes pondérées par un facteur lambda_consistency, La rétropropagation des gradients et la mise à jour des poids du modèle.
    7. Boucle d’entraînement complète (train_model)
    def train_model(...): entraîne le modèle sur un nombre d’époques donné en : Divisant les données en lots, Pour chaque lot, récupérant un batch étiqueté et un batch non étiqueté, Effectuant la mise à jour des poids via train_step, Affichant la perte moyenne par époque.
    8. Évaluation finale du modèle
    logits = model(x_test, training=False) calcule les prédictions sur le jeu de test complet en mode évaluation. loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) définit la fonction de perte adaptée au mode évaluation avec logits. test_loss = loss_fn(y_test, logits).numpy() calcule la perte moyenne sur le jeu de test. predictions = tf.argmax(logits, axis=1).numpy() récupère la classe prédite pour chaque image. accuracy = np.mean(predictions == y_test) calcule la précision finale. print(f"Test loss: {test_loss:.4f}, Test accuracy: {accuracy:.4f}") affiche ces métriques. Ce code met en œuvre une stratégie d’apprentissage semi-supervisé avec régularisation par consistance : un petit sous-ensemble de données étiquetées entraîne le modèle de manière classique, tandis que les données non étiquetées sont exploitées grâce à une perte qui force la stabilité des prédictions avant et après augmentation. Cela permet d’améliorer la généralisation en utilisant plus efficacement toutes les données disponibles.