sbt/cache/src/main/scala/coursier/Cache.scala

602 lines
18 KiB
Scala
Raw Normal View History

2015-06-25 01:18:57 +02:00
package coursier
2015-12-30 01:34:33 +01:00
import java.net.{HttpURLConnection, URL}
2015-07-06 02:48:26 +02:00
import java.nio.channels.{ OverlappingFileLockException, FileLock }
2015-12-30 01:34:48 +01:00
import java.nio.file.{ StandardCopyOption, Files => NioFiles }
import java.security.MessageDigest
2015-12-30 01:34:32 +01:00
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService}
2015-06-25 01:18:57 +02:00
2015-12-31 16:26:18 +01:00
import coursier.ivy.IvyRepository
2015-06-25 01:18:57 +02:00
import scala.annotation.tailrec
import scalaz._
import scalaz.concurrent.{ Task, Strategy }
2015-06-25 01:18:57 +02:00
2015-12-30 01:34:32 +01:00
import java.io.{ Serializable => _, _ }
2015-06-25 01:18:57 +02:00
2015-12-30 01:34:41 +01:00
object Cache {
/**
 * Checksums tried by default, in order: the SHA-1 checksum when one can be fetched,
 * otherwise accept the artifact with no checksum at all.
 */
val defaultChecksums: Seq[Option[String]] = Seq(Some("SHA-1"), None)
2015-12-30 01:34:43 +01:00
/**
 * Attaches to `artifact` its cache-local counterpart, stored under the `"local"` key of `extra`.
 *
 * The counterpart has its URL and checksum URLs rewritten as local paths:
 * - `file:` URLs become plain paths;
 * - other URLs are mapped under the directory of the first `cache` entry whose base prefixes
 *   them, e.g. `"https://"`.
 *
 * Artifacts that already carry a `"local"` entry are returned unchanged.
 *
 * @throws NotImplementedError if a URL is neither a `file:` URL nor covered by any `cache` base
 */
private def withLocal(artifact: Artifact, cache: Seq[(String, File)]): Artifact = {
  def local(url: String): String =
    if (url.startsWith("file:///"))
      url.stripPrefix("file://")
    else if (url.startsWith("file:/"))
      url.stripPrefix("file:")
    else {
      val localPathOpt = cache.collectFirst {
        case (base, cacheDir) if url.startsWith(base) =>
          cacheDir + "/" + url.stripPrefix(base)
      }

      localPathOpt.getOrElse {
        // FIXME Means we were handed an artifact from repositories other than the known ones.
        // Report it through the error itself rather than printing to stdout from library code.
        val knownBases = cache
          .map { case (base, cacheDir) => s"$base -> $cacheDir" }
          .mkString(", ")
        throw new NotImplementedError(s"No cache directory for $url (known cache bases: $knownBases)")
      }
    }

  if (artifact.extra.contains("local"))
    artifact
  else
    artifact.copy(extra = artifact.extra + ("local" ->
      artifact.copy(
        url = local(artifact.url),
        // mapValues is a lazy view: force it so `local` is applied once, here
        checksumUrls = artifact.checksumUrls
          .mapValues(local)
          .toVector
          .toMap,
        extra = Map.empty
      )
    ))
}
2015-12-30 01:34:48 +01:00
/**
 * Copies `in` to `out` until end of stream, in chunks of at most `bufferSize` bytes.
 *
 * `out` is flushed after every chunk. The running byte count is reported to `logger` after each
 * chunk, starting from `alreadyDownloaded`, so that a resumed download reports absolute progress.
 * Neither stream is closed.
 */
private def readFullyTo(
  in: InputStream,
  out: OutputStream,
  logger: Option[Logger],
  url: String,
  alreadyDownloaded: Long
): Unit = {
  val chunk = Array.fill[Byte](bufferSize)(0)

  Iterator
    .continually(in.read(chunk))
    .takeWhile(_ >= 0)
    .foldLeft(alreadyDownloaded) { (downloadedSoFar, justRead) =>
      out.write(chunk, 0, justRead)
      out.flush()
      val total = downloadedSoFar + justRead
      logger.foreach(_.downloadProgress(url, total))
      total
    }

  ()
}
/**
 * Runs `f` while holding an exclusive lock on `<file>.lock`, a sibling of `file`.
 *
 * The lock is taken with `tryLock()`, i.e. without waiting.
 * - If it cannot be acquired, `f` is not run and `FileError.Locked(file)` is returned. This covers
 *   `tryLock` returning null as well as an `OverlappingFileLockException` (lock already held
 *   through another channel of this JVM).
 * - As the catch also wraps the call to `f`, an `OverlappingFileLockException` escaping from `f`
 *   is reported as `Locked` too.
 * - Once `f` has run (normally or by throwing), the lock is released, the stream closed, and the
 *   lock file deleted. Other exceptions from `f` then propagate.
 * - When the lock could not be acquired, the lock file is left in place (only the stream is closed).
 *
 * NOTE(review): releasing the lock and then deleting the lock file looks racy. A process that
 * opened the old lock file just before the delete can lock it, while another process creates and
 * locks a fresh file at the same path. Both would then run `f` concurrently. Confirm whether that
 * matters for callers.
 */
private def withLockFor[T](file: File)(f: => FileError \/ T): FileError \/ T = {
val lockFile = new File(file.getParentFile, s"${file.getName}.lock")
lockFile.getParentFile.mkdirs()
var out = new FileOutputStream(lockFile)
try {
var lock: FileLock = null
try {
lock = out.getChannel.tryLock()
if (lock == null)
-\/(FileError.Locked(file))
else
try f
finally {
lock.release()
lock = null // cleared so the outer finally below does not release it a second time
out.close()
out = null // cleared so the outermost finally does not close it a second time
lockFile.delete()
}
}
catch {
case e: OverlappingFileLockException =>
-\/(FileError.Locked(file))
}
finally if (lock != null) lock.release() // only non-null if the inner cleanup did not complete
} finally if (out != null) out.close() // e.g. lock not acquired, or inner cleanup failed early
}
/**
 * Runs the download `f` for `url`, making sure only one download of a given URL runs at a time
 * within this JVM.
 *
 * - If another download of `url` is already registered, returns `ConcurrentDownload` without
 *   running `f`.
 * - A `FileNotFoundException` with a message thrown by `f` becomes `NotFound`.
 * - Any other exception is turned into `DownloadError`.
 *
 * `logger` is told when the download starts and whether it succeeded.
 */
private def downloading[T](
  url: String,
  file: File,
  logger: Option[Logger]
)(
  f: => FileError \/ T
): FileError \/ T =
  try {
    val alreadyInProgress = urlLocks.putIfAbsent(url, new Object) != null

    if (alreadyInProgress)
      -\/(FileError.ConcurrentDownload(url))
    else {
      logger.foreach(_.downloadingArtifact(url, file))

      // Left: f threw a FileNotFoundException (already reported to the logger)
      // Right: whatever f returned
      val outcome: FileError \/ (FileError \/ T) =
        try \/-(f)
        catch {
          case nfe: FileNotFoundException if nfe.getMessage != null =>
            logger.foreach(_.downloadedArtifact(url, success = false))
            -\/(FileError.NotFound(nfe.getMessage))
          case e: Exception =>
            logger.foreach(_.downloadedArtifact(url, success = false))
            throw e
        } finally urlLocks.remove(url)

      outcome match {
        case \/-(result) =>
          logger.foreach(_.downloadedArtifact(url, success = result.isRight))
          result
        case -\/(notFound) =>
          -\/(notFound)
      }
    }
  } catch {
    case e: Exception =>
      -\/(FileError.DownloadError(s"Caught $e (${e.getMessage})"))
  }
/** Sibling `<name>.part` file that a download of `file` is written to before being moved into place. */
private def temporaryFile(file: File): File =
  new File(file.getParentFile, file.getName + ".part")
2015-12-30 01:34:49 +01:00
/** HTTP 206 Partial Content: the server honoured a `Range` request. */
private val partialContentResponseCode: Int = HttpURLConnection.HTTP_PARTIAL
2015-12-30 01:34:43 +01:00
/**
 * Makes `artifact`, and those of the requested `checksums` it declares, available in `cache`.
 *
 * Each remote URL is mapped to a local path via `withLocal`. Depending on `cachePolicy`, the local
 * copy is reused, refreshed when the remote one is newer, or (re-)downloaded.
 *
 * Downloads are written to a `.part` file first and then moved into place. A leftover `.part`
 * file is resumed with an HTTP `Range` request when the server acknowledges it.
 *
 * @return one entry per (local file, remote URL) pair, with the outcome of making that file
 *         available. The artifact itself comes first, then its checksums. The pairs are processed
 *         as separate tasks run through `Nondeterminism[Task].gather`.
 */
private def download(
  artifact: Artifact,
  cache: Seq[(String, File)],
  checksums: Set[String],
  cachePolicy: CachePolicy,
  pool: ExecutorService,
  logger: Option[Logger] = None
): Task[Seq[((File, String), FileError \/ Unit)]] = {

  implicit val pool0 = pool

  val artifact0 = withLocal(artifact, cache)
    .extra
    .getOrElse("local", artifact)

  // (local path, remote URL) pairs: the artifact itself first, then each requested checksum
  // known to both the remote artifact and its local counterpart
  val pairs =
    Seq(artifact0.url -> artifact.url) ++ {
      checksums
        .intersect(artifact0.checksumUrls.keySet)
        .intersect(artifact.checksumUrls.keySet)
        .toSeq
        .map(sumType => artifact0.checksumUrls(sumType) -> artifact.checksumUrls(sumType))
    }

  // HTTP 416 Range Not Satisfiable: the server refuses to resume from the requested offset,
  // e.g. because the .part file already holds the whole (or more than the current) content
  val rangeNotSatisfiableResponseCode = 416

  def urlConn(url: String) = {
    val conn = new URL(url).openConnection() // FIXME Should this be closed?
    // Dummy user-agent instead of the default "Java/...",
    // so that we are not returned incomplete/erroneous metadata
    // (Maven 2 compatibility? - happens for snapshot versioning metadata)
    conn.setRequestProperty("User-Agent", "")
    conn
  }

  def fileLastModified(file: File): EitherT[Task, FileError, Option[Long]] =
    EitherT {
      Task {
        \/- {
          val lastModified = file.lastModified()
          if (lastModified > 0L)
            Some(lastModified)
          else
            None
        } : FileError \/ Option[Long]
      }
    }

  def urlLastModified(url: String): EitherT[Task, FileError, Option[Long]] =
    EitherT {
      Task {
        urlConn(url) match {
          case c: HttpURLConnection =>
            c.setRequestMethod("HEAD")
            val remoteLastModified = c.getLastModified

            \/- {
              if (remoteLastModified > 0L)
                Some(remoteLastModified)
              else
                None
            }

          case other =>
            -\/(FileError.DownloadError(s"Cannot do HEAD request with connection $other ($url)"))
        }
      }
    }

  // download unless both dates are known and the local copy is at least as recent
  def shouldDownload(file: File, url: String): EitherT[Task, FileError, Boolean] =
    for {
      fileLastModOpt <- fileLastModified(file)
      urlLastModOpt <- urlLastModified(url)
    } yield {
      val fromDatesOpt = for {
        fileLastMod <- fileLastModOpt
        urlLastMod <- urlLastModOpt
      } yield fileLastMod < urlLastMod

      fromDatesOpt.getOrElse(true)
    }

  def remote(file: File, url: String): EitherT[Task, FileError, Unit] =
    EitherT {
      Task {
        withLockFor(file) {
          downloading(url, file, logger) {
            val tmp = temporaryFile(file)

            val alreadyDownloaded = tmp.length()

            val conn0 = urlConn(url)

            val (partialDownload, conn) = conn0 match {
              case conn0: HttpURLConnection if alreadyDownloaded > 0L =>
                conn0.setRequestProperty("Range", s"bytes=$alreadyDownloaded-")

                val responseCode = conn0.getResponseCode

                if (responseCode == partialContentResponseCode) {
                  val ackRange = Option(conn0.getHeaderField("Content-Range")).getOrElse("")

                  if (ackRange.startsWith(s"bytes $alreadyDownloaded-"))
                    (true, conn0)
                  else {
                    // unrecognized Content-Range header -> start a new connection with no resume
                    conn0.disconnect()
                    (false, urlConn(url))
                  }
                } else if (responseCode == rangeNotSatisfiableResponseCode) {
                  // Reading this response would fail, and the untouched .part file would make
                  // every later attempt fail the same way: download from scratch instead
                  conn0.disconnect()
                  (false, urlConn(url))
                } else
                  // e.g. 200: the server ignored the Range header and sends the whole content
                  (false, conn0)

              case _ => (false, conn0)
            }

            for (len0 <- Option(conn.getContentLengthLong) if len0 >= 0L) {
              val len = len0 + (if (partialDownload) alreadyDownloaded else 0L)

              logger.foreach(_.downloadLength(url, len))
            }

            val in = new BufferedInputStream(conn.getInputStream, bufferSize)

            val result =
              try {
                tmp.getParentFile.mkdirs()
                // append to the .part file only when resuming, truncate it otherwise
                val out = new FileOutputStream(tmp, partialDownload)
                try \/-(readFullyTo(in, out, logger, url, if (partialDownload) alreadyDownloaded else 0L))
                finally out.close()
              } finally in.close()

            file.getParentFile.mkdirs()
            NioFiles.move(tmp.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE)

            for (lastModified <- Option(conn.getLastModified) if lastModified > 0L)
              file.setLastModified(lastModified)

            result
          }
        }
      }
    }

  def checkFileExists(file: File, url: String): EitherT[Task, FileError, Unit] =
    EitherT {
      Task {
        if (file.exists()) {
          logger.foreach(_.foundLocally(url, file))
          \/-(())
        } else
          -\/(FileError.NotFound(file.toString))
      }
    }

  val tasks =
    for ((f, url) <- pairs) yield {
      val file = new File(f)

      val res =
        if (url.startsWith("file:/")) {
          // for debug purposes, flaky with URL-encoded chars anyway
          // def filtered(s: String) =
          //   s.stripPrefix("file:/").stripPrefix("//").stripSuffix("/")
          // assert(
          //   filtered(url) == filtered(file.toURI.toString),
          //   s"URL: ${filtered(url)}, file: ${filtered(file.toURI.toString)}"
          // )
          checkFileExists(file, url)
        } else
          cachePolicy match {
            case CachePolicy.LocalOnly =>
              checkFileExists(file, url)
            case CachePolicy.UpdateChanging | CachePolicy.Update =>
              shouldDownload(file, url).flatMap {
                case true =>
                  remote(file, url)
                case false =>
                  EitherT(Task.now(\/-(()) : FileError \/ Unit))
              }
            case CachePolicy.FetchMissing =>
              checkFileExists(file, url) orElse remote(file, url)
            case CachePolicy.ForceDownload =>
              remote(file, url)
          }

      res.run.map((file, url) -> _)
    }

  Nondeterminism[Task].gather(tasks)
}
/**
 * Checks the cached copy of `artifact` against its cached `sumType` checksum file.
 *
 * Both paths come from the local counterpart computed by `withLocal`. The artifact file is read
 * under a shared lock.
 *
 * @param sumType a `java.security.MessageDigest` algorithm name, e.g. `"SHA-1"`
 * @return
 *   - `ChecksumNotFound` if the artifact declares no `sumType` checksum;
 *   - `Locked` if the shared lock on the artifact file cannot be taken;
 *   - `WrongChecksum` on mismatch;
 *   - unit otherwise.
 */
def validateChecksum(
  artifact: Artifact,
  sumType: String,
  cache: Seq[(String, File)],
  pool: ExecutorService
): EitherT[Task, FileError, Unit] = {

  implicit val pool0 = pool

  val artifact0 = withLocal(artifact, cache)
    .extra
    .getOrElse("local", artifact)

  EitherT {
    artifact0.checksumUrls.get(sumType) match {
      case Some(sumFile) =>
        Task {
          // Checksum files hold the hex digest, possibly followed by whitespace and a file name
          // ("<digest>  foo.jar"): keep only the first token of the first line
          val sum = new String(NioFiles.readAllBytes(new File(sumFile).toPath), "UTF-8")
            .linesIterator
            .toStream
            .headOption
            .mkString
            .trim
            .takeWhile(c => !c.isWhitespace && !c.isSpaceChar)

          val f = new File(artifact0.url)
          val md = MessageDigest.getInstance(sumType)

          val is = new FileInputStream(f)
          val res = try {
            var lock: FileLock = null
            try {
              // shared (read) lock over the whole file
              lock = is.getChannel.tryLock(0L, Long.MaxValue, true)
              if (lock == null)
                -\/(FileError.Locked(f))
              else {
                withContent(is, md.update(_, 0, _))
                \/-(())
              }
            }
            catch {
              case e: OverlappingFileLockException =>
                -\/(FileError.Locked(f))
            }
            finally if (lock != null) lock.release()
          } finally is.close()

          res.flatMap { _ =>
            // Two hex digits per digest byte keeps leading zeros for any digest length
            // (a fixed %040x width only fits 20-byte SHA-1 digests)
            val calculatedSum = md.digest().map(b => "%02x".format(b & 0xff)).mkString

            // some checksum files use upper-case hex digits
            if (sum.equalsIgnoreCase(calculatedSum))
              \/-(())
            else
              -\/(FileError.WrongChecksum(sumType, calculatedSum, sum, artifact0.url, sumFile))
          }
        }

      case None =>
        Task.now(-\/(FileError.ChecksumNotFound(sumType, artifact0.url)))
    }
  }
}
/**
 * Makes `artifact` available in `cache` according to `cachePolicy`, and returns its local file.
 *
 * `checksums` is tried in order:
 * - `Some(sumType)` is picked if that checksum file could be obtained too, and the artifact is
 *   then validated against it with `validateChecksum`;
 * - `None` accepts the artifact without checksum validation.
 *
 * An empty `checksums` means no validation.
 */
def file(
  artifact: Artifact,
  cache: Seq[(String, File)] = default,
  cachePolicy: CachePolicy = CachePolicy.FetchMissing,
  checksums: Seq[Option[String]] = defaultChecksums,
  logger: Option[Logger] = None,
  pool: ExecutorService = defaultPool
): EitherT[Task, FileError, File] = {

  implicit val pool0 = pool

  val checksums0 = if (checksums.isEmpty) Seq(None) else checksums

  val res = EitherT {
    download(
      artifact,
      cache,
      checksums = checksums0.collect { case Some(c) => c }.toSet,
      cachePolicy,
      pool,
      logger = logger
    ).map { results =>
      // first usable entry: None always is, Some(c) only if the c checksum file was obtained
      val checksum = checksums0.find {
        case None => true
        case Some(c) =>
          artifact.checksumUrls.get(c).exists { cUrl =>
            results.exists { case ((_, u), b) =>
              u == cUrl && b.isRight
            }
          }
      }

      // the artifact itself is expected first, as it is the first of download's pairs
      val ((f, _), res) = results.head

      res.flatMap { _ =>
        checksum match {
          case None =>
            // only reachable when checksums0 holds no None, i.e. every entry is a Some
            // FIXME All the checksums should be in the error, possibly with their URLs
            // from artifact.checksumUrls
            val lastSumType = checksums0.collect { case Some(c) => c }.lastOption.getOrElse("")
            -\/(FileError.ChecksumNotFound(lastSumType, artifact.url))
          case Some(c) => \/-((f, c))
        }
      }
    }
  }

  res.flatMap {
    case (f, None) => EitherT(Task.now[FileError \/ File](\/-(f)))
    case (f, Some(c)) =>
      validateChecksum(artifact, c, cache, pool).map(_ => f)
  }
}
2015-11-29 20:21:45 +01:00
/**
 * Fetch function going through the cache: gets each artifact with `file`, then returns its
 * content decoded as UTF-8.
 *
 * Cache errors and errors reading the cached file are both reported as left messages rather
 * than as a failed `Task`.
 */
def fetch(
  cache: Seq[(String, File)] = default,
  cachePolicy: CachePolicy = CachePolicy.FetchMissing,
  checksums: Seq[Option[String]] = defaultChecksums,
  logger: Option[Logger] = None,
  pool: ExecutorService = defaultPool
): Fetch.Content[Task] = {
  artifact =>
    file(
      artifact,
      cache,
      cachePolicy,
      checksums = checksums,
      logger = logger,
      pool = pool
    ).leftMap(_.message).flatMap { f =>
      EitherT(Task.now {
        \/.fromTryCatchNonFatal(new String(NioFiles.readAllBytes(f.toPath), "UTF-8"))
          .leftMap(e => s"Error reading $f: ${e.getMessage}")
      })
    }
}
/** URI of `~/.ivy2/`, guaranteed to end with a slash so that sub-paths can be appended. */
private lazy val ivy2HomeUri: String = {
  // Let java.io.File build the URI: writing s"file://..." by hand is a bit touchy on Windows
  val ivyHome = new File(sys.props("user.home") + "/.ivy2/")
  val uri = ivyHome.toURI.toString
  if (!uri.endsWith("/")) uri + "/" else uri
}
2015-12-31 16:26:18 +01:00
/** Ivy repository rooted at `~/.ivy2/local/`, with info attributes dropped. */
lazy val ivy2Local = {
  val root = ivy2HomeUri + "local/"
  val layout =
    "[organisation]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/" +
      "[artifact](-[classifier]).[ext]"

  IvyRepository(root + layout, dropInfoAttributes = true)
}
/**
 * Ivy repository rooted at `~/.ivy2/cache/`.
 *
 * Artifacts and metadata have separate patterns. Checksums and signatures are disabled, and info
 * attributes are dropped.
 */
lazy val ivy2Cache = {
  val root = ivy2HomeUri + "cache/"
  // common to the artifact and metadata patterns
  val modulePrefix = "(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[organisation]/[module]/"

  IvyRepository(
    root + modulePrefix + "[type]s/[artifact]-[revision](-[classifier]).[ext]",
    metadataPatternOpt = Some(root + modulePrefix + "[type]-[revision](-[classifier]).[ext]"),
    withChecksums = false,
    withSignatures = false,
    dropInfoAttributes = true
  )
}
/** Root of the cache: `$COURSIER_CACHE` when set, else `~/.coursier/cache/v1`, made absolute. */
lazy val defaultBase: File = {
  val path = sys.env.get("COURSIER_CACHE") match {
    case Some(fromEnv) => fromEnv
    case None => sys.props("user.home") + "/.coursier/cache/v1"
  }
  new File(path).getAbsoluteFile
}
2016-01-03 16:38:29 +01:00
/** Default cache: `http://` and `https://` URLs each map to their own sub-directory of `defaultBase`. */
lazy val default: Seq[(String, File)] = Seq(
  ("http://", new File(defaultBase, "http")),
  ("https://", new File(defaultBase, "https"))
)
/** Number of threads in `defaultPool`. */
val defaultConcurrentDownloadCount: Int = 6
2015-06-25 01:18:57 +02:00
2015-12-30 01:34:41 +01:00
/**
 * Default download pool: a fixed pool of `defaultConcurrentDownloadCount` threads, created by
 * scalaz's `Strategy.DefaultDaemonThreadFactory`.
 */
lazy val defaultPool: ExecutorService =
  Executors.newFixedThreadPool(
    defaultConcurrentDownloadCount,
    Strategy.DefaultDaemonThreadFactory
  )
2015-12-30 01:34:32 +01:00
/** URLs currently being downloaded in this JVM; see `downloading`. */
private val urlLocks: ConcurrentHashMap[String, Object] = new ConcurrentHashMap[String, Object]()
/** Callbacks for cache events. Every method is a no-op by default; override the ones you need. */
trait Logger {
  /** `url` was found in the cache as `f`, with no download. */
  def foundLocally(url: String, f: File): Unit = ()

  /** A download of `url` into `file` is starting. */
  def downloadingArtifact(url: String, file: File): Unit = ()

  /** Total expected size of the download of `url`, in bytes, when known. */
  def downloadLength(url: String, length: Long): Unit = ()

  /** Number of bytes of `url` downloaded so far. */
  def downloadProgress(url: String, downloaded: Long): Unit = ()

  /** The download of `url` finished, successfully or not. */
  def downloadedArtifact(url: String, success: Boolean): Unit = ()
}
2015-06-25 01:18:57 +02:00
/** Size in bytes of the buffers used when downloading (1 MiB by default). */
var bufferSize: Int = 1024 * 1024
/** Reads `is` until end of stream and returns every byte read. `is` is left open. */
def readFullySync(is: InputStream): Array[Byte] = {
  val collected = new ByteArrayOutputStream()
  val chunk = new Array[Byte](16384)

  Iterator
    .continually(is.read(chunk, 0, chunk.length))
    .takeWhile(_ != -1)
    .foreach(count => collected.write(chunk, 0, count))

  collected.flush()
  collected.toByteArray
}
/**
 * Reads the stream produced by `is` fully, closes it, and decodes the bytes as UTF-8.
 *
 * `is` is by-name, so it is only evaluated when the returned task runs, and errors opening it
 * are captured too. Non-fatal failures are returned as a left error message.
 */
def readFully(is: => InputStream): Task[String \/ String] =
  Task {
    \/.fromTryCatchNonFatal {
      val is0 = is
      val b =
        try readFullySync(is0)
        finally is0.close()
      new String(b, "UTF-8")
    }.leftMap(e => Option(e.getMessage).getOrElse(e.toString)) // getMessage may be null
  }
/**
 * Feeds `is` to `f` chunk by chunk until end of stream.
 *
 * `f` receives a buffer and the number of valid bytes at its start. The same buffer is reused
 * between calls. `is` is left open.
 */
def withContent(is: InputStream, f: (Array[Byte], Int) => Unit): Unit = {
  val chunk = new Array[Byte](16384)

  @tailrec
  def loop(): Unit = {
    val count = is.read(chunk, 0, chunk.length)
    if (count != -1) {
      f(chunk, count)
      loop()
    }
  }

  loop()
}
}
2015-12-30 01:34:32 +01:00
/** Error returned while getting or validating a file through the cache. */
sealed trait FileError extends Product with Serializable {
  /** Human-readable description of the error. */
  def message: String
}

object FileError {

  /** A download failed; `message0` describes the cause. */
  case class DownloadError(message0: String) extends FileError {
    def message: String = s"Download error: $message0"
  }

  /** `file` (a path or URL) could not be found. */
  case class NotFound(file: String) extends FileError {
    def message: String = s"Not found: $file"
  }

  /** No `sumType` checksum is available for `file`. */
  case class ChecksumNotFound(sumType: String, file: String) extends FileError {
    def message: String = s"$sumType checksum not found: $file"
  }

  /** The `sumType` digest of `file` is `got`, whereas `sumFile` says `expected`. */
  case class WrongChecksum(sumType: String, got: String, expected: String, file: String, sumFile: String) extends FileError {
    def message: String =
      s"$sumType checksum validation failed: $file (expected $expected from $sumFile, got $got)"
  }

  /** Errors caused by concurrent access to the same file or URL. */
  sealed trait Recoverable extends FileError

  /** A lock needed to access `file` is held elsewhere. */
  case class Locked(file: File) extends Recoverable {
    def message: String = s"Locked: $file"
  }

  /** `url` is already being downloaded in this JVM. */
  case class ConcurrentDownload(url: String) extends Recoverable {
    def message: String = s"Concurrent download: $url"
  }
}