mirror of https://github.com/sbt/sbt.git
Review: the new configurable request timeout value should be `10.minutes` and should only be used for upload
This commit is contained in:
parent
084ca08f34
commit
8c0010a086
|
|
@ -10,18 +10,20 @@ package sbt
|
||||||
package internal
|
package internal
|
||||||
package sona
|
package sona
|
||||||
|
|
||||||
import gigahorse.*, support.apachehttp.Gigahorse
|
import gigahorse.*
|
||||||
import java.net.URLEncoder
|
import gigahorse.support.apachehttp.Gigahorse
|
||||||
import java.util.Base64
|
|
||||||
import java.nio.charset.StandardCharsets
|
|
||||||
import java.nio.file.Path
|
|
||||||
import sbt.util.Logger
|
import sbt.util.Logger
|
||||||
import sjsonnew.JsonFormat
|
import sjsonnew.JsonFormat
|
||||||
import sjsonnew.support.scalajson.unsafe.{ Converter, Parser }
|
|
||||||
import sjsonnew.shaded.scalajson.ast.unsafe.JValue
|
import sjsonnew.shaded.scalajson.ast.unsafe.JValue
|
||||||
|
import sjsonnew.support.scalajson.unsafe.{ Converter, Parser }
|
||||||
|
|
||||||
|
import java.net.URLEncoder
|
||||||
|
import java.nio.charset.StandardCharsets
|
||||||
|
import java.nio.file.Path
|
||||||
|
import java.util.Base64
|
||||||
import scala.annotation.nowarn
|
import scala.annotation.nowarn
|
||||||
import scala.concurrent.*, duration.*
|
import scala.concurrent.*
|
||||||
|
import scala.concurrent.duration.*
|
||||||
|
|
||||||
class Sona(client: SonaClient) extends AutoCloseable {
|
class Sona(client: SonaClient) extends AutoCloseable {
|
||||||
def uploadBundle(
|
def uploadBundle(
|
||||||
|
|
@ -36,15 +38,19 @@ class Sona(client: SonaClient) extends AutoCloseable {
|
||||||
def close(): Unit = client.close()
|
def close(): Unit = client.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
class SonaClient(reqTransform: Request => Request, requestTimeout: FiniteDuration)
|
class SonaClient(reqTransform: Request => Request, uploadRequestTimeout: FiniteDuration)
|
||||||
extends AutoCloseable {
|
extends AutoCloseable {
|
||||||
import SonaClient.baseUrl
|
import SonaClient.baseUrl
|
||||||
|
|
||||||
private val gigahorseConfig = Gigahorse.config
|
private val http = {
|
||||||
.withRequestTimeout(requestTimeout)
|
val defaultHttpRequestTimeout = 2.minutes
|
||||||
.withReadTimeout(requestTimeout)
|
|
||||||
|
|
||||||
private val http = Gigahorse.http(gigahorseConfig)
|
val gigahorseConfig = Gigahorse.config
|
||||||
|
.withRequestTimeout(defaultHttpRequestTimeout)
|
||||||
|
.withReadTimeout(defaultHttpRequestTimeout)
|
||||||
|
|
||||||
|
Gigahorse.http(gigahorseConfig)
|
||||||
|
}
|
||||||
|
|
||||||
def uploadBundle(
|
def uploadBundle(
|
||||||
bundleZipPath: Path,
|
bundleZipPath: Path,
|
||||||
|
|
@ -52,7 +58,12 @@ class SonaClient(reqTransform: Request => Request, requestTimeout: FiniteDuratio
|
||||||
publishingType: PublishingType,
|
publishingType: PublishingType,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
): String = {
|
): String = {
|
||||||
val res = retryF(maxAttempt = 2) { (attempt: Int) =>
|
val maxAttempt = 2
|
||||||
|
val waitDurationBetweenAtttempt = 5.seconds
|
||||||
|
// Adding an extra 5.seconds as security margins
|
||||||
|
val totalAwaitDuration = maxAttempt * uploadRequestTimeout + maxAttempt * waitDurationBetweenAtttempt + 5.seconds
|
||||||
|
|
||||||
|
val res = retryF(maxAttempt, waitDurationBetweenAtttempt) { (attempt: Int) =>
|
||||||
log.info(s"uploading bundle to the Central Portal (attempt: $attempt)")
|
log.info(s"uploading bundle to the Central Portal (attempt: $attempt)")
|
||||||
// addQuery string doesn't work for post
|
// addQuery string doesn't work for post
|
||||||
val q = queryString(
|
val q = queryString(
|
||||||
|
|
@ -69,12 +80,13 @@ class SonaClient(reqTransform: Request => Request, requestTimeout: FiniteDuratio
|
||||||
FormPart("bundle", bundleZipPath.toFile())
|
FormPart("bundle", bundleZipPath.toFile())
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
.withRequestTimeout(uploadRequestTimeout)
|
||||||
http.run(reqTransform(req), Gigahorse.asString)
|
http.run(reqTransform(req), Gigahorse.asString)
|
||||||
}
|
}
|
||||||
awaitWithMessage(res, "uploading...", log)
|
awaitWithMessage(res, "uploading...", log, totalAwaitDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
def queryString(kv: (String, String)*): String =
|
private def queryString(kv: (String, String)*): String =
|
||||||
kv.map {
|
kv.map {
|
||||||
case (k, v) =>
|
case (k, v) =>
|
||||||
val encodedV = URLEncoder.encode(v, "UTF-8")
|
val encodedV = URLEncoder.encode(v, "UTF-8")
|
||||||
|
|
@ -110,16 +122,16 @@ class SonaClient(reqTransform: Request => Request, requestTimeout: FiniteDuratio
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def deploymentStatus(deploymentId: String): PublisherStatus = {
|
private def deploymentStatus(deploymentId: String): PublisherStatus = {
|
||||||
val res = retryF(maxAttempt = 5) { (attempt: Int) =>
|
val res = retryF(maxAttempt = 5, waitDurationBetweenAttempt = 5.seconds) { (attempt: Int) =>
|
||||||
deploymentStatusF(deploymentId)
|
deploymentStatusF(deploymentId)
|
||||||
}
|
}
|
||||||
Await.result(res, 600.seconds)
|
Await.result(res, 10.minutes)
|
||||||
}
|
}
|
||||||
|
|
||||||
/** https://central.sonatype.org/publish/publish-portal-api/#verify-status-of-the-deployment
|
/** https://central.sonatype.org/publish/publish-portal-api/#verify-status-of-the-deployment
|
||||||
*/
|
*/
|
||||||
def deploymentStatusF(deploymentId: String): Future[PublisherStatus] = {
|
private def deploymentStatusF(deploymentId: String): Future[PublisherStatus] = {
|
||||||
val req = Gigahorse
|
val req = Gigahorse
|
||||||
.url(s"${baseUrl}/publisher/status")
|
.url(s"${baseUrl}/publisher/status")
|
||||||
.addQueryString("id" -> deploymentId)
|
.addQueryString("id" -> deploymentId)
|
||||||
|
|
@ -130,43 +142,52 @@ class SonaClient(reqTransform: Request => Request, requestTimeout: FiniteDuratio
|
||||||
/** Retry future function on any error.
|
/** Retry future function on any error.
|
||||||
*/
|
*/
|
||||||
@nowarn
|
@nowarn
|
||||||
def retryF[A1](maxAttempt: Int)(f: Int => Future[A1]): Future[A1] = {
|
private def retryF[A1](maxAttempt: Int, waitDurationBetweenAttempt: FiniteDuration)(
|
||||||
|
f: Int => Future[A1]
|
||||||
|
): Future[A1] = {
|
||||||
import scala.concurrent.ExecutionContext.Implicits.*
|
import scala.concurrent.ExecutionContext.Implicits.*
|
||||||
def impl(retry: Int): Future[A1] = {
|
def impl(retry: Int): Future[A1] = {
|
||||||
val res = f(retry + 1)
|
val res = f(retry + 1)
|
||||||
res.recoverWith {
|
res.recoverWith {
|
||||||
case _ if retry < maxAttempt =>
|
case _ if retry < maxAttempt =>
|
||||||
Thread.sleep(5000)
|
sleep(waitDurationBetweenAttempt).flatMap(_ => impl(retry + 1))
|
||||||
impl(retry + 1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl(0)
|
impl(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
def awaitWithMessage[A1](f: Future[A1], msg: String, log: Logger): A1 = {
|
private def awaitWithMessage[A1](
|
||||||
|
f: Future[A1],
|
||||||
|
msg: String,
|
||||||
|
log: Logger,
|
||||||
|
awaitDuration: FiniteDuration,
|
||||||
|
): A1 = {
|
||||||
import scala.concurrent.ExecutionContext.Implicits.*
|
import scala.concurrent.ExecutionContext.Implicits.*
|
||||||
def loop(attempt: Int): Unit =
|
def logLoop(attempt: Int): Unit =
|
||||||
if (!f.isCompleted) {
|
if (!f.isCompleted) {
|
||||||
if (attempt > 0) {
|
if (attempt > 0) {
|
||||||
log.info(msg)
|
log.info(msg)
|
||||||
}
|
}
|
||||||
Future {
|
sleep(30.second).foreach(_ => logLoop(attempt + 1))
|
||||||
blocking {
|
|
||||||
Thread.sleep(30.second.toMillis)
|
|
||||||
}
|
|
||||||
}.foreach(_ => loop(attempt + 1))
|
|
||||||
} else ()
|
} else ()
|
||||||
loop(0)
|
logLoop(0)
|
||||||
Await.result(f, requestTimeout + 5.seconds)
|
Await.result(f, awaitDuration)
|
||||||
}
|
}
|
||||||
|
|
||||||
def close(): Unit = http.close()
|
def close(): Unit = http.close()
|
||||||
|
|
||||||
|
private def sleep(duration: FiniteDuration)(implicit executor: ExecutionContext): Future[Unit] =
|
||||||
|
Future {
|
||||||
|
blocking {
|
||||||
|
Thread.sleep(duration.toMillis)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object Sona {
|
object Sona {
|
||||||
def host: String = SonaClient.host
|
def host: String = SonaClient.host
|
||||||
def oauthClient(userName: String, userToken: String, requestTimeout: FiniteDuration): Sona =
|
def oauthClient(userName: String, userToken: String, uploadRequestTimeout: FiniteDuration): Sona =
|
||||||
new Sona(SonaClient.oauthClient(userName, userToken, requestTimeout))
|
new Sona(SonaClient.oauthClient(userName, userToken, uploadRequestTimeout))
|
||||||
}
|
}
|
||||||
|
|
||||||
object SonaClient {
|
object SonaClient {
|
||||||
|
|
@ -177,8 +198,12 @@ object SonaClient {
|
||||||
Parser.parseFromByteBuffer(r.bodyAsByteBuffer).get
|
Parser.parseFromByteBuffer(r.bodyAsByteBuffer).get
|
||||||
def as[A1: JsonFormat]: FullResponse => A1 = asJson.andThen(Converter.fromJsonUnsafe[A1])
|
def as[A1: JsonFormat]: FullResponse => A1 = asJson.andThen(Converter.fromJsonUnsafe[A1])
|
||||||
val asPublisherStatus: FullResponse => PublisherStatus = as[PublisherStatus]
|
val asPublisherStatus: FullResponse => PublisherStatus = as[PublisherStatus]
|
||||||
def oauthClient(userName: String, userToken: String, requestTimeout: FiniteDuration): SonaClient =
|
def oauthClient(
|
||||||
new SonaClient(OAuthClient(userName, userToken), requestTimeout)
|
userName: String,
|
||||||
|
userToken: String,
|
||||||
|
uploadRequestTimeout: FiniteDuration
|
||||||
|
): SonaClient =
|
||||||
|
new SonaClient(OAuthClient(userName, userToken), uploadRequestTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
private case class OAuthClient(userName: String, userToken: String)
|
private case class OAuthClient(userName: String, userToken: String)
|
||||||
|
|
|
||||||
|
|
@ -3108,7 +3108,7 @@ object Classpaths {
|
||||||
val uuid = UUID.randomUUID().toString().take(8)
|
val uuid = UUID.randomUUID().toString().take(8)
|
||||||
s"$o:$v:$uuid"
|
s"$o:$v:$uuid"
|
||||||
},
|
},
|
||||||
sonaRequestTimeout := 2.minutes,
|
sonaUploadRequestTimeout := 10.minutes,
|
||||||
)
|
)
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
@nowarn("cat=deprecation")
|
||||||
|
|
|
||||||
|
|
@ -571,7 +571,7 @@ object Keys {
|
||||||
val sonaBundle = taskKey[File]("Local bundle for Sonatype publishing").withRank(DTask)
|
val sonaBundle = taskKey[File]("Local bundle for Sonatype publishing").withRank(DTask)
|
||||||
val localStaging = settingKey[Option[Resolver]]("Local staging resolver for Sonatype publishing").withRank(CSetting)
|
val localStaging = settingKey[Option[Resolver]]("Local staging resolver for Sonatype publishing").withRank(CSetting)
|
||||||
val sonaDeploymentName = settingKey[String]("The name used for deployment").withRank(DSetting)
|
val sonaDeploymentName = settingKey[String]("The name used for deployment").withRank(DSetting)
|
||||||
val sonaRequestTimeout = settingKey[FiniteDuration]("Request timeout for Sonatype publishing").withRank(DSetting)
|
val sonaUploadRequestTimeout = settingKey[FiniteDuration]("Request timeout for Sonatype publishing").withRank(DSetting)
|
||||||
|
|
||||||
val classifiersModule = taskKey[GetClassifiersModule]("classifiers-module").withRank(CTask)
|
val classifiersModule = taskKey[GetClassifiersModule]("classifiers-module").withRank(CTask)
|
||||||
val compatibilityWarningOptions = settingKey[CompatibilityWarningOptions]("Configures warnings around Maven incompatibility.").withRank(CSetting)
|
val compatibilityWarningOptions = settingKey[CompatibilityWarningOptions]("Configures warnings around Maven incompatibility.").withRank(CSetting)
|
||||||
|
|
|
||||||
|
|
@ -49,10 +49,10 @@ see https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html for details.""")
|
||||||
s0.fail
|
s0.fail
|
||||||
} else {
|
} else {
|
||||||
val deploymentName = extracted.get(Keys.sonaDeploymentName)
|
val deploymentName = extracted.get(Keys.sonaDeploymentName)
|
||||||
val requestTimeout = extracted.get(Keys.sonaRequestTimeout)
|
val uploadRequestTimeout = extracted.get(Keys.sonaUploadRequestTimeout)
|
||||||
val (s1, bundle) = extracted.runTask(Keys.sonaBundle, s0)
|
val (s1, bundle) = extracted.runTask(Keys.sonaBundle, s0)
|
||||||
val (s2, creds) = extracted.runTask(Keys.credentials, s1)
|
val (s2, creds) = extracted.runTask(Keys.credentials, s1)
|
||||||
val client = fromCreds(creds, requestTimeout)
|
val client = fromCreds(creds, uploadRequestTimeout)
|
||||||
try {
|
try {
|
||||||
client.uploadBundle(bundle.toPath(), deploymentName, publishingType, log)
|
client.uploadBundle(bundle.toPath(), deploymentName, publishingType, log)
|
||||||
s2
|
s2
|
||||||
|
|
@ -62,10 +62,10 @@ see https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html for details.""")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def fromCreds(creds: Seq[Credentials], requestTimeout: FiniteDuration): Sona = {
|
private def fromCreds(creds: Seq[Credentials], uploadRequestTimeout: FiniteDuration): Sona = {
|
||||||
val cred = Credentials
|
val cred = Credentials
|
||||||
.forHost(creds, Sona.host)
|
.forHost(creds, Sona.host)
|
||||||
.getOrElse(throw new MessageOnlyException(s"no credentials are found for ${Sona.host}"))
|
.getOrElse(throw new MessageOnlyException(s"no credentials are found for ${Sona.host}"))
|
||||||
Sona.oauthClient(cred.userName, cred.passwd, requestTimeout)
|
Sona.oauthClient(cred.userName, cred.passwd, uploadRequestTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue