china0114.com-日韩欧美中文免费,免费视频一区,免费视频一区,国产精品色网

公眾號(hào)
關(guān)注微信公眾號(hào)
移動(dòng)端
創(chuàng)頭條企服版APP

使用Apache Arrow助力PySpark數(shù)據(jù)處理

6226

原標(biāo)題:使用Apache Arrow助力PySpark數(shù)據(jù)處理

作者:江宇,阿里云EMR技術(shù)專家。從事Hadoop內(nèi)核開發(fā),目前專注于機(jī)器學(xué)習(xí)、深度學(xué)習(xí)大數(shù)據(jù)平臺(tái)的建設(shè)。

Apache Arrow從Spark 2.3版本開始被引入,通過列式存儲(chǔ),zero copy等技術(shù),JVM 與Python 之間的數(shù)據(jù)傳輸效率得到了大量的提升。本文主要介紹一下Apache Arrow以及Spark中的使用方法。

列式存儲(chǔ)簡(jiǎn)介

在介紹Spark中使用Apache Arrow之前,先簡(jiǎn)單的介紹一下Apache Arrow以及他背后的一些技術(shù)背景。

在大數(shù)據(jù)時(shí)代之前,大部分的存儲(chǔ)引擎使用的是按行存儲(chǔ)的形式,很多早期的系統(tǒng),如交易系統(tǒng)、ERP系統(tǒng)等每次處理的是增、刪、改、查某一個(gè)實(shí)體的所有信息,按行存儲(chǔ)的話能夠快速的定位到單個(gè)實(shí)體并進(jìn)行處理。如果使用列存儲(chǔ),對(duì)某一個(gè)實(shí)體的不同屬性的操作就需要進(jìn)行多次隨機(jī)讀寫,效率將會(huì)是非常差的。

隨著大數(shù)據(jù)時(shí)代的到來,尤其是數(shù)據(jù)分析的不斷發(fā)展,任務(wù)不需要一次讀取實(shí)體的所有屬性,而只關(guān)心特定的某些屬性,并對(duì)這些屬性進(jìn)行aggregate等復(fù)雜的操作等。這種情況下行存儲(chǔ)將需要讀取額外的數(shù)據(jù),形成瓶頸。而選擇列存儲(chǔ)將會(huì)減少額外數(shù)據(jù)的讀取,對(duì)相同屬性的數(shù)據(jù)還可以進(jìn)行壓縮,大大的加快了處理速度。

以下是行存儲(chǔ)和列存儲(chǔ)的對(duì)比說明,摘自Apache Arrow 官網(wǎng),上面是一個(gè)二維表,由三個(gè)屬性組成,分別是session_id, timestamp和source_ip。左側(cè)為行存儲(chǔ)在內(nèi)存中的表示,數(shù)據(jù)按行依次存儲(chǔ),每一行按照列的順序存儲(chǔ)。右側(cè)為列存儲(chǔ)在內(nèi)存中的表示,每一列單獨(dú)存放,根據(jù)batch size等屬性來控制一次寫入的列簇大小。這樣當(dāng)查詢語(yǔ)句只涉及少數(shù)列的時(shí)候,比如圖中SQL查詢,只需要過濾session_id列,避免讀取所有數(shù)據(jù)列,減少了大量的I/O損耗,同時(shí)考慮到CPU pipeline以及使用CPU SIMD技術(shù)等等,將大大的提升查詢速度。

Apache Arrow

在大數(shù)據(jù)領(lǐng)域,列式存儲(chǔ)的靈感來自Google于2010年發(fā)表的Dremel論文(

http://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/36632.pdf),文中介紹了一種支持嵌套結(jié)構(gòu)的存儲(chǔ)格式,并且使用了列式存儲(chǔ)的方式提升查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲(chǔ)格式實(shí)現(xiàn)并行查詢的。這篇論文影響了Hadoop生態(tài)系統(tǒng)發(fā)展,之后的Apache Parquet和Apache ORC作為列式存儲(chǔ)格式已經(jīng)被廣大的Hadoop生態(tài)系統(tǒng)使用,如Spark、Hive、Impala等等。

Apache Arrow在官網(wǎng)上是這樣定義的,Apache Arrow是一個(gè)跨語(yǔ)言、跨平臺(tái)的內(nèi)存數(shù)據(jù)結(jié)構(gòu)。從這個(gè)定義中我們可以看到Apache Arrow與Apache Parquet以及Apache ORC的區(qū)別。Parquet與ORC設(shè)計(jì)的目的針對(duì)磁盤數(shù)據(jù),在列存儲(chǔ)的基礎(chǔ)上使用了高效率的壓縮算法進(jìn)行壓縮,比如使用snappy、gzip和zlib等算法對(duì)列數(shù)據(jù)進(jìn)行壓縮。所以大部分情況下在數(shù)據(jù)讀取的時(shí)候需要首先對(duì)數(shù)據(jù)進(jìn)行反壓縮,并有一定的cpu使用損耗。而Arrow,作為在內(nèi)存中的數(shù)據(jù),并不支持壓縮(當(dāng)然寫入磁盤是支持壓縮的),Arrow使用dictionary-encoded來進(jìn)行類似索引的操作。

除了列存儲(chǔ)外,Arrow在數(shù)據(jù)在跨語(yǔ)言的數(shù)據(jù)傳輸上具有相當(dāng)大的威力,Arrow的跨語(yǔ)言特性表示在Arrow的規(guī)范中,作者指定了不同數(shù)據(jù)類型的layout,包括不同原始數(shù)據(jù)類型在內(nèi)存中占的比特?cái)?shù),Array數(shù)據(jù)的組成以及Null值的表示等等。根據(jù)這些定義后,在不同的平臺(tái)和不同的語(yǔ)言中使用Arrow將會(huì)采用完全相同的內(nèi)存結(jié)構(gòu),因此在不同平臺(tái)間和不同語(yǔ)言間進(jìn)行高效數(shù)據(jù)傳輸成為了可能。在Arrow之前如果要對(duì)不同語(yǔ)言數(shù)據(jù)進(jìn)行傳輸必須要使用序列化與反序列化技術(shù)來完成,耗費(fèi)了大量的CPU資源和時(shí)間,而Arrow由于根據(jù)規(guī)范在內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)一致,可以通過共享內(nèi)存, 內(nèi)存映射文件等技術(shù)來共享Arrow內(nèi)存結(jié)構(gòu),省去了序列化與反序列過程。

Spark與Apache Arrow

介紹完Arrow的背景后,來看一下Apache Spark如何使用Arrow來加速PySpark處理的。一直以來,使用PySpark的客戶都在抱怨python的效率太低,導(dǎo)致了很多用戶轉(zhuǎn)向了使用Scala進(jìn)行開發(fā)。這主要是由于Spark使用Scala語(yǔ)言開發(fā),底層啟動(dòng)的是JVM,而PySpark是Scala中PythonRDD對(duì)象啟動(dòng)的一個(gè)Python子進(jìn)程,Python與JVM的通信使用了Py4J, 通過Py4J Python程序能夠動(dòng)態(tài)的訪問JVM中的Java對(duì)象,這一過程使用了linux pipe,在底層JVM需要對(duì)RDD進(jìn)行序列化,在Python端需要對(duì)RDD進(jìn)行反序列化,當(dāng)數(shù)據(jù)量較大的時(shí)候效率遠(yuǎn)不如直接使用Scala。流程如下圖。

很多數(shù)據(jù)科學(xué)家以及分析人員習(xí)慣使用python來進(jìn)行處理,尤其是使用Pandas和Numpy庫(kù)來對(duì)數(shù)據(jù)進(jìn)行后續(xù)處理,Spark 2.3以后引入的Arrow將會(huì)大大的提升這一效率。我們從代碼角度來看一下實(shí)現(xiàn),在Spark 2.4版本的dataframe.py代碼中,toPandas的實(shí)現(xiàn)為:

ifuse_arrow: try: from pyspark.sql.types import_check_dataframe_convert_date, _check_dataframe_localize_timestamps importpyarrow batches = self._collectAsArrow iflen(batches) > 0: table = pyarrow.Table.from_batches(batches) pdf = table.to_pandas pdf = _check_dataframe_convert_date(pdf, self.schema) return_check_dataframe_localize_timestamps(pdf, timezone) else: returnpd.DataFrame.from_records([], columns=self.columns) except Exceptionase: # Wemight have to allow fallback here aswell but multiple Sparkjobs can # be executed. So, simply fail inthis casefornow. msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true, but has reached " "the error below and can not continue. Note that " "'spark.sql.execution.arrow.fallback.enabled' does not have an effect " "on failures in the middle of computation.n %s"% _exception_message(e)) warnings.warn(msg) raise

如果使用了Arrow(Spark 2.4默認(rèn)使用),比較重要的一行是_collectAsArrow,_collectAsArrow實(shí)現(xiàn)為:

def_collectAsArrow(self): """ Returns all records as a list of ArrowRecordBatches, pyarrow must be installed and available on driver and worker Python environments.

.. note:: Experimental."""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectAsArrowToPythonreturnlist(_load_from_socket(sock_info, ArrowStreamSerializer))

這里面使用了ArrowStreamSerializer,而ArrowStreamSerializer定義為

classArrowStreamSerializer(Serializer):"""Serializes Arrow record batches as a stream."""

defdump_stream(self, iterator, stream):importpyarrow aspawriter = Nonetry:forbatch initerator:ifwriter isNone:writer = pa.RecordBatchStreamWriter(stream, batch.schema)writer.write_batch(batch)finally:ifwriter isnotNone:writer.close

defload_stream(self, stream):importpyarrow aspareader = pa.open_stream(stream)forbatch inreader:yieldbatch

def__repr__(self):return"ArrowStreamSerializer"

可以看出在這里面,jvm對(duì)數(shù)據(jù)根據(jù)Arrow規(guī)范設(shè)置好內(nèi)存數(shù)據(jù)結(jié)構(gòu)進(jìn)行列式轉(zhuǎn)化后,Python層面并不需要任何的反序列過程,而是直接讀取,這也是Arrow高效的原因之一。對(duì)比看那一下如果不使用Arrow方法為:

defcollect(self):"""Returns all the records as a list of :class:`Row`.

>>> df.collect[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]"""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectToPythonreturnlist(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer)))

序列化方法為PickleSerializer,需要對(duì)每一條數(shù)據(jù)使用PickleSerializer進(jìn)行反序列化。

那如何通過這一特性來進(jìn)行我們的開發(fā)呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通過Arrow將JVM里面的Spark DataFrame傳輸給Python生成pandas DataFrame,并執(zhí)行用于定義的UDF。目前有兩種類型,一種是Scalar,一種是Grouped Map。

這里主要介紹一下Scalar Python UDFs的使用,以及可能的場(chǎng)景。Scalar Python UDFs可以在select和withColumn中使用,他的輸入?yún)?shù)為pandas.Series類型,輸出參數(shù)為相同長(zhǎng)度的pandas.Series。Spark內(nèi)部會(huì)通過Arrow將列式數(shù)據(jù)根據(jù)batch size獲取后,批量的將數(shù)據(jù)轉(zhuǎn)化為pandas.Series類型,并在每個(gè)batch都執(zhí)行用戶定義的function。最后將不同batch的結(jié)果進(jìn)行整合,獲取最后的數(shù)據(jù)結(jié)果。

以下是官網(wǎng)的一個(gè)例子:

importpandas aspd

frompyspark.sql.functions importcol, pandas_udffrompyspark.sql.types importLongType

# Declare the function and create the UDFdefmultiply_func(a, b):returna * b

multiply = pandas_udf(multiply_func, returnType=LongType)

# The function for a pandas_udf should be able to execute with local Pandas datax = pd.Series([1, 2, 3])print(multiply_func(x, x))# 0 1# 1 4# 2 9# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSessiondf = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDFdf.select(multiply(col("x"), col("x"))).show# +-------------------+# |multiply_func(x, x)|# +-------------------+# | 1|# | 4|# | 9|# +-------------------+

首先定義udf,multiply_func,主要功能就是將a、b兩列的數(shù)據(jù)對(duì)應(yīng)行數(shù)據(jù)相乘獲取結(jié)果。然后通過pandas_udf裝飾器生成Pandas UDF。最后使用df.selecct方法調(diào)用Pandas UDF獲取結(jié)果。這里面要注意的是pandas_udf的輸入輸出數(shù)據(jù)是向量化數(shù)據(jù),包含了多行,可以根據(jù)spark.sql.execution.arrow.maxRecordsPerBatch來設(shè)置。

可以看出Pandas UDF使用非常簡(jiǎn)單,只需要定義好Pandas UDF就可以了。有了Pandas UDF后我們可以很容易的將深度學(xué)習(xí)框架和Spark進(jìn)行結(jié)合,比如在UDF中使用一些深度學(xué)習(xí)框架,比如scikit-learn,我們可以對(duì)批量的數(shù)據(jù)分別進(jìn)行訓(xùn)練。下面是一個(gè)簡(jiǎn)單的例子,利用Pandas UDF來進(jìn)行訓(xùn)練:

# Load necessary librariesfrompyspark.sql.functions importpandas_udf, PandasUDFTypefrompyspark.sql.types import*importpandas aspdfromscipy.optimize importleastsqimportnumpy asnp

# Create the schema for the resulting data frameschema = StructType([StructField('ID', LongType, True),StructField('p0', DoubleType, True),StructField('p1', DoubleType, True)])# Define the UDF, input and outputs are Pandas DFs@pandas_udf(schema, PandasUDFType.GROUPED_MAP)defanalyze_player(sample_pd):# return empty params in not enough dataif(len(sample_pd.shots) <= 1):returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [ 0], 'p1': [ 0]})# Perform curve fitting result = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits))# Return the parameters as a Pandas DF returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [result[0][0]], 'p1': [result[0][1]]})# perform the UDF and show the results player_df = df.groupby('player_id').apply(analyze_player)display(player_df)

除此之外還可以使用TensorFlow和MXNet等與Spark進(jìn)行融合,近期阿里云EMR Data Science集群將會(huì)推出相應(yīng)的功能,整合EMR Spark與深度學(xué)習(xí)框架之間調(diào)度與數(shù)據(jù)交換功能,希望大家關(guān)注。返回搜狐,查看更多

責(zé)任編輯:

聲明:該文章版權(quán)歸原作者所有,轉(zhuǎn)載目的在于傳遞更多信息,并不代表本網(wǎng)贊同其觀點(diǎn)和對(duì)其真實(shí)性負(fù)責(zé)。如涉及作品內(nèi)容、版權(quán)和其它問題,請(qǐng)?jiān)?0日內(nèi)與本網(wǎng)聯(lián)系。
您閱讀這篇文章花了0
轉(zhuǎn)發(fā)這篇文章只需要1秒鐘
喜歡這篇 0
評(píng)論一下 0
凱派爾知識(shí)產(chǎn)權(quán)全新業(yè)務(wù)全面上線
評(píng)論
試試以這些內(nèi)容開始評(píng)論吧
登錄后發(fā)表評(píng)論
凱派爾知識(shí)產(chǎn)權(quán)全新業(yè)務(wù)全面上線
寧波城市站
金華城市站
×
#熱門搜索#
精選雙創(chuàng)服務(wù)
歷史搜索 清空

Tel:18514777506

關(guān)注微信公眾號(hào)

創(chuàng)頭條企服版APP

china0114.com-日韩欧美中文免费,免费视频一区,免费视频一区,国产精品色网
91福利在线观看| 久久女同互慰一区二区三区| 欧美电影免费观看高清完整版在| 中文幕一区二区三区久久蜜桃| 午夜日韩在线电影| 成人sese在线| 精品久久久久久久久久久久久久久 | 欧美日韩精品综合在线| 中文字幕av不卡| 九九精品视频在线看| 欧美日免费三级在线| 中文字幕一区二区三中文字幕| 国产自产视频一区二区三区| 欧美日韩激情在线| 亚洲美女精品一区| 成人激情图片网| 欧美精品一区二区三| 丝袜亚洲另类欧美| 色婷婷国产精品| 国产精品欧美久久久久一区二区| 久久成人久久爱| 欧美一区二区三区公司| 亚洲成精国产精品女| 色先锋久久av资源部| 国产精品乱子久久久久| 国产麻豆视频精品| 精品国产乱子伦一区| 日本va欧美va瓶| 91超碰这里只有精品国产| 一区二区三区产品免费精品久久75| 波多野结衣91| 中文字幕一区二区三区在线观看| 成人精品免费网站| 欧美激情自拍偷拍| 国产99一区视频免费| 国产日韩一级二级三级| 国产精品一区免费视频| 2020国产精品自拍| 国产一区二区三区高清播放| 久久伊99综合婷婷久久伊| 久久99久久久久| 日韩一区二区视频在线观看| 日本不卡在线视频| 日韩欧美综合一区| 激情六月婷婷综合| 久久一区二区视频| 国产高清亚洲一区| 久久综合久色欧美综合狠狠| 国产一区二区三区观看| 久久精品男人的天堂| 国产精品香蕉一区二区三区| 国产丝袜在线精品| 福利视频网站一区二区三区| 国产精品丝袜一区| av在线这里只有精品| 最新热久久免费视频| 色欲综合视频天天天| 一区二区三区免费| 欧美日韩免费视频| 免费在线观看一区二区三区| 精品国产乱码久久久久久浪潮| 国产一区啦啦啦在线观看| 国产亚洲1区2区3区| 成人a级免费电影| 亚洲三级在线免费| 欧美日韩色一区| 老汉av免费一区二区三区| 26uuu另类欧美亚洲曰本| 国产精品一区在线| 最新国产の精品合集bt伙计| 在线一区二区三区| 日韩成人伦理电影在线观看| 精品精品欲导航| 国产精品系列在线播放| 亚洲欧美影音先锋| 欧美日本一道本在线视频| 久久99精品久久久久久| 中文字幕国产精品一区二区| 色诱视频网站一区| 日产国产欧美视频一区精品 | 亚洲激情第一区| 欧美精品丝袜久久久中文字幕| 日本不卡1234视频| 国产欧美一区二区三区鸳鸯浴 | 国产精品久久久久久久久免费桃花 | 成人免费三级在线| 亚洲主播在线观看| 日韩一区二区三区高清免费看看 | 欧美电影免费观看高清完整版| 国产成人在线免费| 亚洲一区二区影院| 日韩精品一区二区三区swag| 成人va在线观看| 日日嗨av一区二区三区四区| 国产丝袜美腿一区二区三区| 在线视频亚洲一区| 激情av综合网| 一区二区三区国产精华| 26uuu欧美| 欧美三级资源在线| 国产酒店精品激情| 亚洲午夜久久久久久久久久久| 2023国产精品| 欧美午夜精品电影| 国产精品一区二区免费不卡| 亚洲一区二区综合| 久久精品在线观看| 欧美日本一区二区在线观看| 国产91精品露脸国语对白| 亚洲大片一区二区三区| 久久久久国色av免费看影院| 欧美日韩一区二区不卡| 成人在线综合网站| 日精品一区二区| 亚洲日本在线观看| 久久综合狠狠综合久久综合88| 在线精品国精品国产尤物884a| 国产一区在线观看视频| 偷拍自拍另类欧美| 亚洲视频图片小说| 久久人人超碰精品| 宅男噜噜噜66一区二区66| 波多野结衣一区二区三区| 久久国产精品无码网站| 亚洲一级在线观看| 国产精品国产三级国产aⅴ原创| 日韩一区二区三区免费看| 日本大香伊一区二区三区| 国产福利91精品一区| 石原莉奈一区二区三区在线观看 | 国v精品久久久网| 蜜臀国产一区二区三区在线播放| 亚洲乱码日产精品bd| 国产欧美精品区一区二区三区| 欧美一级欧美三级在线观看| 日本国产一区二区| voyeur盗摄精品| 国产精品一二三四五| 麻豆久久久久久| 偷窥国产亚洲免费视频| 亚洲精品国产无天堂网2021| 国产精品热久久久久夜色精品三区| 日韩你懂的在线观看| 欧美精品日韩一区| 欧美天天综合网| 99精品欧美一区二区蜜桃免费 | 欧美高清视频在线高清观看mv色露露十八| 成人av手机在线观看| 国产一区二区美女诱惑| 久久国产剧场电影| 免费高清不卡av| 日韩国产精品久久| 午夜精品久久久久久久久久久| 自拍偷拍国产精品| 中文字幕在线观看不卡| 亚洲国产精品二十页| 国产拍揄自揄精品视频麻豆| 久久综合给合久久狠狠狠97色69| 欧美一级二级三级蜜桃| 91精品国产乱码| 日韩一卡二卡三卡四卡| 欧美一区二区三区免费视频| 欧美日韩国产小视频| 欧美日韩中文一区| 欧美色图激情小说| 欧美日韩精品三区| 欧美日韩国产高清一区二区三区| 欧美丝袜自拍制服另类| 色老头久久综合| 欧美综合色免费| 欧美日韩一区二区欧美激情| 欧美美女黄视频| 欧美一区三区四区| 日韩美女在线视频| 欧美精品一区二区三区视频| 久久久亚洲午夜电影| 久久久久国色av免费看影院| 国产欧美日韩亚州综合| 国产精品女同互慰在线看| 国产精品国产三级国产专播品爱网| 亚洲欧美在线视频| 一区二区三区欧美视频| 性做久久久久久久免费看| 午夜精品福利一区二区三区av| 日韩av电影免费观看高清完整版在线观看| 日本网站在线观看一区二区三区| 蜜桃免费网站一区二区三区| 国产在线精品国自产拍免费| 国产精品18久久久久久久久 | 精品国产乱码久久久久久闺蜜 | 美女www一区二区| 韩国欧美国产1区| 国产精品18久久久久| 不卡的av网站| 欧美中文字幕一区二区三区| 69av一区二区三区| xnxx国产精品| 椎名由奈av一区二区三区| 一区二区三区日韩欧美| 日日摸夜夜添夜夜添精品视频|