#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Текст
"""
# ######################################################################################################################
# Импорт необходимых инструментов
# ######################################################################################################################
import warnings
# Подавление Warning
for warn in [UserWarning, FutureWarning]:
warnings.filterwarnings("ignore", category=warn)
from dataclasses import dataclass # Класс данных
import os # Взаимодействие с файловой системой
import requests # Отправка HTTP запросов
import liwc # Анализатор лингвистических запросов и подсчета слов
import numpy as np # Научные вычисления
import pandas as pd # Обработка и анализ данных
import subprocess
import torchaudio # Работа с аудио от Facebook
import re
import gradio
from urllib.error import URLError
from sklearn.metrics import mean_absolute_error
from datetime import datetime # Работа со временем
from transformers import (
MarianTokenizer,
MarianMTModel,
AutoModelForSeq2SeqLM,
AutoProcessor,
WhisperForConditionalGeneration,
BertTokenizer,
BertModel,
)
from urllib.parse import urlparse
from pathlib import Path # Работа с путями в файловой системе
# Типы данных
from typing import List, Tuple, Optional, Union, Callable # Типы данных
from types import FunctionType
from IPython.display import clear_output
import torch
import torch.nn as nn
# Персональные
from oceanai.modules.lab.download import Download # Загрузка файлов
from oceanai.modules.lab.architectures.text_architectures import text_model_hc, text_model_nn, text_model_b5
# ######################################################################################################################
# Сообщения
# ######################################################################################################################
[docs]
@dataclass
class TextMessages(Download):
"""Класс для сообщений
Args:
lang (str): Смотреть :attr:`~oceanai.modules.core.language.Language.lang`
color_simple (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_simple`
color_info (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_info`
color_err (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_err`
color_true (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_true`
bold_text (bool): Смотреть :attr:`~oceanai.modules.core.settings.Settings.bold_text`
num_to_df_display (int): Смотреть :attr:`~oceanai.modules.core.settings.Settings.num_to_df_display`
text_runtime (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.text_runtime`
"""
# ------------------------------------------------------------------------------------------------------------------
# Конструктор
# ------------------------------------------------------------------------------------------------------------------
def __post_init__(self):
super().__post_init__() # Выполнение конструктора из суперкласса
self._text_modality: str = self._(" (текстовая модальность) ...")
self._formation_text_model_hc: str = self._formation_model_hc + self._text_modality
self._formation_text_model_nn: str = self._formation_model_nn + self._text_modality
self._formation_text_model_b5: str = (
self._("Формирование нейросетевой архитектуры модели для получения " " оценок персональных качеств")
+ self._text_modality
)
self._load_text_model_weights_hc: str = self._load_model_weights_hc + self._text_modality
self._load_text_model_weights_nn: str = self._load_model_weights_nn + self._text_modality
self._load_text_model_weights_b5: str = (
self._("Загрузка весов нейросетевой модели для получения " "оценок персональных качеств")
+ self._text_modality
)
self._model_text_hc_not_formation: str = self._model_hc_not_formation + self._text_modality
self._model_text_nn_not_formation: str = self._model_nn_not_formation + self._text_modality
self._model_text_not_formation: str = (
self._oh
+ self._(
"нейросетевая архитектура модели для получения "
"оценок по экспертным и нейросетевым признакам не "
"сформирована"
)
+ self._text_modality
)
self._load_text_features: str = self._("Загрузка словаря с экспертными признаками ...")
self._load_text_features_error: str = self._oh + self._(
"не удалось загрузить словарь с экспертными признаками ..."
)
self._load_token_parser_error: str = self._oh + self._("не удалось считать лексикон LIWC ...")
self._load_translation_model: str = self._(
"Формирование токенизатора и нейросетевой модели машинного перевода ..."
)
self._load_bert_model: str = self._("Формирование токенизатора и нейросетевой модели BERT ...")
self._load_bert_model_error: str = self._oh + self._(
"не удалось загрузить токенизатор и нейросетевую модель BERT ..."
)
self._get_text_feature_info: str = self._("Извлечение признаков (экспертных и нейросетевых) из текста ...")
self._text_is_empty: str = self._oh + self._('текстовый файл "{}" пуст ...')
# ######################################################################################################################
# Текст
# ######################################################################################################################
[docs]
@dataclass
class Text(TextMessages):
"""Класс для обработки текста
Args:
lang (str): Смотреть :attr:`~oceanai.modules.core.language.Language.lang`
color_simple (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_simple`
color_info (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_info`
color_err (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_err`
color_true (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.color_true`
bold_text (bool): Смотреть :attr:`~oceanai.modules.core.settings.Settings.bold_text`
num_to_df_display (int): Смотреть :attr:`~oceanai.modules.core.settings.Settings.num_to_df_display`
text_runtime (str): Смотреть :attr:`~oceanai.modules.core.settings.Settings.text_runtime`
"""
# ------------------------------------------------------------------------------------------------------------------
# Конструктор
# ------------------------------------------------------------------------------------------------------------------
def __post_init__(self):
super().__post_init__() # Выполнение конструктора из суперкласса
# Нейросетевая модель **nn.Module** для получения оценок по экспертным признакам
self._text_model_hc: Optional[nn.Module] = None
# Нейросетевая модель **nn.Module** для получения оценок по нейросетевым признакам
self._text_model_nn: Optional[nn.Module] = None
self._text_model_b5: Optional[nn.Module] = None
# Словарь для формирования экспертных признаков
self._text_features: str = "https://drive.usercontent.google.com/download?id=1A_WqDPXzEmLLxjpl8YYeftp2dA9OuBRp&export=download&authuser=2&confirm=t&uuid=d0bac80d-b9ef-4c00-aa15-95398e3af59d&at=AN_67v3V4MHMTjVFeALLLX-CXzfb:1727453279624"
# self._text_features: str = (
# "https://download.sberdisk.ru/download/file/473268573?token=X3NB5VYGyPn8mjw&filename=LIWC2007.txt"
#)
# BERT модель
# self._bert_multi_model: str = "https://drive.usercontent.google.com/download?id=1yedNslt8jjwXm4pa3K15VqUg-JISJMYf&export=download&authuser=2&confirm=t&uuid=4071942c-989a-4a24-8ddd-60e964def6ec&at=AN_67v2cav8P41yss1AlvWGxXCQk:1727453235831"
# self._bert_multi_model: str = (
# "https://download.sberdisk.ru/download/file/473319508?token=p8hYNIjxacEARxl&filename=bert-base-multilingual-cased.zip"
# )
self._bert_multi_model: str = "google-bert/bert-base-multilingual-cased"
# Нейросетевая модель машинного перевода (RU -> EN)
self._translation_model: str = "Helsinki-NLP/opus-mt-ru-en"
self._tokenizer: Optional[MarianTokenizer] = None # Токенизатор для машинного перевода
self._traslate_model: Optional[MarianMTModel] = None # Нейросетевая модель для машинного перевода
self._bert_tokenizer: Optional[BertTokenizer] = None
self._bert_model: Optional[BertModel] = None
self._path_to_transriber = "openai/whisper-base"
self._processor: Optional[AutoProcessor] = None
self._model_transcriptions: Optional[WhisperForConditionalGeneration] = None
# ----------------------- Только для внутреннего использования внутри класса
# Названия мультимодальных корпусов
self.__multi_corpora: List[str] = ["fi", "mupta"]
self.__lang_traslate: List[str] = ["ru", "en"]
self.lang_traslate: List[str] = self.__lang_traslate
self.__parse_text_features: Optional[FunctionType] = None # Парсинг экспертных признаков
self.__category_text_features: List[str] = [] # Словарь с экспертными признаками
# Поддерживаемые текстовые форматы
self.__supported_text_formats: List[str] = ["txt"]
self.__contractions_dict = {
"ain't": "are not",
"'s": " is",
"aren't": "are not",
"can't": "cannot",
"can't've": "cannot have",
"'cause": "because",
"‘cause": "because",
"could've": "could have",
"couldn't": "could not",
"couldn't've": "could not have",
"didn't": "did not",
"doesn't": "does not",
"don't": "do not",
"hadn't": "had not",
"hadn't've": "had not have",
"hasn't": "has not",
"haven't": "have not",
"he'd": "he would",
"he'd've": "he would have",
"he'll": "he will",
"he'll've": "he will have",
"how'd": "how did",
"how'd'y": "how do you",
"how'll": "how will",
"how're": "how are",
"i'd": "i would",
"i'd've": "i would have",
"i'll": "i will",
"i 'll": "i will",
"i'll've": "i will have",
"i'm": "i am",
"i've": "i have",
"isn't": "is not",
"it'd": "it would",
"it'd've": "it would have",
"it'll": "it will",
"it'll've": "it will have",
"let's": "let us",
"ma'am": "madam",
"mayn't": "may not",
"might've": "might have",
"mightn't": "might not",
"mightn't've": "might not have",
"must've": "must have",
"mustn't": "must not",
"mustn't've": "must not have",
"needn't": "need not",
"needn't've": "need not have",
"o'clock": "of the clock",
"oughtn't": "ought not",
"oughtn't've": "ought not have",
"shan't": "shall not",
"sha'n't": "shall not",
"shan't've": "shall not have",
"she'd": "she would",
"she'd've": "she would have",
"she'll": "she will",
"she'll've": "she will have",
"should've": "should have",
"shouldn't": "should not",
"shouldn't've": "should not have",
"so've": "so have",
"that'd": "that would",
"that'll": "that will",
"that'd've": "that would have",
"there'd": "there would",
"there'd've": "there would have",
"there'll": "there will",
"there're": "there are",
"they'd": "they would",
"they'd've": "they would have",
"they'll": "they will",
"they'll've": "they will have",
"they're": "they are",
"they've": "they have",
"to've": "to have",
"wasn't": "was not",
"we'd": "we would",
"we'd've": "we would have",
"we'll": "we will",
"we'll've": "we will have",
"we're": "we are",
"we've": "we have",
"weren't": "were not",
"what'll": "what will",
"what'll've": "what will have",
"what're": "what are",
"what've": "what have",
"what'd": "what would",
"when've": "when have",
"where'd": "where did",
"where've": "where have",
"who'll": "who will",
"who'll've": "who will have",
"who've": "who have",
"who'd": "who would",
"why've": "why have",
"will've": "will have",
"won't": "will not",
"won't've": "will not have",
"why'd": "why would",
"would've": "would have",
"wouldn't": "would not",
"wouldn't've": "would not have",
"ya'll": "you all",
"y'all": "you all",
"y'all'd": "you all would",
"y'all'd've": "you all would have",
"y'all're": "you all are",
"y'all've": "you all have",
"you'd": "you would",
"you'd've": "you would have",
"you'll": "you will",
"you'll've": "you will have",
"you're": "you are",
"you've": "you have",
}
self.__forced_decoder_ids: Optional[List[Tuple[int, int]]] = None
self.__text_pred: str = ""
self.__len_paths: int = 0 # Количество искомых файлов
self.__local_path: Union[Callable[[str], str], None] = None # Локальный путь
# Ключи для точности
self.__df_accuracy_index: List[str] = ["MAE", "Accuracy"]
self.__df_accuracy_index_name: str = "Metrics"
self.__df_accuracy_mean: str = "Mean"
# ------------------------------------------------------------------------------------------------------------------
# Свойства
# ------------------------------------------------------------------------------------------------------------------
@property
def text_model_hc_(self) -> Optional[nn.Module]:
"""Получение нейросетевой модели **nn.Module** для получения оценок по экспертным признакам
Returns:
Optional[nn.Module]: Нейросетевая модель **nn.Module** или None
"""
return self._text_model_hc
@property
def text_model_nn_(self) -> Optional[nn.Module]:
"""Получение нейросетевой модели **nn.Module** для получения оценок по нейросетевым признакам
Returns:
Optional[nn.Module]: Нейросетевая модель **tnn.Module** или None
"""
return self._text_model_nn
@property
def text_model_b5_(self) -> Optional[nn.Module]:
"""Получение нейросетевой модели **nn.Module** для получения оценок персональных качеств
Returns:
Optional[nn.Module]: Нейросетевая модель **nn.Module** или None
"""
return self._text_model_b5
# ------------------------------------------------------------------------------------------------------------------
# Внутренние методы (приватные)
# ------------------------------------------------------------------------------------------------------------------
def __load_model_weights(
self,
url: str,
force_reload: bool = True,
info_text: str = "",
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> bool:
"""Загрузка весов нейросетевой модели
.. note::
private (приватный метод)
Args:
url (str): Полный путь к файлу с весами нейросетевой модели
force_reload (bool): Принудительная загрузка файла с весами нейросетевой модели из сети
info_text (str): Текст для информационного сообщения
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если веса нейросетевой модели загружены, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(url) is not str
or not url
or type(force_reload) is not bool
or type(info_text) is not str
or not info_text
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.__load_model_weights.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(info_text, last=False, out=out)
sections = urlparse(url) # Парсинг URL адреса
try:
# URL файл невалидный
if sections.scheme == "":
raise requests.exceptions.InvalidURL
except requests.exceptions.InvalidURL:
url = os.path.normpath(url)
try:
if os.path.isfile(url) is False:
raise FileNotFoundError # Не файл
except FileNotFoundError:
self._other_error(self._load_model_weights_error, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
self._url_last_filename = url
return True
else:
try:
if force_reload is False:
clear_output(True)
# Загрузка файла из URL
res_download_file_from_url = self._download_file_from_url(
url=url, force_reload=force_reload, runtime=False, out=out, run=True
)
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Файл загружен
if res_download_file_from_url != 200:
return False
return True
finally:
if runtime:
self._r_end(out=out)
def __load_text_features(
self,
url: str,
force_reload: bool = True,
info_text: str = "",
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> bool:
"""Загрузка словаря с экспертными признаками
.. note::
private (приватный метод)
Args:
url (str): Полный путь к файлу с экспертными признаками
force_reload (bool): Принудительная загрузка файла с экспертными признаками из сети
info_text (str): Текст для информационного сообщения
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если словарь с экспертными признаками загружен, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(url) is not str
or not url
or type(force_reload) is not bool
or type(info_text) is not str
or not info_text
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.__load_text_features.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(info_text, last=False, out=out)
sections = urlparse(url) # Парсинг URL адреса
try:
# URL файл невалидный
if sections.scheme == "":
raise requests.exceptions.InvalidURL
except requests.exceptions.InvalidURL:
url = os.path.normpath(url)
try:
if os.path.isfile(url) is False:
raise FileNotFoundError # Не файл
except FileNotFoundError:
self._other_error(self._load_text_features_error, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
self._url_last_filename = url
return True
else:
try:
if force_reload is False:
clear_output(True)
# Загрузка файла из URL
res_download_file_from_url = self._download_file_from_url(
url=url, force_reload=force_reload, runtime=False, out=out, run=True
)
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Файл загружен
if res_download_file_from_url != 200:
return False
return True
finally:
if runtime:
self._r_end(out=out)
def __load_bert_model(
self,
url: str,
force_reload: bool = True,
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> bool:
"""Загрузка нейросетевой модели BERT
.. note::
private (приватный метод)
Args:
url (str): Полный путь к файлу с нейросетевой модели BERT
force_reload (bool): Принудительная загрузка файла с нейросетевой модели BERT из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если нейросетевая модель BERT загружена, в обратном случае **False**
"""
try:
# Проверка аргументов
if (
type(url) is not str
or not url
or type(force_reload) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.__load_bert_model.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
sections = urlparse(url) # Парсинг URL адреса
try:
# URL файл невалидный
if sections.scheme == "":
raise requests.exceptions.InvalidURL
except requests.exceptions.InvalidURL:
url = os.path.normpath(url)
try:
if os.path.isfile(url) is False:
raise FileNotFoundError # Не файл
except FileNotFoundError:
self._other_error(self._load_bert_model_error, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
self._url_last_filename = url
return True
else:
try:
if force_reload is False:
clear_output(True)
# Загрузка файла из URL
res_download_file_from_url = self._download_file_from_url(
url=url, force_reload=force_reload, runtime=False, out=out, run=True
)
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Файл загружен
if res_download_file_from_url != 200:
return False
return True
finally:
if runtime:
self._r_end(out=out)
def __translate_and_extract_features(
self,
text: str,
lang: str,
show_text: bool = False,
last: bool = False,
out: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
"""Извлечение признаков из текста
.. note::
private (приватный метод)
Args:
text (str): Текст
lang (str): Язык
show_text (bool): Отображение текста
last (bool): Замена последнего сообщения
out (bool): Отображение
Returns:
Tuple[np.ndarray, np.ndarray]: Кортеж с двумя np.ndarray:
1. np.ndarray с экспертными признаками
2. np.ndarray с нейросетевыми признаками
"""
contractions_re = re.compile("(%s)" % "|".join(self.__contractions_dict.keys()))
expand_contractions = lambda s: contractions_re.sub(lambda match: self.__contractions_dict[match.group(0)], s)
get_norm_text = lambda text: re.sub(
r"(?<=[.,])(?=[^\s])",
" ",
re.sub(
r"\[[^\[\]]+\]",
"",
expand_contractions(
re.sub(
r'[.,"\'?:!/;]',
"",
re.sub(r"((?<=^)(\s*?(\-)??))|(((\-)??\s*?)(?=$))", "", text.lower().strip()),
)
),
),
)
norm_features = lambda feature, length: np.pad(
feature[:length, :], ((0, max(0, length - feature.shape[0])), (0, 0)), "constant"
)
if lang == self.__lang_traslate[0]:
if len(text) > 700:
translation = ""
for sentence in text.split(".")[:-1]:
input_ids = self._tokenizer.encode(sentence + ".", return_tensors="pt")
outputs = self._traslate_model.generate(input_ids.to(self._device), max_new_tokens=4000)
translation += self._tokenizer.decode(outputs[0], skip_special_tokens=True) + " "
else:
input_ids = self._tokenizer.encode(text, return_tensors="pt")
outputs = self._traslate_model.generate(input_ids.to(self._device), max_new_tokens=4000)
translation = self._tokenizer.decode(outputs[0], skip_special_tokens=True)
translation = re.sub(r"(?<=[.,])(?=[^\s])", r" ", translation)
else:
translation = text
text = get_norm_text(text)
translation = get_norm_text(translation)
encoded_input = self._bert_tokenizer(text, return_tensors="pt")
dict_new = {}
if encoded_input["input_ids"].shape[1] > 512:
dict_new["input_ids"] = encoded_input["input_ids"][:, :512].to(self._device)
dict_new["token_type_ids"] = encoded_input["token_type_ids"][:, :512].to(self._device)
dict_new["attention_mask"] = encoded_input["attention_mask"][:, :512].to(self._device)
encoded_input = dict_new
else:
dict_new["input_ids"] = encoded_input["input_ids"].to(self._device)
dict_new["token_type_ids"] = encoded_input["token_type_ids"].to(self._device)
dict_new["attention_mask"] = encoded_input["attention_mask"].to(self._device)
encoded_input = dict_new
features_bert = self._bert_model(**encoded_input)[0][0].detach().cpu().numpy()
features_liwc = []
for i in translation.split(" "):
curr_f = np.zeros((1, 64))
for j in self.__parse_text_features(i):
curr_f[:, self.__category_text_features.index(j)] += 1
features_liwc.extend(curr_f)
features_liwc = np.array(features_liwc)
if lang == self.__lang_traslate[0]:
features_bert = norm_features(features_bert, 414)
features_liwc = norm_features(features_liwc, 365)
elif lang == self.__lang_traslate[1]:
features_bert = norm_features(features_bert, 104)
features_liwc = norm_features(features_liwc, 89)
if not show_text:
text = None
if last is False:
# Статистика извлеченных признаков из текста
self._stat_text_features(
last=last,
out=out,
shape_hc_features=np.array(features_liwc).shape,
shape_nn_features=np.array(features_bert).shape,
text=text,
)
return features_liwc, features_bert
def __process_audio_and_extract_features(
self, path: str, win: int, lang: str, show_text: bool, last: bool, out: bool, url: str = None
) -> Tuple[np.ndarray, np.ndarray]:
# TODO: ВНИМАНИЕ! добавить в аргументы
if url:
self._path_to_transriber = url
if not self._processor:
self._processor = AutoProcessor.from_pretrained(self._path_to_transriber)
self._model_transcriptions = WhisperForConditionalGeneration.from_pretrained(self._path_to_transriber).to(
self._device
)
self._model_transcriptions.config.forced_decoder_ids = None
path_to_wav = os.path.join(str(Path(path).parent), Path(path).stem + "." + "wav")
if not Path(path_to_wav).is_file():
if Path(path).suffix not in ["mp3", "wav"]:
ff_audio = "ffmpeg -loglevel quiet -i {} -vn -acodec pcm_s16le -ar 44100 -ac 2 {}".format(
path, path_to_wav
)
call_audio = subprocess.call(ff_audio, shell=True)
if call_audio != 0:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
wav, sr = torchaudio.load(path_to_wav)
if wav.size(0) > 1:
wav = wav.mean(dim=0, keepdim=True)
if sr != 16000:
transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
wav = transform(wav)
sr = 16000
wav = wav.squeeze(0)
for start in range(0, len(wav), win):
inputs = self._processor(wav[start : start + win], sampling_rate=16000, return_tensors="pt")
input_features = inputs.input_features.to(self._device)
if lang == self.__lang_traslate[0]:
generated_ids = self._model_transcriptions.generate(
input_features=input_features,
language="ru",
forced_decoder_ids=None,
)
elif lang == self.__lang_traslate[1]:
generated_ids = self._model_transcriptions.generate(input_features=input_features, language="en")
transcription = self._processor.batch_decode(generated_ids, skip_special_tokens=False)
transcription = re.findall(r"> ([^<>]+)", transcription[0])
self.__text_pred += transcription[0] + " "
self.__text_pred = self.__text_pred.strip()
return self.__translate_and_extract_features(self.__text_pred, lang, show_text, last, out)
def __load_text_model_b5(self, show_summary: bool = False, out: bool = True) -> Optional[nn.Module]:
"""Формирование нейросетевой архитектуры модели для получения оценок персональных качеств
.. note::
private (приватный метод)
Args:
show_summary (bool): Отображение сформированной нейросетевой архитектуры модели
out (bool): Отображение
Returns:
Optional[nn.Module]:
**None** если неверные типы или значения аргументов, в обратном случае нейросетевая модель
**nn.Module** для получения оценок персональных качеств
"""
try:
# Проверка аргументов
if type(show_summary) is not bool or type(out) is not bool:
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.__load_text_model_b5.__name__, out=out)
return None
else:
model = text_model_b5()
if show_summary and out:
print(model)
return model
# ------------------------------------------------------------------------------------------------------------------
# Внутренние методы (защищенные)
# ------------------------------------------------------------------------------------------------------------------
[docs]
def _get_text_features(
self,
path: str,
asr: bool = False,
lang: str = "ru",
show_text: bool = False,
last: bool = False,
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> Tuple[np.ndarray, np.ndarray]:
"""Извлечение признаков из текста (без очистки истории вывода сообщений в ячейке Jupyter)
.. note::
protected (защищенный метод)
Args:
path (str): Путь к видеофайлу или текст
asr (bool): Автоматическое распознавание речи
lang (str): Язык
show_text (bool): Отображение текста
last (bool): Замена последнего сообщения
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
Tuple[np.ndarray, np.ndarray]: Кортеж с двумя np.ndarray:
1. np.ndarray с экспертными признаками
2. np.ndarray с нейросетевыми признаками
"""
try:
# Проверка аргументов
if (
(type(path) is not str or not path)
and (type(path) is not gradio.utils.NamedString)
or type(asr) is not bool
or not isinstance(lang, str)
or lang not in self.__lang_traslate
or type(last) is not bool
or type(show_text) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self._get_text_features.__name__, last=last, out=out)
return np.empty([]), np.empty([])
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, last=last, out=out)
return np.empty([]), np.empty([])
if runtime:
self._r_start()
if last is False:
# Информационное сообщение
self._info(self._get_text_feature_info, out=False)
if out:
self.show_notebook_history_output() # Отображение истории вывода сообщений в ячейке Jupyter
win = 400000
try:
self.__text_pred = ""
if os.path.isfile(path) is False:
raise FileNotFoundError # Не файл
except FileNotFoundError:
try:
path_to_text = os.path.join(
str(Path(path).parent), Path(path).stem + "." + self.__supported_text_formats[0]
)
if os.path.isfile(path_to_text) is False:
raise FileNotFoundError # Не текстовый файл
except FileNotFoundError:
self.__text_pred = path.strip()
return self.__translate_and_extract_features(self.__text_pred, lang, show_text, last, out)
except Exception:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
else:
try:
with open(path_to_text, "r", encoding="utf-8") as file:
lines = file.readlines()
self.__text_pred = " ".join(line.strip() for line in lines)
except Exception:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
else:
try:
if not self.__text_pred:
raise ValueError
except ValueError:
self._other_error(
self._text_is_empty.format(self._info_wrapper(path_to_text)), last=last, out=out
)
return np.empty([]), np.empty([])
else:
return np.empty([]), np.empty([])
except Exception:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
else:
try:
path_to_text = os.path.join(
str(Path(path).parent), Path(path).stem + "." + self.__supported_text_formats[0]
)
if os.path.isfile(path_to_text) is False:
raise FileNotFoundError # Не текстовый файл
except FileNotFoundError:
return self.__process_audio_and_extract_features(path, win, lang, show_text, last, out)
except Exception:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
else:
try:
with open(path_to_text, "r", encoding="utf-8") as file:
lines = file.readlines()
self.__text_pred = " ".join(line.strip() for line in lines)
except Exception:
self._other_error(self._unknown_err, last=last, out=out)
return np.empty([]), np.empty([])
else:
try:
if not self.__text_pred:
raise ValueError
except ValueError:
self._other_error(
self._text_is_empty.format(self._info_wrapper(path_to_text)), last=last, out=out
)
return np.empty([]), np.empty([])
else:
if asr:
return self.__process_audio_and_extract_features(path, win, lang, show_text, last, out)
else:
return self.__translate_and_extract_features(
self.__text_pred, lang, show_text, last, out
)
finally:
if runtime:
self._r_end(out=out)
# ------------------------------------------------------------------------------------------------------------------
# Внешние методы
# ------------------------------------------------------------------------------------------------------------------
[docs]
def load_text_model_hc(
self, corpus: str = "", show_summary: bool = False, out: bool = True, runtime: bool = True, run: bool = True
):
"""Формирование нейросетевой архитектуры модели для получения оценок по экспертным признакам
Args:
corpus (str): Корпус для тестирования нейросетевой модели
show_summary (bool): Отображение сформированной нейросетевой архитектуры модели
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если нейросетевая архитектура модели сформирована, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(corpus) is not str
or not corpus
or (corpus in self.__multi_corpora) is False
or type(show_summary) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.load_text_model_hc.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(self._formation_text_model_hc, last=False, out=False)
if out:
self.show_notebook_history_output() # Отображение истории вывода сообщений в ячейке Jupyter
# fi
if corpus is self.__multi_corpora[0]:
input_shape = (89, 64)
# mupta
elif corpus is self.__multi_corpora[1]:
input_shape = (365, 64)
else:
input_shape = (89, 64)
self._text_model_hc = text_model_hc(input_shape)
if show_summary and out:
print(self._text_model_hc)
if runtime:
self._r_end(out=out)
return True
[docs]
def load_text_model_nn(
self, corpus: str = "", show_summary: bool = False, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Формирование нейросетевой архитектуры для получения оценок по нейросетевым признакам
Args:
corpus (str): Корпус для тестирования нейросетевой модели
show_summary (bool): Отображение сформированной нейросетевой архитектуры модели
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если нейросетевая архитектура модели сформирована, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(corpus) is not str
or not corpus
or (corpus in self.__multi_corpora) is False
or type(show_summary) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.load_text_model_nn.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(self._formation_text_model_nn, last=False, out=False)
if out:
self.show_notebook_history_output() # Отображение истории вывода сообщений в ячейке Jupyter
# fi
if corpus is self.__multi_corpora[0]:
input_shape = (104, 768)
# mupta
elif corpus is self.__multi_corpora[1]:
input_shape = (414, 768)
else:
input_shape = (104, 768)
self._text_model_nn = text_model_nn(input_shape)
if show_summary and out:
print(self._text_model_nn)
if runtime:
self._r_end(out=out)
return True
[docs]
def load_text_model_b5(
self, show_summary: bool = False, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Формирование нейросетевой архитектуры модели для получения результатов оценки персональных качеств
Args:
show_summary (bool): Отображение сформированной нейросетевой архитектуры модели
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если нейросетевая архитектура модели сформирована, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(show_summary) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.load_text_model_b5.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(self._formation_text_model_b5, last=False, out=False)
if out:
self.show_notebook_history_output() # Отображение истории вывода сообщений в ячейке Jupyter
self._text_model_b5 = self.__load_text_model_b5()
if show_summary and out:
self._text_model_b5.summary()
if runtime:
self._r_end(out=out)
return True
[docs]
def load_text_model_weights_hc(
self, url: str, force_reload: bool = True, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Загрузка весов нейросетевой модели для получения оценок по экспертным признакам
Args:
url (str): Полный путь к файлу с весами нейросетевой модели
force_reload (bool): Принудительная загрузка файла с весами нейросетевой модели из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если веса нейросетевой модели загружены, в обратном случае **False**
"""
if runtime:
self._r_start()
if self.__load_model_weights(url, force_reload, self._load_text_model_weights_hc, out, False, run) is True:
try:
self._text_model_hc.load_state_dict(torch.load(self._url_last_filename, weights_only=True))
self._text_model_hc.to(self._device).eval()
except Exception:
self._error(self._model_text_hc_not_formation, out=out)
return False
else:
return True
finally:
if runtime:
self._r_end(out=out)
return False
[docs]
def load_text_model_weights_nn(
self, url: str, force_reload: bool = True, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Загрузка весов нейросетевой модели для получения оценок по нейросетевым признакам
Args:
url (str): Полный путь к файлу с весами нейросетевой модели
force_reload (bool): Принудительная загрузка файла с весами нейросетевой модели из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если веса нейросетевой модели загружены, в обратном случае **False**
"""
if runtime:
self._r_start()
if self.__load_model_weights(url, force_reload, self._load_text_model_weights_nn, out, False, run) is True:
try:
self._text_model_nn.load_state_dict(torch.load(self._url_last_filename, weights_only=True))
self._text_model_nn.to(self._device).eval()
except Exception:
self._error(self._model_text_nn_not_formation, out=out)
return False
else:
return True
finally:
if runtime:
self._r_end(out=out)
return False
[docs]
def load_text_model_weights_b5(
self,
url: str,
force_reload: bool = True,
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> bool:
"""Загрузка весов нейросетевой модели для получения оценок персональных качеств
Args:
url (str): Полный путь к файлу с весами нейросетевой модели
force_reload (bool): Принудительная загрузка файлов с весами нейросетевых моделей из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если веса нейросетевой модели загружены, в обратном случае **False**
"""
if runtime:
self._r_start()
if self.__load_model_weights(url, force_reload, self._load_text_model_weights_b5, out, False, run) is True:
try:
self._text_model_b5.load_state_dict(torch.load(self._url_last_filename, weights_only=True))
self._text_model_b5.to(self._device).eval()
except Exception:
self._error(self._model_text_not_formation, out=out)
return False
else:
return True
finally:
if runtime:
self._r_end(out=out)
return False
[docs]
def load_text_features(
self, url: str = None, force_reload: bool = True, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Загрузка словаря с экспертными признаками
Args:
url (str): Полный путь к лингвистическому словарю
force_reload (bool): Принудительная загрузка файла с весами нейросетевой модели из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если словарь с экспертными признаками загружен, в обратном случае **False**
"""
if runtime:
self._r_start()
# TODO: ВНИМАНИЕ! добавить в аргументы
if url:
self._text_features = url
force_reload = False
if (
self.__load_text_features(self._text_features, force_reload, self._load_text_features, out, False, run)
is True
):
try:
self.__parse_text_features, self.__category_text_features = liwc.load_token_parser(
self._url_last_filename
)
self.__category_text_features = sorted(self.__category_text_features)
except Exception:
self._error(self._load_token_parser_error, out=out)
return False
else:
return True
finally:
if runtime:
self._r_end(out=out)
[docs]
def setup_translation_model(
self, url: str = None, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Формирование токенизатора и нейросетевой модели машинного перевода
Args:
url: Полный путь к файлу с моделью для перевода языка
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если токенизатор и нейросетевая модель сформированы, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if type(out) is not bool or type(runtime) is not bool or type(run) is not bool:
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.setup_translation_model.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(self._load_translation_model, last=False, out=out)
# TODO: ВНИМАНИЕ! добавить в аргументы
if url:
self._translation_model = url
try:
self._tokenizer = MarianTokenizer.from_pretrained(
self._translation_model, clean_up_tokenization_spaces=True
)
self._traslate_model = AutoModelForSeq2SeqLM.from_pretrained(self._translation_model).to(self._device)
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
return True
finally:
if runtime:
self._r_end(out=out)
[docs]
def setup_bert_encoder(
self, url: str = None, force_reload: bool = True, out: bool = True, runtime: bool = True, run: bool = True
) -> bool:
"""Формирование токенизатора и нейросетевой модели BERT
Args:
url: Полный путь к файлу с нейросетевой моделью BERT
force_reload (bool): Принудительная загрузка файла с нейросетевой моделью BERT из сети
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если токенизатор и нейросетевая модель BERT сформированы, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
try:
# Проверка аргументов
if (
type(force_reload) is not bool
or type(out) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.setup_bert_encoder.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
# Информационное сообщение
self._info(self._load_bert_model, last=False, out=out)
# TODO: ВНИМАНИЕ! добавить в аргументы
if url:
self._bert_multi_model = url
force_reload = False
try:
self._bert_tokenizer = BertTokenizer.from_pretrained(self._bert_multi_model)
self._bert_model = BertModel.from_pretrained(self._bert_multi_model).to(self._device)
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
return True
# if self.__load_bert_model(self._bert_multi_model, force_reload, out, False, run) is True:
# try:
# # Распаковка архива
# res_unzip = self._unzip(
# path_to_zipfile=os.path.join(self._url_last_filename),
# new_name=None,
# force_reload=force_reload,
# )
# except Exception:
# self._other_error(self._unknown_err, out=out)
# return False
# else:
# # Файл распакован
# if res_unzip is True:
# try:
# self._bert_tokenizer = BertTokenizer.from_pretrained(Path(self._url_last_filename).stem)
# self._bert_model = BertModel.from_pretrained(Path(self._url_last_filename).stem).to(
# self._device
# )
# except Exception:
# self._other_error(self._unknown_err, out=out)
# return False
# else:
# return True
# else:
# return False
finally:
if runtime:
self._r_end(out=out)
[docs]
def get_text_features(
self,
path: str,
asr: bool = False,
lang: str = "ru",
show_text: bool = False,
out: bool = True,
runtime: bool = True,
run: bool = True,
):
"""Извлечение признаков из текста
Args:
path (str): Путь к видеофайлу или текст
asr (bool): Автоматическое распознавание речи
lang (str): Язык
show_text (bool): Отображение текста
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
Tuple[np.ndarray, np.ndarray]: Кортеж с двумя np.ndarray:
1. np.ndarray с экспертными признаками
2. np.ndarray с нейросетевыми признаками
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
return self._get_text_features(
path=path,
asr=asr,
lang=lang,
show_text=show_text,
last=False,
out=out,
runtime=runtime,
run=run,
)
[docs]
def get_text_union_predictions(
self,
depth: int = 1,
recursive: bool = False,
asr: bool = False,
lang: str = "ru",
accuracy=True,
url_accuracy: str = "",
logs: bool = True,
out: bool = True,
runtime: bool = True,
run: bool = True,
) -> bool:
"""Получения прогнозов по тексту
Args:
depth (int): Глубина иерархии для получения данных
recursive (bool): Рекурсивный поиск данных
asr (bool): Автоматическое распознавание речи
lang (str): Язык
accuracy (bool): Вычисление точности
url_accuracy (str): Полный путь к файлу с верными предсказаниями для подсчета точности
logs (bool): При необходимости формировать LOG файл
out (bool): Отображение
runtime (bool): Подсчет времени выполнения
run (bool): Блокировка выполнения
Returns:
bool: **True** если прогнозы успешно получены, в обратном случае **False**
"""
self._clear_notebook_history_output() # Очистка истории вывода сообщений в ячейке Jupyter
# Сброс
self._df_files = pd.DataFrame() # Пустой DataFrame с данными
self._df_accuracy = pd.DataFrame() # Пустой DataFrame с результатами вычисления точности
try:
# Проверка аргументов
if (
type(depth) is not int
or depth < 1
or type(out) is not bool
or type(recursive) is not bool
or type(asr) is not bool
or not isinstance(lang, str)
or lang not in self.__lang_traslate
or type(accuracy) is not bool
or type(url_accuracy) is not str
or type(logs) is not bool
or type(runtime) is not bool
or type(run) is not bool
):
raise TypeError
except TypeError:
self._inv_args(__class__.__name__, self.get_text_union_predictions.__name__, out=out)
return False
else:
# Блокировка выполнения
if run is False:
self._error(self._lock_user, out=out)
return False
if runtime:
self._r_start()
try:
# Получение директорий, где хранятся данные
path_to_data = self._get_paths(self.path_to_dataset_, depth, out=out)
if type(path_to_data) is bool:
return False
if type(self.keys_dataset_) is not list:
raise TypeError
# Словарь для DataFrame набора данных с данными
self._dict_of_files = dict(zip(self.keys_dataset_, [[] for _ in range(0, len(self.keys_dataset_))]))
# Словарь для DataFrame набора данных с результатами вычисления точности
self._dict_of_accuracy = dict(
zip(self.keys_dataset_[1:], [[] for _ in range(0, len(self.keys_dataset_[1:]))])
)
except (TypeError, FileNotFoundError):
self._other_error(self._folder_not_found.format(self._info_wrapper(self.path_to_dataset_)), out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Вычисление точности
if accuracy is True:
get_text_union_predictions_info = self._get_union_predictions_info + self._get_accuracy_info
else:
get_text_union_predictions_info = self._get_union_predictions_info
get_text_union_predictions_info += self._text_modality
# Вычисление точности
if accuracy is True:
# Информационное сообщение
self._info(get_text_union_predictions_info, out=out)
if not url_accuracy:
url_accuracy = self._true_traits["sberdisk"]
try:
# Загрузка верных предсказаний
data_true_traits = pd.read_csv(url_accuracy)
except (FileNotFoundError, URLError, UnicodeDecodeError):
self._other_error(self._load_data_true_traits_error, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
true_traits = []
self._del_last_el_notebook_history_output()
paths = [] # Пути до искомых файлов
# Проход по всем директориям
for curr_path in path_to_data:
empty = True # По умолчанию директория пустая
# Рекурсивный поиск данных
if recursive is True:
g = Path(curr_path).rglob("*")
else:
g = Path(curr_path).glob("*")
# Формирование словаря для DataFrame
for p in g:
try:
if type(self.ext_) is not list or len(self.ext_) < 1:
raise TypeError
self.ext_ = [x.lower() for x in self.ext_]
except TypeError:
self._other_error(self._wrong_ext, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Расширение файла соответствует расширению искомых файлов
if p.suffix.lower() in self.ext_:
if empty is True:
empty = False # Каталог не пустой
paths.append(p.resolve())
try:
self.__len_paths = len(paths) # Количество искомых файлов
if self.__len_paths == 0:
raise TypeError
except TypeError:
self._other_error(self._files_not_found, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
# Локальный путь
self.__local_path = lambda path: os.path.join(
*Path(path).parts[-abs((len(Path(path).parts) - len(Path(self.path_to_dataset_).parts))) :]
)
last = False # Замена последнего сообщения
with torch.no_grad():
# Проход по всем искомым файлов
for i, curr_path in enumerate(paths):
if i != 0:
last = True
# Индикатор выполнения
self._progressbar_union_predictions(
get_text_union_predictions_info,
i,
self.__local_path(curr_path),
self.__len_paths,
True,
last,
out,
)
hc_features, nn_features = self.get_text_features(
path=str(curr_path.resolve()), # Путь к видеофайлу
asr=asr, # Распознавание речи
lang=lang, # Выбор языка
show_text=True, # Отображение текста
out=False, # Отображение
runtime=False, # Подсчет времени выполнения
run=run, # Блокировка выполнения
)
hc_features = np.expand_dims(hc_features, axis=0)
nn_features = np.expand_dims(nn_features, axis=0)
# Признаки из текста извлечены
if len(hc_features) > 0 and len(nn_features) > 0:
# Коды ошибок нейросетевых моделей
code_error_pred_hc = -1
code_error_pred_nn = -1
try:
# Оправка экспертных признаков в нейросетевую модель
hc_features = torch.from_numpy(np.array(hc_features, dtype=np.float32))
pred_hc, _ = self.text_model_hc_(hc_features.to(self._device))
pred_hc = pred_hc.detach().cpu()
except TypeError:
code_error_pred_hc = 1
except Exception:
code_error_pred_hc = 2
try:
# Отправка нейросетевых признаков в нейросетевую модель
nn_features = torch.from_numpy(np.array(nn_features, dtype=np.float32))
pred_nn, _ = self.text_model_nn_(nn_features.to(self._device))
pred_nn = pred_nn.detach().cpu()
except TypeError:
code_error_pred_nn = 1
except Exception:
code_error_pred_nn = 2
if code_error_pred_hc != -1 and code_error_pred_nn != -1:
self._error(self._model_text_not_formation, out=out)
return False
if code_error_pred_hc != -1:
self._error(self._model_text_hc_not_formation, out=out)
return False
if code_error_pred_nn != -1:
self._error(self._model_text_nn_not_formation, out=out)
return False
# pred_hc = pred_hc.numpy()[0]
# pred_nn = pred_nn.numpy()[0]
final_pred = self._text_model_b5(pred_hc.to(self._device), pred_nn.to(self._device))
final_pred = final_pred.detach().cpu().numpy()[0].tolist()
# final_pred = self._text_model_b5([pred_hc, pred_nn]).numpy()[0].tolist()
# Добавление данных в словарь для DataFrame
if self._append_to_list_of_files(str(curr_path.resolve()), final_pred, out) is False:
return False
# Вычисление точности
if accuracy is True:
try:
true_trait = (
data_true_traits[data_true_traits.NAME_VIDEO == curr_path.name][
list(self._b5["en"])
]
.values[0]
.tolist()
)
except IndexError:
self._other_error(self._expert_values_not_found, out=out)
return False
except Exception:
self._other_error(self._unknown_err, out=out)
return False
else:
true_traits.append(true_trait)
else:
# Добавление данных в словарь для DataFrame
if (
self._append_to_list_of_files(
str(curr_path.resolve()), [None] * len(self._b5["en"]), out
)
is False
):
return False
# Индикатор выполнения
self._progressbar_union_predictions(
get_text_union_predictions_info,
self.__len_paths,
self.__local_path(paths[-1]),
self.__len_paths,
True,
last,
out,
)
# Отображение в DataFrame с данными
self._df_files = pd.DataFrame.from_dict(data=self._dict_of_files, orient="index").transpose()
self._df_files.index.name = self._keys_id
self._df_files.index += 1
self._df_files.index = self._df_files.index.map(str)
self._df_files.Path = [os.path.basename(i) for i in self._df_files.Path]
# Отображение
if out is True:
self._add_notebook_history_output(self._df_files.iloc[0 : self.num_to_df_display_, :])
# Подсчет точности
if accuracy is True:
mae_curr = []
for cnt, name_b5 in enumerate(self._df_files.keys().tolist()[1:]):
mae_curr.append(
mean_absolute_error(np.asarray(true_traits)[:, cnt], self._df_files[name_b5].to_list())
)
mae_curr = [round(float(i), 4) for i in mae_curr]
mae_mean = round(float(np.mean(mae_curr)), 4)
accuracy_curr = [round(float(i), 4) for i in 1 - np.asarray(mae_curr)]
accuracy_mean = round(float(np.mean(accuracy_curr)), 4)
for curr_acc in [mae_curr, accuracy_curr]:
# Добавление данных в словарь для DataFrame с результатами вычисления точности
if self._append_to_list_of_accuracy(curr_acc, out) is False:
return False
self._dict_of_accuracy.update({self.__df_accuracy_mean: [mae_mean, accuracy_mean]})
# Отображение в DataFrame с данными
self._df_accuracy = pd.DataFrame.from_dict(
data=self._dict_of_accuracy, orient="index"
).transpose()
self._df_accuracy.index = self.__df_accuracy_index
self._df_accuracy.index.name = self.__df_accuracy_index_name
# Информационное сообщение
self._info(self._get_union_predictions_result, out=False)
# Отображение
if out is True:
self._add_notebook_history_output(self._df_accuracy.iloc[0 : self.num_to_df_display_, :])
self._info(
self._get_union_predictions_results_mean.format(
self._info_wrapper(str(mae_mean)), self._info_wrapper(str(accuracy_mean))
),
out=False,
)
clear_output(True)
# Отображение истории вывода сообщений в ячейке Jupyter
if out is True:
self.show_notebook_history_output()
if logs is True:
# Текущее время для лог-файла
# см. datetime.fromtimestamp()
curr_ts = str(datetime.now().timestamp()).replace(".", "_")
name_logs_file = self.get_text_union_predictions.__name__
# Сохранение LOG
res_save_logs_df_files = self._save_logs(
self._df_files, name_logs_file + "_df_files_" + curr_ts
)
# Подсчет точности
if accuracy is True:
# Сохранение LOG
res_save_logs_df_accuracy = self._save_logs(
self._df_accuracy, name_logs_file + "_df_accuracy_" + curr_ts
)
if res_save_logs_df_files is True:
# Сохранение LOG файла/файлов
if accuracy is True and res_save_logs_df_accuracy is True:
logs_s = self._logs_saves_true
else:
logs_s = self._logs_save_true
self._info_true(logs_s, out=out)
return True
finally:
if runtime:
self._r_end(out=out)