From f84e9ad9386cfd06a801a365bab1ea3918f9e520 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Wed, 30 Dec 2015 01:34:32 +0100 Subject: [PATCH] Changes in files --- .../main/scala/coursier/cli/TermDisplay.scala | 4 +- .../src/main/scala/coursier/Fetch.scala | 82 +++++ .../main/scala/coursier/core/Repository.scala | 50 +-- .../main/scala/coursier/core/Resolution.scala | 59 +++- .../coursier/core/ResolutionProcess.scala | 15 +- .../scala/coursier/ivy/IvyRepository.scala | 3 +- .../coursier/maven/MavenRepository.scala | 12 +- fetch-js/src/main/scala/coursier/Fetch.scala | 26 -- .../src/main/scala/coursier/Platform.scala | 9 +- .../scala/scalaz/concurrent/package.scala | 19 +- files/src/main/scala/coursier/Fetch.scala | 29 -- files/src/main/scala/coursier/Files.scala | 321 +++++++++++------- files/src/main/scala/coursier/Platform.scala | 7 +- .../scala/coursier/test/CentralTests.scala | 2 +- .../scala/coursier/test/ResolutionTests.scala | 2 +- .../scala/coursier/test/TestRepository.scala | 2 +- web/src/main/scala/coursier/web/Backend.scala | 6 +- 17 files changed, 379 insertions(+), 269 deletions(-) create mode 100644 core/shared/src/main/scala/coursier/Fetch.scala delete mode 100644 fetch-js/src/main/scala/coursier/Fetch.scala delete mode 100644 files/src/main/scala/coursier/Fetch.scala diff --git a/cli/src/main/scala/coursier/cli/TermDisplay.scala b/cli/src/main/scala/coursier/cli/TermDisplay.scala index 62f98adc3..d973da446 100644 --- a/cli/src/main/scala/coursier/cli/TermDisplay.scala +++ b/cli/src/main/scala/coursier/cli/TermDisplay.scala @@ -1,6 +1,6 @@ package coursier.cli -import java.io.Writer +import java.io.{File, Writer} import java.util.concurrent._ import ammonite.terminal.{ TTY, Ansi } @@ -109,7 +109,7 @@ class TermDisplay(out: Writer) extends Logger { q.put(Right(())) } - override def downloadingArtifact(url: String): Unit = { + override def downloadingArtifact(url: String, file: File): Unit = { assert(!infos.containsKey(url)) val prev = infos.putIfAbsent(url, Info(0L, None)) assert(prev == null) diff --git a/core/shared/src/main/scala/coursier/Fetch.scala b/core/shared/src/main/scala/coursier/Fetch.scala new file mode 100644 index 000000000..7bd8a15e9 --- /dev/null +++ b/core/shared/src/main/scala/coursier/Fetch.scala @@ -0,0 +1,82 @@ +package coursier + +import scalaz._ + +object Fetch { + + type Content[F[_]] = Artifact => EitherT[F, String, String] + + + type MD = Seq[( + (Module, String), + Seq[String] \/ (Artifact.Source, Project) + )] + + type Metadata[F[_]] = Seq[(Module, String)] => F[MD] + + /** + * Try to find `module` among `repositories`. + * + * Look at `repositories` from the left, one-by-one, and stop at first success. + * Else, return all errors, in the same order. + * + * The `version` field of the returned `Project` in case of success may not be + * equal to the provided one, in case the latter is not a specific + * version (e.g. version interval). Which version get chosen depends on + * the repository implementation. + */ + def find[F[_]]( + repositories: Seq[Repository], + module: Module, + version: String, + fetch: Content[F] + )(implicit + F: Monad[F] + ): EitherT[F, Seq[String], (Artifact.Source, Project)] = { + + val lookups = repositories + .map(repo => repo -> repo.find(module, version, fetch).run) + + val task = lookups.foldLeft[F[Seq[String] \/ (Artifact.Source, Project)]](F.point(-\/(Nil))) { + case (acc, (repo, eitherProjTask)) => + F.bind(acc) { + case -\/(errors) => + F.map(eitherProjTask)(_.flatMap{case (source, project) => + if (project.module == module) \/-((source, project)) + else -\/(s"Wrong module returned (expected: $module, got: ${project.module})") + }.leftMap(error => error +: errors)) + + case res @ \/-(_) => + F.point(res) + } + } + + EitherT(F.map(task)(_.leftMap(_.reverse))) + .map {case x @ (_, proj) => + assert(proj.module == module) + x + } + } + + def apply[F[_]]( + repositories: Seq[core.Repository], + fetch: Content[F], + extra: Content[F]* + )(implicit + F: Nondeterminism[F] + ): Metadata[F] = { + + modVers => + F.map( + F.gatherUnordered( + modVers.map { case (module, version) => + def get(fetch: Content[F]) = + find(repositories, module, version, fetch) + F.map((get(fetch) /: extra)(_ orElse get(_)) + .run)((module, version) -> _) + } + ) + )(_.toSeq) + } + +} \ No newline at end of file diff --git a/core/shared/src/main/scala/coursier/core/Repository.scala b/core/shared/src/main/scala/coursier/core/Repository.scala index 69bcfa457..8c853874c 100644 --- a/core/shared/src/main/scala/coursier/core/Repository.scala +++ b/core/shared/src/main/scala/coursier/core/Repository.scala @@ -1,5 +1,7 @@ package coursier.core +import coursier.Fetch + import scala.language.higherKinds import scalaz._ @@ -10,7 +12,7 @@ trait Repository { def find[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, (Artifact.Source, Project)] @@ -18,52 +20,6 @@ trait Repository { object Repository { - type Fetch[F[_]] = Artifact => EitherT[F, String, String] - - /** - * Try to find `module` among `repositories`. - * - * Look at `repositories` from the left, one-by-one, and stop at first success. - * Else, return all errors, in the same order. - * - * The `version` field of the returned `Project` in case of success may not be - * equal to the provided one, in case the latter is not a specific - * version (e.g. version interval). Which version get chosen depends on - * the repository implementation. - */ - def find[F[_]]( - repositories: Seq[Repository], - module: Module, - version: String, - fetch: Repository.Fetch[F] - )(implicit - F: Monad[F] - ): EitherT[F, Seq[String], (Artifact.Source, Project)] = { - - val lookups = repositories - .map(repo => repo -> repo.find(module, version, fetch).run) - - val task = lookups.foldLeft[F[Seq[String] \/ (Artifact.Source, Project)]](F.point(-\/(Nil))) { - case (acc, (repo, eitherProjTask)) => - F.bind(acc) { - case -\/(errors) => - F.map(eitherProjTask)(_.flatMap{case (source, project) => - if (project.module == module) \/-((source, project)) - else -\/(s"Wrong module returned (expected: $module, got: ${project.module})") - }.leftMap(error => error +: errors)) - - case res @ \/-(_) => - F.point(res) - } - } - - EitherT(F.map(task)(_.leftMap(_.reverse))) - .map {case x @ (_, proj) => - assert(proj.module == module) - x - } - } - implicit class ArtifactExtensions(val underlying: Artifact) extends AnyVal { def withDefaultChecksums: Artifact = underlying.copy(checksumUrls = underlying.checksumUrls ++ Seq( diff --git a/core/shared/src/main/scala/coursier/core/Resolution.scala b/core/shared/src/main/scala/coursier/core/Resolution.scala index 89306f1ff..f9eb25759 100644 --- a/core/shared/src/main/scala/coursier/core/Resolution.scala +++ b/core/shared/src/main/scala/coursier/core/Resolution.scala @@ -165,33 +165,35 @@ object Resolution { def merge( dependencies: TraversableOnce[Dependency], forceVersions: Map[Module, String] - ): (Seq[Dependency], Seq[Dependency]) = { + ): (Seq[Dependency], Seq[Dependency], Map[Module, String]) = { val mergedByModVer = dependencies .toList .groupBy(dep => dep.module) .map { case (module, deps) => module -> { - forceVersions.get(module) match { + val (versionOpt, updatedDeps) = forceVersions.get(module) match { case None => - if (deps.lengthCompare(1) == 0) \/-(deps) + if (deps.lengthCompare(1) == 0) (Some(deps.head.version), \/-(deps)) else { val versions = deps .map(_.version) .distinct val versionOpt = mergeVersions(versions) - versionOpt match { + (versionOpt, versionOpt match { case Some(version) => \/-(deps.map(dep => dep.copy(version = version))) case None => -\/(deps) - } + }) } case Some(forcedVersion) => - \/-(deps.map(dep => dep.copy(version = forcedVersion))) + (Some(forcedVersion), \/-(deps.map(dep => dep.copy(version = forcedVersion)))) } + + (updatedDeps, versionOpt) } } @@ -201,11 +203,13 @@ object Resolution { ( merged - .collect{case -\/(dep) => dep} + .collect { case (-\/(dep), _) => dep } .flatten, merged - .collect{case \/-(dep) => dep} - .flatten + .collect { case (\/-(dep), _) => dep } + .flatten, + mergedByModVer + .collect { case (mod, (_, Some(ver))) => mod -> ver } ) } @@ -449,10 +453,10 @@ case class Resolution( * May contain dependencies added in previous iterations, but no more * required. These are filtered below, see `newDependencies`. * - * Returns a tuple made of the conflicting dependencies, and all - * the dependencies. + * Returns a tuple made of the conflicting dependencies, all + * the dependencies, and the retained version of each module. */ - def nextDependenciesAndConflicts: (Seq[Dependency], Seq[Dependency]) = + def nextDependenciesAndConflicts: (Seq[Dependency], Seq[Dependency], Map[Module, String]) = // TODO Provide the modules whose version was forced by dependency overrides too merge( rootDependencies.map(withDefaultConfig) ++ dependencies ++ transitiveDependencies, @@ -478,7 +482,7 @@ case class Resolution( */ def isDone: Boolean = { def isFixPoint = { - val (nextConflicts, _) = nextDependenciesAndConflicts + val (nextConflicts, _, _) = nextDependenciesAndConflicts dependencies == (newDependencies ++ nextConflicts) && conflicts == nextConflicts.toSet @@ -497,7 +501,7 @@ case class Resolution( * The versions of all the dependencies returned are erased (emptied). */ def reverseDependencies: Map[Dependency, Vector[Dependency]] = { - val (updatedConflicts, updatedDeps) = nextDependenciesAndConflicts + val (updatedConflicts, updatedDeps, _) = nextDependenciesAndConflicts val trDepsSeq = for { @@ -566,7 +570,7 @@ case class Resolution( } private def nextNoMissingUnsafe: Resolution = { - val (newConflicts, _) = nextDependenciesAndConflicts + val (newConflicts, _, _) = nextDependenciesAndConflicts copy( dependencies = newDependencies ++ newConflicts, @@ -752,7 +756,7 @@ case class Resolution( .artifacts(dep, proj) } yield artifact - def artifactsByDep: Seq[(Dependency, Artifact)] = + def dependencyArtifacts: Seq[(Dependency, Artifact)] = for { dep <- minDependencies.toSeq (source, proj) <- projectCache @@ -769,4 +773,27 @@ case class Resolution( .get(dep.moduleVersion) .toSeq } yield (dep, err) + + def part(dependencies: Set[Dependency]): Resolution = { + val (_, _, finalVersions) = nextDependenciesAndConflicts + + @tailrec def helper(current: Set[Dependency]): Set[Dependency] = { + val newDeps = current ++ current + .flatMap(finalDependencies0) + .map(dep => dep.copy(version = finalVersions.getOrElse(dep.module, dep.version))) + + val anyNewDep = (newDeps -- current).nonEmpty + + if (anyNewDep) + helper(newDeps) + else + newDeps + } + + copy( + rootDependencies = dependencies, + dependencies = helper(dependencies) + // don't know if something should be done about conflicts + ) + } } diff --git a/core/shared/src/main/scala/coursier/core/ResolutionProcess.scala b/core/shared/src/main/scala/coursier/core/ResolutionProcess.scala index 5e19adf25..640c0e499 100644 --- a/core/shared/src/main/scala/coursier/core/ResolutionProcess.scala +++ b/core/shared/src/main/scala/coursier/core/ResolutionProcess.scala @@ -7,7 +7,7 @@ import scala.annotation.tailrec sealed trait ResolutionProcess { def run[F[_]]( - fetch: ResolutionProcess.Fetch[F], + fetch: Fetch.Metadata[F], maxIterations: Int = -1 )(implicit F: Monad[F] @@ -34,7 +34,7 @@ sealed trait ResolutionProcess { } def next[F[_]]( - fetch: ResolutionProcess.Fetch[F] + fetch: Fetch.Metadata[F] )(implicit F: Monad[F] ): F[ResolutionProcess] = { @@ -58,7 +58,7 @@ case class Missing( cont: Resolution => ResolutionProcess ) extends ResolutionProcess { - def next(results: ResolutionProcess.FetchResult): ResolutionProcess = { + def next(results: Fetch.MD): ResolutionProcess = { val errors = results .collect{case (modVer, -\/(errs)) => modVer -> errs } @@ -72,7 +72,7 @@ case class Missing( val depMgmtMissing = depMgmtMissing0 -- results.map(_._1) def cont0(res: Resolution) = { - val res0 = + val res0 = successes.foldLeft(res){case (acc, (modVer, (source, proj))) => acc.copy(projectCache = acc.projectCache + ( modVer -> (source, acc.withDependencyManagement(proj)) @@ -121,12 +121,5 @@ object ResolutionProcess { else Missing(resolution0.missingFromCache.toSeq, resolution0, apply) } - - type FetchResult = Seq[( - (Module, String), - Seq[String] \/ (Artifact.Source, Project) - )] - - type Fetch[F[_]] = Seq[(Module, String)] => F[FetchResult] } diff --git a/core/shared/src/main/scala/coursier/ivy/IvyRepository.scala b/core/shared/src/main/scala/coursier/ivy/IvyRepository.scala index c49054e60..ab0c814c9 100644 --- a/core/shared/src/main/scala/coursier/ivy/IvyRepository.scala +++ b/core/shared/src/main/scala/coursier/ivy/IvyRepository.scala @@ -1,5 +1,6 @@ package coursier.ivy +import coursier.Fetch import coursier.core._ import scala.annotation.tailrec import scala.util.matching.Regex @@ -180,7 +181,7 @@ case class IvyRepository(pattern: String) extends Repository { def find[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, (Artifact.Source, Project)] = { diff --git a/core/shared/src/main/scala/coursier/maven/MavenRepository.scala b/core/shared/src/main/scala/coursier/maven/MavenRepository.scala index f8f3c7981..6fc0b02f3 100644 --- a/core/shared/src/main/scala/coursier/maven/MavenRepository.scala +++ b/core/shared/src/main/scala/coursier/maven/MavenRepository.scala @@ -1,5 +1,6 @@ package coursier.maven +import coursier.Fetch import coursier.core._ import coursier.core.compatibility.encodeURIComponent @@ -39,6 +40,7 @@ object MavenRepository { val defaultConfigurations = Map( "compile" -> Seq.empty, "runtime" -> Seq("compile"), + "default" -> Seq("runtime"), "test" -> Seq("runtime") ) @@ -141,7 +143,7 @@ case class MavenRepository( def versions[F[_]]( module: Module, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, Versions] = @@ -163,7 +165,7 @@ case class MavenRepository( def snapshotVersioning[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, SnapshotVersioning] = { @@ -187,7 +189,7 @@ case class MavenRepository( def findNoInterval[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, Project] = @@ -226,7 +228,7 @@ case class MavenRepository( module: Module, version: String, versioningValue: Option[String], - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, Project] = { @@ -247,7 +249,7 @@ case class MavenRepository( def find[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ): EitherT[F, String, (Artifact.Source, Project)] = { diff --git a/fetch-js/src/main/scala/coursier/Fetch.scala b/fetch-js/src/main/scala/coursier/Fetch.scala deleted file mode 100644 index 3e7d220d5..000000000 --- a/fetch-js/src/main/scala/coursier/Fetch.scala +++ /dev/null @@ -1,26 +0,0 @@ -package coursier - -import scalaz.concurrent.Task - -object Fetch { - - implicit def default( - repositories: Seq[core.Repository] - ): ResolutionProcess.Fetch[Task] = - apply(repositories, Platform.artifact) - - def apply( - repositories: Seq[core.Repository], - fetch: Repository.Fetch[Task] - ): ResolutionProcess.Fetch[Task] = { - - modVers => Task.gatherUnordered( - modVers.map { case (module, version) => - Repository.find(repositories, module, version, fetch) - .run - .map((module, version) -> _) - } - ) - } - -} diff --git a/fetch-js/src/main/scala/coursier/Platform.scala b/fetch-js/src/main/scala/coursier/Platform.scala index 799c20fb5..70e386506 100644 --- a/fetch-js/src/main/scala/coursier/Platform.scala +++ b/fetch-js/src/main/scala/coursier/Platform.scala @@ -75,7 +75,7 @@ object Platform { p.future } - val artifact: Repository.Fetch[Task] = { artifact => + val artifact: Fetch.Content[Task] = { artifact => EitherT( Task { implicit ec => get(artifact.url) @@ -87,13 +87,18 @@ object Platform { ) } + implicit def fetch( + repositories: Seq[core.Repository] + ): Fetch.Metadata[Task] = + Fetch(repositories, Platform.artifact) + trait Logger { def fetching(url: String): Unit def fetched(url: String): Unit def other(url: String, msg: String): Unit } - def artifactWithLogger(logger: Logger): Repository.Fetch[Task] = { artifact => + def artifactWithLogger(logger: Logger): Fetch.Content[Task] = { artifact => EitherT( Task { implicit ec => Future(logger.fetching(artifact.url)) diff --git a/fetch-js/src/main/scala/scalaz/concurrent/package.scala b/fetch-js/src/main/scala/scalaz/concurrent/package.scala index 0a8a8591f..65cb276a6 100644 --- a/fetch-js/src/main/scala/scalaz/concurrent/package.scala +++ b/fetch-js/src/main/scala/scalaz/concurrent/package.scala @@ -2,7 +2,11 @@ package scalaz import scala.concurrent.{ ExecutionContext, Future } -/** Minimal Future-based Task */ +/** + * Minimal Future-based Task. + * + * Likely to be flawed, but does the job. + */ package object concurrent { trait Task[T] { self => @@ -32,10 +36,19 @@ package object concurrent { def runF(implicit ec: ExecutionContext) = Future.traverse(tasks)(_.runF) } - implicit val taskMonad: Monad[Task] = - new Monad[Task] { + implicit val taskMonad: Nondeterminism[Task] = + new Nondeterminism[Task] { def point[A](a: => A): Task[A] = Task.now(a) def bind[A,B](fa: Task[A])(f: A => Task[B]): Task[B] = fa.flatMap(f) + override def reduceUnordered[A, M](fs: Seq[Task[A]])(implicit R: Reducer[A, M]): Task[M] = + Task { implicit ec => + val f = Future.sequence(fs.map(_.runF)) + f.map { l => + (R.zero /: l)(R.snoc) + } + } + def chooseAny[A](head: Task[A], tail: Seq[Task[A]]): Task[(A, Seq[Task[A]])] = + ??? } } diff --git a/files/src/main/scala/coursier/Fetch.scala b/files/src/main/scala/coursier/Fetch.scala deleted file mode 100644 index 2c0ab0e81..000000000 --- a/files/src/main/scala/coursier/Fetch.scala +++ /dev/null @@ -1,29 +0,0 @@ -package coursier - -import scalaz.concurrent.Task - -object Fetch { - - implicit def default( - repositories: Seq[core.Repository] - ): ResolutionProcess.Fetch[Task] = - apply(repositories, Platform.artifact) - - def apply( - repositories: Seq[core.Repository], - fetch: Repository.Fetch[Task], - extra: Repository.Fetch[Task]* - ): ResolutionProcess.Fetch[Task] = { - - modVers => Task.gatherUnordered( - modVers.map { case (module, version) => - def get(fetch: Repository.Fetch[Task]) = - Repository.find(repositories, module, version, fetch) - (get(fetch) /: extra)(_ orElse get(_)) - .run - .map((module, version) -> _) - } - ) - } - -} diff --git a/files/src/main/scala/coursier/Files.scala b/files/src/main/scala/coursier/Files.scala index b7e6d7287..e798338d8 100644 --- a/files/src/main/scala/coursier/Files.scala +++ b/files/src/main/scala/coursier/Files.scala @@ -3,13 +3,13 @@ package coursier import java.net.URL import java.nio.channels.{ OverlappingFileLockException, FileLock } import java.security.MessageDigest -import java.util.concurrent.{ Executors, ExecutorService } +import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService} import scala.annotation.tailrec import scalaz._ import scalaz.concurrent.{ Task, Strategy } -import java.io._ +import java.io.{ Serializable => _, _ } case class Files( cache: Seq[(String, File)], @@ -17,6 +17,8 @@ case class Files( concurrentDownloadCount: Int = Files.defaultConcurrentDownloadCount ) { + import Files.urlLocks + lazy val defaultPool = Executors.newFixedThreadPool(concurrentDownloadCount, Strategy.DefaultDaemonThreadFactory) @@ -54,7 +56,7 @@ case class Files( def download( artifact: Artifact, - withChecksums: Boolean = true, + checksums: Set[String], logger: Option[Files.Logger] = None )(implicit cachePolicy: CachePolicy, @@ -64,90 +66,122 @@ case class Files( .extra .getOrElse("local", artifact) - val pairs = - Seq(artifact0.url -> artifact.url) ++ { - if (withChecksums) - (artifact0.checksumUrls.keySet intersect artifact.checksumUrls.keySet) - .toList - .map(sumType => artifact0.checksumUrls(sumType) -> artifact.checksumUrls(sumType)) - else - Nil + val checksumPairs = checksums + .intersect(artifact0.checksumUrls.keySet) + .intersect(artifact.checksumUrls.keySet) + .toSeq + .map(sumType => artifact0.checksumUrls(sumType) -> artifact.checksumUrls(sumType)) + + val pairs = (artifact0.url -> artifact.url) +: checksumPairs + + + def locally(file: File, url: String): EitherT[Task, FileError, File] = + EitherT { + Task { + if (file.exists()) { + logger.foreach(_.foundLocally(url, file)) + \/-(file) + } else + -\/(FileError.NotFound(file.toString): FileError) + } } + def downloadIfDifferent(file: File, url: String): EitherT[Task, FileError, Boolean] = { + ??? + } - def locally(file: File, url: String) = - Task { - if (file.exists()) { - logger.foreach(_.foundLocally(url, file)) - \/-(file) - } else - -\/(FileError.NotFound(file.toString): FileError) + def test = { + val t: Task[List[((File, String), FileError \/ Boolean)]] = Nondeterminism[Task].gather(checksumPairs.map { case (file, url) => + val f = new File(file) + downloadIfDifferent(f, url).run.map((f, url) -> _) + }) + + t.map { l => + val noChange = l.nonEmpty && l.forall { case (_, e) => e.exists(x => x) } + + val anyChange = l.exists { case (_, e) => e.exists(x => !x) } + val anyRecoverableError = l.exists { + case (_, -\/(err: FileError.Recoverable)) => true + case _ => false + } } - // FIXME Things can go wrong here and are not properly handled, + } + + // FIXME Things can go wrong here and are possibly not properly handled, // e.g. what if the connection gets closed during the transfer? // (partial file on disk?) - def remote(file: File, url: String) = - Task { - try { - logger.foreach(_.downloadingArtifact(url)) + def remote(file: File, url: String): EitherT[Task, FileError, File] = + EitherT { + Task { + try { + val o = new Object + val prev = urlLocks.putIfAbsent(url, o) + if (prev == null) { + logger.foreach(_.downloadingArtifact(url, file)) - val conn = new URL(url).openConnection() // FIXME Should this be closed? - // Dummy user-agent instead of the default "Java/...", - // so that we are not returned incomplete/erroneous metadata - // (Maven 2 compatibility? - happens for snapshot versioning metadata, - // this is SO FUCKING CRAZY) - conn.setRequestProperty("User-Agent", "") + val r = try { + val conn = new URL(url).openConnection() // FIXME Should this be closed? + // Dummy user-agent instead of the default "Java/...", + // so that we are not returned incomplete/erroneous metadata + // (Maven 2 compatibility? - happens for snapshot versioning metadata, + // this is SO FUCKING CRAZY) + conn.setRequestProperty("User-Agent", "") - for (len <- Option(conn.getContentLengthLong).filter(_ >= 0L)) - logger.foreach(_.downloadLength(url, len)) + for (len <- Option(conn.getContentLengthLong).filter(_ >= 0L)) + logger.foreach(_.downloadLength(url, len)) - val in = new BufferedInputStream(conn.getInputStream, Files.bufferSize) + val in = new BufferedInputStream(conn.getInputStream, Files.bufferSize) - val result = - try { - file.getParentFile.mkdirs() - val out = new FileOutputStream(file) - try { - var lock: FileLock = null - try { - lock = out.getChannel.tryLock() - if (lock == null) - -\/(FileError.Locked(file.toString)) - else { - val b = Array.fill[Byte](Files.bufferSize)(0) + val result = try { + file.getParentFile.mkdirs() + val out = new FileOutputStream(file) + try { + var lock: FileLock = null + try { + lock = out.getChannel.tryLock() + if (lock == null) + -\/(FileError.Locked(file)) + else { + val b = Array.fill[Byte](Files.bufferSize)(0) - @tailrec - def helper(count: Long): Unit = { - val read = in.read(b) - if (read >= 0) { - out.write(b, 0, read) - logger.foreach(_.downloadProgress(url, count + read)) - helper(count + read) + @tailrec + def helper(count: Long): Unit = { + val read = in.read(b) + if (read >= 0) { + out.write(b, 0, read) + out.flush() + logger.foreach(_.downloadProgress(url, count + read)) + helper(count + read) + } + } + + helper(0L) + \/-(file) } - } + } catch { case e: OverlappingFileLockException => + -\/(FileError.Locked(file)) + } finally if (lock != null) lock.release() + } finally out.close() + } finally in.close() - helper(0L) - \/-(file) - } - } - catch { - case e: OverlappingFileLockException => - -\/(FileError.Locked(file.toString)) - } - finally if (lock != null) lock.release() - } finally out.close() - } finally in.close() + for (lastModified <- Option(conn.getLastModified).filter(_ > 0L)) + file.setLastModified(lastModified) - for (lastModified <- Option(conn.getLastModified).filter(_ > 0L)) - file.setLastModified(lastModified) - - logger.foreach(_.downloadedArtifact(url, success = true)) - result - } - catch { case e: Exception => - logger.foreach(_.downloadedArtifact(url, success = false)) - -\/(FileError.DownloadError(e.getMessage)) + result + } catch { case e: Exception => + logger.foreach(_.downloadedArtifact(url, success = false)) + throw e + } finally { + urlLocks.remove(url) + } + logger.foreach(_.downloadedArtifact(url, success = true)) + r + } else + -\/(FileError.ConcurrentDownload(url)) + } catch { case e: Exception => + -\/(FileError.DownloadError(e.getMessage)) + } } } @@ -160,8 +194,8 @@ case class Files( assert(!f.startsWith("file:/"), s"Wrong file detection: $f, $url") cachePolicy[FileError \/ File]( _.isLeft )( - locally(file, url) )( - _ => remote(file, url) + locally(file, url).run )( + _ => remote(file, url).run ).map(e => (file, url) -> e.map(_ => ())) } else Task { @@ -182,75 +216,115 @@ case class Files( sumType: String )(implicit pool: ExecutorService = defaultPool - ): Task[FileError \/ Unit] = { + ): EitherT[Task, FileError, Unit] = { val artifact0 = withLocal(artifact) .extra .getOrElse("local", artifact) + EitherT { + artifact0.checksumUrls.get(sumType) match { + case Some(sumFile) => + Task { + val sum = scala.io.Source.fromFile(sumFile) + .getLines() + .toStream + .headOption + .mkString + .takeWhile(!_.isSpaceChar) - artifact0.checksumUrls.get(sumType) match { - case Some(sumFile) => - Task { - val sum = scala.io.Source.fromFile(sumFile) - .getLines() - .toStream - .headOption - .mkString - .takeWhile(!_.isSpaceChar) + val f = new File(artifact0.url) + val md = MessageDigest.getInstance(sumType) + val is = new FileInputStream(f) + val res = try { + var lock: FileLock = null + try { + lock = is.getChannel.tryLock(0L, Long.MaxValue, true) + if (lock == null) + -\/(FileError.Locked(f)) + else { + Files.withContent(is, md.update(_, 0, _)) + \/-(()) + } + } + catch { + case e: OverlappingFileLockException => + -\/(FileError.Locked(f)) + } + finally if (lock != null) lock.release() + } finally is.close() - val md = MessageDigest.getInstance(sumType) - val is = new FileInputStream(new File(artifact0.url)) - try Files.withContent(is, md.update(_, 0, _)) - finally is.close() + res.flatMap { _ => + val digest = md.digest() + val calculatedSum = f"${BigInt(1, digest)}%040x" - val digest = md.digest() - val calculatedSum = f"${BigInt(1, digest)}%040x" + if (sum == calculatedSum) + \/-(()) + else + -\/(FileError.WrongChecksum(sumType, calculatedSum, sum, artifact0.url, sumFile)) + } + } - if (sum == calculatedSum) - \/-(()) - else - -\/(FileError.WrongChecksum(sumType, calculatedSum, sum, artifact0.url, sumFile)) - } - - case None => - Task.now(-\/(FileError.ChecksumNotFound(sumType, artifact0.url))) + case None => + Task.now(-\/(FileError.ChecksumNotFound(sumType, artifact0.url))) + } } } def file( artifact: Artifact, - checksum: Option[String] = Some("SHA-1"), + checksums: Seq[Option[String]] = Seq(Some("SHA-1")), logger: Option[Files.Logger] = None )(implicit cachePolicy: CachePolicy, pool: ExecutorService = defaultPool - ): EitherT[Task, FileError, File] = - EitherT { - val res = download(artifact, withChecksums = checksum.nonEmpty, logger = logger).map { - results => - val ((f, _), res) = results.head - res.map(_ => f) - } + ): EitherT[Task, FileError, File] = { + val checksums0 = if (checksums.isEmpty) Seq(None) else checksums - checksum.fold(res) { sumType => - res.flatMap { - case err @ -\/(_) => Task.now(err) - case \/-(f) => - validateChecksum(artifact, sumType) - .map(_.map(_ => f)) + val res = EitherT { + download( + artifact, + checksums = checksums0.collect { case Some(c) => c }.toSet, + logger = logger + ).map { results => + val checksum = checksums0.find { + case None => true + case Some(c) => + artifact.checksumUrls.get(c).exists { cUrl => + results.exists { case ((_, u), b) => + u == cUrl && b.isRight + } + } + } + + val ((f, _), res) = results.head + res.flatMap { _ => + checksum match { + case None => + // FIXME All the checksums should be in the error, possibly with their URLs + // from artifact.checksumUrls + -\/(FileError.ChecksumNotFound(checksums0.last.get, "")) + case Some(c) => \/-((f, c)) + } } } } + res.flatMap { + case (f, None) => EitherT(Task.now[FileError \/ File](\/-(f))) + case (f, Some(c)) => + validateChecksum(artifact, c).map(_ => f) + } + } + def fetch( - checksum: Option[String] = Some("SHA-1"), + checksums: Seq[Option[String]] = Seq(Some("SHA-1")), logger: Option[Files.Logger] = None )(implicit cachePolicy: CachePolicy, pool: ExecutorService = defaultPool - ): Repository.Fetch[Task] = { + ): Fetch.Content[Task] = { artifact => - file(artifact, checksum = checksum, logger = logger)(cachePolicy).leftMap(_.message).map { f => + file(artifact, checksums = checksums, logger = logger)(cachePolicy).leftMap(_.message).map { f => // FIXME Catch error here? scala.io.Source.fromFile(f)("UTF-8").mkString } @@ -267,9 +341,11 @@ object Files { val defaultConcurrentDownloadCount = 6 + private val urlLocks = new ConcurrentHashMap[String, Object] + trait Logger { def foundLocally(url: String, f: File): Unit = {} - def downloadingArtifact(url: String): Unit = {} + def downloadingArtifact(url: String, file: File): Unit = {} def downloadLength(url: String, length: Long): Unit = {} def downloadProgress(url: String, downloaded: Long): Unit = {} def downloadedArtifact(url: String, success: Boolean): Unit = {} @@ -315,7 +391,7 @@ object Files { } -sealed trait FileError { +sealed trait FileError extends Product with Serializable { def message: String } @@ -327,9 +403,6 @@ object FileError { case class NotFound(file: String) extends FileError { def message = s"$file: not found" } - case class Locked(file: String) extends FileError { - def message = s"$file: locked" - } case class ChecksumNotFound(sumType: String, file: String) extends FileError { def message = s"$file: $sumType checksum not found" } @@ -337,4 +410,12 @@ object FileError { def message = s"$file: $sumType checksum validation failed" } + sealed trait Recoverable extends FileError + case class Locked(file: File) extends Recoverable { + def message = s"$file: locked" + } + case class ConcurrentDownload(url: String) extends Recoverable { + def message = s"$url: concurrent download" + } + } diff --git a/files/src/main/scala/coursier/Platform.scala b/files/src/main/scala/coursier/Platform.scala index c376fefe3..375372f6b 100644 --- a/files/src/main/scala/coursier/Platform.scala +++ b/files/src/main/scala/coursier/Platform.scala @@ -39,7 +39,7 @@ object Platform { } } - val artifact: Repository.Fetch[Task] = { artifact => + val artifact: Fetch.Content[Task] = { artifact => EitherT { val url = new URL(artifact.url) @@ -53,4 +53,9 @@ object Platform { } } + implicit def fetch( + repositories: Seq[core.Repository] + ): Fetch.Metadata[Task] = + Fetch(repositories, Platform.artifact) + } diff --git a/tests/shared/src/test/scala/coursier/test/CentralTests.scala b/tests/shared/src/test/scala/coursier/test/CentralTests.scala index 7ea36ea21..f8b4b85fc 100644 --- a/tests/shared/src/test/scala/coursier/test/CentralTests.scala +++ b/tests/shared/src/test/scala/coursier/test/CentralTests.scala @@ -4,7 +4,7 @@ package test import utest._ import scala.async.Async.{ async, await } -import coursier.Fetch.default +import coursier.Platform.fetch import coursier.test.compatibility._ object CentralTests extends TestSuite { diff --git a/tests/shared/src/test/scala/coursier/test/ResolutionTests.scala b/tests/shared/src/test/scala/coursier/test/ResolutionTests.scala index 627ed288b..123461fc1 100644 --- a/tests/shared/src/test/scala/coursier/test/ResolutionTests.scala +++ b/tests/shared/src/test/scala/coursier/test/ResolutionTests.scala @@ -17,7 +17,7 @@ object ResolutionTests extends TestSuite { ) = Resolution(deps, filter = filter, forceVersions = forceVersions) .process - .run(Fetch.default(repositories)) + .run(Platform.fetch(repositories)) .runF implicit class ProjectOps(val p: Project) extends AnyVal { diff --git a/tests/shared/src/test/scala/coursier/test/TestRepository.scala b/tests/shared/src/test/scala/coursier/test/TestRepository.scala index 85144d66c..fcbe0030f 100644 --- a/tests/shared/src/test/scala/coursier/test/TestRepository.scala +++ b/tests/shared/src/test/scala/coursier/test/TestRepository.scala @@ -13,7 +13,7 @@ class TestRepository(projects: Map[(Module, String), Project]) extends Repositor def find[F[_]]( module: Module, version: String, - fetch: Repository.Fetch[F] + fetch: Fetch.Content[F] )(implicit F: Monad[F] ) = diff --git a/web/src/main/scala/coursier/web/Backend.scala b/web/src/main/scala/coursier/web/Backend.scala index 8ae7d05a9..f68fbc07a 100644 --- a/web/src/main/scala/coursier/web/Backend.scala +++ b/web/src/main/scala/coursier/web/Backend.scala @@ -36,12 +36,12 @@ class Backend($: BackendScope[Unit, State]) { def fetch( repositories: Seq[core.Repository], - fetch: Repository.Fetch[Task] - ): ResolutionProcess.Fetch[Task] = { + fetch: Fetch.Content[Task] + ): Fetch.Metadata[Task] = { modVers => Task.gatherUnordered( modVers.map { case (module, version) => - Repository.find(repositories, module, version, fetch) + Fetch.find(repositories, module, version, fetch) .run .map((module, version) -> _) }