python

August 12, 2016
 
As of this writing, the deep learning package Theano cannot be installed under Python 3.5, which causes some problems for my project. ruslanagit provided a viable solution that works for me under Windows 10.

For convenience, I copied his solution below:

Well, the main problem with Python 3.5 on Windows, I guess, is that mingw and libpython are not available (not compiled for Python 3.5), so you cannot run the $ conda install mingw libpython step.

So you either need to downgrade to Python 3.4 (not an option for me) and then follow the standard instructions for installing Theano on Windows, or apply a few tricks to make Theano work with Python 3.5. For me, the following steps worked on Windows 10 with Anaconda3 and Python 3.5:

  • Install mingw from https://sourceforge.net/projects/mingw-w64/ 
  • Add the bin directory of mingw to PATH, and make sure there is no other gcc compiler in PATH (i.e. TDM-GCC is not there). 
  • In the .theanorc file, add 
 [gcc]
cxxflags = -shared -I"[TDM-GCC path]\include" -I"[TDM-GCC path]\x86_64-w64-mingw32\include"

You should update the paths to TDM-GCC according to your system. Note that the TDM include directory is required, since compilation fails with the mingw include directories for Python 3.5 (I think they would work for Python 2, but I am not 100% sure).
  • Create libpython35.a manually and copy it to the appropriate directory (a scripted sketch of these steps follows this list). For example: 
    • Create a temp directory 
    • Copy python35.dll (I took it from the Anaconda3 folder) into the created directory 
    • Navigate into the created directory 
    • Run: gendef python35.dll 
    • Run: dlltool --dllname python35.dll --def python35.def --output-lib libpython35.a 
    • Copy libpython35.a into Anaconda3\libs 
  • All other installations/configurations were done as described in the guide for installing Theano on Windows.
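
For convenience, the manual gendef/dlltool steps above can also be scripted. The snippet below is my own sketch, not part of ruslanagit's instructions; it assumes gendef and dlltool from mingw-w64 are on PATH and that Anaconda3 is installed at C:\Anaconda3 (adjust both for your system).

# Sketch: automate the libpython35.a steps (assumes gendef/dlltool on PATH and
# Anaconda3 at C:\Anaconda3 -- adjust to your system).
import os
import shutil
import subprocess
import tempfile

ANACONDA = r'C:\Anaconda3'      # assumed install location
workdir = tempfile.mkdtemp()    # the temp directory from the steps above

# copy python35.dll into the temp directory
shutil.copy(os.path.join(ANACONDA, 'python35.dll'), workdir)

# gendef python35.dll  ->  produces python35.def
subprocess.check_call(['gendef', 'python35.dll'], cwd=workdir)

# dlltool --dllname python35.dll --def python35.def --output-lib libpython35.a
subprocess.check_call(['dlltool', '--dllname', 'python35.dll',
                       '--def', 'python35.def',
                       '--output-lib', 'libpython35.a'], cwd=workdir)

# copy the import library into Anaconda3\libs
shutil.copy(os.path.join(workdir, 'libpython35.a'), os.path.join(ANACONDA, 'libs'))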
June 6, 2016
 

1 PyODPS: http://pyodps.readthedocs.io/zh_CN/latest/

2 UDTF basics: https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF

3 UDTF development

#-*- coding:utf8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
import datetime
from datetime import timedelta

DATEFORMAT = '%Y%m%d'

@annotate('string,string->string,bigint')
class DateRangeUDTF(BaseUDTF):

    def convert_date(self, date):
        try:
            date = datetime.datetime.strptime(date, DATEFORMAT)
            return date.date()
        except TypeError:
            # already a date object
            return date

    def get_date(self, start, n):
        return (self.convert_date(start) + timedelta(days=n)).strftime(DATEFORMAT)

    def date_range(self, start, end):
        days = (self.convert_date(end) - self.convert_date(start)).days + 1
        if days <= 0:
            raise ValueError('The start date must be before the end date.')
        for n in xrange(0, days):
            yield self.get_date(start, n)

    def process(self, arg0, arg1):
        start = arg0
        end = arg1
        i = 0
        for t in self.date_range(start, end):
            i += 1
            self.forward(t, i)
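
Before uploading the UDTF, the date arithmetic can be sanity-checked locally without an ODPS runtime. The snippet below is my own illustration that mirrors the convert_date/get_date logic on plain strings:

# Local sanity check of the same date logic (illustration only, no ODPS needed)
import datetime
from datetime import timedelta

DATEFORMAT = '%Y%m%d'

def date_range(start, end):
    start_d = datetime.datetime.strptime(start, DATEFORMAT).date()
    end_d = datetime.datetime.strptime(end, DATEFORMAT).date()
    days = (end_d - start_d).days + 1
    if days <= 0:
        raise ValueError('The start date must be before the end date.')
    for n in range(days):
        yield (start_d + timedelta(days=n)).strftime(DATEFORMAT), n + 1

print list(date_range('20151010', '20151014'))
# [('20151010', 1), ('20151011', 2), ('20151012', 3), ('20151013', 4), ('20151014', 5)]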

4 Practical application

select rng.dt, rng.id
from dual lateral view daterange_v1('20151010', '20151212') rng as dt, id
;
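
The same query can also be submitted from Python through PyODPS. A rough sketch, assuming the UDTF has already been registered as daterange_v1 in the project and that the credentials below are replaced with real ones:

# Hedged sketch: run the query via PyODPS (placeholders for credentials/endpoint)
from odps import ODPS

o = ODPS('<access-id>', '<access-key>', '<project>', endpoint='<endpoint>')
sql = ("select rng.dt, rng.id "
       "from dual lateral view daterange_v1('20151010', '20151212') rng as dt, id")
with o.execute_sql(sql).open_reader() as reader:
    for record in reader:
        print record['dt'], record['id']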



 
May 11, 2016
 

I've been giving presentations about SAS® Viya™ for a couple of months now, and the reactions have been positive. I'm part of a much bigger cast of speakers at SAS who talk about the new analytics platform with key customers and analysts. While some presenters focus on the overarching benefits or the integration with previous versions of SAS, I get to focus on hands-on demonstrations. My style is to show SAS Viya technology in use, which helps attendees understand the different user experiences.

The demonstrations I show use anywhere from 90,000 to 2.5 million observations. Sometimes I’m looking for fraud or sometimes I’m looking for opportunities to reduce attrition. No matter what business problem I’m trying to solve, I like to show four different ways to tackle the problem, depending on the user persona and skillsets.

First, I show SAS® Visual Statistics. This is a suitable interface for business analysts and citizen data scientists. I can point and click to do a logistic regression and find an answer. Or, I can start to explore my data with SAS® Visual Analytics before I do any modeling.

Running a logistic regression in SAS Visual Statistics

Running a logistic regression in SAS Visual Analytics (click to enlarge).

Next, I show SAS® Studio, where you can also point and click, or you can program SAS code. You can do both here, or toggle back and forth, which provides a lot of benefits when reviewing how your code works. Here, I run a logistic regression, write the procedure and use PROC LOGSELECT, which is the logistic procedure in SAS Viya. I also point out all of the exploratory and descriptive tasks and procedures you can use before fitting your model.


SAS Studio shows the selected model and the code side by side (click to enlarge).

Then, I tab over to Jupyter Notebook and show the CAS Python API. (CAS is short for Cloud Analytic Services.) I can write Python code that calls specific CAS actions, like calling the logistic action set or performing model assessment. The CAS Python API is not yet released but will be available later this year.

SAS code in Python

Performing a model assessment from inside Jupyter Notebook.
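
Because the CAS Python API had not shipped when this post was written, the snippet below is only an illustrative sketch of what calling CAS actions from Python can look like. The package (swat, released later), the host, the port and the table name are my assumptions, not the interface shown in the demo.

# Illustrative sketch only: connect to CAS from Python and load the logistic
# action set. Host, port and table names are placeholders/assumptions.
import swat  # SAS Scripting Wrapper for Analytics Transfer

conn = swat.CAS('cas-server.example.com', 5570)   # hypothetical CAS host/port
print(conn.serverstatus())                        # confirm the session is alive

conn.loadactionset('regression')                  # the logistic action lives here
print(conn.builtins.actionsetinfo())              # list the loaded action sets

# From here the regression action set's logistic action would be run against an
# in-memory CAS table (e.g. the attrition data), followed by model assessment;
# see the SWAT documentation for the exact parameters.
conn.terminate()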

Finally, I like to show prepackaged predictive models that run SAS through APIs. With analytics as a service from SAS, these APIs can be used to embed logistic regression or other modeling techniques into any other application.


APIs are portable pieces of code that can be easily combined and stacked together to enhance other applications or websites (click to enlarge).

I use the same data for all of the examples, and because the same CAS action set is used for the analysis, you get the same results. For each one, SAS Viya goes right to the data and lifts it up into memory. I can perform my analyses quickly and nimbly, and when I'm done, the data dribbles back down to its initial location instead of proliferating copies of the data on hard disks.

Not only is it quick and easy to get complex answers – but you can get them in whatever way feels most comfortable to you.

Which of the four do you prefer? And what ideas do the different options inspire in you?

tags: Jupyter Notebook, Python, SAS Viya

Choose your own adventure with SAS Viya was published on SAS Users.

April 25, 2016
 

We've just celebrated Earth Day, but I'm here to talk about Jupyter -- and the SAS open source project that opens the door for more learning. With this new project on the github.com/sassoftware page, SAS contributes new support for running SAS from within Jupyter Notebooks -- a popular browser-based environment used by professors and data scientists.

My colleague Amy Peters announced this during a SAS Tech Talk show at SAS Global Forum 2016. If you want to learn more about Jupyter and see the SAS support in action, then you can watch the video here.

Visit the project on GitHub: sas_kernel by sassoftware

Within Jupyter, the sas_kernel provides multiple ways to access SAS programming methods. The most natural method is to create a new SAS notebook, available from the New menu in the Jupyter Home window and from the File menu in an active notebook:

From a SAS notebook, you can enter and run SAS code directly from a cell:

There is even a Notebook extension (./nbextensions/showSASLog) that can show you the SAS log.

The second way that you can run SAS code is by using special Python "magics" supported by the sas_kernel. These magic commands look almost just like SAS macro calls (imagine that!). From within a Python language notebook, you can inject your SAS program code and pull in SAS results. This allows you to move easily between Python and SAS in a single environment. Here's a simple example:

%%SAS
proc means data=sashelp.cars;
run;
ods graphics / height=500 width=800;
proc sgplot data=sashelp.cars;
histogram msrp;
run;

How to get started

Currently, to run SAS with Jupyter you need:

  • SAS 9.4 or later running on Linux
  • Python 3 installed on the same machine (that's basically part of Linux)
  • Admin rights to be able to install/configure the Jupyter Notebook infrastructure and the sas_kernel.

End users of Jupyter Notebook do not need special privileges -- you need those only to install and configure the pieces that make it work. The GitHub project has all of the documentation and step-by-step instructions for installation.

What's next for SAS and Jupyter?

This is just the start for SAS in the Jupyter world. Amy says that she has already received lots of interest and feedback, and SAS is working to make the Jupyter Notebook approach available in something like SAS University Edition and SAS OnDemand for Academics. Stay tuned!

tags: Jupyter, open source, Python, SAS global forum

The post How to run SAS programs in Jupyter Notebook appeared first on The SAS Dummy.

June 19, 2015
 

I really appreciate the wonderful comments on my SAS posts from the readers (1, 2, 3). They gave me a lot of inspiration. Recently, due to the inherent limitations of SAS and SQL, I have found it difficult to deal with some extremely large SAS datasets (meaning that I have exhausted all the traditional approaches). Here I summarize two alternative solutions for these extreme cases, as a follow-up to the comments.
  1. Read Directly
    • Use a scripting language such as Python to read SAS datasets directly
  2. Code Generator
    • Use SAS or other scripting languages to generate SAS/SQL code
The examples still use sashelp.class, which has 19 rows. The target variable is weight.
*In SAS
data class;
set sashelp.class;
row = _n_;
run;

Example 1: Find the median

SQL Query

In the comments, Anders Sköllermo wrote:
Hi! About 1. Calculate the median of a variable:
If you look at the details in the SQL code for calculating the median, you will find that the intermediate file has N*N obs, where N is the number of obs in the SAS data set.
So this is OK for very small files. But for a file with 10,000 obs, you have an intermediate file of 100 million obs. / Br, Anders Sköllermo, Ph.D., Reuma and Neuro Data Analyst
The SQL query below is simple and pure, so it can be ported to any other SQL platform. However, just as Anders said, it is way too expensive.
*In SAS
proc sql;
select avg(weight) as Median
from (select e.weight
from class e, class d
group by e.weight
having sum(case when e.weight = d.weight then 1 else 0 end)
>= abs(sum(sign(e.weight - d.weight)))
);
quit;
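
To make the cost concrete, here is a small pure-Python rendering (my own illustration) of what the self-join computes: every value e is compared against every value d, which is exactly the N*N blow-up Anders describes.

# Pure-Python rendering of the self-join median query (illustration only)
def sql_style_median(values):
    picked = []
    for e in values:                                       # outer copy of the table
        ties = sum(1 for d in values if e == d)            # case when e = d
        signs = sum((e > d) - (e < d) for d in values)     # sign(e - d)
        if ties >= abs(signs):                             # the HAVING clause
            picked.append(e)
    return sum(picked) / float(len(picked))                # avg(weight)

print sql_style_median([3, 1, 4, 1, 5])   # prints 3.0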

PROC UNIVARIATE

In the comment, Anonymous wrote:
I noticed the same thing - we tried this on one of our 'smaller' datasets (~2.9 million records), and it took forever.
Excellent solution, but maybe PROC UNIVARIATE will get you there faster on a large dataset.
Indeed, PROC UNIVARIATE is the best solution in SAS for finding the median, since it utilizes SAS's built-in power.

Read Directly

When the extreme cases come, say SAS cannot even open the entire dataset, we may have to use a streaming method to read the sas7bdat file line by line. The sas7bdat format has been decoded in Java, R and Python, so theoretically we don't need SAS at all to query a SAS dataset.
A heap is an interesting data structure that easily finds a min or a max. As we stream the values, we can build a max heap and a min heap to cut the incoming stream in half in Python. The algorithm looks like heap sorting. The good news is that it only reads one variable at a time and thus saves a lot of space.
#In Python
import heapq
from sas7bdat import SAS7BDAT

class MedianStream(object):
    def __init__(self):
        self.first_half = []   # max heap (values stored negated); holds the extra element when N is odd
        self.second_half = []  # min heap
        self.N = 0

    def insert(self, x):
        # push onto the lower half, then move its current max to the upper half
        heapq.heappush(self.first_half, -x)
        self.N += 1
        heapq.heappush(self.second_half, -heapq.heappop(self.first_half))
        # rebalance so the lower half is never smaller than the upper half
        if len(self.second_half) > len(self.first_half):
            heapq.heappush(self.first_half, -heapq.heappop(self.second_half))

    def show_median(self):
        if self.N == 0:
            raise IOError('please use the insert method first')
        elif self.N % 2 == 0:
            return (-self.first_half[0] + self.second_half[0]) / 2.0
        return -self.first_half[0]

if __name__ == "__main__":
    stream = MedianStream()
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, line in enumerate(infile):
            if i == 0:
                continue  # skip the header row
            stream.insert(float(line[-1]))
    print stream.show_median()

99.5
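
As a quick cross-check (my own addition, not in the original post), the streaming result can be compared with a plain sort-based median on a small random sample:

# Cross-check MedianStream against a sort-based median (illustration only)
import random

data = [random.uniform(50, 150) for _ in range(101)]
stream = MedianStream()
for value in data:
    stream.insert(value)

expected = sorted(data)[50]    # exact median of 101 values
assert abs(stream.show_median() - expected) < 1e-9
print 'streaming median matches:', expected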

Example 2: Find top K by groups

SQL Query

The query below is very expensive. It involves an O(N^2) self-join and an O(N log N) sort, so the total time complexity is a terrible O(N^2 + N log N).
* In SAS
proc sql;
select a.sex, a.name, a.weight, (select count(distinct b.weight)
from class as b where b.weight >= a.weight and a.sex = b.sex ) as rank
from class as a
where calculated rank <= 3
order by sex, rank
;quit;

Code Generator

The overall idea is to break up the problem and conquer the pieces. If we synthesize SAS code from a scripting tool such as Python, we essentially get many small SAS code segments. For example, the SQL code below is just about sorting, so the time complexity is largely decreased to O(N log N).
# In Python
def create_sql(k, candidates):
    template = """
proc sql outobs = {0};
select *
from {1}
where sex = '{2}'
order by weight desc
;
quit;"""
    for x in candidates:
        current = template.format(k, 'class', x)
        print current

if __name__ == "__main__":
    create_sql(3, ['M', 'F'])


proc sql outobs = 3;
select *
from class
where sex = 'M'
order by weight desc
;
quit;

proc sql outobs = 3;
select *
from class
where sex = 'F'
order by weight desc
;
quit;

Read Directly

This time we use the heap data structure in Python again. To find the top k rows for each group, we just need to maintain a min heap of size k for each group. With the smallest value popped out each time the heap grows past k, we are finally left with the top k values for each group. The optimized time complexity is O(N log k).
#In Python
from sas7bdat import SAS7BDAT
from heapq import heappush, heappop

def get_top(k, sasfile):
    minheaps = [[], []]
    sexes = ['M', 'F']
    with SAS7BDAT(sasfile) as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue  # skip the header row
            sex, weight = row[1], row[-1]
            idx = sexes.index(sex)
            current = minheaps[idx]
            heappush(current, (weight, row))
            if len(current) > k:
                heappop(current)  # drop the smallest once the heap exceeds size k
    for x in minheaps:
        for _, y in x:
            print y

if __name__ == "__main__":
    get_top(3, 'class.sas7bdat')


[u'Robert', u'M', 12.0, 64.8, 128.0]
[u'Ronald', u'M', 15.0, 67.0, 133.0]
[u'Philip', u'M', 16.0, 72.0, 150.0]
[u'Carol', u'F', 14.0, 62.8, 102.5]
[u'Mary', u'F', 15.0, 66.5, 112.0]
[u'Janet', u'F', 15.0, 62.5, 112.5]
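
As a side note (my own addition), heapq also ships nlargest, which does the same per-group top-k in one call once the rows are grouped:

# Alternative sketch: heapq.nlargest for the per-group top k (illustration only)
from heapq import nlargest
from sas7bdat import SAS7BDAT

def get_top_nlargest(k, sasfile):
    groups = {'M': [], 'F': []}
    with SAS7BDAT(sasfile) as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue          # skip the header row
            groups[row[1]].append(row)
    for sex in ('M', 'F'):
        for row in nlargest(k, groups[sex], key=lambda r: r[-1]):
            print row

get_top_nlargest(3, 'class.sas7bdat')

Note that this version keeps every row of a group in memory, so the explicit size-k heap above remains the better fit for the streaming, low-memory scenario.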

Example 3: Find Moving Window Maximum

In my daily work, I often want to compute three statistics over a moving window: the mean, the max, and the min. The sheer data size poses challenges.
In his blog post, Liang Xie showed three advanced approaches to calculating moving averages, including PROC EXPAND, the DATA step, and PROC SQL. Apparently PROC EXPAND is the winner throughout the comparison. The conclusion is that self-joining is very expensive, always O(N^2), and we should avoid it as much as possible.
Finding the max or the min is somewhat different from finding the mean: for the mean, only the running mean needs to be kept, while for the max/min the locations of past maxima/minima must also be remembered.

Code Generator

The strategy is very straightforward: we select k (here three) rows from the table sequentially and calculate the maximum of each window. The time complexity is O(k*N). The generated SAS code is very lengthy, but the machine should feel comfortable reading it.
In addition, if we want to save the results, we could insert those maximums into an empty table.
# In Python
def create_sql(k, N):
    template = """
select max(weight)
from class
where row in ({0})
;"""
    SQL = ""
    for x in range(1, N - k + 2):
        current = map(str, range(x, x + k))
        SQL += template.format(','.join(current))
    print "proc sql;" + SQL + "quit;"

if __name__ == "__main__":
    create_sql(3, 19)


proc sql;
select max(weight)
from class
where row in (1,2,3)
;
select max(weight)
from class
where row in (2,3,4)
;
select max(weight)
from class
where row in (3,4,5)
;
select max(weight)
from class
where row in (4,5,6)
;
select max(weight)
from class
where row in (5,6,7)
;
select max(weight)
from class
where row in (6,7,8)
;
select max(weight)
from class
where row in (7,8,9)
;
select max(weight)
from class
where row in (8,9,10)
;
select max(weight)
from class
where row in (9,10,11)
;
select max(weight)
from class
where row in (10,11,12)
;
select max(weight)
from class
where row in (11,12,13)
;
select max(weight)
from class
where row in (12,13,14)
;
select max(weight)
from class
where row in (13,14,15)
;
select max(weight)
from class
where row in (14,15,16)
;
select max(weight)
from class
where row in (15,16,17)
;
select max(weight)
from class
where row in (16,17,18)
;
select max(weight)
from class
where row in (17,18,19)
;quit;

Read Directly

Again, if we want to further decrease the time complexity, say to O(N), we have to use a better data structure, such as a queue. SAS doesn't have a queue, so we may switch to Python. The function actually has two loops, which add up to O(2N); however, it is still better than the other methods.
# In Python
from sas7bdat import SAS7BDAT
from collections import deque

def maxSlidingWindow(A, w):
    N = len(A)
    ans = [0] * (N - w + 1)
    myqueue = deque()
    for i in range(w):
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        myqueue.append(i)
    for i in range(w, N):
        ans[i - w] = A[myqueue[0]]
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        while myqueue and myqueue[0] <= i - w:
            myqueue.popleft()
        myqueue.append(i)
    ans[-1] = A[myqueue[0]]
    return ans

if __name__ == "__main__":
    weights = []
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue
            weights.append(float(row[-1]))
    print maxSlidingWindow(weights, 3)

[112.5, 102.5, 102.5, 102.5, 102.5, 112.5, 112.5, 112.5, 99.5, 99.5, 90.0, 112.0, 150.0, 150.0, 150.0, 133.0, 133.0]

Conclusion

While data keeps expanding, we should increasingly consider three things -
  • Time complexity: we don't want a job to run for weeks.
  • Space complexity: we don't want the memory to overflow.
  • Clean code: colleagues should be able to easily read and modify the code.

    June 3, 2015
     
    saslib is an HTML report generator that looks up the metadata (or header information) of SAS datasets, like PROC CONTENTS in SAS.
    • It reads sas7bdat files directly and quickly, and does not need SAS installed.
    • It emulates PROC CONTENTS with jQuery and DataTables.
    • It extracts the metadata from all sas7bdat files under the specified directory.
    • It supports IE (>=10), Firefox, Chrome and any other modern browser.

    Installation

    pip install saslib
    saslib requires sas7bdat and jinja2.

    Usage

    The module is very simple to use. For example, the SAS data sets under the SASHELP library could be viewed —
    from saslib import PROCcontents

    sasdata = PROCcontents('c:/Program Files/SASHome/SASFoundation/9.3/core/sashelp')
    sasdata.show()


    The resulting HTML file from the codes above will be like here.
    March 21, 2015
     
    Requirements
    Since Spark is rapidly evolving, I need to deploy and maintain a minimal Spark cluster for the purpose of testing and prototyping. A public cloud is the best fit for my current demand.
    1. Intranet speed
      The cluster should easily copy the data from one server to another. MapReduce always shuffles a large chunk of data throughout the HDFS. It’s best that the hard disk is SSD.
    2. Elasticity and scalability
      Before scaling the cluster out to more machines, the cloud should have some elasticity to size up or size down.
    3. Locality of Hadoop
      Most importantly, the Hadoop cluster and the Spark cluster should have one-to-one mapping relationship like below. The computation and the storage should always be on the same machines.
    Hadoop       Cluster Manager   Spark      MapReduce
    Name Node    Master            Driver     Job Tracker
    Data Node    Slave             Executor   Task Tracker

    Choice of public cloud:

    I simply compare two cloud service providers: AWS and DigitalOcean. Both have nice Python-based monitoring tools (Boto for AWS and python-digitalocean for DigitalOcean).
    1. From storage to computation
      Amazon S3 is great storage for keeping data and loading it into a Spark/EC2 cluster. Alternatively, the Spark cluster on EC2 can read an S3 bucket directly, such as s3n://file (the speed is still acceptable). On DigitalOcean, I have to upload data from local storage to the cluster's HDFS.
    2. DevOps tools:
        • With the default settings, after running it you will get
          • 2 HDFSs: one persistent and one ephemeral
          • Spark 1.3 or any earlier version
          • Spark’s stand-alone cluster manager
        • A minimal cluster with 1 master and 3 slaves will consist of 4 m1.xlarge EC2 instances
          • Pros: large memory with each node having 15 GB memory
          • Cons: not SSD; expensive (cost $0.35 * 6 = $2.1 per hour)
        • With the default settings, after running it you will get
          • HDFS
          • no Spark
          • Mesos
          • OpenVPN
        • A minimal cluster with 1 master and 3 slaves will consist of 4 2GB/2CPU droplets
          • Pros: as low as $0.12 per hour; Mesos provides fine-grained control over the cluster (down to 0.1 CPU and 16MB memory); nice to have a VPN to guarantee security
          • Cons: small memory (each node has only 2GB); Spark has to be installed manually

    Add Spark to DigitalOcean cluster

    Tom Faulhaber has a quick bash script for the deployment. To install Spark 1.3.0, I rewrote it as a fabfile for Python's Fabric.
    Then the entire deployment onto DigitalOcean is just one command line.
    # 10.1.2.3 is the internal IP address of the master
    fab -H 10.1.2.3 deploy_spark
    The source code above is available on my GitHub.
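
    The gist of such a fabfile is roughly the sketch below. This is my own simplified illustration, not the file from the repo; it assumes Fabric 1.x, an Ubuntu droplet, and the Spark 1.3.0 binary from the Apache archive, and it points Spark at the Mesos master mentioned earlier.

    # Rough sketch of a deployment fabfile (assumptions: Fabric 1.x, Ubuntu droplet,
    # Spark 1.3.0 pre-built for Hadoop 2.4, Mesos master at 10.1.2.3)
    from fabric.api import env, run, sudo

    env.user = 'root'
    SPARK_URL = ('http://archive.apache.org/dist/spark/spark-1.3.0/'
                 'spark-1.3.0-bin-hadoop2.4.tgz')

    def deploy_spark():
        sudo('apt-get update && apt-get install -y openjdk-7-jre-headless')
        run('wget -q %s' % SPARK_URL)
        run('tar xzf spark-1.3.0-bin-hadoop2.4.tgz')
        run('mv spark-1.3.0-bin-hadoop2.4 /opt/spark')
        # point Spark at the cluster manager (here the Mesos master)
        run('echo "spark.master mesos://10.1.2.3:5050" >> /opt/spark/conf/spark-defaults.conf')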
    February 2, 2015
     
    This is a quick tutorial on deploying a web service (a social network) with the LNMP (Linux, Nginx, MongoDB, Python) stack on any IaaS cloud. The GitHub repo is at https://github.com/dapangmao/minitwit-mongo-ubuntu.

    Stack

    The stack is built on the tools in the ecosystem of Python below. 

    Tool            Name                Advantage
    Cloud           DigitalOcean        Cheap but fast
    Server distro   Ubuntu 14.10 x64    Everything is latest
    WSGI proxy      Gunicorn            Manage workers automatically
    Web proxy       Nginx               Fast and easy to configure
    Framework       Flask               Single file approach for MVC
    Data store      MongoDB             No schema needed and scalable
    DevOps          Fabric              Agentless and Pythonic
    In addition, a Supervisor running on the server provides a daemon to protect the Gunicorn-Flask process.

    The MiniTwit app

    The MiniTwit application is an example provided by Flask; it is a prototype of a Twitter-like multiple-user social network. The original application depends on SQLite, but the data store can be modified to fit a NoSQL option such as Google Data Store or MongoDB. A live MiniTwit demo is hosted at http://minitwit-123.appspot.com/public

    Deployment

    1. Install Fabric and clone the Github repo
    The DevOps tool is Fabric, which is simply based on SSH. The fabfile.py and the staging Flask files are stored on GitHub. We should install Fabric and download fabfile.py to the local machine before the deployment.
    sudo pip install fabric 
    wget https://raw.githubusercontent.com/dapangmao/minitwit-mongo-ubuntu/master/fabfile.py
    fab -l
    2. Enter the IP of the virtual machine
    A new VM from the cloud provider usually comes with an emailed IP address and root password. Then we can modify the head of fabfile.py accordingly. There are quite a few less expensive cloud providers for prototyping other than the costly Amazon EC2. For example, a minimal instance from DigitalOcean costs only five dollars a month. If an SSH key has been uploaded, the password can be ignored.
    env.hosts = ['YOUR IP ADDRESS'] #  Enter IP
    env.user = 'root'
    env.password = 'YOUR PASSWORD' # Enter password
    3. Fire up Fabric
    Now it is time to formally deploy the application. With the command below, Fabric will first install pip, git, nginx, gunicorn, supervisor and the latest MongoDB, and then configure them sequentially. In less than 5 minutes, a Flask and MongoDB application will be ready for use. Since DigitalOcean has its own software repository for Ubuntu and its VMs run on SSDs, the deployment is even faster, usually finishing within one minute.
    fab deploy_minitwit
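
    The install-and-configure sequence that fab deploy_minitwit performs is, in spirit, like the sketch below. This is my own heavily condensed illustration under assumptions (Fabric 1.x, Ubuntu package names, an assumed minitwit:app module path); the real fabfile.py in the repo also writes the Nginx, Supervisor and MongoDB configuration files.

    # Condensed sketch of the deployment task (assumptions: Fabric 1.x, Ubuntu
    # package names; the real fabfile.py also pushes per-service config files)
    from fabric.api import run, sudo, cd

    def deploy_minitwit():
        sudo('apt-get update')
        sudo('apt-get install -y python-pip git nginx supervisor mongodb')
        sudo('pip install gunicorn flask pymongo')
        run('git clone https://github.com/dapangmao/minitwit-mongo-ubuntu.git')
        with cd('minitwit-mongo-ubuntu'):
            # module:variable below is assumed; supervisor/nginx would normally
            # manage this process instead of the -D daemon flag
            run('gunicorn minitwit:app -b 127.0.0.1:8000 -D')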
    January 9, 2015
     
    Suppose there is a website tracking user activities to prevent robotic attacks on the Internet. Please design an algorithm to identify user IDs that have more than 500 clicks within any given 10 minutes.
    Sample.txt: anonymousUserID timeStamp clickCount
    123    9:45am    10
    234 9:46am 12
    234 9:50am 20
    456 9:53am 100
    123 9:55am 33
    456 9:56am 312
    123 10:03am 110
    123 10:16am 312
    234 10:20am 201
    456 10:23am 180
    123 10:25am 393
    456 10:27am 112
    999 12:21pm 888

    Thought

    This is a typical example of stream processing. The key is to build a fixed-length window that slides through all the data, counts the data within it, and returns the possibly malicious IDs.

    Single machine solution

    Two data structures are used: a queue and a hash table. The queue scans the data and only keeps the entries within a 10-minute window; once a new entry comes in, the old ones outside the window are popped out. The hash table counts the clicks in the queue and is updated along with the changing queue. Any ID with more than 500 clicks is added to a set.
    from datetime import datetime
    import time
    from collections import deque

    def get_minute(s, fmt='%I:%M%p'):
        return time.mktime(datetime.strptime(s, fmt).timetuple())

    def get_diff(s1, s2):
        # time difference between two timestamps, in minutes
        return int(get_minute(s2) - get_minute(s1)) / 60

    def find_ids(infile, duration, maxcnt):
        queue, htable, ans = deque(), {}, set()
        with open(infile, 'rt') as _infile:
            for l in _infile:
                line = l.split()
                line[2] = int(line[2])
                current_id, current_time, current_clk = line
                if current_id not in htable:
                    htable[current_id] = current_clk
                else:
                    htable[current_id] += current_clk
                queue.append(line)
                # evict entries that have fallen outside the sliding window
                while queue and get_diff(queue[0][1], current_time) > duration:
                    past_id, _, past_clk = queue.popleft()
                    htable[past_id] -= past_clk
                if htable[current_id] > maxcnt:
                    ans.add(current_id)
        return ans

    if __name__ == "__main__":
        print find_ids('sample.txt', 10, 500)

    Cluster solution

    The newest Spark (version 1.2.0) has started to support Python streaming. However, the documentation is still scarce, so we will have to wait and see whether this problem can be solved with the new API.
    To be continued
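
    In the meantime, here is a rough, untested sketch (my own assumptions: click logs land in an input directory, 1-minute batches, a 10-minute window sliding every minute) of how it might look with the Python streaming API:

    # Rough, untested sketch with PySpark Streaming (placeholder paths; a
    # checkpoint directory is required when using the inverse reduce function)
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName='click-monitor')
    ssc = StreamingContext(sc, 60)                          # 60-second batches
    ssc.checkpoint('hdfs:///clicks/checkpoints')

    lines = ssc.textFileStream('hdfs:///clicks/incoming')   # placeholder path
    pairs = lines.map(lambda l: l.split()) \
                 .map(lambda f: (f[0], int(f[2])))          # (userID, clickCount)

    # sum clicks per user over a 10-minute window, sliding every minute
    windowed = pairs.reduceByKeyAndWindow(lambda a, b: a + b,
                                          lambda a, b: a - b,
                                          600, 60)
    suspects = windowed.filter(lambda kv: kv[1] > 500)
    suspects.pprint()

    ssc.start()
    ssc.awaitTermination()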