Empfangen ein Fluss in eine Spring-Boot-Client

Dies ist ein follow-up-Frage zu Feder 5 Web-Reaktiven – Wie können wir WebClient abrufen-Stream von Daten in einem Fluss?

Ich habe versucht, Folgen die Empfehlung, wie man erhalten ein Fluss mit dem Spring-WebClient aber tatsächlich eine netty Problem.

Auf der server-Seite, der code ist ein einfacher controller Freilegung der findAll-Methode, der eine Mongo-repository:

@RequestMapping("power")
public Flux<Power> getAll() {
    return powerRepository.findAll();
}

Auf dem client den Verbrauch code ist wie in der Antwort oben:

@Bean
public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        @Override
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            flux.subscribe();
        }
    };
}

Aber dies wirft eine Ausnahme:

2017-02-27 08:19:41.281 FEHLER 99026 — [ctor-http-nio-5]
r.ipc.netty.Kanal.ChannelOperations : [HttpClient] Fehler
Verarbeitung-Verbindung. Anfordern schließen Sie den Kanal

io.netty.util.IllegalReferenceCountException: refCnt: 0, Dekrement: 1
bei
io.netty.Puffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101)
~[netty-alle-4.1.8.Endgültig.jar:4.1.8.Final]

Ich bin mit der aktuellen Frühjahrs-Boot-2.0.0 BUILD-SNAPSHOT.

Was ist diese Ausnahme die mir zu sagen? Wie kann ich es richtig machen?

InformationsquelleAutor Gregor | 2017-02-27



One Reply
  1. 1

    Alle CommandLineRunner Bohnen ausgeführt werden, wie die Spring-Boot-Anwendung startet. Wenn es kein daemon-thread (also die aktuelle Anwendung ist nicht eine web-app), die Anwendung heruntergefahren wird, sobald alle Läufer ausgeführt wurden.

    In Ihrem Fall, mit flux.subscribe() nur „Starten Sie die Kette und Wunsch unbegrenzte Nachfrage“ (javadoc), so dass diese Methode-Aufruf ist nicht blockierend. Ich vermute, das command line runner zurück, bevor Sie die chance bekommen, etwas zu tun mit Ihrem Fluss sind, wird die Anwendung beendet und das Netzwerk Ressourcen geschlossen werden.

    Auch, Sie sind nicht etwas zu tun mit dem Ergebnis Ihrer HTTP-Anforderung.
    Ich denke, die Aktualisierung Ihrer code-Beispiel mit den folgenden sollte das Problem beheben:

    @Bean
    public CommandLineRunner readFromApiServer() {
        return new CommandLineRunner() {
            @Override
            public void run(String... args) throws Exception {
                WebClient webClient = WebClient.create("http://localhost:8080");
                Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                        .flatMap(response -> response.bodyToFlux(Power.class));
                List<Power> powerList = flux.collectList().block();
                //do something with the result?
            }
        };
    }
    • Eigentlich habe ich eine endlos-Schleife im main-Programm, also es ist ein no-daemon-thread, die verhindert, dass die app Herunterfahren. Aber es sieht aus wie die Verbraucher läuft in einem eigenen thread und kehrt sofort, wenn nicht blockiert. Stattdessen sammeln die Ergebnisse in einer Liste, die ich verbrauchen die „Mächte“, wie Sie ankommen: flux.doOnNext(macht -> System.aus.println(power.getValue())).blockLast(); Das funktioniert Prima. Vielen Dank für die Unterstützung!

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.