Assertion error and Theano function
Posted on Пт 03 Август 2018 in programming notes
Introduction¶
Theano is used in one of the projects I'm working on. The project is a web server and it accepts requests and process them using Theano function. When the server tries to process two simultaneous requests, it's failed with the error:
File "theano/scan_module/scan_perform.pyx", line 397, in theano.scan_module.scan_perform.perform (/home/dinara/work_projects/ds_voxrec_api/.theano/compiledir_Linux-4.4--generic-x86_64-with- Ubuntu-16.04-xenial-x86_64-3.5.2-64/scan_perform/mod.cpp:4490) AssertionError: The compute map of output 0 should contain 1 at the end of execution, not 0.
I start to google it and quickly find this issue on Theano github: Make Theano function threads safe (was Assertion failure in lazy_rec_eval).
Okay, now I see that Theano isn't thread safe, but anyway the server must work somehow.
Solution¶
The issue topicstarter commented:
I currently solved this problem by using a multiprocessing.pool.ThreadPool with only one worker to ensure that all evaluations and the compilation of the function occur on the same thread.
I solve my problem in a different way. There is no request for parallelism and sequential process is approved.
Here is code snippets of what I had at start. First, it's a class uses Theano function:
import theano
import theano.tensor as T
class BasicModel:
def __init__(self):
x = T.imatrix('x')
net, _ = models.load(settings.MODEL_FILE, 1, x)
print("Building model...")
self.predict = theano.function(inputs=[x], outputs=net.y)
...
print("Initialization complete...")
def do_something(self, text):
...
self.predict(basic_np_array.T)
...
return info
Next, it's a view:
class BasicView(APIView):
def post(self, request):
data = request.data.dict()
output = ''
model = BasicModel()
if 'text' in data:
output = model.do_something(data['text'])
return Response({'text': output})
Solution is pretty simple. I decide to use Thread
, Queue
and Semaphore
from threading
. I use Semaphore
to protect the part of the code that must be used once at the time and Queue
to get results back from Thread
. Let's see the final variant of the class:
import theano
import theano.tensor as T
from threading import Semaphore
sem = Semaphore()
class BasicModel:
def __init__(self):
x = T.imatrix('x')
net, _ = models.load(settings.MODEL_FILE, 1, x)
print("Building model...")
self.predict = theano.function(inputs=[x], outputs=net.y)
...
print("Initialization complete...")
# parameter q is a Queue where punctuator puts the result text from current thread
def do_something(self, text, q):
# this is the threads unsafe part of the code
sem.acquire()
...
self.predict(basic_np_array.T)
...
# now let next thread to execute it
sem.release()
# do not forget to save result
q.put(info)
And the view:
from queue import Queue
from threading import Thread
q = Queue(maxsize=0)
class BasicView(APIView):
def post(self, request):
data = request.data.dict()
output = ''
model = BasicModel()
if 'text' in data:
# start a thread and put the method in it
t = Thread(target=model.do_something, args=(data['text'], q), daemon=True)
t.start()
t.join()
# after thread is finished get the result form a queue
output = q.get()
return Response({'text': output})
It's a lazy solution and it isn't tested on live system yet. But it works fine on test system and errors are gone now. #maketheanothreadssafeagain