Leia non-blocking de múltiplos fifos em paralelo

6

Às vezes eu sento com um monte de fifos de saída de programas que correm em paralelo. Eu gostaria de mesclar esses fifos. A solução ingênua é:

cat fifo* > output

Mas isso requer que o primeiro fifo seja concluído antes de ler o primeiro byte do segundo fifo, e isso bloqueará os programas paralelos em execução.

Outra maneira é:

(cat fifo1 & cat fifo2 & ... ) > output

Mas isso pode misturar a saída, obtendo assim meias-linhas na saída.

Ao ler de vários fifos, deve haver algumas regras para mesclar os arquivos. Normalmente, fazer isso linha por linha é o suficiente para mim, então estou procurando algo que faça:

parallel_non_blocking_cat fifo* > output

que lerá de todas as fifos em paralelo e mesclará a saída com uma linha completa de cada vez.

Eu posso ver que não é difícil escrever esse programa. Tudo o que você precisa fazer é:

  1. abre todas as fifos
  2. faça um select de bloqueio em todos eles
  3. leia nonblocking do fifo que tem dados no buffer para esse fifo
  4. se o buffer contiver uma linha completa (ou registro), imprima a linha
  5. se todas as fifos estiverem fechadas / eof: exit
  6. goto 2

Então minha pergunta é não : isso pode ser feito?

A minha pergunta é: já foi feito e posso apenas instalar uma ferramenta que faz isso?

    
por Ole Tange 03.10.2012 / 16:39

3 respostas

1

Esta solução só funcionará se o número de fifos for menor que o número de jobs que o paralelo GNU pode executar em paralelo (que é limitado por identificadores de arquivos e número de processos):

parallel -j0 --line-buffer cat ::: fifo*

Parece ser possível mover até 500 MB / s:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *

E isso não combina meias-linhas:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} &

window2$ parallel -j0 'traceroute {}.1.1.1 > {}' ::: *

Ele lê os trabalhos em paralelo (ele não lê um trabalho completamente antes de ir para o próximo):

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: * > >(tr -s ABCabc)

window2$ long_lines_with_pause() {
            perl -e 'print STDOUT "a"x30000_000," "'                                                      
    perl -e 'print STDOUT "b"x30000_000," "'                                                      
    perl -e 'print STDOUT "c"x30000_000," "'                                                      
    echo "$1"                                                                                     
    sleep 2                                                                                       
    perl -e 'print STDOUT "A"x30000_000," "'                                                      
    perl -e 'print STDOUT "B"x30000_000," "'                                                      
    perl -e 'print STDOUT "C"x30000_000," "'                                                      
    echo "$1"                                                                                     
}
window2$ export -f long_lines_with_pause
window2$ parallel -j0 'long_lines_with_pause {} > {}' ::: *

Aqui um monte de 'a b c' (primeira metade de um trabalho) será impresso antes de 'A B C' (segunda metade do trabalho).

    
por 28.05.2014 / 10:34
1

Então,

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3

> quase funciona (como aludido nos comentários iniciais sobre essa versão anterior de minha resposta, embora você possa precisar de um "para f em fifo *; cat < / dev / null > $ f & feito "de antemão para garantir que todos os FIFOs estejam abertos para gravação porque a cauda do coreutils os abre O_RDONLY sem O_NONBLOCK).

Infelizmente, há um erro em que tail é cuidadoso sobre terminações de linha / registro apenas com entradas de pipes em stdin mas não com entrada de pipes nomeados / FIFOs em argumentos. Algum dia alguém pode consertar a cauda do coreutils.

Nesse ínterim, para obter uma fila de multi-consumidor / produtor único honrando as terminações de linha, você pode usar um programa C simples de 100 linhas que chamo de tailpipes.c :

#include <stdio.h>
#include <stdlib.h>
#include <string.h>    //TODO: Find&document build environments lacking memrchr
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#define errstr strerror(errno)

char const * const Use = "%s: %s\n\nUsage:\n\n"
"  %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\n)] [-s SEC(.01)] PATH1 PATH2..\n\n"
"Read delimited records (lines by default) from all input paths, writing only\n"
"complete records to stdout and changing to a stop-at-EOF mode upon receiving\n"
"SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n"
"PID does not exist (if PID is given).  Since by default fifos are opened RW,\n"
"signal/PID termination is needed to not loop forever, but said FIFOs may be\n"
"closed & reopened by other processes as often as is convenient. For one-shot\n"
"writing style, ending input reads at the first EOF, use \"-oRO\".  Also, DLM\n"
"adjusts the record delimiter byte from the default newline, and SEC adjusts\n"
"max select sleep time.  Any improperly terminated final records are sent to\n"
"stderr at the end of execution (with a label and bracketing).\n";

int writer_done;
void sig(int signum) { writer_done = 1; }

int main(int N, char *V[]) {
    signed char     ch;
    char           *buf[N-1], delim = '\n', *V0 = V[0], *eol;
    int             len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0,
                    oFlags = O_RDWR;
    pid_t           pid = 0;
    ssize_t         nR, nW;
    struct timespec tmOut = { 0, 10000000 }; //10 ms select time out
    fd_set          fdRdMaster, fdRd;
    //If we get signaled before here, this program dies and data may be lost.
    //If possible use -p PID option w/pre-extant PID of appropriate lifetime.
    signal(SIGHUP, sig);                    //Install sig() for SIGHUP
    memset((void *)fds, 0, sizeof fds);
    memset((void *)len, 0, sizeof len);
    FD_ZERO(&fdRdMaster);
    fdRd = fdRdMaster;
    while ((ch = getopt(N, V, "d:p:s:o:")) != -1)
        switch (ch) {                       //For 
export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX)
FIFOs='n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done'
mkfifo $FIFOs
sleep 2147483647 & p=$!       #Cannot know xargs pid is good for long
( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM
  kill $p ) &                 #Inform tailpipes writers are done
tailpipes -p$p $FIFOs | CONSUMING-PIPELINE
rm -rf $MYTEMP
wait                          #Wait for xargs subshell to finish
do '' as a sep CLI arg double tO; case 'd': delim = optarg ? *optarg : '\n'; break; case 'p': pid = optarg ? atoi(optarg) : 0; break; case 's': tO = optarg ? atof(optarg) : .01; tmOut.tv_sec = (long)tO; tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec); break; case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ? O_RDONLY | O_NONBLOCK : O_RDWR; break; default: return fprintf(stderr, Use, V0, "bad option", V0), 1; } V += optind; N -= optind; //Shift off option args if (N < 1) return fprintf(stderr, Use, V0, "too few arguments", V0), 2; setvbuf(stdout, NULL, _IONBF, 65536); //Full pipe on Linux for (i = 0; i < N; i++) //Check for any available V[] if ((fds[i] = open(V[i], oFlags)) != -1) { struct stat st; fstat(fds[i], &st); if (!S_ISFIFO(st.st_mode)) return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3; nF++; FD_SET(fds[i], &fdRdMaster); //Add fd to master copy for pselect buf[i] = malloc(nBf[i] = 4096); if (fds[i] > fdMx) fdMx = fds[i]; } else if (errno == EINTR) { //We may get signaled to finish up.. i--; continue; //..before we even this far. } else return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3; fdMx++; fdRd = fdRdMaster; while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) { if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist writer_done = 1; if (nS == 0 && writer_done) //No input & no writers break; else if (nS == -1) { //Some select error: if (errno != EINTR && errno == EAGAIN) //..fatal or retry return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4; continue; } for (i = 0; nS > 0 && i < N; i++) { //For all fds.. if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data continue; if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) { if (errno != EAGAIN && errno != EINTR) fprintf(stderr, "%s: read: %s\n", V0, errstr); continue; } else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) { FD_CLR(fds[i], &fdRdMaster); nF--; free(buf[i]); } len[i] += nR; //Update Re: read data if ((eol = memrchr(buf[i], delim, len[i]))) { nW = eol - buf[i] + 1; //Only to last delim if (fwrite(buf[i], nW, 1, stdout) == 1) { memmove(buf[i], buf[i] + nW, len[i] - nW); len[i] -= nW; //Residual buffer shift } else return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n", V0, len[i], errstr), 5; } else if (len[i] == nBf[i]) { //NoDelim&FullBuf=>GROW void *tmp; if (nBf[i] >= 1 << 30) return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6; nBf[i] *= 2; if (!(tmp = realloc(buf[i], nBf[i]))) return fprintf(stderr,"%s: out of memory\n", V0), 7; buf[i] = tmp; } } fdRd = fdRdMaster; } for (i = 0; i < N; i++) //Ensure any residual data is.. if (len[i] > 0) { //..labeled,bracketed,=>stderr. fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]); fwrite(buf[i], len[i], 1, stderr); fputs("}\n", stderr); } return 0; }

A instalação é cortada & colar & %código%. Testado no Linux & FreeBSD. Eu recebo cerca de 2500e6 bytes / seg de saída, mas a memória pode ser mais rápida do que a caixa de 500e6 bytes / seg.

O algoritmo é mais ou menos como sugerido, mas mais geral. O_NONBLOCK é necessário apenas com O_RDONLY e com algumas opções para facilidade de uso, como abrir FIFOs O_RDWR por padrão para que os escritores possam fechar e reabrir muitas vezes e usar o rastreamento PID para um protocolo livre de corrida. Você pode passar -oRO para usar EOF se quiser. cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes também lida com linhas incompletas na finalização do programa, enviando-as rotuladas e entre colchetes para stderr caso haja um pós-processamento fácil que possa ser feito para tornar os registros completos ou se os logs deles forem úteis para depuração.

Exemplo de uso. O GNU tailpipes pode ser uma parte de um único consumidor, de vários produtores / fan-out de um pipeline paralelo de mapa reduzido com xargs operando como a parte fan-in de reconhecimento de limite de registro, tudo sem espaço em disco usado para arquivos temporários:

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3

Acima, é importante que A) tailpipes passe de n para o limite superior apropriado, porque esse é o esquema 0 usa para xargs e B) MYSLOT direciona seus resultados para um novo arquivo MYPROGRAM -keyed atribuído como $MYSLOT , por exemplo $MYTEMP/$MYSLOT if exec > $MYTEMP/$MYSLOT é um script de shell. O invólucro do shell / programa poderia ser eliminado em muitos casos se MYPROGRAM tivesse uma xargs hipotética para configurar os stdouts de seus filhos.

    
por 31.07.2018 / 11:57
0

Uma resposta mais elegante que não armazena em buffer uma cópia inútil no disco:

#!/usr/bin/perl                                                                                                       

use threads;
use threads::shared;
use Thread::Queue;

my $done :shared;

my $DataQueue = Thread::Queue->new();

my @producers;
for (@ARGV) {
    push @producers, threads->create('producer', $_);
}

while($done <= $#ARGV) {
    # This blocks until $DataQueue->pending > 0                                                                       
    print $DataQueue->dequeue();
}

for (@producers) {
    $_->join();
}


sub producer {
    open(my $fh, "<", shift) || die;
    while(<$fh>) {
        $DataQueue->enqueue($_);
    }
    # Closing $fh blocks                                                                                              
    # close $fh;                                                                                                      
    $done++;
    # Guard against race condition                                                                                    
    $DataQueue->enqueue("");
}
    
por 31.08.2016 / 19:38