149 points by pyper-dev 6 days ago | 35 comments
We're excited to introduce the Pyper package for concurrency & parallelism in Python. Pyper is a flexible framework for concurrent / parallel data processing, following the functional paradigm.
Source code can be found on [github](https://github.com/pyper-dev/pyper)
Key features:
Intuitive API: Easy to learn, easy to think about. Implements clean abstractions to seamlessly unify threaded, multiprocessed, and asynchronous work.
Functional Paradigm: Python functions are the building blocks of data pipelines. Lets you write clean, reusable code naturally.
Safety: Hides the heavy lifting of underlying task execution and resource clean-up. No more worrying about race conditions, memory leaks, or thread-level error handling.
Efficiency: Designed from the ground up for lazy execution, using queues, workers, and generators.
Pure Python: Lightweight, with zero sub-dependencies.
We'd love to hear any feedback on this project!
solidasparagus 3 days ago
But I'm not sure I can use this even though I have a specific use-case that feels like it would work well (high-performance pure Python downloading from cloud object storage). The examples are a bit too simple and I don't understand how I can do more complicated things.
I chunk up my work, run it in parallel and then I need to do a fan-in step to reduce my chunks - how do you do that in Pyper?
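For reference, the chunk, process-in-parallel, then fan-in shape described here can be sketched with the standard library alone. This is not Pyper's API; `process_chunk` and `fan_in` are hypothetical placeholders for the real per-chunk work and the real reduce step.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def process_chunk(chunk):
    # Hypothetical per-chunk work: sum the numbers in one chunk.
    return sum(chunk)


def fan_in(total, partial):
    # Reduce step: fold one partial result into the running total.
    return total + partial


data = list(range(100))
chunks = [data[i:i + 10] for i in range(0, len(data), 10)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map yields results in input order as they become available.
    partials = pool.map(process_chunk, chunks)
    total = reduce(fan_in, partials, 0)
```

The reduce runs in the calling thread, so only the per-chunk work is concurrent.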
Can the processes have state? Pure functions are nice, but if I'm reaching for multiprocess, I need performance and if I need performance, I'll often want a cache of some sort (I don't want to pickle and re-instantiate a cloud client every time I download some bytes for instance).
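One common plain-Python way to get this kind of per-process state is a lazily built, cached client. The sketch below assumes nothing about Pyper; `FakeClient`, `get_client`, and `download` are hypothetical names standing in for a real cloud client and download function.

```python
import functools
import os


class FakeClient:
    """Hypothetical stand-in for an expensive cloud storage client."""

    def __init__(self):
        self.pid = os.getpid()  # records which process built this client

    def get_bytes(self, key: str) -> bytes:
        return f"data-for-{key}".encode()


@functools.lru_cache(maxsize=None)
def get_client() -> FakeClient:
    # Built on the first call in a process and reused by later calls in
    # that same process, so the client is never passed as an argument.
    return FakeClient()


def download(key: str) -> bytes:
    # Only the key crosses the process boundary; the client stays local.
    return get_client().get_bytes(key)
```

If `download` is the function handed to worker processes, each worker builds its client once on first use. One caveat: with the fork start method, a client already created in the parent before forking would be inherited by the children.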
How do exceptions work? Observability? Logs/prints?
Then there's stuff that is probably asking too much from this project, but I get it if I write my own python pipeline so it matters to me - rate limiting WIP, cancellation, progress bars.
But if some of these problems are/were solved and it offers an easy way to use multiprocessing in python, I would probably use it!
pyper-dev 2 days ago
One thing I'd mention is that we don't really imagine Pyper as a whole observability and orchestration platform. It's really a package for writing Python functions and executing them concurrently, in a flexible pattern that can be integrated with other tools.
For example, I'm personally a fan of Prefect as an observability platform; you could define pipelines in Pyper and then wrap them in a Prefect flow for orchestration logic.
Exception handling and logging can also be handled by orchestration tools (or in the business logic if appropriate, literally using try... except...)
For a simple progress bar, tqdm is probably the first thing to try. As it wraps anything iterable, applying it to a pipeline might look like:
import time
from pyper import task
from tqdm import tqdm

@task(branch=True)
def func(limit: int):
    for i in range(limit):
        time.sleep(0.1)
        yield i

def main():
    for _ in tqdm(func(limit=20), total=20):
        pass

if __name__ == "__main__":
    main()
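On the point above about handling exceptions in the business logic with try/except, a minimal sketch of what that might look like inside a single step function follows. It uses only the standard library; `parse_record` and the `"pipeline"` logger name are hypothetical.

```python
import logging

logger = logging.getLogger("pipeline")


def parse_record(raw: str):
    # Hypothetical step: convert one raw string to an int.
    try:
        return int(raw)
    except ValueError:
        # Log and return a sentinel instead of raising, so one bad
        # record does not stop the rest of the run.
        logger.warning("skipping unparseable record: %r", raw)
        return None


results = [parse_record(r) for r in ["1", "2", "oops", "4"]]
cleaned = [r for r in results if r is not None]
```

Whether to swallow, log, or re-raise is a per-step decision; the sketch shows only the swallow-and-log variant.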
halfcat 3 days ago
Have you tried multiprocessing.shared_memory to address this?
solidasparagus 3 days ago
IIRC multiprocessing.shared_memory is a much lower level of abstraction than most Python stuff, so I think I'd need to figure out how to make the client use the shared memory, and I'm not sure I could.
globular-toast 3 days ago
Concurrency in general isn't about parallelism. It's just about doing multiple things at the same time.
cess11 3 days ago
I've also used 'fork in Picolisp a lot for this kind of thing, and also Elixir, which arguably has much nicer pipes.
But hey, it's good that Python after like thirty years or so is trying to get decent concurrency. Eventually people that use it as a first language might learn about such things too.
solidasparagus 2 days ago
cess11 2 days ago
However, it's a real problem that 'beginner languages' like Python and JavaScript don't readily do multithreaded computation, something which has been the default on personal computers for quite a while now and available for at least twenty years.
rtpg 3 days ago
I don't really need pipelining that much, but pipelining along with a certain level of durability and easy multiprocessing support? Now we're talking
t43562 3 days ago
I suppose one excellent thing about this would be if you could just change 1 parameter and switch from multiprocessing to threaded.
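The pipeline snippet quoted elsewhere in this thread passes `multiprocess=True` to `task`, which looks like this kind of switch. Outside Pyper, the same one-parameter idea can be sketched with `concurrent.futures`; `make_pool`, `square`, and `run` are hypothetical names.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def make_pool(workers: int, multiprocess: bool = False) -> Executor:
    # Both executor classes share the Executor interface, so callers
    # do not need to know which one they were given.
    pool_cls = ProcessPoolExecutor if multiprocess else ThreadPoolExecutor
    return pool_cls(max_workers=workers)


def square(x: int) -> int:
    return x * x


def run(values, multiprocess: bool = False):
    with make_pool(workers=4, multiprocess=multiprocess) as pool:
        return list(pool.map(square, values))
```

Calling `run(range(5))` uses threads; `run(range(5), multiprocess=True)` uses processes, and should be called under an `if __name__ == "__main__":` guard so that worker processes can import the module safely.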
rtpg 3 days ago
I'm not sure how well async Python libs are tested against working in a world with multiple event loops, but I bet there are a _lot_ of latent bugs in that space.
giancarlostoro 3 days ago
> pipeline = task(get_data, branch=True) \
> | task(step1, workers=20) \
> | task(step2, workers=20) \
> | task(step3, workers=20, multiprocess=True)
dec0dedab0de 3 days ago
you could reassign every line, but it would look nicer with chained functions.
pipeline = task(get_data, branch=True)
pipeline = pipeline | task(step1, workers=20)
pipeline = pipeline | task(step2, workers=20)
pipeline = pipeline | task(step3, workers=20, multiprocess=True)
edit: I would be tempted to do something like this:
steps = [task(step1, workers=20),
         task(step2, workers=20),
         task(step3, workers=20, multiprocess=True)]
pipeline = task(get_data, branch=True)
for step in steps:
    pipeline = pipeline.__or__(step)
Rickster35 2 days ago
pipeline = task(get_data, branch=True).pipe(
    task(step1, workers=20)).pipe(
    task(step2, workers=20)).pipe(
    task(step3, workers=20, multiprocess=True))
That's probably the chained method approach for those with this preference.
me-vs-cat 1 day ago
pipeline = task(...)
pipeline |= task(...)
So does this style:
steps = [task(...), task(...)]
pipeline = functools.reduce(operator.or_, steps)
But it appears you can just change "task" to "Task" and then:
pipeline = pyper.Pipeline([Task(...), Task(...)])
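To show why `|=` and `functools.reduce(operator.or_, ...)` both work on any object that defines `__or__`, here is a self-contained sketch. `Step` is a hypothetical toy class, not Pyper's `task` or `Task`.

```python
import functools
import operator


class Step:
    """Hypothetical stand-in for a pipeline stage; not Pyper's Task."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # a | b builds a new Step that runs a, then feeds the result to b.
        return Step(lambda x: other.func(self.func(x)))

    def __call__(self, x):
        return self.func(x)


steps = [Step(lambda x: x + 1), Step(lambda x: x * 2), Step(str)]

# Fold the list left to right: (steps[0] | steps[1]) | steps[2].
pipeline = functools.reduce(operator.or_, steps)
result = pipeline(3)

# Augmented assignment falls back to __or__ when __ior__ is not defined.
incremental = steps[0]
incremental |= steps[1]
incremental |= steps[2]
same_result = incremental(3)
```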
morkalork 3 days ago
giancarlostoro 3 days ago
I've not been doing Python day-to-day so I'm starting to lose my touch on all the nice little tricks.
Rickster35 3 days ago
pipeline = (
    task(get_data, branch=True)
    | task(step1, workers=20)
    | task(step2, workers=20)
    | task(step3, workers=20, multiprocess=True)
)
Square brackets would create a list and braces would create a set, of course. The contents can still be split over different lines; just pointing out that this syntax doesn't do the same thing.
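The difference between the three bracket types can be checked with plain integers in place of tasks; the variable names below are illustrative only.

```python
# Parentheses only group: the value is the result of the expression.
flags = (
    0b001
    | 0b010
    | 0b100
)

# Square brackets wrap that same result in a one-element list.
as_list = [
    0b001
    | 0b010
    | 0b100
]

# Braces wrap it in a one-element set.
as_set = {
    0b001
    | 0b010
    | 0b100
}
```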
asplake 3 days ago
giancarlostoro 3 days ago
yablak 3 days ago