Flint: A Time Series Library for Apache Spark

The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.

Requirements

Dependency       Version
Spark Version    2.3 and 2.4
Scala Version    2.12
Python Version   3.5 and above

How to install

The Scala artifact is published on Maven Central:

https://mvnrepository.com/artifact/com.twosigma/flint

The Python artifact is published on PyPI:

https://pypi.org/project/ts-flint

Note that you need both the Scala and the Python artifacts to use Flint with PySpark.

How to build

To build from source:

Scala (in top-level dir):

sbt assemblyNoTest

Python (in python subdir):

python setup.py install

or

pip install .

Python bindings

The Python bindings for Flint, including quickstart instructions, are documented at python/README.md. API documentation is available at http://ts-flint.readthedocs.io/en/latest/.

Getting Started

Starting Point: TimeSeriesRDD and TimeSeriesDataFrame

The entry point to all time series functionality in Flint is TimeSeriesRDD (for Scala) and TimeSeriesDataFrame (for Python). At a high level, a TimeSeriesRDD contains an OrderedRDD, which represents a sequence of ordered key-value pairs. To represent a time series data set, a TimeSeriesRDD uses Long timestamps (nanoseconds since epoch) as the keys of the OrderedRDD and InternalRows as its values.
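To make the timestamp representation concrete, here is a minimal plain-Python sketch that turns a timezone-aware datetime, or a millisecond timestamp, into an integer count of nanoseconds since the Unix epoch. It does not use Flint; the helper names to_epoch_nanos and millis_to_nanos are hypothetical and only illustrate the unit conversion.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(dt: datetime) -> int:
    """Return nanoseconds since the Unix epoch for a timezone-aware datetime."""
    # Integer division of timedeltas avoids float rounding on large values.
    micros = (dt - EPOCH) // timedelta(microseconds=1)
    return micros * 1_000


def millis_to_nanos(ms: int) -> int:
    """Convert a millisecond timestamp to nanoseconds."""
    return ms * 1_000_000


start_of_2010 = to_epoch_nanos(datetime(2010, 1, 1, tzinfo=timezone.utc))
print(start_of_2010)  # 1262304000000000000
```

Values of this size fit comfortably in a signed 64-bit integer, which is consistent with using Long as the key type.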

Create TimeSeriesRDD

Applications can create a TimeSeriesRDD from an existing RDD, from an OrderedRDD, from a DataFrame, or from a single CSV file.

As an example, the following creates a TimeSeriesRDD from a gzipped CSV file with a header and a specific datetime format.

import com.twosigma.flint.timeseries.CSV
val tsRdd = CSV.from(
  sqlContext,
  "file://foo/bar/data.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  codec = "gzip",
  sorted = true
)

To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType.

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
val df = ... // A DataFrame whose rows have been sorted by their timestamps under "time" column
val tsRdd = TimeSeriesRDD.fromDF(dataFrame = df)(isSorted = true, timeUnit = MILLISECONDS)

One could also create a TimeSeriesRDD from an RDD[Row] or an OrderedRDD[Long, Row] by providing a schema, e.g.

import com.twosigma.flint.timeseries._
import org.apache.spark.sql.types.{DoubleType, LongType}
import scala.concurrent.duration._
val rdd = ... // An RDD whose rows have been sorted by their timestamps
val tsRdd = TimeSeriesRDD.fromRDD(
  rdd,
  schema = Schema("time" -> LongType, "price" -> DoubleType)
)(isSorted = true,
  timeUnit = MILLISECONDS
)

It is also possible to create a TimeSeriesRDD from a dataset stored as one or more Parquet files. The TimeSeriesRDD.fromParquet() function provides options to specify which columns and/or which time range you are interested in, e.g.

import com.twosigma.flint.timeseries._
import scala.concurrent.duration._
val tsRdd = TimeSeriesRDD.fromParquet(
  sqlContext,
  path = "hdfs://foo/bar/"
)(isSorted = true,
  timeUnit = MILLISECONDS,
  columns = Seq("time", "id", "price"),  // By default, null for all columns
  begin = "20100101",                    // By default, null for no boundary at begin
  end = "20150101"                       // By default, null for no boundary at end
)

Group functions

A group function groups rows with nearby (or exactly the same) timestamps.

  • groupByCycle Groups rows within a cycle, i.e. rows with exactly the same timestamp. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1000L 2.0
// 2000L 3.0
// 2000L 4.0
// 2000L 5.0

val results = priceTSRdd.groupByCycle()
// time  rows
// ------------------------------------------------
// 1000L [[1000L, 1.0], [1000L, 2.0]]
// 2000L [[2000L, 3.0], [2000L, 4.0], [2000L, 5.0]]
  • groupByInterval Groups rows whose timestamps fall into the same interval. Intervals can be defined by another TimeSeriesRDD: its timestamps are used to define the intervals, i.e. two sequential timestamps define one interval. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val clockTSRdd = ...
// A TimeSeriesRDD with only column "time"
// time
// -----
// 1000L
// 2000L
// 3000L

val results = priceTSRdd.groupByInterval(clockTSRdd)
// time  rows
// ----------------------------------
// 1000L [[1000L, 1.0], [1500L, 2.0]]
// 2000L [[2000L, 3.0], [2500L, 4.0]]
  • addWindows Adds a new column whose value, for each row, is the list of rows that fall within that row's window. For example,
val priceTSRdd = ...
// A TimeSeriesRDD with columns "time" and "price"
// time  price
// -----------
// 1000L 1.0
// 1500L 2.0
// 2000L 3.0
// 2500L 4.0

val result = priceTSRdd.addWindows(Window.pastAbsoluteTime("1000ns"))
// time  price window_past_1000ns
// ------------------------------------------------------
// 1000L 1.0   [[1000L, 1.0]]
// 1500L 2.0   [[1000L, 1.0], [1500L, 2.0]]
// 2000L 3.0   [[1000L, 1.0], [1500L, 2.0], [2000L, 3.0]]
// 2500L 4.0   [[1500L, 2.0], [2000L, 3.0], [2500L, 4.0]]
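The grouping semantics shown in the three examples above can be sketched in plain Python on small in-memory lists. This is not the Flint API: the function names are hypothetical, rows are modeled as (time, price) tuples, and the boundary handling (intervals closed at the begin and open at the end, rows outside every interval dropped, past windows closed on both sides) is an assumption chosen to reproduce the tables above.

```python
from bisect import bisect_right
from itertools import groupby


def group_by_cycle(rows):
    """Group rows (sorted by time) that share exactly the same timestamp."""
    return [(t, list(g)) for t, g in groupby(rows, key=lambda r: r[0])]


def group_by_interval(rows, clock):
    """Assign each row to [clock[i], clock[i + 1]) and label it by the begin."""
    groups = {}
    for row in rows:
        i = bisect_right(clock, row[0]) - 1
        # Assumption: rows before the first or at/after the last tick are dropped.
        if 0 <= i < len(clock) - 1:
            groups.setdefault(clock[i], []).append(row)
    return sorted(groups.items())


def add_past_window(rows, length):
    """Append to each row the rows whose time lies in [t - length, t]."""
    # Quadratic scan for clarity; a real implementation would slide a window.
    return [
        (t, v, [r for r in rows if t - length <= r[0] <= t])
        for t, v in rows
    ]


cycle_rows = [(1000, 1.0), (1000, 2.0), (2000, 3.0), (2000, 4.0), (2000, 5.0)]
price_rows = [(1000, 1.0), (1500, 2.0), (2000, 3.0), (2500, 4.0)]

print(group_by_cycle(cycle_rows))
print(group_by_interval(price_rows, clock=[1000, 2000, 3000]))
print(add_past_window(price_rows, length=1000))
```

The sketch relies on the input already being sorted by time, which is the same ordering property that the isSorted flag declares when a TimeSeriesRDD is created.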

Temporal Join Functions

A temporal join function is a join function defined by a matching criterion over time. The tolerance in a temporal join's matching criterion specifies how far it should look into the past or into the future.

  • leftJoin Performs a temporal left-join to the right TimeSeriesRDD, i.e. a left-join using inexact timestamp matches. For each row on the left, it appends the most recent row from the right at or before the same time. For example, two TimeSeriesRDDs can be joined as follows.
val leftTSRdd = ...
val rightTSRdd = ...
val result = leftTSRdd.leftJoin(rightTSRdd, tolerance = "1day")
  • futureLeftJoin Performs a temporal future left-join to the right TimeSeriesRDD, i.e. a left-join using inexact timestamp matches. For each row on the left, it appends the closest future row from the right at or after the same time.
val result = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "1day")
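The matching rule behind these two joins can be illustrated with a small plain-Python sketch over sorted lists of (time, value) tuples. It is not the Flint API: the function names are hypothetical, the tolerance is a plain integer in the same unit as the timestamps, and treating a match exactly at the tolerance boundary as valid is an assumption.

```python
from bisect import bisect_left, bisect_right


def left_join(left, right, tolerance):
    """Attach to each left row the latest right row at or before its time."""
    right_times = [t for t, _ in right]
    out = []
    for t, v in left:
        i = bisect_right(right_times, t) - 1  # last right row with time <= t
        ok = i >= 0 and t - right_times[i] <= tolerance
        out.append((t, v, right[i] if ok else None))
    return out


def future_left_join(left, right, tolerance):
    """Attach to each left row the earliest right row at or after its time."""
    right_times = [t for t, _ in right]
    out = []
    for t, v in left:
        i = bisect_left(right_times, t)  # first right row with time >= t
        ok = i < len(right) and right_times[i] - t <= tolerance
        out.append((t, v, right[i] if ok else None))
    return out


left = [(1000, "a"), (2000, "b"), (3000, "c")]
right = [(900, "x"), (2500, "y")]

print(left_join(left, right, tolerance=200))
print(future_left_join(left, right, tolerance=600))
```

Left rows without a match inside the tolerance keep a None placeholder, which mirrors the left-join behaviour of preserving every row from the left side.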

Summarize Functions

Summarize functions apply one or more summarizers to the rows within a certain period, such as a cycle, an interval, or a window.

  • summarizeCycles Computes aggregate statistics over the rows within a cycle, i.e. rows that share a timestamp.
val volTSRdd = ...
// A TimeSeriesRDD with columns "time", "id", and "volume"
// time  id volume
// ------------
// 1000L 1  100
// 1000L 2  200
// 2000L 1  300
// 2000L 2  400

val result = volTSRdd.summarizeCycles(Summary.sum("volume"))
// time  volume_sum
// ----------------
// 1000L 300
// 2000L 700

Similarly, we could summarize over intervals, windows, or the whole time series data set. See:

  • summarizeIntervals
  • summarizeWindows
  • addSummaryColumns

See timeseries.summarize.summarizer for the different kinds of summarizers, such as ZScoreSummarizer, CorrelationSummarizer, NthCentralMomentSummarizer, etc.
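As a rough illustration of what the summarizeCycles example computes, the plain-Python sketch below sums a column over rows that share a timestamp. It is not the Flint API: the function name summarize_cycles_sum is hypothetical, rows are modeled as dictionaries, and the output column name follows the volume_sum pattern shown in the example.

```python
def summarize_cycles_sum(rows, column):
    """Sum `column` over all rows that share the same "time" value."""
    totals = {}
    for row in rows:
        totals[row["time"]] = totals.get(row["time"], 0) + row[column]
    # Emit one output row per cycle, ordered by time.
    return [{"time": t, f"{column}_sum": s} for t, s in sorted(totals.items())]


vol_rows = [
    {"time": 1000, "id": 1, "volume": 100},
    {"time": 1000, "id": 2, "volume": 200},
    {"time": 2000, "id": 1, "volume": 300},
    {"time": 2000, "id": 2, "volume": 400},
]

print(summarize_cycles_sum(vol_rows, "volume"))
# [{'time': 1000, 'volume_sum': 300}, {'time': 2000, 'volume_sum': 700}]
```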

Contributing

In order to accept your code contributions, please fill out the appropriate Contributor License Agreement in the cla folder and submit it to [email protected].

Disclaimer

Apache Spark is a trademark of The Apache Software Foundation. The Apache Software Foundation is not affiliated, endorsed, connected, sponsored or otherwise associated in any way to Two Sigma, Flint, or this website in any manner.

© Two Sigma Open Source, LLC
