Making PySpark work with spaCy: Overcoming serialization errors
Domino | 2021-11-16 | 8 min read
In this guest post, Holden Karau, Apache Spark Committer, provides insights on how to use spaCy to process text data. Karau is a Developer Advocate at Google, as well as a co-author of "High Performance Spark" and "Learning Spark". She has a repository of her talks, code reviews and code sessions on Twitch and YouTube. She is also working on Distributed Computing 4 Kids.
How can spaCy help?
Are you a data scientist who needs to parallelize your NLP code with Python but keeps running into issues? This blog post shows how to overcome the serialization difficulties that occur when using the popular NLP library spaCy. (While these techniques are a little convoluted, you can hide them in a separate file and pretend everything is OK.) This post focuses on using spaCy, and I have another post focused on NLTK in the works that I'll post on my blog. If you're more of a Scala or a Java user, look for a post on JVM soon.
A word of warning before you get too excited though -- if you thought debugging Apache Spark was hard, debugging these serialization tricks is going to be a bit harder, so you should check out my debugging Spark video and keep an eye out for the deep dive course on Safari when it becomes available.
WordCount, of course
Now, this wouldn’t be a big data blog post if we didn’t focus unnecessarily on WordCount, but we’re going to do it with a twist. As happened in my first job, at some point a boss (or customer, or other person who impacts your ability to pay rent) may come to you and ask you to “localize” your exciting WordCount project. At that point, if you grew up speaking only English and French, you might answer, “That’s fine; all languages use spaces to separate words,” but you will quickly discover that the split function is not a universal tokenizer for languages like Japanese.
After realizing how complicated tokenizing other languages can actually be, we might start to feel stressed about our promised two-week delivery time, but thankfully tokenization is a basic part of NLP tools, with many existing libraries that work on multiple human (noncomputer) languages.
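To see why a plain split falls short, here is a minimal sketch (the Japanese sentence is just an illustrative example meaning “this is a pen”):

# Whitespace splitting works for English...
"the quick brown fox".split(" ")   # ['the', 'quick', 'brown', 'fox']
# ...but a Japanese sentence has no spaces to split on, so the whole
# sentence comes back as a single "word".
"これはペンです".split(" ")          # ['これはペンです']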
Regular (non-Arrow) Python UDFs
The Python library we will look at using is spaCy, which is a world-class tool for natural language processing. While you don't need to know spaCy to understand this blog post, if you do want to learn more about spaCy, here's a wonderful collection of documentation.
To get started, we’ll figure out how to use spaCy inside of PySpark without worrying about the details of cross-language pipelines. In older versions of PySpark, users registered UDFs like this:
import spacy
from pyspark.sql.types import ArrayType, StringType

lang = "en"  # which spaCy language model to load

def spacy_tokenize(x):
    # Note this is expensive; in practice you would use something like
    # SpacyMagic (see footnote for link), which caches spacy.load
    # so it isn’t happening multiple times
    nlp = spacy.load(lang)
    # If you are working with Python 2 and getting regular strings, add x = unicode(x)
    doc = nlp(x)
    return [token.text for token in doc]

# Declare the array return type so we can explode the result later
tokenize = spark.udf.register("tokenize", spacy_tokenize, ArrayType(StringType()))
This gives us a function we can call in Python which will use spaCy to tokenize the input, albeit in English, since I don’t really understand anything else. Taking the standard Spark SQL WordCount, we can rework it to avoid Spark RDDs:
from pyspark.sql.functions import explode, split

df = spark.read.format("text").load("README.md")
tokenized = df.select(explode(split(df.value, " ")).alias("word"))
result = tokenized.groupBy("word").count()
result.show()  # Or save
Then we can swap in our new function and use spaCy for tokenization:
tokenized = df.select(explode(tokenize(df.value)).alias("word"))
If we run this, it turns out to be rather slow for a few different reasons. The first is that spacy.load is an expensive call; on my own system, importing and loading spaCy takes almost a second. The second reason is the serialization overhead of copying the data from Java to Python and back.
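If you want to see the load cost for yourself, a quick sketch like this one times spacy.load on the driver (the model name is an assumption; substitute whichever spaCy model you have installed):

import time
import spacy

start = time.time()
nlp = spacy.load("en")  # substitute your installed model, e.g. en_core_web_sm
print("spacy.load took {:.2f} seconds".format(time.time() - start))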
Spark’s Arrow UDFs
Spark’s Arrow-based UDFs are often much faster for a few reasons. At its core, Apache Arrow gives us a format which is understood by the JVM and Python, as well as many other languages, and is organized in a way that facilitates vectorized operations. Creating Arrow-based UDFs in Spark requires a bit of refactoring, since we operate on batches rather than on individual records. The new Arrow-based PySpark vectorized UDFs can be registered like:
@pandas_udf("integer", PandasUDFType.SCALAR) # doctest: +SKIP
def pandas_tokenize(x):
return x.apply(spacy_tokenize)
tokenize_pandas = session.udf.register("tokenize_pandas", pandas_tokenize)
If your cluster isn’t already set up for the Arrow-based PySpark UDFs, sometimes also known as Pandas UDFs, you’ll need to ensure that you have Spark 2.3+ and a matching version of PyArrow installed on all your machines. (Take a look at setup.py to see the version required for your Spark version).
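A quick way to sanity-check what you have installed is a sketch like the following, run in the same Python environment your executors use:

import pyspark
import pyarrow

# Arrow-based UDFs need Spark 2.3+ and a PyArrow version that matches it
print("PySpark:", pyspark.__version__)
print("PyArrow:", pyarrow.__version__)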
Using PySpark UDFs (regular and Arrow)
The usage for the two looks similar.
In SQL:
spark.sql("SELECT tokenize(str_column) FROM db")
spark.sql("SELECT tokenize_pandas(str_column) FROM db")
With the DataFrame API:
dataframe.select(tokenize(dataframe.str_column))
dataframe.select(tokenize_pandas(dataframe.str_column))
And we can of course use them to do our (mandatory) WordCount example in Python:
dataframe.select(explode(tokenize_pandas(dataframe.str_column)).alias("tokens")).groupBy("tokens").count().collect()
While these two look very similar, they can actually have vastly different performance implications, so we’ll naturally focus on the second and faster example, in this case the Pandas/Arrow-powered one.
Going beyond `spacy.load()` everywhere
Serialization issues are one of the big performance challenges with PySpark. If you try to optimize your `spacy.load()` by moving it outside of your function call, Spark will try to serialize spaCy itself, which can be quite large and includes cdefs. Cdefs are not serializable by pickle, although with some careful wrapping we can still use code which depends on them. Even that may not work, and you can end up with an error like “AttributeError: Can't pickle local object 'FeatureExtracter.<locals>.feature_extracter_fwd'”!
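You can reproduce the underlying problem outside of Spark with a small sketch like this (whether the pickle attempt succeeds depends on your spaCy version; the point is simply that the loaded pipeline is not something you want PySpark shipping around):

import pickle
import spacy

nlp = spacy.load("en")  # substitute your installed model
try:
    payload = pickle.dumps(nlp)
    print("Pickled the pipeline into {} bytes".format(len(payload)))
except Exception as e:
    print("Pickling failed:", type(e).__name__, e)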
Instead, by using a global variable (I’m sorry!) and a wrapper function, we can ensure that we reuse spaCy:
import spacy

# spaCy isn't serializable, but loading it is semi-expensive,
# so cache the loaded pipelines in a process-level global
NLP = None

def get_spacy_magic_for(lang):
    global NLP
    if NLP is None:
        NLP = {}
    if lang not in NLP:
        NLP[lang] = spacy.load(lang)
    return NLP[lang]
Then in our code we access spaCy through our friend `get_spacy_magic_for` instead. If you’re working in regular files instead of a notebook/REPL, you can use a cleaner class-based approach, but for esoteric serialization reasons, using classes in a REPL with PySpark has some issues.
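For example, the tokenization UDF from earlier could be reworked along these lines (a sketch; the language is hard-coded to English here to match the rest of the post):

def spacy_tokenize(x):
    # Reuse the cached pipeline instead of calling spacy.load on every record
    nlp = get_spacy_magic_for("en")
    return [token.text for token in nlp(x)]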
Since this code is less than pretty, you might be asking yourself just how important it is to reduce the loads. To give you an idea, loading the en language on my X1 Carbon takes about one second and, with an additional second of overhead per element, we could easily lose the benefits of parallelizing this workload.
Spark 2.4 has some new and interesting tricks coming where we could do the spaCy load prefork, but that’s a topic for another blog post. (Again, please keep an eye on my blog / medium / twitter where I’ll share that.)
Wrapping up
This approach works well enough for WordCount (I mean, which big data system doesn’t?), but it still leaves us lacking some desired information. For example, both in this case and in a future NLTK post, much more information is collected in Python than we can currently return easily in a scalar transformation, but work continues on this in SPARK-21187. If you try to return the spaCy document directly, you will run into serialization issues, since it references memory blocks which cloudpickle doesn’t know how to handle. (See cloudpickle #182 for some context.)
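Until then, the practical workaround is to pull the fields you care about out of the Doc and return plain Python data that Spark already knows how to serialize. A rough sketch, with an attribute selection that is just an example:

from pyspark.sql.types import ArrayType, StructType, StructField, StringType

token_schema = ArrayType(StructType([
    StructField("text", StringType()),
    StructField("lemma", StringType()),
    StructField("pos", StringType()),
]))

def spacy_annotate(x):
    # Return plain tuples instead of the spaCy Doc, which can't be pickled
    nlp = get_spacy_magic_for("en")
    return [(token.text, token.lemma_, token.pos_) for token in nlp(x)]

annotate = spark.udf.register("annotate", spacy_annotate, token_schema)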
If this is exciting to you and you want to contribute, you are more than welcome to join us on the Sparkling ML project, on Apache Arrow, or on general improvements to Apache Spark’s Python integration. Also, if you’re interested in seeing behind the scenes for this post, you can take a look at this live coding session and the corresponding sparkling ml repo.
Author note: SpacyMagic link is here.