Konvertieren org.apache.avro.generisches.GenericRecord zu org.apache.spark.sql.Zeile

Ich habe die Liste der org.apache.avro.generic.GenericRecord, avro schemamit diesem brauchen wir dataframe mit Hilfe von SQLContext – API zu erstellen dataframe es muss RDD von org.apache.spark.sql.Row und avro schema. Voraussetzung zum erstellen von DF ist, die wir haben sollten, RDD von org.apache.spark.sql.Zeile und es kann erreicht werden, verwenden Sie folgenden code, aber einige, wie es funktioniert nicht und gibt Fehler -, sample-code.

 1. Convert GenericRecord to Row
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
    {
      val fields = avroSchema.getFields
      var rows = new Seq[Row]
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]();
        for (i <- 0 to fields.size - 1) {
          avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
        }
        val avroFieldArr = avroFieldsSeq.toArray
        val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
        rows = rows :+ genericRow
      }
      return rows;
    }

2. Convert `Avro schema` to `Structtype`
   Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType

3. Create `Dataframe` using `SQLContext`
   val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
   val rowRdd = sc.parallelize(rowSeq, 1)
   val finalDF =sqlContext.createDataFrame(rowRDD,structType)

Aber es wirft einen Fehler bei der Erstellung von DataFrame. Kann mir bitte jemand helfen, was ist falsch in obigen code. Abgesehen von diesem, wenn jemand hat eine unterschiedliche Logik für die Konvertierung und Erstellung von dataframe.

Sein, wenn ich berufen, jede Aktion auf Dataframe ist, wird ausgeführt, DAG und versuchen, zu erstellen DF-Objekt, aber in diesem es nicht mit unter die Ausnahme als

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

Ich versuche zu geben Sie die richtige version von jar-in-jar-parameter von spark übermitteln und mit anderen parameter wie –conf Funken.- Treiber.userClassPathFirst=true
aber jetzt ist es nicht mit MapR als

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

Sind wir mit MapR distribution und after class-path ändern in spark-submit, es nicht mit der oben genannten Ausnahme.

Kann mir bitte jemand hier helfen oder meine grundlegende Notwendigkeit, es zu konvertieren Avro GenericRecord in Spark-Reihe, so kann ich erstellen Dataframe mit ihm, bitte um Hilfe

Danke.

  • Was ist der genaue Fehler? und bitte aktualisieren Sie die Frage mit genericRecords Probe, avroSchema.
  • Treiber stacktrace: org.apache.spark.SparkException: Job abgebrochen wegen Nierenversagen: Aufgabe 0 in der Stufe 0.0 scheiterte 4 mal, letzten Ausfall: Lost Aufgabe 0.3 in Phase 0.0 (TID 3, hdpoc-c01-r03-01, Testamentsvollstrecker 2): java.io.InvalidClassException: org.apache.Unterhaus.lang3.Zeit.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1 in java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) bei java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
  • der Fehler sieht aus wie version error zwischen der Quelle der streaming-Daten und die Umwandlung von code in Ihre lokale. Sie müssen die gleiche version des FastDateFormat-Paket, dass die Quelle verwendet wird. Und aktualisieren Sie Bitte die Fehler in der Frage, so dass andere können dir auch helfen.
  • Ich habe aktualisiert und alles in die Frage, gibt es irgendeinen anderen Weg, um zu konvertieren GenericRecord Spark-Reihe ?
InformationsquelleAutor Sagar balai | 2017-06-13



2 Replies
  1. 1

    Beim erstellen dataframe von RDD[GenericRecord] es gibt nur wenige Schritte

    1. Zuerst konvertieren müssen, org.apache.avro.generisches.GenericRecord in org.apache.spark.sql.Zeile

    Com.databricks.spark.avro.SchemaConverters.createConverterToSQL(
    sourceAvroSchema: Schema,targetSqlType: DataType)

    dies ist die private Methode in spark-avro version 3.2. Wenn wir mit gleichen oder weniger als 3.2 kopieren Sie diese Methode in Ihre eigene util-Klasse und verwenden Sie es sonst direkt verwenden.

    1. Erstellen Dataframe aus der Sammlung der Zeile (rowSeq).

    val rdd = ssc.sparkContext.parallelisieren(rowSeq,numParition) val
    dataframe = sparkSession.createDataFrame(rowRDD, schemaType -)

    Dies löst mein problem.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.