View Javadoc

1   /*
2    * Copyright (c) 2005, London e-Science Centre, Imperial College London, UK
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
6    *
7    * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
8    * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
9    * - Neither the name of the London e-Science Centre nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
10   *
11   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
12   */
13  package org.icenigrid.gridsam.core.plugin.connector.globus;
14  
15  import java.net.MalformedURLException;
16  
17  import org.apache.commons.logging.Log;
18  import org.apache.commons.logging.LogFactory;
19  import org.globus.gram.Gram;
20  import org.globus.gram.GramException;
21  import org.globus.gram.GramJob;
22  import org.icenigrid.gridsam.core.JobManagerException;
23  import org.icenigrid.gridsam.core.JobState;
24  import org.icenigrid.gridsam.core.SubmissionException;
25  import org.icenigrid.gridsam.core.plugin.DRMConnector;
26  import org.icenigrid.gridsam.core.plugin.JobContext;
27  import org.ietf.jgss.GSSCredential;
28  import org.ietf.jgss.GSSException;
29  
30  /**
31   * A DRMConnector for retrieving the job status from the globus gatekeeper
32   */
33  public class GRAMStatusStage implements DRMConnector {
34      
35      /**
36       * logger
37       */
38      private static Log sLog = LogFactory.getLog(GRAMStatusStage.class);
39      
40      /**
41       * number of retry when the globus gatekeeper is not responding (error code
42       * 79)
43       */
44      public static final int DEFAULT_RETRY = 5;
45      
46      /**
47       * retry count
48       */
49      private int oRetryCount = DEFAULT_RETRY;
50      
51      /**
52       * set the retry count
53       *
54       * @param pRetryCount
55       *            retry count
56       */
57      public void setMaximumRetryCount(int pRetryCount) {
58          if (pRetryCount < 1) {
59              oRetryCount = 1;
60          }
61          oRetryCount = pRetryCount;
62      }
63      
64      /**
65       * get the retry count
66       *
67       * @return int the retry count
68       */
69      public int getMaximumRetryCount() {
70          return oRetryCount;
71      }
72      
73      /**
74       * execute a job through this DRMConnector
75       *
76       * @param pContext
77       *            the context in which this DRMConnector executes
78       */
79      public void execute(JobContext pContext) {
80          try {
81              String xGlobusID = (String) pContext.getJobInstance()
82              .getProperties().get("urn:gridsam:globus:id");
83              if (xGlobusID == null) {
84                  // probably a pure staging job
85                  return;
86              }
87              
88              // load credential associated with the job
89              GSSCredential xCredential = null;
90              
91              try {
92                  xCredential = GlobusCredentialSupport.getCredential(pContext);
93              } catch (JobManagerException xEx) {
94                  throw new SubmissionException(
95                      "unable to load credential to query job: "
96                      + xEx.getMessage(), xEx);
97              }
98              
99              // recreate the GramJob instance
100             GramJob xJob = new GramJob((String) pContext.getJobInstance()
101             .getProperties().get("urn:gridsam:globus:rsl"));
102             xJob.setCredentials(xCredential);
103             try {
104                 xJob.setID(xGlobusID);
105             } catch (MalformedURLException xEx) {
106                 throw new SubmissionException("unable to rebind to job: "
107                     + xEx.getMessage(), xEx);
108             }
109             
110             // get job status
111             boolean xDone=false;
112             for (int xRetry = 0; (xRetry < getMaximumRetryCount() && !xDone); xRetry++) {
113                 try {
114                     Gram.jobStatus(xJob);
115                     
116                     switch (xJob.getStatus()) {
117                         case GramJob.STATUS_ACTIVE:
118                             pContext.getLog().info(
119                                 "globus job is active - " + xJob.getIDAsString());
120                             // update only if job status is Not active already, otherwise this update throws an exception
121                             if( !pContext.getJobInstance().wasInStage(JobState.ACTIVE) &&
122                                 !pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)) {
123                                 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
124                                     "globus job is active");
125                             }
126                             
127                             break;
128                             
129                         case GramJob.STATUS_PENDING:
130                             pContext.getLog().info(
131                                 "globus job is pending - " + xJob.getIDAsString());
132                             // update only if job status is Not pending already, otherwise this update throws an exception
133                             if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)){
134                                 pContext.getJobInstance().advanceJobState(JobState.ACTIVE_QUEUED,
135                                     "globus job is pending");
136                             }
137                             break;
138                         case GramJob.STATUS_DONE:
139                             pContext.getLog().info(
140                                 "globus job is done - " + xJob.getIDAsString());
141                             if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE) &&
142                                !pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)){
143                                 // we might have missed the active event because of the polling, fake it
144                                 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
145                                     "globus job is active");
146                             }
147                             pContext.getJobInstance().advanceJobState(
148                                 JobState.EXECUTED, "globus job completed");
149                             xDone=true;
150                             break;
151                         case GramJob.STATUS_FAILED:
152                             pContext.getLog().info(
153                                 "globus job failed - " + xJob.getIDAsString());
154                             if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE)){
155                                 // we might have missed the active event because of the polling, fake it
156                                 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
157                                     "globus job is active");
158                             }
159                             throw new SubmissionException("globus job failed - "
160                                 + xJob.getStatusAsString());
161                             
162                         default:
163                             pContext.getLog().debug(
164                                 "globus job status (" + xJob.getStatusAsString()
165                                 + ") - " + xGlobusID);
166                     }
167 
168                 } catch (GramException xEx) {
169                     if (xEx.getErrorCode() == GramException.ERROR_CONTACTING_JOB_MANAGER) {
170                         // due to a lack of status message from CoG after the
171                         // job is finished,
172                         // we just have to assume the job is done
173                         pContext.getLog().info(
174                             "globus job is considered done - "
175                             + xJob.getIDAsString());
176                         if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE)){
177                             // we might have missed the active event because of the polling, fake it
178                             pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
179                                 "globus job is active");
180                         }
181                         pContext.getJobInstance().advanceJobState(
182                             JobState.EXECUTED, "globus job completed");
183                         break;
184                     } else {
185                         pContext.getLog()
186                         .warn(
187                             "Retrying after globus error - "
188                             + xEx.getMessage());
189                         if (xRetry == getMaximumRetryCount() - 1) {
190                             // retry count reached
191                             throw new SubmissionException(
192                                 "unable to retrieve job status from globus gatekeeper: "
193                                 + xEx.getMessage(), xEx);
194                         }
195                     }
196                 } catch (GSSException xEx) {
197                     throw new SubmissionException(
198                         "unable to retrieve job status using credential: "
199                         + xEx.getMessage(), xEx);
200                 }
201             }
202         } catch (SubmissionException xEx) {
203             pContext.getJobInstance().advanceJobState(JobState.FAILED,
204                 xEx.getMessage());
205         }
206     }
207     
208 }