From 87e5f33754bb6e637d27d65c4b8ff5f43a149941 Mon Sep 17 00:00:00 2001 From: Alexandre Archambault Date: Mon, 12 Mar 2018 11:18:19 +0100 Subject: [PATCH] Add coursier.util.Task --- README.md | 33 ++++++++++--------- .../scala/coursier/util/PlatformTask.scala | 8 +++++ .../scala/coursier/util/PlatformTask.scala | 33 +++++++++++++++++++ .../src/main/scala/coursier/util/Task.scala | 30 +++++++++++++++++ .../main/scala/coursier/util/TaskGather.scala | 12 +++++++ doc/readme/README.md | 29 ++++++++-------- 6 files changed, 117 insertions(+), 28 deletions(-) create mode 100644 cache/js/src/main/scala/coursier/util/PlatformTask.scala create mode 100644 cache/jvm/src/main/scala/coursier/util/PlatformTask.scala create mode 100644 cache/shared/src/main/scala/coursier/util/Task.scala create mode 100644 cache/shared/src/main/scala/coursier/util/TaskGather.scala diff --git a/README.md b/README.md index dc7d9c882..c284041a6 100644 --- a/README.md +++ b/README.md @@ -160,8 +160,7 @@ val start = Resolution( Create a fetch function able to get things from a few repositories via a local cache, ```scala -import coursier.interop.scalaz._ -import scalaz.concurrent.Task +import coursier.util.Task val repositories = Seq( Cache.ivy2Local, @@ -173,7 +172,9 @@ val fetch = Fetch.from(repositories, Cache.fetch[Task]()) Then run the resolution per-se, ```scala -val resolution = start.process.run(fetch).unsafePerformSync +import scala.concurrent.ExecutionContext.Implicits.global + +val resolution = start.process.run(fetch).unsafeRun() ``` That will fetch and use metadata. @@ -186,11 +187,11 @@ These would mean that the resolution wasn't able to get metadata about some depe Then fetch and get local copies of the artifacts themselves (the JARs) with ```scala import java.io.File -import scalaz.concurrent.Task +import coursier.util.Gather -val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered( - resolution.artifacts.map(Cache.file(_).run) -).unsafePerformSync +val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather( + resolution.artifacts.map(Cache.file[Task](_).run) +).unsafeRun() ``` @@ -471,7 +472,7 @@ The resolution process will go on by giving successive `Resolution`s, until the `start` above is only the initial state - it is far from over, as the `isDone` method on it tells, ```scala scala> start.isDone -res5: Boolean = false +res6: Boolean = false ``` @@ -510,7 +511,7 @@ scala> MavenRepository( | "https://nexus.corp.com/content/repositories/releases", | authentication = Some(Authentication("user", "pass")) | ) -res7: coursier.maven.MavenRepository = MavenRepository(https://nexus.corp.com/content/repositories/releases,None,true,Some(Authentication(user, *******))) +res8: coursier.maven.MavenRepository = MavenRepository(https://nexus.corp.com/content/repositories/releases,None,true,Some(Authentication(user, *******))) ``` Now that we have repositories, we're going to mix these with things from the `coursier-cache` module, @@ -520,7 +521,7 @@ Given a sequence of dependencies, designated by their `Module` (organisation and and version (just a `String`), it gives either errors (`Seq[String]`) or metadata (`(Artifact.Source, Project)`), wrapping the whole in a monad `F`. ```scala -val fetch = Fetch.from(repositories, Cache.fetch()) +val fetch = Fetch.from(repositories, Cache.fetch[Task]()) ``` The monad used by `Fetch.from` is `scalaz.concurrent.Task`, but the resolution process is not tied to a particular @@ -546,7 +547,9 @@ resolution is particularly complex, in which case `maxIterations` could be incre Let's run the whole resolution, ```scala -val resolution = start.process.run(fetch).unsafePerformSync +import scala.concurrent.ExecutionContext.Implicits.global + +val resolution = start.process.run(fetch).unsafeRun() ``` To get additional feedback during the resolution, we can give the `Cache.default` method above @@ -570,11 +573,11 @@ which are dependencies whose versions could not be unified. Then, if all went well, we can fetch and get local copies of the artifacts themselves (the JARs) with ```scala import java.io.File -import scalaz.concurrent.Task +import coursier.util.Gather -val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered( - resolution.artifacts.map(Cache.file(_).run) -).unsafePerformSync +val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather( + resolution.artifacts.map(Cache.file[Task](_).run) +).unsafeRun() ``` We're using the `Cache.file` method, that can also be given a `Logger` (for more feedback) and a custom thread pool. diff --git a/cache/js/src/main/scala/coursier/util/PlatformTask.scala b/cache/js/src/main/scala/coursier/util/PlatformTask.scala new file mode 100644 index 000000000..171f9b72c --- /dev/null +++ b/cache/js/src/main/scala/coursier/util/PlatformTask.scala @@ -0,0 +1,8 @@ +package coursier.util + +abstract class PlatformTask { + + implicit val gather: Gather[Task] = + new TaskGather {} + +} diff --git a/cache/jvm/src/main/scala/coursier/util/PlatformTask.scala b/cache/jvm/src/main/scala/coursier/util/PlatformTask.scala new file mode 100644 index 000000000..37d47cacc --- /dev/null +++ b/cache/jvm/src/main/scala/coursier/util/PlatformTask.scala @@ -0,0 +1,33 @@ +package coursier.util + +import java.util.concurrent.ExecutorService + +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} +import scala.concurrent.duration.Duration + +abstract class PlatformTask { self => + + def schedule[A](pool: ExecutorService)(f: => A): Task[A] = { + + val ec0 = pool match { + case eces: ExecutionContextExecutorService => eces + case _ => ExecutionContext.fromExecutorService(pool) // FIXME Is this instantiation costly? Cache it? + } + + Task(_ => Future(f)(ec0)) + } + + implicit val schedulable: Schedulable[Task] = + new TaskGather with Schedulable[Task] { + def schedule[A](pool: ExecutorService)(f: => A) = self.schedule(pool)(f) + } + + def gather: Gather[Task] = + schedulable + + implicit class PlatformTaskOps[T](private val task: Task[T]) { + def unsafeRun()(implicit ec: ExecutionContext): T = + Await.result(task.future(), Duration.Inf) + } + +} diff --git a/cache/shared/src/main/scala/coursier/util/Task.scala b/cache/shared/src/main/scala/coursier/util/Task.scala new file mode 100644 index 000000000..298c80517 --- /dev/null +++ b/cache/shared/src/main/scala/coursier/util/Task.scala @@ -0,0 +1,30 @@ +package coursier.util + +import scala.concurrent.{ExecutionContext, Future} + +final case class Task[T](value: ExecutionContext => Future[T]) extends AnyVal { + + def map[U](f: T => U): Task[U] = + Task(implicit ec => value(ec).map(f)) + def flatMap[U](f: T => Task[U]): Task[U] = + Task(implicit ec => value(ec).flatMap(t => f(t).value(ec))) + + def handle[U >: T](f: PartialFunction[Throwable, U]): Task[U] = + Task(ec => value(ec).recover(f)(ec)) + + def future()(implicit ec: ExecutionContext): Future[T] = + value(ec) +} + +object Task extends PlatformTask { + + def point[A](a: A): Task[A] = { + val future = Future.successful(a) + Task(_ => future) + } + + def delay[A](a: => A): Task[A] = + Task(ec => Future(a)(ec)) + +} + diff --git a/cache/shared/src/main/scala/coursier/util/TaskGather.scala b/cache/shared/src/main/scala/coursier/util/TaskGather.scala new file mode 100644 index 000000000..ce674f26d --- /dev/null +++ b/cache/shared/src/main/scala/coursier/util/TaskGather.scala @@ -0,0 +1,12 @@ +package coursier.util + +import scala.concurrent.Future + +trait TaskGather extends Gather[Task] { + def point[A](a: A) = Task.point(a) + def bind[A, B](elem: Task[A])(f: A => Task[B]) = + elem.flatMap(f) + + def gather[A](elems: Seq[Task[A]]) = + Task(implicit ec => Future.sequence(elems.map(_.value(ec)))) +} diff --git a/doc/readme/README.md b/doc/readme/README.md index c0be438f7..5ba7299ac 100644 --- a/doc/readme/README.md +++ b/doc/readme/README.md @@ -187,8 +187,7 @@ val start = Resolution( Create a fetch function able to get things from a few repositories via a local cache, ```tut:silent -import coursier.interop.scalaz._ -import scalaz.concurrent.Task +import coursier.util.Task val repositories = Seq( Cache.ivy2Local, @@ -200,7 +199,9 @@ val fetch = Fetch.from(repositories, Cache.fetch[Task]()) Then run the resolution per-se, ```tut:silent -val resolution = start.process.run(fetch).unsafePerformSync +import scala.concurrent.ExecutionContext.Implicits.global + +val resolution = start.process.run(fetch).unsafeRun() ``` That will fetch and use metadata. @@ -213,11 +214,11 @@ These would mean that the resolution wasn't able to get metadata about some depe Then fetch and get local copies of the artifacts themselves (the JARs) with ```tut:silent import java.io.File -import scalaz.concurrent.Task +import coursier.util.Gather -val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered( - resolution.artifacts.map(Cache.file(_).run) -).unsafePerformSync +val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather( + resolution.artifacts.map(Cache.file[Task](_).run) +).unsafeRun() ``` @@ -547,7 +548,7 @@ Given a sequence of dependencies, designated by their `Module` (organisation and and version (just a `String`), it gives either errors (`Seq[String]`) or metadata (`(Artifact.Source, Project)`), wrapping the whole in a monad `F`. ```tut:silent -val fetch = Fetch.from(repositories, Cache.fetch()) +val fetch = Fetch.from(repositories, Cache.fetch[Task]()) ``` The monad used by `Fetch.from` is `scalaz.concurrent.Task`, but the resolution process is not tied to a particular @@ -573,7 +574,9 @@ resolution is particularly complex, in which case `maxIterations` could be incre Let's run the whole resolution, ```tut:silent -val resolution = start.process.run(fetch).unsafePerformSync +import scala.concurrent.ExecutionContext.Implicits.global + +val resolution = start.process.run(fetch).unsafeRun() ``` To get additional feedback during the resolution, we can give the `Cache.default` method above @@ -597,11 +600,11 @@ which are dependencies whose versions could not be unified. Then, if all went well, we can fetch and get local copies of the artifacts themselves (the JARs) with ```tut:silent import java.io.File -import scalaz.concurrent.Task +import coursier.util.Gather -val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered( - resolution.artifacts.map(Cache.file(_).run) -).unsafePerformSync +val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather( + resolution.artifacts.map(Cache.file[Task](_).run) +).unsafeRun() ``` We're using the `Cache.file` method, that can also be given a `Logger` (for more feedback) and a custom thread pool.