From 3ac230f0beaee00f72b57d8df04323b6c9b5070a Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Mon, 5 Jun 2017 18:57:24 +0200 Subject: [PATCH] Watch downloads from other coursier Rather than failing with lock error --- cache/src/main/scala/coursier/Cache.scala | 240 +++++++++++++++--- .../src/main/scala/coursier/TermDisplay.scala | 59 +++-- project/Mima.scala | 9 +- 3 files changed, 252 insertions(+), 56 deletions(-) diff --git a/cache/src/main/scala/coursier/Cache.scala b/cache/src/main/scala/coursier/Cache.scala index 1cc86e997..f98842bb4 100644 --- a/cache/src/main/scala/coursier/Cache.scala +++ b/cache/src/main/scala/coursier/Cache.scala @@ -159,7 +159,14 @@ object Cache { } } - def withLockFor[T](cache: File, file: File)(f: => FileError \/ T): FileError \/ T = { + private def withLockOr[T]( + cache: File, + file: File + )( + f: => FileError \/ T, + ifLocked: => Option[FileError \/ T] + ): FileError \/ T = { + val lockFile = new File(file.getParentFile, s"${file.getName}.lock") var out: FileOutputStream = null @@ -169,30 +176,46 @@ object Cache { out = new FileOutputStream(lockFile) } - try { - var lock: FileLock = null - try { - lock = out.getChannel.tryLock() - if (lock == null) - -\/(FileError.Locked(file)) - else - try f - finally { - lock.release() - lock = null - out.close() - out = null - lockFile.delete() - } + @tailrec + def loop(): FileError \/ T = { + + val resOpt = { + var lock: FileLock = null + try { + lock = out.getChannel.tryLock() + if (lock == null) + ifLocked + else + try Some(f) + finally { + lock.release() + lock = null + out.close() + out = null + lockFile.delete() + } + } + catch { + case _: OverlappingFileLockException => + ifLocked + } + finally if (lock != null) lock.release() } - catch { - case e: OverlappingFileLockException => - -\/(FileError.Locked(file)) + + resOpt match { + case Some(res) => res + case None => + loop() } - finally if (lock != null) lock.release() - } finally if (out != null) out.close() + } + + try loop() + finally if (out != null) out.close() } + def withLockFor[T](cache: File, file: File)(f: => FileError \/ T): FileError \/ T = + withLockOr(cache, file)(f, Some(-\/(FileError.Locked(file)))) + private def defaultRetryCount = 3 @@ -222,25 +245,16 @@ object Cache { val res = if (prev == null) { - logger.foreach(_.downloadingArtifact(url, file)) - val res = try \/-(f) catch { case nfe: FileNotFoundException if nfe.getMessage != null => - logger.foreach(_.downloadedArtifact(url, success = false)) -\/(-\/(FileError.NotFound(nfe.getMessage))) - case e: Exception => - logger.foreach(_.downloadedArtifact(url, success = false)) - throw e } finally { urlLocks.remove(url) } - for (res0 <- res) - logger.foreach(_.downloadedArtifact(url, success = res0.isRight)) - res.merge[FileError \/ T] } else -\/(FileError.ConcurrentDownload(url)) @@ -394,18 +408,68 @@ object Cache { } } + private def contentLength( + url: String, + authentication: Option[Authentication], + logger0: Option[Logger] + ): FileError \/ Option[Long] = { + + val logger = logger0.map(Logger.Extended(_)) + + var conn: URLConnection = null + + try { + conn = urlConnection(url, authentication) + + conn match { + case c: HttpURLConnection => + logger.foreach(_.gettingLength(url)) + + var success = false + try { + c.setRequestMethod("HEAD") + val len = Some(c.getContentLength) // TODO Use getContentLengthLong when switching to Java >= 7 + .filter(_ >= 0) + .map(_.toLong) + + // TODO 404 Not found could be checked here + + success = true + logger.foreach(_.gettingLengthResult(url, len)) + + len.right + } finally { + if (!success) + logger.foreach(_.gettingLengthResult(url, None)) + } + + case other => + -\/(FileError.DownloadError(s"Cannot do HEAD request with connection $other ($url)")) + } + } finally { + if (conn != null) + conn match { + case conn0: HttpURLConnection => + conn0.disconnect() + case _ => + } + } + } + private def download( artifact: Artifact, cache: File, checksums: Set[String], cachePolicy: CachePolicy, pool: ExecutorService, - logger: Option[Logger] = None, + logger0: Option[Logger] = None, ttl: Option[Duration] = defaultTtl ): Task[Seq[((File, String), FileError \/ Unit)]] = { implicit val pool0 = pool + val logger = logger0.map(Logger.Extended(_)) + // Reference file - if it exists, and we get not found errors on some URLs, we assume // we can keep track of these missing, and not try to get them again later. val referenceFileOpt = artifact @@ -585,9 +649,13 @@ object Cache { ): EitherT[Task, FileError, Unit] = EitherT { Task { - withLockFor(cache, file) { + + val tmp = temporaryFile(file) + + var lenOpt = Option.empty[Option[Long]] + + def doDownload(): FileError \/ Unit = downloading(url, file, logger) { - val tmp = temporaryFile(file) val alreadyDownloaded = tmp.length() @@ -622,7 +690,7 @@ object Cache { // TODO Use the safer getContentLengthLong when switching back to Java >= 7 for (len0 <- Option(conn.getContentLength) if len0 >= 0L) { val len = len0 + (if (partialDownload) alreadyDownloaded else 0L) - logger.foreach(_.downloadLength(url, len, alreadyDownloaded)) + logger.foreach(_.downloadLength(url, len, alreadyDownloaded, watching = false)) } val in = new BufferedInputStream(conn.getInputStream, bufferSize) @@ -658,7 +726,60 @@ object Cache { } } } + + def checkDownload(): Option[FileError \/ Unit] = { + + def progress(currentLen: Long): Unit = + if (lenOpt.isEmpty) { + lenOpt = Some(contentLength(url, artifact.authentication, logger).toOption.flatten) + for (o <- lenOpt; len <- o) + logger.foreach(_.downloadLength(url, len, currentLen, watching = true)) + } else + logger.foreach(_.downloadProgress(url, currentLen)) + + def done(): Unit = + if (lenOpt.isEmpty) { + lenOpt = Some(contentLength(url, artifact.authentication, logger).toOption.flatten) + for (o <- lenOpt; len <- o) + logger.foreach(_.downloadLength(url, len, len, watching = true)) + } else + for (o <- lenOpt; len <- o) + logger.foreach(_.downloadProgress(url, len)) + + if (file.exists()) { + done() + Some(().right) + } else { + // yes, Thread.sleep. 'tis our thread pool anyway. + // (And the various resources make it not straightforward to switch to a more Task-based internal API here.) + Thread.sleep(20L) + + val currentLen = tmp.length() + + if (currentLen == 0L && file.exists()) { // check again if file exists in case it was created in the mean time + done() + Some(().right) + } else { + progress(currentLen) + None + } + } } + + logger.foreach(_.downloadingArtifact(url, file)) + + var res: FileError \/ Unit = null + + try { + res = withLockOr(cache, file)( + doDownload(), + checkDownload() + ) + } finally { + logger.foreach(_.downloadedArtifact(url, success = res != null && res.isRight)) + } + + res } } @@ -938,7 +1059,7 @@ object Cache { checksums = checksums0.collect { case Some(c) => c }.toSet, cachePolicy, pool, - logger = logger, + logger0 = logger, ttl = ttl ).map { results => val checksum = checksums0.find { @@ -1106,10 +1227,11 @@ object Cache { def downloadingArtifact(url: String, file: File): Unit = {} - @deprecated("Use / override the variant with 3 arguments instead", "1.0.0-M10") + @deprecated("extend Logger.Extended instead and use / override the variant with 4 arguments", "1.0.0-M10") def downloadLength(url: String, length: Long): Unit = {} - def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long): Unit = { - downloadLength(url, totalLength) + @deprecated("extend Logger.Extended instead and use / override the variant with 4 arguments", "1.0.0-RC4") + def downloadLength(url: String, length: Long, alreadyDownloaded: Long): Unit = { + downloadLength(url, length) } def downloadProgress(url: String, downloaded: Long): Unit = {} @@ -1119,6 +1241,48 @@ object Cache { def checkingUpdatesResult(url: String, currentTimeOpt: Option[Long], remoteTimeOpt: Option[Long]): Unit = {} } + object Logger { + // adding new methods to this one, not to break bin compat in 2.10 / 2.11 + abstract class Extended extends Logger { + def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long, watching: Boolean): Unit = { + downloadLength(url, totalLength, 0L) + } + + def gettingLength(url: String): Unit = {} + def gettingLengthResult(url: String, length: Option[Long]): Unit = {} + } + + object Extended { + def apply(logger: Logger): Extended = + logger match { + case e: Extended => e + case _ => + new Extended { + override def foundLocally(url: String, f: File) = + logger.foundLocally(url, f) + + override def downloadingArtifact(url: String, file: File) = + logger.downloadingArtifact(url, file) + + override def downloadLength(url: String, length: Long) = + logger.downloadLength(url, length) + override def downloadLength(url: String, length: Long, alreadyDownloaded: Long) = + logger.downloadLength(url, length, alreadyDownloaded) + + override def downloadProgress(url: String, downloaded: Long) = + logger.downloadProgress(url, downloaded) + + override def downloadedArtifact(url: String, success: Boolean) = + logger.downloadedArtifact(url, success) + override def checkingUpdates(url: String, currentTimeOpt: Option[Long]) = + checkingUpdates(url, currentTimeOpt) + override def checkingUpdatesResult(url: String, currentTimeOpt: Option[Long], remoteTimeOpt: Option[Long]) = + checkingUpdatesResult(url, currentTimeOpt, remoteTimeOpt) + } + } + } + } + var bufferSize = 1024*1024 def readFullySync(is: InputStream) = { diff --git a/cache/src/main/scala/coursier/TermDisplay.scala b/cache/src/main/scala/coursier/TermDisplay.scala index 59e8abe88..c77a0955f 100644 --- a/cache/src/main/scala/coursier/TermDisplay.scala +++ b/cache/src/main/scala/coursier/TermDisplay.scala @@ -76,7 +76,8 @@ object TermDisplay { private sealed abstract class Info extends Product with Serializable { def fraction: Option[Double] - def display(): String + def display(isDone: Boolean): String + def watching: Boolean } private final case class DownloadInfo( @@ -84,7 +85,8 @@ object TermDisplay { previouslyDownloaded: Long, length: Option[Long], startTime: Long, - updateCheck: Boolean + updateCheck: Boolean, + watching: Boolean ) extends Info { /** 0.0 to 1.0 */ def fraction: Option[Double] = length.map(downloaded.toDouble / _) @@ -110,13 +112,29 @@ object TermDisplay { } } - def display(): String = { - val decile = (10.0 * fraction.getOrElse(0.0)).toInt - assert(decile >= 0) - assert(decile <= 10) + def display(isDone: Boolean): String = { - fraction.fold(" " * 6)(p => f"${100.0 * p}%5.1f%%") + - " [" + ("#" * decile) + (" " * (10 - decile)) + "] " + + val actualFraction = fraction + .orElse(if (isDone) Some(1.0) else None) + .orElse(if (downloaded == 0L) Some(0.0) else None) + + val start = + actualFraction match { + case None => + val elem = if (watching) "." else "?" + s" [ $elem ] " + case Some(frac) => + val elem = if (watching) "." else "#" + + val decile = (10.0 * frac).toInt + assert(decile >= 0) + assert(decile <= 10) + + f"${100.0 * frac}%5.1f%%" + + " [" + (elem * decile) + (" " * (10 - decile)) + "] " + } + + start + byteCount(downloaded) + rate().fold("")(r => s" (${byteCount(r.toLong)} / s)") } @@ -132,8 +150,9 @@ object TermDisplay { remoteTimeOpt: Option[Long], isDone: Boolean ) extends Info { + def watching = false def fraction = None - def display(): String = { + def display(isDone: Boolean): String = { if (isDone) (currentTimeOpt, remoteTimeOpt) match { case (Some(current), Some(remote)) => @@ -215,7 +234,7 @@ object TermDisplay { )( update0: Info => Info ): Unit = { - downloads.synchronized { + val inf = downloads.synchronized { downloads -= url val info = infos.remove(url) @@ -223,11 +242,13 @@ object TermDisplay { if (success) doneQueue += (url -> update0(info)) + + info } if (fallbackMode && success) { // FIXME What about concurrent accesses to out from the thread above? - out.write(fallbackMessage) + out.write((if (inf.watching) "(watching) " else "") + fallbackMessage) out.flush() } @@ -308,7 +329,7 @@ object TermDisplay { (q, dw) } - for ((url, info) <- done0 ++ downloads0) { + for (((url, info), isDone) <- done0.iterator.map((_, true)) ++ downloads0.iterator.map((_, false))) { assert(info != null, s"Incoherent state ($url)") if (!printedAnything0) { @@ -318,7 +339,7 @@ object TermDisplay { truncatedPrintln(url) out.clearLine(2) - out.write(s" ${info.display()}\n") + out.write(s" ${info.display(isDone)}\n") } val displayedCount = (done0 ++ downloads0).length @@ -401,7 +422,7 @@ object TermDisplay { class TermDisplay( out: Writer, val fallbackMode: Boolean = TermDisplay.defaultFallbackMode -) extends Cache.Logger { +) extends Cache.Logger.Extended { import TermDisplay._ @@ -470,16 +491,20 @@ class TermDisplay( override def downloadingArtifact(url: String, file: File): Unit = updateRunnable.newEntry( url, - DownloadInfo(0L, 0L, None, System.currentTimeMillis(), updateCheck = false), + DownloadInfo(0L, 0L, None, System.currentTimeMillis(), updateCheck = false, watching = false), s"Downloading $url\n" ) - override def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long): Unit = { + override def downloadLength(url: String, totalLength: Long, alreadyDownloaded: Long, watching: Boolean): Unit = { val info = updateRunnable.infos.get(url) assert(info != null) val newInfo = info match { case info0: DownloadInfo => - info0.copy(length = Some(totalLength), previouslyDownloaded = alreadyDownloaded) + info0.copy( + length = Some(totalLength), + previouslyDownloaded = alreadyDownloaded, + watching = watching + ) case _ => throw new Exception(s"Incoherent display state for $url") } diff --git a/project/Mima.scala b/project/Mima.scala index 8bf084a89..123f30462 100644 --- a/project/Mima.scala +++ b/project/Mima.scala @@ -43,7 +43,14 @@ object Mima { mimaBinaryIssueFilters ++= { import com.typesafe.tools.mima.core._ - Seq() + Seq( + // these are private, don't know why they end-up appearing here + // (probably related to https://github.com/typesafehub/migration-manager/issues/34) + (pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#DownloadInfo")), + (pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay$DownloadInfo")), + (pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#CheckUpdateInfo")), + (pb: Problem) => pb.matchName.forall(!_.startsWith("coursier.TermDisplay#Info")) + ) } }