Converting Complex RDD To A Flatten RDD With PySpark
I have the following CSV (sample) id timestamp routeid creationdate parameters 1000 21-11-2016 22:55 14 21-11-2016 22:55 RSRP=-102, 1002 21-11
Solution 1:
You'll need to define an udf as followed and then select each field. I have used the same data you did with a tab separator.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='\t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# | id| timestamp|routeid| creationdate| parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+
Now let's define our UDF as mentioned above :
import re
def f_(s):
pattern = re.compile("([^,=]+)=([0-9\-]+)")
return dict(pattern.findall(s or ""))
We can test the function directly on a "simple" sample :
f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
Ok it's working. We can now register to use in SQL :
spark.udf.register("f", f_, MapType(StringType(), StringType()))
spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# | Map(RA Req. SN ->...|
# +---------------------------------------------------+
But in your case, I think that you'll be interested in an actually udf for each field :
extract = udf(f_, MapType(StringType(), StringType()))
df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# | id| timestamp|routeid| creationdate| parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|-102|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+
Post a Comment for "Converting Complex RDD To A Flatten RDD With PySpark"