App Engine Task Queue


December 5, 2013 Maithilish

11.3. Task Queue

Once the Symbols are in the cache, persisting them only requires retrieving them one by one from the cache and saving each with a JDO call. This simple task looks achievable through a single RPC, but it does not work as intended on App Engine. Let’s understand the limitations App Engine imposes.
An RPC has to complete within 60 seconds. When there are only a few symbols, the call may finish within the deadline, but persistence is a resource-hungry operation, and as the number of symbols grows App Engine begins to throw DeadlineExceededException. Next, we might try to overcome this by invoking a separate RPC for each Symbol. While this avoids the deadline issue, it floods App Engine with many simultaneous RPCs, which occasionally results in persistence errors.
For long-running or resource-hungry processes, App Engine provides the Task Queue service, which allows us to organize the work into small, discrete units called tasks and add them to a task queue for later execution. These tasks are initiated by the user request, but they run outside that request and are not bound by the usual request deadline.
Before we get into task queues, we have to make some preparations to access the App Engine datastore.
JDO Config

In an earlier chapter, we used jdoconfig.xml to configure a database connection to HSQLDB. To access the App Engine datastore, we need to modify the configuration as follows:

META-INF/jdoconfig.xml

        <!-- GAE Datastore -->

        <persistence-manager-factory name="datastore">
                <property name="javax.jdo.PersistenceManagerFactoryClass"
                        value="org.datanucleus.api.jdo.JDOPersistenceManagerFactory" />
                <property name="javax.jdo.option.ConnectionURL" value="appengine" />

                <property name="javax.jdo.option.NontransactionalRead" value="true" />
                <property name="javax.jdo.option.NontransactionalWrite"
                        value="true" />
                <property name="javax.jdo.option.RetainValues" value="true" />
                <property name="datanucleus.appengine.autoCreateDatastoreTxns"
                        value="true" />
                <property name="datanucleus.cache.level2.type" value="soft" />
                <property name="datanucleus.metadata.validate" value="false" />
        </persistence-manager-factory>

If this configuration is missing from jdoconfig.xml, App Engine throws a rather confusing error: NoClassDefFoundError. The same error also crops up when the JDO version is not set to v2 in the App Engine Settings window. So, you know what to look for when you encounter this exception.
com.google.apphosting.utils.jetty.JettyLogger warn
WARNING: Error for /fins/gaeInsertTask
java.lang.NoClassDefFoundError: Could not initialize class in.fins.server.dao.jdo.PMF
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:186)
        ....
Identity

Earlier, in the domain classes DataGroup and Data, we used type long for the id field, which is also the primary key. These classes are also child objects of other classes because of one-to-many relationships, and in such cases, when the primary key is of type long, App Engine JDO throws JDOFatalUserException.

javax.jdo.JDOFatalUserException: Error in meta-data for field 
  in.fins.shared.DataGroup.id : Cannot have a primary key of type long and 
  be a child object (owning field is in.fins.shared.Symbol.dataGroups").
To overcome this error, we have to use type String for the primary key id in the in.fins.shared.DataGroup and in.fins.shared.Data classes. Then, in the mapping file package.jdo, we have to mark the field as the App Engine specific encoded key, gae.encoded-pk, using the extension feature of JDO.
in.fins.shared/package.jdo

    <class name="DataGroup">
            <field name="id" primary-key="true" value-strategy="identity">
                    <extension vendor-name="datanucleus" key="gae.encoded-pk"
                            value="true" />
            </field>                    
            <field name="category" />
    ....
                        
    <class name="Data">
            <field name="id" primary-key="true" value-strategy="identity">
                    <extension vendor-name="datanucleus" key="gae.encoded-pk"
                            value="true" />
            </field>
            <field name="date" />
    ....

This solves the App Engine specific error, but if you roll back to a regular app server, it complains that type String is not allowed for a primary key when value-strategy is identity. So, when you want to move the app to a regular app server, change the value-strategy attribute from identity to uuid-string (for HSQLDB) or auid (for MySQL), which should resolve the issue.

ID as String

It is good practice to use an id field of type long in domain classes and to use strategies like Join Table to overcome App Engine related issues. But we intentionally stick with the present domain model to highlight the issues with an id of type String.
With these preparations, we are ready to use App Engine’s datastore to persist our domain classes.
DataStorePanel

DataStorePanel is a custom widget with just one button, Persist Symbols.

Figure 11.5. DataStorePanel


Unlike UploadPanel and CachePanel, DataStorePanel uses GWT RPC instead of FormPanel and form submission.
The only interesting thing in this widget is the click handler, which makes an RPC call to the persistSymbols() method of SymbolService.
in.fins.client.widget/DataStorePanel.java

....
    @UiHandler("persistButton")
    void onInsertButtonClick(ClickEvent event) {
            ISymbolServiceAsync dataStoreService = GWT.create(ISymbolService.class);
            dataStoreService.persistSymbols("insert-queue",
                    new AsyncCallback<String>() {

                            @Override
                            public void onSuccess(String result) {
                                    StatusEvent se = new StatusEvent(result);
                                    EventBus.get().fireEvent(se);
                            }

                            @Override
                            public void onFailure(Throwable caught) {
                                    StatusEvent se = new StatusEvent(caught.getMessage());
                                    EventBus.get().fireEvent(se);
                            }
                    });
    }

The string argument insert-queue passed to persistSymbols() tells the server which queue to use to persist the symbols.
SymbolService RPC

The service method persistSymbols() is a new method in the existing SymbolService. Its definition is added to the ISymbolService and ISymbolServiceAsync interfaces on the client, and its implementation to SymbolService on the server.

in.fins.client.rpc/ISymbolService.java

@RemoteServiceRelativePath("symbolService")
public interface ISymbolService extends RemoteService {

        ....

        public String persistSymbols(String qName);
}

in.fins.client.rpc/ISymbolServiceAsync.java

public interface ISymbolServiceAsync {

        ....                     

        void persistSymbols(String qName, AsyncCallback<String> callback);        
}

in.fins.server.service/SymbolService.java

     @Override
     public String persistSymbols(String qName) {

          // persist symbols through ITask interface
     }

Task Interface

On regular app servers, the persistSymbols() method can retrieve Symbols from the cache one by one and then persist them to the datastore using a DAO. But on App Engine, the same approach doesn’t work properly. To handle these differences, the interaction between the SymbolService.persistSymbols() method and the underlying datastore goes through an interface, ITask, as shown in the next figure.

Figure 11.6. Task interface


The ITask interface contains a single method, execute(), which is implemented by the GaePersistTask and PersistTask classes. Based on the server type, the SymbolService.persistSymbols() method instantiates one of these two classes and assigns it to an ITask field. Then, it retrieves the list of symbol names from the cache and calls ITask.execute() for each symbol name.
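The dispatch described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Fins implementation: RecordingTask is a hypothetical stand-in for PersistTask and GaePersistTask, the onAppEngine flag and the symbolNames list stand in for the server detection and cache lookup that Fins does itself, and skipping a failed symbol is a choice made for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same shape as the ITask interface used in the listings of this section.
interface ITask {
    void execute(Map<String, Object> parameters) throws Exception;
}

// Hypothetical stand-in for PersistTask / GaePersistTask: it only records
// the parameters it receives instead of touching a cache or a datastore.
class RecordingTask implements ITask {
    final List<Map<String, Object>> calls = new ArrayList<>();

    @Override
    public void execute(Map<String, Object> parameters) {
        calls.add(new HashMap<>(parameters));
    }
}

class PersistDispatchSketch {

    // Picks one implementation for the current server type, then calls
    // execute() once per symbol name. The boolean flag and the list of
    // names are assumptions made for this sketch.
    static String persistSymbols(String qName, boolean onAppEngine,
            List<String> symbolNames, ITask gaeTask, ITask plainTask) {
        ITask task = onAppEngine ? gaeTask : plainTask;
        int submitted = 0;
        for (String symbolName : symbolNames) {
            Map<String, Object> parameters = new HashMap<>();
            parameters.put("queueName", qName);
            parameters.put("symbolName", symbolName);
            try {
                task.execute(parameters);
                submitted++;
            } catch (Exception e) {
                // a failed symbol is skipped so that the remaining ones still run
            }
        }
        return submitted + " of " + symbolNames.size() + " symbols submitted";
    }
}
```

The caller never needs to know which implementation it holds; only the selection at the top of the method depends on the server type.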
Let’s have a look at PersistTask, which is used by regular app servers.
in.fins.server.task/PersistTask.java

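import java.util.Map;
import java.util.logging.Logger;

public class PersistTask implements ITask {

        // java.util.logging logger used by persistSymbol()
        private static final Logger log = Logger.getLogger(PersistTask.class.getName());

        private DaoFactory daoFactory;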

        @Override
        public void execute(Map<String, Object> parameters) throws Exception {
                String ormName = (String) parameters.get("ormName");
                ORM orm = DaoFactory.getOrmType(ormName);
                daoFactory = DaoFactory.getDaoFactory(orm);
                String symbolName = (String) parameters.get("symbolName");
                ICache cache = Utils.createInstance(ICache.class,
                                "in.fins.server.cache.SimpleCache");
                cache.createCache();
                Symbol symbol = (Symbol) cache.get(symbolName);
                persistSymbol(symbol);
        }

        public void persistSymbol(Symbol symbol) throws Exception {
                try {
                        IDao<Symbol> symbolDao = daoFactory.getDao(Symbol.class);
                        symbolDao.insert(symbol);
                } catch (Exception e) {
                        log.warning(e.getMessage());
                        throw e;
                }
        }
}

SymbolService.persistSymbols() calls the execute() method, passing the symbol name and ORM type through parameters. Using these parameters, execute() obtains the symbol from the cache and the appropriate DaoFactory. From the DaoFactory, it gets the SymbolDao and calls its insert() method to persist the symbol.
But things get slightly complicated when Fins runs on App Engine, which uses GaePersistTask.
in.fins.server.task/GaePersistTask.java

public class GaePersistTask extends HttpServlet implements ITask {

        private DaoFactory daoFactory;

        @Override
        public void execute(Map<String, Object> parameters) throws Exception {
                String qName = (String) parameters.get("queueName");
                String symbolName = (String) parameters.get("symbolName");

              // add a task to Task Queue to 
              // persist a Symbol

        }

Here, the execute() method obtains the symbol name and queue name from parameters and then adds a task to the App Engine task queue. Later, App Engine dequeues the tasks one by one, and each task persists its symbol.
It is a two-step process: first we define a task, and then we add it to a queue. Let’s see how it is done in detail.
App Engine Tasks

An App Engine task is simply a regular servlet, and GaePersistTask extends HttpServlet so that it can act as an App Engine task. Please note that we combined ITask and HttpServlet into a single class, GaePersistTask, just to save a file; it may be split into two classes, one that implements ITask and another that acts as the task servlet.

When App Engine dequeues the task, it invokes the servlet and calls either the doPost() or the doGet() method of the task, depending on the HTTP method set for the task. Place the persistence code in the doPost() method as shown below.
in.fins.server.task/GaePersistTask.java

        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp)
                        throws ServletException, IOException {
                try {
                        ICache cache = Utils.createInstance(ICache.class,
                                        "in.fins.server.cache.GaeCache");
                        String symbolName = req.getParameter("symbolName");
                        cache.createCache();
                        Symbol symbol = (Symbol) cache.get(symbolName);
                        persistSymbol(symbol);
                } catch (Exception e) {
                        log.warning(e.getMessage());
                }
        }

From the HttpServletRequest, it obtains the symbol name, retrieves the corresponding Symbol from the cache, and then persists the Symbol using DaoFactory and SymbolDao, as explained earlier for PersistTask.
Next, we have to map the task servlet to a URL, as we do for any other servlet, by adding a servlet mapping to the descriptor file web.xml.
war/WEB-INF/web.xml

 <servlet>
         <servlet-name>gaePersistTask</servlet-name>
         <servlet-class>in.fins.server.task.GaePersistTask</servlet-class>
 </servlet>
 <servlet-mapping>
         <servlet-name>gaePersistTask</servlet-name>
         <url-pattern>/fins/gaePersistTask</url-pattern>
 </servlet-mapping>

App Engine Queue

The Task Queue API supports push queues and pull queues. Push queues process tasks at the rate configured in the queue definition, while pull queues provide greater control over task processing and also allow tasks to be consumed by code external to the application. In Fins, we use push queues as they are simpler to use.

The queue definition file, queue.xml, defines the queues. For convenience, App Engine provides a default push queue and places tasks that are added without a queue name into it. In addition to the default queue, we may configure named queues in queue.xml.
war/WEB-INF/queue.xml

<queue-entries>
  <queue>
    <name>insert-queue</name>
    <rate>1/s</rate>
    <bucket-size>10</bucket-size>
  </queue>
</queue-entries>

The above definition configures a push queue named insert-queue with a bucket-size of 10 and a rate of 1 task per second. The task queue uses the token bucket algorithm to control the rate of task execution: each task that starts consumes a token. The rate directive tells App Engine to refill the bucket with one token per second, and the bucket can hold a maximum of 10 tokens.
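To make the effect of rate and bucket-size concrete, here is a minimal token bucket in plain Java. It is a sketch of the general algorithm, not App Engine's implementation; the class names, the explicit clock argument, and the choice to start with a full bucket are all assumptions made for illustration.

```java
// Minimal token bucket driven by an explicit clock (in seconds),
// which keeps the behaviour deterministic and easy to follow.
class TokenBucket {
    private final int capacity;           // corresponds to bucket-size
    private final double refillPerSecond; // corresponds to rate
    private double tokens;
    private double lastSeconds;

    TokenBucket(int capacity, double refillPerSecond) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity; // assumption: the bucket starts full
        this.lastSeconds = 0.0;
    }

    // Returns true if a task may start at nowSeconds; a started task
    // consumes one token.
    boolean tryAcquire(double nowSeconds) {
        double elapsed = Math.max(0.0, nowSeconds - lastSeconds);
        tokens = Math.min(capacity, tokens + elapsed * refillPerSecond);
        lastSeconds = Math.max(lastSeconds, nowSeconds);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}

class TokenBucketDemo {
    public static void main(String[] args) {
        // same numbers as insert-queue: bucket-size 10, rate 1/s
        TokenBucket bucket = new TokenBucket(10, 1.0);
        int started = 0;
        for (int i = 0; i < 15; i++) {
            if (bucket.tryAcquire(0.0)) {
                started++;
            }
        }
        System.out.println("tasks started at t=0: " + started);
        System.out.println("task allowed at t=1: " + bucket.tryAcquire(1.0));
    }
}
```

With these settings, a burst of 15 pending tasks lets 10 start immediately, and the remaining ones start at one per second as tokens are refilled.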
App Engine executes push tasks by calling an application-specific URL, which means we have to provide the URL of the task when we add it to the task queue. The following code adds GaePersistTask to insert-queue.
in.fins.server.task/GaePersistTask.java

    
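    // withUrl is a static import of TaskOptions.Builder.withUrl; Queue, QueueFactory,
    // TaskOptions and TaskOptions.Method come from com.google.appengine.api.taskqueue
    TaskOptions task = withUrl("/fins/gaePersistTask")
            .param("symbolName", symbolName)
            .method(Method.POST);
    Queue queue = QueueFactory.getQueue(qName);
    queue.add(task);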

The TaskOptions.Builder.withUrl() method is used to create a task and set various options. The arguments to the builder methods indicate that the URL of the task is /fins/gaePersistTask, that the symbol name is a request parameter, and that the method is POST. Then, the task is added to the queue. App Engine dequeues the tasks one by one by calling the URL /fins/gaePersistTask, which invokes GaePersistTask and calls its doPost() method with the symbol name as the request parameter.
Task instances

Now, the question is how many instances of GaePersistTask get created when we persist, say, three symbols. The answer can be as many as four. First, we create an instance of GaePersistTask in the SymbolService.persistSymbols() method and call its execute() method once per symbol, which adds three tasks to the task queue. For each task, App Engine invokes its URL, which is served by a GaePersistTask servlet. In a normal app server, only one instance of the servlet gets created, but it is important to note that here tasks are executed on a cloud platform, where each request may get executed on some instance of App Engine, on some server, in some data center, somewhere in the world. An App Engine instance in Data Center X creates a GaePersistTask servlet to execute the first task, another servlet instance in Data Center Y may handle the second task, and so on.

The following figure helps to clarify what plays out in the cloud when we persist three Symbols.
Figure 11.7. Task Queue


DataStore Insert

To persist symbols, IDao defines a new method, insert(), which is implemented by in.fins.server.dao.jdo.SymbolDao and in.fins.server.dao.hibernate.SymbolDao. We have not made the effort to implement it for MyBatis, where it is slightly more complicated than for the other two.

While persisting a symbol, the insert() method first checks whether a symbol with the same name already exists in the datastore. If so, it updates the existing symbol; otherwise, it inserts the symbol.
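The check-then-update-or-insert behaviour can be illustrated with a small in-memory sketch. It does not use JDO or Hibernate; InMemorySymbolStore and its string values are hypothetical stand-ins for the datastore and the Symbol class, chosen only to show the control flow.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the datastore, keyed by symbol name.
class InMemorySymbolStore {
    private final Map<String, String> store = new HashMap<>();

    // Mirrors the described insert(): update when the name already exists,
    // otherwise insert. Returns which of the two paths was taken.
    String insert(String name, String value) {
        boolean exists = store.containsKey(name);
        store.put(name, value);
        return exists ? "updated" : "inserted";
    }

    String get(String name) {
        return store.get(name);
    }

    int size() {
        return store.size();
    }
}
```

Persisting the same symbol name twice therefore leaves a single entry holding the latest value, which is what makes it safe to run the persist step again on symbols that are already stored.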

Manage TaskQueue and DataStore

In development mode, App Engine provides a Development Console at http://localhost:8888/_ah/admin, which is useful for managing the datastore and queues. The console has options to view or delete datastore entities. It also provides stats about the default and named task queues and an option to purge pending tasks.
In development mode, App Engine uses a binary file, war/WEB-INF/appengine-generated/local_db.bin, as its local datastore. If you want to wipe the entire datastore, the hack is to delete this file and restart App Engine.
The next section explains the use of App Engine Backends to free the frontend exclusively for user requests.