Então,
tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3
> quase funciona (como aludido nos comentários iniciais sobre essa versão anterior de minha resposta, embora você possa precisar de um "para f em fifo *; cat < / dev / null > $ f & feito "de antemão para garantir que todos os FIFOs estejam abertos para gravação porque a cauda do coreutils os abre O_RDONLY sem O_NONBLOCK).
Infelizmente, há um erro em que tail
é cuidadoso sobre terminações de linha / registro apenas com entradas de pipes em stdin mas não com entrada de pipes nomeados / FIFOs em argumentos. Algum dia alguém pode consertar a cauda do coreutils.
Nesse ínterim, para obter uma fila de multi-consumidor / produtor único honrando as terminações de linha, você pode usar um programa C simples de 100 linhas que chamo de tailpipes.c
:
#include <stdio.h>
#include <stdlib.h>
#include <string.h> //TODO: Find&document build environments lacking memrchr
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#define errstr strerror(errno)
char const * const Use = "%s: %s\n\nUsage:\n\n"
" %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\n)] [-s SEC(.01)] PATH1 PATH2..\n\n"
"Read delimited records (lines by default) from all input paths, writing only\n"
"complete records to stdout and changing to a stop-at-EOF mode upon receiving\n"
"SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n"
"PID does not exist (if PID is given). Since by default fifos are opened RW,\n"
"signal/PID termination is needed to not loop forever, but said FIFOs may be\n"
"closed & reopened by other processes as often as is convenient. For one-shot\n"
"writing style, ending input reads at the first EOF, use \"-oRO\". Also, DLM\n"
"adjusts the record delimiter byte from the default newline, and SEC adjusts\n"
"max select sleep time. Any improperly terminated final records are sent to\n"
"stderr at the end of execution (with a label and bracketing).\n";
int writer_done;
void sig(int signum) { writer_done = 1; }
int main(int N, char *V[]) {
signed char ch;
char *buf[N-1], delim = '\n', *V0 = V[0], *eol;
int len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0,
oFlags = O_RDWR;
pid_t pid = 0;
ssize_t nR, nW;
struct timespec tmOut = { 0, 10000000 }; //10 ms select time out
fd_set fdRdMaster, fdRd;
//If we get signaled before here, this program dies and data may be lost.
//If possible use -p PID option w/pre-extant PID of appropriate lifetime.
signal(SIGHUP, sig); //Install sig() for SIGHUP
memset((void *)fds, 0, sizeof fds);
memset((void *)len, 0, sizeof len);
FD_ZERO(&fdRdMaster);
fdRd = fdRdMaster;
while ((ch = getopt(N, V, "d:p:s:o:")) != -1)
switch (ch) { //For export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX)
FIFOs='n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done'
mkfifo $FIFOs
sleep 2147483647 & p=$! #Cannot know xargs pid is good for long
( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM
kill $p ) & #Inform tailpipes writers are done
tailpipes -p$p $FIFOs | CONSUMING-PIPELINE
rm -rf $MYTEMP
wait #Wait for xargs subshell to finish
do '' as a sep CLI arg
double tO;
case 'd': delim = optarg ? *optarg : '\n'; break;
case 'p': pid = optarg ? atoi(optarg) : 0; break;
case 's': tO = optarg ? atof(optarg) : .01;
tmOut.tv_sec = (long)tO;
tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec);
break;
case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ?
O_RDONLY | O_NONBLOCK : O_RDWR;
break;
default: return fprintf(stderr, Use, V0, "bad option", V0), 1;
}
V += optind; N -= optind; //Shift off option args
if (N < 1)
return fprintf(stderr, Use, V0, "too few arguments", V0), 2;
setvbuf(stdout, NULL, _IONBF, 65536); //Full pipe on Linux
for (i = 0; i < N; i++) //Check for any available V[]
if ((fds[i] = open(V[i], oFlags)) != -1) {
struct stat st;
fstat(fds[i], &st);
if (!S_ISFIFO(st.st_mode))
return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3;
nF++;
FD_SET(fds[i], &fdRdMaster); //Add fd to master copy for pselect
buf[i] = malloc(nBf[i] = 4096);
if (fds[i] > fdMx)
fdMx = fds[i];
} else if (errno == EINTR) { //We may get signaled to finish up..
i--; continue; //..before we even this far.
} else
return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3;
fdMx++;
fdRd = fdRdMaster;
while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) {
if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist
writer_done = 1;
if (nS == 0 && writer_done) //No input & no writers
break;
else if (nS == -1) { //Some select error:
if (errno != EINTR && errno == EAGAIN) //..fatal or retry
return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4;
continue;
}
for (i = 0; nS > 0 && i < N; i++) { //For all fds..
if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data
continue;
if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) {
if (errno != EAGAIN && errno != EINTR)
fprintf(stderr, "%s: read: %s\n", V0, errstr);
continue;
} else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) {
FD_CLR(fds[i], &fdRdMaster);
nF--;
free(buf[i]);
}
len[i] += nR; //Update Re: read data
if ((eol = memrchr(buf[i], delim, len[i]))) {
nW = eol - buf[i] + 1; //Only to last delim
if (fwrite(buf[i], nW, 1, stdout) == 1) {
memmove(buf[i], buf[i] + nW, len[i] - nW);
len[i] -= nW; //Residual buffer shift
} else
return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n",
V0, len[i], errstr), 5;
} else if (len[i] == nBf[i]) { //NoDelim&FullBuf=>GROW
void *tmp;
if (nBf[i] >= 1 << 30)
return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6;
nBf[i] *= 2;
if (!(tmp = realloc(buf[i], nBf[i])))
return fprintf(stderr,"%s: out of memory\n", V0), 7;
buf[i] = tmp;
}
}
fdRd = fdRdMaster;
}
for (i = 0; i < N; i++) //Ensure any residual data is..
if (len[i] > 0) { //..labeled,bracketed,=>stderr.
fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]);
fwrite(buf[i], len[i], 1, stderr);
fputs("}\n", stderr);
}
return 0;
}
A instalação é cortada & colar & %código%. Testado no Linux & FreeBSD. Eu recebo cerca de 2500e6 bytes / seg de saída, mas a memória pode ser mais rápida do que a caixa de 500e6 bytes / seg.
O algoritmo é mais ou menos como sugerido, mas mais geral. O_NONBLOCK é necessário apenas com O_RDONLY e com algumas opções para facilidade de uso, como abrir FIFOs O_RDWR por padrão para que os escritores possam fechar e reabrir muitas vezes e usar o rastreamento PID para um protocolo livre de corrida. Você pode passar -oRO para usar EOF se quiser. cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes
também lida com linhas incompletas na finalização do programa, enviando-as rotuladas e entre colchetes para stderr caso haja um pós-processamento fácil que possa ser feito para tornar os registros completos ou se os logs deles forem úteis para depuração.
Exemplo de uso. O GNU tailpipes
pode ser uma parte de um único consumidor, de vários produtores / fan-out de um pipeline paralelo de mapa reduzido com xargs
operando como a parte fan-in de reconhecimento de limite de registro, tudo sem espaço em disco usado para arquivos temporários:
tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3
Acima, é importante que A) tailpipes
passe de n
para o limite superior apropriado, porque esse é o esquema 0
usa para xargs
e B) MYSLOT
direciona seus resultados para um novo arquivo MYPROGRAM
-keyed atribuído como $MYSLOT
, por exemplo $MYTEMP/$MYSLOT
if exec > $MYTEMP/$MYSLOT
é um script de shell. O invólucro do shell / programa poderia ser eliminado em muitos casos se MYPROGRAM
tivesse uma xargs
hipotética para configurar os stdouts de seus filhos.