A pure Python implementation of Apache Spark's RDD and DStream interfaces.

Overview

pysparkling

Pysparkling provides a faster, more responsive way to develop programs for PySpark. It enables code intended for Spark applications to execute entirely in Python, without incurring the overhead of initializing and passing data through the JVM and Hadoop. The focus is on having a lightweight and fast implementation for small datasets at the expense of some data resilience features and some parallel processing features.

How does it work? To switch execution of a script from PySpark to pysparkling, have the code initialize a pysparkling Context instead of a SparkContext, and use the pysparkling Context to set up your RDDs. The beauty is you don't have to change a single line of code after the Context initialization, because pysparkling's API is (almost) exactly the same as PySpark's. Since it's so easy to switch between PySpark and pysparkling, you can choose the right tool for your use case.
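
For illustration, here is a minimal sketch of that swap; the file path and the lambda are placeholders, and everything after the Context initialization is the same code you would run under PySpark:

# In a PySpark script this would be:  from pyspark import SparkContext; sc = SparkContext()
from pysparkling import Context

sc = Context()  # drop-in replacement for SparkContext in this script

# unchanged PySpark-style RDD code from here on
lines = sc.textFile('data.txt')  # 'data.txt' is a placeholder path
line_lengths = lines.map(lambda line: len(line))
print(line_lengths.sum())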

When would I use it? Say you are writing a Spark application because you need robust computation on huge datasets, but you also want the same application to provide fast answers on a small dataset. You're finding Spark is not responsive enough for your needs, but you don't want to rewrite an entire separate application for the small-answers-fast problem. You'd rather reuse your Spark code but somehow get it to run fast. Pysparkling bypasses the JVM and Hadoop initialization that causes Spark's long startup times and less responsive feel.

Here are a few areas where pysparkling excels:

  • Small to medium-scale exploratory data analysis
  • Application prototyping
  • Low-latency web deployments
  • Unit tests

Install

pip install pysparkling[s3,hdfs,streaming]

Documentation: available on Read the Docs.

Other links: GitHub, PyPI.

Features

  • Supports URI schemes s3://, hdfs://, gs://, http:// and file:// for Amazon S3, HDFS, Google Storage, web and local file access. Multiple files can be specified as a comma-separated list. Resolves * and ? wildcards.
  • Handles .gz, .zip, .lzma, .xz, .bz2, .tar, .tar.gz and .tar.bz2 compressed files. Supports reading of .7z files.
  • Parallelization via multiprocessing.Pool, concurrent.futures.ThreadPoolExecutor or any other Pool-like object that has a map(func, iterable) method (see the sketch after this list).
  • Plain pysparkling does not have any dependencies (use pip install pysparkling). Some file access methods have optional dependencies: boto for AWS S3, requests for HTTP, hdfs for HDFS.
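
As a minimal sketch of the parallelization feature mentioned above: the pool is handed to the Context. This assumes Context accepts the pool as its first argument; a thread pool is used here so the lambdas do not need to be pickled.

from concurrent.futures import ThreadPoolExecutor

from pysparkling import Context

with ThreadPoolExecutor(max_workers=4) as pool:
    # partitions are processed through pool.map(func, iterable)
    sc = Context(pool)
    squares = sc.parallelize(range(10), 4).map(lambda x: x * x).collect()
    print(squares)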

Examples

Some demos are in the notebooks docs/demo.ipynb and docs/iris.ipynb.

Word Count

from pysparkling import Context

counts = (
    Context()
    .textFile('README.rst')
    # replace every non-alphanumeric character with a space
    .map(lambda line: ''.join(ch if ch.isalnum() else ' ' for ch in line))
    # split each line into words
    .flatMap(lambda line: line.split(' '))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
print(counts.collect())

which prints a long list of pairs of words and their counts.

Comments
  • Introducing pybuilder

    Introducing pybuilder

    I implemented pybuilder here now. The changes do NOT include the optimizations of the headers (yet) ;-). That's for another PR. pybuilder works; I don't know about Travis yet though.

    opened by svaningelgem 14
  • Feat/spark sql

    Feat/spark sql

    This PR is related to #47: it implements a big part of the DataFrame-related APIs using pure Python.

    It is already huge in terms of code, and I'm sorry for that as there's a lot to review. On the other hand, it adds a lot of features and supports some of Spark's nicest features :).

    NB: I'm opening this PR as-is for 2 main reasons:

    • See what happens with the test suite of pysparkling (I haven't tested a lot with Python 2 even though a lot of effort went into compatibility with it)
    • Discuss whether there is a way to make it easier to ingest. A suggestion would be to split it, but there are still a lot of connected components that are codependent (mainly DataFrame and Column).

    What this PR is about

    It introduces the DataFrame object, a data structure that contains Rows of data; a Row is quite similar to a namedtuple.

    DataFrame's coolest feature is that you describe operations based on the schema of your Rows rather than on their values (by manipulating Column). DataFrame operations are backed by the existing RDD code; as with PySpark's DataFrame, most of the logic is not directly in DataFrame but in another object (in PySpark it's in the Scala counterpart of DataFrame, here in a DataFrameInternal object written in Python).

    What this PR includes:

    • pysparkling.sql module, including:
      • SparkSession and SQLContext that allow DataFrame creation and management
      • DataFrame and GroupedData
      • Column
      • Types
    • DataFrameReader partial support of JSON and CSV
    • Some missing methods of RDD
    • An implementation of most of PySpark SQL functions, both classic expression and aggregations

    What it does not include, and what should be addressed in another PR:

    • Raw SQL strings parsing, both for schema description and for query creation:

      This does not work:

      spark.sql("select count(1) from swimmers").show()
      

      This works:

      df = spark.read.csv("swimmer")
      df.select(count(1)).show()
      
    • Window functions

    • Catalog related features

    • Streaming related features

    I'm available for any questions / as much walk-through of the code as you want :smiley:
    (twitter: https://twitter.com/geekowan if you want to send DMs)
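
    To make the API above concrete, here is a minimal sketch; it assumes pysparkling.sql mirrors PySpark's names (SparkSession.builder, createDataFrame, functions.count), and the data is made up:

    # assumes the import paths and the builder pattern mirror PySpark's
    from pysparkling.sql.session import SparkSession
    from pysparkling.sql.functions import count

    spark = SparkSession.builder.getOrCreate()
    # column names and rows are made-up example data
    df = spark.createDataFrame([('Alice', 160), ('Bob', 175)], ['name', 'height'])
    df.select(count(1)).show()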

    opened by tools4origins 13
  • Optimize imports

    Optimize imports

    Optimize imports cleans up:

    • unused imports
    • imports that aren't sorted within a group
    • two top-level imports on the same line.

    Very handy tool to keep the imports nice and tidy.

    opened by svaningelgem 11
  • Add accumulators, update broadcasts

    Add accumulators, update broadcasts

    Broadcast changes were made to make sure that Broadcast initialization has the same arguments as its pyspark counterpart (context as first argument).

    Accumulator and AccumulatorParam implementation is partly taken from their pyspark counterparts. Note that this implementation does not support parallelization, but such an addition should be fairly doable on top of the current implementation (again, by looking at the pyspark counterpart).

    Tests are passing; I added tests by reusing pyspark doctests.

    Fixes #25
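
    As a minimal usage sketch of the additions described above, assuming Context.accumulator and Context.broadcast mirror their pyspark counterparts:

    from pysparkling import Context

    sc = Context()

    # accumulator: workers add to it, the driver reads the final value
    acc = sc.accumulator(0)
    sc.parallelize(range(10)).foreach(lambda x: acc.add(x))
    print(acc.value)  # 45

    # broadcast: a read-only value shared with the workers
    lookup = sc.broadcast({'a': 1, 'b': 2})
    print(sc.parallelize(['a', 'b', 'a']).map(lambda k: lookup.value[k]).collect())  # [1, 2, 1]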

    opened by alexprengere 11
  • [NOT FOR MERGE] Feat/sql string parsing

    [NOT FOR MERGE] Feat/sql string parsing

    Let's introduce SQL parsing! And support things like spark main example:

    [image: Spark's main example]

    But first let's figure out how :smile:

    This PR adds one dependency (Antlr4) and one file (SqlBase.g4). It also contains many files generated based on SqlBase.g4, and that's why it is so big (and should not be merged).

    Antlr is a parser, and SqlBase.g4 defines a SQL grammar: it formalizes how SQL strings such as SELECT * FROM table WHERE column IS NOT NULL are structured.

    Note: SqlBase.g4 is derived from Spark's, which itself is derived from Presto's: this reinforces that we introduce the same SQL grammar as the one used by Spark.

    Based on this grammar, Antlr4 converts each string into a syntax tree, where each syntactical component is a Node with a predefined type and predefined children that are themselves trees. It makes SQL string parsing much easier, as SQL is a bit complex.

    For instance, it converts 42 > 1 into a tree like:

    | ComparisonContext 
    |-- ValueExpressionDefaultContext 
    |---- ConstantDefaultContext 
    |------ NumericLiteralContext 
    |-------- IntegerLiteralContext 
    |---------- TerminalNodeImpl                # 42
    |-- ComparisonOperatorContext 
    |---- TerminalNodeImpl                      # >
    |-- ValueExpressionDefaultContext 
    |---- ConstantDefaultContext 
    |------ NumericLiteralContext 
    |-------- IntegerLiteralContext 
    |---------- TerminalNodeImpl                # 1
    

    I am not opening this PR in order to have it merged: I do not think that we should add generated code to git.

    Rather, I am opening it to discuss how to automate the code generation.

    Currently, it requires the following steps:

    1. Download antlr-4.7.1-complete.jar from https://www.antlr.org/download/
    2. Run java -Xmx500M -cp "/path/to/antlr-4.7.1-complete.jar:$CLASSPATH" org.antlr.v4.Tool ${project_dir}/pysparkling/sql/ast/grammar/SqlBase.g4 -o ${project_dir}/pysparkling/sql/ast/generated

    But that's only for developers: I think we will want to package the app with these generated files.

    These steps are why I think a bit more automation in the app lifecycle would be nice.

    What do you think?

    opened by tools4origins 7
  • Warning when running tests

    Warning when running tests

    nosetests is failing with the following error:

    nosetests: error: Error reading config file 'setup.cfg': 
    no such option 'with-doctest-ignore-unicode'
    

    When using python setup.py test, it works but we still get a warning:

    RuntimeWarning: Option 'with-doctest' in config file 'setup.cfg' ignored:
    excluded by runtime environment
    

    Can we remove this option, or am I missing some libraries to make it work?

    opened by alexprengere 6
  • Handle paths manipulation with os.path, some cleanups

    Handle paths manipulation with os.path, some cleanups

    I read most of the code of the project, and removed a bit of code here and there when I saw opportunities. Most of the changes are in fileio. Tests are still passing; I hope I did not break anything.

    opened by alexprengere 5
  • Feat/untangling imports

    Feat/untangling imports

    @tools4origins : you're not going to like this one when merging :(. Sorry about that!

    Cleanup of imports.

    • change of name from stat_counter.py to statcounter.py as that's the name in pyspark.
    • Moved as much as I could into 'terminals', meaning modules which do not depend on any others except for modules external to pysparkling. There's still a lot to do here!
    • Moved stuff out from pysparkling into pysparkling.sql because the SQL code just does not belong in the root!
    • The most glaring example of this is the method toDF, which is moved to sql.session.py and is monkey-patched, just like pyspark does it (don't re-invent the wheel :)).
    • Moved stuff to private (to pysparkling) modules (modules starting with _) which are not defined in pyspark. This helped to reduce the complexity a lot.

    What I'm trying to achieve here is to reduce the dependency hell of the "SQL" module. Not much code has been changed; some code was copied from pyspark to make it easier, but basically this is moving methods around into different files. I also started here to make a distinction between pysparkling internals and the pyspark interface. What I mean by that is that pyspark has a certain file structure, which I kept as rigorously as possible (it was already largely that way anyhow), but pysparkling-specific methods I tried to move into "private" modules (_types, _casts, _expressions, ...).

    opened by svaningelgem 3
  • Feat/spark sql side effects

    Feat/spark sql side effects

    This PR contains all the modifications required by the Spark SQL implementation (#92) outside of pysparkling.sql.

    12 files are affected by this PR:

    .
    ├── pysparkling
    │   ├── sql
    │   │   ├── internal_utils
    │   │   │   └── joins.py
    │   │   └── types.py
    │   ├── tests
    │   │   ├── test_stat_counter.py
    │   │   └── test_streaming_files
    │   ├── __init__.py
    │   ├── context.py
    │   ├── rdd.py
    │   ├── stat_counter.py
    │   ├── storagelevel.py
    │   └── utils.py
    ├── LICENSE
    └── setup.py
    

    As it contains mostly interfaces with Spark SQL, it sometimes refers to code that is not part of this PR; such references are commented in this PR.

    The biggest chunks of code are:

    pysparkling/stat_counter.py, as this PR adds stat counters similar to the existing StatCounter but for Columns and Rows. Those counters compute the following stats:

    • mean
    • variance_pop
    • variance_samp
    • variance
    • stddev_pop
    • stddev_samp
    • stddev
    • min
    • max
    • sum
    • skewness
    • kurtosis
    • covar_samp
    • covar_pop
    • pearson_correlation

    pysparkling/utils.py, as it introduces many utility functions.

    opened by tools4origins 3
  • Travis python versions

    Travis python versions

    Fixes #93.

    It contains the commit history of #92, as I did not see a way to split git modifications across multiple branches while keeping the change history.

    My suggestion is to progressively check out from feat/sparkSQL on top of this commit, with one commit referencing feat/sparkSQL (a.k.a. 1ca7a2a64d7f53ff44d02b033e571914d032b60a) for each PR; this way it's still easy to navigate through the history.

    What do you think of the process? This PR has a lot of commits but it's fairly easy to look at the file changes (commit comments are not relevant for those changes).

    opened by tools4origins 3
  • fix RDD.reduce when rdd contains empty partitions

    fix RDD.reduce when rdd contains empty partitions

    Fixes #83

    It lets the TypeError be thrown instead of checking the emptiness of the partition beforehand, as values is a generator and it seems better not to consume it.

    opened by tools4origins 3
  • fix(dataFrameShow): Remove extra new lines

    fix(dataFrameShow): Remove extra new lines

    A regression was introduced in the latest commit on master:

    spark.range(3).show() printed unwanted blank lines:

    +---+
    | id|
    +---+
    |  0|
    <BLANK LINE>
    |  1|
    <BLANK LINE>
    |  2|
    <BLANK LINE>
    +---+
    

    This went unnoticed because doctest was configured to ignore whitespace differences, which we do not want, for instance because of this regression; hence the removal in setup.cfg.

    opened by tools4origins 1
  • Feat/expression types

    Feat/expression types

    This PR is on top of #157, so it may be good to merge #157 first. A diff between the PRs can be found here: https://github.com/tools4origins/pysparkling/pull/7/files

    It adds column data type handling, mostly by implementing an Expression.data_type method that takes the DataFrame schema and returns the Expression type.

    I found a few minor issues (e.g. bad return types in functions) while implementing it, so this PR fixes them too.

    opened by tools4origins 0
  • pytest-xdist

    pytest-xdist

    I just tried pytest-xdist to run the tests in parallel.

    Mostly it went ok, but these 2 failed somehow:

    FAILED pysparkling/sql/tests/test_write.py::DataFrameWriterTests::test_write_nested_rows_to_json - FileNotFoundError: [WinError 3] The system cannot find the path specified: '.tmp'
    FAILED pysparkling/tests/test_streaming_tcp.py::TCPTextTest::test_connect - AssertionError: 6 != 20
    

    It would be nice to use pytest-xdist (I ran it with -n 4); it finished in a fraction of the time pytest normally takes to run.

    Wouldn't that be a good first issue for someone to look at ;-)?

    opened by svaningelgem 0
  • Feat/sql parsing

    Feat/sql parsing

    This PR implements an ANTLR-based logic to parse SQL.

    ANTLR is a parser generator used by Apache Spark. As such, we are able to use the exact Spark SQL syntax.

    SQL strings are converted into an abstract syntax tree (AST) by this project: https://github.com/pysparkling/python-sql-parser.

    These ASTs are then converted into pysparkling objects using the pysparkling/sql/ast/ast_to_python.py module, in particular via its parse_xxx entry points, e.g. parse_sql or parse_schema.

    This PR only exposes SQL parsing on SQL schemas via a refactoring of StructType.fromDDL.

    It also contains part of the logic that will be used to handle other types of SQL statements.
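
    As a minimal sketch of the exposed schema parsing, assuming StructType lives in pysparkling.sql.types and prints similarly to PySpark's:

    from pysparkling.sql.types import StructType

    # parse a DDL-style schema string into a StructType
    schema = StructType.fromDDL('name STRING, age INT')
    print(schema)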

    opened by tools4origins 5
  • Make the imports of the `sql` module less complicated

    Make the imports of the `sql` module less complicated

    The import structure within the sql module is complicated and not great at the moment.

    The core classes outside of the sql module should be fine.

    This issue is to check whether these imports outside of the top level are really necessary in cases like this one. ==> Rationalize & reduce complexity in imports.

    Originally posted by @svenkreiss in https://github.com/svenkreiss/pysparkling/pull/152#discussion_r574602389

    opened by svaningelgem 0
  • Show a bit of coverage report in Github Action terminal

    Show a bit of coverage report in Github Action terminal

    It might be worth thinking about how to show a bit of the coverage report in the terminal output of the GitHub Action.

    Originally posted by @svenkreiss in https://github.com/svenkreiss/pysparkling/issues/150#issuecomment-775915822

    opened by svaningelgem 0
Latest release: v0.6.2

Owner: Sven Kreiss (Postdoc at the Visual Intelligence for Transportation (VITA) lab at EPFL, with a background in particle physics).