Use org.json.XML library to convert XML data to JSON.

Check below code.

Creating UDF

scala> import org.json.XML
import org.json.XML

scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

Defining schema based on XML data.

scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))

Final Schema

scala>
    inputStream
    .selectExpr("CAST(value AS STRING)")
    // CAST(value AS STRING) produces a column named `value`, not `data`,
    // so the UDF must be applied to $"value" or the query fails to resolve.
    .select(from_json(parse($"value"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .printSchema

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)

Writing the converted JSON data to the console.

scala> 
    inputStream
    .selectExpr("CAST(value AS STRING)")
    // The cast yields a column named `value`; $"data" would not resolve here.
    .select(from_json(parse($"value"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .writeStream
    .format("console")
    .option("truncate", false)
    .outputMode("append")
    .start()
    .awaitTermination()
Answer from s.polam on Stack Overflow
Top answer
1 of 1
2

Use org.json.XML library to convert XML data to JSON.

Check below code.

Creating UDF

scala> import org.json.XML
import org.json.XML

scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

Defining schema based on XML data.

scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))

Final Schema

scala>
    inputStream
    .selectExpr("CAST(value AS STRING)")
    // CAST(value AS STRING) produces a column named `value`, not `data`,
    // so the UDF must be applied to $"value" or the query fails to resolve.
    .select(from_json(parse($"value"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .printSchema

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)

Writing the converted JSON data to the console.

scala> 
    inputStream
    .selectExpr("CAST(value AS STRING)")
    // The cast yields a column named `value`; $"data" would not resolve here.
    .select(from_json(parse($"value"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .writeStream
    .format("console")
    .option("truncate", false)
    .outputMode("append")
    .start()
    .awaitTermination()
🌐
HPE Developer
developer.hpe.com › blog › apache-spark-packages-from-xml-to-json
Apache Spark Packages, from XML to JSON | HPE Developer Portal
December 11, 2020 - When applying the JSON function to the DataFrame, we get an RDD[String] with the JSON representation of our data. Then we save the RDD as a plain text file. Now, we could use Drill to read and query our new dataset and of course, we can always go back to Spark if we need to do something more complicated operations / transformations. Transforming our dataset from XML to JSON is an easy task in Spark, but the advantages of JSON over XML are a big deal.
Discussions

Convert xml to json using scala? - Scala Platform - Scala Contributors
I need to save xml files to Elasticsearch, for that I need to convert the xml file to json file to save it to Elasticsearch. How can I do the job using Scala. here is my code import org.apache.spark.{SparkConf, SparkC… More on contributors.scala-lang.org
🌐 contributors.scala-lang.org
0
August 15, 2018
python - PySpark XML to JSON w/ Time Series Data - Stack Overflow
I have almost half a million XML files containing time series data that are about ~2-3MB each and contains around 10k rows of time series data per file. The idea is to convert the XML files into JS... More on stackoverflow.com
🌐 stackoverflow.com
Parse large XML file to JSON files?
60GB XML file holy crap More on reddit.com
🌐 r/aws
14
5
April 26, 2023
JSON vs XML
To answer your question about security, XML is "secure" because its structure can be enforced with an XSD. If you need your data to be in a particular format, have required fields, or require certain data types for fields then you will want to use XML, as JSON cannot do that. XML is also transformable via XSLT, so if you have a need to present the data you could apply a map to generate that presentation output. However XML can be pretty verbose, so if file size is a concern it could become a problem. If you just want the data to be structured, (de)serializable, and readable then JSON is the way to go. JSON is much less verbose and would give you smaller data files. With deserialization in C# the querying advantage of XML is basically lost. More on reddit.com
🌐 r/csharp
70
32
July 21, 2020
🌐
Medium
medium.com › hackernoon › spark-packages-from-xml-to-json-ab6b5bb1f988
Spark Packages, from Xml to Json. Spark Packages, from Xml to Json | by Nicolas A Perez | HackerNoon.com | Medium
July 14, 2017 - When applying the toJSON function to the DataFrame, we get an RDD[String] with the JSON representation of our data. Then we save the RDD as a plain text file. Now, we could use Drill to read and query our new dataset and of course, we can always ...
🌐
Szczeles
szczeles.github.io › Reading-JSON-CSV-and-XML-files-efficiently-in-Apache-Spark
Reading JSON, CSV and XML files efficiently in Apache Spark
November 6, 2017 - With Apache Spark you can easily read semi-structured files like JSON, CSV using standard library and XML files with spark-xml package. Sadly, the process of loading files may be long, as Spark needs to infer schema of underlying records by reading them. That's why I'm going to explain possible ...
🌐
Scala Contributors
contributors.scala-lang.org › scala platform
Convert xml to json using scala? - Scala Platform - Scala Contributors
August 15, 2018 - I need to save xml files to Elasticsearch, for that I need to convert the xml file to json file to save it to Elasticsearch. How can I do the job using Scala. here is my code import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext import org.elasticsearch.spark.sql._ object testxml { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("testxml").setMaster("local[*]") val sc= new SparkContext(conf) val sqlContext = new SQLContext(...
🌐
GitHub
github.com › databricks › spark-xml
GitHub - databricks/spark-xml: XML data source for Spark SQL and DataFrames · GitHub
A library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. The structure and test tools are mostly copied from CSV Data Source for Spark. This package supports to process format-free XML files in a distributed way, unlike JSON datasource in Spark restricts in-line JSON format.
Starred by 512 users
Forked by 225 users
Languages   Scala 97.8% | Java 1.5% | Shell 0.7%
🌐
Databricks Community
community.databricks.com › t5 › data-engineering › converting-column-of-xml-strings-to-column-of-jsons › td-p › 35629
Converting column of XML strings to column of Json... - Databricks Community - 35629
June 28, 2023 - To convert a column of XML strings to a column of JSON in PySpark, you can use the `from_json` function along with the `xmltodict` library. However, instead of using a UDF with `withColumn`, you can use the `select` function to transform the column. ... Passionate about hosting events and ...
Find elsewhere
Top answer
1 of 2
2

Well based on the information this could be a solution. Unfortunately my Python is a bit rusty, but there should be equivalents for all the scala functions here

// Assume nth is based on dTim ordering
val windowSpec = Window
  .partitionBy($"_id")
  .orderBy($"dTim".desc)

val nthRow  = 2  // define the nth item to be fetched

df.select(
  $"_id", // _id must be projected: the window partitions on it and it appears in the output
  // $ interpolator was missing below: a bare String has no getItem method
  $"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
  $"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
  $"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
  $"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
).withColumn("n", row_number().over(windowSpec))
  .filter(col("n") === nthRow)
  .drop("n")
.show()

Will output something like

+-------+--------------------+------+------+-----+
|    _id|                dTim|     A|     B|    C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...|251.23|130.56| null|
|123593X|2011-03-26 12:11:...|641.13|220.51|10.45|
+-------+--------------------+------+------+-----+

I'll improve the answer if I know a bit more


Update

I liked the puzzle, so if I understand the problem correctly, this could be a solution:

I've created 3 xml files with each 2 data records with 2 different ids in total

// Read every XML file in the directory, producing one row per <log> element.
val df = spark.sqlContext.read
  .format("com.databricks.spark.xml")
  .options(Map("rowTag" -> "log"))
  .load("src/main/resources/xml")


// Discover every column name that appears in any file's comma-separated
// TimeData.colNames field, dropping the timestamp column and blanks.
// NOTE: this triggers a full scan + collect — potentially heavy. Cache df
// first if possible, run it on a sample, or hardcode the column list below.
val colNames = df
  .select(explode(split($"TimeData.colNames",",")).as("col"))
  .distinct()
  .filter($"col" =!= lit("dTim") && $"col" =!= "")
  .collect()
  .map(_.getString(0))
  .toList
  .sorted

// or list all possible columns explicitly instead of discovering them:
//val colNames = List("colA", "colB", "colC")


// In the XML, colNames and each data record are comma-separated strings.
// This UDF splits both and pairs them up, yielding one colName -> value map
// per data record. Null inputs produce an empty result rather than failing.
def mapColsToData = udf((cols: String, data: Seq[String]) => {
  if (cols == null || data == null) {
    Seq.empty[Map[String, String]]
  } else {
    val names = cols.split(",")
    data.map(record => names.zip(record.split(",")).toMap)
  }
})

// Explode to 1 record per datapoint across all XMLs. Each record holds the
// file's id plus a colName -> value map for that datapoint.
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))

denorm.show(false)

Output:

+-------+-------------------------------------------------------------------------------+
|id     |data                                                                           |
+-------+-------------------------------------------------------------------------------+
|123456A|Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X|Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)           |
|123593X|Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017)           |
|123456A|Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A|Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
// Pull the timestamp out of the map into its own typed column, keeping the
// raw map for the final JSON payload.
val columized = denorm.select(
  $"id", // id appears in the shown output and is needed for grouping, so it must be selected
  // $ interpolator was missing: a bare String has no .cast method
  $"data.dTim".cast(TimestampType).alias("dTim"),
  $"data"
)

columized.show()

Output:

+-------+--------------------+--------------------+
|     id|                dTim|                data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...|Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
// create window over which to resample: per id, newest first
val windowSpec = Window
  .partitionBy($"id")
  .orderBy($"dTim".desc)

val resampleRate = 2

// Assign a batchId from the row number within each id's window, then group
// per (id, batchId). Grouping by batchId alone would merge rows belonging
// to different ids into one batch — the shown output is per id.
val batched = columized
  .withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
  .groupBy($"id", $"batchId")
  .agg(collect_list($"data").as("data"))
  .drop("batchId")

batched.show(false)

Output:

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id     |data                                                                                                                                                              |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X|[Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)]                      |
|123456A|[Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A|[Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// Store as 1 huge json file (drop the repartition if you can handle multiple
// json files — that is also easier on the driver/master)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")

Output json:

{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":"2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129","colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139","colB":"38.988","colC":"0"}]}
2 of 2
1

Here is another way that does not depend on hard-coded column names. Basically, the idea is to explode the data and ColNames columns to get a 'melted' DF, which we can then pivot to get the form you want:

# define function that processes elements of the rdd
# underlying the DF to get a melted RDD
def process(row, cols):
    """Melt one Row into [[id, v1, v2, ...], ...] rows.

    cols is the list of target columns to explode; their elements are
    zipped positionally (truncating to the shortest column), and each
    resulting tuple is prefixed with the row's id.
    """
    record = row.asDict()
    rid = record['id']
    columns = [record[name] for name in cols]
    return [[rid, *values] for values in zip(*columns)]


# Split the comma-separated ColNames string into an array column:
df=df.withColumn('col_split', f.split('ColNames',","))

# define target cols to explode; each element of each col
# can be of different length
cols=['data', 'col_split']

# apply `process` and flatMap the results to get a melted RDD/DF
# with one (id, value, name) row per datapoint
df=df.select(['id']+cols).rdd\
    .flatMap(lambda row: process(row, cols))\
    .toDF(schema=['id', 'value', 'name'])

# Pivot names back into columns to get the required wide form
df.groupby('id').pivot('name').agg(f.max('value')).show()
🌐
Java Code Geeks
javacodegeeks.com › home › enterprise java
Apache Spark Packages, from XML to JSON - Java Code Geeks
August 25, 2016 - The Apache Spark community has put a lot of effort into extending Spark. Recently, we wanted to transform an XML dataset into something that was easier to
🌐
Spark By {Examples}
sparkbyexamples.com › home › apache hadoop › spark read xml file using databricks api
Spark Read XML file using Databricks API - Spark By {Examples}
March 27, 2024 - Spark SQL provides a parquet method to read/write parquet files hence, no additional libraries are not needed, once the DatraFrame created from XML we can use the parquet method on DataFrameWriter class to write to the Parquet file. Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON...
🌐
Open311
wiki.open311.org › JSON_and_XML_Conversion
JSON and XML Conversion
Spark XML to JSON Converter: http://dropbox.ashlock.us/open311/json-xml/xml-tools/xml2json_spark.php
🌐
Medium
medium.com › @uzzaman.ahmed › working-with-xml-files-in-pyspark-reading-and-writing-data-d5e570c913de
Working with XML files in PySpark: Reading and Writing Data | by Ahmed Uz Zaman | Medium
April 11, 2023 - This package provides a data source for reading XML files into PySpark DataFrames and a data sink for writing PySpark DataFrames to XML files. When reading XML files in PySpark, the spark-xml package infers the schema of the XML data and returns a DataFrame with columns corresponding to the tags and attributes in the XML file.
🌐
Medium
medium.com › @pawankumarshukla_57258 › spark-structured-streaming-read-and-process-data-xml-and-json-from-azure-event-hub-4030567cff44
Spark Structured Streaming : Read and process data(XML and json)from Azure Event hub | by Pawan Kumar Shukla | Medium
May 21, 2023 - If string is XML Type then follow the Spark.xml approach (https://github.com/databricks/spark-xml) from pyspark.sql.types import StructType, StructField, StringType,ArrayType,_parse_datatype_json_string from pyspark.sql.streaming import * from pyspark.sql.column import Column, _to_java_column from pyspark.sql.functions import * def ext_from_xml(xml_column, schema, options={}): java_column = _to_java_column(xml_column.cast('string')) java_schema = spark._jsparkSession.parseDataType(schema.json()) scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options) jc = spark._jvm.
🌐
LinkedIn
linkedin.com › pulse › flattening-nested-data-jsonxml-using-apache-spark-saikrishna-pujari
Flattening Nested Data (JSON/XML) Using Apache-Spark
June 21, 2020 - But for nested Json , after the data has been flattened it doesn't write the data into output path as recursion has been used it causes memory issue. I even tried tail recursion doesn't work either .And if you have some column let's say Name and name when you call flatten function it will fail with duplicate issue. To resolve this we need to set spark.sql.caseSensitive=true. ... Its good functions for xml parsing worked for my use case ..
🌐
Databricks Documentation
docs.databricks.com › data engineering › lakeflow connect › data formats › xml file
Read and write XML files | Databricks on AWS
July 5, 2023 - The following table shows the conversion of XSD data types to Spark data types: XML data in a string-valued column in an existing DataFrame can be parsed with schema_of_xml and from_xml that returns the schema and the parsed results as new struct columns.
🌐
Sonra
sonra.io › home › xml › how to parse xml in spark and databricks (guide)
How to Parse XML in Spark and Databricks (Guide) - Sonra
June 17, 2025 - Master XML parsing in Spark and Databricks. Explore spark-xml vs. native features, schema inference, and converting XML to Delta Tables.
🌐
HackerNoon
hackernoon.com › spark-packages-from-xml-to-json-ab6b5bb1f988
Hackernoon
December 6, 2016 - Discover Anything · ReadWrite · LoginSignUp
🌐
Maven Repository
mvnrepository.com › artifact › com.databricks › spark-xml
Maven Repository: com.databricks » spark-xml
April 10, 2024 - JSON Libraries · Java Specifications · Core Utilities · Mocking · Annotation Libraries · Web Assets · HTTP Clients · Logging Bridges · Dependency Injection · XML Processing · Concurrency Libraries · Web Frameworks · Android Platform · Code Generators · View All · Home » com.databricks » spark-xml ·
🌐
GitHub
github.com › Anant › example-spark-xml-json
GitHub - Anant/example-spark-xml-json
Start a Spark Application with the spark-start-xml-cassandra.bash script · Use the Json-ToCassandra.scala script to load a json file into a cassandra table
Forked by 2 users
Languages   Scala 89.5% | Shell 10.5% | Scala 89.5% | Shell 10.5%