domenica 4 marzo 2012

Non concurrent HTTP requests on Cluster with Infinispan


On Server-Client Architecture there are two type of call, one that only retrieve information from server (Read-Only) and one that can also change server's status (R/W).
Tipically, the first one can run in concurrency, also with the same parameters, without problem (also maybe a performance increase by cache system).
The second one changes the internal server's status, create file, update database and so on. And, when several calls with same parametres reach the server simoultaneusly, problems often can occur.
When the client is a browser this problem can be mitigated with javascript, disabling button once clicked for example, but cannot be eliminated. Worse if your client isn't a browser.
There is my solution, adopted on a cluster of four machine using JBoss Infinispan.

In my webapp I use Jersey + Guice to serve request, this is a simple class that manage POST request:


      @POST  
      @Path("mpay")  
      @Produces(MEDIATYPE_WEBPAGEUTF8)  
      public Object landMPay(@QueryParam("TransactionID") String transactionId, @QueryParam("ResultCode") String resultcode) {  
           logger.debug("Get user from token, {}.", transactionId);  
           .
           .
           .


Now I want only one concurrent call with same TransactionId parameter. I created two annotation, one for Method (@UniqueCallOnCluster) and ane for Parameter (@KeyParameter).
I use first annotation as:


      @POST  
      @Path("mpay")  
      @UniqueCallOnCluster
      @Produces(MEDIATYPE_WEBPAGEUTF8)  
      public Object landMPay(@QueryParam("TransactionID") String transactionId, @QueryParam("ResultCode") String resultcode) {  
           logger.debug("Get user from token, {}.", transactionId);  
           .
           .
           .


to permit only one concurrent call with all same parameters. But I want to permit only one concurrent call with same TransactionId parameter, ignoring ResultCode values:


      @POST  
      @Path("mpay")  
      @UniqueCallOnCluster
      @Produces(MEDIATYPE_WEBPAGEUTF8)  
      public Object landMPay(@KeyParameter @QueryParam("TransactionID") String transactionId, @QueryParam("ResultCode") String resultcode) {  
           logger.debug("Get user from token, {}.", transactionId);  
           .
           .
           .


The interceptor that manage this annotations use Infinispan to tell other server that him is starting to serve this call with this parameter/s:


 
 import java.lang.annotation.Annotation;  
 import java.util.ArrayList;  
 import java.util.List;  
 import javax.transaction.TransactionManager;  
 import javax.ws.rs.core.Response;  
 import javax.ws.rs.core.Response.Status;  
 import org.aopalliance.intercept.MethodInterceptor;  
 import org.aopalliance.intercept.MethodInvocation;  
 import com.google.common.base.Joiner;  
 import com.google.inject.Inject;  


 public class UniqueCallOnClusterInterceptor implements MethodInterceptor {  


      @Inject  
      private Constants constants;  


      @Inject  
      private KeyCallOnClusterService keyCallOnClusterService;  


      public Object invoke(MethodInvocation invocation) throws Throwable {  

  
           String classname = invocation.getMethod().getDeclaringClass().getSimpleName();  
           String methodName = invocation.getMethod().getName();  
           String key = classname + "_" + methodName + "_" + extractParameterValue(invocation);  
           TransactionManager tm = keyCallOnClusterService.getTransactionManager();  

      
           try {  
                if (tm != null) {  
                     tm.begin();  
                     boolean success = keyCallOnClusterService.lock(key);  
                     if (!success) {  
                          return Response.status(Status.CONFLICT).entity("Another call with same parameter is in progress.").build();  
                     }  
                }  
                String runningServer = (String) keyCallOnClusterService.get(key);  
                if (runningServer != null) {  
                     return Response.status(Status.CONFLICT).entity("Another call with same parameter is in progress.").build();  
                }  
                keyCallOnClusterService.put(key, constants.getHostname());  
           } finally {  
                if (tm != null)  
                     tm.commit();  
           }  


           try {  
                return invocation.proceed();  
           } finally {  
                keyCallOnClusterService.remove(key);  
           }  
      }  


      public String extractParameterValue(MethodInvocation invocation) {  
           List<String> list = new ArrayList<String>();  
           Annotation[][] parameterAnnotations = invocation.getMethod().getParameterAnnotations();  
           for (int i = 0; i < parameterAnnotations.length; i++) {  
                Annotation[] annotations = parameterAnnotations[i];  
                for (int j = 0; j < annotations.length; j++) {  
                     Annotation annotation = annotations[j];  
                     if (annotation instanceof KeyParameter) {  
                          list.add(String.valueOf(invocation.getArguments()[i]));  
                          break;  
                     }  
                }  
           }  
           if (list.size() == 0)  
                return Joiner.on(", ").useForNull("null").join(invocation.getArguments());  
           return Joiner.on(", ").useForNull("null").join(list);  
      }  
 }  

The KeyCallOnClusterService class is a facility for the Infinispan's Cache Object.
Here the configuration of Infinispan:


 <?xml version="1.0" encoding="UTF-8"?>  
 <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
      xsi:schemaLocation="urn:infinispan:config:5.1 http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"  
      xmlns="urn:infinispan:config:5.1">  
      <global>  
           <globalJmxStatistics enabled="true"  
                allowDuplicateDomains="true"></globalJmxStatistics>  
           <transport clusterName="cluster"  
                distributedSyncTimeout="3000">  
                <properties>  
                     <property name="configurationFile" value="infinispan-udp.cfg.xml" />  
                </properties>  
           </transport>  
           <shutdown hookBehavior="DONT_REGISTER" />  
      </global>  
      <namedCache name="current-call-cache">  
           <jmxStatistics enabled="true" />  
           <transaction transactionMode="TRANSACTIONAL" lockingMode="PESSIMISTIC"  
                transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"></transaction>  
           <locking isolationLevel="REPEATABLE_READ" useLockStriping="false" />  
           <eviction strategy="LRU" maxEntries="600" />  
           <!-- Due minuti -->  
           <expiration lifespan="120000" />  
           <storeAsBinary enabled="true" />  
           <clustering mode="replication">  
                <stateTransfer timeout="5000" fetchInMemoryState="true" />  
                <sync replTimeout="5000" />  
           </clustering>  
      </namedCache>  
 </infinispan>  


The result is that when arrive concurrent request with same TransactionId, the first is served regularly, the other get a HTTP 409 Conflict.
You can use ab and wireshark to test this configuration.


Tools:
Jersey 1.6 + Guice 2.0
Google Guava 11.0.1
JBoss Infinispan 5.1.1

Good clustering at all!



Nessun commento: