portals-jetspeed-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From woon...@apache.org
Subject svn commit: r591869 - in /portals/jetspeed-2/trunk: components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/ jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/
Date Mon, 05 Nov 2007 02:25:19 GMT
Author: woonsan
Date: Sun Nov  4 18:25:18 2007
New Revision: 591869

URL: http://svn.apache.org/viewvc?rev=591869&view=rev
Log:
[JS2-785] Parallel Rendering on Websphere 6.1
Fixed the problem: when using the commonj work manager to render portlets, the timeout option
did not work.

Modified:
    portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
    portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
    portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
    portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
    portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
    portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java

Modified: portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
(original)
+++ portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/AsyncPageAggregatorImpl.java
Sun Nov  4 18:25:18 2007
@@ -216,27 +216,7 @@
         }
 
         // synchronize on completion of all jobs
-        iter = parallelJobs.iterator();
-        try 
-        {
-            while (iter.hasNext()) 
-            {
-                RenderingJob job = (RenderingJob) iter.next();
-                PortletContent portletContent = job.getPortletContent();
-                
-                synchronized (portletContent) 
-                {
-                    if (!portletContent.isComplete()) 
-                    {
-                        portletContent.wait();
-                    }
-                }
-            }
-        }
-        catch (Exception e)
-        {
-            log.error("Exception during synchronizing all portlet rendering jobs.", e);
-        }
+        renderer.waitForRenderingJobs(parallelJobs);
         
         // Now, restore it to non parallel mode for rendering layout portlets.
         CurrentWorkerContext.setParallelRenderingMode(false);

Modified: portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
(original)
+++ portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/CommonjWorkerMonitorImpl.java
Sun Nov  4 18:25:18 2007
@@ -20,16 +20,22 @@
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.util.List;
-import java.util.LinkedList;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.jetspeed.aggregator.RenderingJob;
+import org.apache.jetspeed.aggregator.Worker;
 import org.apache.jetspeed.aggregator.WorkerMonitor;
 import org.apache.jetspeed.aggregator.PortletContent;
-import org.apache.jetspeed.aggregator.CurrentWorkerContext;
+
+import org.apache.pluto.om.window.PortletWindow;
+import org.apache.pluto.om.common.ObjectID;
 
 import commonj.work.WorkManager;
 import commonj.work.Work;
@@ -48,16 +54,36 @@
 {
 
     public static final String ACCESS_CONTROL_CONTEXT_WORKER_ATTR = AccessControlContext.class.getName();
+    public static final String COMMONJ_WORK_ITEM_ATTR = WorkItem.class.getName();
+    public static final String WORKER_THREAD_ATTR = Worker.class.getName();
     
     /** CommonJ Work Manamger provided by JavaEE container */
     protected WorkManager workManager;
-    
-    /** Work items to be monitored for timeout checking */
-    protected List workItemsMonitored = Collections.synchronizedList(new LinkedList());
 
+    /** If true, invoke interrupt() on the worker thread when the job is timeout. */
+    protected boolean interruptOnTimeout = true;
+    
+    /** Enable rendering job works monitor thread for timeout checking */
+    protected boolean jobWorksMonitorEnabled = true;
+    
+    /** Rendering job works to be monitored for timeout checking */
+    protected Map jobWorksMonitored = Collections.synchronizedMap(new HashMap());
+    
     public CommonjWorkerMonitorImpl(WorkManager workManager)
     {
+        this(workManager, true);
+    }
+    
+    public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled)
+    {
+        this(workManager, jobWorksMonitorEnabled, true);
+    }
+    
+    public CommonjWorkerMonitorImpl(WorkManager workManager, boolean jobWorksMonitorEnabled,
boolean interruptOnTimeout)
+    {
         this.workManager = workManager;
+        this.jobWorksMonitorEnabled = jobWorksMonitorEnabled;
+        this.interruptOnTimeout = interruptOnTimeout;
     }
     
     /** Commons logging */
@@ -68,18 +94,21 @@
     
     public void start()
     {
-        jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
-        jobMonitor.start();
+        if (this.jobWorksMonitorEnabled)
+        {
+            jobMonitor = new CommonjWorkerRenderingJobTimeoutMonitor(1000);
+            jobMonitor.start();
+        }
     }
 
     public void stop()
-    {    
-    	if (jobMonitor != null)
+    {
+        if (jobMonitor != null)
         {
-    		jobMonitor.endThread();
+            jobMonitor.endThread();
         }
-        
-    	jobMonitor = null;
+
+        jobMonitor = null;
     }
     
     /**
@@ -95,8 +124,14 @@
         
         try
         {
-            WorkItem workItem = this.workManager.schedule(new RenderingJobCommonjWork(job),
this);
-            this.workItemsMonitored.add(workItem);
+            RenderingJobCommonjWork jobWork = new RenderingJobCommonjWork(job);
+            WorkItem workItem = this.workManager.schedule(jobWork, this);
+            job.setWorkerAttribute(COMMONJ_WORK_ITEM_ATTR, workItem);
+            
+            if (this.jobWorksMonitorEnabled)
+            {
+                this.jobWorksMonitored.put(workItem, jobWork);
+            }
         }
         catch (Throwable t)
         {
@@ -110,6 +145,61 @@
     }
     
     /**
+     * Wait for all rendering jobs in the collection to finish successfully or otherwise.

+     * @param renderingJobs the Collection of rendering job objects to wait for.
+     */
+    public void waitForRenderingJobs(List renderingJobs)
+    {
+        if (this.jobWorksMonitorEnabled)
+        {
+            try 
+            {
+                for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
+                {
+                    RenderingJob job = (RenderingJob) iter.next();
+                    PortletContent portletContent = job.getPortletContent();
+                    
+                    synchronized (portletContent) 
+                    {
+                        if (!portletContent.isComplete()) 
+                        {
+                            portletContent.wait();
+                        }
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                log.error("Exception during synchronizing all portlet rendering jobs.", e);
+            }
+        }
+        else
+        {
+            // We cannot use WorkingManager#waitForAll(workitems, timeout_ms) for timeout.
+            // The second argument could be either WorkManager.IMMEDIATE or WorkManager.INDEFINITE.
+            
+            try
+            {
+                if (!renderingJobs.isEmpty())
+                {
+                    Object lock = new Object();
+                    MonitoringJobCommonjWork monitoringWork = new MonitoringJobCommonjWork(lock,
renderingJobs);
+                    
+                    synchronized (lock)
+                    {
+                        WorkItem monitorWorkItem = this.workManager.schedule(monitoringWork,
this);
+                        lock.wait();
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                log.error("Exception during synchronizing all portlet rendering jobs.", e);
+            }
+        }
+    }
+    
+    /**
      * Returns a snapshot of the available jobs
      * @return available jobs
      */
@@ -123,6 +213,8 @@
         return 0;
     }
     
+    // commonj.work.WorkListener implementations
+    
     public void workAccepted(WorkEvent we)
     {
         WorkItem workItem = we.getWorkItem();
@@ -133,7 +225,11 @@
     {
         WorkItem workItem = we.getWorkItem();
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workRejected: " + workItem);
-        removeMonitoredWorkItem(workItem);
+        
+        if (this.jobWorksMonitorEnabled)
+        {
+            removeMonitoredJobWork(workItem);
+        }
     }
 
     public void workStarted(WorkEvent we)
@@ -146,12 +242,16 @@
     {
         WorkItem workItem = we.getWorkItem();
         if (log.isDebugEnabled()) log.debug("[CommonjWorkMonitorImpl] workCompleted: " +
workItem);
-        removeMonitoredWorkItem(workItem);
+        
+        if (this.jobWorksMonitorEnabled)
+        {
+            removeMonitoredJobWork(workItem);
+        }
     }
     
-    protected boolean removeMonitoredWorkItem(WorkItem workItem)
+    protected Object removeMonitoredJobWork(WorkItem workItem)
     {
-        return this.workItemsMonitored.remove(workItem);
+        return this.jobWorksMonitored.remove(workItem);
     }
     
     class RenderingJobCommonjWork implements Work
@@ -159,10 +259,6 @@
 
         protected RenderingJob job;
 
-        public RenderingJobCommonjWork()
-        {
-        }
-        
         public RenderingJobCommonjWork(RenderingJob job)
         {
             this.job = job;
@@ -175,6 +271,11 @@
         
         public void run()
         {
+            if (jobWorksMonitorEnabled || interruptOnTimeout)
+            {
+                this.job.setWorkerAttribute(WORKER_THREAD_ATTR, Thread.currentThread());
+            }
+            
             this.job.run();
         }
         
@@ -182,16 +283,106 @@
         {
         }
         
-        public void setRenderingJob(RenderingJob job)
-        {
-            this.job = job;
-        }
-        
         public RenderingJob getRenderingJob()
         {
             return this.job;
         }
+    }
+
+    class MonitoringJobCommonjWork implements Work
+    {
+        
+        protected Object lock;
+        protected List renderingJobs;
 
+        public MonitoringJobCommonjWork(Object lock, List jobs)
+        {
+            this.lock = lock;
+            this.renderingJobs = new ArrayList(jobs);
+        }
+        
+        public boolean isDaemon()
+        {
+            return false;
+        }
+        
+        public void run()
+        {
+            try
+            {
+                while (!this.renderingJobs.isEmpty())
+                {
+                    for (Iterator it = this.renderingJobs.iterator(); it.hasNext(); )
+                    {
+                        RenderingJob job = (RenderingJob) it.next();
+                        WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
+                        int status = WorkEvent.WORK_ACCEPTED;
+                        
+                        if (workItem != null)
+                        {
+                            status = workItem.getStatus();
+                        }
+                        
+                        boolean isTimeout = job.isTimeout();
+                        
+                        if (isTimeout)
+                        {
+                            PortletContent content = job.getPortletContent();
+                            
+                            if (interruptOnTimeout)
+                            {
+                                Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
+                                
+                                if (worker != null)
+                                {
+                                    synchronized (content)
+                                    {
+                                        if (!content.isComplete()) {
+                                            worker.interrupt();
+                                            content.wait();
+                                        }
+                                    }
+                                }
+                            }
+                            else
+                            {
+                                synchronized (content)
+                                {
+                                    content.completeWithError();
+                                }
+                            }
+                        }
+                        
+                        if (status == WorkEvent.WORK_COMPLETED || status == WorkEvent.WORK_REJECTED
|| isTimeout)
+                        {
+                            it.remove();
+                        }                    
+                    }
+                    
+                    if (!this.renderingJobs.isEmpty())
+                    {
+                        synchronized (this)
+                        {
+                            wait(100);
+                        }
+                    }
+                }
+                
+                synchronized (this.lock)
+                {
+                    this.lock.notify();
+                }
+            }
+            catch (Exception e)
+            {
+                log.error("Exceptiong during job timeout monitoring.", e);
+            }
+        }
+        
+        public void release()
+        {
+        }
+        
     }
 
     class CommonjWorkerRenderingJobTimeoutMonitor extends Thread {
@@ -224,20 +415,30 @@
             while (shouldRun) {
                 try 
                 {
-                    synchronized (workItemsMonitored) 
+                    List timeoutJobWorks = new ArrayList();
+                    Collection jobWorks = jobWorksMonitored.values();
+                    
+                    for (Iterator it = jobWorks.iterator(); it.hasNext(); )
                     {
-                        for (Iterator it = workItemsMonitored.iterator(); it.hasNext(); )
+                        RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
+                        RenderingJob job = jobWork.getRenderingJob();
+                        
+                        if (job.isTimeout())
                         {
-                            WorkItem workItem = (WorkItem) it.next();
-                            int status = workItem.getStatus();
-                            
-                            if (status == WorkEvent.WORK_COMPLETED || status == WorkEvent.WORK_REJECTED)
-                            {
-                                it.remove();
-                            }
-                            else
-                            {
-                            }
+                            timeoutJobWorks.add(jobWork);
+                        }
+                    }
+                    
+                    // Now, we can kill the timeout worker(s).
+                    for (Iterator it = timeoutJobWorks.iterator(); it.hasNext(); )
+                    {
+                        RenderingJobCommonjWork jobWork = (RenderingJobCommonjWork) it.next();
+                        RenderingJob job = jobWork.getRenderingJob();
+
+                        // If the job is just completed, then do not kill the worker.
+                        if (job.isTimeout())
+                        {
+                            killJobWork(jobWork);
                         }
                     }
                 } 
@@ -259,5 +460,42 @@
                 }
             }
         }
+        
+        public void killJobWork(RenderingJobCommonjWork jobWork) {
+            RenderingJob job = jobWork.getRenderingJob();
+            
+            try {
+                if (log.isWarnEnabled()) {
+                    PortletWindow window = job.getWindow();
+                    ObjectID windowId = (null != window ? window.getId() : null);
+                    log.warn("Portlet Rendering job to be interrupted by timeout (" + job.getTimeout()
+ "ms): " + windowId);
+                }
+
+                PortletContent content = job.getPortletContent();
+                Thread worker = (Thread) job.getWorkerAttribute(WORKER_THREAD_ATTR);
+                
+                if (worker != null)
+                {
+                    synchronized (content)
+                    {
+                        if (!content.isComplete()) {
+                            worker.interrupt();
+                            content.wait();
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Exceptiong during job killing.", e);
+            } finally {
+                WorkItem workItem = (WorkItem) job.getWorkerAttribute(COMMONJ_WORK_ITEM_ATTR);
+                
+                if (workItem != null)
+                {
+                    removeMonitoredJobWork(workItem);
+                }
+            }
+        }
+        
     }
+    
 }

Modified: portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
(original)
+++ portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/PortletRendererImpl.java
Sun Nov  4 18:25:18 2007
@@ -20,6 +20,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.List;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -380,6 +381,15 @@
             log.error("render() failed: " + e1.toString(), e1);
             fragment.overrideRenderedContent(e1.getLocalizedMessage());            
         }
+    }
+    
+    /**
+     * Wait for all rendering jobs in the collection to finish successfully or otherwise.

+     * @param renderingJobs the Collection of rendering job objects to wait for.
+     */
+    public void waitForRenderingJobs(List renderingJobs)
+    {
+        this.workMonitor.waitForRenderingJobs(renderingJobs);
     }
 
     /**

Modified: portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
(original)
+++ portals/jetspeed-2/trunk/components/jetspeed-portal/src/main/java/org/apache/jetspeed/aggregator/impl/WorkerMonitorImpl.java
Sun Nov  4 18:25:18 2007
@@ -204,6 +204,34 @@
             }
         }
     }
+    
+    /**
+     * Wait for all rendering jobs in the collection to finish successfully or otherwise.

+     * @param renderingJobs the Collection of rendering job objects to wait for.
+     */
+    public void waitForRenderingJobs(List renderingJobs)
+    {
+        try 
+        {
+            for (Iterator iter = renderingJobs.iterator(); iter.hasNext(); )
+            {
+                RenderingJob job = (RenderingJob) iter.next();
+                PortletContent portletContent = job.getPortletContent();
+                
+                synchronized (portletContent) 
+                {
+                    if (!portletContent.isComplete()) 
+                    {
+                        portletContent.wait();
+                    }
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            log.error("Exception during synchronizing all portlet rendering jobs.", e);
+        }
+    }
 
     /**
      * Put back the worker in the idle queue unless there are pending jobs and

Modified: portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
(original)
+++ portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/PortletRenderer.java
Sun Nov  4 18:25:18 2007
@@ -16,6 +16,8 @@
  */
 package org.apache.jetspeed.aggregator;
 
+import java.util.List;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -87,6 +89,12 @@
      */
     public void processRenderingJob(RenderingJob job);
         
+    /**
+     * Wait for all rendering jobs in the collection to finish successfully or otherwise.

+     * @param renderingJobs the Collection of rendering job objects to wait for.
+     */
+    public void waitForRenderingJobs(List renderingJobs);
+    
     /**
      * Retrieve the ContentDispatcher for the specified request
      */

Modified: portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
URL: http://svn.apache.org/viewvc/portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java?rev=591869&r1=591868&r2=591869&view=diff
==============================================================================
--- portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
(original)
+++ portals/jetspeed-2/trunk/jetspeed-api/src/main/java/org/apache/jetspeed/aggregator/WorkerMonitor.java
Sun Nov  4 18:25:18 2007
@@ -1,9 +1,9 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * 
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -16,6 +16,8 @@
  */
 package org.apache.jetspeed.aggregator;
 
+import java.util.List;
+
 import org.apache.jetspeed.util.Queue;
 
 /**
@@ -67,4 +69,9 @@
      */
     void process(RenderingJob job);
     
+    /**
+     * Wait for all rendering jobs in the collection to finish successfully or otherwise.

+     * @param renderingJobs the Collection of rendering job objects to wait for.
+     */
+    public void waitForRenderingJobs(List renderingJobs);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: jetspeed-dev-unsubscribe@portals.apache.org
For additional commands, e-mail: jetspeed-dev-help@portals.apache.org


Mime
View raw message