From e8af9e7aba76ec996d1ef7428061a377f9f363b6 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Wed, 9 Nov 2016 10:19:38 -0800 Subject: [PATCH] Parallelize scaladex lookups --- .../main/scala-2.11/coursier/cli/Helper.scala | 13 +-- .../coursier/cli/scaladex/Scaladex.scala | 82 ++++++++++--------- 2 files changed, 50 insertions(+), 45 deletions(-) diff --git a/cli/src/main/scala-2.11/coursier/cli/Helper.scala b/cli/src/main/scala-2.11/coursier/cli/Helper.scala index bd9896a62..3a143a229 100644 --- a/cli/src/main/scala-2.11/coursier/cli/Helper.scala +++ b/cli/src/main/scala-2.11/coursier/cli/Helper.scala @@ -15,8 +15,9 @@ import scala.annotation.tailrec import scala.concurrent.duration.Duration import scala.util.Try -import scalaz.{Failure, Success, \/-, -\/} +import scalaz.{Failure, Nondeterminism, Success, \/-, -\/} import scalaz.concurrent.{ Task, Strategy } +import scalaz.std.list._ object Helper { def fileRepr(f: File) = f.toString @@ -187,11 +188,11 @@ class Helper( val scaladex = Scaladex.cached(fetchs: _*) - val res = scaladexRawDependencies.map { s => + val res = Nondeterminism[Task].gather(scaladexRawDependencies.map { s => val deps = scaladex.dependencies( s, "2.11", - if (verbosityLevel >= 0) Console.err.println(_) else _ => () + if (verbosityLevel >= 2) Console.err.println(_) else _ => () ) deps.map { modVers => @@ -206,14 +207,14 @@ class Helper( } .maxBy(_._1) - if (verbosityLevel >= 0) + if (verbosityLevel >= 1) Console.err.println(s"Keeping version ${keptVer.repr}") modVers0 } else modVers - } - } + }.run + }).unsafePerformSync logger.foreach(_.stop()) diff --git a/cli/src/main/scala-2.11/coursier/cli/scaladex/Scaladex.scala b/cli/src/main/scala-2.11/coursier/cli/scaladex/Scaladex.scala index 33036cb05..bc2c75c22 100644 --- a/cli/src/main/scala-2.11/coursier/cli/scaladex/Scaladex.scala +++ b/cli/src/main/scala-2.11/coursier/cli/scaladex/Scaladex.scala @@ -2,15 +2,17 @@ package coursier.cli.scaladex import java.net.HttpURLConnection import java.nio.charset.StandardCharsets +import java.util.concurrent.ExecutorService import argonaut._, Argonaut._, ArgonautShapeless._ import coursier.core.{ Artifact, Attributes } import coursier.{ Fetch, Module } -import scalaz.{-\/, \/, \/-} +import scalaz.{ -\/, EitherT, Monad, Nondeterminism, \/, \/- } import scalaz.Scalaz.ToEitherOps import scalaz.Scalaz.ToEitherOpsFromEither import scalaz.concurrent.Task +import scalaz.std.list._ object Scaladex { @@ -32,50 +34,50 @@ object Scaladex { version: String ) - def apply(): Scaladex = - Scaladex { url => - var conn: HttpURLConnection = null + def apply(pool: ExecutorService): Scaladex[Task] = + Scaladex({ url => + EitherT(Task({ + var conn: HttpURLConnection = null - val b = try { - conn = new java.net.URL(url).openConnection().asInstanceOf[HttpURLConnection] - coursier.Platform.readFullySync(conn.getInputStream) - } finally { - if (conn != null) - conn.disconnect() - } + val b = try { + conn = new java.net.URL(url).openConnection().asInstanceOf[HttpURLConnection] + coursier.Platform.readFullySync(conn.getInputStream) + } finally { + if (conn != null) + conn.disconnect() + } - new String(b, StandardCharsets.UTF_8) - } + new String(b, StandardCharsets.UTF_8).right[String] + })(pool)) + }, Nondeterminism[Task]) - def cached(fetch: Fetch.Content[Task]*): Scaladex = - Scaladex { + def cached(fetch: Fetch.Content[Task]*): Scaladex[Task] = + Scaladex({ url => def get(fetch: Fetch.Content[Task]) = fetch( Artifact(url, Map(), Map(), Attributes("", ""), changing = true, None) ) - (get(fetch.head) /: fetch.tail)(_ orElse get(_)).run.unsafePerformSync match { - case -\/(err) => - throw new Exception(s"Fetching $url: $err") - case \/-(s) => s - } - } + (get(fetch.head) /: fetch.tail)(_ orElse get(_)) + }, Nondeterminism[Task]) } // TODO Add F[_] type param, change `fetch` type to `String => EitherT[F, String, String]`, adjust method signatures accordingly, ... -case class Scaladex(fetch: String => String) { +case class Scaladex[F[_]](fetch: String => EitherT[F, String, String], F: Nondeterminism[F]) { + + private implicit def F0 = F // quick & dirty API for querying scaladex - def search(name: String, target: String, scalaVersion: String): String \/ Seq[Scaladex.SearchResult] = { + def search(name: String, target: String, scalaVersion: String): EitherT[F, String, Seq[Scaladex.SearchResult]] = { val s = fetch( // FIXME Escaping s"https://index.scala-lang.org/api/scastie/search?q=$name&target=$target&scalaVersion=$scalaVersion" ) - s.decodeEither[List[Scaladex.SearchResult]].disjunction + s.flatMap(s => EitherT.fromDisjunction[F](s.decodeEither[List[Scaladex.SearchResult]].disjunction)) } /** @@ -85,14 +87,14 @@ case class Scaladex(fetch: String => String) { * @param artifactName: Scaladex artifact name * @return */ - def artifactInfos(organization: String, repository: String, artifactName: String): String \/ Scaladex.ArtifactInfos = { + def artifactInfos(organization: String, repository: String, artifactName: String): EitherT[F, String, Scaladex.ArtifactInfos] = { val s = fetch( // FIXME Escaping s"https://index.scala-lang.org/api/scastie/project?organization=$organization&repository=$repository&artifact=$artifactName" ) - s.decodeEither[Scaladex.ArtifactInfos].disjunction + s.flatMap(s => EitherT.fromDisjunction[F](s.decodeEither[Scaladex.ArtifactInfos].disjunction)) } /** @@ -101,7 +103,7 @@ case class Scaladex(fetch: String => String) { * @param repository: GitHub repository name * @return */ - def artifactNames(organization: String, repository: String): String \/ Seq[String] = { + def artifactNames(organization: String, repository: String): EitherT[F, String, Seq[String]] = { val s = fetch( // FIXME Escaping @@ -110,7 +112,7 @@ case class Scaladex(fetch: String => String) { case class Result(artifacts: List[String]) - s.decodeEither[Result].disjunction.map(_.artifacts) + s.flatMap(s => EitherT.fromDisjunction[F](s.decodeEither[Result].disjunction.map(_.artifacts))) } @@ -119,7 +121,7 @@ case class Scaladex(fetch: String => String) { * * Latest version only. */ - def dependencies(name: String, scalaVersion: String, logger: String => Unit): String \/ Seq[(Module, String)] = { + def dependencies(name: String, scalaVersion: String, logger: String => Unit): EitherT[F, String, Seq[(Module, String)]] = { val idx = name.indexOf('/') val orgNameOrError = @@ -127,22 +129,22 @@ case class Scaladex(fetch: String => String) { val org = name.take(idx) val repo = name.drop(idx + 1) - artifactNames(org, repo).map((org, repo, _)) + artifactNames(org, repo).map((org, repo, _)): EitherT[F, String, (String, String, Seq[String])] } else search(name, "JVM", scalaVersion) // FIXME Don't hardcode .flatMap { case Seq(first, _*) => logger(s"Using ${first.organization}/${first.repository} for $name") - (first.organization, first.repository, first.artifacts).right + EitherT.fromDisjunction[F]((first.organization, first.repository, first.artifacts).right): EitherT[F, String, (String, String, Seq[String])] case Seq() => - s"No project found for $name".left + EitherT.fromDisjunction[F](s"No project found for $name".left): EitherT[F, String, (String, String, Seq[String])] } orgNameOrError.flatMap { case (ghOrg, ghRepo, artifactNames) => - val moduleVersions = artifactNames.flatMap { artifactName => - artifactInfos(ghOrg, ghRepo, artifactName) match { + val moduleVersions = F.map(F.gather(artifactNames.map { artifactName => + F.map(artifactInfos(ghOrg, ghRepo, artifactName).run) { case -\/(err) => logger(s"Cannot get infos about artifact $artifactName from $ghOrg/$ghRepo: $err, ignoring it") Nil @@ -150,12 +152,14 @@ case class Scaladex(fetch: String => String) { logger(s"Found module ${infos.groupId}:${infos.artifactId}:${infos.version}") Seq(Module(infos.groupId, infos.artifactId) -> infos.version) } - } + }))(_.flatten) - if (moduleVersions.isEmpty) - s"No module found for $ghOrg/$ghRepo".left - else - moduleVersions.right + EitherT(F.map(moduleVersions) { l => + if (l.isEmpty) + s"No module found for $ghOrg/$ghRepo".left + else + l.right + }) } }