Sortieren nach Schlüssel in apache spark JavaPairRDD

Habe ich JavaPairRDD mit Schlüssel Tuple2<Integer, Integer>

Wollte ich die Sortierung der JavaPairRDD durch meine Schlüssel, also schrieb ich einen Komparator wie diese:

JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult=result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() {
     @Override
     public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
         if(o1._1()==o2._1())
             return o1._2()-o2._2();
         return o1._1()-o2._1();
       }
},true);

Dieser sortiert die Werte vom ersten Eintrag im Tupel, wenn Sie die gleiche Art von zweiter Eintrag.

Aber ich bin immer folgende Fehler-stack :

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at java.io.ObjectStrea
InformationsquelleAutor ashwinbhy | 2017-08-06



One Reply
  1. 2

    Wie erstellen Sie JavaPairRDD? Bitte überprüfen Sie es vor der Anwendung der Sortierung. Yow wird auch erhalten, die Aufgabe nicht serialisierbar Ausnahme für die Verwendung der neuen Komparator direkt in sortByKey Methode. Sie implementieren sollten Comparator und Serializable in eine separate Klasse, und übergeben es an sortByKey Methode. Hier ist das Beispiel für Ihre Referenz.

    public class SparkSortSample {
    public static void main(String[] args) {
        //SparkSession
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        //Sample data
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<Tuple2<Tuple2<Integer, Integer>, Integer>>();
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(2, 444), 4444));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(3, 333), 3333));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(1, 111), 1111));
        inputList.add(new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(2, 222), 2222));
        //JavaPairRDD
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);
        //Sorted RDD
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD = javaPairRdd.sortByKey(new TupleComparator(), true);
        sortedPairRDD.foreach(rdd -> {
            System.out.println("sort = " + rdd);
        });
        //stop
        jsc.stop();
        jsc.close();
       }
    }

    Und hier ist TupleComparator Klasse implementiert die Komparator-und Serializable-interfaces.

    class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        if (o1._1() == o2._1())
            return o1._2() - o2._2();
        return o1._1() - o2._1();
      }
    }

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.