Импорт данных
- Получение списка и состояний подключений
- Импорт данных из коннектора
- Регулярное обновление данных в проекте
- Одновременный запуск импорта нескольких проектов использующих один конектор
- Управление процессами импорта (мониторинг статусов, отстановка, удаление)
- Логирование данных о загрузке скрипта (обновление данных в базе clickhouse)
- Работа скрипта загрузки
Получение списка и состояний подключений
Пользовательские сценарии
Стартовое состояние – пользователь находится в конструкторе дашбордов
Роль – Аналитик
Действие: Пользователь нажимает на кнопку "В диспетчер данных"
Результат:
- Открыт диспетчер данных на вкладке "Скрипт загрузки".
- Список подключений пустой.
- Каждые 10 секунд появляется сообщение:
Роль – Разработчик/Администратор
Действие: Пользователь нажимает на кнопку "В диспетчер данных"
Результат:
- Открыт диспетчер данных на вкладке "Скрипт загрузки".
- В списке подключений отображаются доступные пользователю источники (права доступа для этих источников у этого пользователя "Управление" или "Использование").
- Виден текущий статус подключений:
- "Доступен" – источник обнаружен и может быть использован. Визуально определяется по отсутствию дополнительных значков на источнике.
- "Сломан" – источник был создан, однако впоследствии возникла ошибка, препятствующая его использованию. Визуально определяется красной подсветкой названия источника и дополнительной иконкой с восклицательным знаком.
- "Занят" – источник доступен, но в текущий момент времени происходит загрузка данных, поэтому до конца загрузки его нельзя использовать. Визуально определяется "лоадером" на источнике.
- Виден тип источников:
- Файл – рядом с названием источника будет значок "страница"
- БД – рядом с названием источника будет значок "база данных"
- API – рядом с названием источника будет значок "облако"
Необходимые изменения
Проблема: Некорректное отображение в интерфейсе "сломанных" источников.
Даже если источник доступен, может возникнуть ошибка, препятствующая его использованию. В таком случае статус последней загрузке будет "aborted", а не 'success'.
Решение: Считать "сломанными" не только те источники, у которых "isValid" = False, но и те, у которых "status" = "aborted"
Схема потока данных
Техническое описание
* коннектор = источник
Запрос на получение списка коннекторов (отправляется на сервис Nest.js, в примере ссылка на стенд dev4):
GET https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/sources/list/{projectId}, где projectId – id проекта, для которого запрашиваются источники
Последовательность выполнения запроса (на стороне Nest.js):
- Регистрация запроса в контроллере (GET 'list/:projectId?'), проверка авторизации, соответствия роли пользователя (админ или разработчик) – подтверждает доступ для получения списка
- Получение списка доступных коннекторов, на основе userId из базы данных postgres
- Подписка на каналы rabbitMQ по заданным топикам для каждого источника
- На основе projectId из базы данных запрашиваются страницы скрипта загрузки
- Выполняется проверка, если коннектор когда-либо использовался в скрипте загрузки, в объект ответа добавляется поле "lastLoading" с информацией о последней загрузке коннектора в проект
Ответ содержит массив, состоящий из объектов вида:
{
"id": "d11e1bf4-8021-4fa0-ae91-c6b5066d5949",
"isValid": true,
"createdAt": "2023-10-04T23:49:27.953Z",
"updatedAt": "2024-05-20T04:13:58.332Z",
"name": "rasnikov-pg-test",
"driver": "postgres",
"type": "database",
"selected": false
}
где:
- id – идентификатор проекта
- isValid – статус проекта (булево поле, показывает, доступен ли коннектор).
- createdAt – дата/время создания подключения
- updatedAt – дата/время последнего изменения подключения
- name – название подключения
-
selected - источник выбран (существует в скрипте загрузки) или уже используется в проекте
lastLoading - часть ответа, которая добавляется к объекту коннектора, если ранее коннектор был использован и запущен в каком-либо скрипте загрузки (неважно исправный источник или нет). Существует только у файловых источников
{
"id": "28d1f145-506d-4df8-9ed8-ba78cb75626e",
"projectId": null,
"sourceId": "c2b6dec6-8830-45a7-bff2-9190ef702725",
"status": "aborted",
"started": "2024-07-17T14:30:12.006Z",
"loaded": 0,
"progress": 0,
"message": "Error: Файл /upload/testromans.qvd не существует"
}
где
- id - идентификатор сущности логирующей записи
- projectId - идентификатор проекта
- sourceIf - идентификатор источника
- status - статус импорта (enum - 'success', 'aborted', 'pending', 'active)
- loaded - количество строк из источника, которое было импортировано
- progress - процент выполнения загрузки источника в проект
- message - сообщение об ошибке
Если источник находится в состоянии загрузки и/или «сломан», дополнительно будет прикрепляться поле message с сообщением об ошибке
В момент открытия проекта, происходит подключение к сокет сервису loader, который будет уведомлять пользователя о процессе загрузки источника, добавлении новых источников и т.д.
К каждому проекту создается своё сокет подключение. Сама подписка на коннектор происходит через брокер rabbitMQ и описана ранее. Сокет соединение необходимо для отправки информации после получения данных из GO сервиса (и не только).
Таким образом интерактивное обновление источников обеспечивается за счет:
- подключения к сокетам конкретного проекта (Nest.js)
- подписки пользователя на каналы брокера (rabbitMQ) по топикам на каждый интересующий коннектор
- отправки информации обратно клиенту через подключенный сокет, при получении события из брокера (rabbitMQ)
Импорт данных из коннектора
Пользовательские сценарии
Стартовое состояние – пользователь находится в диспетчере данных, загружен список источников
Действие: Пользователь нажимает на "доступный" источник
Результат 1 (если у источника нет ошибок – "status" = "success"):
- Открывается окно выбора таблиц из источника
- Система не выдаёт ошибку
- В левой части окна загружается список таблиц источника
- Ни одна таблица не выбрана, область просмотра табличной части пустая
Результат 2 (если у источника возникла ошибка при пройденной проверке на валидность – "status" = "aborted"):
- Открывается окно выбора таблиц из источника
- Система выдаёт ошибку, полученную при загрузке источника (поле "message")
- В левой части окна отсутствует список источников
- Область просмотра табличной части пустая
Действие: Пользователь нажимает на "сломанный" источник
Результат (аналогичен доступному источнику с ошибкой):
- Открывается окно выбора таблиц из источника
- Система выдаёт ошибку, полученную при загрузке источника (поле "message")
- В левой части окна отсутствует список источников
- Область просмотра табличной части пустая
Действие: Пользователь нажимает на "занятый" источник
Результат:
- Открывается окно со статусом загрузки в процентах
- До окончания загрузки невозможно просмотреть табличные данные
- По окончанию загрузки источник получает статус "доступен" или "сломан"
Стартовое состояние – открыта область выбора данных из источника
Действие: Пользователь нажимает на таблицу в списке
Результат: В области просмотра данных появляется таблица с данными
Действия:
- Пользователь меняет тип данных поля
- Пользователь выбирает/исключает из выбора столбец/таблицу
- Пользователь меняет алгоритм генерации скрипта (Drop/Alter)
Результат:
- Изменения фиксируются в интерфейсе
- До подтверждения загрузки никаких взаимодействий с бэкендом не происходит
Действие: Пользователь нажимает на кнопку "Далее"
Результат: Формируется скрипт загрузки из указанных таблиц с указанными столбцами в указанных типах данных по указанному алгоритму
Необходимые изменения
Добавить Чекбокс "Исключить пустые столбцы"
Добавить Статистику по столбцам
Схема потока данных
Техническое описание
1. При нажатии на коннектор из списка в левой панели клиентская часть отправляет на сервис Nest.js GET запрос на получение соответствующих таблиц коннектора
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v2/source/{sourceId}/tables, где sourceId - id выбранного коннектора
Список доступных таблиц получается путём запроса метаданных источника из бд postgres
Результат отображается в левой панели "Таблицы источника"
2. При выборе таблицы из списка "Таблицы источника" отправляются GET запросы Meta + Preview:
Meta - содержит метаданные по выбранной таблице (тип драйвера, количество записей, название таблицы, id таблицы)
{
"kind": "sourceTableMeta",
"sourceTableMeta": {
"sourceId": "3ad0c9f4-5a92-4aa8-8776-1e5b3f0098e3",
"tableName": "Tablitsa",
"rowCount": 99,
"driverType": "xlsx"
}
}
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v2/source/{sourceId}/table/Tablitsa/meta
Preview - содержит фактические данные для отображения превью клиенту, всю информацию по колонкам таблицы и некоторое количество записей
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v2/source/{sourceId}/table/Tablitsa/preview
Данные получают путём запроса метаданных коннектора из бд postgres, поиска источника по этим метаданным на стороне сервера и дальнейшего форматирования результата запроса для отправки ответа клиентской части
Результат:
3. Драйверы. Тип драйверов:
- clickhouse
- csv
- hive
- json
- ms
- my
- oracle
- postgres
- qvd
- rest-api
- txt
- xlsx
- xml
Каждый тип драйвера наследуется от класса DriverDatabase и реализует интерфейс IDriver
Это гарантирует, что драйвер реализует необходимые методы, такие как:
- getCredentialsInstance - получение данных для подключения к драйверу (если требуются)
-
getRowCount - получение общего числа записей в источник
- getDbMeta - получение метаинформации по бд (список таблиц в источнике, с краткой информацией по каждой)
-
getPreview2 - получение превью бд (ограниченное количество строк + список столбцов)
-
query - выполнение любого запроса относящегося к источникуи др.
5. Создание скрипта загрузки
Нажатие на кнопку далее инициирует процесс формирования скрипта загрузка на основе обновленной информации
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/script/{projectId}/table, где projectId - id проекта
Ответ (подтверждение успешного выполнения операции - "Ok"):
б) Для отображения скрипта загрузки на клиентской части выполняется GET запрос:
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/script/{projectId}, где projectId - id проекта
Ответ:
список страниц с кодом скрипта загрузки для обновления ui клиентской части
Регулярное обновление данных в проекте
Пользовательские сценарии
Стартовое состояние – пользователь находится в диспетчере данных, создан скрипт загрузки
Действие: Пользователь нажимает на кнопку "Обновление данных"
Результат: Открывается окно для создания или изменения задания на обновление данных с 5 возможными графиками: ежечасно, ежедневно, еженедельно, ежемесячно, CRON.
Действие: Пользователь выбирает график обновления нажатием на радиокнопку рядом с названием
Результат:
- Изменяются поля ввода данных согласно выбранному графику: можно ввести час для ежечасного, день для ежедневного и т.д. Всегда есть ещё одно поле ввода для времени. По сути, каждый график представляет собой вариант CRON-задания, у которого вручную задаётся только часть параметров
- Загружается иконка для выбора времени (только один раз)
- Теряются все внесенные изменения
Действие: Пользователь вводит данные для графика обновления
Результат:
- Изменения фиксируются в интерфейсе
- Изменения сбрасываются при выборе другого графика
- До подтверждения загрузки никаких взаимодействий с бэкендом не происходит
Действие: Пользователь нажимает на кнопку "Создать/Изменить"
Результат: Формируется задание на автообновление данных согласно выбранному графику и установленным пользователем параметрам
Стартовое состояние – пользователь находится в панели администратора на вкладке "Задания"
Действие: Пользователь нажимает на кнопку "Создать"
Результат: Открывается окно создания задания как в диспетчере данных, но есть возможность выбрать проект из потока, для которого будет создано задание
Действие: Пользователь нажимает на кнопку "Черновик" при активной кнопке "Потоки"
Результат: Становится доступен список черновиков пользователей, автоматически выбирается первый в списке, становится доступен список проектов для пользователя
Действие: Пользователь нажимает на кнопку "Потоки" при активной кнопке "Черновик"
Результат: Становится доступен список потоков стенда, автоматически выбирается первый в списке, становится доступен список проектов для проекта
Действие: Пользователь выбирает поток из списка потоков/черновиков
Результат: Становится доступен список проектов для потока
Настройка CRON-задания аналогична настройке из диспетчера данных
Необходимые изменения
Обеспечивать сохранность предыдущих данных при обновлении
Добавить таск-менеджер и очередь заданий
Добавить мониторинг обновлений
Схема потока данных
Техническое описание
График обновления данных проекта устанавливается пользователем при нажатии на соответствующую кнопку на странице скрипта загрузки и подтверждении графика
При подтверждении (кнопка "создать" или "изменить", если задание уже существует), на Nest.js сервис отправляется POST запрос, содержащий информацию по графику обновления данных и флаг (поле isActive), контролирующий активно ли обновление или нет.
Идентификатор задачи на обновление равен идентификатору проекта и добавляется как параметр в запросе
https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/cronjob/{cronId}, где cronId – идентификатор задачи (соответствует id проекта)
Тело запроса:
{
"isActive": false,
"cron": {
"type": "hour",
"minutes": 0,
"hours": 0
}
}
Ответ:
{
"kind": "cronjobItemWithId",
"cronjobItemWithId": {
"name": "111222113",
"cron": {
"type": "hour",
"hours": 0,
"minutes": 0
},
"isActive": false,
"type": "project_import",
"message": "Error",
"duration": "00:00:09",
"startedTask": "2025-06-04T14:07:34.048Z",
"finishedTask": "2025-06-04T14:07:43.510Z",
"status": "aborted",
"nextStartAt": "2025-06-19T09:00:00.000Z",
"id": "b3d298c1-a58c-44e6-8a1a-1dad28ca0820"
}
}
Если по каким-либо причина проекта не существует, будет возвращена ошибка 604 (проект не найден)
Последовательность выполнения запроса
- инициация POST запроса от пользователя, с указанием периода обновления
- регистрация (прием) запроса в контроллере Nest.js (проверка лицензии, роли). Если роль не админ, разработчик или нет лицензии пользователю возвращается ошибка
- проверка существования проекта по id из запроса. Если проекта нет, возвращается ошибка
- отправка внутреннего запроса (в пределах сервера) от микросервиса обрабатывающего пользовательские запросы микросервису планировки задач (cron), через брокер (rabbitMQ). микросервис принимающий запросы клиентов à брокер (rabbitMq) à микросервис планировки задач
- в микросервисе планировки (cron) создается задача и её метаданные сохраняются в базу данных postgress.
Если задача активна, она будет запускаться в соответствии с указанным временным периодом.
Обновление проекта – это функция cron, которая с помощью брокера отправляет в микросервис, отвечающий за скрипт загрузки, команду запускающую соответствующий проекту скрипт
Одновременный запуск импорта нескольких проектов использующих один конектор
- Если конектор занят одним проектом, а к нему стучатся другие. Что произойдет с запуском других импортов (блокер)
- После того как источник освободился импорт других проектов нужно запустить заново?
- Как устроен модуль блокера
Управление процессами импорта (мониторинг статусов, отстановка, удаление)
Техническое описание
1. Мониторинг статусов
Клиентская часть оценивает состояние коннектора (в том числе загрузки) с помощью запроса, а также по сокет соединению, которое инициируется в момент навигации на страницу проекта:
GET https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/sources/list/{projectId}, где projectId – id проекта, для которого запрашиваются источники.
Результат содержит массив из объектов описывающих каждый коннектор, подключенный к выбранному проекту (подробное описание см. раздел "Получение списка и состояний подключений").
В объекте находится вся полезная информация для обновления пользовательского интерфейса и отслеживания статуса загрузки коннектор
{
"id": "a9db47c9-a1f9-4637-895f-b2f175a2c80a",
"isValid": true,
"createdAt": "2023-11-22T13:02:12.594Z",
"updatedAt": "2023-11-22T13:02:12.594Z",
"name": "QVDNEW",
"driver": "qvd",
"type": "file",
"filePath": "/qvd/df0b5654-d967-40d0-bcfc-ce39055001c0.qvd",
"fileName": "df0b5654-d967-40d0-bcfc-ce39055001c0.qvd",
"selected": false,
"lastLoading": {
"id": "2116fef9-07b6-4f4a-8af6-60e90fc11351",
"projectId": null,
"sourceId": "a9db47c9-a1f9-4637-895f-b2f175a2c80a",
"status": "success",
"started": "2023-11-22T13:02:12.613Z",
"loaded": 10103,
"progress": 100
}
}
В частности поле lastLoading - описывает последнюю попытку использования коннектора для импорта данных в проект.
С помощью его содержимого выполняется мониторинг статусов импорта данных в проект, касающийся данного коннектора.
Например:
- progress - процент выполнения загрузки источника в проект
- loaded - определяет количество записей (строк) успешно импортированных в проект
- status - статус последнего процесса импорта из источника
Информация о статусе импорта данных из источника также динамически поступает через сокет соединение, которое ожидает сообщений от канала брокера (rabbitMQ). Микросервис загрузки файловых источников отслеживает статус импорта данных из источника и через брокер (rabbitMQ) и затем сокет соединение оповещает клиентов об изменении в статуса импорта
2. Остановка импорта, удаление источника из проекта
Остановка импорта осуществляется посредством удаления связанного источника. Отдельного запроса на остановку импорта не существует, для отмены результатов импорта, достаточно обновить скрипт загрузки, удалив лишние данные и запустить скрипт.
Удаление источника из проекта инициируется на клиентской части:
Post запрос, https://fastbord-back-dev4.fb-dev.winsolutions.ru/api/v1/sources/delete
Тело запроса:
{
"id": "863f0581-5344-4bee-a264-92d03ea8bef3"
}
где, id - идентификатор источника, который необходимо удалить
Результат:
Если нет доступа к источнику (роль не администратор или не владелец источника) возвращается ошибка 403 "У вас нет прав управлять данным источником"
Если источник не найден, возвращается ошибка 404 'Источник не найден'
В случае успешного удаления источника возвращается статус 200 с сообщением "Ok"
На стороне Nest.js происходит удаление базы данных clickhouse, ассоциированной с указанным в запросе источником, а также обновляется метка источника "deletedAt" в базе данных postgres, которая отражает успешный результат удаления
Логирование данных о загрузке скрипта (обновление данных в базе clickhouse)
Техническое описание
Логирование процесса импорта данных выполняется в микросервисе Nest.js - "loader".
Данные по процессу импорта записываются в базу данных postgress и в дальнейшем используются в пределах приложения, например для формирования ответа на запрос о получении информации статуса импорта (загрузка скрипта)
Запуск скрипта вызывается запросом PUT https://fastbord-back-dev.fb-dev.winsolutions.ru/api/v1/script/{projectId}/run, где projectId - идентификатор проекта
После запуска скрипта загрузки клиентская часть с определённой периодичностью вызывает запрос для получения статуса импорта:
Результат запроса:
{
"id": "3dd62e51-f77e-45a7-9902-d4f03225e43d",
"projectId": "5c11b334-98b7-43db-a82a-e86a7e8a2a79",
"sourceId": null,
"status": "pending",
"started": "2025-06-20T17:32:51.908Z",
"loaded": 0,
"progress": 0
}
- id - идентификатор процесса загрузки скрипта
- projectId - идентификатор проекта
- sourceId - идентификатор источника данных
- status - статус импорта данных в таблицу clickhouse
- loaded - количество успешно импортированных записей из источника
- progress - процент выполнения импорта
Для объяснения механизма записи логов по процессу импорта данных рассмотрим подробнее структуру выполнения запроса загрузки скрипта, представленного ранее. ( PUT https://fastbord-back-dev.fb-dev.winsolutions.ru/api/v1/script/{projectId}/run)
Последовательность выполнения запроса:
- Регистрация запроса в контроллере (PUT 'script/:projectId'), проверка авторизации, соответствия роли пользователя (админ или разработчик) – подтверждение доступа к внутренним сервисам приложения
- Отправка в микросервис загрузок (loader) запроса о запуске процесса импорта данных в базу clickhouse (посредством брокера rabbitMQ) - результат, регистрация запроса о запуске
- Выполняется проверка о состоянии обновления проекта. Если в данный момент обновление уже выполняется (запущено другим пользователем) - будет возвращена ошибка с сообщением "Проект/источник с id {идентификатор проекта/источника} уже обновляется". Проверка выполняется благодаря отдельному микросервису "blocker", сообщение происходит через брокер rabbitMQ
- Создаётся сущность процесса загрузки ("loading"), которая записывается в базу данных postgres. На основе этой сущности процесс импорта добавляется в специальную очередь по загрузке данных. Механизм очереди реализован на основе библиотеки bull, которая является "обёрткой" для redis клиента.
- Когда очередь bull доходит до текущей задачи загрузки, в дочернем процессе сервера Nest.js запускается соответствующая задача (loader-process-ctl.service.ts -> startProcess)
- В зависимости от типа задачи (файловый источник, подключение к бд или rest-соединение) запускается соответствующий дочерний процесс из п.5.
- Файловый источник - дочерний процесс Nest.js отправляет запрос на микросервис GO ("loader_dev") для обновления источника на стороне сервиса go (loader) и ожидает сообщений о статусе процесса. Общение с сервисом GO внутри дочернего процесса Nest.js осуществляется с помощью брокера. (File.process.ts)
При получении результата обновления файла, дочерний процесс сообщает результат главному процессу Nest.js через канал IPC - Работа дочерних процессов по импорту данных из подключений к бд и rest-соединений выполняется аналогично, за исключением того, что весь процесс осуществляется на стороне Nest.js, без привлечения микросервиса на GO (loader-pgsql.process.ts, loader-rest-api.process.ts)
- В процессе импорта данных дочерние процессы могут уведомлять главный процесс Nest.js о наступлении ключевых событий, таких как начала запуска импорта, изменение прогресса, успешное выполнение импорта или ошибка (onTaskStart, onTaskProgress, onTaskSuccess, onTaskFail). Благодаря этому, обновляются записи о загрузке данных в базе postgress, а также клиенты получают уведомления о процессе импорта данных в базу clickhouse
Все стадии и этапы импорта данных записываются (логируются) в отдельную таблицу postgres. На каждом этапе импорта, заинтересованные пользователи получают уведомления о статусе процесса через отдельный REST запрос или с помощью сокет соединения
Работа скрипта загрузки
Не завершено
Техническое описание
При попытке импорта данных (из одного или разных источников) выбранные таблицы будут добавляться в скрипт загрузки в порядке обработки запросов на сервере (импорт данных из коннекторов описан в предыдущих главах)
При повторной попытке импорта одной и той же таблицы из одинакового источника в скрипте загрузки будет создана новая секция с обновленным именем таблицы. Например, в первичной загрузке таблица называлась "Tablitsa". При повторном импорте, во время формирования скрипта загрузки, название таблицы изменится на "Tablitsa_1" (добавляется суффикс "_1" с следующим порядковым номером)
Обработка случая, при котором разные пользователи импортируют данные из одного и того же источника, в данный момент не реализована. Таким образом, обновленные таблицы будут добавляться в скрипт загрузки в порядке выполнения запросов
Проверить кейс:
загрузка один и тот же источник в один проект открытый с двух разных клиентов одновременно. Проверить результат, наличие коллизий