9月 122017

The algorithmic pricing is an exciting new area, and it combines engineering and mathematics. Chen’s paper has introduced the algorithmic pricing on Amazon Marketplace. This post is to discuss the implementation of an algorithmic pricing based on Redis from the perspective of the sellers.

The background

  • Each of Amazon’s ASINs will have many sellers that compete each other.
  • Amazon has a ranking mechanism for Buy Box, say, to punish a new seller. But for the same ASIN, the seller who has the lowest price usually wins the Buy Box.
  • For each ASIN, each of many sellers will have an optimal price (the price they want to sell) and a lowest acceptable price (they cannot sell if the price is below it).
  • Amazon allows a seller to change price for an ASIN every 15 minutes.
  • Why 15 minutes? Because Amazon said that it takes up to 15 minutes for all systems to converge to the new price. But it is not exactly true. Amazon MWS uses 4 data centers in North America and data synchronization is not that fast for each of the databases or each of the data centers.
  • Amazon MWS has APIs. But don’t rely on it, and build your own crawlers and price adjustor. The reasons are explained later.
  • The sellers’ target variables are mainly
    • Buy Box take-over percentage (how they beat the other competitors)
    • Profit margin (the higher prices they sell goods at, the better)

How an algorithmic pricing engine works

The overall infrastructure may include three parts.

1. The distributed crawler

Amazon most time allows a crawler running 1 QPS/IP. But all the data centers and all ASINs from the seller and his competitors have to be closely watched. So a distributed approach will be safer. The response time will be crucial to decide which one is the winner within the game. In a common chasing diagram below, clearly Seller 1 has the upper hand and Seller 2 is losing the group, since the crawler from Seller 1 is faster.

2. The algorithm core

The embedded algorithm will have two purposes
  • Analyze the data from the crawler and detect other competitors’ optimal price/lowest acceptable price and strategy.
  • Adjust price according to an algorithm. The strategy can be either as simple as "minus one cent from the current competitor's price", or as complicated as machine learning or deep learning.

3. The price adjustor

The computed price will enter Amazon and become effective. Amazon will rank lower for the sellers whom he thinks doing algorithmic pricing. Sometimes Amazon even bans the seller. So don’t let Amazon to find that a computer is manipulating its APIs. To emulate human’s behavior on a browser, the two options are phantom.js and headless Chrome.

The role of Redis

Redis is a in-memory data store, which supports persistence and sharding. The first usage for this algorithmic pricing engine is that it can be used as a centralized task scheduler for the crawlers. Besides that, some other interesting fields in algorithmic pricing could be explored and utilized with Redis.
1. Predicative pricing
Redis as a cache has support for TTL(time to live). With the accumulation of the data, the competitors’ price changing time could be predicted. In the publisher-subscriber model, each time the predicted duration for next price changing can be inputted as an expiring key with TTL. Once the key expires, the publisher dispatches a crawling task and makes the price adjustment. The good thing for this approach is that the crawlers don’t need to tap a single web page from Amazon every second that brings the risk of being banned.

| Subscriber |

| |
| + psubscribe():void| +--------------+
| | | PubSub |
+--------------------+ +--------+-----+

| Publisher |

| |
| + calculateNextTTL():int |
| + onPMessage():void |
| + dispatchCrawler():boolean |
| |

2. Cross-data-center hedging
The synchronization mechanism across the data centers costs time sometimes hours. That is the reason why we see different prices at different IPs of Amazon at the same time. People will also have different purchasing behavior pattern at different time. Since a seller has the option to change price at a specified data center with IP instead of domain name, it will be an interesting topic to utilize the cool down time for the price to spread to the overall network and make a hedging. Redis’ capacity to keep all prices as hashes in memory will be helpful to spot those valuable occasions.

2月 182017

As a formal statistician and a current engineer, I feel that a successful engineering project may require both the mathematician’s ability to find the abstraction and the engineer’s ability to find the implementation.

For a typical engineering problem, the steps are usually -
- 1. Abstract the problem with a formula or some pseudocodes
- 2. Solve the problem with the formula
- 3. Iterate the initial solution until it achieves the optimal time complexity and space complexity

I feel that a mathematician would like dynamic programming or DP questions most, because they are too similar to the typical deduction question in math. An engineer will feel it challenging, since it needs the imagination and some sense of math.

The formula is the most important: without it, try-and-error or debugging does not help. Once the the formula is figured out, the rest becomes a piece of the cake. However, sometimes things are not that straightforward. Good mathematics does not always lead to good engineering.

Let’s see one question from Leetcode.

You are given a list of non-negative integers, a1, a2, ..., an, and a target, S. Now you have 2 symbols + and -. For each integer, you should choose one from + and - as its new symbol.

Find out how many ways to assign symbols to make sum of integers equal to target S.

Example 1:
Input: nums is [1, 1, 1, 1, 1], S is 3.
Output: 5

-1+1+1+1+1 = 3
+1-1+1+1+1 = 3
+1+1-1+1+1 = 3
+1+1+1-1+1 = 3
+1+1+1+1-1 = 3

There are 5 ways to assign symbols to make the sum of nums be target 3.

1. The quick solution

For each of the element of a list, it has two options: plus or minus. So the question asks how many ways to get a special number by all possible paths. Of course, if the sum of numbers is unrealistic, we just need to return 0.

Sounds exactly like a DP question. If we have a pencil and a paper, we can start to explore the relationship between dp(n) and dp(n-1). For example, our goal is to get a sum of 5, and we are given a list of [1, 1, 1, 1, 1]. If th a smaller tuple/list is (1, 1, 1, 1) and some paths get 4, that is exactly what we want since it adds 1 and becomes 5. Similarly, if they could get 6, that is fine as well. We add simply both paths together, since there are only two paths.

The formula is is dp(n, s) = dp(n-1, s-x) + dp(n-1, s+x), where n is the size of the list, s is the sum of the numbers and x is the one that adds to the previous list. OK, the second step is easy.
def findTargetSumWays_1(nums, S):
:type nums: Tuple[int]
:type S: int
:rtype: int

if not nums:
if S == 0:
return 1
return 0
return findTargetSumWays_1(nums[1:], S+nums[0]) + findTargetSumWays_1(nums[1:], S-nums[0])

small_test_nums = (1, 1, 1, 1, 1)
small_test_S = 3

%time findTargetSumWays_1(small_test_nums, small_test_S)
It is theoretically correct and works perfectly with small test cases. But we know that it is going to a nightmare for an engineering application, because it has a hefty time complexity of O(2^N). So math part is done, and We have to move to the third step.

2. The third step that is hard

So we need to find a data structure to record all the paths. If it is the Fibonacci number problem, a simple linear data structure like a list will slash O(2^N) to O(N).

But the hard part is: what data structure is going to be used here. Since the get operation in Hashtable is O(1), a rolling dictionary will help record the previous states. However, Python’s dictionary does not support change/add ops while it is in a loop, then we have to manually replace it. The overall path will be like a tree structure. So the ideal solution will be like -

def findTargetSumWays_2(nums, S):
if not nums:
return 0
dic = {nums[0]: 1, -nums[0]: 1} if nums[0] != 0 else {0: 2}
for i in range(1, len(nums)):
tdic = {}
for d in dic:
tdic[d + nums[i]] = tdic.get(d + nums[i], 0) + dic.get(d, 0)
tdic[d - nums[i]] = tdic.get(d - nums[i], 0) + dic.get(d, 0)
dic = tdic
return dic.get(S, 0)

big_test_nums = tuple(range(100))
big_test_S = sum(range(88))
%time findTargetSumWays_2(big_test_nums, big_test_S)

The time is exactly what we need. However, the codes are not elegant and hard to understand.
CPU times: user 189 ms, sys: 4.77 ms, total: 194 ms
Wall time: 192 ms

3. Finally the easy solution

If we don’t want things to get complicated. Here we just want a cache and Python 3 provides a lru_cache decorator. Then adding one line to the first solution will quickly solve the problem.

def findTargetSumWays_3(nums, S):
if not nums:
if S == 0:
return 1
return 0
return findTargetSumWays_3(nums[1:], S+nums[0]) + findTargetSumWays_3(nums[1:], S-nums[0])

%time findTargetSumWays_3(big_test_nums, big_test_S)

CPU times: user 658 ms, sys: 19.7 ms, total: 677 ms
Wall time: 680 ms


Good math cannot solve all the engineering problems. It has to combine with the details of the languange, the application and the system to avoid bad engineering implementation.

The Jupyter notebook is at Github. If you have any comment, please email me wm@sasanalysis.com.
2月 172017

Nowadays Elasticsearch is more and more popular. Besides it original search functionalities, I found Elasticsearch can be

  1. used as a logging container. That is what the ELK stack is created for.
  2. utilized as a JSON server with richful APIs, which can be combined with its Kibana as BI servers.

That is the data store I see everyday

  • 10PB stocking data
  • average 30TB incoming data everyday
  • various data sources including binary files such PDF
  • including very complicated SQL queries (fortunately no stored procedures)
  • millions of JSON creations daily

People want to know what is going on with such data. So a business intelligence or an OLAP system is needed to visualize/aggregate the data and its flow. Since Elasticsearch is so easy to scale out, it beats other solutions for big data on the market.

1. Batch Worker

There are many options to implement a batch worker. Finally the decision falls to either Spring Data Batch or writing a library from the scratch in Python.

1.1 Spring Data Batch v.s. Python

Spring Data Batch
  • Pros:
    • is a full framework that includes rollback, notification and scheduler features
    • provides great design pattern for Dependency Injection, such as factory and singleton, which help multiple persons work together. For example -
public Step IndexMySQLJob01() {
return stepBuilderFactory.get("IndexMySQLJob01")
.<Data, Data> chunk(10)
  • Pros:
    • less codes; flexible
    • super easy from dictionary to JSON
    • has official/3rd party libraries for everything
  • Cons:
    • you create your own library/framework
    • if the pattern like Spring Data Batch is deployed, has to inject dependencies manually, such as -
class IndexMySQLJob01(object):
def __init__(self, reader, processor, writer, listener):
self.reader = reader
self.processor = processor
self.writer = writer
self.listener = listener

Eventually Python is picked, because the overall scenario is more algorithm-bound instead of language-bound.

1.2 The algorithms

Since the data size is pretty big, time and space are always considered. The direct way to decrease the time complexity is using the hash tables, as long as the memory can hold the data. For example, a join between an N rows table and an M rows table can be optimized from O(M*N) to O(M).

To save the space, a generator chain is used to stream data from the start to the end, instead of materializing sizable objects.

class JsonTask01(object):
def get_json(self, generator1, hashtable1):
for each_dict in generator1:
key = each_dict.get('key')
yield each_dict

1.3 The scheduler

A scheduler is a must: cron is enough for simple tasking, while a bigger system requires a work flow. Airflow is the one that helps organize and schedule. It has a web UI and is written in Python, which is easy to be integrated with the batch worker.

1.4 High availability with zero downtime

Indexing of large quantity of data will impose significant impact. For mission-critical indexes that need 100% up time, the zero down algorithm is implemented and we keep two copies of an index for maximum safety. The alias will switch between the two copies once the indexing is finished.

    def add_alias(self, idx):
LOGGER.warn("The alias {} will point to {}.".format(self.index, idx))
self.es.indices.put_alias(idx, self.index)

def delete_alias(self, idx):
LOGGER.warn("The alias {} will be removed from {}.".format(self.index, idx))
self.es.indices.delete_alias(idx, self.index)

2. Elasticsearch Cluster

2.1 Three kinds of nodes

An Elasticsearch node can choose one of three roles: master node, data node and ingest node(previously called client node). It is commonly seen to dedicate a node as master and ingest and data all together. For a large system, it is always helpful to assign the three roles to different machines/VMs. Therefore, once a node is down/up, it will be quicker to failover or recover.
elasticsearch-head can clearly visualize the data transfer process of the shards once an accident occurs.

With the increased number of cluster nodes, the deployment becomes painful. I feel the best tool so far is ansible-elasticsearch. With ansible-playbook -i hosts ./your-playbook.yml -c paramiko, the cluster is on the fly.

2.2 Memory is cheap but heap is expensive

The rules of thumb for Elasticsearch are -

Give (less than) Half Your Memory to Lucene

Don’t Cross 32 GB!

The result causes an awkward situation: if you have a machine that has more than 64GB memory, then the additional memory will mean nothing to Elasticsearch. Actually it is meaningful to run two or more Elasticsearch instances side by side to save the hardware. For example, there is a machine with 96GB memory. We can allocate 31GB for an ingest node, 31 GB for a data node and the rest for the OS. However, two data nodes in a single machine will compete for the disk IO that damages the performance, while a master node and a data node will increase the risk of downtime.

The great thing for Elasticsearch is that it provides richful REST APIs, such as http://localhost:9200/_nodes/stats?pretty. We could use Xpack(paid) or other customized tools to monitor them. I feel that the three most important statistics for the heap and therefore the performance are -

The heap usage and the old GC duration

The two statistics intertwined together. The high heap usage, such as 75%, will lead to a GC, while GC with high heap usage will take longer time. We have to keep both numbers as low as possible.

     "jvm" : {
"mem" : {
"heap_used_percent" : 89,
"gc" : {
"collectors" : {
"old" : {
"collection_count" : 225835,
"collection_time_in_millis" : 22624857
Thread pools

There are three kinds of thread pools: active, queue, and reject. It is useful to visualize the real time change. Once there are a lot of queued threads or rejected threads, it is good time to think about scale up or scale out.

"thread_pool" : {
"bulk" : {
"threads" : 4,
"queue" : 0,
"active" : 0,
"rejected" : 0,
"largest" : 4,
"completed" : 53680
The segments’ number/size

The segments are the in-memory inverted indexes corresponding to the indexes on the hard disk, which are persistent in the physical memory and GC will have no effect on them. The segment will have the footage on every search thread. The size of the segments are important because they will be multiplied by a factor of the number of threads.

        "segments" : {
"count" : 215,
"memory_in_bytes" : 15084680,

The number of shards actually controls the number of the segments. The shards increase, then the size of the segments decreases and the number of the segments increases. So we cannot increase the number of shards as many as we want. If there are many small segments, the heap usage will turn much higher. The solution is Force merge, which is time-consuming but effecitve.

3. Kibana as BI dashboard

3.1 Plugins

Kibana integreated DevTools(previously The Sense UI) for free. DevTools has code assistance and is a powerful tool for debugging. If the budget is not an issue, Xpack is also highly recommended. As for Elasticsearch, since 5.0, ingest-geoip is now a plugin. We will have to write it to Ansible YAML such as -

- plugin: ingest-geoip

3.2 Instant Aggregation

There are quite a few KPIs that need system-wide term aggregations. From 5.0 the request cache will be enabled by default for all requests with size:0.
For example -

POST /big_data_index/data/_search
{ "size": 0,
"query": {
"bool": {
"must_not": {
"exists": {
"field": "interesting_field"

The Fore merge as mentioned above, such asPOST /_forcemerge?max_num_segments=1, will combine the segments and dramatically increase the aggregation speed.

3.3 Proxy

Nginx is possibly the best proxy as the frontend toward Kibana. There are two advantages: first the proxy can cache the static resources of Kibana; second we can always check the Nginx logs to figure out what causes problem for Kibana.


Elasticsearch and Kibana together provide high availability and high scalability for large BI system.

if you have any comment, please email me wm@sasanalysis.com

9月 082016

I used to install Datadog or other SaaS to monitor my Linux boxes on the cloud. Most times they are just overkill for my tiny servers with only 1GB or 2GB memory. Actually what I am most interested is the up-and-running processes, or/and the exact memory usage. And I need a mobile solution to monitor on-the-go.
Now with the coming of Slack bot, and its real time Python client, I can just use a simple Python script to realize the purposes.
from slackclient import SlackClient
from subprocess import getoutput
import logging
import time

message_channel = '#my-server-001'
api_key = 'xoxb-slack-token'
client = SlackClient(api_key)

if client.rtm_connect():
while True:
last_read = client.rtm_read()
if last_read:
parsed = last_read[0]['text']
if parsed and 'status' in parsed:
result = getoutput('pstree')
result += 'nn' + getoutput('free -h')
client.rtm_send_message(message_channel, str(result))
except Exception as e:
Then I use systemd or other tools to daemonize it. No matter where and when I am, I enter status at the #my-server-001 channel on my phone, I will instantly get the result like -
| `-{gmain}
| |-{in:imuxsock}
| `-{rs:main Q:Reg}
| `-sshd---sshd

total used free shared buff/cache available
Mem: 2.0G 207M 527M 26M 1.2G 1.7G
Swap: 255M 0B 255M
7月 292015
This summer I took the Spark courses at edx CS100 and CS190, and had wonderful experience.
The two classes apply a Vagrant virtual machine containing Spark and all teaching materials. There are two challenges with the virtual machine —
  1. The labs usually take long time to finish, say 8-10 hours. If the host machine is closed, the RDDs will be lost and the pipeline has to be run again.
  2. Some RDD operations take a lot computation/communication powers, such as groupByKey and distinct. Many of my 50k classmates complained about the waiting time. And my most used laptop is a Chromebook and doesn’t even have options to install Virtual Box.
To deploy the learning environment to a cloud may be an alternative. DigitalOcean is a good choice because it uses mirrors for most packages, and the network speed is amazingly fast that is almost 100MB/s (thanks to the SSD infrastructure DigitalOcean implements for the cloud, otherwise the hard disk may not stand this rapid IO; see my deployment records GitHub).

I found that a Linux box with 1 GB memory and 1 CPU at DigitalOcean that costs 10 dollars a month will handle most labs fairly easy with IPython and Spark. A 2 GB memory and 2 CPU droplet will be ideal since it is the minimal requirement for a simulated cluster. It costs 20 dollars a month, but is still much cheaper than the cost to earn the big data certificate that is $100 (50 for each). I just need to write Python scripts to install IPython notebook with SSL, and download Spark and the course materials.
  • The DevOps tool is Fabric and the fabfile is at GitHub.
  • The deployment pipeline is also at GitHub
7月 182015
The demo pipeline is at GitHub.
Since the version 1.3, Spark has introduced the new data structure DataFrame. A data analyst now could easily scale out the exsiting codes based on the DataFrame from Python or R to a cluster hosting Hadoop and Spark.
There are quite a few practical scenarios that DataFrame fits well. For example, a lot of data files including the hardly read SAS files want to merge into a single data store. Apache Parquet is a popular column store in a distributed environment, and especially friendly to structured or semi-strucutred data. It is an ideal candidate for a univeral data destination.
I copy three SAS files called prdsale, prdsal2 and prdsal3, which are about a simulated sales record, from the SASHELP library to a Linux directory. And then I launch the SQL context from Spark 1.4.
The three SAS files now have the size of 4.2MB. My overall strategy is to build a pipeline to realize my purpose such as SAS --> Python --> Spark --> Parquet.
import os
import sas7bdat
import pandas
except ImportError:
print('try to install the packags first')

print('Spark verion is {}'.format(sc.version))

if type(sqlContext) != pyspark.sql.context.HiveContext:
print('reset the Spark SQL context')


def print_bytes(filename):
print('{} has {:,} bytes'.format(filename, os.path.getsize(filename)))


!du -ch --exclude=test_parquet

Spark verion is 1.4.0
prdsale.sas7bdat has 148,480 bytes
prdsal2.sas7bdat has 2,790,400 bytes
prdsal3.sas7bdat has 1,401,856 bytes
4.2M .
4.2M total

1. Test DataFrame in Python and Spark

First I transform a SAS sas7bdat file to a pandas DataFrame. The great thing in Spark is that a Python/pandas DataFrame could be translated to Spark DataFrame by the createDataFrame method. Now I have two DataFrames: one is a pandas DataFrame and the other is a Spark DataFrame.
with sas7bdat.SAS7BDAT('prdsale.sas7bdat') as f:
pandas_df = f.to_data_frame()
print('-----Data in Pandas dataframe-----')

print('-----Data in Spark dataframe-----')
spark_df = sqlContext.createDataFrame(pandas_df)

-----Data in Pandas dataframe-----

0 EAST 1993
1 EAST 1993
2 EAST 1993
3 EAST 1993
4 EAST 1993
-----Data in Spark dataframe-----
| 925.0| CANADA|EDUCATION|12054.0| 850.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 999.0| CANADA|EDUCATION|12085.0| 297.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 608.0| CANADA|EDUCATION|12113.0| 846.0|FURNITURE| SOFA| 1.0| EAST|1993.0|
| 642.0| CANADA|EDUCATION|12144.0| 533.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
| 656.0| CANADA|EDUCATION|12174.0| 646.0|FURNITURE| SOFA| 2.0| EAST|1993.0|
The two should be the identical length. Here both show 1,440 rows.


2. Automate the transformation

I write a pipeline function to automate the transformation. As the result, the all three SAS files are saved to the same directory as Parquet format.
def sas_to_parquet(filelist, destination):
"""Save SAS file to parquet
filelist (list): the list of sas file names
destination (str): the path for parquet
rows = 0
for i, filename in enumerate(filelist):
with sas7bdat.SAS7BDAT(filename) as f:
pandas_df = f.to_data_frame()
rows += len(pandas_df)
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.save("{0}/key={1}".format(destination, i), "parquet")
print('{0} rows have been transformed'.format(rows))

sasfiles = [x for x in os.listdir('.') if x[-9:] == '.sas7bdat']

sas_to_parquet(sasfiles, '/root/playground/test_parquet')

['prdsale.sas7bdat', 'prdsal2.sas7bdat', 'prdsal3.sas7bdat']
36000 rows has been transformed
Then I read from the newly created Parquet data store. The query shows that the data has been successfully saved.
df = sqlContext.load("/root/playground/test_parquet", "parquet")
df.filter(df.key == 0).show(5)

| 925.0| CANADA| null|null|12054.0| 850.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 999.0| CANADA| null|null|12085.0| 297.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 608.0| CANADA| null|null|12113.0| 846.0|FURNITURE| SOFA| 1.0| null|1993.0| null|EDUCATION| EAST| 0|
| 642.0| CANADA| null|null|12144.0| 533.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|
| 656.0| CANADA| null|null|12174.0| 646.0|FURNITURE| SOFA| 2.0| null|1993.0| null|EDUCATION| EAST| 0|

3. Conclusion

There are multiple advantages to tranform data from various sources to Parquet.
  1. It is an open format that could be read and written by major softwares.
  2. It could be well distributed to HDFS.
  3. It compresses data.
For example, the original SAS files add up to 4.2 megabyte. Now as Parquet, it only weighs 292KB and achieves 14X compression ratio.
!du -ahc

4.0K ./key=2/._metadata.crc
4.0K ./key=2/._SUCCESS.crc
0 ./key=2/_SUCCESS
4.0K ./key=2/_common_metadata
4.0K ./key=2/.part-r-00001.gz.parquet.crc
4.0K ./key=2/._common_metadata.crc
4.0K ./key=2/_metadata
60K ./key=2/part-r-00001.gz.parquet
88K ./key=2
4.0K ./key=0/._metadata.crc
4.0K ./key=0/._SUCCESS.crc
0 ./key=0/_SUCCESS
4.0K ./key=0/_common_metadata
4.0K ./key=0/.part-r-00001.gz.parquet.crc
4.0K ./key=0/._common_metadata.crc
4.0K ./key=0/_metadata
12K ./key=0/part-r-00001.gz.parquet
40K ./key=0
4.0K ./key=1/._metadata.crc
4.0K ./key=1/._SUCCESS.crc
0 ./key=1/_SUCCESS
4.0K ./key=1/_common_metadata
4.0K ./key=1/.part-r-00001.gz.parquet.crc
4.0K ./key=1/._common_metadata.crc
4.0K ./key=1/_metadata
132K ./key=1/part-r-00001.gz.parquet
160K ./key=1
292K .
292K total
A bar plot visualizes the signifcant size difference between the two formats. It shows an order of magnitude space deduction.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
index = np.arange(2)
bar_width = 0.35
data = [4200, 292]
header = ['SAS files', 'Parquet']
plt.bar(index, data)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('File Size by KB')
plt.xticks(index + bar_width, header)
6月 192015

I really appreciate those wonderful comments on my SAS posts by the readers (123). They gave me a lot of inspirations. Due to SAS or SQL’s inherent limitation, recently I feel difficult in deal with some extremely large SAS datasets (it means that I exhausted all possible traditional ways). Here I conclude two alternative solutions in these extreme cases as a follow-up to the comments.
  1. Read Directly
    • Use a scripting language such as Python to Reading SAS datasets directly
  2. Code Generator
    • Use SAS or other scripting languages to generate SAS/SQL codes
The examples still use sashelp.class, which has 19 rows. The target variable is weight.
data class;
set sashelp.class;
row = _n_;

Example 1: Find the median

SQL Query

In the comment, Anders SköllermoFebruary wrote
Hi! About 1. Calculate the median of a variable:
If you look at the details in the SQL code for calculation the median, then you find that the intermediate file is of size N*N obs, where N is the number of obs in the SAS data set.
So this is OK for very small files. But for a file with 10000 obs, you have an intermediate file of size 100 million obs. / Br Anders Anders Sköllermo Ph.D., Reuma and Neuro Data Analyst
The SQL query below is simple and pure, so that it can be ported to any other SQL platform. However, just like what Anders said, it is just way too expensive.
proc sql;
select avg(weight) as Median
from (select e.weight
from class e, class d
group by e.weight
having sum(case when e.weight = d.weight then 1 else 0 end)
>= abs(sum(sign(e.weight - d.weight)))


In the comment, Anonymous wrote:
I noticed the same thing - we tried this on one of our 'smaller' datasets (~2.9 million records), and it took forever.
Excellent solution, but maybe PROC UNIVARIATE will get you there faster on a large dataset.
Indeed PROC UNIVARIATE is the best solution in SAS to find the median, which utilizes SAS's built-in powers.

Read Directly

When the extreme cases come, say SAS cannot even open the entire dataset, we may have to use the streaming method to Reading the sas7bdat file line by line. The sas7bdat format has been decoded by JavaR and Python. Theoretically we don't need to have SAS to query a SAS dataset.
Heap is an interesting data structure, which easily finds a min or a max. ream the values, we could build a max heap and a min heap to cut the incoming stream into half in Python. The algorithm looks like a heap sorting. The good news is that it only Reading one variable each time and thus saves a lot of space.
#In Python
import heapq
from sas7bdat import SAS7BDAT
class MedianStream(object):
def __init__(self):
self.first_half = [] # will be a max heap
self.second_half = [] # will be a min heap, 1/2 chance has one more element
self.N = 0

def insert(self, x):
heapq.heappush(self.first_half, -x)
self.N += 1
if len(self.second_half) == len(self.first_half):
to_second, to_first = map(heapq.heappop, [self.first_half, self.second_half])
heapq.heappush(self.second_half, -to_second)
heapq.heappush(self.first_half, -to_first)
to_second = heapq.heappop(self.first_half)
heapq.heappush(self.second_half, -to_second)

def show_median(self):
if self.N == 0:
raise IOError('please use the insert method first')
elif self.N % 2 == 0:
return (-self.first_half[0] + self.second_half[0]) / 2.0
return -self.first_half[0]

if __name__ == "__main__":
stream = MedianStream()
with SAS7BDAT('class.sas7bdat') as infile:
for i, line in enumerate(infile):
if i == 0:
print stream.show_median()


Example 2: Find top K by groups

SQL Query

This query below is very expensive. We have a self-joining O(N^2) and a sorting O(NlogN), and the total time complexity is a terrible O(N^2 + Nlog(N)).
* In SAS
proc sql;
select a.sex, a.name, a.weight, (select count(distinct b.weight)
from class as b where b.weight >= a.weight and a.sex = b.sex ) as rank
from class as a
where calculated rank <= 3
order by sex, rank

Code Generator

The overall thought is break-and-conquer. If we synthesize SAS codes from a scripting tool such as Python, we essentially get many small SAS codes segments. For example, the SQL code below is just about sorting. So the time comlexity is largely decreased to O(Nlog(N)).
# In Python
def create_sql(k, candidates):
template = """
proc sql outobs = {0};
select *
from {1}
where sex = '{2}'
order by weight desc
for x in candidates:
current = template.format(k, 'class', x)
print current
if __name__ == "__main__":
create_sql(3, ['M', 'F'])

proc sql outobs = 3;
select *
from class
where sex = 'M'
order by weight desc

proc sql outobs = 3;
select *
from class
where sex = 'F'
order by weight desc

Read Directly

This time we use the data structure of heap again in Python. To find the k top rows for each group, we just need to prepare the min heaps with the k size for each group. With the smaller values popped out everytime, we finally get the top k values for each group. The optimized time complexity is O(Nlog(k))
#In Python
from sas7bdat import SAS7BDAT
from heapq import heappush, heappop

def get_top(k, sasfile):
minheaps = [[], []]
sexes = ['M', 'F']
with SAS7BDAT(sasfile) as infile:
for i, row in enumerate(infile):
if i == 0:
sex, weight = row[1], row[-1]
i = sexes.index(sex)
current = minheaps[i]
heappush(current, (weight, row))
if len(current) > k:
for x in minheaps:
for _, y in x:
print y

if __name__ == "__main__":
get_top(3, 'class.sas7bdat')

[u'Robert', u'M', 12.0, 64.8, 128.0]
[u'Ronald', u'M', 15.0, 67.0, 133.0]
[u'Philip', u'M', 16.0, 72.0, 150.0]
[u'Carol', u'F', 14.0, 62.8, 102.5]
[u'Mary', u'F', 15.0, 66.5, 112.0]
[u'Janet', u'F', 15.0, 62.5, 112.5]

Example 3: Find Moving Window Maxium

At the daily work, I always want to find three statistics for a moving window: mean, max, and min. The sheer data size poses challenges.
In his blog post, Liang Xie showed three advanced approaches to calculated the moving averages, including PROC EXPANDDATA STEP and PROC SQL. Apparently PROC EXPAND is the winner throughout the comparison. As conclusion, self-joining is very expensive and always O(N^2) and we should avoid it as much as possible.
The question to find the max or the min is somewhat different other than to find the mean, since for the mean only the mean is memorized, while for the max/min the locations of the past min/max should also be memorized.

Code Generator

The strategy is very straightforward: we choose three rows from the table sequentially and calculate the means. The time complexity is O(k*N). The generated SAS code is very lengthy, but the machine should feel comfortable to Reading it.
In addition, if we want to save the results, we could insert those maximums to an empty table.
# In Python
def create_sql(k, N):
template = """
select max(weight)
from class
where row in ({0})
SQL = ""
for x in range(1, N - k + 2):
current = map(str, range(x, x + 3))
SQL += template.format(','.join(current))
print "proc sql;" + SQL + "quit;"
if __name__ == "__main__":
create_sql(3, 19)

proc sql;
select max(weight)
from class
where row in (1,2,3)
select max(weight)
from class
where row in (2,3,4)
select max(weight)
from class
where row in (3,4,5)
select max(weight)
from class
where row in (4,5,6)
select max(weight)
from class
where row in (5,6,7)
select max(weight)
from class
where row in (6,7,8)
select max(weight)
from class
where row in (7,8,9)
select max(weight)
from class
where row in (8,9,10)
select max(weight)
from class
where row in (9,10,11)
select max(weight)
from class
where row in (10,11,12)
select max(weight)
from class
where row in (11,12,13)
select max(weight)
from class
where row in (12,13,14)
select max(weight)
from class
where row in (13,14,15)
select max(weight)
from class
where row in (14,15,16)
select max(weight)
from class
where row in (15,16,17)
select max(weight)
from class
where row in (16,17,18)
select max(weight)
from class
where row in (17,18,19)

Read Directly

Again, if we want to further decrease the time complexity, say O(N), we have to use better data structure, such as queue. SAS doesn't have queue, so we may switch to Python. Actually it has two loops which adds up to O(2N). However, it is still better than any other methods.
# In Python
from sas7bdat import SAS7BDAT
from collections import deque

def maxSlidingWindow(A, w):
N = len(A)
ans =[0] * (N - w + 1)
myqueue = deque()
for i in range(w):
while myqueue and A[i] >= A[myqueue[-1]]:
for i in range(w, N):
ans[i - w] = A[myqueue[0]]
while myqueue and A[i] >= A[myqueue[-1]]:
while myqueue and myqueue[0] <= i-w:
ans[-1] = A[myqueue[0]]
return ans

if __name__ == "__main__":
weights = []
with SAS7BDAT('class.sas7bdat') as infile:
for i, row in enumerate(infile):
if i == 0:

print maxSlidingWindow(weights, 3)

[112.5, 102.5, 102.5, 102.5, 102.5, 112.5, 112.5, 112.5, 99.5, 99.5, 90.0, 112.0, 150.0, 150.0, 150.0, 133.0, 133.0]


While data is expanding, we should more and more consider three things -
  • Time complexity: we don't want run data for weeks.
  • Space complexity: we don't want the memory overflow.
  • Clean codes: the colleagues should easily Reading and modify the codes.

    6月 032015
    saslib is an HTML report generator to lookup the metadata (or the head information) like PROC CONTENTS in SAS.
    • It reads the sas7bdat files directly and quickly, and does not need SAS installed.
    • Emulate PROC CONTENTS by jQuery and DataTables.
    • Extract the meta data from all SAS7bdat files under the specified directory.
    • Support IE(>=10), firefox, chrome and any other modern browser.


    pip install saslib
    saslib requires sas7bdat and jinjia2.


    The module is very simple to use. For example, the SAS data sets under the SASHELP library could be viewed —
    from saslib import PROCcontents

    sasdata = PROCcontents('c:/Program Files/SASHome/SASFoundation/9.3/core/sashelp')

    The resulting HTML file from the codes above will be like here.
    6月 032015
    saslib is an HTML report generator to lookup the metadata (or the head information) like PROC CONTENTS in SAS.
    • It reads the sas7bdat files directly and quickly, and does not need SAS installed.
    • Emulate PROC CONTENTS by jQuery and DataTables.
    • Extract the meta data from all SAS7bdat files under the specified directory.
    • Support IE(>=10), firefox, chrome and any other modern browser.


    pip install saslib
    saslib requires sas7bdat and jinjia2.


    The module is very simple to use. For example, the SAS data sets under the SASHELP library could be viewed —
    from saslib import PROCcontents

    sasdata = PROCcontents('c:/Program Files/SASHome/SASFoundation/9.3/core/sashelp')

    The resulting HTML file from the codes above will be like here.
    3月 212015
    Since Spark is rapidly evolving, I need to deploy and maintain a minimal Spark cluster for the purpose of testing and prototyping. A public cloud is the best fit for my current demand.
    1. Intranet speed
      The cluster should easily copy the data from one server to another. MapReduce always shuffles a large chunk of data throughout the HDFS. It’s best that the hard disk is SSD.
    2. Elasticity and scalability
      Before scaling the cluster out to more machines, the cloud should have some elasticity to size up or size down.
    3. Locality of Hadoop
      Most importantly, the Hadoop cluster and the Spark cluster should have one-to-one mapping relationship like below. The computation and the storage should always be on the same machines.
    HadoopCluster ManagerSparkMapReduce
    Name NodeMasterDriverJob Tracker
    Data NodeSlaveExecutorTask Tracker

    Choice of public cloud:

    I simply compare two cloud service provider: AWS and DigitalOcean. Both have nice Python-based monitoring tools(Boto for AWS and python-digitalocean for DigitalOcean).
    1. From storage to computation
      Hadoop’s S3 is a great storage to keep data and load it into the Spark/EC2 cluster. Or the Spark cluster on EC2 can directly read S3 bucket such as s3n://file (the speed is still acceptable). On DigitalOcean, I have to upload data from local to the cluster’s HDFS.
    2. DevOps tools:
        • With default setting after running it, you will get
          • 2 HDFSs: one persistent and one ephemeral
          • Spark 1.3 or any earlier version
          • Spark’s stand-alone cluster manager
        • A minimal cluster with 1 master and 3 slaves will be consist of 4 m1.xlarge EC2 instances
          • Pros: large memory with each node having 15 GB memory
          • Cons: not SSD; expensive (cost $0.35 * 6 = $2.1 per hour)
        • With default setting after running it, you will get
          • HDFS
          • no Spark
          • Mesos
          • OpenVPN
        • A minimal cluster with 1 master and 3 slaves will be consist of 4 2GB/2CPUs droplets
          • Pros: as low as $0.12 per hour; Mesos provide fine-grained control over the cluster(down to 0.1 CPU and 16MB memory); nice to have VPN to guarantee the security
          • Cons: small memory(each has 2GB memory); have to install Spark manually

    Add Spark to DigitalOcean cluster

    Tom Faulhaber has a quick bash script for deployment. To install Spark 1.3.0, I write it into a fabfile for Python’s Fabric.
    Then all the deployment onto the DigitOcean is just one command line.
    # is the internal IP address of the master
    fab -H deploy_spark
    The source codes above are available at my Github