Einfach Apache Strahl Manipulationen arbeiten sehr langsam

Ich bin sehr neu in Apache Strahl und meine Java-Kenntnisse sind ziemlich gering, aber ich würde gerne verstehen, warum meine einfache Einträge Manipulationen arbeiten so langsam mit Apache Strahl.

Was ich versuche zu führen, ist die folgende: ich habe eine CSV-Datei mit 1 million Datensätzen (Alexa top 1 million-Websites) von dem folgenden Schema: NUMBER,DOMAIN (z.B. 1,google.com), ich will „strip“ – der ersten (Zahl) Feld und bekommen nur den domain-Teil. Mein code für diese Leitung ist die folgende:

package misc.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

  static class ExtractDomainsFn extends DoFn<String, String> {
    private final Counter domains = Metrics.counter(ExtractDomainsFn.class, "domains");

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().contains(",")) {
        domains.inc();

        String domain = c.element().split(",")[1];
        c.output(domain);
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply("ReadLines", TextIO.read().from("./top-1m.csv"))
     .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
     .apply("WriteDomains", TextIO.write().to("domains"));

    p.run().waitUntilFinish();
  }
}

Wenn ich diesen code ausführen, werden mit Maven, es dauert mehr als vier Minuten, bis es erfolgreich auf meinem laptop:

$ mvn compile exec:java -Dexec.mainClass=misc.examples.Example
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building my-example 1.0.0
[INFO] ------------------------------------------------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ my-example ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /…/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ my-example ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ my-example ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:36 min
[INFO] Finished at: 2017-06-24T15:20:33+03:00
[INFO] Final Memory: 31M/1685M
[INFO] ------------------------------------------------------------------------

Während die einfache cut(1) arbeiten, bevor Sie können sogar blinken:

$time cut -d, -f2 top-1m.csv > domains

real    0m0.171s
user    0m0.140s
sys     0m0.028s

So, ist so Apache Strahl Verhalten als akzeptabel angesehen (wahrscheinlich würde es die Arbeit vergleichsweise besser auf größere Mengen von Daten), oder ist mein code einfach nur ineffizient?

01-07-2014 Update:

Da Kenn Knowles vorgeschlagen, ich habe versucht, führen Sie die Rohrleitung auf der anderen Läufer als die DirectRunner — auf der DataflowRunner. So der aktualisierte code sieht wie folgt aus:

package misc.examples;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class Example {

  static class ExtractDomainsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().contains(",")) {
        String domain = c.element().split(",")[1];
        c.output(domain);
      }
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setRunner(DataflowRunner.class);
    dataflowOptions.setProject("my-gcp-project-id");
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from("gs://my-gcs-bucket/top-1m.csv"))
     .apply("ExtractDomains", ParDo.of(new ExtractDomainsFn()))
     .apply("WriteDomains", TextIO.write().to("gs://my-gcs-bucket/output/"));

    p.run().waitUntilFinish();
  }
}

Verstrichene Zeit läuft auf Google Dataflow ist kleiner im Vergleich zu den Direkten runner, aber immer noch langsam genug — ein bisschen mehr, dass 3 Minuten:

Einfach Apache Strahl Manipulationen arbeiten sehr langsam

Einfach Apache Strahl Manipulationen arbeiten sehr langsam

  • Von dem, was ich verstanden, Apache Beam laden die ganze text-Datei im Speicher in eine PCollection und dann weiter gearbeitet wird. Auf der anderen Seite, Linux-streams sind sehr effizient und lädt nicht die gesamte Datei auf einmal, die in der Tat ist hier nicht notwendig
  • Und warum denkst du, dass es funktioniert vergleichsweise besser mit größeren Datenmengen? Sicher nicht, solange Sie nur diese einfache Aufgabe und nutzen Sie den direkten Läufer. Die Idee des Apache Strahl erzeugen eine generische pipeline, die ausgeführt werden könnte, die auf unterschiedlichen Rahmenbedingungen. Und natürlich die direkte runner ist einfach das Java in-memory-Art und Weise – mit einem runner für die Funke oder Flink ist zwar effizienter als diejenigen sind frameworks für BigData-Verarbeitung. Cheers.
  • Aktualisiert meine Antwort entsprechend aktualisiert Ihre Frage.
  • Was in der SDK-version verwenden Sie? Wir haben vereinzelt eine bedeutende Verlangsamung in der direkten Läufer am KOPF, aber die version 2.1.0 ist viel schneller. Wenn Sie erleben Verlangsamung mit 2.1.0 wir würden gerne wissen, dass.
  • Nevermind, ich sehe 2.0.0 in deinem screenshot. In diesem Fall, wir eigentlich nicht haben einen Griff auf die Abschwächung. Wenn Sie kümmern würde, hinzufügen von details auf der verlinkten bug, wir wollen auf jeden Fall erhalten Sie unten auf dieser.
InformationsquelleAutor Petr Shevtsov | 2017-06-24



One Reply
  1. 3

    Apache-Beam sorgt für das korrekte Ereignis Zeit für die Verarbeitung und Portabilität über massive-scale data processing engines wie Apache Flink, Apache Spark, Apache Apex und Google Cloud Dataflow.

    Hier, es scheint, als würde Sie Ihre pipeline in der Standard – DirectRunner das ist ein Weg, um zu testen, die Richtigkeit der pipeline im kleinen Maßstab (wobei „klein“ bedeutet alles, was nicht mit mehreren Rechnern). Zum testen der Korrektheit, der Läufer führt auch zusätzliche Aufgaben, um sicherzustellen, Korrektheit, wie Sie die überprüfung Ihrer Serialisierung (Coder) und setzen die Elemente in zufälliger Reihenfolge, um sicherzustellen, dass Sie Ihre pipeline nicht um-abhängig.

    Den DirectRunner hat nicht unbedingt mitbringen, alle Werte gleichzeitig in den Speicher, hat aber einen streaming-Modell der Ausführung, so funktioniert es auch mit eingegrenzten Datensätze und-Triggerung. Auch dies erhöht den Verarbeitungsaufwand im Vergleich zu einer einfachen Schleife.

    Sagte, vier Minuten ist ziemlich langsam und ich reichte STRAHL-2516 zu verfolgen.

    Können Sie auch versuchen, läuft es auf anderen backends, und insbesondere die SparkRunner, FlinkRunner, und ApexRunner Unterstützung embedded-Ausführung auf Ihrem laptop.

    Antwort auf 2017-07-01 Update:

    Obwohl die Gesamtspielzeit erleben Sie auf Cloud Dataflow ist ~3 Minuten, die tatsächliche Zeit, die ergriffen werden, um die Daten zu verarbeiten ist, ~1 minute. Sie können dies in den Protokollen. Der rest ist Spinnerei und Herunterfahren Arbeiter VMs. Wir arbeiten ständig daran, diesen Aufwand zu reduzieren. Warum dauert es ~1 minute? Sie müssten, um zu Profil, um herauszufinden, (und ich hätte gerne gehört, sind die Ergebnisse!) aber sicherlich Datenfluss ist dabei viel mehr als cut: Lesen und schreiben von GCS, der die Haltbarkeit und Fehlertoleranz, und in der TextIO schreiben Schritt ist die Durchführung einer vernetzten shuffle mit Ihren Daten in Reihenfolge zu schreiben, in sharded-Dateien parallel. Natürlich gibt es Dinge, die optimiert werden konnten Weg, wenn Dataflow bemerkt, dass Ihre Berechnung hat keine Parallelität und ist klein genug, dass es nicht brauchen.

    Aber denken Sie daran, dass Strahl-und Cloud-Dataflow existieren, um Ihnen helfen, verwenden Sie parallel Verarbeitung auf Mengen von Daten, die nicht verarbeitet werden kann, in einer fristgerechten Weise auf einer einzigen Maschine. Also Verarbeitung kleine Beispiele mit keine Parallelität vorhanden ist, nicht ein Ziel.

    Minor sequentiellen Berechnungen, die oft auftreten, als kleine Teile eines großen pipeline, aber im Rahmen einer realistischen physischen plan die kleine Hilfs-Berechnung wird oft keine Auswirkungen auf die end-to-end-Zeit. Die Kosten des VM-management auch, sind einmalige Kosten, so werden Sie eher gemessen werden, die gegen viele Minuten bis Stunden nach der Berechnung auf Dutzende bis Hunderte von Maschinen.

    • Danke, @KennKnowles! Ich habe soeben meine Fragen mit details, die beim ausführen der Aufgabe auf Google Dataflow. Es ist ein bisschen schneller, aber immer noch über 3 Minuten.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.