Como eu regulo com que frequência as linhas de saída são passadas?

4

Desejo

Eu quero executar um comando repetidamente em resposta a linhas canalizadas para ele:

firehose | expensive-command

No entanto, estou recebendo muitas linhas e o comando é intensivo em recursos. Eu quero que a entrada para o comando seja filtrada, de tal forma que ele só seja executado no máximo uma vez a cada x segundos:

firehose | interval 1 second | expensive-command

O comando interval não deve ser apenas um filtro. Ele deve enviar a linha recebida mais recentemente no final do período de resfriamento, se tiver uma, em vez de bloquear apenas a chegada de tudo durante o cooldown.

Como posso fazer isso?

Tentativa

epoch () { date +%s --date="$*" }

interval () {
    INTERVAL="$*"
    LAST_RUN_AT=0
    WHEN_TO_RUN=0
    while read LINE; do
        if (( $(epoch now) >= $WHEN_TO_RUN )) then
            echo $LINE
            WHEN_TO_RUN="$(epoch now + $INTERVAL)"
        fi
    done
}

alias firehose='(print "1\n2\n3" ; sleep 2 ; print "4\n")'
alias expensive-command='cat'

firehose | interval 1 second | expensive-command

Isso funciona principalmente, mas tem o problema de não atrasar a passagem de linhas até mais tarde - só pode decidir transmiti-las imediatamente ou descartá-las.

O que acontece:

1
4

O acelerador recebe o 1 , passa-o e depois cooldown. Os 1 e 3 chegam durante o cooldown, então são descartados completamente. O cooldown termina antes que 4 chegue, então é passado.

Oqueeugostariaqueacontecesse:

134

Depoisderecebero1,oaceleradordevecontinuarcooldownpor1segundo.Emseguida,eledeverecebero2earquivá-loparamaistarde,porqueeleaindaestáemcooldown.Emseguida,elerecebeo3,quesubstituio2arquivadoparamaistarde.Oaceleradorentãosaidocooldown,pontonoqualdeveenviarimediatamente3.Finalmente,o4chegaquandoarodadaestáforadocooldown,entãoéenviadoimediatamente.

Seozshtivesse fechamentos , eu lançaria um subshell que dorme por $INTERVAL , depois echo es o último LINE recebido, mas, infelizmente, o zsh não tem encerramentos.

    
por Anko 13.07.2017 / 21:56

4 respostas

1

Eu fiz isso!

Aqui está o meu script interval (também no github ):

#!/usr/bin/env zsh
# Lets a line pass only once every $1 seconds.  If multiple lines arrive during
# the cooldown interval, only the latest is passed on when the cooldown ends.

INTERVAL="$1"

CHILD_PID=
BUFFER=$(mktemp)
CAN_PRINT_IMMEDIATELY=1
CAN_START_SUBPROCESS=1

# Reset state when child process returns
child-return () {
    CAN_START_SUBPROCESS=1
    CAN_PRINT_IMMEDIATELY=1
}
trap child-return CHLD

# Clean up when quitting
cleanup () {
    kill -TERM "$CHILD_PID" &> /dev/null
    rm "$BUFFER"
    exit
}
trap cleanup TERM INT QUIT

while read LINE; do
    # If we're just starting, just print immediately
    if [[ -n $CAN_PRINT_IMMEDIATELY ]]; then
        echo $LINE
        CAN_PRINT_IMMEDIATELY=
    else
        # Otherwise, store the line for later
        echo "$LINE" > $BUFFER
        # And spawn a subprocess to handle it one interval later, unless one is
        # already running.  With the SIGCHLD trap, the state variables will
        # reset when it exits.
        if [[ -n $CAN_START_SUBPROCESS ]]; then
            CAN_START_SUBPROCESS=
            (
                sleep $INTERVAL
                tail -n1 $BUFFER
            ) &
            CHILD_PID=$!
        fi
    fi
done

# Once we exhaust stdin, wait for the last child process to finish, if any.
if [[ -n $CHILD_PID ]]; then
    wait $CHILD_PID &> /dev/null
    cleanup
fi

Eu observei que o loop read ing lines nem sempre pode ser encarregado de imprimi-las, porque o programa às vezes precisa imprimir as linhas de forma assíncrona (quando nenhuma está sendo recebida, às vezes até mesmo após stdin ter terminado). Daí o processo da criança.

Aqui está funcionando, com a entrada também tee >(sed) 'd apart para observar o tempo:

Issocorrespondeaomeudiagramaanterior:

    
por 15.07.2017 / 16:35
1

O problema é que você precisa de uma leitura com um tempo limite . Se firehose não estiver enviando nada, seu loop é bloqueado indefinidamente e, quando estiver fazendo isso, não enviará a linha recebida mais recentemente. O Bash tem o argumento -t para uma leitura com tempo limite. Se o read do zsh tiver isso, isso seria o que seria usado.

O algoritmo é manter as linhas de leitura com um tempo limite que é sempre recalculado (encurtado cada vez mais) para expirar no final do intervalo de um segundo (ou qualquer outro). Quando esse intervalo chegar, então, se uma ou mais linhas tiverem sido lidas, envie a última. Caso contrário, não envie nada e agora comece a ler as linhas para o próximo intervalo.

Você pode implementar um "passo instantâneo" para a primeira linha recebida ou para a primeira linha recebida após um período de tempo maior que o intervalo. Como se o intervalo fosse 1 segundo, e nada viesse de firehose para 1.5 desde a última vez que uma linha foi enviada, então essa linha pode ser passada, e a máquina pode redefinir para iniciar um novo intervalo de um segundo naquele ponto .

Esta implementação de prova de conceito no TXR Lisp funciona para mim, validando o algoritmo básico:

(defvarl %interval% 1000000) ;; us

(defun epoch-usec ()
  (tree-bind (sec . usec) (time-usec)
    (+ (* 1000000 sec) usec)))

(let ((now (epoch-usec))
      (*stdin* (open-fileno (fileno *stdin*) "rl")) ;; line buffered
      remaining-time next-time line done)
  (while (not done)
    (set next-time (+ now %interval%))
    (set remaining-time (- next-time now))
    (while (poll (list (cons *stdin* poll-in))
                 (trunc remaining-time 1000))
      ;; got a line or EOF poll: no timeout
      (iflet ((nline (get-line)))
        (set line nline)              ;; got line
        (progn (flip done) (return))) ;; EOF poll
      (set now (epoch-usec))
      (when (minusp (set remaining-time (- next-time now)))
        (return)))
    ;; timeout, past deadline or exit: flush line, if any:
    (when line
      (put-line line)
      (set line nil))))

Um fluxo sem buffer está configurado, porque poll está sendo usado para as leituras com tempo limite e poll não vê buffers de fluxo. A ideia é que não queremos pesquisar por entrada, enquanto houver dados em buffer não lidos no fluxo. Isso é um nitpick. No teste, eu realmente não vi nenhuma diferença qualitativa no comportamento entre isso e apenas usando o fluxo original *stdin* em buffer. Se perdermos tempo pesquisando quando há dados em buffer no fluxo e nenhum no descritor de arquivo, temos a garantia de não esperar mais do que o nosso tempo de intervalo e menos do que se novos dados chegarem mais cedo.

Estamos assumindo que um poll bem-sucedido significa que podemos ler uma linha completa. poll não garante isso, é claro, mas fontes de entrada de fluxo de texto bem comportadas devem fornecer a garantia de que se um byte de entrada estiver disponível para ativar poll , haverá uma linha completa seguindo esse byte sem qualquer atraso indevido.

Os cálculos de tempo restantes usam o tempo de calendário, enquanto poll usa apenas uma espera relativa que provavelmente é insensível a ajustes de horário. Então as advertências usuais se aplicam. Se o relógio repentinamente pular para trás, opa!

Esses casos de teste prosseguem sem nenhum atraso perceptível:

$ echo foo | txr throttle.txr
foo
$ (echo foo; echo bar) | txr throttle.tl 
bar
$ (echo foo; echo bar; echo xyzzy) | txr throttle.tl 
xyzzy

Então:

$ (echo foo; sleep 2; echo bar; sleep 2; echo xyzzy) | txr throttle.tl 
foo
bar
xyzzy

Eu testei com find / | txr throttle.tl e tal.

    
por 14.07.2017 / 02:43
1

Primeira variante (não funciona, consulte a segunda variante)

Parece que não podemos usar o comando read para esse tipo de tarefa porque read interrompe a execução do loop while .

Veja este exemplo: (printf "1\n2\n3\n" ; sleep 5; printf "4\n") | while read -r line; do echo hello; done .

while loop com read inside será executado da seguinte forma:

  • 1 iteração - leia 1 ;
  • 2 iteração - leia 2 ;
  • 3 iteração - leia 3 ;
  • 4 iteração - ESPERA 5 seg e leia 4 .

Não podemos fazer o trabalho programado dentro desse loop, como "faça isso a cada 1 segundo", porque ele será interrompido periodicamente, aguardando entrada. Ele pode estar esperando por um minuto ou mais, por exemplo, e nosso trabalho programado também será interrompido.

function interval () {
    amount_of_seconds=$1
    print_time=0
    buffer=''
    while read -r line; do
        current_time=$(date +%s)

        if (( current_time > print_time )); then
            echo -e "${buffer}${line}"
            buffer=''
            print_time=$((current_time + amount_of_seconds))
        else
            buffer="$line\n"
        fi
    done
    echo -en "$buffer"
}

Teste:

$ alias firehose='(printf "1\n2\n3\n" ; sleep 2 ; printf "4\n"; sleep 2 ; printf "5\n6\n7\n" ; sleep 2; printf "8\n")'
$ firehose | interval 1 | cat
1
3
4
5
7
8
$ 

Segunda variante

Redirecione a saída firehose para o arquivo: firehose >> buffer_file.txt (Explicação porque >> e não > veja abaixo)

expensive-command será lido a última linha deste arquivo a cada segundo e liberará o arquivo:

while true; do
    tail -n 1 buffer_file.txt | expensive-command
    # clear file
    echo -n '' > buffer_file.txt
    # and sleep 1 second
    sleep 1      
done

No resultado, teremos o seguinte:

  1. comando simultâneo ( firehose no plano de fundo):

    firehose >> buffer_file.txt & ./script_with_expensive_command_inside.sh

    Operador APPEND - >> é necessário depois de firehose , não WRITE > . Caso contrário, o arquivo não será limpo e crescerá continuamente. Aqui explique este comportamento.
  2. Todas as linhas indesejadas serão descartadas; somente a última será passada para o expensive command
  3. A última linha será salva antes que expensive command não a leia e limpe o arquivo.
por 14.07.2017 / 00:22
0

Isso deve fazer o que você quer de uma maneira muito simples:)

firehose | awk '{print $1; system("sleep 1")}' | expensive-command

Tem a desvantagem de que a coisa toda se torna um pouco difícil de matar ( killall awk funciona, mas é moderadamente elegante), mas pelo menos é simples e não requer nenhum script especial nem nada.

    
por 14.07.2017 / 02:09