關(guān)注微信公眾號(hào)
創(chuàng)頭條企服版APP
原標(biāo)題:使用Apache Arrow助力PySpark數(shù)據(jù)處理
作者:江宇,阿里云EMR技術(shù)專家。從事Hadoop內(nèi)核開發(fā),目前專注于機(jī)器學(xué)習(xí)、深度學(xué)習(xí)大數(shù)據(jù)平臺(tái)的建設(shè)。
Apache Arrow從Spark 2.3版本開始被引入,通過列式存儲(chǔ),zero copy等技術(shù),JVM 與Python 之間的數(shù)據(jù)傳輸效率得到了大量的提升。本文主要介紹一下Apache Arrow以及Spark中的使用方法。
列式存儲(chǔ)簡(jiǎn)介
在介紹Spark中使用Apache Arrow之前,先簡(jiǎn)單的介紹一下Apache Arrow以及他背后的一些技術(shù)背景。
在大數(shù)據(jù)時(shí)代之前,大部分的存儲(chǔ)引擎使用的是按行存儲(chǔ)的形式,很多早期的系統(tǒng),如交易系統(tǒng)、ERP系統(tǒng)等每次處理的是增、刪、改、查某一個(gè)實(shí)體的所有信息,按行存儲(chǔ)的話能夠快速的定位到單個(gè)實(shí)體并進(jìn)行處理。如果使用列存儲(chǔ),對(duì)某一個(gè)實(shí)體的不同屬性的操作就需要進(jìn)行多次隨機(jī)讀寫,效率將會(huì)是非常差的。
隨著大數(shù)據(jù)時(shí)代的到來,尤其是數(shù)據(jù)分析的不斷發(fā)展,任務(wù)不需要一次讀取實(shí)體的所有屬性,而只關(guān)心特定的某些屬性,并對(duì)這些屬性進(jìn)行aggregate等復(fù)雜的操作等。這種情況下行存儲(chǔ)將需要讀取額外的數(shù)據(jù),形成瓶頸。而選擇列存儲(chǔ)將會(huì)減少額外數(shù)據(jù)的讀取,對(duì)相同屬性的數(shù)據(jù)還可以進(jìn)行壓縮,大大的加快了處理速度。
以下是行存儲(chǔ)和列存儲(chǔ)的對(duì)比說明,摘自Apache Arrow 官網(wǎng),上面是一個(gè)二維表,由三個(gè)屬性組成,分別是session_id, timestamp和source_ip。左側(cè)為行存儲(chǔ)在內(nèi)存中的表示,數(shù)據(jù)按行依次存儲(chǔ),每一行按照列的順序存儲(chǔ)。右側(cè)為列存儲(chǔ)在內(nèi)存中的表示,每一列單獨(dú)存放,根據(jù)batch size等屬性來控制一次寫入的列簇大小。這樣當(dāng)查詢語(yǔ)句只涉及少數(shù)列的時(shí)候,比如圖中SQL查詢,只需要過濾session_id列,避免讀取所有數(shù)據(jù)列,減少了大量的I/O損耗,同時(shí)考慮到CPU pipeline以及使用CPU SIMD技術(shù)等等,將大大的提升查詢速度。
Apache Arrow
在大數(shù)據(jù)領(lǐng)域,列式存儲(chǔ)的靈感來自Google于2010年發(fā)表的Dremel論文(
http://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/36632.pdf),文中介紹了一種支持嵌套結(jié)構(gòu)的存儲(chǔ)格式,并且使用了列式存儲(chǔ)的方式提升查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲(chǔ)格式實(shí)現(xiàn)并行查詢的。這篇論文影響了Hadoop生態(tài)系統(tǒng)發(fā)展,之后的Apache Parquet和Apache ORC作為列式存儲(chǔ)格式已經(jīng)被廣大的Hadoop生態(tài)系統(tǒng)使用,如Spark、Hive、Impala等等。
Apache Arrow在官網(wǎng)上是這樣定義的,Apache Arrow是一個(gè)跨語(yǔ)言、跨平臺(tái)的內(nèi)存數(shù)據(jù)結(jié)構(gòu)。從這個(gè)定義中我們可以看到Apache Arrow與Apache Parquet以及Apache ORC的區(qū)別。Parquet與ORC設(shè)計(jì)的目的針對(duì)磁盤數(shù)據(jù),在列存儲(chǔ)的基礎(chǔ)上使用了高效率的壓縮算法進(jìn)行壓縮,比如使用snappy、gzip和zlib等算法對(duì)列數(shù)據(jù)進(jìn)行壓縮。所以大部分情況下在數(shù)據(jù)讀取的時(shí)候需要首先對(duì)數(shù)據(jù)進(jìn)行反壓縮,并有一定的cpu使用損耗。而Arrow,作為在內(nèi)存中的數(shù)據(jù),并不支持壓縮(當(dāng)然寫入磁盤是支持壓縮的),Arrow使用dictionary-encoded來進(jìn)行類似索引的操作。
除了列存儲(chǔ)外,Arrow在數(shù)據(jù)在跨語(yǔ)言的數(shù)據(jù)傳輸上具有相當(dāng)大的威力,Arrow的跨語(yǔ)言特性表示在Arrow的規(guī)范中,作者指定了不同數(shù)據(jù)類型的layout,包括不同原始數(shù)據(jù)類型在內(nèi)存中占的比特?cái)?shù),Array數(shù)據(jù)的組成以及Null值的表示等等。根據(jù)這些定義后,在不同的平臺(tái)和不同的語(yǔ)言中使用Arrow將會(huì)采用完全相同的內(nèi)存結(jié)構(gòu),因此在不同平臺(tái)間和不同語(yǔ)言間進(jìn)行高效數(shù)據(jù)傳輸成為了可能。在Arrow之前如果要對(duì)不同語(yǔ)言數(shù)據(jù)進(jìn)行傳輸必須要使用序列化與反序列化技術(shù)來完成,耗費(fèi)了大量的CPU資源和時(shí)間,而Arrow由于根據(jù)規(guī)范在內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)一致,可以通過共享內(nèi)存, 內(nèi)存映射文件等技術(shù)來共享Arrow內(nèi)存結(jié)構(gòu),省去了序列化與反序列過程。
Spark與Apache Arrow
介紹完Arrow的背景后,來看一下Apache Spark如何使用Arrow來加速PySpark處理的。一直以來,使用PySpark的客戶都在抱怨python的效率太低,導(dǎo)致了很多用戶轉(zhuǎn)向了使用Scala進(jìn)行開發(fā)。這主要是由于Spark使用Scala語(yǔ)言開發(fā),底層啟動(dòng)的是JVM,而PySpark是Scala中PythonRDD對(duì)象啟動(dòng)的一個(gè)Python子進(jìn)程,Python與JVM的通信使用了Py4J, 通過Py4J Python程序能夠動(dòng)態(tài)的訪問JVM中的Java對(duì)象,這一過程使用了linux pipe,在底層JVM需要對(duì)RDD進(jìn)行序列化,在Python端需要對(duì)RDD進(jìn)行反序列化,當(dāng)數(shù)據(jù)量較大的時(shí)候效率遠(yuǎn)不如直接使用Scala。流程如下圖。
很多數(shù)據(jù)科學(xué)家以及分析人員習(xí)慣使用python來進(jìn)行處理,尤其是使用Pandas和Numpy庫(kù)來對(duì)數(shù)據(jù)進(jìn)行后續(xù)處理,Spark 2.3以后引入的Arrow將會(huì)大大的提升這一效率。我們從代碼角度來看一下實(shí)現(xiàn),在Spark 2.4版本的dataframe.py代碼中,toPandas的實(shí)現(xiàn)為:
如果使用了Arrow(Spark 2.4默認(rèn)使用),比較重要的一行是_collectAsArrow,_collectAsArrow實(shí)現(xiàn)為:
.. note:: Experimental."""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectAsArrowToPythonreturnlist(_load_from_socket(sock_info, ArrowStreamSerializer))
這里面使用了ArrowStreamSerializer,而ArrowStreamSerializer定義為
defdump_stream(self, iterator, stream):importpyarrow aspawriter = Nonetry:forbatch initerator:ifwriter isNone:writer = pa.RecordBatchStreamWriter(stream, batch.schema)writer.write_batch(batch)finally:ifwriter isnotNone:writer.close
defload_stream(self, stream):importpyarrow aspareader = pa.open_stream(stream)forbatch inreader:yieldbatch
def__repr__(self):return"ArrowStreamSerializer"
可以看出在這里面,jvm對(duì)數(shù)據(jù)根據(jù)Arrow規(guī)范設(shè)置好內(nèi)存數(shù)據(jù)結(jié)構(gòu)進(jìn)行列式轉(zhuǎn)化后,Python層面并不需要任何的反序列過程,而是直接讀取,這也是Arrow高效的原因之一。對(duì)比看那一下如果不使用Arrow方法為:
>>> df.collect[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]"""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectToPythonreturnlist(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer)))
序列化方法為PickleSerializer,需要對(duì)每一條數(shù)據(jù)使用PickleSerializer進(jìn)行反序列化。
那如何通過這一特性來進(jìn)行我們的開發(fā)呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通過Arrow將JVM里面的Spark DataFrame傳輸給Python生成pandas DataFrame,并執(zhí)行用于定義的UDF。目前有兩種類型,一種是Scalar,一種是Grouped Map。
這里主要介紹一下Scalar Python UDFs的使用,以及可能的場(chǎng)景。Scalar Python UDFs可以在select和withColumn中使用,他的輸入?yún)?shù)為pandas.Series類型,輸出參數(shù)為相同長(zhǎng)度的pandas.Series。Spark內(nèi)部會(huì)通過Arrow將列式數(shù)據(jù)根據(jù)batch size獲取后,批量的將數(shù)據(jù)轉(zhuǎn)化為pandas.Series類型,并在每個(gè)batch都執(zhí)行用戶定義的function。最后將不同batch的結(jié)果進(jìn)行整合,獲取最后的數(shù)據(jù)結(jié)果。
以下是官網(wǎng)的一個(gè)例子:
frompyspark.sql.functions importcol, pandas_udffrompyspark.sql.types importLongType
# Declare the function and create the UDFdefmultiply_func(a, b):returna * b
multiply = pandas_udf(multiply_func, returnType=LongType)
# The function for a pandas_udf should be able to execute with local Pandas datax = pd.Series([1, 2, 3])print(multiply_func(x, x))# 0 1# 1 4# 2 9# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSessiondf = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDFdf.select(multiply(col("x"), col("x"))).show# +-------------------+# |multiply_func(x, x)|# +-------------------+# | 1|# | 4|# | 9|# +-------------------+
首先定義udf,multiply_func,主要功能就是將a、b兩列的數(shù)據(jù)對(duì)應(yīng)行數(shù)據(jù)相乘獲取結(jié)果。然后通過pandas_udf裝飾器生成Pandas UDF。最后使用df.selecct方法調(diào)用Pandas UDF獲取結(jié)果。這里面要注意的是pandas_udf的輸入輸出數(shù)據(jù)是向量化數(shù)據(jù),包含了多行,可以根據(jù)spark.sql.execution.arrow.maxRecordsPerBatch來設(shè)置。
可以看出Pandas UDF使用非常簡(jiǎn)單,只需要定義好Pandas UDF就可以了。有了Pandas UDF后我們可以很容易的將深度學(xué)習(xí)框架和Spark進(jìn)行結(jié)合,比如在UDF中使用一些深度學(xué)習(xí)框架,比如scikit-learn,我們可以對(duì)批量的數(shù)據(jù)分別進(jìn)行訓(xùn)練。下面是一個(gè)簡(jiǎn)單的例子,利用Pandas UDF來進(jìn)行訓(xùn)練:
# Create the schema for the resulting data frameschema = StructType([StructField('ID', LongType, True),StructField('p0', DoubleType, True),StructField('p1', DoubleType, True)])# Define the UDF, input and outputs are Pandas DFs@pandas_udf(schema, PandasUDFType.GROUPED_MAP)defanalyze_player(sample_pd):# return empty params in not enough dataif(len(sample_pd.shots) <= 1):returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [ 0], 'p1': [ 0]})# Perform curve fitting result = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits))# Return the parameters as a Pandas DF returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [result[0][0]], 'p1': [result[0][1]]})# perform the UDF and show the results player_df = df.groupby('player_id').apply(analyze_player)display(player_df)
除此之外還可以使用TensorFlow和MXNet等與Spark進(jìn)行融合,近期阿里云EMR Data Science集群將會(huì)推出相應(yīng)的功能,整合EMR Spark與深度學(xué)習(xí)框架之間調(diào)度與數(shù)據(jù)交換功能,希望大家關(guān)注。返回搜狐,查看更多
責(zé)任編輯: