Watch downloads from other coursier

Rather than failing with lock error
This commit is contained in:
Alexandre Archambault 2017-06-05 18:57:24 +02:00
parent 5a53b8447e
commit 3ac230f0be
3 changed files with 252 additions and 56 deletions

View File

@ -159,7 +159,14 @@ object Cache {
}
}
def withLockFor[T](cache: File, file: File)(f: => FileError \/ T): FileError \/ T = {
private def withLockOr[T](
cache: File,
file: File
)(
f: => FileError \/ T,
ifLocked: => Option[FileError \/ T]
): FileError \/ T = {
val lockFile = new File(file.getParentFile, s"${file.getName}.lock")
var out: FileOutputStream = null
@ -169,30 +176,46 @@ object Cache {
out = new FileOutputStream(lockFile)
}
try {
var lock: FileLock = null
try {
lock = out.getChannel.tryLock()
if (lock == null)
-\/(FileError.Locked(file))
else
try f
finally {
lock.release()
lock = null
out.close()
out = null
lockFile.delete()
}
@tailrec
def loop(): FileError \/ T = {
val resOpt = {
var lock: FileLock = null
try {
lock = out.getChannel.tryLock()
if (lock == null)
ifLocked
else
try Some(f)
finally {
lock.release()
lock = null
out.close()
out = null
lockFile.delete()
}
}
catch {
case _: OverlappingFileLockException =>
ifLocked
}
finally if (lock != null) lock.release()
}
catch {
case e: OverlappingFileLockException =>
-\/(FileError.Locked(file))
resOpt match {
case Some(res) => res
case None =>
loop()
}
finally if (lock != null) lock.release()
} finally if (out != null) out.close()
}
try loop()
finally if (out != null) out.close()
}
def withLockFor[T](cache: File, file: File)(f: => FileError \/ T): FileError \/ T =
withLockOr(cache, file)(f, Some(-\/(FileError.Locked(file))))
private def defaultRetryCount = 3
@ -222,25 +245,16 @@ object Cache {
val res =
if (prev == null) {
logger.foreach(_.downloadingArtifact(url, file))
val res =
try \/-(f)
catch {
case nfe: FileNotFoundException if nfe.getMessage != null =>
logger.foreach(_.downloadedArtifact(url, success = false))
-\/(-\/(FileError.NotFound(nfe.getMessage)))
case e: Exception =>
logger.foreach(_.downloadedArtifact(url, success = false))
throw e
}
finally {
urlLocks.remove(url)
}
for (res0 <- res)
logger.foreach(_.downloadedArtifact(url, success = res0.isRight))
res.merge[FileError \/ T]
} else
-\/(FileError.ConcurrentDownload(url))
@ -394,18 +408,68 @@ object Cache {
}
}
private def contentLength(
url: String,
authentication: Option[Authentication],
logger0: Option[Logger]
): FileError \/ Option[Long] = {
val logger = logger0.map(Logger.Extended(_))
var conn: URLConnection = null
try {
conn = urlConnection(url, authentication)
conn match {
case c: HttpURLConnection =>
logger.foreach(_.gettingLength(url))
var success = false
try {
c.setRequestMethod("HEAD")
val len = Some(c.getContentLength) // TODO Use getContentLengthLong when switching to Java >= 7
.filter(_ >= 0)
.map(_.toLong)
// TODO 404 Not found could be checked here
success = true
logger.foreach(_.gettingLengthResult(url, len))
len.right
} finally {
if (!success)
logger.foreach(_.gettingLengthResult(url, None))
}
case other =>
-\/(FileError.DownloadError(s"Cannot do HEAD request with connection $other ($url)"))
}
} finally {
if (conn != null)
conn match {
case conn0: HttpURLConnection =>
conn0.disconnect()
case _ =>
}
}
}
private def download(
artifact: Artifact,
cache: File,
checksums: Set[String],
cachePolicy: CachePolicy,
pool: ExecutorService,
logger: Option[Logger] = None,
logger0: Option[Logger] = None,
ttl: Option[Duration] = defaultTtl
): Task[Seq[((File, String), FileError \/ Unit)]] = {
implicit val pool0 = pool
val logger = logger0.map(Logger.Extended(_))
// Reference file - if it exists, and we get not found errors on some URLs, we assume
// we can keep track of these missing, and not try to get them again later.
val referenceFileOpt = artifact
@ -585,9 +649,13 @@ object Cache {
): EitherT[Task, FileError, Unit] =
EitherT {
Task {
withLockFor(cache, file) {
val tmp = temporaryFile(file)
var lenOpt = Option.empty[Option[Long]]
def doDownload(): FileError \/ Unit =
downloading(url, file, logger) {
val tmp = temporaryFile(file)
val alreadyDownloaded = tmp.length()
@ -622,7 +690,7 @@ object Cache {
// TODO Use the safer getContentLengthLong when switching back to Java >= 7
for (len0 <- Option(conn.getContentLength) if len0 >= 0L) {
val len = len0 + (if (partialDownload) alreadyDownloaded else 0L)
logger.foreach(_.downloadLength(url, len, alreadyDownloaded))
logger.foreach(_.downloadLength(url, len, alreadyDownloaded, watching = false))
}
val in = new BufferedInputStream(conn.getInputStream, bufferSize)
@ -658,7 +726,60 @@ object Cache {
}
}
}
def checkDownload(): Option[FileError \/ Unit] = {
def progress(currentLen: Long): Unit =
if (lenOpt.isEmpty) {
lenOpt = Some(contentLength(url, artifact.authentication, logger).toOption.flatten)
for (o <- lenOpt; len <- o)
logger.foreach(_.downloadLength(url, len, currentLen, watching = true))
} else
logger.foreach(_.downloadProgress(url, currentLen))
def done(): Unit =
if (lenOpt.isEmpty) {
lenOpt = Some(contentLength(url, artifact.authentication, logger).toOption.flatten)
for (o <- lenOpt; len <- o)
logger.foreach(_.downloadLength(url, len, len, watching = true))
} else
for (o <- lenOpt; len <- o)
logger.foreach(_.downloadProgress(url, len))
if (file.exists()) {
done()
Some(().right)
} else {
// yes, Thread.sleep. 'tis our thread pool anyway.
// (And the various resources make it not straightforward to switch to a more Task-based internal API here.)
Thread.sleep(20L)
val currentLen = tmp.length()
if (currentLen == 0L && file.exists()) { // check again if file exists in case it was created in the mean time
done()
Some(().right)
} else {
progress(currentLen)
None
}
}
}
logger.foreach(_.downloadingArtifact(url, file))
var res: FileError \/ Unit = null
try {
res = withLockOr(cache, file)(
doDownload(),
checkDownload()
)
} finally {
logger.foreach(_.downloadedArtifact(url, success = res != null && res.isRight))
}
res
}
}
@ -938,7 +1059,7 @@ object Cache {
checksums = checksums0.collect { case Some(c) => c }.toSet,
cachePolicy,
pool,
logger = logger,
logger0 = logger,
ttl = ttl
).map { results =>
val checksum = checksums0.find {
@ -1106,10 +1227,11 @@ object Cache {
def downloadingArtifact(url: String, file: File): Unit = {}
@deprecated("Use / override the variant with 3 arguments instead", "1.0.0-M10")
@deprecated("extend Logger.Extended instead and use / override the variant with 4 arguments", "1.0.0-M10")
def downloadLength(url: String, length: Long): Unit = {}
def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long): Unit = {
downloadLength(url, totalLength)
@deprecated("extend Logger.Extended instead and use / override the variant with 4 arguments", "1.0.0-RC4")
def downloadLength(url: String, length: Long, alreadyDownloaded: Long): Unit = {
downloadLength(url, length)
}
def downloadProgress(url: String, downloaded: Long): Unit = {}
@ -1119,6 +1241,48 @@ object Cache {
def checkingUpdatesResult(url: String, currentTimeOpt: Option[Long], remoteTimeOpt: Option[Long]): Unit = {}
}
object Logger {
// adding new methods to this one, not to break bin compat in 2.10 / 2.11
abstract class Extended extends Logger {
def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long, watching: Boolean): Unit = {
downloadLength(url, totalLength, 0L)
}
def gettingLength(url: String): Unit = {}
def gettingLengthResult(url: String, length: Option[Long]): Unit = {}
}
object Extended {
def apply(logger: Logger): Extended =
logger match {
case e: Extended => e
case _ =>
new Extended {
override def foundLocally(url: String, f: File) =
logger.foundLocally(url, f)
override def downloadingArtifact(url: String, file: File) =
logger.downloadingArtifact(url, file)
override def downloadLength(url: String, length: Long) =
logger.downloadLength(url, length)
override def downloadLength(url: String, length: Long, alreadyDownloaded: Long) =
logger.downloadLength(url, length, alreadyDownloaded)
override def downloadProgress(url: String, downloaded: Long) =
logger.downloadProgress(url, downloaded)
override def downloadedArtifact(url: String, success: Boolean) =
logger.downloadedArtifact(url, success)
override def checkingUpdates(url: String, currentTimeOpt: Option[Long]) =
checkingUpdates(url, currentTimeOpt)
override def checkingUpdatesResult(url: String, currentTimeOpt: Option[Long], remoteTimeOpt: Option[Long]) =
checkingUpdatesResult(url, currentTimeOpt, remoteTimeOpt)
}
}
}
}
var bufferSize = 1024*1024
def readFullySync(is: InputStream) = {

View File

@ -76,7 +76,8 @@ object TermDisplay {
private sealed abstract class Info extends Product with Serializable {
def fraction: Option[Double]
def display(): String
def display(isDone: Boolean): String
def watching: Boolean
}
private final case class DownloadInfo(
@ -84,7 +85,8 @@ object TermDisplay {
previouslyDownloaded: Long,
length: Option[Long],
startTime: Long,
updateCheck: Boolean
updateCheck: Boolean,
watching: Boolean
) extends Info {
/** 0.0 to 1.0 */
def fraction: Option[Double] = length.map(downloaded.toDouble / _)
@ -110,13 +112,29 @@ object TermDisplay {
}
}
def display(): String = {
val decile = (10.0 * fraction.getOrElse(0.0)).toInt
assert(decile >= 0)
assert(decile <= 10)
def display(isDone: Boolean): String = {
fraction.fold(" " * 6)(p => f"${100.0 * p}%5.1f%%") +
" [" + ("#" * decile) + (" " * (10 - decile)) + "] " +
val actualFraction = fraction
.orElse(if (isDone) Some(1.0) else None)
.orElse(if (downloaded == 0L) Some(0.0) else None)
val start =
actualFraction match {
case None =>
val elem = if (watching) "." else "?"
s" [ $elem ] "
case Some(frac) =>
val elem = if (watching) "." else "#"
val decile = (10.0 * frac).toInt
assert(decile >= 0)
assert(decile <= 10)
f"${100.0 * frac}%5.1f%%" +
" [" + (elem * decile) + (" " * (10 - decile)) + "] "
}
start +
byteCount(downloaded) +
rate().fold("")(r => s" (${byteCount(r.toLong)} / s)")
}
@ -132,8 +150,9 @@ object TermDisplay {
remoteTimeOpt: Option[Long],
isDone: Boolean
) extends Info {
def watching = false
def fraction = None
def display(): String = {
def display(isDone: Boolean): String = {
if (isDone)
(currentTimeOpt, remoteTimeOpt) match {
case (Some(current), Some(remote)) =>
@ -215,7 +234,7 @@ object TermDisplay {
)(
update0: Info => Info
): Unit = {
downloads.synchronized {
val inf = downloads.synchronized {
downloads -= url
val info = infos.remove(url)
@ -223,11 +242,13 @@ object TermDisplay {
if (success)
doneQueue += (url -> update0(info))
info
}
if (fallbackMode && success) {
// FIXME What about concurrent accesses to out from the thread above?
out.write(fallbackMessage)
out.write((if (inf.watching) "(watching) " else "") + fallbackMessage)
out.flush()
}
@ -308,7 +329,7 @@ object TermDisplay {
(q, dw)
}
for ((url, info) <- done0 ++ downloads0) {
for (((url, info), isDone) <- done0.iterator.map((_, true)) ++ downloads0.iterator.map((_, false))) {
assert(info != null, s"Incoherent state ($url)")
if (!printedAnything0) {
@ -318,7 +339,7 @@ object TermDisplay {
truncatedPrintln(url)
out.clearLine(2)
out.write(s" ${info.display()}\n")
out.write(s" ${info.display(isDone)}\n")
}
val displayedCount = (done0 ++ downloads0).length
@ -401,7 +422,7 @@ object TermDisplay {
class TermDisplay(
out: Writer,
val fallbackMode: Boolean = TermDisplay.defaultFallbackMode
) extends Cache.Logger {
) extends Cache.Logger.Extended {
import TermDisplay._
@ -470,16 +491,20 @@ class TermDisplay(
override def downloadingArtifact(url: String, file: File): Unit =
updateRunnable.newEntry(
url,
DownloadInfo(0L, 0L, None, System.currentTimeMillis(), updateCheck = false),
DownloadInfo(0L, 0L, None, System.currentTimeMillis(), updateCheck = false, watching = false),
s"Downloading $url\n"
)
override def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long): Unit = {
override def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long, watching: Boolean): Unit = {
val info = updateRunnable.infos.get(url)
assert(info != null)
val newInfo = info match {
case info0: DownloadInfo =>
info0.copy(length = Some(totalLength), previouslyDownloaded = alreadyDownloaded)
info0.copy(
length = Some(totalLength),
previouslyDownloaded = alreadyDownloaded,
watching = watching
)
case _ =>
throw new Exception(s"Incoherent display state for $url")
}

View File

@ -43,7 +43,14 @@ object Mima {
mimaBinaryIssueFilters ++= {
import com.typesafe.tools.mima.core._
Seq()
Seq(
// these are private, don't know why they end-up appearing here
// (probably related to https://github.com/typesafehub/migration-manager/issues/34)
(pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#DownloadInfo")),
(pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay$DownloadInfo")),
(pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#CheckUpdateInfo")),
(pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#Info"))
)
}
}