在使用spark编程的时候,我们需要知道我们编写的某一段代码是在driver端执行,还是在executor端执行。如果是在executor端执行的话,就要注意这些代码是否是可以序列化发送到executor端执行,如果不行的话就会报错。接下来这篇文章主要讨论Python单例在使用Spark的时候解决的问题。

问题描述

在使用spark的时候经常会用到map来对数据进行处理,在map函数里面的代码会被序列化,然后发送到executor端执行。但是并不是所有的代码都是可以被序列化的,比如一个mysql连接的实例就不能被序列化,然后发送到executor端执行。要解决这个问题很简单,如果一个mysql连接的实例不能被序列化发送的话,可以在executor端实例化一个mysql连接,然后使用这个连接。

1
2
3
4
def transform(row):
connector = mysql.connector.connect(**mysql_conf)
# Do something
rdd.map(transform)

上述的代码可以正确的运行,但是有一个比较严重的问题:map函数在每执行一次转换的时候都需要实例化一个mysql的实例,在实例化的时候要与mysql服务器建立连接,这个开销是比较大的,可想而知效率会非常的低。

要解决这个问题我们可以用mapPartitions接口来代替map,mapPartitions接口不是对每行数据进行操作的时候都执行一个mysql实例化操作,而是对一个partition的数据执行操作时候才执行一个mysql实例化操作,这样效率会大大的提升,上述的代码可以改成如下的代码。

1
2
3
4
5
def transform(rows):
connector = mysql.connector.connect(**mysql_conf)
for row in rows:
# Do something
rdd.mapPartitions(transform)

这样问题似乎已经解决了,但是解决的还不完全,或者说只解决rdd的问题,因为虽然spark的rdd支持mapPartitions操作,但是DataFrame并不支持mapPartitions的操作(对DataFrame执行map操作可以注册一个udf函数,然后调用这个udf函数)。有一个不太优美的做法是先把DataFrame转化成rdd进行mapPartitions,然后再把处理后的rdd转化成DataFrame。要在两种数据结构之间来回转化,这个操作实在是不够优美,而且效率低下。所以我们需要找到一个让mysql在map操作中只实例化一次的方法。

问题解决

要优美的解决上面提到的这个问题的话,我们就需要用到Python的单例了。Python的单例实现方式有很多种,这在边我只介绍最简单的一种。利用Python的import机制可以确保一个模块只被导入一次,如果在这个模块里面实例化一个mysql实例的话,那么即使被多次导入,也只会被实例化一次。

1
2
# SingletonMysql.py
connector = mysql.connector.connect(**mysql_conf)

1
2
3
4
5
6
def transform(row):
from SingletonMysql import connector
# Do something
rdd.map(transform)
spark.udf.register("transform", transform)
spark.sql("select transform(name) from table")

通过上面的代码,我们就可以在map函数里面使用mysql实例,而且能够保证它只会在第一次使用的时候被实例化。

总结

在一些第三方库的接口已经确定的情况下,有时候我们必须使用一些语言的特性才能达到一些特定的目标。这个Python单例的使用只是其中的一个例子,如果Spark本身的接口支持在executor端进行一些初始化操作的话就不需要用到单例了。还有,比如map里面的函数只能是一个参数的函数,但是有时候我们需要一些额外的信息,所以这时候就要用匿名函数来达到这个目的了。