Skip to content Skip to sidebar Skip to footer

Converting Complex RDD To A Flatten RDD With PySpark

I have the following CSV (sample) id timestamp routeid creationdate parameters 1000 21-11-2016 22:55 14 21-11-2016 22:55 RSRP=-102, 1002 21-11

Solution 1:

You'll need to define an udf as followed and then select each field. I have used the same data you did with a tab separator.

from pyspark.sql.functions import udf
from pyspark.sql.types import *

df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# |  id|       timestamp|routeid|    creationdate|          parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+

Now let's define our UDF as mentioned above :

import re
def f_(s):
    pattern = re.compile("([^,=]+)=([0-9\-]+)")
    return dict(pattern.findall(s or "")) 

We can test the function directly on a "simple" sample :

f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}

Ok it's working. We can now register to use in SQL :

spark.udf.register("f", f_, MapType(StringType(), StringType()))

spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# |                               Map(RA Req. SN ->...|
# +---------------------------------------------------+

But in your case, I think that you'll be interested in an actually udf for each field :

extract = udf(f_,  MapType(StringType(), StringType()))

df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# |  id|       timestamp|routeid|    creationdate|          parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55|     14|21-11-2016 22:55|          RSRP=-102,|-102|
# |1002|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55|     14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+

Post a Comment for "Converting Complex RDD To A Flatten RDD With PySpark"