jmeter-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pmoua...@apache.org
Subject svn commit: r1645532 - in /jmeter/trunk: src/components/org/apache/jmeter/visualizers/backend/ src/components/org/apache/jmeter/visualizers/backend/graphite/ xdocs/
Date Sun, 14 Dec 2014 21:57:17 GMT
Author: pmouawad
Date: Sun Dec 14 21:57:17 2014
New Revision: 1645532

URL: http://svn.apache.org/r1645532
Log:
Bug 57321 - BackendListener reports partial results in master-slave configuration (nightly build r1642603)
Bugzilla Id: 57321

Modified:
    jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
    jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/SamplerMetric.java
    jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/graphite/GraphiteBackendListenerClient.java
    jmeter/trunk/xdocs/changes.xml

Modified: jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java
URL: http://svn.apache.org/viewvc/jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java?rev=1645532&r1=1645531&r2=1645532&view=diff
==============================================================================
--- jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java (original)
+++ jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/BackendListener.java Sun Dec 14 21:57:17 2014
@@ -19,11 +19,13 @@
 package org.apache.jmeter.visualizers.backend;
 
 import java.io.Serializable;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
 
@@ -50,6 +52,18 @@ public class BackendListener extends Abs
     /**
      * 
      */
+    private static final class ListenerClientData {
+        private BackendListenerClient client;
+        private BlockingQueue<SampleResult> queue;
+        private AtomicLong queueWaits; // how many times we had to wait to queue a SampleResult
       
+        private AtomicLong queueWaitTime; // how long we had to wait (nanoSeconds)
+        private int instanceCount; // number of active tests
+        private CountDownLatch latch;
+    }
+    
+    /**
+     * 
+     */
     private static final long serialVersionUID = 8184103677832024335L;
 
     private static final Logger LOGGER = LoggingManager.getLoggerForClass();
@@ -63,6 +77,11 @@ public class BackendListener extends Abs
      * Queue size
      */
     public static final String QUEUE_SIZE = "QUEUE_SIZE";
+
+    /**
+     * Lock used to protect accumulators update + instanceCount update
+     */
+    private static final Object LOCK = new Object();
     
     /**
      * Property key representing the arguments for the BackendListenerClient.
@@ -75,33 +94,33 @@ public class BackendListener extends Abs
      */
     private Class<?> clientClass;
 
-    /**
-     * If true, the BackendListenerClient class implements teardownTest.
-     * Created by testStarted; copied to cloned instances.
-     */
-    private boolean isToBeRegistered;
-
-    /**
-     * The BackendListenerClient instance 
-     */
-    private transient BackendListenerClient backendListenerClient = null;
-
-
     public static final String DEFAULT_QUEUE_SIZE = "5000";
     
-    private transient BlockingQueue<SampleResult> queue; // created by server in readResolve
method
-    
-    private AtomicLong queueWaits; // how many times we had to wait to queue a sample
-    
-    private AtomicLong queueWaitTime; // how long we had to wait (nanoSeconds)
-
     // Create unique object as marker for end of queue
     private transient static final SampleResult FINAL_SAMPLE_RESULT = new SampleResult();
+    
+    // Name of the test element. Set up by testStarted().
+    private transient String myName;
+    
+    // Holds listenerClientData for this test element
+    private transient ListenerClientData listenerClientData;
+        
+    /*
+     * This is needed for distributed testing where there is 1 instance
+     * per server. But we need the total to be shared.
+     */
+    //@GuardedBy("LOCK") - needed to ensure consistency between this and instanceCount
+    private static final Map<String, ListenerClientData> queuesByTestElementName =

+            new ConcurrentHashMap<String, ListenerClientData>();
 
     /**
      * Create a BackendListener.
      */
     public BackendListener() {
+        synchronized (LOCK) {
+            queuesByTestElementName.clear();
+        }
+
         setArguments(new Arguments());    
     }
 
@@ -113,7 +132,6 @@ public class BackendListener extends Abs
     public Object clone() {
         BackendListener clone = (BackendListener) super.clone();
         clone.clientClass = this.clientClass;
-        clone.isToBeRegistered = this.isToBeRegistered;
         return clone;
     }
 
@@ -121,9 +139,6 @@ public class BackendListener extends Abs
         String name = getClassname().trim();
         try {
             clientClass = Class.forName(name, false, Thread.currentThread().getContextClassLoader());
-            Method method = clientClass.getMethod("teardownTest", new Class[]{BackendListenerContext.class});
-            isToBeRegistered = !method.getDeclaringClass().equals(AbstractBackendListenerClient.class);
-            LOGGER.info("Created class: " + name + ". Uses teardownTest: " + isToBeRegistered);
         } catch (Exception e) {
             LOGGER.error(whoAmI() + "\tException initialising: " + name, e);
         }   
@@ -144,51 +159,6 @@ public class BackendListener extends Abs
         return sb.toString();
     }
 
-    // TestStateListener implementation
-    /**
-     *  Implements TestStateListener.testStarted() 
-     **/
-    @Override
-    public void testStarted() {
-        testStarted("");
-    }
-
-    /** Implements TestStateListener.testStarted(String) 
-     **/
-    @Override
-    public void testStarted(String host) {
-        if(LOGGER.isDebugEnabled()){
-            LOGGER.debug(whoAmI() + "\ttestStarted(" + host + ")");
-        }
-        int queueSize;
-        final String size = getQueueSize();
-        try {
-            queueSize = Integer.parseInt(size);
-        } catch (NumberFormatException nfe) {
-            LOGGER.warn("Invalid queue size '" + size + "' defaulting to " + DEFAULT_QUEUE_SIZE);
-            queueSize = Integer.parseInt(DEFAULT_QUEUE_SIZE);            
-        }
-        queue = new ArrayBlockingQueue<SampleResult>(queueSize); 
-        initClass();
-        queueWaits = new AtomicLong(0L);
-        queueWaitTime = new AtomicLong(0L);
-        LOGGER.info(getName()+":Starting worker with class:"+clientClass +" and queue capacity:"+getQueueSize());
-
-        backendListenerClient = createBackendListenerClientImpl(clientClass);
-        BackendListenerContext context = new BackendListenerContext((Arguments)getArguments().clone());
-        
-        try {
-            backendListenerClient.setupTest(context);
-        } catch (Exception e) {
-            throw new java.lang.IllegalStateException("Failed calling setupTest", e);
-        }
-
-        Worker worker = new Worker(backendListenerClient, (Arguments) getArguments().clone(),
queue);
-        worker.setDaemon(true);
-        worker.start();
-        LOGGER.info(getName()+": Started  worker with class:"+clientClass);
-        
-    }
 
     /* (non-Javadoc)
      * @see org.apache.jmeter.samplers.SampleListener#sampleOccurred(org.apache.jmeter.samplers.SampleEvent)
@@ -198,30 +168,31 @@ public class BackendListener extends Abs
         Arguments args = getArguments();
         BackendListenerContext context = new BackendListenerContext(args);
 
-        SampleResult sr = backendListenerClient.createSampleResult(context, event.getResult());
+        SampleResult sr = listenerClientData.client.createSampleResult(context, event.getResult());
         try {
-            if (!queue.offer(sr)){ // we failed to add the element first time
-                queueWaits.incrementAndGet();
+            if (!listenerClientData.queue.offer(sr)){ // we failed to add the element first
time
+                listenerClientData.queueWaits.incrementAndGet();
                 long t1 = System.nanoTime();
-                queue.put(sr);
+                listenerClientData.queue.put(sr);
                 long t2 = System.nanoTime();
-                queueWaitTime.addAndGet(t2-t1);
+                listenerClientData.queueWaitTime.addAndGet(t2-t1);
             }
         } catch (Exception err) {
             LOGGER.error("sampleOccurred, failed to queue the sample", err);
         }
     }
     
+    
     /**
      * Thread that dequeus data from queue to send it to {@link BackendListenerClient}
      */
     private static final class Worker extends Thread {
         
-        private final BlockingQueue<SampleResult> queue;
+        private final ListenerClientData listenerClientData;
         private final BackendListenerContext context;
         private final BackendListenerClient backendListenerClient;
-        private Worker(BackendListenerClient backendListenerClient, Arguments arguments,
BlockingQueue<SampleResult> q){
-            queue = q;
+        private Worker(BackendListenerClient backendListenerClient, Arguments arguments,
ListenerClientData listenerClientData){
+            this.listenerClientData = listenerClientData;
             // Allow BackendListenerClient implementations to get access to test element
name
             arguments.addArgument(TestElement.NAME, getName()); 
             context = new BackendListenerContext(arguments);
@@ -232,50 +203,55 @@ public class BackendListener extends Abs
         @Override
         public void run() {
             boolean isDebugEnabled = LOGGER.isDebugEnabled();
-            List<SampleResult> sampleResults = new ArrayList<SampleResult>(queue.size());
+            List<SampleResult> sampleResults = new ArrayList<SampleResult>(listenerClientData.queue.size());
             try {
-                boolean endOfLoop = false;
-                while (!endOfLoop) {
-                    if(isDebugEnabled) {
-                        LOGGER.debug("Thread:"+Thread.currentThread().getName()+" taking
SampleResult from queue:"+queue.size());
-                    }
-                    SampleResult sampleResult = queue.take();
-                    if(isDebugEnabled) {
-                        LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took SampleResult:"+sampleResult+",
isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
-                    }
-                    while (!(endOfLoop = (sampleResult == FINAL_SAMPLE_RESULT)) &&
sampleResult != null ) { // try to process as many as possible
-                        sampleResults.add(sampleResult);
+                try {
+
+                    boolean endOfLoop = false;
+                    while (!endOfLoop) {
                         if(isDebugEnabled) {
-                            LOGGER.debug("Thread:"+Thread.currentThread().getName()+" polling
from queue:"+queue.size());
+                            LOGGER.debug("Thread:"+Thread.currentThread().getName()+" taking
SampleResult from queue:"+listenerClientData.queue.size());
                         }
-                        sampleResult = queue.poll(); // returns null if nothing on queue
currently
+                        SampleResult sampleResult = listenerClientData.queue.take();
                         if(isDebugEnabled) {
-                            LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took
from queue:"+sampleResult+", isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
+                            LOGGER.debug("Thread:"+Thread.currentThread().getName()+" took
SampleResult:"+sampleResult+", isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
+                        }
+                        while (!(endOfLoop = (sampleResult == FINAL_SAMPLE_RESULT)) &&
sampleResult != null ) { // try to process as many as possible
+                            sampleResults.add(sampleResult);
+                            if(isDebugEnabled) {
+                                LOGGER.debug("Thread:"+Thread.currentThread().getName()+"
polling from queue:"+listenerClientData.queue.size());
+                            }
+                            sampleResult = listenerClientData.queue.poll(); // returns null
if nothing on queue currently
+                            if(isDebugEnabled) {
+                                LOGGER.debug("Thread:"+Thread.currentThread().getName()+"
took from queue:"+sampleResult+", isFinal:" + (sampleResult==FINAL_SAMPLE_RESULT));
+                            }
+                        }
+                        if(isDebugEnabled) {
+                            LOGGER.debug("Thread:"+Thread.currentThread().getName()+
+                                    " exiting with FINAL EVENT:"+(sampleResult == FINAL_SAMPLE_RESULT)
+                                    +", null:" + (sampleResult==null));
+                        }
+                        sendToListener(backendListenerClient, context, sampleResults);
+                        if(!endOfLoop) {
+                            LockSupport.parkNanos(100);
                         }
                     }
-                    if(isDebugEnabled) {
-                        LOGGER.debug("Thread:"+Thread.currentThread().getName()+
-                                " exiting with FINAL EVENT:"+(sampleResult == FINAL_SAMPLE_RESULT)
-                                +", null:" + (sampleResult==null));
-                    }
-                    sendToListener(backendListenerClient, context, sampleResults);
-                    if(!endOfLoop) {
-                        LockSupport.parkNanos(100);
-                    }
+                } catch (InterruptedException e) {
+                    // NOOP
                 }
-            } catch (InterruptedException e) {
-                // NOOP
+                // We may have been interrupted
+                sendToListener(backendListenerClient, context, sampleResults);
+                LOGGER.info("Worker ended");
+            } finally {
+                listenerClientData.latch.countDown();
             }
-            // We may have been interrupted
-            sendToListener(backendListenerClient, context, sampleResults);
-            LOGGER.info("Worker ended");
         }
     }
     
 
     /**
      * Send sampleResults to {@link BackendListenerClient}
-     * @param backendListenerClient 
+     * @param backendListenerClient {@link BackendListenerClient}
      * @param context {@link BackendListenerContext}
      * @param sampleResults List of {@link SampleResult}
      */
@@ -290,8 +266,7 @@ public class BackendListener extends Abs
 
     /**
      * Returns reference to {@link BackendListener}
-     *
-     *
+     * @param clientClass {@link BackendListenerClient} client class
      * @return BackendListenerClient reference.
      */
     static BackendListenerClient createBackendListenerClientImpl(Class<?> clientClass)
{
@@ -306,27 +281,98 @@ public class BackendListener extends Abs
         }
     }
 
+    // TestStateListener implementation
+    /**
+     *  Implements TestStateListener.testStarted() 
+     **/
+    @Override
+    public void testStarted() {
+        testStarted("local"); //$NON-NLS-1$
+    }
+
+    /** Implements TestStateListener.testStarted(String) 
+     **/
+    @Override
+    public void testStarted(String host) {
+        if(LOGGER.isDebugEnabled()){
+            LOGGER.debug(whoAmI() + "\ttestStarted(" + host + ")");
+        }
+                
+        int queueSize;
+        final String size = getQueueSize();
+        try {
+            queueSize = Integer.parseInt(size);
+        } catch (NumberFormatException nfe) {
+            LOGGER.warn("Invalid queue size '" + size + "' defaulting to " + DEFAULT_QUEUE_SIZE);
+            queueSize = Integer.parseInt(DEFAULT_QUEUE_SIZE);            
+        }
+
+        synchronized (LOCK) {
+            myName = getName();
+            listenerClientData = queuesByTestElementName.get(myName);
+            if (listenerClientData == null){
+                // We need to do this to ensure in Distributed testing 
+                // that only 1 instance of BackendListenerClient is used
+                initClass();
+                BackendListenerClient backendListenerClient = createBackendListenerClientImpl(clientClass);
+                BackendListenerContext context = new BackendListenerContext((Arguments)getArguments().clone());
+
+                listenerClientData = new ListenerClientData();
+                listenerClientData.queue = new ArrayBlockingQueue<SampleResult>(queueSize);

+                listenerClientData.queueWaits = new AtomicLong(0L);
+                listenerClientData.queueWaitTime = new AtomicLong(0L);
+                listenerClientData.latch = new CountDownLatch(1);
+                listenerClientData.client = backendListenerClient;
+                LOGGER.info(getName()+":Starting worker with class:"+clientClass +" and queue
capacity:"+getQueueSize());
+                Worker worker = new Worker(backendListenerClient, (Arguments) getArguments().clone(),
listenerClientData);
+                worker.setDaemon(true);
+                worker.start();
+                LOGGER.info(getName()+": Started  worker with class:"+clientClass);
+                try {
+                    backendListenerClient.setupTest(context);
+                } catch (Exception e) {
+                    throw new java.lang.IllegalStateException("Failed calling setupTest",
e);
+                }
+                queuesByTestElementName.put(myName, listenerClientData);
+            }
+            listenerClientData.instanceCount++;
+        }        
+    }
+    
     /**
      * Method called at the end of the test. This is called only on one instance
      * of BackendListener. This method will loop through all of the other
      * BackendListenerClients which have been registered (automatically in the
      * constructor) and notify them that the test has ended, allowing the
      * BackendListenerClients to cleanup.
+     * Implements TestStateListener.testEnded(String)
      */
     @Override
-    public void testEnded() {
+    public void testEnded(String host) {
+        synchronized (LOCK) {
+            ListenerClientData listenerClientData = queuesByTestElementName.get(myName);
+            if(LOGGER.isDebugEnabled()) {
+                LOGGER.debug("testEnded called on instance "+myName+"#"+listenerClientData.instanceCount);
+            }
+            listenerClientData.instanceCount--;
+            if (listenerClientData.instanceCount > 0){
+                // Not the last instance of myName
+                return;
+            }
+        }
         try {
-            queue.put(FINAL_SAMPLE_RESULT);
+            listenerClientData.queue.put(FINAL_SAMPLE_RESULT);
         } catch (Exception ex) {
             LOGGER.warn("testEnded() with exception:"+ex.getMessage(), ex);
         }
-        if (queueWaits.get() > 0) {
-            LOGGER.warn("QueueWaits: "+queueWaits+"; QueueWaitTime: "+queueWaitTime+" (nanoseconds),
you may need to increase queue capacity, see property 'backend_queue_capacity'");        
   
+        if (listenerClientData.queueWaits.get() > 0) {
+            LOGGER.warn("QueueWaits: "+listenerClientData.queueWaits+"; QueueWaitTime: "+listenerClientData.queueWaitTime+
+                    " (nanoseconds), you may need to increase queue capacity, see property
'backend_queue_capacity'");            
         }
-        
         try {
+            listenerClientData.latch.await();
             BackendListenerContext context = new BackendListenerContext(getArguments());
-            backendListenerClient.teardownTest(context);
+            listenerClientData.client.teardownTest(context);
         } catch (Exception e) {
             throw new java.lang.IllegalStateException("Failed calling teardownTest", e);
         }
@@ -335,8 +381,8 @@ public class BackendListener extends Abs
     /** Implements TestStateListener.testEnded(String) 
      **/
     @Override
-    public void testEnded(String host) {
-        testEnded();
+    public void testEnded() {
+        testEnded("local"); //$NON-NLS-1$
     }
 
     /**

Modified: jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/SamplerMetric.java
URL: http://svn.apache.org/viewvc/jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/SamplerMetric.java?rev=1645532&r1=1645531&r2=1645532&view=diff
==============================================================================
--- jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/SamplerMetric.java (original)
+++ jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/SamplerMetric.java Sun Dec 14 21:57:17 2014
@@ -45,9 +45,9 @@ public class SamplerMetric {
      */
     public synchronized void add(SampleResult result) {
         if(result.isSuccessful()) {
-            successes++;
+            successes+=result.getSampleCount()-result.getErrorCount();
         } else {
-            failures++;
+            failures+=result.getErrorCount();
         }
         long time = result.getTime();
         if(result.isSuccessful()) {

Modified: jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/graphite/GraphiteBackendListenerClient.java
URL: http://svn.apache.org/viewvc/jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/graphite/GraphiteBackendListenerClient.java?rev=1645532&r1=1645531&r2=1645532&view=diff
==============================================================================
--- jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/graphite/GraphiteBackendListenerClient.java (original)
+++ jmeter/trunk/src/components/org/apache/jmeter/visualizers/backend/graphite/GraphiteBackendListenerClient.java Sun Dec 14 21:57:17 2014
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
@@ -64,6 +65,7 @@ public class GraphiteBackendListenerClie
     private static final int MAX_POOL_SIZE = 1;
     private static final String DEFAULT_PERCENTILES = "90;95;99";
     private static final String SEPARATOR = ";"; //$NON-NLS-1$
+    private static final Object LOCK = new Object();
 
     private String graphiteHost;
     private int graphitePort;
@@ -77,6 +79,7 @@ public class GraphiteBackendListenerClie
     private GraphiteMetricsSender pickleMetricsManager;
 
     private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> timerHandle;
     
     public GraphiteBackendListenerClient() {
         super();
@@ -84,42 +87,50 @@ public class GraphiteBackendListenerClie
 
     @Override
     public void run() {
+        sendMetrics();
+    }
+
+    /**
+     * Send metrics to Graphite
+     */
+    protected void sendMetrics() {
         // Need to convert millis to seconds for Graphite
-        long timestamp = TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        for (Map.Entry<String, SamplerMetric> entry : getMetricsPerSampler().entrySet())
{
-            SamplerMetric metric = entry.getValue();
-            if(entry.getKey().equals(CUMULATED_METRICS)) {
-                addMetrics(timestamp, CUMULATED_CONTEXT_NAME, metric);
-            } else {
-                addMetrics(timestamp, AbstractGraphiteMetricsSender.sanitizeString(entry.getKey()),
metric);                
+        long timestampInSeconds = TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        synchronized (LOCK) {
+            for (Map.Entry<String, SamplerMetric> entry : getMetricsPerSampler().entrySet())
{
+                SamplerMetric metric = entry.getValue();
+                if(entry.getKey().equals(CUMULATED_METRICS)) {
+                    addMetrics(timestampInSeconds, CUMULATED_CONTEXT_NAME, metric);
+                } else {
+                    addMetrics(timestampInSeconds, AbstractGraphiteMetricsSender.sanitizeString(entry.getKey()),
metric);                
+                }
+                // We are computing on interval basis so cleanup
+                metric.resetForTimeInterval();
             }
-            // We are computing on interval basis so cleanup
-            metric.resetForTimeInterval();
-        }
-        
-        pickleMetricsManager.addMetric(timestamp, CUMULATED_CONTEXT_NAME, METRIC_MIN_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMaxActiveThreads()));
-        pickleMetricsManager.addMetric(timestamp, CUMULATED_CONTEXT_NAME, METRIC_MAX_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMinActiveThreads()));
-        pickleMetricsManager.addMetric(timestamp, CUMULATED_CONTEXT_NAME, METRIC_MEAN_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMeanActiveThreads()));
-        pickleMetricsManager.addMetric(timestamp, CUMULATED_CONTEXT_NAME, METRIC_STARTED_THREADS,
Integer.toString(getUserMetrics().getStartedThreads()));
-        pickleMetricsManager.addMetric(timestamp, CUMULATED_CONTEXT_NAME, METRIC_STOPPED_THREADS,
Integer.toString(getUserMetrics().getFinishedThreads()));
+        }        
+        pickleMetricsManager.addMetric(timestampInSeconds, CUMULATED_CONTEXT_NAME, METRIC_MIN_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMaxActiveThreads()));
+        pickleMetricsManager.addMetric(timestampInSeconds, CUMULATED_CONTEXT_NAME, METRIC_MAX_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMinActiveThreads()));
+        pickleMetricsManager.addMetric(timestampInSeconds, CUMULATED_CONTEXT_NAME, METRIC_MEAN_ACTIVE_THREADS,
Integer.toString(getUserMetrics().getMeanActiveThreads()));
+        pickleMetricsManager.addMetric(timestampInSeconds, CUMULATED_CONTEXT_NAME, METRIC_STARTED_THREADS,
Integer.toString(getUserMetrics().getStartedThreads()));
+        pickleMetricsManager.addMetric(timestampInSeconds, CUMULATED_CONTEXT_NAME, METRIC_STOPPED_THREADS,
Integer.toString(getUserMetrics().getFinishedThreads()));
 
         pickleMetricsManager.writeAndSendMetrics();
     }
 
 
     /**
-     * @param timestamp
-     * @param contextName
-     * @param metric
+     * @param timestampInSeconds long
+     * @param contextName String
+     * @param metric {@link SamplerMetric}
      */
-    private void addMetrics(long timestamp, String contextName, SamplerMetric metric) {
-        pickleMetricsManager.addMetric(timestamp, contextName, METRIC_FAILED_REQUESTS, Integer.toString(metric.getFailures()));
-        pickleMetricsManager.addMetric(timestamp, contextName, METRIC_SUCCESSFUL_REQUESTS,
Integer.toString(metric.getSuccesses()));
-        pickleMetricsManager.addMetric(timestamp, contextName, METRIC_TOTAL_REQUESTS, Integer.toString(metric.getTotal()));
-        pickleMetricsManager.addMetric(timestamp, contextName, METRIC_MIN_RESPONSE_TIME,
Double.toString(metric.getMinTime()));
-        pickleMetricsManager.addMetric(timestamp, contextName, METRIC_MAX_RESPONSE_TIME,
Double.toString(metric.getMaxTime()));
+    private void addMetrics(long timestampInSeconds, String contextName, SamplerMetric metric)
{
+        pickleMetricsManager.addMetric(timestampInSeconds, contextName, METRIC_FAILED_REQUESTS,
Integer.toString(metric.getFailures()));
+        pickleMetricsManager.addMetric(timestampInSeconds, contextName, METRIC_SUCCESSFUL_REQUESTS,
Integer.toString(metric.getSuccesses()));
+        pickleMetricsManager.addMetric(timestampInSeconds, contextName, METRIC_TOTAL_REQUESTS,
Integer.toString(metric.getTotal()));
+        pickleMetricsManager.addMetric(timestampInSeconds, contextName, METRIC_MIN_RESPONSE_TIME,
Double.toString(metric.getMinTime()));
+        pickleMetricsManager.addMetric(timestampInSeconds, contextName, METRIC_MAX_RESPONSE_TIME,
Double.toString(metric.getMaxTime()));
         for (Map.Entry<String, Float> entry : percentiles.entrySet()) {
-            pickleMetricsManager.addMetric(timestamp, contextName, 
+            pickleMetricsManager.addMetric(timestampInSeconds, contextName, 
                     entry.getKey(), 
                     Double.toString(metric.getPercentile(entry.getValue().floatValue())));
           
         }
@@ -142,14 +153,16 @@ public class GraphiteBackendListenerClie
     @Override
     public void handleSampleResults(List<SampleResult> sampleResults,
             BackendListenerContext context) {
-        for (SampleResult sampleResult : sampleResults) {
-            getUserMetrics().add(sampleResult);
-            if(!summaryOnly && samplersToFilter.contains(sampleResult.getSampleLabel()))
{
-                SamplerMetric samplerMetric = getSamplerMetric(sampleResult.getSampleLabel());
-                samplerMetric.add(sampleResult);
+        synchronized (LOCK) {
+            for (SampleResult sampleResult : sampleResults) {
+                getUserMetrics().add(sampleResult);
+                if(!summaryOnly && samplersToFilter.contains(sampleResult.getSampleLabel()))
{
+                    SamplerMetric samplerMetric = getSamplerMetric(sampleResult.getSampleLabel());
+                    samplerMetric.add(sampleResult);
+                }
+                SamplerMetric cumulatedMetrics = getSamplerMetric(CUMULATED_METRICS);
+                cumulatedMetrics.add(sampleResult);                    
             }
-            SamplerMetric cumulatedMetrics = getSamplerMetric(CUMULATED_METRICS);
-            cumulatedMetrics.add(sampleResult);                    
         }
     }
 
@@ -188,17 +201,23 @@ public class GraphiteBackendListenerClie
         }
         scheduler = Executors.newScheduledThreadPool(MAX_POOL_SIZE);
         // Don't change this as metrics are per second
-        scheduler.scheduleAtFixedRate(this, ONE_SECOND, ONE_SECOND, TimeUnit.SECONDS);
+        this.timerHandle = scheduler.scheduleAtFixedRate(this, ONE_SECOND, ONE_SECOND, TimeUnit.SECONDS);
     }
 
     @Override
     public void teardownTest(BackendListenerContext context) throws Exception {
+        boolean cancelState = timerHandle.cancel(false);
+        if(LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Canceled state:"+cancelState);
+        }
         scheduler.shutdown();
         try {
             scheduler.awaitTermination(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             LOGGER.error("Error waiting for end of scheduler");
         }
+        // Send last set of data before ending
+        sendMetrics();
         
         samplersToFilter.clear();
         pickleMetricsManager.destroy();

Modified: jmeter/trunk/xdocs/changes.xml
URL: http://svn.apache.org/viewvc/jmeter/trunk/xdocs/changes.xml?rev=1645532&r1=1645531&r2=1645532&view=diff
==============================================================================
--- jmeter/trunk/xdocs/changes.xml (original)
+++ jmeter/trunk/xdocs/changes.xml Sun Dec 14 21:57:17 2014
@@ -153,7 +153,7 @@ See  <bugzilla>56357</bugzilla> for deta
 <h3>Listeners</h3>
 <ul>
 <li><bug>57262</bug>Aggregate Report, Aggregate Graph and Summary Report
export : headers use keys instead of labels</li>
-<li><bug>57321</bug>BackendListener reports wrong number of Active Users
in master-slave configuration (nightly build r1642603)</li>
+<li><bug>57321</bug>BackendListener reports partial results in master-slave
configuration (nightly build r1642603)</li>
 </ul>
 
 <h3>Timers, Assertions, Config, Pre- &amp; Post-Processors</h3>



Mime
View raw message