Классификация видео с использованием CNN-RNN

Классификация видео  является важной задачей, конечным итогом которой является понимание того, что происходит на видеоданных.  Здесь https://keras.io/examples/vision/video_classification/ описан простой метод классификации на основе CNN-RNN, при этом сам пример по ряду причин не запускается, одной из которых является отсутствие данных на сайте https://git.io/. Собственно это данные датасета UCF101, которые есть тут:

https://www.crcv.ucf.edu/data/UCF101.php

Но просто для проверки работы метода мы воспользуемся данными и примером, которые успешно уже загружены кем-то на Kaggle:

https://www.kaggle.com/code/ziadfellahidrissi/video-classification-with-a-cnn-rnn-architecture/notebook

Набор данных состоит из видео, классифицированных по различным действиям, таким как удары в крикет, удары руками, езда на велосипеде и т. д. Этот набор данных обычно используется для создания распознавателей действий, которые являются приложением для классификации видео.

Видео состоит из упорядоченной последовательности кадров. Каждый кадр содержит пространственную информацию, а последовательность этих кадров содержит временную информацию. Для моделирования обоих этих аспектов в примере используется гибридная архитектура, состоящую из сверток (для пространственной обработки), а также рекуррентных слоев (для временной обработки). В частности, используется сверточная нейронная сеть (CNN) и рекуррентная нейронная сеть (RNN), состоящую из слоев GRU. Этот тип гибридной архитектуры широко известен как CNN-RNN.

Установка и определение параметров

Для упрощение обучения будет использовано только 5 классов видео:

['CricketShot', 'PlayingCello', 'Punch', 'ShavingBeard', 'TennisSwing']

Библиотеки:

from tensorflow import keras

from imutils import paths

import matplotlib.pyplot as plt

import tensorflow as tf

import pandas as pd

import numpy as np

import imageio

import cv2

import os

import warnings

Инициализация:

IMG_SIZE = 224

BATCH_SIZE = 64

EPOCHS = 13

MAX_SEQ_LENGTH = 20

NUM_FEATURES = 2048

 Подготовка данных

train_df = pd.read_csv("/kaggle/input/ucf101-dataset/ucf101_top5/train.csv")

test_df = pd.read_csv("/kaggle/input/ucf101-dataset/ucf101_top5/test.csv")

print(f"Total videos for training: {len(train_df)}")

print(f"Total videos for testing: {len(test_df)}")

train_df.head(10)

Total videos for training: 594
Total videos for testing: 224

 

Out[4]:
  video_name tag
0 v_CricketShot_g08_c01.avi CricketShot
1 v_CricketShot_g08_c02.avi CricketShot
2 v_CricketShot_g08_c03.avi CricketShot
3 v_CricketShot_g08_c04.avi CricketShot
4 v_CricketShot_g08_c05.avi CricketShot
5 v_CricketShot_g08_c06.avi CricketShot
6 v_CricketShot_g08_c07.avi CricketShot
7 v_CricketShot_g09_c01.avi CricketShot
8 v_CricketShot_g09_c02.avi CricketShot
9 v_CricketShot_g09_c03.avi CricketShot

Поскольку видео представляет собой упорядоченную последовательность кадров, можно просто извлечь кадры и поместить их в трехмерный тензор. Но количество кадров может отличаться от видео к видео, что помешает складывать их в пакеты. В качестве альтернативы можно сохранять видеокадры с фиксированным интервалом, пока не будет достигнуто максимальное количество кадров. В этом примере мы сделается следующее:

  • Захват кадров видео.
  • Извлекайте кадры из видео, пока не будет достигнуто максимальное количество кадров.
  • В случае, когда количество кадров видео меньше максимального количества кадров, мы дополним видео нулями.

Обратите внимание, что этот рабочий процесс идентичен задачам, связанным с текстовыми последовательностями. Известно, что видео из набора данных UCF101 не содержат резких изменений объектов и действий в разных кадрах. Из-за этого может быть нормально рассмотреть только несколько кадров для задачи обучения. Но этот подход может не подходить для других задач классификации видео. 

def crop_center_square(frame):

    y, x = frame.shape[0:2]

    min_dim = min(y, x)

    start_x = (x // 2) - (min_dim // 2)

    start_y = (y // 2) - (min_dim // 2)

    return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

 

 

def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):

    cap = cv2.VideoCapture(path)

    frames = [

    try:

        while True:

            ret, frame = cap.read()

            if not ret:

                break

            frame = crop_center_square(frame)

            frame = cv2.resize(frame, resize)

            frame = frame[:, :, [2, 1, 0]]

            frames.append(frame)

 

            if len(frames) == max_frames:

                break

    finally:

        cap.release()

 

    return np.array(frames)

Можно использовать предварительно обученную сеть для извлечения значимых признаков из извлеченных кадров. Для этой цели мы будет использоваться модель InceptionV3.

def build_feature_extractor():

   

    feature_extractor = keras.applications.InceptionV3(

        weights="imagenet",

        include_top=False,

        pooling="avg",

        input_shape=(IMG_SIZE, IMG_SIZE, 3),

    )

   

    preprocess_input = keras.applications.inception_v3.preprocess_input

 

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))

    preprocessed = preprocess_input(inputs)

 

    outputs = feature_extractor(preprocessed)

 

    return keras.Model(inputs, outputs, name="feature_extractor")

import warnings

feature_extractor = build_feature_extractor()

Ярлыки видео представляют собой строки. Нейронные сети не понимают строковые значения, поэтому их необходимо преобразовать в какую-либо числовую форму, прежде чем они будут переданы в модель. Здесь мы будем использовать слой StringLookup, кодирующий метки классов как целые числа:

label_processor = keras.layers.StringLookup(

    num_oov_indices=0, vocabulary=np.unique(train_df["tag"])

)

 

print(label_processor.get_vocabulary())

Последняя функция работы с видео:

def prepare_all_videos(df, root_dir):

    num_samples = len(df)

    video_paths = df["video_name"].values.tolist()

    labels = df["tag"].values

    labels = label_processor(labels[..., None]).numpy()

 

    # `frame_masks` and `frame_features` are what we will feed to our sequence model.

    # `frame_masks` will contain a bunch of booleans denoting if a timestep is

    # masked with padding or not.

    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")

    frame_features = np.zeros(

        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"

    )

 

    # For each video.

    for idx, path in enumerate(video_paths):

        # Gather all its frames and add a batch dimension.

        frames = load_video(os.path.join(root_dir, path))

        frames = frames[None, ...]

 

        # Initialize placeholders to store the masks and features of the current video.

        temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")

        temp_frame_features = np.zeros(

            shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"

        )

 

        # Extract features from the frames of the current video.

        for i, batch in enumerate(frames):

            video_length = batch.shape[0]

            length = min(MAX_SEQ_LENGTH, video_length)

            for j in range(length):

                temp_frame_features[i, j, :] = feature_extractor.predict(

                    batch[None, j, :]

                )

            temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

 

        frame_features[idx,] = temp_frame_features.squeeze()

        frame_masks[idx,] = temp_frame_mask.squeeze()

 

    return (frame_features, frame_masks), labels

 

 

train_data, train_labels = prepare_all_videos(train_df, "/kaggle/input/ucf101-dataset/ucf101_top5/train/")

test_data, test_labels = prepare_all_videos(test_df, "/kaggle/input/ucf101-dataset/ucf101_top5/test/")

print(f"Frame features in train set: {train_data[0].shape}")

print(f"Frame masks in train set: {train_data[1].shape}")

 Модель последовательности

Теперь можно передать эти данные в модель последовательности, состоящую из повторяющихся слоев, таких как GRU.

# Utility for our sequence model.

def get_sequence_model():

    class_vocab = label_processor.get_vocabulary()

 

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))

    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

 

    x = keras.layers.GRU(16, return_sequences=True)(

        frame_features_input, mask=mask_input

    )

    x = keras.layers.GRU(8)(x)

    x = keras.layers.Dropout(0.4)(x)

    x = keras.layers.Dense(8, activation="relu")(x)

    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

 

    rnn_model = keras.Model([frame_features_input, mask_input], output)

 

    rnn_model.compile(

        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]

    )

    return rnn_model

 

 # Utility for running experiments.

def run_experiment():

    filepath = "train/cp.ckpt"

    checkpoint = keras.callbacks.ModelCheckpoint(

        filepath, save_weights_only=True, save_best_only=True, verbose=1

    )

    seq_model = get_sequence_model()

    history = seq_model.fit(

        [train_data[0], train_data[1]],

        train_labels,

        validation_split=0.3,

        epochs=EPOCHS,

        callbacks=[checkpoint],

    )

   seq_model.load_weights(filepath)

    _, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)

    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

 

    return history, seq_model

 _, sequence_model = run_experiment()

 

..................................

 

Epoch 00008: val_loss did not improve from 1.59328Epoch 9/1313/13 [==============================] - 1s 53ms/step - loss: 0.8062 - accuracy: 0.7133 - val_loss: 1.7616 - val_accuracy: 0.3240Epoch 00009: val_loss did not improve from 1.59328Epoch 10/1313/13 [==============================] - 1s 55ms/step - loss: 0.7406 - accuracy: 0.7590 - val_loss: 1.8516 - val_accuracy: 0.3408Epoch 00010: val_loss did not improve from 1.59328Epoch 11/1313/13 [==============================] - 1s 54ms/step - loss: 0.7292 - accuracy: 0.7590 - val_loss: 1.8783 - val_accuracy: 0.3296Epoch 00011: val_loss did not improve from 1.59328Epoch 12/1313/13 [==============================] - 1s 54ms/step - loss: 0.6738 - accuracy: 0.7904 - val_loss: 1.9583 - val_accuracy: 0.3352Epoch 00012: val_loss did not improve from 1.59328Epoch 13/1313/13 [==============================] - 1s 53ms/step - loss: 0.6324 - accuracy: 0.7783 - val_loss: 2.0679 - val_accuracy: 0.3408Epoch 00013: val_loss did not improve from 1.593287/7 [==============================] - 2s 18ms/step - loss: 1.2250 - accuracy: 0.5804Test accuracy: 58.04%

Точность на валидационных данных составила 58.04%. Для примера можно проверить одно видео:

def prepare_single_video(frames):

    frames = frames[None, ...]

    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")

    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    for i, batch in enumerate(frames):

        video_length = batch.shape[0]

        length = min(MAX_SEQ_LENGTH, video_length)

        for j in range(length):

            frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])

        frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

 

    return frame_features, frame_mask

  

def sequence_prediction(path):

    class_vocab = label_processor.get_vocabulary()

 

    frames = load_video(os.path.join("/kaggle/input/ucf101-dataset/ucf101_top5/test/", path))

    frame_features, frame_mask = prepare_single_video(frames)

    probabilities = sequence_model.predict([frame_features, frame_mask])[0]

 

    for i in np.argsort(probabilities)[::-1]:

        print(f"  {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")

    return frames

   

test_video ="v_Punch_g06_c02.avi"

print(f"Test video path: {test_video}")

 

test_frames = sequence_prediction(test_video)

 

Test video path: v_Punch_g06_c02.avi
Punch: 77.88%
ShavingBeard: 6.83%
CricketShot: 5.76%
PlayingCello: 4.80%
TennisSwing: 4.74%