Better locks in cache

This commit is contained in:
Alexandre Archambault 2015-12-30 01:34:48 +01:00
parent 8540ba3078
commit a1db5f1fdc
1 changed files with 113 additions and 65 deletions

View File

@ -2,6 +2,7 @@ package coursier
import java.net.{HttpURLConnection, URL}
import java.nio.channels.{ OverlappingFileLockException, FileLock }
import java.nio.file.{ StandardCopyOption, Files => NioFiles }
import java.security.MessageDigest
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService}
@ -48,6 +49,98 @@ object Cache {
))
}
private def readFullyTo(
in: InputStream,
out: OutputStream,
logger: Option[Logger],
url: String
): Unit = {
val b = Array.fill[Byte](bufferSize)(0)
@tailrec
def helper(count: Long): Unit = {
val read = in.read(b)
if (read >= 0) {
out.write(b, 0, read)
out.flush()
logger.foreach(_.downloadProgress(url, count + read))
helper(count + read)
}
}
helper(0L)
}
private def withLockFor[T](file: File)(f: => FileError \/ T): FileError \/ T = {
val lockFile = new File(file.getParentFile, s"${file.getName}.lock")
lockFile.getParentFile.mkdirs()
var out = new FileOutputStream(lockFile)
try {
var lock: FileLock = null
try {
lock = out.getChannel.tryLock()
if (lock == null)
-\/(FileError.Locked(file))
else
try f
finally {
lock.release()
lock = null
out.close()
out = null
lockFile.delete()
}
}
catch {
case e: OverlappingFileLockException =>
-\/(FileError.Locked(file))
}
finally if (lock != null) lock.release()
} finally if (out != null) out.close()
}
private def downloading[T](
url: String,
file: File,
logger: Option[Logger]
)(
f: => FileError \/ T
): FileError \/ T =
try {
val o = new Object
val prev = urlLocks.putIfAbsent(url, o)
if (prev == null) {
logger.foreach(_.downloadingArtifact(url, file))
val res =
try f
catch { case e: Exception =>
logger.foreach(_.downloadedArtifact(url, success = false))
throw e
}
finally {
urlLocks.remove(url)
}
logger.foreach(_.downloadedArtifact(url, success = true))
res
} else
-\/(FileError.ConcurrentDownload(url))
}
catch { case e: Exception =>
-\/(FileError.DownloadError(e.getMessage))
}
private def temporaryFile(file: File): File = {
val dir = file.getParentFile
val name = file.getName
new File(dir, s"$name.part")
}
private def download(
artifact: Artifact,
cache: Seq[(String, File)],
@ -130,81 +223,36 @@ object Cache {
fromDatesOpt.getOrElse(true)
}
// FIXME Things can go wrong here and are not properly handled,
// e.g. what if the connection gets closed during the transfer?
// (partial file on disk?)
def remote(file: File, url: String): EitherT[Task, FileError, Unit] =
EitherT {
Task {
try {
val o = new Object
val prev = urlLocks.putIfAbsent(url, o)
if (prev == null) {
logger.foreach(_.downloadingArtifact(url, file))
withLockFor(file) {
downloading(url, file, logger) {
val conn = urlConn(url)
val r = try {
val conn = urlConn(url)
for (len <- Option(conn.getContentLengthLong) if len >= 0L)
logger.foreach(_.downloadLength(url, len))
for (len <- Option(conn.getContentLengthLong).filter(_ >= 0L))
logger.foreach(_.downloadLength(url, len))
val in = new BufferedInputStream(conn.getInputStream, bufferSize)
val in = new BufferedInputStream(conn.getInputStream, bufferSize)
val tmp = temporaryFile(file)
val result =
try {
file.getParentFile.mkdirs()
val out = new FileOutputStream(file)
try {
var lock: FileLock = null
try {
lock = out.getChannel.tryLock()
if (lock == null)
-\/(FileError.Locked(file))
else {
val b = Array.fill[Byte](bufferSize)(0)
val result =
try {
tmp.getParentFile.mkdirs()
val out = new FileOutputStream(tmp)
try \/-(readFullyTo(in, out, logger, url))
finally out.close()
} finally in.close()
@tailrec
def helper(count: Long): Unit = {
val read = in.read(b)
if (read >= 0) {
out.write(b, 0, read)
out.flush()
logger.foreach(_.downloadProgress(url, count + read))
helper(count + read)
}
}
file.getParentFile.mkdirs()
NioFiles.move(tmp.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE)
helper(0L)
\/-(())
}
}
catch {
case e: OverlappingFileLockException =>
-\/(FileError.Locked(file))
}
finally if (lock != null) lock.release()
} finally out.close()
} finally in.close()
for (lastModified <- Option(conn.getLastModified) if lastModified > 0L)
file.setLastModified(lastModified)
for (lastModified <- Option(conn.getLastModified).filter(_ > 0L))
file.setLastModified(lastModified)
result
}
catch { case e: Exception =>
logger.foreach(_.downloadedArtifact(url, success = false))
throw e
}
finally {
urlLocks.remove(url)
}
logger.foreach(_.downloadedArtifact(url, success = true))
r
} else
-\/(FileError.ConcurrentDownload(url))
}
catch { case e: Exception =>
-\/(FileError.DownloadError(e.getMessage))
result
}
}
}
}