Skip to content

Commit 6af47b9

Browse files
author
Marcelo Vanzin
committed
[SPARK-28150][CORE] Log in user before getting delegation tokens.
This ensures that tokens are always created with an empty UGI, which allows multiple contexts to be (sequentially) started from the same JVM. Tested with code attached to the bug, and also usual kerberos tests. Closes apache#24955 from vanzin/SPARK-28150. Authored-by: Marcelo Vanzin <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent cded421 commit 6af47b9

File tree

1 file changed

+20
-6
lines changed

1 file changed

+20
-6
lines changed

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

+20-6
Original file line numberDiff line numberDiff line change
@@ -136,13 +136,27 @@ private[spark] class HadoopDelegationTokenManager(
136136

137137
/**
138138
* Fetch new delegation tokens for configured services, storing them in the given credentials.
139-
* Tokens are fetched for the current logged in user.
140139
*
141140
* @param creds Credentials object where to store the delegation tokens.
142-
* @return The time by which the tokens must be renewed.
143141
*/
144-
def obtainDelegationTokens(creds: Credentials): Long = {
145-
delegationTokenProviders.values.flatMap { provider =>
142+
def obtainDelegationTokens(creds: Credentials): Unit = {
143+
val freshUGI = doLogin()
144+
freshUGI.doAs(new PrivilegedExceptionAction[Unit]() {
145+
override def run(): Unit = {
146+
val (newTokens, _) = obtainDelegationTokens()
147+
creds.addAll(newTokens)
148+
}
149+
})
150+
}
151+
152+
/**
153+
* Fetch new delegation tokens for configured services.
154+
*
155+
* @return 2-tuple (credentials with new tokens, time by which the tokens must be renewed)
156+
*/
157+
private def obtainDelegationTokens(): (Credentials, Long) = {
158+
val creds = new Credentials()
159+
val nextRenewal = delegationTokenProviders.values.flatMap { provider =>
146160
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
147161
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
148162
} else {
@@ -151,6 +165,7 @@ private[spark] class HadoopDelegationTokenManager(
151165
None
152166
}
153167
}.foldLeft(Long.MaxValue)(math.min)
168+
(creds, nextRenewal)
154169
}
155170

156171
// Visible for testing.
@@ -228,8 +243,7 @@ private[spark] class HadoopDelegationTokenManager(
228243
private def obtainTokensAndScheduleRenewal(ugi: UserGroupInformation): Credentials = {
229244
ugi.doAs(new PrivilegedExceptionAction[Credentials]() {
230245
override def run(): Credentials = {
231-
val creds = new Credentials()
232-
val nextRenewal = obtainDelegationTokens(creds)
246+
val (creds, nextRenewal) = obtainDelegationTokens()
233247

234248
// Calculate the time when new credentials should be created, based on the configured
235249
// ratio.

0 commit comments

Comments
 (0)