A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs.

Overview

This version: 0.1.11

logo

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centralized CPU & GPU scheduling.

For a quick look at what makes Entangle special, take a look at Design Goals.

New In This Release

Quick Usage

With Entangle you can run simple, hardware parallelized code with conditional logic that looks like this.

result = add(
            add(
                num(6),
                two() if False else one()
            ),
            subtract(
                five(),
                two()
            )
)
print(result())

Outline

Overview

Entangle is a different kind of parallel compute framework for multi-CPU/GPU environments. It allows for simple workflow design using plain old python and special decorators that control the type of parallel compute and infrastructure needed.

One key feature of entangle is fine-grained control over individual functions in a workflow. You could easily describe multiple functions running across multiple compute environments all interacting as if they were simple local python functions. No central scheduler or workflow manager is needed.

Another unique quality is the use of composition to build parallel workflows dynamically.

What does "Entangle" mean?

The term is derived from a quantum physics phenomena called quantum entanglement which involves the state of a pair or group of particles affecting one another at a distance instantaneously.

In this context, it is a metaphor for how tasks send data (particles) to one another in the context of a connected microflow.

IMPORTANT NOTES!

Please keep in mind that Entangle is in development and is classified as Pre-Alpha. Some of the functionality shown here is incomplete. If you clone this repo and want to experiment be sure to update often as things break, improve, get fixed etc. quite frequently. The main branch will always contain the most current release. All development for the next version is done on that branch (e.g. 0.1.4). If you want the current development version, look for the latest semantic version numbered branch.

Installation

NOTE: At the moment entangle only works with python 3.8 due to how coroutines work there and also shared memory features.

From PyPi

$ pip install --upgrade py-entangle
$ python -m entangle.examples.example

From repo root

python3.8

$ virtualenv --python=python3.8 venv
$ source venv/bin/activate
(venv) $ python setup.py install
(venv) $ python -m entangle.examples.example

miniconda3

  1. Install miniconda3 with python3.8 for linux
$ conda init   
$ python setup.py install
$ python -m entangle.examples.example

Installing Numba

NOTE: Numba package is disabled by default in setup.py. If you want this package, just uncomment it; however some OS specific steps might be required.

On some systems you might encounter the following error when trying to install numba.

RuntimeError: Could not find a `llvm-config` binary.

Try the following remedy (for ubuntu systems)

$ sudo apt-get install -y --no-install-recommends  llvm-10 llvm-10-dev
$ export LLVM_CONFIG=/usr/bin/llvm-config-10
$ pip3 install numba

Testing

$ pytest --verbose --color=yes --disable-pytest-warnings --no-summary --pyargs entangle.tests

or if you don't have GPU

$ pytest --verbose --color=yes --pyargs entangle.tests.test_entangle

Cleaning

Clean all build files, directories, temp files and any files created by examples and tests.

$ python setup.py clean

Miniconda

If you are planning to run or use GPU enabled code it is recommended to set up a miniconda3 virtualenv.

Design Goals

  • Small & Simple
  • Easy to Understand
  • API-less
  • Plain Old Python
  • True Parallelism
  • Pluggable & Flexible
  • Composition Based
  • Shared-Nothing
  • Serverless & Threadless
  • True Dataflow Support
  • CPU/GPU Scheduling

Architecture

Entangle is designed without a central scheduler or workflow manager. Rather, each function is decorated with special descriptors that turn them into their own workflow managers. These decorators implement logic to parallelize and gather values from its dependent arguments, which are executed as separate processes. As each function is assigned a dedicated CPU, the workflow is thus an ensemble of parallel, independent micro-flows that resolve themselves and pass their values into queues until the workflow completes.

This offers an extreme shared nothing design that maximizes CPU usage in a multi-CPU environment.

Each function (or task) is given a process and scheduled to a CPU by the operating system. Since python Processes are native bound OS processes, this inherits the benefit of the operating system scheduler which is optimized for the underlying hardware. Arguments that satisfy the function are run in parallel in the same fashion. The parent function then uses asyncio coroutines to monitor queues for the results from the processes. This keeps the CPU usage down while the dependent functions produce their results and eliminates the need for monitor threads.

As a workflow executes, it fans out over CPUs. Each process acting as it's own scheduler to spawn new processes and resolve arguments, while also monitoring queues for incoming results asynchronously. This makes the workflow a truly emergent, dynamic computing construct vs a monolithic service managing all the pieces. Of course, this is not to say one approach is better, just that entangle takes a different approach based on its preferred tradeoffs. arch

Tradeoffs

Every design approach is a balance of tradeoffs. Entangle favors CPU utilization and true parallelism over resource managers, schedulers or other shared services. It favors simplicity over behavior, attempting to be minimal and un-opinionated. It tries to be invisible to the end user as much as possible. It strives for the basic principle that, "if it looks like it should work, it should work."

Entangle leans on the OS scheduler to prioritize processes based on the behavior of those processes and underlying resource utilizations. It therefore does not provide its own redundant scheduler or task manager. Because of this, top-down visibility or control of workflow processes is not as easy as with centralized task managers.

Entangle prefers the non-API approach, where it looks like regular python expressions, over strict API's or invocation idioms. This makes it easier to pick up and use and plays well with 3rd party frameworks too.

Use Cases

Because of these tradeoffs, there are certain use cases that align with entangle and others that probably do not.

If you want top-down visibility & control of workflows and tasks, Entangle is probably not ready for you.

If you have lots of CPUs, entangle could be for you! If you want easy python workflows that span local and remote cloud resources, entangle could be for you. If you want to write custom handlers that enrich or execute code in custom ways for your needs, entangle makes this easy for you.

Entangle benefits more with CPU intensive, longer running tasks than shorter, less CPU intensive tasks.

Orchestration

One focused use case for entangle is when you want to orchestrate across different compute nodes, remote APIs and other disparate endpoints in a single workflow, with inherent parallelism.

workflow

Each step of the workflow has different parameters, needs and protocols used to communicate with it. Such a workflow might simply look like:

data = data_refinement(
    get_source_data()
)
result = measure_vectors(
    vector1(
        data("vector1")
    ),
    vector2(
        data("vector2")
    ),
    vector3(
        data("vector3")
    )
)

GPU Processing

Another use case is the need to run multiple parallel tasks that operate on matrix data using a GPU. Entangle makes this quite easy as seen in GPU Example, Docker Example and Shared Memory Example

DevOps

For devops use cases Entangle allows you to write simple, parallel workflow graphs using plain old python. This let's you write efficient parallel devops pipelines with ease.

What Entangle is not

Here are some things entangle is not, out-of-the-box. This isn't to say entangle can't do these things. In fact, entangle is designed to be a low level framework for implementing these kinds of things.

  • Entangle does not yet perform fail over or retries
  • Entangle is not a batch process framework
  • Entangle is not map/reduce
  • Entangle is not a centralized task manager

Declarative Infrastructure

Entangle allows you to target specific infrastructure environments or needs using simple decorators.

For example, to specify a process run on local hardware you can use the @local decorator

@process
@local
def myfunc():
    return

If you want to execute a function in AWS EC2 or fargate, you could write it as:

@process
@aws(keys=[])
@ec2(ami='ami-12345')
def myfunc():
    return

@process
@aws(keys=[])
@fargate(ram='2GB', cpu='Xeon')
def myfunc():
    return

or using docker containers

@process
@docker(image="tensorflow/tensorflow:latest-gpu")
def reduce_sum():
    import tensorflow as tf
    return tf.reduce_sum(tf.random.normal([1000, 1000]))

infrastructure

If you have a custom on-prem environment you can write a simple decorator that deploys the task to that and use it alongside other infrastructure decorators.

Where, What & How: Using Mixins

Entangle uses the concept of mixins to associate infrastructure needs (where) with compute (what) and concurrency needs (how). Thus, it allows you to mix and match combinations of these within a single workflow or dataflow.

For example, you might need to get data from AWS Lambda, run GPU Algorithm on that data inside a container then send those results to 10 CPUs in a compute cloud for parallel analysis, then gather those results and send them to a web service on your network for storage or rendering. All these steps have different locations, infrastructure requirements, compute needs, processing times, and protocols.

Execution

As we mentioned above, entangle workflows will fan out during execution and occupy CPUs throughout the workflow. The OS will determine the priority of processes based on their resource needs at the time of execution. Here is a simple workflow and diagram showing how the parallel execution unfolds.

result = add(
   mult(
      one(),
      two()
   ),
   three()
)

This execution order applies to workflows in entange. If you use @dataflow decorator the execution follows that of a dataflow compute model. Refer to the section Dataflows for more information.

execution

Threads vs Processes

In Python, threads do not execute in parallel to one another, it only gives the illusion of such. Python handles the context switching between threads and is limited by the GIL. Processes on the other hand, are not controlled by a GIL and can thus truly run in parallel. The host operating system governs the sheduling of processes and entangle is designed to exploit this benefit.

Workflows

A workflow in Entangle is just a fancy term for a call graph of function processes. The example above in Execution is a simple workflow. Workflows execute in natural dependency ordering that you'd expect from any python function call. The inner most dependencies are invoked first so they can return their values to parent functions.

Note that this paradigm is pretty much the way most imperative languages operate today, but it does differ from dataflows which we talk about down below Dataflows.

Imperative vs Structured Declaration

In Entangle, there are two ways you can write your workflows, depending which is more convenient for you. Both produce in the same execution sequence and results.

Let's look at the example below:

result = add(
            add(
                num(6),
                two() if False else one()
            ),
            subtract(
                five(),
                two()
            )
)

This represents the structured paradigm, based off a more strict type of lambda math notation where functions invoke functions until the top-most value is produced.

We can also write this as a sequence of imperative declarations

_five = five()
_two = two()
_sub = subtract(_five,_two)
_num = num(6)
_two2 = two() if False else one()
_add1 = add(_num,_two2)
result = add(_add1,_sub)

Process Behavior

Keyword parameters on the @process decorator allow you to control some meta-behavior of the process.

Wait

Wait indicates how long a function should wait before its arguments arrive. It is a sibling to timeout however it is different.

@process(wait=20)
def values(*args):
    values = [arg for arg in args]

    return values


o = values(
    one(),
    train()
)

In the above example, it is saying that the values function will wait up to 20 seconds for both one() and train() functions to complete and return values otherwise it will throw a ProcessTimeoutException.

Timeout

Timeout is more self-evident. It is the wait period in seconds, entangle will allow a process to run.

# Wait at most, 3 seconds for this task to complete
@process(timeout=3)
def task():
    return True

# Wait indefinitely for this task to complete
@process
def taskB():
    return False

When a process times out, a ProcessTimeoutException will be thrown by Entangle and the process will be terminated if it is still alive.

Composition

Entangle offers a couple different ways to use composition effectively: with decorators and with workflows.

Decorator Composition

You can compose your tasks by combining process and infrastructure decorators.

Again, in the example below, we are declaring a process and local infrastructure for our task to run by composing two decorators together.

@process
@local
def taskA():
    return

or, specifying that the task run as a process inside AWS fargate, unchanged.

@process
@aws(keys=[])
@fargate(ram='2GB', cpu=4)
def taskA():
    return

Workflow Composition

Composing workflows is just as simple. You can write code that itself constructs workflows on the fly easily.

from entangle.process import process
from entangle.http import request
from entangle.workflow import workflow


@process
@request(url='https://datausa.io/api/data', method='GET')
def mydata(data):
    import json
    data = json.loads(data)
    return int(data['data'][0]['Year'])


@process
def two():
    return 2


@process
def add(a, b):
    v = int(a) + int(b)
    print("ADD: *"+str(v)+"*")
    return v


@workflow
def workflow1():
    return add(
        mydata(drilldowns='Nation', measures='Population'),
        two()
    )


@workflow
def workflow2(value):
    return add(
        value(),
        two()
    )


result = workflow2(workflow1)

print(result())

The key to making this work is the deferring of execution trait of Entangle which we will discuss in a later post. But essentially it allows for separation of workflow declaration from execution. Doing this allows you to treat workflows as objects and pass them around anywhere a normal python function (or workflow) is expected. Prior to execution.

Containers

Entangle supports two container technologies: docker and singularity. These are used with the associated decorators @docker and @singularity. Using containers allows you to run functions that have complex OS or python depdendencies not native to your hosting environment.

For a complete example, please see Docker Example

Dataflows

Entangle supports two kinds of execution flow, dataflow[8] and workflow (or what is more traditionally called control flow). They both complete a DAG-based execution graph but in slightly different ways and with different advantages to the programmer.

As wikipedia states[8]:

Dataflow is a software paradigm based on the idea of disconnecting computational actors into stages (pipelines) that can execute concurrently. Dataflow can also be called stream processing or reactive programming.[1]

However, Merriam-Webster's simple definition[9] illuminates a key trait of dataflows - "...as data becomes available"

: a computer architecture that utilizes multiple parallel processors to perform simultaneous operations as data becomes available

Data Readiness

In many parallel data computations the arrival or readiness of some data might lag behind other data, perhaps coming from longer computations or farther away. True dataflow models allow the computation to proceed on a parallel path as far as it can go with the currently available data. This means dependent operations are not held up by control flow execution order in some cases and the overall computation is optimized.

dataflow

Dataflows vs Workflows

Author, slikts [1], descibes these differences very nicely (from [1]).

Control flow refers to the path the point of execution takes in a program, and sequential programming that focuses on explicit control flow using control structures like loops or conditionals is called imperative programming. In an imperative model, data may follow the control flow, but the main question is about the order of execution.

Dataflow abstracts over explicit control flow by placing the emphasis on the routing and transformation of data and is part of the declarative programming paradigm. In a dataflow model, control follows data and computations are executed implicitly based on data availability.

Concurrency control refers to the use of explicit mechanisms like locks to synchronize interdependent concurrent computations. Dataflow is also used to abstract over explicit concurrency control.

Simple Example

Let's start with a simple workflow example:

A(B(),C())

In traditional control flow or what I call lambda based[10] execution, the programming language's dependency analysis will determine the order of execution. In this example B() and C() are dependencies of A() and thus need to complete before A() can be executed. In other words, they are inputs to A(). Basic stuff.

This means the execution of each compute function is aware of the specific dependent functions it must resolve first. We call this control depedency [2].

Let's say the dependency was reversed. Whereby, a value computed by A() was a dependency of both B() and C(). How would we write this in conventional control flow?

We might do something like this.

B(A())
C(A())

Breaking our previous single expression into multiple expressions. However, in this case, A() is being invoked twice, which could produce different values. So we might introduce a variable

a = A()
B = B(a)
C = C(a)

Now we have 3 expressions that must run in a proper order. We have done some of the work by making a separate expression for our dependent value a. But for large dataflows this can be a bigger burden on the programmer to unravel all the dependencies and put them in proper order.

What if the execution of an expression was not computed using the traditional dependency analysis most languages use today but instead was defined by stricter dataflow semantics?

DAG Dataflow

In dataflow, a DAG represents the flow of values from compute nodes where each node computes its value once and the value is emitted or sent to directionally connected nodes in the DAG.

dataflow

This paradigm makes it easier to express our intentions of sharing values from A() by computing it once and sending the results to B() and C(). Neither B() nor C() explicitly depend on A(). The dataflow DAG provides the dependency structure for all the compute nodes.

Now let's rewrite our expression if it were executed in strict dataflow order.

A(
   B(),
   C()
)

Here, the dataflow engine executing this expression understands the intention to compute A() first, then in parallel compute B() and C() with the same result computed only once from A() as their input. Written imperatively, this would equate to:

a = A()
B(a)
C(a)

where B(a) and C(a) run in parallel.

IMPORTANT! The dataflow syntax provides the necessary graph structure for a dataflow engine to know explicitly which functions can operate in parallel.

Results Comparison

So what are the differences in the results from our workflow version and our dataflow version? It should be clear that the workflow version takes as input 2 values (B(),C()) and produces 1 value, A().

However, our dataflow version is different. It takes as input 1 value A() and produces two results, B() and C(), in parallel. So the computations are different!

Advantages of Strict Dataflow

As was pointed out in the intro to this section, dataflow provides declarative data dependency modelling for a computation. This is sometimes a more natural way of thinking about a problem for the human programmer. It allows a clean separation between the initial state of a dataflow and various desired outcomes that would be more difficult to model using control flow programming, as the programmer will have to use multiple imperative steps to introduce the proper execution order and indicating which computations are parallel is unclear from linear ordering.

Dataflow has improved efficiencies when it comes to data-centric computations as well because it only computes nodes once per DAG execution. This approach requires no caching or variables that might be required with imperative-based control flow.

Naturally Parallel

A dataflow DAG is a naturally and implicitly parallel model - by its declarative structure. For CPU-bound, data centric tasks it is simple and easy to understand for this reason.

Detailed Example

For a more detailed example of using @dataflow in entangle see Dataflow Examples.

References

  1. Concurrency Glossary - https://slikts.github.io/concurrency-glossary/
  2. Dependency Graphs - https://en.wikipedia.org/wiki/Dependency_graph
  3. Dataflow Programming - https://en.wikipedia.org/wiki/Dataflow_programming
  4. Data-Flow vs Control-Flow for Extreme Level Computing - https://ieeexplore.ieee.org/document/6919190
  5. Advances in Dataflow Programming Languages - https://futureofcoding.org/notes/dataflow/advances-in-dataflow-programming-langauges.html
  6. Data dependency - https://en.wikipedia.org/wiki/Data_dependency
  7. An introduction to a formal theory of dependence analysis - https://link.springer.com/article/10.1007/BF00128174
  8. Dataflow - https://en.wikipedia.org/wiki/Dataflow
  9. Dataflow - https://www.merriam-webster.com/dictionary/dataflow
  10. Functional Programming/Lambda Calculus -https://www.tutorialspoint.com/functional_programming/functional_programming_lambda_calculus.htm

Schedulers

Entangle supports a composition based mechanism for attaching schedulers to workflows and functions. The scheduler class will control access to CPU resources based on its constraints. For example, if you want to run a workflow with potentially 20 parallel tasks, but only want to allocate 4 CPUs to execute the workflow, the scheduler class can ensure entangle doesn't spawn more processes than requested. Schedulers wrap individual functions and pull CPU "cookies" off a scheduler queue to hand off to processes. Each cookie contains a CPU identifier that the process then binds to. When a cookie is placed on the queue by a process it means that CPU (id) is available for use.

Parallel processes thus use the queue mechanism to self-organize around the allocated CPUs by requesting cookies, assigning their CPU affinity to that cpu id, running their behaviors and returning the cookie to the queue when complete. This approach requires no centralized scheduler server as the workfow processes all use the same multiprocessing.Queue to retrieve CPU cookies.

scheduler

Pluggable Schedulers

Entangle allows you to provide your own scheduler class using the decorator.

@scheduler(cpus=4,impl='my.package.MyScheduler'}
def myfunc():
    return

Currently, the scheduler class need only implement one method.

def register(self, f, cpus=12):

and return a function or partial that wraps the provided function with scheduler behavior.

To see the implementation of DefaultScheduler click here.

For a workflow example using scheduler see Scheduler Example below.

Distributed Flows

Entangle allows you to pass a workflow (or dataflow) to a remote machine for execution. When combined with @scheduler decorators this also forwards scheduler behavior to the remote machine where it manages the received workflow there. This type of propagation requires no centralized (i.e. shared) scheduler or services and thus scales very well.

Moreover, parts of a workflow can be sent to different machines for a truly distributed workflow.

SSH Decorator

Functions or flows (graph of functions) are remoted by using the @ssh decorator like the example below.

@ssh(user='me', host='radiant', key='/home/me/.ssh/id_rsa.pub', python='/home/me/venv/bin/python')
@scheduler(**scheduler_config)
@thread
def workflow2():
    pass

In this example, we have declared a (contrived) workflow that adds the return values of 2 embedded functions and returns it. The @ssh decorator indicates that this workflow is to be copied and executed on the remote server myserver as user me using the python executable at /home/me/venv/bin/python. Entangle will marshall the codes to the remote server and execute them there.

Scheduler Propagation

If any dependent or subsequent functions are invoked on the remote server, any decorators that apply to those will be enforced. If you use @scheduler then it will utilize the scheduler queue to request CPU cookies. If you also use another @ssh decorator then that dependent function will be shipped to a 3rd remote server and the process repeated there.

diagram here

Each time a workflow decorated with @scheduler is sent to a remote machine, that scheduler then manages its portion of the workflow and any dependent functions that it might resolve. This pattern forms a sort of distributed hierarchy of schedulers that work in parallel across multiple machines, yet fully resolve to complete the root workflow.

Let's take a closer look at this example, which uses 3 different machines to solve its workflow.

@ssh(user='darren', host='miko', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def two():
    # run some codes
    return 2

@ssh(user='darren', host='radiant', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def three():
    # run some codes
    return 3

@scheduler(**config)
@process
def add(a, b):
    v = int(a.get_result()) + int(b)
    return v

@ssh(user='darren', host='phoenix', key='/home/darren/.ssh/id_rsa.pub', python='/home/darren/venv/bin/python')
@scheduler(**config)
@process
def workflow():

    _add = add(
        three(),
        two()
    )
    return _add()

In the above example, the workflow() is first sent to machine phoenix and executed there. It wraps the function add which also executes on phoenix because it has no @ssh decorator and the workflow is already there.

The add() function requires the functions three() and two() be solved first. These two functions are sent to machines miko and radiant to be solved. The results are returned to the add function running on phoenix and the result of the workflow() is returned to the calling machine, or the machine where the workflow was executed on.

Once the workflow() reaches phoenix the @scheduler attached to the workflow manages the CPU's there according to its constraints. Since the add() function has two dependencies that can run in parallel the @schedular can request 2 CPUs and run them in parallel.

diagram

Examples

There are a variety of example workflows and dataflows you can run. In addition to the sample code provided below you can run these using the following commands.

$ python -m entangle.examples.example
$ python -m entangle.examples.example2
$ python -m entangle.examples.example3
$ python -m entangle.examples.example4
$ python -m entangle.examples.example5
$ python -m entangle.examples.example6
$ python -m entangle.examples.lambdaexample
$ python -m entangle.examples.listexample
$ python -m entangle.examples.listexample2
$ python -m entangle.examples.dataflowexample
$ python -m entangle.examples.dataflowexample2
$ python -m entangle.examples.dockerexample
$ python -m entangle.examples.aiexample
$ python -m entangle.examples.schedulerexample
$ python -m entangle.examples.schedulerexample2
$ python -m entangle.examples.sshschedulerexample
$ python -m entangle.examples.timeoutexample

GPU Example

This example assumes you have installed nvidia-cuda-toolkit and associated python packages along with numba.

In this example, two vectorized functions with different sized matrices are run in parallel, and their times are gathered.

import numpy as np
from entangle.process import process
from timeit import default_timer as timer
from numba import vectorize


@process
def dovectors1():

    @vectorize(['float32(float32, float32)'], target='cuda')
    def pow(a, b):
        return a ** b

    vec_size = 100

    a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
    c = np.zeros(vec_size, dtype=np.float32)

    start = timer()
    pow(a, b)
    duration = timer() - start
    return duration


@process
def dovectors2():

    @vectorize(['float32(float32, float32)'], target='cuda')
    def pow(a, b):
        return a ** b

    vec_size = 100000000

    a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
    c = np.zeros(vec_size, dtype=np.float32)

    start = timer()
    pow(a, b)
    duration = timer() - start
    return duration


@process
def durations(*args):

    times = [arg for arg in args]

    return times


dp = durations(
    dovectors1(),
    dovectors2()
)

print(dp())

Which outputs something like

[0.21504536108113825, 0.3445616390090436]

Shared Memory Example

The default return value conduit in Entangle is the Queue. Task return values are marshalled back through queues where they are gathered and provided as function parameters to the parent process task. This method is not desirable for very large data sets such as matrices in GPU computations. The below example shows how Entangle uses python 3.8's shared memory feature to implicitly share volatile memory across native parallel processes.

sharedmemory

import numpy as np
from entangle.process import process
from timeit import default_timer as timer
from numba import vectorize


@process(shared_memory=True)
def dopow(names, smm=None, sm=None):
    (namea, nameb, shapea, shapeb, typea, typeb) = names

    start = timer()
    shma = sm(namea)
    shmb = sm(nameb)

    
    # Get matrixes from shared memory
    np_shma = np.frombuffer(shma.buf, dtype=typea)
    np_shmb = np.frombuffer(shmb.buf, dtype=typeb)

    @vectorize(['float32(float32, float32)'], target='cuda')
    def pow(a, b):
        return a ** b

    pow(np_shma, np_shmb)
    duration = timer() - start
    print("Powers Time: ", duration)


@process(shared_memory=True)
def createvectors(smm=None, sm=None):

    vec_size = 100000000

    start = timer()
    a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
    c = np.zeros(vec_size, dtype=np.float32)

    # create shared memory for matrices
    shma = smm.SharedMemory(a.nbytes)
    shmb = smm.SharedMemory(b.nbytes)

    names = (shma.name, shmb.name, a.shape, b.shape, a.dtype, b.dtype)

    duration = timer() - start
    print("Create Vectors Time: ", duration)
    return names


dp = dopow(
    createvectors()
)

dp()

Outputs

Create Vectors Time:  0.8577492530457675
Powers Time:  0.8135421359911561

SharedMemoryManager & SharedMemory

In the example above, you will notice two special keywords being passed into the functions,

def createvectors(smm=None, sm=None):

smm is a handle to the SharedMemoryManager being used for this workflow and sm is a handle to the SharedMemory class needed to acquire the shared memory segments by name. If you set shared_memory=True then you must include these keyword arguments in your method or an error will occur.


Now, you might be asking yourself, if one of the design goals was shared-nothing then why are we talking about shared memory? When we say "shared" (in shared-nothing) we refer to resources that have to be synchronized or locked when accessed by parallel processes, thereby creating bottlenecks in the execution. The shared memory example here does not introduce any contention, rather, it is used in a pipeline fashion. In this approach, a given shared memory address is only updated by one process at a time (e.g. using it to return its data to the waiting process). Multiple shared memory segments can be created during the course of a workflow for parallel running processes.

AI Example

Here is an example that uses tensorflow to train a model and return the summary.

from entangle.logging.debug import logging
from entangle.docker import docker
from entangle.process import process

@process
@docker(image="tensorflow/tensorflow:latest-gpu", packages=['tensorflow_datasets'])
def train():
    import tensorflow as tf
    import tensorflow_datasets as tfds

    (ds_train, ds_test), ds_info = tfds.load(
        'mnist',
        split=['train', 'test'],
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
    )

    def normalize_img(image, label):
        return tf.cast(image, tf.float32) / 255., label

    ds_train = ds_train.map(
        normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    ds_train = ds_train.cache()
    ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
    ds_train = ds_train.batch(128)
    ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

    ds_test = ds_test.map(
        normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    ds_test = ds_test.batch(128)
    ds_test = ds_test.cache()
    ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(0.001),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )

    model.fit(
        ds_train,
        epochs=6,
        verbose=0,
        validation_data=ds_test,
    )

    return model.summary()


model = train()
print(model())

Docker Example

In this example we are running a process that spawns the decorated function inside a docker container and waits for the result. We compose this using the @process and @docker decorators to achieve the design. The function reduce_sum is run inside the docker container using image tensorflow/tensorflow:latest-gpu and the result is returned seamlessly to the workflow.

from entangle.docker import docker
from entangle.process import process

import logging
logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)


@process
@docker(image="tensorflow/tensorflow:latest-gpu")
def reduce_sum():
    import tensorflow as tf
    return tf.reduce_sum(tf.random.normal([1000, 1000]))

rs = reduce_sum()
print(rs())

Tensorflow GPU Container

The above example launches a GPU enabled docker on the nvidia docker platform (running on your local machine). Tensorflow by default will consume the entire GPU for its processing, however if you want to run parallel GPU jobs that only consume GPU memory as needed, then you need to use:

@docker(image="tensorflow/tensorflow:latest-gpu", consume_gpu=False)


docker

Dataflow Examples

The example below demonstrates the dataflow capability of Entangle. This is a different compute paradigm from workflows. Please read the section on Dataflows vs Workflows for complete explanation of the difference.

NOTE: We use threads as our execution unit in this example as it makes seeing the output possible. With @process you won't see the aggregate output on your console, instead it will be logged to entangle.log file. With Entangle you decide whether to use concurrency (threads) or parallelism (processes). Entangle is itself, threadless.

import threading
import time
from entangle.logging.debug import logging
from entangle.dataflow import thread
from entangle.dataflow import process
from entangle.dataflow import dataflow

def triggered(func, result):
    print("triggered: {} {}".format(func.__name__, result))


@dataflow(callback=triggered)
@thread
def printx(x):
    print('printx: {}'.format(threading.current_thread().name))
    return("X: {}".format(x))


@dataflow(callback=triggered)
@thread
def printy(y):
    print('printy: {}'.format(threading.current_thread().name))
    return("Y: {}".format(y))


@dataflow(callback=triggered)
@thread
def printz(z):
    print('printz: {}'.format(threading.current_thread().name))
    return("Z: {}".format(z))


@dataflow(callback=triggered)
@thread
def echo(e):
    print('echo: {}'.format(threading.current_thread().name))
    return "Echo! {}".format(e)


@dataflow(executor='thread', callback=triggered, maxworkers=3)
def emit(a, **kwargs):
    print('emit: {}'.format(threading.current_thread().name))
    return a+"!"

# Create the dataflow graph 
flow = emit(
    printx(
        printz(
            echo()
        )
    ),
    printy(
        printz()
    ),
    printy()
)

# Invoke the dataflow graph with initial input
flow('emit')

time.sleep(2)

# Call flow again with different input value
flow('HELLO')

Data-Driven Branching

It's useful to have a data flow that routes to different paths depending on input data. Entangle makes this relatively easy. The example below embeds a lambda expression directly in the dataflow structure that chooses either printx() or printy() as the next compute node depending on what the input value is - after emit has generated the value.

In the snippet below emit first produces a value based on some input, the result is emitted to either printx() or printy() depending on the value of the result. Note that this is computed during the execution of the DAG, not at declaration time.

flow = emit(
    lambda x: printx() if x == 'emit' else printy()
)

flow('emit')

lambda

Full example:

import threading
import time
from entangle.dataflow import thread
from entangle.dataflow import dataflow

def triggered(func, result):
    print("triggered: {} {}".format(func.__name__, result))

@dataflow(callback=triggered)
@thread
def printx(x):
    print('printx: {}'.format(threading.current_thread().name))
    return("X: {}".format(x))

@dataflow(callback=triggered)
@thread
def printy(y):
    print('printy: {}'.format(threading.current_thread().name))
    return("Y: {}".format(y))

@dataflow(executor='thread', callback=triggered, maxworkers=3)
def emit(a, **kwargs):
    print('emit: {}'.format(threading.current_thread().name))
    return a+"!"

# Create the dataflow graph 
# Defer whether we will forward to printx() or printy() depending
# on the result receive from emit. This won't be known until the data is ready.
flow = emit(
    lambda x: printx() if x == 'emit' else printy()
)

# Invoke the dataflow graph with initial input
flow('emit')

time.sleep(2)

# Call flow again with different input value
flow('HELLO')

Which outputs:

   printx: emit MainThread
triggered: inner X: emit
printz: ThreadPoolExecutor-3_0
triggered: inner Z: X: emit
----------------------------
printy: MainThread
triggered: inner Y: HELLO

Scheduler Example

from entangle.logging.debug import logging
from entangle.process import process
from entangle.http import request
from entangle.workflow import workflow
from entangle.scheduler import scheduler

scheduler_config = {'cpus': 2,
                    'impl': 'entangle.scheduler.DefaultScheduler'}

@scheduler(**scheduler_config)
@process
def two():
    return 2

@scheduler(**scheduler_config)
@process
def three():
    return 3

@scheduler(**scheduler_config)
@process
def add(a, b):
    print("add: {} {}".format(a,b))
    v = int(a) + int(b)
    print("ADD: *"+str(v)+"*")
    return v

@scheduler(**scheduler_config)
@workflow
def workflow2():
    return add(
        three(),
        two()
    )

result = workflow2()

print(result())

Graph Example

The follow example code shows how we can collect the call graph trace for our workflow and display it.

import json
import time
import asyncio
from entangle.logging.debug import logging
from entangle.process import process

@process
def one():
    return 1

@process
def two():
    return 2

@process
def five():
    return 5

@process
def num(n):
    return n

@process
def add(a, b):
    v = int(a) + int(b)
    print("ADD: *"+str(v)+"*")
    return v

@process
def subtract(a, b):
    return int(a) - int(b)

if __name__ == '__main__':

    workflow = add(
        add(
            num(6),
            two() if False else one()
        ),
        subtract(
            five(),
            add(
                subtract(
                    num(8),
                    two()
                ),
                one()
            )
        )
    )
    result = workflow()
    print(result)

    graph = workflow.graph(wait=True)
    print("GRAPH:",json.dumps(graph, indent=4))

Which outputs this graph structure

GRAPH: {
    "add": [
        {
            "add": {
                "num": {
                    "6": []
                },
                "one": []
            }
        },
        {
            "subtract": {
                "five": [],
                "add": {
                    "subtract": {
                        "num": {
                            "8": []
                        },
                        "two": []
                    },
                    "one": []
                }
            }
        }
    ]
}

We can also use futures to wait for the graph data to arrive as a callback.

future = workflow.graph(wait=False)

def show_graph(graph):
    print("GRAPH:", graph.result())

future.add_done_callback(show_graph)

loop = asyncio.get_event_loop()
print("WAITING ON RESULT")
loop.run_until_complete(future)
print("GOT RESULT")

Workflow Future Example

Entangle allows you to use future results for workflows if the blocking method doesn't meet your use case. To do this, we alter the invocation of the workflow slightly.

def callback(result):
    print("CALLBACK:", result.result())

# set up future callbacks
future = workflow.future(callback=callback)
print('Future:', future)

# Trigger workflow. Does not block
workflow(proc=True)

# Notify results when available
future.entangle() # Does not block

General Example

An example of how entangle will be used (still in development)

from entangle.process import process
from entangle.thread import thread
from entangle.task import task
from entangle.local import local
from entangle.aws import ec2
from entangle.aws import lmbda
from entangle.http import request

@process(timeout=60)
@local(cpus=4)
def add(a, b):
    return a + b

@process(cache=True)
@aws(keys=[])
@ec2
def one():
    return 1

@thread
@local
def two():
    return 2

@lmbda(function='name')
@aws(keys=[])
def proxy():
    # lambda proxy
    pass

@process
def subtract(a, b):
    return int(a) - int(b)

@process
def five():
    return 5

@process
def num(n):
    return n

@process
@request(url='http://..../', method='POST')
def request(data):
    # Manipulate http response data here and return new result
    return data

# 1,2,3 get passed to lambda function and result returned
result = proxy(1,2,3)
# Pass key:value params and get result from your function
result = request(key1=value, key2=value )

# parallel workflow is just "plain old python"
result = add(
            add(
                num(6),
                two() if False else one()
            ),
            subtract(
                five(),
                two()
            )
        )

print(result())

Logging

Logging in Entangle is intended to be convenient and provide some useful out-of-the-box defaults that "just work".

There are 3 default loggers you can import.

from entangle.logging.info import logging
from entangle.logging.debug import logging
from entangle.logging.file import logging

And the details of each are:

info

import logging

logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

debug

import logging

logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG)

file

import logging

logging.basicConfig(filename='entangle.log',
                    format='%(asctime)s : %(levelname)s : %(message)s', level=logging.DEBUG)

You can of course provide your own logging configuration, but be sure to include it at the top of your file so the various entangle modules pick it up.

Design Tool

A prototype visual design tool for Entangle is shown below.

ui

You might also like...
SQS + Lambda를 활용한 문자 메시지 및 이메일, Voice call 호출을 간단하게 구현하는 serverless 템플릿
SQS + Lambda를 활용한 문자 메시지 및 이메일, Voice call 호출을 간단하게 구현하는 serverless 템플릿

AWS SQS With Lambda notification 서버 구축을 위한 Poc TODO serverless를 통해 sqs 관련 리소스(람다, sqs) 배포 가능한 템플릿 작성 및 배포 poc차원에서 간단한 rest api 호출을 통한 sqs fifo 큐에 메시지

Dude is a very simple framework for writing web scrapers using Python decorators
Dude is a very simple framework for writing web scrapers using Python decorators

Dude is a very simple framework for writing web scrapers using Python decorators. The design, inspired by Flask, was to easily build a web scraper in just a few lines of code. Dude has an easy-to-learn syntax.

pycallgraph is a Python module that creates call graphs for Python programs.
pycallgraph is a Python module that creates call graphs for Python programs.

Project Abandoned Many apologies. I've stopped maintaining this project due to personal time constraints. Blog post with more information. I'm happy t

Code2flow generates call graphs for dynamic programming language. Code2flow supports Python, Javascript, Ruby, and PHP.
Code2flow generates call graphs for dynamic programming language. Code2flow supports Python, Javascript, Ruby, and PHP.

Code2flow generates call graphs for dynamic programming language. Code2flow supports Python, Javascript, Ruby, and PHP.

Python implementation of an automatic parallel parking system in a virtual environment, including path planning, path tracking, and parallel parking
Python implementation of an automatic parallel parking system in a virtual environment, including path planning, path tracking, and parallel parking

Automatic Parallel Parking: Path Planning, Path Tracking & Control This repository contains a python implementation of an automatic parallel parking s

PyCG: Practical Python Call Graphs

PyCG - Practical Python Call Graphs PyCG generates call graphs for Python code using static analysis. It efficiently supports Higher order functions T

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

⚡ Fast • 🪶 Lightweight • 0️⃣ Dependency • 🔌 Pluggable • 😈 TLS interception • 🔒 DNS-over-HTTPS • 🔥 Poor Man's VPN • ⏪ Reverse & ⏩ Forward • 👮🏿 impy is an all-in-one image analysis library, equipped with parallel processing, GPU support, GUI based tools and so on.
impy is an all-in-one image analysis library, equipped with parallel processing, GPU support, GUI based tools and so on.

impy is All You Need in Image Analysis impy is an all-in-one image analysis library, equipped with parallel processing, GPU support, GUI based tools a

A Python module for decorators, wrappers and monkey patching.

wrapt The aim of the wrapt module is to provide a transparent object proxy for Python, which can be used as the basis for the construction of function

Collection of admin fields and decorators to help to create computed or custom fields more friendly and easy way
Collection of admin fields and decorators to help to create computed or custom fields more friendly and easy way

django-admin-easy Collection of admin fields, decorators and mixin to help to create computed or custom fields more friendly and easy way Installation

 Python Advanced --- numpy, decorators, networking
Python Advanced --- numpy, decorators, networking

Python Advanced --- numpy, decorators, networking (and more?) Hello everyone 👋 This is the project repo for the "Python Advanced - ..." introductory

Strong Typing in Python with Decorators

typy Strong Typing in Python with Decorators Description This light-weight library provides decorators that can be used to implement strongly-typed be

Extensible memoizing collections and decorators

cachetools This module provides various memoizing collections and decorators, including variants of the Python Standard Library's @lru_cache function

Generate Class & Decorators for your FastAPI project ✨🚀

Classes and Decorators to use FastAPI with class based routing. In particular this allows you to construct an instance of a class and have methods of that instance be route handlers for FastAPI & Python 3.8.

Create argparse subcommands with decorators.

python-argparse-subdec This is a very simple Python package that allows one to create argparse's subcommands via function decorators. Usage Create a S

Decorators for maximizing memory utilization with PyTorch & CUDA

torch-max-mem This package provides decorators for memory utilization maximization with PyTorch and CUDA by starting with a maximum parameter size and

 TorchShard is a lightweight engine for slicing a PyTorch tensor into parallel shards
TorchShard is a lightweight engine for slicing a PyTorch tensor into parallel shards

TorchShard is a lightweight engine for slicing a PyTorch tensor into parallel shards. It can reduce GPU memory and scale up the training when the model has massive linear layers (e.g., ViT, BERT and GPT) or huge classes (millions). It has the same API design as PyTorch.

Open-Source Python CLI package for copying DynamoDB tables and items in parallel batch processing + query natural & Global Secondary Indexes (GSIs)

Python Command-Line Interface Package to copy Dynamodb data in parallel batch processing + query natural & Global Secondary Indexes (GSIs).

Comments
  • Added one useful resource from Scaler Topics

    Added one useful resource from Scaler Topics

    Hey, I have added a helpful reference link for understanding Docker in the Docker section that I think will best add to your content and give your readers a more diverse understanding of the topic. I hope you will like this. Thank you.

    opened by bikashdaga09 0
Releases(v0.2.3)
A Python package for easy multiprocessing, but faster than multiprocessing

MPIRE, short for MultiProcessing Is Really Easy, is a Python package for multiprocessing, but faster and more user-friendly than the default multiprocessing package.

753 Dec 29, 2022
Trio – a friendly Python library for async concurrency and I/O

Trio – a friendly Python library for async concurrency and I/O The Trio project aims to produce a production-quality, permissively licensed, async/awa

5k Jan 07, 2023
🌀 Pykka makes it easier to build concurrent applications.

🌀 Pykka Pykka makes it easier to build concurrent applications. Pykka is a Python implementation of the actor model. The actor model introduces some

Stein Magnus Jodal 1.1k Dec 30, 2022
A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs.

A lightweight (serverless) native python parallel processing framework based on simple decorators and call graphs, supporting both control flow and dataflow execution paradigms as well as de-centrali

102 Jan 06, 2023
A curated list of awesome Python asyncio frameworks, libraries, software and resources

Awesome asyncio A carefully curated list of awesome Python asyncio frameworks, libraries, software and resources. The Python asyncio module introduced

Timo Furrer 3.8k Jan 08, 2023
aiomisc - miscellaneous utils for asyncio

aiomisc - miscellaneous utils for asyncio Miscellaneous utils for asyncio. The complete documentation is available in the following languages: English

aiokitchen 295 Jan 08, 2023
Backport of the concurrent.futures package to Python 2.6 and 2.7

This is a backport of the concurrent.futures standard library module to Python 2. It does not work on Python 3 due to Python 2 syntax being used in th

Alex Grönholm 224 Nov 07, 2022
AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio.

AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It implements trio-like structured concurrenc

Alex Grönholm 1.1k Jan 06, 2023
Ultra fast asyncio event loop.

uvloop is a fast, drop-in replacement of the built-in asyncio event loop. uvloop is implemented in Cython and uses libuv under the hood. The project d

magicstack 9.1k Jan 07, 2023
Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

Parallelformers: An Efficient Model Parallelization Toolkit for Deployment

TUNiB 559 Dec 28, 2022
Python Multithreading without GIL

Multithreaded Python without the GIL

Sam Gross 2.3k Jan 05, 2023
rosny is a lightweight library for building concurrent systems.

rosny is a lightweight library for building concurrent systems. Installation Tested on: Linux Python = 3.6 From pip: pip install rosny From source: p

Ruslan Baikulov 6 Oct 05, 2021
A concurrent sync tool which works with multiple sources and targets.

Concurrent Sync A concurrent sync tool which works similar to rsync. It supports syncing given sources with multiple targets concurrently. Requirement

Halit Şimşek 2 Jan 11, 2022
Thread-safe asyncio-aware queue for Python

Mixed sync-async queue, supposed to be used for communicating between classic synchronous (threaded) code and asynchronous

aio-libs 665 Jan 03, 2023
Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

unsync Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes. Quick Overview Functions marked with the @

Alex Sherman 802 Dec 28, 2022
Simple package to enhance Python's concurrent.futures for memory efficiency

future-map is a Python library to use together with the official concurrent.futures module.

Arai Hiroki 2 Nov 15, 2022
Jug: A Task-Based Parallelization Framework

Jug: A Task-Based Parallelization Framework Jug allows you to write code that is broken up into tasks and run different tasks on different processors.

Luis Pedro Coelho 387 Dec 21, 2022
SCOOP (Scalable COncurrent Operations in Python)

SCOOP (Scalable COncurrent Operations in Python) is a distributed task module allowing concurrent parallel programming on various environments, from h

Yannick Hold 573 Dec 27, 2022
Raise asynchronous exceptions in other thread, control the timeout of blocks or callables with a context manager or a decorator

stopit Raise asynchronous exceptions in other threads, control the timeout of blocks or callables with two context managers and two decorators. Attent

Gilles Lenfant 97 Oct 12, 2022