Add coursier.util.Schedulable

This commit is contained in:
Alexandre Archambault 2018-02-27 22:54:49 +01:00
parent 838a340b89
commit ca62830d23
2 changed files with 102 additions and 72 deletions

View File

@ -13,12 +13,10 @@ import coursier.internal.FileUtil
import coursier.util.Base64.Encoder
import scala.annotation.tailrec
import scalaz.Nondeterminism
import scalaz.concurrent.{Strategy, Task}
import java.io.{Serializable => _, _}
import java.nio.charset.Charset
import coursier.util.EitherT
import coursier.util.{EitherT, Schedulable}
import scala.concurrent.duration.{Duration, DurationInt}
import scala.util.Try
@ -359,17 +357,15 @@ object Cache {
}
}
private def download(
private def download[F[_]](
artifact: Artifact,
cache: File,
checksums: Set[String],
cachePolicy: CachePolicy,
pool: ExecutorService,
logger: Option[Logger] = None,
ttl: Option[Duration] = defaultTtl
): Task[Seq[((File, String), Either[FileError, Unit])]] = {
implicit val pool0 = pool
logger: Option[Logger],
ttl: Option[Duration]
)(implicit S: Schedulable[F]): F[Seq[((File, String), Either[FileError, Unit])]] = {
// Reference file - if it exists, and we get not found errors on some URLs, we assume
// we can keep track of these missing, and not try to get them again later.
@ -380,9 +376,9 @@ object Cache {
def referenceFileExists: Boolean = referenceFileOpt.exists(_.exists())
def fileLastModified(file: File): EitherT[Task, FileError, Option[Long]] =
def fileLastModified(file: File): EitherT[F, FileError, Option[Long]] =
EitherT {
Task {
S.schedule(pool) {
Right {
val lastModified = file.lastModified()
if (lastModified > 0L)
@ -397,9 +393,9 @@ object Cache {
url: String,
currentLastModifiedOpt: Option[Long], // for the logger
logger: Option[Logger]
): EitherT[Task, FileError, Option[Long]] =
): EitherT[F, FileError, Option[Long]] =
EitherT {
Task {
S.schedule(pool) {
var conn: URLConnection = null
try {
@ -441,19 +437,19 @@ object Cache {
}
}
def fileExists(file: File): Task[Boolean] =
Task {
def fileExists(file: File): F[Boolean] =
S.schedule(pool) {
file.exists()
}
def ttlFile(file: File): File =
new File(file.getParent, s".${file.getName}.checked")
def lastCheck(file: File): Task[Option[Long]] = {
def lastCheck(file: File): F[Option[Long]] = {
val ttlFile0 = ttlFile(file)
Task {
S.schedule(pool) {
if (ttlFile0.exists())
Some(ttlFile0.lastModified()).filter(_ > 0L)
else
@ -474,17 +470,17 @@ object Cache {
}
}
def shouldDownload(file: File, url: String): EitherT[Task, FileError, Boolean] = {
def shouldDownload(file: File, url: String): EitherT[F, FileError, Boolean] = {
def checkNeeded = ttl.fold(Task.now(true)) { ttl =>
def checkNeeded = ttl.fold(S.point(true)) { ttl =>
if (ttl.isFinite())
lastCheck(file).flatMap {
case None => Task.now(true)
S.bind(lastCheck(file)) {
case None => S.point(true)
case Some(ts) =>
Task(System.currentTimeMillis()).map(_ > ts + ttl.toMillis)
S.map(S.schedule(pool)(System.currentTimeMillis()))(_ > ts + ttl.toMillis)
}
else
Task.now(false)
S.point(false)
}
def check = for {
@ -500,22 +496,22 @@ object Cache {
}
EitherT {
fileExists(file).flatMap {
S.bind(fileExists(file)) {
case false =>
Task.now(Right(true))
S.point(Right(true))
case true =>
checkNeeded.flatMap {
S.bind(checkNeeded) {
case false =>
Task.now(Right(false))
S.point(Right(false))
case true =>
check.run.flatMap {
S.bind(check.run) {
case Right(false) =>
Task {
S.schedule(pool) {
doTouchCheckFile(file)
Right(false)
}
case other =>
Task.now(other)
S.point(other)
}
}
}
@ -543,9 +539,9 @@ object Cache {
def remote(
file: File,
url: String
): EitherT[Task, FileError, Unit] =
): EitherT[F, FileError, Unit] =
EitherT {
Task {
S.schedule(pool) {
val tmp = CachePath.temporaryFile(file)
@ -677,20 +673,20 @@ object Cache {
def errFile(file: File) = new File(file.getParentFile, "." + file.getName + ".error")
def remoteKeepErrors(file: File, url: String): EitherT[Task, FileError, Unit] = {
def remoteKeepErrors(file: File, url: String): EitherT[F, FileError, Unit] = {
val errFile0 = errFile(file)
def validErrFileExists =
EitherT {
Task[Either[FileError, Boolean]] {
S.schedule[Either[FileError, Boolean]](pool) {
Right(referenceFileExists && errFile0.exists())
}
}
def createErrFile =
EitherT {
Task[Either[FileError, Unit]] {
S.schedule[Either[FileError, Unit]](pool) {
if (referenceFileExists) {
if (!errFile0.exists())
FileUtil.write(errFile0, "".getBytes(UTF_8))
@ -702,7 +698,7 @@ object Cache {
def deleteErrFile =
EitherT {
Task[Either[FileError, Unit]] {
S.schedule[Either[FileError, Unit]](pool) {
if (errFile0.exists())
errFile0.delete()
@ -712,11 +708,11 @@ object Cache {
def retainError =
EitherT {
remote(file, url).run.flatMap {
S.bind(remote(file, url).run) {
case err @ Left(FileError.NotFound(_, Some(true))) =>
createErrFile.run.map(_ => err)
S.map(createErrFile.run)(_ => err: Either[FileError, Unit])
case other =>
deleteErrFile.run.map(_ => other)
S.map(deleteErrFile.run)(_ => other)
}
}
@ -724,7 +720,7 @@ object Cache {
case CachePolicy.FetchMissing | CachePolicy.LocalOnly | CachePolicy.LocalUpdate | CachePolicy.LocalUpdateChanging =>
validErrFileExists.flatMap { exists =>
if (exists)
EitherT(Task.now[Either[FileError, Unit]](Left(FileError.NotFound(url, Some(true)))))
EitherT(S.point[Either[FileError, Unit]](Left(FileError.NotFound(url, Some(true)))))
else
retainError
}
@ -734,7 +730,7 @@ object Cache {
}
}
def localInfo(file: File, url: String): EitherT[Task, FileError, Boolean] = {
def localInfo(file: File, url: String): EitherT[F, FileError, Boolean] = {
val errFile0 = errFile(file)
@ -748,12 +744,12 @@ object Cache {
else
Right(false)
EitherT(Task(res))
EitherT(S.schedule(pool)(res))
}
def checkFileExists(file: File, url: String, log: Boolean = true): EitherT[Task, FileError, Unit] =
def checkFileExists(file: File, url: String, log: Boolean = true): EitherT[F, FileError, Unit] =
EitherT {
Task {
S.schedule(pool) {
if (file.exists()) {
logger.foreach(_.foundLocally(url, file))
Right(())
@ -780,19 +776,19 @@ object Cache {
val requiredArtifactCheck = artifact.extra.get("required") match {
case None =>
EitherT(Task.now[Either[FileError, Unit]](Right(())))
EitherT(S.point[Either[FileError, Unit]](Right(())))
case Some(required) =>
cachePolicy0 match {
case CachePolicy.LocalOnly | CachePolicy.LocalUpdateChanging | CachePolicy.LocalUpdate =>
val file = localFile(required.url, cache, artifact.authentication.map(_.user))
localInfo(file, required.url).flatMap {
case true =>
EitherT(Task.now[Either[FileError, Unit]](Right(())))
EitherT(S.point[Either[FileError, Unit]](Right(())))
case false =>
EitherT(Task.now[Either[FileError, Unit]](Left(FileError.NotFound(file.toString))))
EitherT(S.point[Either[FileError, Unit]](Left(FileError.NotFound(file.toString))))
}
case _ =>
EitherT(Task.now[Either[FileError, Unit]](Right(())))
EitherT(S.point[Either[FileError, Unit]](Right(())))
}
}
@ -815,7 +811,7 @@ object Cache {
case true =>
remoteKeepErrors(file, url)
case false =>
EitherT(Task.now[Either[FileError, Unit]](Right(())))
EitherT(S.point[Either[FileError, Unit]](Right(())))
}
cachePolicy0 match {
@ -834,13 +830,10 @@ object Cache {
}
}
requiredArtifactCheck
.flatMap(_ => res)
.run
.map((file, url) -> _)
S.map(requiredArtifactCheck.flatMap(_ => res).run)((file, url) -> _)
}
Nondeterminism[Task].gather(tasks)
S.gather(tasks)
}
def parseChecksum(content: String): Option[BigInteger] = {
@ -883,14 +876,12 @@ object Cache {
.mkString))
}
def validateChecksum(
def validateChecksum[F[_]](
artifact: Artifact,
sumType: String,
cache: File,
pool: ExecutorService
): EitherT[Task, FileError, Unit] = {
implicit val pool0 = pool
)(implicit S: Schedulable[F]): EitherT[F, FileError, Unit] = {
val localFile0 = localFile(artifact.url, cache, artifact.authentication.map(_.user))
@ -899,7 +890,7 @@ object Cache {
case Some(sumUrl) =>
val sumFile = localFile(sumUrl, cache, artifact.authentication.map(_.user))
Task {
S.schedule(pool) {
val sumOpt = parseRawChecksum(FileUtil.readAllBytes(sumFile))
sumOpt match {
@ -930,12 +921,12 @@ object Cache {
}
case None =>
Task.now(Left(FileError.ChecksumNotFound(sumType, localFile0.getPath)))
S.point[Either[FileError, Unit]](Left(FileError.ChecksumNotFound(sumType, localFile0.getPath)))
}
}
}
def file(
def file[F[_]](
artifact: Artifact,
cache: File = default,
cachePolicy: CachePolicy = CachePolicy.UpdateChanging,
@ -943,14 +934,12 @@ object Cache {
logger: Option[Logger] = None,
pool: ExecutorService = defaultPool,
ttl: Option[Duration] = defaultTtl
): EitherT[Task, FileError, File] = {
implicit val pool0 = pool
)(implicit S: Schedulable[F]): EitherT[F, FileError, File] = {
val checksums0 = if (checksums.isEmpty) Seq(None) else checksums
val res = EitherT {
download(
S.map(download(
artifact,
cache,
checksums = checksums0.collect { case Some(c) => c }.toSet,
@ -958,7 +947,7 @@ object Cache {
pool,
logger = logger,
ttl = ttl
).map { results =>
)) { results =>
val checksum = checksums0.find {
case None => true
case Some(c) =>
@ -983,20 +972,20 @@ object Cache {
}
res.flatMap {
case (f, None) => EitherT(Task.now[Either[FileError, File]](Right(f)))
case (f, None) => EitherT(S.point[Either[FileError, File]](Right(f)))
case (f, Some(c)) =>
validateChecksum(artifact, c, cache, pool).map(_ => f)
}
}
def fetch(
def fetch[F[_]](
cache: File = default,
cachePolicy: CachePolicy = CachePolicy.UpdateChanging,
checksums: Seq[Option[String]] = defaultChecksums,
logger: Option[Logger] = None,
pool: ExecutorService = defaultPool,
ttl: Option[Duration] = defaultTtl
): Fetch.Content[Task] = {
)(implicit S: Schedulable[F]): Fetch.Content[F] = {
artifact =>
file(
artifact,
@ -1060,7 +1049,7 @@ object Cache {
} else
notFound(f)
EitherT(Task.now[Either[String, String]](res))
EitherT(S.point[Either[String, String]](res))
}
}
@ -1102,8 +1091,7 @@ object Cache {
val defaultConcurrentDownloadCount = 6
lazy val defaultPool =
Executors.newFixedThreadPool(defaultConcurrentDownloadCount, Strategy.DefaultDaemonThreadFactory)
lazy val defaultPool = Schedulable.fixedThreadPool(defaultConcurrentDownloadCount)
lazy val defaultTtl: Option[Duration] = {
def fromString(s: String) =

View File

@ -0,0 +1,42 @@
package coursier.util
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.language.higherKinds
import scalaz.concurrent.{Task => ScalazTask}
trait Schedulable[F[_]] extends Gather[F] {
def schedule[A](pool: ExecutorService)(f: => A): F[A]
}
object Schedulable {
implicit val scalazTask: Schedulable[ScalazTask] =
new Schedulable[ScalazTask] {
def point[A](a: A) =
ScalazTask.point(a)
def schedule[A](pool: ExecutorService)(f: => A) =
ScalazTask(f)(pool)
def gather[A](elems: Seq[ScalazTask[A]]) =
ScalazTask.taskInstance.gather(elems)
def bind[A, B](elem: ScalazTask[A])(f: A => ScalazTask[B]) =
ScalazTask.taskInstance.bind(elem)(f)
}
def fixedThreadPool(size: Int): ExecutorService =
Executors.newFixedThreadPool(
size,
// from scalaz.concurrent.Strategy.DefaultDaemonThreadFactory
new ThreadFactory {
val defaultThreadFactory = Executors.defaultThreadFactory()
def newThread(r: Runnable) = {
val t = defaultThreadFactory.newThread(r)
t.setDaemon(true)
t
}
}
)
}