Quickstart and Examples
parallel
's main goal is to make the process of parallelizing code transparent to us developers. For that, it offers 2 main functions (with their respective non-blocking versions):
parallel.map
: to parallelize the same function with multiple arguments.parallel.par
: to spawn different functions.
parallel.map
Used to spawn multiple executions of the same function with different parameters:
def download_and_store(url, expected_code=200): resp = requests.get(url) if resp.status_code != expected_code: raise ValueError() result = store_in_db(resp.json()) return result urls = ['https://python.org', 'https://rmotr.com', '...'] results = parallel.map(download_and_store, urls) python_result = results[0]
Multiple params can be passed just as sequences:
urls = [ ('https://python.org', 204), ('https://rmotr.com', 201), ] results = parallel.map(download_and_store, urls)
And in dictionaries:
urls = { 'python': ('https://python.org', 204), 'rmotr': ('https://rmotr.com', 201), } results = parallel.map(download_and_store, urls) python_result = results['python']
(more about parameters below)
parallel.par
Used when you need to run different functions in parallel:
def get_price_bitcoin(exchange): pass def get_price_ether(exchange): pass def get_price_ripple(exchange): pass def index(request): prices = parallel.par({ 'btc': (get_price_bitcoin, 'bitstamp') 'eth': parallel.future(get_price_ether, exchange='bitfinex'), 'xrp': parallel.future(get_price_ripple, 'bitstamp'), }) return render_template('index.html', context={ 'prices': prices })
We'll keep using these same examples to explore all the features of parallel
. For most parts, these will work in the same way for all map
, async_map
, par
and async_par
.
Choosing an executor
parallel
can transparently provide multithreading or multiprocess execution for your code.
# Multithreading: results = parallel.thread.map(download_and_store, urls) # Attribute results = parallel.map(download_and_store, urls, executor=parallel.THREAD_EXECUTOR) # Parameter # Multiprocessing: results = parallel.process.map(download_and_store, urls) # Attribute results = parallel.map(download_and_store, urls, executor=parallel.PROCESS_EXECUTOR) # Parameter
Parameters for executors can be passed directly (similar to concurrent.futures
):
results = parallel.map( download_and_store, urls, executor=parallel.THREAD_EXECUTOR, timeout=10, max_workers=5)
Decorating functions
If you rely on parallel tasks in a constant basis, you can choose to decorate the function to make it easier for later:
@parallel.decorate def download_and_store(url): pass urls = ['https://python.org', 'https://rmotr.com', '...'] results = download_and_store.map(urls) # Use the function as a handler
Executors and parameters all work in the same way:
results = download_and_store.thread.map(urls, timeout=10, max_workers=5) results = download_and_store.process.map(urls, timeout=10, max_workers=5)
For parallel.par
, the advantage of decorating functions is the easy passage of parameters:
@parallel.decorate def get_price_bitcoin(exchange): pass @parallel.decorate def get_price_ether(exchange): pass prices = parallel.par({ 'btc': get_price_bitcoin.future('bitstamp'), 'eth': get_price_ether.future(exchange='bitfinex'), })
Any decorated function can still be used normally, the decorator only adds the parallel
attributes:
result = download_and_store('https://python.org') btc = get_price_bitcoin('bitstamp')
Parameters for ease of mind
One of the biggest limitations with concurrent.futures
is the passage of parameters. parallel
resolves all those issues and even incorporates some good ideas to simplify your work:
Sequences or dictionaries
Do you prefer to see results in a sequential manner, or in a named one? Either can be used:
def download_and_store(url): pass # Sequential: results = parallel.map(download_and_store, [ 'https://python.org', 'https://rmotr.com' ]) results == ['python.org results', 'rmotr.com results'] # Named (with a dictionary): results = parallel.map(download_and_store, { 'python': 'https://python.org', 'rmotr': 'https://rmotr.com' }) results == { 'python': 'python.org results', 'rmotr': 'rmotr.com results' }
Named and optional parameters
When functions get more advanced, or invocations vary dynamically, concurrent.futures
is more limited. parallel
works with all the use cases that you might encounter. Consider the following function as an example:
def download_and_store(url, content_type, db_table='websites', log_level='info', options=None): # ... some code ... pass
Passing multiple arguments
In its simplest form, passing multiple ordered arguments works just by passing the parameters in a sequence:
results = parallel.map(download_and_store, [ ('https://python.org', 'json'), # download_and_store('https://python.org', 'json') ('https://rmotr.com', 'xml') # download_and_store('https://rmotr.com', 'xml') ])
Passing named parameters
To pass named parameters, just use a dictionary inside the parameter's sequence:
results = parallel.map(download_and_store, [ # download_and_store('https://python.org', 'json', db_table='webpages') ('https://python.org', 'json', {'db_table': 'webpages'}), # download_and_store('https://rmotr.com', 'xml', db_table='debug') ('https://rmotr.com', 'xml', {'log_level': 'debug'}) ])
Extras, to avoid repeating yourself
Instead of passing the same named parameter over and over again, pass it as an extra:
results = parallel.map(download_and_store, [ ('https://python.org', 'json'), ('https://rmotr.com', 'xml') ], extras={ 'db_table': 'temporary_webpages' })
In this example, all the URLs will be stored in temporary_webpages
.
Advanced use cases
When faced with very extreme scenarios of function invocation, you can resort back to parallel.arg
. For example, if your function receives a dictionary as parameter, parallel
by default will interpret it as a named parameter. To avoid that, just use parallel.arg
:
results = parallel.map(download_and_store, { 'python': parallel.arg('https://python.org', options={'Content-Type': 'application/json'}), 'rmotr': 'https://rmotr.com' })
For parallel.par
, you can use parallel.future
:
prices = parallel.par({ 'btc': parallel.future(get_price_bitcoin, exchange='bitstamp') 'eth': parallel.future(get_price_ether, 'bitfinex', options={'Content-Type': 'application/json'}), })
Error Handling
By default, if a function raises an exception, parallel
will propagate it, this is the expected behavior. But parallel
also includes error handling capabilities. By using the silent
argument, you can silence exceptions and access the details of the failed tasks afterwards (including the exception raised).
def add(x, y): pass results = parallel.map(add, [ (2, 3), ('a', 5), # will fail ('hello ', 'world') ], silent=True) print(results.failures) # True (there are failed tasks) r1, r2, r3 = results assert r1 == 5 assert r3 == 'hello world' assert r2 == parallel.FailedTask( params=('a', 5), ex=TypeError('unsupported operand type(s) for +: 'int' and 'str'') ) # Read below for more info on `replace_failed` assert results.replace_failed(None) == [5, None, 'hello world']
The parallel.FailedTask
model includes information of the failed tasks, including the arguments (both sequential and named) passed.
What map
, par
and other methods return is an instance of the parallel.BaseResult
class. These results include a few convenient methods:
results.failures
(Boolean):True
if there were any tasks that failed with an exception.results.failed
(List): A list of all the failed tasks.results.succeeded
(List): A list of all the successful tasks.results.replace_failed
: Receives a value to use for replacement of all the failed tasks.