Espalhando stdin para processos paralelos

12

Eu tenho uma tarefa que processa uma lista de arquivos em stdin. O tempo de inicialização do programa é substancial e a quantidade de tempo que cada arquivo demora varia muito. Eu quero gerar um número substancial desses processos e, então, enviar o trabalho para os que não estiverem ocupados. Existem várias ferramentas de linha de comando que quase fazem o que eu quero, eu reduzi a duas opções de trabalho:

find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob

O problema é que split faz um round-robin puro, então um dos processos fica para trás e fica para trás, atrasando a conclusão de toda a operação; enquanto parallel quer gerar um processo por N linhas ou bytes de entrada e acabo gastando muito tempo com a sobrecarga de inicialização.

Existe algo assim que reutilizará os processos e as linhas de feeds para qualquer processo que tenha os stdins desbloqueados?

    
por BCoates 09.10.2012 / 22:46

8 respostas

1

Isso não parece possível em um caso geral. Isso implica que você tem um buffer para cada processo e você pode assistir os buffers de fora para decidir onde colocar a próxima entrada (agendamento) ... Claro que você pode escrever algo (ou usar um sistema de lote como slurm)

Mas, dependendo do processo, você poderá pré-processar a entrada. Por exemplo, se você deseja baixar arquivos, atualizar entradas de um banco de dados ou algo semelhante, mas 50% deles serão ignorados (e, portanto, você tem uma grande diferença de processamento dependendo da entrada), basta configurar um pré-processador que verifica quais entradas vão demorar muito (o arquivo existe, os dados foram alterados, etc), então o que vier do outro lado é garantido para levar uma quantidade de tempo bastante igual. Mesmo que a heurística não seja perfeita, você poderá ter uma melhora considerável. Você pode despejar os outros em um arquivo e processá-los da mesma maneira.

Mas isso depende do seu caso de uso.

    
por 26.03.2013 / 18:36
1

Não, não há uma solução genérica. Seu despachante precisa saber quando cada programa está pronto para ler outra linha, e não há nenhum padrão que eu saiba que permita isso. Tudo o que você pode fazer é colocar uma linha no STDOUT e esperar que algo a consuma; não há realmente uma boa maneira para o produtor em um pipeline dizer se o próximo consumidor está pronto ou não.

    
por 22.10.2013 / 01:16
0

Eu não penso assim. Na minha revista favorita foi um artigo uma vez na programação bash que fez o que você quer. Estou disposto a acreditar que, se houvesse ferramentas para fazer isso, elas teriam sido mencionadas. Então você quer algo ao longo das linhas de:

set -m # enable job control
max_processes=8
concurrent_processes=0

child_has_ended() { concurrent_processes=$((concurrent_processes - 1)) }

trap child_has_ended SIGCHLD # that's magic calling our bash function when a child processes ends

for i in $(find . -type f)
do
  # don't do anything while there are max_processes running
  while [ ${concurrent_processes} -ge ${max_processes}]; do sleep 0.5; done 
  # increase the counter
  concurrent_processes=$((concurrent_processes + 1))
  # start a child process to actually deal with one file
  /path/to/script/to/handle/one/file $i &
done

Obviamente, você pode alterar a invocação para o script de trabalho real ao seu gosto. A revista mencionada inicialmente faz coisas como configurar pipes e realmente iniciar threads de trabalho. Confira mkfifo para isso, mas essa rota é muito mais complicada, pois os processos de trabalho precisam sinalizar ao processo mestre que eles estão prontos para receber mais dados. Então você precisa de um fifo para cada processo de trabalho para enviar os dados e um fifo para o processo mestre receber material dos trabalhadores.

ISENÇÃO DE RESPONSABILIDADE Eu escrevi esse script do topo da minha cabeça. Pode ter alguns problemas de sintaxe.

    
por 09.10.2012 / 23:48
0

Para o GNU Parallel, você pode definir o tamanho do bloco usando --block. No entanto, exige que você tenha memória suficiente para manter 1 bloco na memória para cada um dos processos em execução.

Eu entendo que isso não é exatamente o que você está procurando, mas pode ser um trabalho aceitável por enquanto.

Se suas tarefas, em média, demorarem o mesmo tempo, você poderá usar o mbuffer:

find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
    
por 10.10.2012 / 16:18
0

Tente isto:

mkfifo para cada processo.

Depois, desligue tail -f | myjob em cada fifo.

Por exemplo, configurando os trabalhadores (processos myjob)

mkdir /tmp/jobs
for X in 1 2 3 4
do
   mkfifo pipe$X
   tail -f pipe$X | myjob &
   jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done

Dependendo do seu aplicativo (meu trabalho), você poderá usar os jobs -s para localizar os trabalhos interrompidos. Caso contrário, liste os processos classificados pela CPU e selecione o que consome menos recursos. De ter o relatório de trabalho em si, por exemplo, definindo um sinalizador no sistema de arquivos quando ele quer mais trabalho.

Supondo que o trabalho pára ao esperar pela entrada, use

jobs -sl para descobrir o pid de um trabalho parado e atribuir um trabalho a ele, por exemplo

grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
   cat workset > $PIPE
done

Eu testei isso com

garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes

Isso eu devo admitir que foi inventado assim ymmv.

    
por 16.02.2013 / 09:48
0

O que é realmente necessário para resolver isso é um mecanismo de fila de algum tipo.

É possível ter as tarefas lendo suas entradas de uma Fila, como uma fila de mensagens SYSV, e então fazer com que os programas sejam executados paralelamente simplesmente enviando os valores para a fila?

Outra possibilidade é usar um diretório para a fila, assim:

  1. a saída de localização cria um link simbólico para cada arquivo a ser processado em um diretório, pending
  2. cada processo de trabalho executa um mv do primeiro arquivo que vê no diretório para um diretório secundário de pending , denominado inprogress .
  3. se o trabalho mover com êxito o arquivo, ele executará o processamento; caso contrário, volta para encontrar e mover outro nome de arquivo de pending
por 27.08.2013 / 00:07
0

expõe na resposta do @ ash, você pode usar uma fila de mensagens SYSV para distribuir o trabalho. Se você não quiser escrever seu próprio programa em C, existe um utilitário chamado ipcmd isso pode ajudar. Aqui está o que eu coloquei para passar a saída de find $DIRECTORY -type f para $PARALLEL number of processes:

set -o errexit
set -o nounset

export IPCMD_MSQID=$(ipcmd msgget)

DIRECTORY=$1
PARALLEL=$2

# clean up message queue on exit
trap 'ipcrm -q $IPCMD_MSQID' EXIT

for i in $(seq $PARALLEL); do
   {
      while true
      do
          message=$(ipcmd msgrcv) || exit
          [ -f $message ] || break
          sleep $((RANDOM/3000))
      done
   } &
done

find "$DIRECTORY" -type f | xargs ipcmd msgsnd

for i in $(seq $PARALLEL); do
   ipcmd msgsnd "/dev/null/bar"
done
wait

Aqui está um teste:

$ for i in $(seq 20 10 100) ; do time parallel.sh /usr/lib/ $i ; done
parallel.sh /usr/lib/ $i  0.30s user 0.67s system 0% cpu 1:57.23 total
parallel.sh /usr/lib/ $i  0.28s user 0.69s system 1% cpu 1:09.58 total
parallel.sh /usr/lib/ $i  0.19s user 0.80s system 1% cpu 1:05.29 total
parallel.sh /usr/lib/ $i  0.29s user 0.73s system 2% cpu 44.417 total
parallel.sh /usr/lib/ $i  0.25s user 0.80s system 2% cpu 37.353 total
parallel.sh /usr/lib/ $i  0.21s user 0.85s system 3% cpu 32.354 total
parallel.sh /usr/lib/ $i  0.30s user 0.82s system 3% cpu 28.542 total
parallel.sh /usr/lib/ $i  0.27s user 0.88s system 3% cpu 30.219 total
parallel.sh /usr/lib/ $i  0.34s user 0.84s system 4% cpu 26.535 total
    
por 18.10.2013 / 09:55
0

A menos que você possa estimar por quanto tempo um determinado arquivo de entrada será processado e , os processos de trabalho não têm como reportar ao agendador (como fazem em cenários normais de computação paralela - geralmente por meio da MPI ), você geralmente está sem sorte - ou paga a penalidade de alguns trabalhadores processando dados por mais tempo que outros (por causa de desigualdade de entrada), ou pagar a penalidade de gerar um novo processo para cada arquivo de entrada.

    
por 03.12.2013 / 17:16