Quick concurrent programming in Python

Download: github

Even though multithreading in Python is inefficient for CPU-bound work because of the GIL, it is still helpful for I/O-bound jobs such as scraping data. While writing some scrapers for my PhD work, I created a lightweight module that lets you quickly implement the parallel worker pattern. The design is sketched as follows:

[Figure: Distributor with Workers, Input queue, Output queue and lock]

A distributor has many workers (threads), one input queue and one output queue. It also has a lock which the workers can use for synchronization.
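To make this design concrete, here is a minimal sketch of the same structure built only from the standard library's queue and threading modules. The class name MiniDistributor and its job format (plain callables) are illustrative assumptions and not the ThreadDistributor API. In this sketch the shared lock only guards a completed-job counter.

```python
import queue
import threading


class MiniDistributor:
    """Hypothetical sketch (not the module's code): N worker threads sharing
    one input queue, one output queue and one lock."""

    def __init__(self, num_workers):
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        self.lock = threading.Lock()  # shared by all workers
        self.done = 0                 # completed-job counter, guarded by self.lock
        self.workers = [
            threading.Thread(target=self._work, name="worker-%d" % i)
            for i in range(num_workers)
        ]

    def _work(self):
        while True:
            job = self.input_queue.get()
            if job is None:  # sentinel: this worker should exit
                return
            self.output_queue.put(job())
            with self.lock:  # read-modify-write on shared state needs the lock
                self.done += 1

    def run(self, jobs):
        """Queue all jobs plus one sentinel per worker, then wait for the workers."""
        for job in jobs:
            self.input_queue.put(job)
        for _ in self.workers:
            self.input_queue.put(None)
        for w in self.workers:
            w.start()
        for w in self.workers:
            w.join()
```

For example, `MiniDistributor(4).run([lambda i=i: i * i for i in range(10)])` leaves the ten squares in `output_queue`, in no guaranteed order.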

The following line creates a distributor with 20 workers:

```python
distributor = ThreadDistributor(20)
```

To add a task to the distributor, call distributor.add_task(task_type, input). Here, task_type is a class that inherits from the Task class, and input is the value the task works on (available inside the task as self.inp). You have to implement the run method of this class before running. Note that run must be a generator, which means it has to yield something. Make it yield None if you don't want it to put any value into the output queue.
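The generator contract can be illustrated with a small, self-contained sketch. Here, Task is a hypothetical stand-in for the module's base class, and execute() is an assumed helper that shows one plausible way a worker could drive run(): it forwards every yielded value except None to the output queue. Whether the real module forwards several values yielded by a single task is also an assumption.

```python
import queue


class Task:
    """Hypothetical stand-in for the module's Task base class.
    Assumption: the input is stored as self.inp, as in the examples."""

    def __init__(self, inp=None):
        self.inp = inp

    def run(self):
        raise NotImplementedError("subclasses must implement run() as a generator")


class SquareTask(Task):
    def run(self):
        yield self.inp ** 2


class SilentTask(Task):
    def run(self):
        # Do some work (e.g. save a page to disk) but send nothing to the output queue.
        yield None


class CountdownTask(Task):
    def run(self):
        # A generator may yield several values from a single task.
        for n in range(self.inp, 0, -1):
            yield n


def execute(task, output_queue):
    """Iterate over task.run() and forward every non-None value."""
    for value in task.run():
        if value is not None:
            output_queue.put(value)
```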

The following lines define a squaring task class:

```python
class ComputationTask(Task):

    def run(self):
        yield self.inp ** 2
```

All you need to do is add tasks to the distributor, which distributes them among its workers automatically. When no tasks are left, it automatically stops all workers.
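The post does not say how the module detects that no tasks are left, especially when tasks add further tasks. One common approach relies on queue.Queue.join(). A job enqueues its follow-up jobs before it calls task_done() on itself, so the queue's unfinished count cannot reach zero while work remains. The sketch below uses that approach. run_until_idle, STOP and the job format (a callable that receives the task queue) are all hypothetical.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel


def run_until_idle(initial_jobs, num_workers=4):
    """Run jobs that may enqueue further jobs; stop the workers once the
    queue has been fully processed. Each job is a callable that takes the
    task queue and returns a result or None."""
    tasks = queue.Queue()
    results = queue.Queue()

    def worker():
        while True:
            job = tasks.get()
            if job is STOP:
                tasks.task_done()
                return
            try:
                # Follow-up jobs are put() before this job's task_done(),
                # so the unfinished count never drops to zero too early.
                value = job(tasks)
                if value is not None:
                    results.put(value)
            finally:
                tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for job in initial_jobs:
        tasks.put(job)
    tasks.join()          # returns once every queued job, spawned ones included, is done
    for _ in threads:
        tasks.put(STOP)   # no work left: tell every worker to exit
    for t in threads:
        t.join()
    return [results.get() for _ in range(results.qsize())]
```

A seed job that enqueues one squaring job per integer plays the same role as InitializeTask in the example that follows.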

Here is a simple example that computes the squares of 1,000,000 integers:

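```python
from ThreadDistributor import ThreadDistributor, Task


class ComputationTask(Task):
    # Squares its input and yields the result to the output queue.
    def run(self):
        yield self.inp ** 2


class InitializeTask(Task):
    # Seed task: enqueues one ComputationTask per integer.
    def run(self):
        for i in range(1000000):
            self.distributor.add_task(ComputationTask, i)
        # Yield None so that nothing is put into the output queue.
        yield None


if __name__ == "__main__":
    distributor = ThreadDistributor(20)
    distributor.add_task(InitializeTask)
    distributor.run()
```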

Other methods:

stop(self): stops all workers by putting a STOP signal at the end of the work queue. Workers therefore stop only after all tasks queued before STOP have been completed.
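This STOP behaviour follows from the queue being first-in, first-out: a worker only reaches STOP after everything queued ahead of it has been taken. The minimal sketch below assumes one STOP is queued per worker, because the post does not say how many the module enqueues. start_workers and stop are hypothetical helpers.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel


def start_workers(work_queue, processed, n):
    """Start n threads that multiply each queued number by 10."""
    def worker():
        while True:
            item = work_queue.get()
            if item is STOP:
                return  # FIFO order: every item queued before this STOP was already taken
            processed.append(item * 10)  # list.append is atomic in CPython

    threads = [threading.Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    return threads


def stop(work_queue, threads):
    """Queue one STOP per worker behind the pending work, then wait for all of them."""
    for _ in threads:
        work_queue.put(STOP)
    for t in threads:
        t.join()
```

Once stop() returns, all work queued before the STOP signals has been processed, and each worker has consumed exactly one STOP.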

results(self): yields results from the output queue.
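If results are only read after the workers have finished, draining the output queue without blocking is enough. The sketch below is an assumption about how such a generator could behave, not the module's code. A version that reads while workers are still running would need a blocking get() and some end-of-output signal.

```python
import queue


def results(output_queue):
    """Hypothetical results() generator: yield whatever is currently
    in the output queue, stopping as soon as it is empty."""
    while True:
        try:
            item = output_queue.get_nowait()
        except queue.Empty:
            return
        yield item
```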

add_worker(self, name=None): adds one more worker to the ThreadDistributor instance; name is the name of the new worker.
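Adding a worker at run time amounts to starting one more thread that reads from the same input queue. Here is a minimal sketch. The Pool class and its stop() helper are hypothetical; only the add_worker(name=None) signature comes from the description above.

```python
import queue
import threading


class Pool:
    """Hypothetical minimal pool illustrating add_worker(name=None)."""

    def __init__(self):
        self.tasks = queue.Queue()
        self.results = queue.Queue()
        self.workers = []

    def _work(self):
        while True:
            job = self.tasks.get()
            if job is None:  # sentinel: exit this worker
                return
            # Record which worker produced each result.
            self.results.put((threading.current_thread().name, job()))

    def add_worker(self, name=None):
        # With name=None, threading.Thread generates a default name.
        t = threading.Thread(target=self._work, name=name)
        self.workers.append(t)
        t.start()
        return t

    def stop(self):
        for _ in self.workers:
            self.tasks.put(None)
        for t in self.workers:
            t.join()
```

Because every worker reads from the same queue, a worker added later picks up work immediately, with no redistribution step.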