Operação APPEND falhou com HTTP 500?

1
package org.apache.spark.examples.kafkaToflink;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.microsoft.azure.datalake.store.ADLException;
import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;

import scala.util.parsing.combinator.testing.Str;

public class App {

    /**
     * Entry point: wires a Kafka source ("tenant" topic) to a custom Azure
     * Data Lake Store sink and launches the Flink streaming job.
     */
    public static void main(String[] args) throws Exception {
        // Bootstrap the Flink streaming environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "192.168.1.72:9092");
        kafkaProps.setProperty("group.id", "test");

        // Consume the "tenant" topic as a stream of raw strings.
        DataStream<String> messages = env.addSource(
                new FlinkKafkaConsumer010<>("tenant", new SimpleStringSchema(), kafkaProps), "Kafka_Source");

        // Fan the stream out to the ADLS append sink.
        messages.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128);

        env.execute("App");
    }
}

/**
 * Flink sink that appends each incoming record to a file in Azure Data Lake Store.
 *
 * <p>Fix 1: the original declaration {@code class ADLSink<String>} introduced a type
 * parameter <em>named</em> "String" that shadowed {@link java.lang.String}, forcing
 * fully-qualified {@code java.lang.String} on every field. The bogus type parameter
 * is removed; the class is simply {@code RichSinkFunction<String>}.
 *
 * <p>Fix 2: the token provider and store client were rebuilt on every record. At
 * parallelism 128 that floods the OAuth endpoint and the store service, which is a
 * plausible cause of the intermittent "APPEND failed with HTTP 500" the author
 * reports. The client is now created lazily once per sink instance and reused.
 * (Note: 128 parallel writers appending to a single ADLS file can still conflict —
 * consider one file per subtask or ConcurrentAppend; flagged, not changed here.)
 */
class ADLSink extends RichSinkFunction<String> {

    private final String clientId = "***********";
    private final String authTokenEndpoint = "***************";
    private final String clientKey = "*****************";
    private final String accountFQDN = "****************";
    private final String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json";

    // Created lazily on first invoke and reused for the lifetime of the sink
    // instance. transient: RichSinkFunction is Serializable and the client is not.
    private transient ADLStoreClient client;

    /**
     * Appends {@code value} to the target ADLS file.
     *
     * <p>Failures are logged and swallowed (matching the original behavior), which
     * means a failed append silently drops the record — acceptable for this sample,
     * but a production sink should rethrow so Flink can retry/restart.
     */
    @Override
    public void invoke(String value) {
        try {
            if (client == null) {
                AccessTokenProvider provider =
                        new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey);
                client = ADLStoreClient.createClient(accountFQDN, provider);
                // Set once per sink instance instead of once per record.
                client.setPermission(filename, "744");
            }

            System.out.println(value);

            // try-with-resources guarantees the append stream is closed (and the
            // lease released) even if write() throws — the original leaked it.
            try (ADLFileOutputStream stream = client.getAppendStream(filename)) {
                // Explicit charset: getBytes() without one uses the platform default.
                stream.write(value.getBytes(StandardCharsets.UTF_8));
            }

        } catch (ADLException e) {
            // Include the HTTP status alongside the request id for diagnosis
            // (e.g. the intermittent 500 the author is chasing).
            System.out.println("ADLS append failed: httpResponseCode=" + e.httpResponseCode
                    + " requestId=" + e.requestId);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.out.println(e.getCause());
        }
    }
}

Estou continuamente tentando anexar dados a um arquivo que está no Azure Data Lake Store, usando um laço while. Mas às vezes recebo o erro "Operação APPEND falhou com HTTP 500" — ora logo ao iniciar, ora somente após uns 10 minutos. Estou usando Java.

    
por Anubhav 20.04.2017 / 08:46

0 respostas

Tags