Using Word2VecModel.transform() Does Not Work In Map Function
Solution 1:
This is expected behavior. Like other MLlib models, the Python object is just a wrapper around the Scala model, and the actual processing is delegated to its JVM counterpart. Since the Py4J gateway is not accessible on workers (see How to use Java/Scala function from an action or a transformation?), you cannot call a Java / Scala method from an action or transformation.
Typically MLlib models provide a helper method that works directly on RDDs, but that is not the case here. Word2VecModel provides a getVectors method, which returns a map from words to vectors, but unfortunately it is a JavaMap, so it won't work inside a transformation. You could try something like this:
from pyspark.mllib.linalg import DenseVector
vectors_ = model.getVectors() # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
           for k in vectors_.keys()}
to get a Python dictionary, but it will be extremely slow. Another option is to dump this object to disk in a form that can be consumed by Python, but that requires some tinkering with Py4J and is better avoided. Instead, let's read the saved model's data as a DataFrame:
lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")
and we'll get the following structure:
lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## | |-- element: float (containsNull = true)
which can be used to map words to vectors, for example through a join:
from pyspark.sql.functions import col
words = sc.parallelize([('hello', ), ('test', )]).toDF(["word"]).alias("words")
words.join(lookup, col("words.word") == col("lookup.word"))
## +-----+-----+--------------------+
## | word| word| vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
If the data fits into driver / worker memory, you can try to collect it and map with a broadcast variable:
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
rdd = sc.parallelize([['Hello'],['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws])
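Note that the plain `.get(w)` above returns `None` for any token that is not a key of the dictionary, and that includes `'Hello'` if the vocabulary only contains `'hello'`. A minimal sketch of a per-record helper that handles this is shown below. The name `embed_tokens` is hypothetical, and lower-casing the tokens is an assumption about how the training corpus was preprocessed. The dictionary here is a tiny made-up stand-in for `lookup_bd.value`, so the snippet runs without Spark:

```python
def embed_tokens(tokens, lookup, normalize=str.lower):
    """Map a list of tokens to their vectors using a plain dict.

    `lookup` is a word -> vector dictionary (e.g. lookup_bd.value).
    `normalize` is applied to each token before the lookup; lower-casing
    is only an assumption about the model's vocabulary.
    Tokens missing from the dictionary are skipped instead of yielding None.
    """
    vectors = []
    for token in tokens:
        vector = lookup.get(normalize(token))
        if vector is not None:
            vectors.append(vector)
    return vectors


# Made-up stand-in for lookup_bd.value; the numbers are illustrative only.
toy_lookup = {"hello": [0.1, 0.2], "test": [0.3, 0.4]}

records = [["Hello"], ["test", "unknown"]]
result = [embed_tokens(ws, toy_lookup) for ws in records]
```

With Spark this would be used as `rdd.map(lambda ws: embed_tokens(ws, lookup_bd.value))`. Reading `lookup_bd.value` inside the lambda, rather than capturing the dictionary itself in the closure, means the workers use the broadcast copy.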