Gerenciando os fluxos de saída de muitos subprocessos com deadlocks

1

Eu tenho um script Python que faz mais ou menos isso

current_tasks = TaskManager()
MAXPROCS = 8

while len(outstanding_tasks) > 0:
    if len(current_tasks.running) < MAXPROCS:
      current_tasks.addTask(outstanding_tasks.next())
    else:
      current_tasks.wait_for_one_finish()

e outstanding_tasks.next () é basicamente isto:

p = subprocess.Popen([task], stdout=OUTFILE, stderr=subprocess.PIPE)

e current_tasks.wait_for_one_finish() :

waiting = True
while waiting:
    for t in tasks:
        ret = t.poll()
        if ret not None:
            handle_stderr(t)
            waiting = False
            break

Bastante direta - gerar tarefas sob demanda até executarmos oito delas de cada vez e, em seguida, bloquear até que elas terminem uma de cada vez antes de gerar mais tarefas.

O problema é este:

stderr=subprocess.PIPE

Cada subprocesso está escrevendo é stderr para um pipe. Se ele falhar e quiser gravar uma mensagem de log grande ou qualquer outra para o pipe, e essa mensagem exceder o tamanho do buffer do pipe, o write () será bloqueado. O processo não será concluído, então meu processo de controle nunca verá um valor de retorno de poll() e será lido de seu stderr.

Existem obviamente maneiras de contornar isso:

  • redireciona o stderr dos meus subprocessos para arquivos temporários
  • gera um thread do Python que lê os descritores de arquivos stderr de todas as tarefas em execução e os armazena na memória
  • tem um select () ou algo no meu pequeno loop de eventos ad hoc

Mas tudo isso é coisa que tenho que manipular no código do meu aplicativo. O que eu quero saber é: existe alguma maneira de obter o comportamento de um pipe, mas com um ótimo buffer elástico, para que os subprocessos sempre possam fazer um write () bem-sucedido em seu stderr e então sair, sem que eu precise olha para isso até que estejam prontos?

    
por Cera 09.02.2013 / 04:10

1 resposta

1

A resposta curta é: não há.

Você já destacou as soluções alternativas necessárias para lidar com grandes dados enviados por um canal de subprocesso. O cachimbo "nice big elastic buffer" não existe. Isso é chamado na documentação do Python de gerenciamento de subprocesso como uma possível fonte de deadlocks , com a solução adicionada que você pode chamar proc.communicate() para ler de stderr. O problema no seu caso é que você não pode chamar communicate() em todos os processos ao mesmo tempo, e esse método bloqueia até que todos os dados sejam lidos.

Se fosse eu, provavelmente usaria uma chamada select() em todos os processos stderr em vez de um loop proc.poll() . select() pode bloquear até que qualquer processo faça alguma coisa, e quando o processo sair, ele irá fechar o stderr pipe, então você mata dois coelhos com uma cajadada (saiba quando os dados são gravados no stderr e saiba quando o processo morre). / p>     

por 20.02.2013 / 04:08