Интеграция triton inference server в бесплатную систему видеонаблюдения Frigate

Frigate — это сетевой видеорегистратор с открытым исходным кодом, построенный на основе обнаружения объектов в реальном времени с помощью искусственного интеллекта. Вся обработка выполняется локально на собственном оборудовании.

Стандартная проблема: встроенный детектор Frigate (например, CPU/OpenVINO или даже CUDA) хорош для базовых сценариев, но при масштабировании на 10+ камер или использовании тяжёлых моделей (YOLOv8, EfficientDet) производительность падает, а управление батчами и динамическим batching'ем отсутствует.

Также не нужно забывать, что часто есть специфические решения, которые отлично работают через http и gRPC, и которые напрямую тяжело встроить в Frigate, так как "ломают" всё окружение, но в отдельном докере работают замечательно. Поэтому в данной статье описано, как связать между собой triton inference server и Frigate.

 Поднимаем детектор объектов на Triton

Для начала поднимем triton, запустим там модель и проверим через Python скрипт. Сначала загружаем нужный докер:

docker pull nvcr.io/nvidia/tritonserver:25.08-py3docker pull nvcr.io/nvidia/tritonserver:25.08-py3

Запускаем, пробрасывая порты и устанавливая внешнюю директорию:

docker run --gpus all --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 -v F:/dockerdisk2/models:/models nvcr.io/nvidia/tritonserver:25.08-py3   tritonserver --model-repository=/models --strict-model-config=false

Директория models будет содержать информацию о моделях. В нашем случае таким образом:

SIZ_detector\
    1\
        model.onnx
    config.pbtxt

где model.onnx - модель YoloX детектирования объектов, а config.pbtxt - следующий конфигурационный файл, описывающий эту модель:

name: "SIZ_detector"
platform: "onnxruntime_onnx"
max_batch_size: 1  
input [
  {
    name: "images"
    data_type: TYPE_FP32
    format: FORMAT_NCHW
    dims: [3, 640, 640]
  }
]
output [
  {
    name: "output"  
    data_type: TYPE_FP32
    dims: [-1, -1]  
  }
]

При запуске докера должна выдаться информация о готовности модели:

2026-04-08_17-52-40

 Тестирование работы модели детектирования СИЗ в Python скрипте

Пример вызова модели на на СИЗ (средствах индивидуальной защиты) представлено в скрипте ниже.

import tritonclient.http as httpclient

import numpy as np

import cv2

import yoloxpostprocess as yxp # модуль с постпроцессингом YOLOX

import SCRFD  # модуль с постпроцессингом SCRFD

 

# --- Настройки ---

TRITON_URL = "localhost:8000"

IMAGE_PATH = "094ec1b75a4a.jpg"

OUTPUT_PATH = "result_0.jpg"

 

# Имена классов СИЗ

names = [

    'Fall-Detected', 'Gloves', 'Goggles', 'Hardhat', 'Ladder', 'Mask',

    'NO-Gloves', 'NO-Goggles', 'NO-Hardhat', 'NO-Mask', 'NO-Safety Vest',

    'Person', 'Safety Cone', 'Safety Vest'

]

convert_class = [0, 1, 10, 11, 12, 13, 2, 3, 4, 5, 6, 7, 8, 9]

 

# --- Загрузка изображения ---

image = cv2.imread(IMAGE_PATH)

if image is None:

    raise FileNotFoundError(f"Не найдено изображение: {IMAGE_PATH}")

 

original_image = image.copy()

h_orig, w_orig = original_image.shape[:2]

 

# Предобработка: resize до 640x640, NCHW, batch=1

resized = cv2.resize(image, (640, 640))

input_data = np.transpose(resized, (2, 0, 1)).astype(np.float32)

input_data = np.expand_dims(input_data, axis=0)  # (1, 3, 640, 640)

 

client = httpclient.InferenceServerClient(url=TRITON_URL)

 

# Инференс СИЗ

inputs_siz = [httpclient.InferInput("images", input_data.shape, "FP32")]

inputs_siz[0].set_data_from_numpy(input_data)

outputs_siz = [httpclient.InferRequestedOutput("output")]

response_siz = client.infer(model_name="SIZ_detector", inputs=inputs_siz, outputs=outputs_siz)

raw_output_siz = response_siz.as_numpy("output"# (1, N, 19)

 

# Постпроцессинг СИЗ

dets_siz = None

if raw_output_siz.size > 0:

    output_no_batch = raw_output_siz[0]  # (N, 19)

    predictions = yxp.postprocess(np.expand_dims(output_no_batch, axis=0), (640, 640))[0]

    if predictions is not None and len(predictions) > 0:

        boxes = predictions[:, :4]

        scores = predictions[:, 4:5] * predictions[:, 5:]

        boxes_xyxy = np.ones_like(boxes)

        boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.

        boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.

        boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.

        boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.

        dets_siz = yxp.multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1)

 

# Визуализация

result_img = original_image.copy()

 

# --- Рисуем СИЗ (прямоугольники) ---

if dets_siz is not None and len(dets_siz) > 0:

    scale_x = w_orig / 640.0

    scale_y = h_orig / 640.0   

 

    converted_classes = np.array([convert_class[int(cls_id)] for cls_id in dets_siz[:, 5]])

    dets_siz[:, 5] = converted_classes

 

    for det in dets_siz:

        x1, y1, x2, y2, score, class_id = det

        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])

        if score >= 0.1:

            cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 0), 2)

            label = f"{names[int(class_id)]}: {score:.2f}"

            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)

            cv2.rectangle(result_img, (x1, y1 - h - 5), (x1 + w, y1), (0, 255, 0), -1)

            cv2.putText(result_img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2)

 

 

# --- Сохранение ---

cv2.imwrite(OUTPUT_PATH, result_img)

print(f"Результат с СИЗ в: {OUTPUT_PATH}")

Результат работы для разных изображений показывает детектированные объекты:

  • result_0
  • result_1
  • result_2
  • result_3

 Как интегрировать Triton в Frigate

Сначала скачиваем докер Frigate. Была использован версия  ghcr.io/blakeblackshear/frigate:0.16.1

Код располагается по следующему пути: opt/frigate/frigate

По указанному пути нас интересует файл object_detection\base.py. Создадим собственный детектор. Для этого в классе LocalObjectDetector:

  • в конструкторе __init_ инициализируем собственный детектор;
  • в методе detect_raw вызываем функцию своего детектора.

Выглядит это так. Объявление детектора в заголовке функции

from frigate.object_detection.triton_siz_detector import SIZDetector

Объявляем свой детектор:

def __init__(

        self,

        detector_config: BaseDetectorConfig = None,

        labels: str = None,

    ):

        self.fps = EventsPerSecond()

        if labels is None:

            self.labels = {}

        else:

            self.labels = load_labels(labels)

 

        if detector_config:

            self.input_transform = tensor_transform(detector_config.model.input_tensor) 

            self.dtype = detector_config.model.input_dtype

        else:

            self.input_transform = None

            self.dtype = InputDTypeEnum.int

 

        self.detect_api = create_detector(detector_config)

       

        #Наш детектор

        self.object_detector_siz = SIZDetector(None)

Вызов детектора в каждом кадре:

def detect_raw(self, tensor_input: np.ndarray):

        if self.input_transform:

            tensor_input = np.transpose(tensor_input, self.input_transform)

 

        if self.dtype == InputDTypeEnum.float:

            tensor_input = tensor_input.astype(np.float32)

            tensor_input /= 255

        elif self.dtype == InputDTypeEnum.float_denorm:

            tensor_input = tensor_input.astype(np.float32)

         #Наш детектор

        return self.object_detector_siz.detect(tensor_input)

        #return self.detect_api.detect_raw(tensor_input=tensor_input)

Ну а вот код файла нашего детектора triton_siz_detector.py:

import numpy as np

import cv2

import tritonclient.http as httpclient

from typing import Dict, List, Optional

import logging

import frigate.object_detection.yoloxpostprocess as yxp

 

 

TRITON_URL = "triton:8000"

 

SIZ_NAMES = [

    'Fall-Detected', 'Gloves', 'Goggles', 'Hardhat', 'Ladder', 'Mask',

    'NO-Gloves', 'NO-Goggles', 'NO-Hardhat', 'NO-Mask', 'NO-Safety Vest',

    'Person', 'Safety Cone', 'Safety Vest'

]

convert_class = [0, 1, 10, 11, 12, 13, 2, 3, 4, 5, 6, 7, 8, 9]

 

class SIZDetector:

    def __init__(self, detector_config):

        self.logger = logging.getLogger(__name__)

        self.client = None

        self._connect_to_triton()

       

    def _connect_to_triton(self):

        """Triton Inference Server"""

        try:

            self.client = httpclient.InferenceServerClient(url=TRITON_URL)

            self.logger.info("Triton ok")

        except Exception as e:

            self.logger.error(f"Triton Error: {e}")

            self.client = None

 

    def detect(self, tensor_input, thresholds=None):

        """       

        tensor_input:

        thresholds:

        """

        if self.client is None:

            self._connect_to_triton()

            if self.client is None:

                return np.zeros((0, 6))

       

        try:

            # tensor -> numpy array

            if hasattr(tensor_input, 'numpy'):

                frame = tensor_input.numpy()

            else:

                frame = tensor_input

           

            # (1, height, width, 3) -> (height, width, 3)

            if len(frame.shape) == 4:

                frame = frame[0]  # delete batch dimension

           

            # Triton (640x640, BGR)

            resized_frame = cv2.resize(frame, (640, 640))

                       

            siz_detections = self._detect_siz(resized_frame)

                                   

            return self._convert_detections_to_frigate_raw(siz_detections)

           

        except Exception as e:

            self.logger.error(f"Error in detector: {e}")

            return np.zeros((0, 6)), ["person"]

   

    def _detect_siz(self, frame):

       

        try:                        

            input_data = np.transpose(frame, (2, 0, 1)).astype(np.float32)  # HWC -> CHW

            input_data = np.expand_dims(input_data, axis=0)  # batch dimension

           

            #  Triton           

            inputs = [httpclient.InferInput("images", input_data.shape, "FP32")]           

            inputs[0].set_data_from_numpy(input_data)           

            outputs = [httpclient.InferRequestedOutput("output")]

                       

            # inference

            response = self.client.infer(model_name="SIZ_detector", inputs=inputs, outputs=outputs) 

            raw_output = response.as_numpy("output")

           

            return self._postprocess_siz(raw_output, frame.shape[:2])

           

        except Exception as e:

            self.logger.error(f"Error in SIZ: {e}")

            return [

   

    def _postprocess_siz(self, raw_output, original_shape):

       

        detections = [

       

        if raw_output.size > 0:

            output_no_batch = raw_output[0]  # (N, 19)

 

            predictions = yxp.postprocess(np.expand_dims(output_no_batch, axis=0), (640, 640))[0]

 

            if predictions is not None and len(predictions) > 0:

                boxes = predictions[:, :4]

                scores = predictions[:, 4:5] * predictions[:, 5:]

                boxes_xyxy = np.ones_like(boxes)

                boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.

                boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.

                boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.

                boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.

 

                dets_siz = yxp.multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1)

 

                if dets_siz is not None and len(dets_siz) > 0:

                    scale_x = original_shape[1] / 640.0

                    scale_y = original_shape[0] / 640.0

                    dets_siz[:, [0, 2]] *= scale_x

                    dets_siz[:, [1, 3]] *= scale_y

                    

                    converted_classes = np.array([convert_class[int(cls_id)] for cls_id in dets_siz[:, 5]])

                    dets_siz[:, 5] = converted_classes

 

                    for det in dets_siz:

                        x1, y1, x2, y2, score, class_id = det

                        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])

                        if score >= 0.1:

                            class_id = int(class_id)

                            detections.append({

                                'bbox': [x1, y1, x2, y2],

                                'score': score,

                                'class_id': class_id,

                                'class_name': SIZ_NAMES[class_id] if class_id < len(SIZ_NAMES) else 'unknown'

                            })

                      

        return detections

 

    def _convert_detections_to_frigate_raw(self,detections, class_mapping=None, max_detections=20, image_width=640, image_height=640):

        """

        Преобразует список детекций из формата:

            {'bbox': [x1, y1, x2, y2], 'score': ..., 'class_id': ...}

        в формат, ожидаемый Frigate в detect_raw:

            [[frigate_class_id, score, x1, y1, x2, y2], ...]

 

        Параметры:

            detections: list[dict] — результаты вашего детектора.

            class_mapping: dict[int, int] — маппинг {ваш_class_id: frigate_class_id}

            max_detections: int — максимальное число строк в выходе (остальное — нули)

 

        Возвращает:

            np.ndarray формы (max_detections, 6) с float32 значениями.

        """       

        if class_mapping is None:

            class_mapping = {11: 0}  # по умолчанию: только человек

 

        # Фильтруем и маппим только нужные классы

        raw_list = [

        for det in detections:

            your_class_id = int(det['class_id'])

            if your_class_id in class_mapping:

                frigate_class_id = class_mapping[your_class_id]

                x1, y1, x2, y2 = det['bbox']

                 # Нормализуем координаты в [0, 1]

                x1 = float(x1) / image_width

                y1 = float(y1) / image_height

                x2 = float(x2) / image_width

                y2 = float(y2) / image_height

                score = float(det['score'])

                raw_list.append([frigate_class_id, score, x1, y1, x2, y2])

 

        # Сортируем по убыванию score (обязательно для Frigate!)

        raw_list.sort(key=lambda x: x[1], reverse=True)

 

        # Ограничиваем до max_detections

        raw_list = raw_list[:max_detections]

 

        # Создаём массив с padding нулями

        output = np.zeros((max_detections, 6), dtype=np.float32)

        if raw_list:

            output[:len(raw_list)] = np.array(raw_list, dtype=np.float32)       

 

        return output

   

    def _check_helmet_status(self, detections):       

        import random

 

        return random.choice([True, False])

Файл yoloxpostprocess.py не приводится и содержит в себе вспомогательные функции для YoloX.

_connect_to_triton - осуществляется подключение к Triton Inference сервер

detect - поступает на распознавание тензор изображения, которое переводится в numpy в формат OpenCV BGR 640x640, после чего вызывается детекция СИЗ - _detect_siz, а затем возвращается в сконвертированный для Frigate формат _convert_detections_to_frigate_raw

_detect_siz - здесь осуществляется передача изображения в Triton и вызов постобработки _postprocess_siz

 Сборка докера Frigate и compose

Для сборки своего докера копируем только нужные файлы и устанавливаем компонент для взаимодействия с тритоном.

# Используйте оригинальный образ Frigate в качестве базового (важно для совместимости)

FROM ghcr.io/blakeblackshear/frigate:0.16.1

 

# Скопируйте все файлы из текущей директории (frigate_source_code) в контейнер

COPY opt/frigate/frigate/object_detection/triton_siz_detector.py /opt/frigate/frigate/object_detection/

COPY opt/frigate/frigate/object_detection/base.py /opt/frigate/frigate/object_detection/

COPY opt/frigate/frigate/object_detection/yoloxpostprocess.py /opt/frigate/frigate/object_detection/

COPY opt/frigate/frigate/video.py /opt/frigate/frigate/

 

RUN pip install --break-system-packages tritonclient[all]

 

# Установите рабочую директорию (может отличаться, уточните для Frigate)

WORKDIR /opt/frigate/

Ну и финальный штрих - docker compose, чтобы запускать совместно:

version: '3.8'

 

services:

  triton:

    image: nvcr.io/nvidia/tritonserver:25.08-py3

    container_name: triton

    deploy:

      resources:

        reservations:

          devices:

            - driver: nvidia

              count: all

              capabilities: [gpu]

    ports:

      - "8000:8000"

      - "8001:8001"

      - "8002:8002"

    volumes:

      - F:/dockerdisk2/models:/models

    command: ["tritonserver", "--model-repository=/models", "--strict-model-config=false"]

    restart: unless-stopped

 

  my-frigate:

    image: my-frigate:latest

    container_name: my-frigate

    depends_on:

      - triton

    environment:

      - TZ=Europe/Moscow

      - FRIGATE_RTSP_PASSWORD=password

    ports:

      - "8971:8971"

      - "8554:8554"

      - "8555:8555/tcp"

      - "8555:8555/udp"

    volumes:

      - F:/dockerfrigate/storage:/media/frigate

      - F:/dockerfrigate/config:/config

    tmpfs:

      - /tmp/cache:rw,size=1000000000

    shm_size: 64m

    restart: unless-stopped

    stop_grace_period: 30s

Это только прототип, подтверждающий работоспособность схемы. Для реального встраивания СИЗ через тритон в Frigate. Нужно назначать каждому человеку в трекинге объект каска и перчатки (например) и суммировать их по времени и отдельно выдавать результат, чтобы был виден в том числе и в отладочном режиме Frigate.