Add coursier.util.Task

This commit is contained in:
Alexandre Archambault 2018-03-12 11:18:19 +01:00
parent dc2e4996f4
commit 87e5f33754
6 changed files with 117 additions and 28 deletions

View File

@ -160,8 +160,7 @@ val start = Resolution(
Create a fetch function able to get things from a few repositories via a local cache,
```scala
import coursier.interop.scalaz._
import scalaz.concurrent.Task
import coursier.util.Task
val repositories = Seq(
Cache.ivy2Local,
@ -173,7 +172,9 @@ val fetch = Fetch.from(repositories, Cache.fetch[Task]())
Then run the resolution per-se,
```scala
val resolution = start.process.run(fetch).unsafePerformSync
import scala.concurrent.ExecutionContext.Implicits.global
val resolution = start.process.run(fetch).unsafeRun()
```
That will fetch and use metadata.
@ -186,11 +187,11 @@ These would mean that the resolution wasn't able to get metadata about some depe
Then fetch and get local copies of the artifacts themselves (the JARs) with
```scala
import java.io.File
import scalaz.concurrent.Task
import coursier.util.Gather
val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered(
resolution.artifacts.map(Cache.file(_).run)
).unsafePerformSync
val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather(
resolution.artifacts.map(Cache.file[Task](_).run)
).unsafeRun()
```
@ -471,7 +472,7 @@ The resolution process will go on by giving successive `Resolution`s, until the
`start` above is only the initial state - it is far from over, as the `isDone` method on it tells,
```scala
scala> start.isDone
res5: Boolean = false
res6: Boolean = false
```
@ -510,7 +511,7 @@ scala> MavenRepository(
| "https://nexus.corp.com/content/repositories/releases",
| authentication = Some(Authentication("user", "pass"))
| )
res7: coursier.maven.MavenRepository = MavenRepository(https://nexus.corp.com/content/repositories/releases,None,true,Some(Authentication(user, *******)))
res8: coursier.maven.MavenRepository = MavenRepository(https://nexus.corp.com/content/repositories/releases,None,true,Some(Authentication(user, *******)))
```
Now that we have repositories, we're going to mix these with things from the `coursier-cache` module,
@ -520,7 +521,7 @@ Given a sequence of dependencies, designated by their `Module` (organisation and
and version (just a `String`), it gives either errors (`Seq[String]`) or metadata (`(Artifact.Source, Project)`),
wrapping the whole in a monad `F`.
```scala
val fetch = Fetch.from(repositories, Cache.fetch())
val fetch = Fetch.from(repositories, Cache.fetch[Task]())
```
The monad used by `Fetch.from` is `scalaz.concurrent.Task`, but the resolution process is not tied to a particular
@ -546,7 +547,9 @@ resolution is particularly complex, in which case `maxIterations` could be incre
Let's run the whole resolution,
```scala
val resolution = start.process.run(fetch).unsafePerformSync
import scala.concurrent.ExecutionContext.Implicits.global
val resolution = start.process.run(fetch).unsafeRun()
```
To get additional feedback during the resolution, we can give the `Cache.default` method above
@ -570,11 +573,11 @@ which are dependencies whose versions could not be unified.
Then, if all went well, we can fetch and get local copies of the artifacts themselves (the JARs) with
```scala
import java.io.File
import scalaz.concurrent.Task
import coursier.util.Gather
val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered(
resolution.artifacts.map(Cache.file(_).run)
).unsafePerformSync
val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather(
resolution.artifacts.map(Cache.file[Task](_).run)
).unsafeRun()
```
We're using the `Cache.file` method, that can also be given a `Logger` (for more feedback) and a custom thread pool.

View File

@ -0,0 +1,8 @@
package coursier.util
abstract class PlatformTask {
implicit val gather: Gather[Task] =
new TaskGather {}
}

View File

@ -0,0 +1,33 @@
package coursier.util
import java.util.concurrent.ExecutorService
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration
abstract class PlatformTask { self =>
def schedule[A](pool: ExecutorService)(f: => A): Task[A] = {
val ec0 = pool match {
case eces: ExecutionContextExecutorService => eces
case _ => ExecutionContext.fromExecutorService(pool) // FIXME Is this instantiation costly? Cache it?
}
Task(_ => Future(f)(ec0))
}
implicit val schedulable: Schedulable[Task] =
new TaskGather with Schedulable[Task] {
def schedule[A](pool: ExecutorService)(f: => A) = self.schedule(pool)(f)
}
def gather: Gather[Task] =
schedulable
implicit class PlatformTaskOps[T](private val task: Task[T]) {
def unsafeRun()(implicit ec: ExecutionContext): T =
Await.result(task.future(), Duration.Inf)
}
}

View File

@ -0,0 +1,30 @@
package coursier.util
import scala.concurrent.{ExecutionContext, Future}
final case class Task[T](value: ExecutionContext => Future[T]) extends AnyVal {
def map[U](f: T => U): Task[U] =
Task(implicit ec => value(ec).map(f))
def flatMap[U](f: T => Task[U]): Task[U] =
Task(implicit ec => value(ec).flatMap(t => f(t).value(ec)))
def handle[U >: T](f: PartialFunction[Throwable, U]): Task[U] =
Task(ec => value(ec).recover(f)(ec))
def future()(implicit ec: ExecutionContext): Future[T] =
value(ec)
}
object Task extends PlatformTask {
def point[A](a: A): Task[A] = {
val future = Future.successful(a)
Task(_ => future)
}
def delay[A](a: => A): Task[A] =
Task(ec => Future(a)(ec))
}

View File

@ -0,0 +1,12 @@
package coursier.util
import scala.concurrent.Future
trait TaskGather extends Gather[Task] {
def point[A](a: A) = Task.point(a)
def bind[A, B](elem: Task[A])(f: A => Task[B]) =
elem.flatMap(f)
def gather[A](elems: Seq[Task[A]]) =
Task(implicit ec => Future.sequence(elems.map(_.value(ec))))
}

View File

@ -187,8 +187,7 @@ val start = Resolution(
Create a fetch function able to get things from a few repositories via a local cache,
```tut:silent
import coursier.interop.scalaz._
import scalaz.concurrent.Task
import coursier.util.Task
val repositories = Seq(
Cache.ivy2Local,
@ -200,7 +199,9 @@ val fetch = Fetch.from(repositories, Cache.fetch[Task]())
Then run the resolution per-se,
```tut:silent
val resolution = start.process.run(fetch).unsafePerformSync
import scala.concurrent.ExecutionContext.Implicits.global
val resolution = start.process.run(fetch).unsafeRun()
```
That will fetch and use metadata.
@ -213,11 +214,11 @@ These would mean that the resolution wasn't able to get metadata about some depe
Then fetch and get local copies of the artifacts themselves (the JARs) with
```tut:silent
import java.io.File
import scalaz.concurrent.Task
import coursier.util.Gather
val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered(
resolution.artifacts.map(Cache.file(_).run)
).unsafePerformSync
val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather(
resolution.artifacts.map(Cache.file[Task](_).run)
).unsafeRun()
```
@ -547,7 +548,7 @@ Given a sequence of dependencies, designated by their `Module` (organisation and
and version (just a `String`), it gives either errors (`Seq[String]`) or metadata (`(Artifact.Source, Project)`),
wrapping the whole in a monad `F`.
```tut:silent
val fetch = Fetch.from(repositories, Cache.fetch())
val fetch = Fetch.from(repositories, Cache.fetch[Task]())
```
The monad used by `Fetch.from` is `scalaz.concurrent.Task`, but the resolution process is not tied to a particular
@ -573,7 +574,9 @@ resolution is particularly complex, in which case `maxIterations` could be incre
Let's run the whole resolution,
```tut:silent
val resolution = start.process.run(fetch).unsafePerformSync
import scala.concurrent.ExecutionContext.Implicits.global
val resolution = start.process.run(fetch).unsafeRun()
```
To get additional feedback during the resolution, we can give the `Cache.default` method above
@ -597,11 +600,11 @@ which are dependencies whose versions could not be unified.
Then, if all went well, we can fetch and get local copies of the artifacts themselves (the JARs) with
```tut:silent
import java.io.File
import scalaz.concurrent.Task
import coursier.util.Gather
val localArtifacts: Seq[Either[FileError, File]] = Task.gatherUnordered(
resolution.artifacts.map(Cache.file(_).run)
).unsafePerformSync
val localArtifacts: Seq[Either[FileError, File]] = Gather[Task].gather(
resolution.artifacts.map(Cache.file[Task](_).run)
).unsafeRun()
```
We're using the `Cache.file` method, that can also be given a `Logger` (for more feedback) and a custom thread pool.