Assertion error and Theano function

Posted on Fri 03 August 2018 in programming notes

Introduction

Theano is used in one of the projects I'm working on. The project is a web server that accepts requests and processes them using a Theano function. When the server tried to process two simultaneous requests, it failed with the error:

File "theano/scan_module/scan_perform.pyx", line 397, in theano.scan_module.scan_perform.perform (/home/dinara/work_projects/ds_voxrec_api/.theano/compiledir_Linux-4.4--generic-x86_64-with-Ubuntu-16.04-xenial-x86_64-3.5.2-64/scan_perform/mod.cpp:4490)
AssertionError: The compute map of output 0 should contain 1 at the end of execution, not 0.

I started to google it and quickly found this issue on the Theano GitHub: Make Theano function threads safe (was Assertion failure in lazy_rec_eval).

Okay, now I see that Theano isn't thread-safe, but the server must work somehow anyway.

Solution

The author of the issue commented:

I currently solved this problem by using a multiprocessing.pool.ThreadPool with only one worker to ensure that all evaluations and the compilation of the function occur on the same thread.
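The approach from that comment can be sketched roughly as follows. This is only an illustration: build_predict and handle_request are hypothetical names, and the inner predict function is a stand-in for a compiled Theano function, not real Theano code. The sketch shows that with a single worker, both the "compilation" and every evaluation happen on the same thread.

```python
import threading
from multiprocessing.pool import ThreadPool

# One worker thread: everything submitted to the pool runs on that thread.
pool = ThreadPool(processes=1)


def build_predict():
    # Hypothetical stand-in for compiling a Theano function.
    # Remember the thread that "compiled" it.
    owner = threading.get_ident()

    def predict(x):
        # Return the result and whether the call ran on the owner thread.
        return x * 2, threading.get_ident() == owner

    return predict


# "Compile" on the worker thread, so later calls share that thread.
predict = pool.apply(build_predict)


def handle_request(x):
    # Blocks the caller until the single worker has processed the call.
    return pool.apply(predict, (x,))
```

Any number of request threads can call handle_request; the calls are simply processed by the worker one after another.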

I solved my problem in a different way. There is no requirement for parallelism, and sequential processing is acceptable.

Here are code snippets of what I had at the start. First, a class that uses a Theano function:

import theano
import theano.tensor as T


class BasicModel:

    def __init__(self):
        x = T.imatrix('x')
        net, _ = models.load(settings.MODEL_FILE, 1, x)

        print("Building model...")
        self.predict = theano.function(inputs=[x], outputs=net.y)
        ...
        print("Initialization complete...")

    def do_something(self, text):
        ...
        self.predict(basic_np_array.T)
        ...
        return info

Next, the view:

class BasicView(APIView):

    def post(self, request):
        data = request.data.dict()
        output = ''
        model = BasicModel()

        if 'text' in data:
            output = model.do_something(data['text'])

        return Response({'text': output})

The solution is pretty simple. I decided to use Thread and Semaphore from threading and Queue from queue. I use the Semaphore to protect the part of the code that must be executed by only one thread at a time, and the Queue to get results back from the Thread. Let's see the final variant of the class:

import theano
import theano.tensor as T
from threading import Semaphore

sem = Semaphore()


class BasicModel:

    def __init__(self):
        x = T.imatrix('x')
        net, _ = models.load(settings.MODEL_FILE, 1, x)

        print("Building model...")
        self.predict = theano.function(inputs=[x], outputs=net.y)
        ...
        print("Initialization complete...")

    # parameter q is a Queue where the method puts the result text from the current thread
    def do_something(self, text, q):
        # this is the thread-unsafe part of the code;
        # "with" acquires the semaphore and releases it for the next
        # thread on exit, even if an exception is raised
        with sem:
            ...
            self.predict(basic_np_array.T)
            ...
        # do not forget to save the result
        q.put(info)

And the view:

from queue import Queue
from threading import Thread


class BasicView(APIView):

    def post(self, request):
        data = request.data.dict()
        output = ''
        model = BasicModel()

        if 'text' in data:
            # one queue per request, so that simultaneous requests
            # cannot pick up each other's results
            q = Queue()
            # start a thread and run the method in it
            t = Thread(target=model.do_something, args=(data['text'], q), daemon=True)
            t.start()
            t.join()
            # after the thread has finished, get the result from the queue
            output = q.get()

        return Response({'text': output})

It's a lazy solution and it hasn't been tested on a live system yet. But it works fine on the test system and the errors are gone now. #maketheanothreadssafeagain