Parse XML with PySpark

Imagine you are given a job to parse thousands of xml files to extract the information and write the records into table format with proper data types; the job must be done in a timely manner and is repeated every hour. What are you going to do? With Apache Spark, the embarrassingly parallel processing framework, it can be done with much less effort.

Introduction

In this post, we are going to use PySpark to process xml files, extract the required records, transform them into a DataFrame, then write them out as csv files (or any other format) to the destination. The input and the output of this task are shown below.

XML files

XML is designed to store and transport data. XML is self-descriptive, which makes it flexible and extensible enough to store different kinds of data.

On the other hand, its semi-structured nature makes it hard to convert into tabular data. For example, in the XML excerpt below, the description element can expand to multiple lines. The price element can be omitted because it is yet to be determined.

                              
    <book id="bk119">
      <author>Feng, Jason</author>
      <title>Playground</title>
      <description>This is the place where Jason puts his fun stuff
      mainly related with Python, R and GCP.</description>
    </book>

Solution

This is my scribble of the solution.

Step 1: Read XML files into RDD

We use spark.read.text to read all the xml files into a DataFrame. The DataFrame has one column, and the value of each row is the whole content of one xml file. Then we convert it to an RDD, on which we can use some low-level API to perform the transformation.

                              
    # read each xml file as one row, then convert to RDD
    file_rdd = spark.read.text("./data/*.xml", wholetext=True).rdd

Here is the output of one row in the DataFrame.

                              
    [Row(value='<?xml version="1.0"?>\r\n<catalog>\r\n   <book id="bk119">\r\n      <author>Feng, Jason</author>\r\n      <title>Playground</title>\r\n      <description>This is the place where Jason puts his fun stuff\r\n      mainly related with Python, R and GCP.</description>\r\n   </book>\r\n</catalog>')]
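By the way, the snippets in this post assume a SparkSession is already available under the name spark (as in the pyspark shell or a notebook). For a standalone script, a minimal sketch of that boilerplate would look like the following; the app name is just illustrative.

    from pyspark.sql import SparkSession

    # entry point for the DataFrame and RDD APIs used in this post
    spark = SparkSession.builder.appName("parse-xml").getOrCreate()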

Step 2: Parse XML files, extract the records, and expand into multiple RDDs

Now comes the key part of the entire process. We need to parse each xml content into records according to the pre-defined schema.

First, we define a function using the Python standard library xml.etree.ElementTree to parse and extract the xml elements into a list of records. In this function, we cater for the scenario where some elements are missing, in which case None is returned. It also casts price to float type and publish_date to date type.

                              
    import xml.etree.ElementTree as ET
    from datetime import datetime

    # elements to extract from each <book>; this list is not shown in the
    # original excerpt, so the order here is inferred from the schema in step 3
    ELEMENTS_TO_EXTRACT = ['author', 'title', 'genre', 'price',
                           'publish_date', 'description']

    def parse_xml(rdd):
        """
        Read the xml string from rdd, parse and extract the elements,
        then return a list of list.
        """
        results = []
        root = ET.fromstring(rdd[0])

        for b in root.findall('book'):
            rec = []
            rec.append(b.attrib['id'])
            for e in ELEMENTS_TO_EXTRACT:
                if b.find(e) is None:
                    rec.append(None)
                    continue
                value = b.find(e).text
                if e == 'price':
                    value = float(value)
                elif e == 'publish_date':
                    value = datetime.strptime(value, '%Y-%m-%d')
                rec.append(value)
            results.append(rec)

        return results
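Since parse_xml only indexes into its argument, we can sanity-check it locally on a plain Python list before shipping it to the cluster. The sample string below is made up for illustration:

    # made-up sample; a one-element list stands in for a Row from the RDD
    sample = ('<catalog><book id="bk119"><author>Feng, Jason</author>'
              '<title>Playground</title></book></catalog>')
    print(parse_xml([sample]))
    # [['bk119', 'Feng, Jason', 'Playground', None, None, None, None]]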

Then nosotros utilize flatMap office which each input item as the content of an XML file can be mapped to multiple items through the function parse_xml. flatMap is ane of the functions made me "WoW" when I outset used Spark a few years ago.

                              
    # parse xml tree, extract the records and transform to new RDD
    records_rdd = file_rdd.flatMap(parse_xml)
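To see why flatMap rather than map is the right choice here, compare the two on a toy RDD: map would give us one list of records per file, while flatMap flattens those lists into individual records. A quick illustration, assuming the spark session from step 1:

    rdd = spark.sparkContext.parallelize(["a b", "c d e"])
    rdd.map(lambda s: s.split()).collect()      # [['a', 'b'], ['c', 'd', 'e']]
    rdd.flatMap(lambda s: s.split()).collect()  # ['a', 'b', 'c', 'd', 'e']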

Step 3: Convert RDDs into DataFrame

We then convert the transformed RDDs to DataFrame with the pre-defined schema.

                              
    # convert RDDs to DataFrame with the pre-defined schema
    book_df = records_rdd.toDF(my_schema)
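The definition of my_schema is not shown in this excerpt. A plausible sketch, inferred from the column names and types in the output below, would be:

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   FloatType, DateType)

    # column names and types inferred from the DataFrame shown below
    my_schema = StructType([
        StructField('book_id', StringType()),
        StructField('author', StringType()),
        StructField('title', StringType()),
        StructField('genre', StringType()),
        StructField('price', FloatType()),
        StructField('publish_date', DateType()),
        StructField('description', StringType()),
    ])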

The DataFrame looks like below.

                              
    +-------+--------------------+--------------------+---------------+-----+------------+--------------------+
    |book_id|              author|               title|          genre|price|publish_date|         description|
    +-------+--------------------+--------------------+---------------+-----+------------+--------------------+
    |  bk101|Gambardella, Matthew|XML Developer's G...|       Computer|44.95|  2000-10-01|An in-depth look ...|
    |  bk102|          Ralls, Kim|       Midnight Rain|        Fantasy| 5.95|  2000-12-16|A former architec...|
    |  bk103|         Corets, Eva|     Maeve Ascendant|        Fantasy| 5.95|  2000-11-17|After the collaps...|
    |  bk104|         Corets, Eva|     Oberon's Legacy|        Fantasy| 5.95|  2001-03-10|In post-apocalyps...|
    |  bk105|         Corets, Eva|  The Sundered Grail|        Fantasy| 5.95|  2001-09-10|The two daughters...|
    |  bk106|    Randall, Cynthia|         Lover Birds|        Romance| 4.95|  2000-09-02|When Carla meets ...|

Step 4: Save DataFrame as csv files

Finally we can save the results as csv files. Spark provides a rich set of destination formats, e.g. we can write to JSON, parquet, avro, or even to a table in a database.

                              
    # write to csv
    book_df.write.format("csv").mode("overwrite")\
        .save("./output")
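Switching the destination format is a one-line change. For instance, here is a sketch of writing the same DataFrame as parquet, or as csv with a header row; the output paths are illustrative:

    # same DataFrame, different sink formats
    book_df.write.format("parquet").mode("overwrite").save("./output_parquet")
    book_df.write.format("csv").option("header", True)\
        .mode("overwrite").save("./output_csv_header")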

Conclusion

This is just one showcase of how Spark can help simplify data processing, especially when dealing with a large amount of data. Spark provides both a high-level API (DataFrame / DataSet) and a low-level API (RDD), which gives us the flexibility to handle various data formats. Spark also abstracts away the physical parallel computation on the cluster. We just need to focus our code on the implementation of business logic.

Source code is here.




Source: https://q15928.github.io/2019/07/14/parse-xml/
