Как мне настроить на Flask сервере очередь.

Пользователь

от berilloda , в категории: Python , 2 года назад

Всем привет, я написал простой сервер на Flask, и он получает на вход аудиофайл, id и имя файла и должен запускать программу, которая транскрибирует аудиофайл в текст.

Но когда на сервер прилетает несколько запросов мой ПК виснет из-за большой нагрузки. Получается что сервер на каждый файл (из нового запроса), запускает мою программу по транскрибации.

Может кто знает, как настроить очередь на обработку файлов.


У меня на сервер может за один запрос прилетать сразу несколько файлов и тогда они корректно становятся в очередь, а если прилетает несколько запросов, то ПК сразу виснет.

Сервер установлен на windows 10.


Код сервера выглядит следующим образом:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from flask import Flask, request, jsonify, send_file
import os
import subprocess
import time
import threading

app = Flask(__name__)

directory = os.path.dirname(os.path.abspath(__file__))
directory = directory.replace('\\', '/')

def delete_file(file_path):
    time.sleep(300)  # 5 минут ожидания
    os.remove(file_path)

@app.route('/api/process_audio', methods=['POST'])
def process_audio():
    try:
        start_time = time.time()

        # Получаем данные
        audio_file = request.files['audio']
        audio_id = request.form['audio_id']
        audio_name = request.form['audio_name']

        # Сохраняем аудио
        audio_path = os.path.join('program_audio/audio', f'{audio_name}.mp3')
        audio_file.save(audio_path)

        # Запуск обработки аудио
        subprocess.run('starter.bat', shell=True)

        # Ждем результата
        text_file_path = os.path.join('program_audio/final_text', f'{audio_name}_final.txt')
        timeout = time.time() + 60
        while not os.path.exists(text_file_path):
            if time.time() > timeout:
                return jsonify({'error': 'Timeout'}), 500
            time.sleep(1)

        # Формируем ответ
        end_time = time.time()
        elapsed_time = end_time - start_time

        response_data = {
            'audio_id': audio_id,
            'elapsed_time': elapsed_time
        }

        # Отправляем результат
        response = send_file(text_file_path, as_attachment=True)
        
        # Запускаем удаление файла через 5 минут
        delete_thread = threading.Thread(target=delete_file, args=(text_file_path,))
        delete_thread.start()

        return response, response_data

    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, port=39588, host='0.0.0.0')
Facebook Vk Ok Twitter LinkedIn Telegram Whatsapp

2 ответа

Пользователь

от jaren , 2 года назад

@berilloda Вам нужно организовать очередь задач для обработки аудиофайлов, чтобы ваш сервер обрабатывал файлы по очереди, а не одновременно.Что то вроде такого я думаю должно быть:


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
from flask import Flask, request, jsonify, send_file
import os
import subprocess
import time
import threading
from queue import Queue


app = Flask(__name__)


directory = os.path.dirname(os.path.abspath(__file__))
directory = directory.replace('\\', '/')


# Создаем очередь задач
task_queue = Queue()


def delete_file(file_path):
    time.sleep(300# 5 минут ожидания
    os.remove(file_path)


def process_audio_task(audio_file, audio_id, audio_name):
    try:
        start_time = time.time()


        # Сохраняем аудио
        audio_path = os.path.join('program_audio/audio', f'{audio_name}.mp3')
        audio_file.save(audio_path)


        # Запуск обработки аудио
        subprocess.run('starter.bat', shell=True)


        # Ждем результата
        text_file_path = os.path.join('program_audio/final_text', f'{audio_name}_final.txt')
        timeout = time.time() + 60
        while not os.path.exists(text_file_path):
            if time.time() > timeout:
                return jsonify({'error': 'Timeout'}), 500
            time.sleep(1)


        # Формируем ответ
        end_time = time.time()
        elapsed_time = end_time - start_time


        response_data = {
            'audio_id': audio_id,
            'elapsed_time': elapsed_time
        }


        # Отправляем результат
        response = send_file(text_file_path, as_attachment=True)
        
        # Запускаем удаление файла через 5 минут
        delete_thread = threading.Thread(target=delete_file, args=(text_file_path,))
        delete_thread.start()


        return response, response_data


    except Exception as e:
        return jsonify({'error': str(e)}), 500


def process_audio_worker():
    while True:
        audio_file, audio_id, audio_name = task_queue.get()
        process_audio_task(audio_file, audio_id, audio_name)
        task_queue.task_done()


@app.route('/api/process_audio', methods=['POST'])
def queue_process_audio():
    try:
        # Получаем данные
        audio_files = request.files.getlist('audio')
        audio_id = request.form['audio_id']
        audio_name = request.form['audio_name']


        # Добавляем задачи в очередь
        for audio_file in audio_files:
            task_queue.put((audio_file, audio_id, audio_name))


        return jsonify({'message': 'Audio files are added to processing queue.'}), 200


    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Запускаем потоки для обработки задач
    worker_thread = threading.Thread(target=process_audio_worker)
    worker_thread.start()


    app.run(debug=True, port=39588, host='0.0.0.0')


Пользователь

от kaleigh , 5 дней назад

@berilloda 

Да, использовать очередь задач — это правильный подход для обработки аудиофайлов по очереди и предотвращения перегрузки вашего сервера.


Вот как вы можете улучшить работу с очередью и потоками в вашем коде:

  1. Используйте ThreadPoolExecutor: Вместо ручного создания потоков можно использовать ThreadPoolExecutor из модуля concurrent.futures для управления потоками более эффективно.
  2. Обработка каждого файла: Убедитесь, что у каждого аудиофайла есть уникальный audio_id и audio_name, чтобы предотвратить перезапись и конфликт данных.
  3. Управляйте ресурсами: Можно добавить лимит на количество одновременно обрабатываемых аудиофайлов в зависимости от возможностей вашего оборудования.


Вот пример кода, который включает эти улучшения:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from flask import Flask, request, jsonify, send_file
import os
import subprocess
import time
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

app = Flask(__name__)

# Директория для файла
directory = os.path.dirname(os.path.abspath(__file__))


# Создаем очередь задач с фиксированным количеством потоков
max_workers = 4  # Количество рабочих потоков в зависимости от возможностей ПК
task_queue = Queue()
executor = ThreadPoolExecutor(max_workers=max_workers)


def delete_file(file_path):
    time.sleep(300)  # 5 минут ожидания
    os.remove(file_path)


def process_audio_task(audio_file, audio_id, audio_name):
    try:
        start_time = time.time()

        # Сохраняем аудио
        audio_path = os.path.join(directory, 'program_audio/audio', f'{audio_name}.mp3')
        audio_file.save(audio_path)

        # Запуск обработки аудио
        subprocess.run(['starter.bat'], shell=True)

        # Ждем результата
        text_file_path = os.path.join(directory, 'program_audio/final_text', f'{audio_name}_final.txt')
        timeout = time.time() + 60
        while not os.path.exists(text_file_path):
            if time.time() > timeout:
                return jsonify({'error': 'Timeout'}), 500
            time.sleep(1)

        # Формируем ответ
        end_time = time.time()
        elapsed_time = end_time - start_time

        response_data = {
            'audio_id': audio_id,
            'elapsed_time': elapsed_time
        }

        # Отправляем результат
        response = send_file(text_file_path, as_attachment=True)

        # Запускаем удаление файла через 5 минут
        delete_thread = threading.Thread(target=delete_file, args=(text_file_path,))
        delete_thread.start()

        return response, response_data

    except Exception as e:
        return jsonify({'error': str(e)}), 500


def process_audio_worker():
    while True:
        audio_file, audio_id, audio_name = task_queue.get()
        executor.submit(process_audio_task, audio_file, audio_id, audio_name)
        task_queue.task_done()


@app.route('/api/process_audio', methods=['POST'])
def queue_process_audio():
    try:
        # Получаем данные
        audio_files = request.files.getlist('audio')
        audio_id = request.form['audio_id']
        audio_name = request.form['audio_name']

        # Добавляем задачи в очередь
        for audio_file in audio_files:
            task_queue.put((audio_file, audio_id, audio_name))

        return jsonify({'message': 'Audio files are added to processing queue.'}), 200

    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    # Запускаем поток для обработки задач
    worker_thread = threading.Thread(target=process_audio_worker)
    worker_thread.start()

    app.run(debug=True, port=39588, host='0.0.0.0')


Основные изменения и улучшения:

  • Оптимизация потоков: Вместо создания отдельных потоков для каждой задачи, использован ThreadPoolExecutor для управления количеством одновременно работающих задач.
  • Корректное использование subprocess.run: Переведено на список аргументов вместо строки для безопасности и универсальности, хотя использование shell=True все равно требует осторожности.
  • Удаление файлов: Как и ранее, удаление файлов производится через 5 минут с использованием потоков.


Этот код поможет стабилизировать работу сервера и предотвратит зависание при множественных параллельных запросах.