Use the org.json.XML library to convert XML data to JSON.
See the code below.
Creating UDF
scala> import org.json.XML
import org.json.XML
// UDF: org.json.XML converts the raw XML string into a JSON string, which from_json can then parse. Requires the org.json artifact on the classpath.
scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
Defining schema based on XML data.
scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
// NOTE(review): DataType and StructType come from org.apache.spark.sql.types — import them before running this line.
scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
Final Schema
scala>
// Print the flattened schema: XML string -> JSON string (parse UDF) -> struct (from_json) -> employee fields.
inputStream
  .selectExpr("CAST(value AS STRING)")                       // casting a plain column keeps its name, so the column is `value`
  .select(from_json(parse($"value"), schema).as("emp_data")) // FIX: the column is `value`, not `data`
  .select($"emp_data.employees.employee.*")
  .printSchema
root
|-- building: long (nullable = true)
|-- division: string (nullable = true)
|-- firstname: string (nullable = true)
|-- id: string (nullable = true)
|-- lastname: string (nullable = true)
|-- room: long (nullable = true)
|-- supervisor: string (nullable = true)
|-- title: string (nullable = true)
Writing converted to JSON data to console.
scala>
// Same pipeline as above, but writing the flattened employee records to the console sink.
inputStream
  .selectExpr("CAST(value AS STRING)")                       // Kafka-style binary payload cast to string; resulting column is named `value`
  .select(from_json(parse($"value"), schema).as("emp_data")) // FIX: the column is `value`, not `data`
  .select($"emp_data.employees.employee.*")
  .writeStream
  .format("console")
  .option("truncate", false)
  .outputMode("append")
  .start()
  .awaitTermination()
Answer from s.polam on Stack Overflow. (Source thread: "Convert xml to json using scala?" — Scala Platform, Scala Contributors forum.)
python - PySpark XML to JSON w/ Time Series Data - Stack Overflow
Parse large XML file to JSON files?
JSON vs XML
Videos
Well, based on the information given, this could be a solution. Unfortunately my Python is a bit rusty, but there should be equivalents for all of the Scala functions used here.
// Assume nth is based on dTim ordering (descending: nthRow = 1 is the most recent record per id).
val windowSpec = Window
  .partitionBy($"_id")
  .orderBy($"dTim".desc)
val nthRow = 2 // define the nth item to be fetched
df.select(
  $"_id", // keep the id column — the sample output below contains it
  // FIX: was the bare string "TimeData.data" — a String has no getItem; it must be a Column ($"...").
  $"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
  $"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
  $"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
  $"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
).withColumn("n", row_number().over(windowSpec)) // rank rows within each id, newest first
  .filter(col("n") === nthRow)                   // keep only the nth row per id
  .drop("n")
  .show()
Will output something like
+-------+--------------------+------+------+-----+
| _id| dTim| A| B| C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...|251.23|130.56| null|
|123593X|2011-03-26 12:11:...|641.13|220.51|10.45|
+-------+--------------------+------+------+-----+
I'll improve the answer if I know a bit more
Update
I liked the puzzle, so if I understand the problem correctly, this could be a solution:
I've created 3 xml files with each 2 data records with 2 different ids in total
// Read every XML file in the directory; requires the spark-xml (com.databricks:spark-xml) package.
// NOTE(review): assumes each record is wrapped in a <log> element — confirm against the input files.
val df = spark
.sqlContext
.read
.format("com.databricks.spark.xml")
.option("rowTag", "log")
.load("src/main/resources/xml")
// Discover the distinct data column names used across all loaded XML files.
// Could be computationally heavy, maybe cache df first if possible, otherwise run it on a sample, otherwise hardcode possible columns
val colNames = df
.select(explode(split($"TimeData.colNames",",")).as("col"))
.distinct()
.filter($"col" =!= lit("dTim") && $"col" =!= "") // drop the timestamp column and any empty fragments
.collect()
.map(_.getString(0))
.toList
.sorted
// or list all possible columns
//val colNames = List("colA", "colB", "colC")
// Per the XML layout, colNames and data are comma-separated strings that have to be split.
// A plain sql split would work too, but this UDF pairs each column name with the matching
// field of every data record, yielding one colName -> value map per record.
def mapColsToData = udf((cols: String, data: Seq[String]) => {
  if (cols == null || data == null) Seq.empty[Map[String, String]]
  else {
    val names = cols.split(",")
    data.map(record => names.zip(record.split(",")).toMap)
  }
})
// The result of this action is 1 record for each datapoint for all XML's. Each data record is a key->value map of colName->data.
// (explode turns the array returned by the UDF into one row per element)
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))
denorm.show(false)
Output:
+-------+-------------------------------------------------------------------------------+
|id |data |
+-------+-------------------------------------------------------------------------------+
|123456A|Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X|Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988) |
|123593X|Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017) |
|123456A|Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
// Now create a typed timestamp column from each data map, keeping the raw map alongside it.
val columized = denorm.select(
  $"id", // FIX: id was missing from the select, yet the sample output below contains it
  // FIX: was the bare string "data.dTim" — a String has no .cast; it must be a Column ($"...").
  $"data.dTim".cast(TimestampType).alias("dTim"),
  $"data"
)
columized.show()
Output:
+-------+--------------------+--------------------+
| id| dTim| data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
// Create a window over which to resample: rows are numbered per id, newest first.
val windowSpec = Window
  .partitionBy($"id")
  .orderBy($"dTim".desc)
val resampleRate = 2 // number of consecutive records collapsed into one batch
// Add a batchId based on the resample rate, then collect each batch's records into an array.
val batched = columized
  .withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
  // FIX: must group by id as well as batchId — batchId alone would merge rows from different
  // ids (and the sample output below still contains the id column).
  .groupBy($"id", $"batchId")
  .agg(collect_list($"data").as("data"))
  .drop("batchId")
batched.show(false)
Output:
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |data |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X|[Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)] |
|123456A|[Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A|[Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// Store as 1 huge json file (drop the repartition(1) if you can handle multiple json files; that is also easier on the master)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")
Output json:
{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":"2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129","colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139","colB":"38.988","colC":"0"}]}
Here is another way that does not depend on hard-coded column names. Basically, the idea is to explode the data and ColNames columns to get a 'melted' DF, which we can then pivot to get the form you want:
# Melt one Row into a list of [id, value1, value2, ...] records by walking the
# target columns in parallel. zip truncates to the shortest column, so ragged
# columns yield only the records that have every field.
def process(row, cols):
    """cols is the list of target columns to explode."""
    fields = row.asDict()
    row_id = fields['id']
    return [[row_id, *combo] for combo in zip(*(fields[c] for c in cols))]
# Now split ColNames:
# NOTE(review): assumes `import pyspark.sql.functions as f` is in scope — confirm against the caller.
df=df.withColumn('col_split', f.split('ColNames',","))
# define target cols to explode, each element of each col
# can be of different length
cols=['data', 'col_split']
# apply function and flatmap the results to get melted RDD/DF
df=df.select(['id']+cols).rdd\
.flatMap(lambda row: process(row, cols))\
.toDF(schema=['id', 'value', 'name'])
# Pivot to get the required form
# (values are strings here, so f.max is a lexicographic max; with one value per (id, name) cell it simply selects that value)
df.groupby('id').pivot('name').agg(f.max('value')).show()