spark

September 26, 2019
 

In part 1 of this post, we looked at setting up Spark jobs from SAS Cloud Analytic Services (CAS) to load and save data to and from Hadoop. Now we are moving on to the next step in the analytic cycle: scoring data in Hadoop and executing SAS code as a Spark job. The Spark scoring jobs execute using SAS In-Database Technologies for Hadoop.

The integration of the SAS Embedded Process and Hadoop allows scoring code to run directly on Hadoop. As a result, publishing and scoring of both DS2 and DATA step models occur inside Hadoop. Furthermore, access to Spark data exists through the SAS Workspace Server or the SAS Compute Server using SAS/ACCESS to Hadoop, or from the CAS server using SAS Data Connectors.

Scoring Data from CAS using Spark

SAS PROC SCOREACCEL provides an interface to the CAS server for DS2 and DATA step model publishing and scoring. Model code is published from CAS to Spark and then executed via the SAS Embedded Process.

PROC SCOREACCEL supports a file interface for passing the model components (model program, format XML, and analytic stores). The procedure reads the specified files and passes their contents on to the model-publishing CAS action. In this case, the files must be visible from the SAS client.

The CAS publishModel and runModel actions publish the model and score data in Spark:

 
%let CLUSTER="/opt/sas/viya/config/data/hadoop/lib:
  /opt/sas/viya/config/data/hadoop/lib/spark:/opt/sas/viya/config/data/hadoop/conf";
proc scoreaccel sessref=mysess1;
publishmodel
   target=hadoop
   modelname="simple01"
   modeltype=DS2
/* 
   filelocation=local */
programfile="/demo/code/simple.ds2"
username="cas"
modeldir="/user/cas"
classpath=&CLUSTER.
; runmodel 
    target=hadoop
    modelname="simple01"
    username="cas"
    modeldir="/user/cas"
    server=hadoop.server.com'
    intable="simple01_scoredata"
    outtable="simple01_outdata"
    forceoverwrite=yes
    classpath=&CLUSTER.
    platform=SPARK
; 
quit;

In the PROC SCOREACCEL example above, a DS2 model is published to Hadoop and executed with the Spark processing engine. The CLASSPATH parameter points to the Hadoop JAR and configuration files on the cluster. The input and output tables, simple01_scoredata and simple01_outdata, already exist on the Hadoop cluster.

Score data in Spark from CAS using SAS Scoring Accelerator

SAS Scoring Accelerator execution status in YARN as a Spark job from CAS

As you can see in the image above, the model is scored in Spark using the SAS Scoring Accelerator. The Spark job name reflects the input and output tables.

Scoring Data from MVA SAS using Spark

Steps to run a scoring model in Hadoop:

  1. Create a traditional scoring model using SAS Enterprise Miner or an analytic store scoring model, generated using SAS Factory Miner HPFOREST or HPSVM components.
  2. Specify the Hadoop connection attributes: %let indconn= user=myuserid;
  3. Use the INDCONN macro variable to provide credentials to connect to the Hadoop HDFS and Spark. Assign the INDCONN macro before running the %INDHD_PUBLISH_MODEL and the %INDHD_RUN_MODEL macros.
  4. Run the %INDHD_PUBLISH_MODEL macro.
  5. With traditional model scoring, the %INDHD_PUBLISH_MODEL macro performs multiple tasks using some of the files created by the SAS Enterprise Miner Score Code Export node. Using the scoring model program (score.sas file), the properties file (score.xml file), and (if the training data includes SAS user-defined formats) a format catalog, the macro performs the following tasks:

    • translates the scoring model into the sasscore_modelname.ds2 file, which is used to run scoring inside the SAS Embedded Process
    • takes the format catalog, if available, and produces the sasscore_modelname.xml file, which contains the user-defined formats for the published scoring model
    • uses the SAS/ACCESS Interface to Hadoop to copy the sasscore_modelname.ds2 and sasscore_modelname.xml scoring files to HDFS
  6. Run the %INDHD_RUN_MODEL macro.

The %INDHD_RUN_MODEL macro initiates a Spark job that uses the files generated by %INDHD_PUBLISH_MODEL to execute the DS2 program. The Spark job stores the DS2 program output in the HDFS location specified by either the OUTPUTDATADIR= argument or by the corresponding element in the HDMD file.

Here is an example:

 
option set=SAS_HADOOP_CONFIG_PATH="/opt/sas9.4/Config/Lev1/HadoopServer/conf";
option set=SAS_HADOOP_JAR_PATH="/opt/sas9.4/Config/Lev1/HadoopServer/lib:/opt/sas9.4/Config/Lev1/HadoopServer/lib/spark";

%let scorename=m6sccode;
%let scoredir=/opt/code/score;
option sastrace=',,,d' sastraceloc=saslog;
option set=HADOOPPLATFORM=SPARK;

%let indconn = %str(USER=hive HIVE_SERVER='hadoop.server.com');
%put &indconn;
%INDHD_PUBLISH_MODEL( dir=&scoredir.,
        datastep=&scorename..sas,
        xml=&scorename..xml,
        modeldir=/sasmodels,
        modelname=m6score,
        action=replace);

%INDHD_RUN_MODEL( inputtable=sampledata,
        outputtable=sampledata9score,
        scorepgm=/sasmodels/m6score/m6score.ds2,
        trace=yes,
        platform=spark);
Score data in Spark from MVA SAS using SAS Scoring Accelerator

SAS Scoring Accelerator execution status in YARN as a Spark job from MVA SAS

To execute the job in Spark, either set the HADOOPPLATFORM= option to SPARK or set PLATFORM= to SPARK inside the %INDHD_RUN_MODEL macro. The SAS Scoring Accelerator uses the SAS Embedded Process to execute the Spark job, and the job name contains the input and output table names.

Executing user-written DS2 code using Spark

User-written DS2 programs can be complex. When running inside a database, a Code Accelerator execution plan might require multiple phases. Thanks to generated Scala programs that integrate with the SAS Embedded Process program interface to Spark, the many phases of a Code Accelerator job reduce to a single Spark job.

In-Database Code Accelerator

The SAS In-Database Code Accelerator on Spark is a combination of generated Scala programs, Spark SQL statements, HDFS file access, and DS2 programs. The SAS In-Database Code Accelerator for Hadoop enables user-written DS2 thread or data programs to be published to Spark and executed in parallel, exploiting Spark's massively parallel processing. Examples of DS2 thread programs include large transpositions, computationally complex programs, scoring models, and BY-group processing.

The following DS2 options are required to execute as a Spark job:

DS2ACCEL: set to YES
HADOOPPLATFORM: set to SPARK

 

There are six different ways to run the Code Accelerator inside Spark, called cases. The generation of the Scala program by the SAS Embedded Process Client Interface depends on how the DS2 program is written. The following example shows Case 2: a thread program and a data program, neither with a BY statement:

 
proc ds2 ds2accel=yes;
thread work.workthread / overwrite=yes;
   method run();
      set hdplib.cars;
      output;
   end;
endthread;
run;

data hdplib.carsout (overwrite=yes);
   dcl thread work.workthread m;
   dcl double count;
   keep count make model;
   method run();
      set from m;
      count+1;
      output;
   end;
enddata;
run;
quit;

The entire DS2 program runs in two phases. The DS2 thread program runs during Phase One, and its tasks execute in parallel. The DS2 data program runs during Phase Two using a single task.

Finally

With SAS Scoring Accelerator and Spark integration, users have the power and flexibility to process and score modeled data in Spark. SAS Code Accelerator and Spark integration takes that flexibility further, allowing any Spark data to be processed with DS2 code. Furthermore, it is now possible for businesses to respond to use cases immediately and with higher reliability in the big data space.

Data and Analytics Innovation using SAS & Spark - part 2 was published on SAS Users.

August 28, 2019
 

This article is not a tutorial on Hadoop, Spark, or big data. At the same time, no prerequisite knowledge of these technologies is required for understanding. We’ll give you enough background prior to diving into the details. In simplest terms, the Hadoop framework maintains the data and Spark controls and directs data processing. As an analogy, think of Hadoop as a train, big data as the payload, and Spark as the crew driving the train and organizing and distributing the goods.

Big data

I recently read that data volumes are doubling each year. Not that long ago we talked in terms of gigabytes. This quickly turned into terabytes, and we're now in the age of petabytes. The type of data is also changing: data used to fit neatly into rows and columns, but now nearly eighty percent of data is unstructured. All these trends have left us dealing with massive amounts of data, aka big data, and maintaining and processing it has required new technical frameworks. Next, we'll investigate a couple of these tools.

Hadoop

Hadoop is a technology stack utilizing parallel processing on a distributed filesystem. Hadoop is useful to companies when data sets become so large or complex that their current solutions cannot effectively process the information in a reasonable amount of time. As the data science field has matured over the past few years, so has the need for a different approach to processing data.

Apache Spark

Apache Spark is a cluster-computing framework that supports both iterative algorithms and interactive/exploratory data analysis. The goal of Spark is to keep the benefits of Hadoop's scalable, distributed, fault-tolerant processing framework, while making it more efficient and easier to use. Using in-memory distributed computing, Spark provides capabilities over and above the batch model of Hadoop MapReduce. As a result, it brings to the big data world new applications of data science that were previously too expensive or slow on massive data sets.

Now let’s explore how SAS integrates with these technologies to maximize capturing, managing, and analyzing big data.

SAS capabilities to leverage Spark

SAS provides Hadoop data processing and data scoring capabilities using SAS/ACCESS Interface to Hadoop and In-Database Technologies to Hadoop, with MapReduce or Spark as the processing framework. This addresses traditional batch data management of huge volumes of extract, transform, load (ETL) data, as well as faster, interactive, in-memory processing for quicker responses with Spark.

In SAS Viya, SAS/ACCESS Interface to Hadoop includes SAS Data Connector to Hadoop. All users with SAS/ACCESS Interface to Hadoop can use the serial data transfer mode provided by the data connector. Likewise, SAS Data Connect Accelerator to Hadoop can load or save data in parallel between Hadoop and CAS using the SAS Embedded Process, as a Hive/MapReduce or Spark job.

Connecting to Spark in a Hadoop Cluster

There are two ways to connect to a Hadoop cluster using SAS/ACCESS Interface to Hadoop, depending on the SAS environment: the LIBNAME statement (from MVA SAS) and the CASLIB statement (from CAS).

LIBNAME statement to connect to Spark from MVA SAS

options set=SAS_HADOOP_JAR_PATH="/third_party/Hadoop/jars/lib:/third_party/Hadoop/jars/lib/spark"; 
options set=SAS_HADOOP_CONFIG_PATH="/third_party/Hadoop/conf"; 
 
libname hdplib hadoop server="hadoop.server.com" port=10000 user="hive"
schema='default' properties="hive.execution.engine=SPARK";

Parameters

SAS_HADOOP_JAR_PATH: directory path for the Hadoop and Spark JAR files.
SAS_HADOOP_CONFIG_PATH: directory path for the Hadoop cluster configuration files.
Libref: the hdplib libref specifies the location where SAS will find the data.
SAS/ACCESS engine name: HADOOP, to connect using the Hadoop engine.
SERVER: the Hadoop Hive server to connect to.
PORT: the Hive server listening port. 10000 is the default, so it is not required; it is included here for clarity.
USER and PASSWORD: not always required.
SCHEMA: the Hive schema to access. It is optional; by default, the connection uses the "default" schema.
PROPERTIES: Hadoop properties. Setting hive.execution.engine=SPARK enables SAS Viya to use Spark as the execution platform.

 
CASLIB statement to connect from CAS

caslib splib sessref=mysession datasource=(srctype="hadoop", dataTransferMode="auto",username="hive", server="hadoop.server.com", 
hadoopjarpath="/opt/sas/viya/config/data/hadoop/lib:/opt/sas/viya/conf ig/data/hadoop/lib/spark", 
hadoopconfigdir="/opt/sas/viya/config/data/hadoop/conf", schema="default"
platform="spark"
dfdebug="EPALL" 
properties="hive.execution.engine=SPARK");

Parameters

CASLIB: defines the specified data access; the splib CAS library specifies the Hadoop data source.
SESSREF: binds the CAS library to a specific CAS session; mysession is the currently active CAS session.
SRCTYPE: type of data source.
DATATRANSFERMODE: type of data movement between CAS and Hadoop. Accepts one of three values: serial, parallel, or auto. When AUTO is specified, CAS chooses the type of data transfer based on the licenses available in the system: if SAS Data Connect Accelerator to Hadoop is licensed, parallel data transfer is used; otherwise, serial transfer is used.
HADOOPJARPATH: path to the Hadoop and Spark JAR files on the CAS cluster.
HADOOPCONFIGDIR: path to the Hadoop configuration files on the CAS cluster.
PLATFORM: type of Hadoop platform used to execute the job or transfer data using the SAS Embedded Process. The default value is "mapred" for Hive MapReduce; when "spark" is specified, data transfers and jobs execute as Spark jobs.
DFDEBUG: returns additional information in the SAS log about data transferred by the SAS Embedded Process.
PROPERTIES: Hadoop properties. Setting hive.execution.engine=SPARK enables SAS Viya to use Spark as the execution platform.

 

Data Access using Spark

SAS Data Connect Accelerator for Hadoop with the Spark platform option uses Hive as the query engine to access Spark data. Data movement happens between Spark and CAS through SAS generated Scala code. This approach is useful when data already exists in Spark and either needs to be used for SAS analytics processing or moved to CAS for massively parallel data and analytics processing.

Loading Data from Hadoop to CAS using Spark

Processing data in CAS offers advanced data preparation, visualization, modeling and model pipelines, and finally model deployment. Model deployment can be performed using available CAS modules or pushed back to Spark if the data is already in Hadoop.

Load data from Hadoop to CAS using Spark

proc casutil 
      incaslib=splib
      outcaslib=casuser;
      load casdata="gas"
      casout="gas"
      replace;
run;

Parameters

PROC CASUTIL: used to run CAS actions that process data.
INCASLIB: input CAS library to read data from.
OUTCASLIB: output CAS library to write data to.
CASDATA: the table to load into the CAS in-memory server.
CASOUT: the output CAS table name.

 

We can look at the status of the data load job using Hadoop's resource management and job scheduling application, YARN. YARN is responsible for allocating system resources to the various applications running in a Hadoop cluster and scheduling tasks to be executed on different cluster nodes.

Loading Data from Hadoop to Viya CAS using Spark

In the figure above, the YARN application executed the data load as a Spark job. This was possible because the CASLIB statement had the PLATFORM=SPARK option specified. The Spark job name, "SAS CAS/DC Input," reflects the direction of data movement, in this case Hadoop to CAS; "Input" means data is loaded into CAS.

Saving Data from CAS to Hadoop using Spark

You can save data back to Hadoop from CAS at many stages of the analytic life cycle. For example, use data in CAS to prepare, blend, visualize, and model. Once the data meets the business use case, data can be saved in parallel to Hadoop using Spark jobs to share with other parts of the organization.

Using the SAVE CAS action to move data to Hadoop using Spark

proc cas;
session mysession; 
      table.save /
      caslib="splib"
      table={caslib="casuser", name="gas"},
      name="gas.sashdat"
      replace=True;
quit;

Parameters

PROC CAS: used to execute CAS action sets and actions to process data; "table" is the action set and "save" is the action.
TABLE: location and name of the source table.
NAME: name of the target table saved to the Hadoop library using Spark.

 

We can verify the status of saving data from CAS to Hadoop using the YARN application. Data from CAS is saved as a Hadoop table with Spark as the execution platform. Furthermore, because SAS Data Connect Accelerator for Hadoop transfers data in parallel, the individual Spark executors on each of the Spark executor nodes handle the data for their specific Hadoop cluster node.

Saving Data from Viya CAS to Hadoop using Spark

Finally, the save operation executed as a Spark job. As we can see from YARN, the Spark job named "SAS CAS/DC Output" indicates that the data moves from CAS to Hadoop.

Where we are; where we're going

So far we have traveled across the Spark pond to set up SAS libraries for Spark and to load and save data to and from Hadoop using Spark. In the next part we'll look at ways to score data and execute SAS code inside Hadoop using Spark.

Data and Analytics Innovation using SAS & Spark - part 1 was published on SAS Users.

March 18, 2016
 

As the big data era continues to evolve, Hadoop remains the workhorse for distributed computing environments. MapReduce has been the dominant workload in Hadoop, but Spark -- due to its superior in-memory performance -- is seeing rapid acceptance and growing adoption. As the Hadoop ecosystem matures, users need the flexibility to use either traditional MapReduce […]

MapReduce vs. Apache Spark vs. SQL: Your questions answered here and at #StrataHadoop was published on SAS Voices.

March 3, 2016
 

I've been doing some investigation into Apache Spark, and I'm particularly intrigued by the concept of the resilient distributed dataset, or RDD. According to the Apache Spark website, an RDD is “a fault-tolerant collection of elements that can be operated on in parallel.” Two aspects of the RDD are particularly […]

The post Big data quality with continuations appeared first on The Data Roundtable.

July 29, 2015
 
This summer I took the Spark courses at edX (CS100 and CS190) and had a wonderful experience.
The two classes use a Vagrant virtual machine containing Spark and all teaching materials. There are two challenges with the virtual machine:
  1. The labs usually take a long time to finish, say 8-10 hours. If the host machine is shut down, the RDDs are lost and the pipeline has to be run again.
  2. Some RDD operations, such as groupByKey and distinct, demand a lot of computation and communication power (see the sketch below). Many of my 50k classmates complained about the waiting time. Besides, my most-used laptop is a Chromebook, which cannot even install VirtualBox.
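As a minimal illustration (my own sketch, not part of the course labs), the snippet below shows why groupByKey is shuffle-heavy: reduceByKey pre-aggregates on each partition before moving data across the network, while groupByKey ships every record.

from pyspark import SparkContext

sc = SparkContext(appName="ShuffleComparison")
pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)] * 100000)

# groupByKey ships every (key, value) pair across the network before summing.
grouped = pairs.groupByKey().mapValues(sum)

# reduceByKey combines values locally first (map-side combine), then merges
# the much smaller partial sums, so far less data is shuffled.
reduced = pairs.reduceByKey(lambda a, b: a + b)

print(grouped.collect())
print(reduced.collect())
sc.stop()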
Deploying the learning environment to a cloud may be an alternative. DigitalOcean is a good choice because it uses mirrors for most packages, and the network speed is amazingly fast, almost 100 MB/s (thanks to the SSD infrastructure DigitalOcean implements for its cloud; otherwise the hard disk might not withstand such rapid IO; see my deployment records on GitHub).

I found that a Linux box with 1 GB of memory and 1 CPU at DigitalOcean, which costs 10 dollars a month, handles most labs fairly easily with IPython and Spark. A droplet with 2 GB of memory and 2 CPUs is ideal, since that is the minimal requirement for a simulated cluster. It costs 20 dollars a month, but that is still much cheaper than the $100 cost of earning the big data certificates ($50 for each). I just need to write Python scripts to install the IPython notebook with SSL and to download Spark and the course materials.
  • The DevOps tool is Fabric and the fabfile is at GitHub.
  • The deployment pipeline is also at GitHub
July 18, 2015
 
The demo pipeline is at GitHub.
Since version 1.3, Spark has introduced a new data structure, the DataFrame. A data analyst can now easily scale out existing DataFrame-based code from Python or R to a cluster hosting Hadoop and Spark.
There are quite a few practical scenarios where the DataFrame fits well. For example, many data files, including hard-to-read SAS files, need to be merged into a single data store. Apache Parquet is a popular column store for distributed environments and is especially friendly to structured or semi-structured data, which makes it an ideal candidate for a universal data destination.
I copy three SAS files called prdsale, prdsal2 and prdsal3, which contain simulated sales records, from the SASHELP library to a Linux directory, and then I launch the SQL context from Spark 1.4.
The three SAS files together have a size of 4.2 MB. My overall strategy is to build a pipeline that moves the data along the path SAS --> Python --> Spark --> Parquet.
import os
import pyspark

try:
    import sas7bdat
    import pandas
except ImportError:
    print('try to install the packages first')

print('Spark version is {}'.format(sc.version))

if type(sqlContext) != pyspark.sql.context.HiveContext:
    print('reset the Spark SQL context')

os.chdir('/root/playground')

def print_bytes(filename):
    print('{} has {:,} bytes'.format(filename, os.path.getsize(filename)))

print_bytes('prdsale.sas7bdat')
print_bytes('prdsal2.sas7bdat')
print_bytes('prdsal3.sas7bdat')

!du -ch --exclude=test_parquet

Spark version is 1.4.0
prdsale.sas7bdat has 148,480 bytes
prdsal2.sas7bdat has 2,790,400 bytes
prdsal3.sas7bdat has 1,401,856 bytes
4.2M .
4.2M total

1. Test DataFrame in Python and Spark

First I transform a SAS sas7bdat file into a pandas DataFrame. The great thing about Spark is that a Python/pandas DataFrame can be translated into a Spark DataFrame with the createDataFrame method. Now I have two DataFrames: one is a pandas DataFrame and the other is a Spark DataFrame.
with sas7bdat.SAS7BDAT('prdsale.sas7bdat') as f:
    pandas_df = f.to_data_frame()

print('-----Data in Pandas dataframe-----')
print(pandas_df.head())

print('-----Data in Spark dataframe-----')
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.show(5)

-----Data in Pandas dataframe-----
ACTUAL COUNTRY DIVISION MONTH PREDICT PRODTYPE PRODUCT QUARTER
0 925 CANADA EDUCATION 12054 850 FURNITURE SOFA 1
1 999 CANADA EDUCATION 12085 297 FURNITURE SOFA 1
2 608 CANADA EDUCATION 12113 846 FURNITURE SOFA 1
3 642 CANADA EDUCATION 12144 533 FURNITURE SOFA 2
4 656 CANADA EDUCATION 12174 646 FURNITURE SOFA 2

REGION YEAR
0 EAST 1993
1 EAST 1993
2 EAST 1993
3 EAST 1993
4 EAST 1993
-----Data in Spark dataframe-----
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
|ACTUAL|COUNTRY| DIVISION| MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|REGION| YEAR|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
| 925.0| CANADA|EDUCATION|12054.0| 850.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 999.0| CANADA|EDUCATION|12085.0| 297.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 608.0| CANADA|EDUCATION|12113.0| 846.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 642.0| CANADA|EDUCATION|12144.0| 533.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
| 656.0| CANADA|EDUCATION|12174.0| 646.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
The two should have identical lengths. Here both show 1,440 rows.
print(len(pandas_df))
print(spark_df.count())

1440
1440

2. Automate the transformation

I write a pipeline function to automate the transformation. As a result, all three SAS files are saved to the same directory in Parquet format.
def sas_to_parquet(filelist, destination):
    """Save SAS files to parquet
    Args:
        filelist (list): the list of sas file names
        destination (str): the path for parquet
    Returns:
        None
    """
    rows = 0
    for i, filename in enumerate(filelist):
        with sas7bdat.SAS7BDAT(filename) as f:
            pandas_df = f.to_data_frame()
            rows += len(pandas_df)
        spark_df = sqlContext.createDataFrame(pandas_df)
        spark_df.save("{0}/key={1}".format(destination, i), "parquet")
    print('{0} rows have been transformed'.format(rows))

sasfiles = [x for x in os.listdir('.') if x[-9:] == '.sas7bdat']
print(sasfiles)

sas_to_parquet(sasfiles, '/root/playground/test_parquet')

['prdsale.sas7bdat', 'prdsal2.sas7bdat', 'prdsal3.sas7bdat']
36000 rows has been transformed
Then I read from the newly created Parquet data store. The query shows that the data has been successfully saved.
df = sqlContext.load("/root/playground/test_parquet", "parquet")
print(df.count())
df.filter(df.key == 0).show(5)

36000
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
|ACTUAL|COUNTRY|COUNTY|DATE| MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|STATE| YEAR|MONYR| DIVISION|REGION|key|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
| 925.0| CANADA| null|null|12054.0| 850.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 999.0| CANADA| null|null|12085.0| 297.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 608.0| CANADA| null|null|12113.0| 846.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 642.0| CANADA| null|null|12144.0| 533.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|
| 656.0| CANADA| null|null|12174.0| 646.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+

3. Conclusion

There are multiple advantages to transforming data from various sources to Parquet.
  1. It is an open format that can be read and written by major software.
  2. It distributes well on HDFS.
  3. It compresses data.
For example, the original SAS files add up to 4.2 megabytes. As Parquet, the same data weighs only 292 KB, a 14X compression ratio.
os.chdir('/root/playground/test_parquet/')
!du -ahc

4.0K ./key=2/._metadata.crc
4.0K ./key=2/._SUCCESS.crc
0 ./key=2/_SUCCESS
4.0K ./key=2/_common_metadata
4.0K ./key=2/.part-r-00001.gz.parquet.crc
4.0K ./key=2/._common_metadata.crc
4.0K ./key=2/_metadata
60K ./key=2/part-r-00001.gz.parquet
88K ./key=2
4.0K ./key=0/._metadata.crc
4.0K ./key=0/._SUCCESS.crc
0 ./key=0/_SUCCESS
4.0K ./key=0/_common_metadata
4.0K ./key=0/.part-r-00001.gz.parquet.crc
4.0K ./key=0/._common_metadata.crc
4.0K ./key=0/_metadata
12K ./key=0/part-r-00001.gz.parquet
40K ./key=0
4.0K ./key=1/._metadata.crc
4.0K ./key=1/._SUCCESS.crc
0 ./key=1/_SUCCESS
4.0K ./key=1/_common_metadata
4.0K ./key=1/.part-r-00001.gz.parquet.crc
4.0K ./key=1/._common_metadata.crc
4.0K ./key=1/_metadata
132K ./key=1/part-r-00001.gz.parquet
160K ./key=1
292K .
292K total
A bar plot visualizes the significant size difference between the two formats, showing an order-of-magnitude space reduction.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
index = np.arange(2)
bar_width = 0.35
data = [4200, 292]
header = ['SAS files', 'Parquet']
plt.bar(index, data)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('File Size by KB')
plt.xticks(index + bar_width, header)
plt.show()
March 21, 2015
 
Requirements
Since Spark is rapidly evolving, I need to deploy and maintain a minimal Spark cluster for the purpose of testing and prototyping. A public cloud is the best fit for my current demand.
  1. Intranet speed
    The cluster should easily copy data from one server to another. MapReduce always shuffles large chunks of data throughout HDFS, so it's best if the hard disks are SSDs.
  2. Elasticity and scalability
    Before scaling the cluster out to more machines, the cloud should have some elasticity to size up or size down.
  3. Locality of Hadoop
    Most importantly, the Hadoop cluster and the Spark cluster should have a one-to-one mapping relationship, as shown below. The computation and the storage should always be on the same machines.
Hadoop       Cluster Manager   Spark      MapReduce
Name Node    Master            Driver     Job Tracker
Data Node    Slave             Executor   Task Tracker

Choice of public cloud:

I simply compare two cloud service providers: AWS and DigitalOcean. Both have nice Python-based monitoring tools (Boto for AWS and python-digitalocean for DigitalOcean); a small sketch of the latter follows.
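As a minimal sketch of my own (the API token, droplet name, region, image, and size below are placeholders, not part of the comparison), python-digitalocean makes it easy to create and inspect droplets from Python:

import digitalocean

TOKEN = 'YOUR_API_TOKEN'  # placeholder

# Create a 2 GB droplet for a worker node; region/image/size are illustrative.
droplet = digitalocean.Droplet(token=TOKEN,
                               name='spark-node-1',
                               region='nyc3',
                               image='ubuntu-14-04-x64',
                               size_slug='2gb')
droplet.create()

# List all droplets in the account to check their status and IP addresses.
manager = digitalocean.Manager(token=TOKEN)
for d in manager.get_all_droplets():
    print('{0} {1} {2}'.format(d.name, d.status, d.ip_address))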
  1. From storage to computation
    AWS's S3 is great storage to keep data and load it into the Spark/EC2 cluster. Alternatively, the Spark cluster on EC2 can directly read an S3 bucket through paths such as s3n://file (the speed is still acceptable). On DigitalOcean, I have to upload data from my local machine to the cluster's HDFS.
  2. DevOps tools:
      • AWS:
        • With the default settings, after running the deployment you will get
          • 2 HDFSs: one persistent and one ephemeral
          • Spark 1.3 or any earlier version
          • Spark's stand-alone cluster manager
        • A minimal cluster with 1 master and 3 slaves will consist of 4 m1.xlarge EC2 instances
          • Pros: large memory, with each node having 15 GB
          • Cons: not SSD; expensive (costs $0.35 * 4 = $1.40 per hour)
      • DigitalOcean:
        • With the default settings, after running the deployment you will get
          • HDFS
          • no Spark
          • Mesos
          • OpenVPN
        • A minimal cluster with 1 master and 3 slaves will consist of 4 2GB/2CPU droplets
          • Pros: as low as $0.12 per hour; Mesos provides fine-grained control over the cluster (down to 0.1 CPU and 16 MB of memory); nice to have a VPN to guarantee security
          • Cons: small memory (each node has 2 GB); Spark has to be installed manually

Add Spark to DigitalOcean cluster

Tom Faulhaber has a quick bash script for deployment. To install Spark 1.3.0, I rewrite it as a fabfile for Python's Fabric.
Then the whole deployment onto DigitalOcean is just one command line.
# 10.1.2.3 is the internal IP address of the master
fab -H 10.1.2.3 deploy_spark
The source code above is available on my GitHub.
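For illustration, a minimal fabfile sketch for such a task might look like the following; the download URL and directory names are assumptions of mine, not Tom Faulhaber's script or my actual fabfile.

# fabfile.py
from fabric.api import run, cd

# Assumed location of the prebuilt Spark 1.3.0 package.
SPARK_URL = 'http://archive.apache.org/dist/spark/spark-1.3.0/spark-1.3.0-bin-hadoop2.4.tgz'

def deploy_spark():
    """Download and unpack Spark 1.3.0 on the target host."""
    run('wget -q {0}'.format(SPARK_URL))
    run('tar -xzf spark-1.3.0-bin-hadoop2.4.tgz')
    with cd('spark-1.3.0-bin-hadoop2.4'):
        # Sanity check: print the Spark version on the remote machine.
        run('./bin/spark-submit --version')

With a task like this in the fabfile, the fab command shown above runs it on the master node.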
January 9, 2015
 
Suppose there is a website tracking user activities to prevent robotic attacks on the Internet. Please design an algorithm to identify user IDs that have more than 500 clicks within any given 10-minute window.
Sample.txt: anonymousUserID timeStamp clickCount
123    9:45am    10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888

Thought

This is a typical example of stream processing. The key is to build a fixed-length window that slides through all the data, counts the clicks within it, and returns the possibly malicious IDs.

Single machine solution

Two data structures are used: a queue and a hash table. The queue scans the data and keeps only the entries within a 10-minute window; once a new entry is appended, the old entries that fall out of the window are popped off. The hash table counts the clicks for the entries in the queue and is updated as the queue changes. Any ID with more than 500 clicks is added to a set.
from datetime import datetime
import time
from collections import deque

def get_minute(s, fmt='%I:%M%p'):
    return time.mktime(datetime.strptime(s, fmt).timetuple())

def get_diff(s1, s2):
    return int(get_minute(s2) - get_minute(s1)) / 60

def find_ids(infile, duration, maxcnt):
    queue, htable, ans = deque(), {}, set()
    with open(infile, 'rt') as _infile:
        for l in _infile:
            line = l.split()
            line[2] = int(line[2])
            current_id, current_time, current_clk = line
            if current_id not in htable:
                htable[current_id] = current_clk
            else:
                htable[current_id] += current_clk
            queue.append(line)
            while queue and get_diff(queue[0][1], current_time) > duration:
                past_id, _, past_clk = queue.popleft()
                htable[past_id] -= past_clk
            if htable[current_id] > maxcnt:
                ans.add(current_id)
    return ans

if __name__ == "__main__":
    print find_ids('sample.txt', 10, 500)

Cluster solution

The newest Spark (version 1.2.0) has started to support Python streaming. However, the documentation is still scarce, so it remains to be seen how well this problem maps onto the new API.
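As a rough sketch of what that might look like (the socket source, batch interval, and window sizes below are my own assumptions, not something verified against the 1.2.0 API):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="ClickMonitor")
ssc = StreamingContext(sc, 60)          # 1-minute micro-batches
ssc.checkpoint('checkpoint')            # required by the inverse-reduce window

def parse(line):
    uid, ts, cnt = line.split()
    return (uid, int(cnt))

clicks = ssc.socketTextStream('localhost', 9999).map(parse)

# Sum clicks per user over a 10-minute window, sliding every minute.
windowed = clicks.reduceByKeyAndWindow(lambda a, b: a + b,
                                       lambda a, b: a - b,
                                       600, 60)

# Report any user exceeding 500 clicks within the window.
windowed.filter(lambda kv: kv[1] > 500).pprint()

ssc.start()
ssc.awaitTermination()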
To be continued
December 24, 2014
 
Sample.txt
Requirements:
1. Separate valid SSNs from invalid SSNs.
2. Count the number of valid SSNs.
402-94-7709 
283-90-3049
124-01-2425
1231232
088-57-9593
905-60-3585
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is commonly seen and stored in many file systems. The trick to accelerate processing in Spark is to build a numerical key and use the sortByKey operator. Besides, an accumulator provides a global variable that exists across the machines in a cluster, which is especially useful for counting data.

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}
valid_cnt = 0
with open('sample.txt', 'rb') as infile, open('sample_bad.txt', 'wb') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1
        if l.isdigit() and len(l) == 9:
            key = int(l)
        if len(nums) == 3 and map(len, nums) == [3, 2, 4]:
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        else:
            if key not in htable:
                htable[key] = l
                valid_cnt += 1

with open('sample_sorted.txt', 'wb') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print valid_cnt

Cluster solution

#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    l = l.strip()
    nums = l.split('-')
    cdn1 = (l.isdigit() and len(l) == 9)
    cdn2 = (len(nums) == 3 and map(len, nums) == [3, 2, 4])
    if cdn1 or cdn2:
        return True
    return False

def set_key(l):
    global valid_cnt
    valid_cnt += 1
    l = l.strip()
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

rdd = sc.textFile('sample.txt')
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

rdd2 = (rdd.filter(is_validSSN).distinct()
           .map(lambda x: set_key(x))
           .sortByKey().map(lambda x: x[1]))

for x in rdd1.collect():
    print 'Invalid SSN\t', x

for x in rdd2.collect():
    print 'valid SSN\t', x

print '\nNumber of valid SSN is {}'.format(valid_cnt)

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()