数据 | 如何用Google Cloud 建立简易数据处理流程

上次, 在《数据硬核!如何在量化模型中生成输入数据》一文中我们提到过数据集的生成一般通过编写数据流程( Data Pipeline)来实现。针对不同的原始数据类型-批数据(Batch Data)或流数据(Streaming Data),处理数据的流程也会略有不同。本文通过一个简单的例子,来介绍如何借助Google Cloud 建立批数据处理流程。

01丨提出需求

假设我们想要建立一个数据集,其包含大陆两大证券交易所(上海及深圳证券交易所)每日成交量排名前20的股票信息。

原始信息可以通过两大证券交易所网站获得:

http://www.sse.com.cn/market/stockdata/activity/main/

http://www.szse.cn/market/stock/active/actv/index.html

02丨分析需求

在实现代码前,我们需要先对需求进行分析,建立模型,并判断需要实现的功能。

建 模

一个典型的ETL(Extract, Transform, Load)数据流程大都由以下几部分组成:

原始数据存储(download)从数据源(Vendor Source)下载数据,并备份到自有存储。自有存储的好处是便于日后重新处理或查找,而不用担心数据源的消失。

原始数据变形(reshape)将不同数据格式统一,重新生成原始数据。不同数据提供商的数据保存格式往往不同(如csv,excel,xml等),将不同的数据格式进行统一,能方便后续的数据处理。

数据归一(normalize)对变形后的数据进行重新建模,在理想的数据模型(schema)下重新生成数据。不同数据源的数据,其数据模型往往不尽相同,同时与我们最终想要得到的数据集模型也未必相同,所以我们需要对数据进行归一处理。

数据整合(merge)对不同来源的相同数据进行整合,生成最终数据集。对同一对象,不同的数据源可能会提供不同的数据。例如根据计算方法不同,不同数据提供商针对同一支证券会提供不同的的收盘价。我们需要将这些数据整合来生成最终的数据集。

分 析

在本例中,数据源(Vendor Source)共有两个,上海证券交易所和深圳证券交易所。

上海证券交易所提供的数据存储在html文件中,除了成交量的排名外还有成交金额等其他指标排名。而深圳证券交易所提供的数据存储在xlsx文件中,并仅有成交量的排名。

上海证券交易所提供的数据中除了股票代码,股票简称,累计成交量外,还有价格信息(开盘,收盘,均价)及几个比率信息(振幅,换手率)等。而深圳证券交易所提供的数据除了股票代码,股票简称,累计成交量外,也提供几个价格信息。同时两组数据的单位并不相同(上海证券交易所为万股、万元,深圳证券交易所为亿股、亿元)。

两组数据互斥(mutually exclusive),所以假设如果没有人为改写(manual override)的需求,数据整合并不需要做什么。

根据上面的分析,我们确定数据流程中需要完成以下几项任务:

下载html及xlsx文件。转换html及xslx文件至同一格式。对两组数据进行重新建模,统一数据模型及单位。简单数据整合并输出数据集。

流程图

根据上面的分析,我们可以建立以下流程图:

03丨实现需求

技术栈

在本例中,数据流程的实现需要用到Google Cloud中的以下几个产品:

Cloud Storage 用来存储原始数据文件。

Cloud Function 用来下载数据文件,触发文件存储后的各项变形。

Cloud Scheduler 用来定时执行Cloud Function。

Dataflow 用来执行数据变形,归一以及整合。Big Query 用来存储变形后,归一后,以及整合后的数据。

原始数据存储

上海证券交易所主板成交量前20的数据并没有提供文件下载链接,所以我们只能存储整个活跃股排名前20的html页面。

数据下载及存储可以通过Cloud Function,Cloud Scheduler和Cloud Storage来实现

Cloud Function伪代码如下:

通过Cloud Scheduler定时调用上面的Cloud Function,下载每日的数据信息,并将最终的html文件存储在Cloud Storage中。

深圳证券交易所主板成交量前20的数据提供xlsx格式下载。同上,我们依旧可以通过Cloud Function,Cloud Scheduler和Cloud Storage来实现

Cloud Function伪代码如下:

原始数据变形,归一以及整合

在原始文件下载后,我们需要对原始数据进行变形,归一以及整合。这些变形都可以通过Dataflow来实现。每一个数据源,我们都需要进行变形及归一操作,而整合只需要对所有数据进行统一处理便可。

针对上海证券交易所数据的变形、归一的伪代码如下:

针对深圳证券交易所数据的变形、归一与上面类似。

之后,我们可以通过Cloud Function来建立触发机制(trigger),每当原始数据文件被下载到Google Cloud Storage后,上面的Dataflow流程便会被自动调用。

因为本例中数据互斥,数据整合仅仅是简单的合并,所以整合部分的伪代码便略过了。

最后,整个流程中生成的各种数据会被存储到Big Query的各个表格中:

main_board_trading_volume_top_20中存储的数据便是我们需要的数据集。

04丨写在最后

如果要将上面所建立的数据流程应用到正式的生产环境(Production Environment)中,其实还有大量的问题需要解决,以下仅列出几个例子:

技术栈的选取随着各种技术的不断发展,我们往往有不同的选择来实现同一功能。如存储数据我们可以用Big Query,也可以用RDBMS,NoSQL等。选取的技术栈是否合适,是我们需要仔细考虑的问题。

代码复用 (Code Reusability)处理简单数据的数据流程大都大同小异。为了能减少后期维护成本,以及提高代码复用,这些数据流程往往是通过一个共同的数据流程框架来实现,而不是成百上千个独立的小程序,所以上面示例中存在的大量重复代码,在正式生产环境中并不常见。

证券标识(Security ID)统一 一个量化交易系统的搭建,往往需要数十乃至上百个数据集的协作。同一支证券,在不同数据集中往往拥有不同的标示(如Ticker,SEDOL,ISIN,CUSIP等)。为了能将新的数据集导入现有平台,我们需要解决证券标识的统一问题。

流程的可靠性(Pipeline Reliability)及错误处理 (Error Handling)一个流程的可靠性会极大地影响后期的维护成本。上面的例子中,一个常见的改动(文件地址更新,不完整文件,数据错误,数据模型更改)便会使整个数据流程出错。我们需要尽可能的完善错误处理机制,来尽可能的减少后期维护需求。


 

近期热招:(点击标题,即可了解详情)

1.金融科技招聘 | 技术专场

2.招聘 | 机器学习(数据分析) -50万+奖金-对冲基金-上海

3.招聘 | Java/C++架构师-60-90万/年-上海

4.招聘 | 数据科学家 (NLP)-对冲基金-上海&北京

5.招聘 | 数据开发工程师-40-48万+奖金-北京-对冲基金