Big Data Cluster (BDC) はスケールアウトデータマートとしてのデータストアを持っています。
CTP 2.3 時点では、スケールアウトデータマートへのデータ投入は標準機能として、T-SQL と Spark ジョブを使用した、2 パターンが用意されています。
- チュートリアル:Transact SQL を使用した SQL Server のデータ プールにデータを取り込む
- チュートリアル:Spark ジョブの SQL Server のデータ プールにデータを取り込む
どちらのパターンでも基本的な操作方法は同じですが、少し特殊な形での利用となっていますので、まとめてみたいと思います。
スケールアウトデータマートの利用ですが、「外部テーブル」と「スケールアウトデータマート」の 2 種類で考える必要があります。
操作対象としては、次の 2 個所となります。
スケールアウトデータマートにテーブルを作成する場合、最初に「マスターインスタンス」で外部テーブルの作成を行います。
本投稿では次のようなテーブルを作成しています。
USE StreamDB GO CREATE EXTERNAL TABLE [streamdata_datamart] ( wcs_click_date_sk BIGINT , wcs_click_time_sk BIGINT , wcs_sales_sk BIGINT , wcs_item_sk BIGINT , wcs_web_page_sk BIGINT , wcs_user_sk BIGINT) WITH ( DATA_SOURCE = SqlDataPool, DISTRIBUTION = ROUND_ROBIN ); SELECT * FROM streamdata_datamart
ポイントとなるのが「DATA_SOURCE」と「DISTRIBUTION」の 2 つの設定ですね。
DATA_SOURCE に「SqlDataPool」を指定することで、マスターインスタンスに作成したテーブルと同一の名称の実テーブルがデータプール上にも作成されます。
上記のクエリであれば「streamdata_datamart」というテーブルがマスターインスタンスに「外部テーブル」として作成が行われます。
「SqlDataPool」を指定した外部テーブルの場合、データプールを構成する各 SQL Server のインスタンスに、次のようなクエリが実行されます。
CREATE TABLE [StreamDB].[dbo].[streamdata_datamart]( wcs_click_date_sk BIGINT , wcs_click_time_sk BIGINT , wcs_sales_sk BIGINT , wcs_item_sk BIGINT , wcs_web_page_sk BIGINT , wcs_user_sk BIGINT) CREATE CLUSTERED COLUMNSTORE INDEX cci ON [dbo].[streamdata_datamart]
外部テーブルを作成することで、データプール内の各 SQL Server に実テーブルの作成が行われ、作成されたテーブルにはクラスターか列ストアインデックスが作成されていることが確認できますね。
作成されたデータプールのテーブルのデータ投入ですが、方法を理解しておく必要があります。
「外部テーブル」については、データの参照のみが可能となっており、データを投入することはできません。
データプール内に作成されたテーブルについては、データプールに接続して直接データを投入する必要があります。
具体的には次のような構成となります。
BDC の SQL Server のインスタンスについては、マスターインスタンスのみがアクセス可能となっていますので、クライアントはマスターインスタンスの外部テーブルを介して、透過的にデータプールのデータを参照します。
この参照方法については、SQL Database の Elastic Database Client のような仕組みにより実装が行われているようです。
データプールへのデータ投入については、BDC で動作している Spark 等を使用して、データプール内に直接データの投入を行うことになります。
T-SQL で実行する場合は「sp_data_pool_table_insert_data」が、Spark で実行する場合は「mssql-spark-lib-assembly-1.0.jar」が取り込み対象のデータを、データプール内のテーブルにデータ投入を行っています。
複数のデータプールへの対応は実施していないのですが、特定のデータプールへの投入であれば、次のような PySpark のアプリケーションで実行することができます。
from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.sql.types import * conf = SparkConf() conf.setAppName('Data Mart Insert') sc = SparkContext(conf=conf) spark = SparkSession(sc) distFile = sc.textFile("hdfs:///importdata/web_clickstream_02.csv") tagsheader = distFile.first() header = sc.parallelize([tagsheader]) distData = distFile.subtract(header) distMap = distData.map(lambda line: line.split(",")) schema = StructType([ StructField("wcs_click_date_sk", StringType(), True), StructField("wcs_click_time_sk", StringType(), True), StructField("wcs_sales_sk", StringType(), True), StructField("wcs_item_sk", StringType(), True), StructField("wcs_web_page_sk", StringType(), True), StructField("wcs_user_sk", StringType(), True) ]) df = spark.createDataFrame(distMap, schema) server = "jdbc:sqlserver://mssql-data-pool-default-0.service-data-pool-default:1433" df.write.format("jdbc").option("url",server).option("user", "<ログイン名>").option("password", "<パスワード>").option("database", "StreamDB").option("dbtable", "streamdata_datamart").mode("append").save()
このアプリケーションは、BDC 内の HDFS に含まれているデータを、データプール (mssql-data-pool-default-0.service-data-pool-default) に INSERT を行っているものになります。
T-SQL も Spark のジョブも上記のような処理を高度な方法で実装しているものとなります。
上記はざっくりと書いたものですので、いくつかの考慮点が不足しています、がT-SQL / Spark ジョブについては次のような考慮が行われています。
- データプールの SQL Server インスタンスの動的な取得
- 複数のデータプールが存在している場合、特定の分割単位で各データプールのテーブルに分散させてデータを投入
「1」については、BDC 内のコントロールプレーンに対して、HTTP で要求を行うことで、「BDC 内に何台のデータプールが存在しているか」という情報を取得しているようです。
このような処理を実装しておくことで、複数のデータプールが存在しているかの判断と、どのサーバー名に対してデータ投入を行うかを動的に判断することができるようになります。
「2」については、T-SQL と Spark で実装方法が異なっているのですが、「複数のデータプールのテーブルにどのようにしてデータを分散させるか」の処理が行われています。
T-SQL から実行した場合は、バッチサイズ単位でどちらのデータテーブルにデータを投入するかが変わります。
(昔調べた際には、10万行ごとだった気がします)
Spark から実行した場合は、Spark の DataFrame のパーティション単位でどちらのデータテーブルにデータを投入するかが変わります。
(パーティション数 % データプールノード数 のような形でどのノードを使用するかを判断しているようです)
データプールにデータを投入する際には、アプリケーション側でシャーディングについての考慮が必要となるということですね。
(適切に考慮しないと特定のノードにデータの偏りが出てしまいますので)
スケールアウトデータマートは、チュートリアルですと T-SQL と Spark を実行するだけで終わってしまっているのですが、内部の実装方法をきちんと追っていくとなかなかに奥が深いですね。