Não há uma maneira geral de realizar o que você quer.
O problema básico é que um pipe é uma coisa unidirecional, e o produtor não tem absolutamente nenhum conhecimento sobre o estado atual do consumidor, e se os dados enviados para o pipe já foram consumidos ou não.
Existem duas maneiras de contornar essa limitação, e ambas exigem conhecimento a priori sobre os dados e os consumidores:
-
você faz a produção (ou o transporte do produtor original para os tubos dos consumidores) tão lenta que os consumidores estão sempre em sincronia, ou seja, depois de cada linha ser enviada para o consumo você espera tanto que os consumidores 100% certamente já terminou o processamento no momento em que a próxima linha é enviada (algo semelhante ao que foi sugerido por TiberiusKirk),
-
você verifica o progresso do processamento nos consumidores para ver se eles já consumiram as linhas de entrada (isso precisa de feedback ou saída dos consumidores, o que pode ou não existir, e pode ou não ser viável) processado).
A primeira solução precisa de um limite inferior adequado para a estimativa de tempo do processamento dos dados de entrada, a segunda alternativa precisa de algum tipo de feedback dos consumidores.