1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import java.io.IOException;
22 import java.lang.reflect.UndeclaredThrowableException;
23 import java.security.PrivilegedExceptionAction;
24
25 import com.google.protobuf.ServiceException;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.client.Connection;
34 import org.apache.hadoop.hbase.client.ConnectionFactory;
35 import org.apache.hadoop.hbase.client.Table;
36 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
39 import org.apache.hadoop.hbase.security.User;
40 import org.apache.hadoop.hbase.security.UserProvider;
41 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
42 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43 import org.apache.hadoop.io.Text;
44 import org.apache.hadoop.mapred.JobConf;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.security.UserGroupInformation;
47 import org.apache.hadoop.security.token.Token;
48 import org.apache.zookeeper.KeeperException;
49
50
51
52
53 @InterfaceAudience.Public
54 @InterfaceStability.Evolving
55 public class TokenUtil {
56
/** Logger used by the token helpers in this class for their debug messages. */
private static final Log LOG = LogFactory.getLog(TokenUtil.class);
58
59
60
61
62
63
64
65 @Deprecated
66 public static Token<AuthenticationTokenIdentifier> obtainToken(
67 Configuration conf) throws IOException {
68 try (Connection connection = ConnectionFactory.createConnection(conf)) {
69 return obtainToken(connection);
70 }
71 }
72
73
74
75
76
77
78 public static Token<AuthenticationTokenIdentifier> obtainToken(
79 Connection conn) throws IOException {
80 Table meta = null;
81 try {
82 meta = conn.getTable(TableName.META_TABLE_NAME);
83 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
84 AuthenticationProtos.AuthenticationService.BlockingInterface service =
85 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
86 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
87 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
88
89 return ProtobufUtil.toToken(response.getToken());
90 } catch (ServiceException se) {
91 ProtobufUtil.toIOException(se);
92 } finally {
93 if (meta != null) {
94 meta.close();
95 }
96 }
97
98 return null;
99 }
100
101
102
103
104
105
106 public static Token<AuthenticationTokenIdentifier> obtainToken(
107 final Connection conn, User user) throws IOException, InterruptedException {
108 return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
109 @Override
110 public Token<AuthenticationTokenIdentifier> run() throws Exception {
111 return obtainToken(conn);
112 }
113 });
114 }
115
116
117 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
118 throws IOException {
119 return token.getService() != null
120 ? token.getService() : new Text("default");
121 }
122
123
124
125
126
127
128
129
130
131
132 @Deprecated
133 public static void obtainAndCacheToken(final Configuration conf,
134 UserGroupInformation user)
135 throws IOException, InterruptedException {
136 Connection conn = ConnectionFactory.createConnection(conf);
137 try {
138 UserProvider userProvider = UserProvider.instantiate(conf);
139 obtainAndCacheToken(conn, userProvider.create(user));
140 } finally {
141 conn.close();
142 }
143 }
144
145
146
147
148
149
150
151
152
153 public static void obtainAndCacheToken(final Connection conn,
154 User user)
155 throws IOException, InterruptedException {
156 try {
157 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
158
159 if (token == null) {
160 throw new IOException("No token returned for user " + user.getName());
161 }
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
164 user.getName());
165 }
166 user.addToken(token);
167 } catch (IOException ioe) {
168 throw ioe;
169 } catch (InterruptedException ie) {
170 throw ie;
171 } catch (RuntimeException re) {
172 throw re;
173 } catch (Exception e) {
174 throw new UndeclaredThrowableException(e,
175 "Unexpected exception obtaining token for user " + user.getName());
176 }
177 }
178
179
180
181
182
183
184
185
186
187
188
189 @Deprecated
190 public static void obtainTokenForJob(final Configuration conf,
191 UserGroupInformation user, Job job)
192 throws IOException, InterruptedException {
193 Connection conn = ConnectionFactory.createConnection(conf);
194 try {
195 UserProvider userProvider = UserProvider.instantiate(conf);
196 obtainTokenForJob(conn, userProvider.create(user), job);
197 } finally {
198 conn.close();
199 }
200 }
201
202
203
204
205
206
207
208
209
210
211 public static void obtainTokenForJob(final Connection conn,
212 User user, Job job)
213 throws IOException, InterruptedException {
214 try {
215 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
216
217 if (token == null) {
218 throw new IOException("No token returned for user " + user.getName());
219 }
220 Text clusterId = getClusterId(token);
221 if (LOG.isDebugEnabled()) {
222 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
223 user.getName() + " on cluster " + clusterId.toString());
224 }
225 job.getCredentials().addToken(clusterId, token);
226 } catch (IOException ioe) {
227 throw ioe;
228 } catch (InterruptedException ie) {
229 throw ie;
230 } catch (RuntimeException re) {
231 throw re;
232 } catch (Exception e) {
233 throw new UndeclaredThrowableException(e,
234 "Unexpected exception obtaining token for user " + user.getName());
235 }
236 }
237
238
239
240
241
242
243
244
245
246
247 @Deprecated
248 public static void obtainTokenForJob(final JobConf job,
249 UserGroupInformation user)
250 throws IOException, InterruptedException {
251 Connection conn = ConnectionFactory.createConnection(job);
252 try {
253 UserProvider userProvider = UserProvider.instantiate(job);
254 obtainTokenForJob(conn, job, userProvider.create(user));
255 } finally {
256 conn.close();
257 }
258 }
259
260
261
262
263
264
265
266
267
268
269 public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
270 throws IOException, InterruptedException {
271 try {
272 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
273
274 if (token == null) {
275 throw new IOException("No token returned for user " + user.getName());
276 }
277 Text clusterId = getClusterId(token);
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
280 user.getName() + " on cluster " + clusterId.toString());
281 }
282 job.getCredentials().addToken(clusterId, token);
283 } catch (IOException ioe) {
284 throw ioe;
285 } catch (InterruptedException ie) {
286 throw ie;
287 } catch (RuntimeException re) {
288 throw re;
289 } catch (Exception e) {
290 throw new UndeclaredThrowableException(e,
291 "Unexpected exception obtaining token for user "+user.getName());
292 }
293 }
294
295
296
297
298
299
300
301
302
303
304
305 public static void addTokenForJob(final Connection conn, final JobConf job, User user)
306 throws IOException, InterruptedException {
307
308 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
309 if (token == null) {
310 token = obtainToken(conn, user);
311 }
312 job.getCredentials().addToken(token.getService(), token);
313 }
314
315
316
317
318
319
320
321
322
323
324
325 public static void addTokenForJob(final Connection conn, User user, Job job)
326 throws IOException, InterruptedException {
327 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
328 if (token == null) {
329 token = obtainToken(conn, user);
330 }
331 job.getCredentials().addToken(token.getService(), token);
332 }
333
334
335
336
337
338
339
340
341
342
343
344 public static boolean addTokenIfMissing(Connection conn, User user)
345 throws IOException, InterruptedException {
346 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
347 if (token == null) {
348 token = obtainToken(conn, user);
349 user.getUGI().addToken(token.getService(), token);
350 return true;
351 }
352 return false;
353 }
354
355
356
357
358
359 private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
360 throws IOException, InterruptedException {
361 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
362 try {
363 String clusterId = ZKClusterId.readClusterIdZNode(zkw);
364 if (clusterId == null) {
365 throw new IOException("Failed to get cluster ID");
366 }
367 return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
368 } catch (KeeperException e) {
369 throw new IOException(e);
370 } finally {
371 zkw.close();
372 }
373 }
374 }