1
2
3
4
5
6
7
8
9
10
11
12
13 package org.icenigrid.gridsam.core.plugin.connector.globus;
14
15 import java.net.MalformedURLException;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19 import org.globus.gram.Gram;
20 import org.globus.gram.GramException;
21 import org.globus.gram.GramJob;
22 import org.icenigrid.gridsam.core.JobManagerException;
23 import org.icenigrid.gridsam.core.JobState;
24 import org.icenigrid.gridsam.core.SubmissionException;
25 import org.icenigrid.gridsam.core.plugin.DRMConnector;
26 import org.icenigrid.gridsam.core.plugin.JobContext;
27 import org.ietf.jgss.GSSCredential;
28 import org.ietf.jgss.GSSException;
29
30
31
32
33 public class GRAMStatusStage implements DRMConnector {
34
35
36
37
38 private static Log sLog = LogFactory.getLog(GRAMStatusStage.class);
39
40
41
42
43
44 public static final int DEFAULT_RETRY = 5;
45
46
47
48
49 private int oRetryCount = DEFAULT_RETRY;
50
51
52
53
54
55
56
57 public void setMaximumRetryCount(int pRetryCount) {
58 if (pRetryCount < 1) {
59 oRetryCount = 1;
60 }
61 oRetryCount = pRetryCount;
62 }
63
64
65
66
67
68
69 public int getMaximumRetryCount() {
70 return oRetryCount;
71 }
72
73
74
75
76
77
78
79 public void execute(JobContext pContext) {
80 try {
81 String xGlobusID = (String) pContext.getJobInstance()
82 .getProperties().get("urn:gridsam:globus:id");
83 if (xGlobusID == null) {
84
85 return;
86 }
87
88
89 GSSCredential xCredential = null;
90
91 try {
92 xCredential = GlobusCredentialSupport.getCredential(pContext);
93 } catch (JobManagerException xEx) {
94 throw new SubmissionException(
95 "unable to load credential to query job: "
96 + xEx.getMessage(), xEx);
97 }
98
99
100 GramJob xJob = new GramJob((String) pContext.getJobInstance()
101 .getProperties().get("urn:gridsam:globus:rsl"));
102 xJob.setCredentials(xCredential);
103 try {
104 xJob.setID(xGlobusID);
105 } catch (MalformedURLException xEx) {
106 throw new SubmissionException("unable to rebind to job: "
107 + xEx.getMessage(), xEx);
108 }
109
110
111 boolean xDone=false;
112 for (int xRetry = 0; (xRetry < getMaximumRetryCount() && !xDone); xRetry++) {
113 try {
114 Gram.jobStatus(xJob);
115
116 switch (xJob.getStatus()) {
117 case GramJob.STATUS_ACTIVE:
118 pContext.getLog().info(
119 "globus job is active - " + xJob.getIDAsString());
120
121 if( !pContext.getJobInstance().wasInStage(JobState.ACTIVE) &&
122 !pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)) {
123 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
124 "globus job is active");
125 }
126
127 break;
128
129 case GramJob.STATUS_PENDING:
130 pContext.getLog().info(
131 "globus job is pending - " + xJob.getIDAsString());
132
133 if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)){
134 pContext.getJobInstance().advanceJobState(JobState.ACTIVE_QUEUED,
135 "globus job is pending");
136 }
137 break;
138 case GramJob.STATUS_DONE:
139 pContext.getLog().info(
140 "globus job is done - " + xJob.getIDAsString());
141 if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE) &&
142 !pContext.getJobInstance().wasInStage(JobState.ACTIVE_QUEUED)){
143
144 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
145 "globus job is active");
146 }
147 pContext.getJobInstance().advanceJobState(
148 JobState.EXECUTED, "globus job completed");
149 xDone=true;
150 break;
151 case GramJob.STATUS_FAILED:
152 pContext.getLog().info(
153 "globus job failed - " + xJob.getIDAsString());
154 if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE)){
155
156 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
157 "globus job is active");
158 }
159 throw new SubmissionException("globus job failed - "
160 + xJob.getStatusAsString());
161
162 default:
163 pContext.getLog().debug(
164 "globus job status (" + xJob.getStatusAsString()
165 + ") - " + xGlobusID);
166 }
167
168 } catch (GramException xEx) {
169 if (xEx.getErrorCode() == GramException.ERROR_CONTACTING_JOB_MANAGER) {
170
171
172
173 pContext.getLog().info(
174 "globus job is considered done - "
175 + xJob.getIDAsString());
176 if(!pContext.getJobInstance().wasInStage(JobState.ACTIVE)){
177
178 pContext.getJobInstance().advanceJobState(JobState.ACTIVE,
179 "globus job is active");
180 }
181 pContext.getJobInstance().advanceJobState(
182 JobState.EXECUTED, "globus job completed");
183 break;
184 } else {
185 pContext.getLog()
186 .warn(
187 "Retrying after globus error - "
188 + xEx.getMessage());
189 if (xRetry == getMaximumRetryCount() - 1) {
190
191 throw new SubmissionException(
192 "unable to retrieve job status from globus gatekeeper: "
193 + xEx.getMessage(), xEx);
194 }
195 }
196 } catch (GSSException xEx) {
197 throw new SubmissionException(
198 "unable to retrieve job status using credential: "
199 + xEx.getMessage(), xEx);
200 }
201 }
202 } catch (SubmissionException xEx) {
203 pContext.getJobInstance().advanceJobState(JobState.FAILED,
204 xEx.getMessage());
205 }
206 }
207
208 }