Классификация видео с использованием CNN-RNN
Классификация видео является важной задачей, конечным итогом которой является понимание того, что происходит на видеоданных. Здесь https://keras.io/examples/vision/video_classification/ описан простой метод классификации на основе CNN-RNN, при этом сам пример по ряду причин не запускается, одной из которых является отсутствие данных на сайте https://git.io/. Собственно это данные датасета UCF101, которые есть тут:
https://www.crcv.ucf.edu/data/UCF101.php
Но просто для проверки работы метода мы воспользуемся данными и примером, которые успешно уже загружены кем-то на Kaggle:
Набор данных состоит из видео, классифицированных по различным действиям, таким как удары в крикет, удары руками, езда на велосипеде и т. д. Этот набор данных обычно используется для создания распознавателей действий, которые являются приложением для классификации видео.
Видео состоит из упорядоченной последовательности кадров. Каждый кадр содержит пространственную информацию, а последовательность этих кадров содержит временную информацию. Для моделирования обоих этих аспектов в примере используется гибридная архитектура, состоящую из сверток (для пространственной обработки), а также рекуррентных слоев (для временной обработки). В частности, используется сверточная нейронная сеть (CNN) и рекуррентная нейронная сеть (RNN), состоящую из слоев GRU. Этот тип гибридной архитектуры широко известен как CNN-RNN.
Установка и определение параметров
Для упрощение обучения будет использовано только 5 классов видео:
['CricketShot', 'PlayingCello', 'Punch', 'ShavingBeard', 'TennisSwing']
Библиотеки:
from tensorflow import keras
from imutils import paths
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os
import warnings
Инициализация:
IMG_SIZE = 224
BATCH_SIZE = 64
EPOCHS = 13
MAX_SEQ_LENGTH = 20
NUM_FEATURES = 2048
Подготовка данных
train_df = pd.read_csv("/kaggle/input/ucf101-dataset/ucf101_top5/train.csv")
test_df = pd.read_csv("/kaggle/input/ucf101-dataset/ucf101_top5/test.csv")
print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")
train_df.head(10)
Поскольку видео представляет собой упорядоченную последовательность кадров, можно просто извлечь кадры и поместить их в трехмерный тензор. Но количество кадров может отличаться от видео к видео, что помешает складывать их в пакеты. В качестве альтернативы можно сохранять видеокадры с фиксированным интервалом, пока не будет достигнуто максимальное количество кадров. В этом примере мы сделается следующее:
- Захват кадров видео.
- Извлекайте кадры из видео, пока не будет достигнуто максимальное количество кадров.
- В случае, когда количество кадров видео меньше максимального количества кадров, мы дополним видео нулями.
Обратите внимание, что этот рабочий процесс идентичен задачам, связанным с текстовыми последовательностями. Известно, что видео из набора данных UCF101 не содержат резких изменений объектов и действий в разных кадрах. Из-за этого может быть нормально рассмотреть только несколько кадров для задачи обучения. Но этот подход может не подходить для других задач классификации видео.
def crop_center_square(frame):
y, x = frame.shape[0:2]
min_dim = min(y, x)
start_x = (x // 2) - (min_dim // 2)
start_y = (y // 2) - (min_dim // 2)
return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]
def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
cap = cv2.VideoCapture(path)
frames = [
try:
while True:
ret, frame = cap.read()
if not ret:
break
frame = crop_center_square(frame)
frame = cv2.resize(frame, resize)
frame = frame[:, :, [2, 1, 0]]
frames.append(frame)
if len(frames) == max_frames:
break
finally:
cap.release()
return np.array(frames)
Можно использовать предварительно обученную сеть для извлечения значимых признаков из извлеченных кадров. Для этой цели мы будет использоваться модель InceptionV3.
def build_feature_extractor():
feature_extractor = keras.applications.InceptionV3(
weights="imagenet",
include_top=False,
pooling="avg",
input_shape=(IMG_SIZE, IMG_SIZE, 3),
)
preprocess_input = keras.applications.inception_v3.preprocess_input
inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
preprocessed = preprocess_input(inputs)
outputs = feature_extractor(preprocessed)
return keras.Model(inputs, outputs, name="feature_extractor")
import warnings
feature_extractor = build_feature_extractor()
Ярлыки видео представляют собой строки. Нейронные сети не понимают строковые значения, поэтому их необходимо преобразовать в какую-либо числовую форму, прежде чем они будут переданы в модель. Здесь мы будем использовать слой StringLookup, кодирующий метки классов как целые числа:
label_processor = keras.layers.StringLookup(
num_oov_indices=0, vocabulary=np.unique(train_df["tag"])
)
print(label_processor.get_vocabulary())
Последняя функция работы с видео:
def prepare_all_videos(df, root_dir):
num_samples = len(df)
video_paths = df["video_name"].values.tolist()
labels = df["tag"].values
labels = label_processor(labels[..., None]).numpy()
# `frame_masks` and `frame_features` are what we will feed to our sequence model.
# `frame_masks` will contain a bunch of booleans denoting if a timestep is
# masked with padding or not.
frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
frame_features = np.zeros(
shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
)
# For each video.
for idx, path in enumerate(video_paths):
# Gather all its frames and add a batch dimension.
frames = load_video(os.path.join(root_dir, path))
frames = frames[None, ...]
# Initialize placeholders to store the masks and features of the current video.
temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
temp_frame_features = np.zeros(
shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
)
# Extract features from the frames of the current video.
for i, batch in enumerate(frames):
video_length = batch.shape[0]
length = min(MAX_SEQ_LENGTH, video_length)
for j in range(length):
temp_frame_features[i, j, :] = feature_extractor.predict(
batch[None, j, :]
)
temp_frame_mask[i, :length] = 1 # 1 = not masked, 0 = masked
frame_features[idx,] = temp_frame_features.squeeze()
frame_masks[idx,] = temp_frame_mask.squeeze()
return (frame_features, frame_masks), labels
train_data, train_labels = prepare_all_videos(train_df, "/kaggle/input/ucf101-dataset/ucf101_top5/train/")
test_data, test_labels = prepare_all_videos(test_df, "/kaggle/input/ucf101-dataset/ucf101_top5/test/")
print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")
Модель последовательности
Теперь можно передать эти данные в модель последовательности, состоящую из повторяющихся слоев, таких как GRU.
# Utility for our sequence model.
def get_sequence_model():
class_vocab = label_processor.get_vocabulary()
frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")
x = keras.layers.GRU(16, return_sequences=True)(
frame_features_input, mask=mask_input
)
x = keras.layers.GRU(8)(x)
x = keras.layers.Dropout(0.4)(x)
x = keras.layers.Dense(8, activation="relu")(x)
output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)
rnn_model = keras.Model([frame_features_input, mask_input], output)
rnn_model.compile(
loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
)
return rnn_model
# Utility for running experiments.
def run_experiment():
filepath = "train/cp.ckpt"
checkpoint = keras.callbacks.ModelCheckpoint(
filepath, save_weights_only=True, save_best_only=True, verbose=1
)
seq_model = get_sequence_model()
history = seq_model.fit(
[train_data[0], train_data[1]],
train_labels,
validation_split=0.3,
epochs=EPOCHS,
callbacks=[checkpoint],
)
seq_model.load_weights(filepath)
_, accuracy = seq_model.evaluate([test_data[0], test_data[1]], test_labels)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
return history, seq_model
_, sequence_model = run_experiment()
..................................
Epoch 00008: val_loss did not improve from 1.59328Epoch 9/1313/13 [==============================] - 1s 53ms/step - loss: 0.8062 - accuracy: 0.7133 - val_loss: 1.7616 - val_accuracy: 0.3240Epoch 00009: val_loss did not improve from 1.59328Epoch 10/1313/13 [==============================] - 1s 55ms/step - loss: 0.7406 - accuracy: 0.7590 - val_loss: 1.8516 - val_accuracy: 0.3408Epoch 00010: val_loss did not improve from 1.59328Epoch 11/1313/13 [==============================] - 1s 54ms/step - loss: 0.7292 - accuracy: 0.7590 - val_loss: 1.8783 - val_accuracy: 0.3296Epoch 00011: val_loss did not improve from 1.59328Epoch 12/1313/13 [==============================] - 1s 54ms/step - loss: 0.6738 - accuracy: 0.7904 - val_loss: 1.9583 - val_accuracy: 0.3352Epoch 00012: val_loss did not improve from 1.59328Epoch 13/1313/13 [==============================] - 1s 53ms/step - loss: 0.6324 - accuracy: 0.7783 - val_loss: 2.0679 - val_accuracy: 0.3408Epoch 00013: val_loss did not improve from 1.593287/7 [==============================] - 2s 18ms/step - loss: 1.2250 - accuracy: 0.5804Test accuracy: 58.04%
Точность на валидационных данных составила 58.04%. Для примера можно проверить одно видео:
def prepare_single_video(frames):
frames = frames[None, ...]
frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")
for i, batch in enumerate(frames):
video_length = batch.shape[0]
length = min(MAX_SEQ_LENGTH, video_length)
for j in range(length):
frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
frame_mask[i, :length] = 1 # 1 = not masked, 0 = masked
return frame_features, frame_mask
def sequence_prediction(path):
class_vocab = label_processor.get_vocabulary()
frames = load_video(os.path.join("/kaggle/input/ucf101-dataset/ucf101_top5/test/", path))
frame_features, frame_mask = prepare_single_video(frames)
probabilities = sequence_model.predict([frame_features, frame_mask])[0]
for i in np.argsort(probabilities)[::-1]:
print(f" {class_vocab[i]}: {probabilities[i] * 100:5.2f}%")
return frames
test_video ="v_Punch_g06_c02.avi"
print(f"Test video path: {test_video}")
test_frames = sequence_prediction(test_video)
Test video path: v_Punch_g06_c02.avi
Punch: 77.88%
ShavingBeard: 6.83%
CricketShot: 5.76%
PlayingCello: 4.80%
TennisSwing: 4.74%