module BDCS.API.Server(mkApp,
proxyAPI,
runServer,
ServerStatus(..),
SocketException(..))
where
import BDCS.API.Compose(ComposeInfo(..), ComposeMsgAsk(..), ComposeMsgResp(..), compose)
import BDCS.API.Config(ServerConfig(..))
import BDCS.API.Recipes(openOrCreateRepo, commitRecipeDirectory)
import BDCS.API.Utils(GitLock(..))
import BDCS.API.V0(V0API, v0ApiServer)
import BDCS.API.Version(buildVersion)
import BDCS.DB(schemaVersion, getDbVersion)
import Control.Concurrent.Async(Async, async, cancel, replicateConcurrently_, waitCatch)
import qualified Control.Concurrent.ReadWriteLock as RWL
import Control.Concurrent.STM.TChan(newTChan, readTChan)
import Control.Concurrent.STM.TMVar(TMVar, newTMVar, putTMVar, readTMVar, takeTMVar)
import Control.Conditional(whenM)
import qualified Control.Exception as CE
import Control.Monad(forever, void)
import Control.Monad.Except(runExceptT)
import Control.Monad.Logger(runFileLoggingT, runStderrLoggingT)
import Control.Monad.STM(atomically)
import Data.Aeson
import Data.IORef(IORef, atomicModifyIORef', newIORef, readIORef)
import qualified Data.Map as Map
import Data.Sequence((|>), Seq(..), deleteAt, empty, findIndexL, index)
import Data.String.Conversions(cs)
import qualified Data.Text as T
import Database.Persist.Sqlite
import GHC.Conc(retry)
import GHC.Exts(toList)
import qualified GI.Ggit as Git
import Network.Socket
import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Middleware.Cors
import Servant
import System.Directory(createDirectoryIfMissing, doesPathExist, removePathForcibly)
import System.Environment(lookupEnv)
import System.FilePath.Posix((</>))
import System.Posix.Files(setFileMode, setOwnerAndGroup)
import System.Posix.User(GroupEntry(..), getGroupEntryForName)
import Text.Read(readMaybe)
-- | Failures 'runServer' can raise while obtaining the socket it listens on.
data SocketException = BadFileDescriptor -- ^ @LISTEN_FDS@ is set but its value was rejected
                     | BadGroup String -- ^ the named group could not be looked up
                     | NoSocketError -- ^ @LISTEN_FDS@ is unset and the socket path is empty
    deriving(Show)
instance CE.Exception SocketException
-- | Composes that are currently running, keyed by the 'ciId' of their
-- 'ComposeInfo'.  Each entry pairs the thread running the compose with the
-- 'ComposeInfo' describing it.
type InProgressMap = Map.Map T.Text (Async (), ComposeInfo)
-- | The payload of the @/api/status@ route.  Every field is carried under the
-- JSON key named in the 'ToJSON' / 'FromJSON' instances.
data ServerStatus = ServerStatus
    { srvApi :: String -- ^ JSON key @api@; 'serverStatus' reports @"0"@
    , srvBackend :: String -- ^ JSON key @backend@; 'serverStatus' reports @"weldr"@
    , srvBuild :: String -- ^ JSON key @build@; taken from 'buildVersion'
    , srvSchemaVersion :: String -- ^ JSON key @schema_version@; 'show' of 'schemaVersion'
    , srvDbVersion :: String -- ^ JSON key @db_version@; 'show' of the version read from the database
    , srvDbSupported :: Bool -- ^ JSON key @db_supported@; whether the two versions above are equal
    } deriving (Eq, Show)
-- | Serialise a 'ServerStatus' as a flat JSON object, one key per field.
instance ToJSON ServerStatus where
    toJSON status = object
        [ "api"            .= srvApi status
        , "backend"        .= srvBackend status
        , "build"          .= srvBuild status
        , "schema_version" .= srvSchemaVersion status
        , "db_version"     .= srvDbVersion status
        , "db_supported"   .= srvDbSupported status
        ]
-- | Parse a 'ServerStatus' from the JSON object produced by its 'ToJSON'
-- instance.  All six keys are required.
instance FromJSON ServerStatus where
    parseJSON = withObject "server status" $ \obj ->
        -- Argument order follows the field order of the 'ServerStatus' record.
        ServerStatus
            <$> obj .: "api"
            <*> obj .: "backend"
            <*> obj .: "build"
            <*> obj .: "schema_version"
            <*> obj .: "db_version"
            <*> obj .: "db_supported"
-- | The part of the API that is not tied to an API version:
-- @GET /api/status@, answering with a JSON-encoded 'ServerStatus'.
type CommonAPI = "api" :> "status" :> Get '[JSON] ServerStatus
-- | Number of worker threads 'composeServer' starts.  Each worker runs one
-- compose at a time, so this is also the number of composes that can run
-- concurrently.
maxComposes :: Int
maxComposes = 1
-- | Handler for the status route.
--
-- Reads the database's version through the connection pool in the
-- 'ServerConfig' and reports it next to the schema version this build expects.
-- If the version query fails, the database version is reported as @0@.
serverStatus :: ServerConfig -> Handler ServerStatus
serverStatus cfg = do
    found <- either (const 0) id <$> runExceptT (runSqlPool getDbVersion (cfgPool cfg))
    pure ServerStatus
        { srvApi           = "0"
        , srvBackend       = "weldr"
        , srvBuild         = buildVersion
        , srvSchemaVersion = show schemaVersion
        , srvDbVersion     = show found
        , srvDbSupported   = schemaVersion == found
        }
-- | Server for 'CommonAPI'; its only route is handled by 'serverStatus'.
commonServer :: ServerConfig -> Server CommonAPI
commonServer = serverStatus
-- | Every route the server exposes: the 'CommonAPI' routes plus the
-- 'V0API' routes mounted under @/api/v0@.
type CombinedAPI = CommonAPI
    :<|> "api" :> "v0" :> V0API
-- | Server for 'CombinedAPI'.  The handlers are listed in the same order as
-- the alternatives of the API type.
combinedServer :: ServerConfig -> Server CombinedAPI
combinedServer cfg = statusRoutes :<|> v0Routes
  where
    statusRoutes = commonServer cfg
    v0Routes     = v0ApiServer cfg
-- | CORS middleware applied to the whole application.
--
-- The same policy is returned for every request: the simple policy, extended
-- to accept the @Content-Type@ request header and the @DELETE@ and @PUT@
-- methods in addition to the simple methods.
appCors :: Middleware
appCors = cors $ \_ -> Just corsPolicy
  where
    allowedMethods = "DELETE" : "PUT" : simpleMethods

    corsPolicy = simpleCorsResourcePolicy
        { corsRequestHeaders = ["Content-Type"]
        , corsMethods        = allowedMethods
        }
-- | Type-level handle on 'CombinedAPI', passed to 'serve' by 'application'.
proxyAPI :: Proxy CombinedAPI
proxyAPI = Proxy
-- | Build the WAI 'Application': serve 'CombinedAPI' with the handlers from
-- 'combinedServer' and wrap the result in the 'appCors' middleware.
application :: ServerConfig -> Application
application = appCors . serve proxyAPI . combinedServer
-- | Set up everything the API needs and return the WAI 'Application'.
--
-- Arguments, in order: the BDCS path stored in 'cfgBdcs', the path of the git
-- repository holding the recipes, and the path of the sqlite database.
--
-- Besides building the 'ServerConfig' this has side effects: it opens (or
-- creates) the recipe repository and commits the recipe directory to
-- @master@, creates the results directory, and starts 'composeServer' in a
-- background thread that is never waited on.
mkApp :: FilePath -> FilePath -> FilePath -> IO Application
mkApp bdcsPath gitRepoPath sqliteDbPath = do
    -- A pool of 5 sqlite connections; pool creation logs to stderr.
    dbPool <- runStderrLoggingT (createSqlitePool (cs sqliteDbPath) 5)

    Git.init
    recipeRepo <- openOrCreateRepo gitRepoPath
    _ <- commitRecipeDirectory recipeRepo "master" gitRepoPath

    repoLock <- RWL.new
    msgChan  <- atomically newTChan

    let cfg = ServerConfig
            { cfgRepoLock   = GitLock repoLock recipeRepo
            , cfgChan       = msgChan
            , cfgPool       = dbPool
            , cfgBdcs       = bdcsPath
            , cfgResultsDir = "/var/lib/composer"
            }

    createDirectoryIfMissing True (cfgResultsDir cfg)

    _ <- async (composeServer cfg)
    pure (application cfg)
-- | Run the API server on a Unix domain socket until the server stops.
--
-- Arguments, in order:
--
-- * the path at which to create the socket (only used when @LISTEN_FDS@ is
--   not set in the environment),
-- * the name of the group that should own a newly created socket,
-- * the BDCS path, the recipe git repository path and the sqlite database
--   path, all passed straight to 'mkApp'.
--
-- Throws a 'SocketException' when no usable socket can be obtained.
runServer :: FilePath -> String -> FilePath -> FilePath -> FilePath -> IO ()
runServer socketPath socketGroup bdcsPath gitRepoPath sqliteDbPath = void $ withSocketsDo $ do
    sock <- getSocket socketPath
    app  <- mkApp bdcsPath gitRepoPath sqliteDbPath
    runSettingsSocket defaultSettings sock app
  where
    -- Use a socket inherited through socket activation if there is one,
    -- otherwise create a fresh one at the given path.
    getSocket :: FilePath -> IO Socket
    getSocket fp = lookupEnv "LISTEN_FDS" >>= \case
        Nothing
            | null fp   -> CE.throwIO NoSocketError
            | otherwise -> newSocket fp
        -- LISTEN_FDS holds the NUMBER of descriptors that were passed, not a
        -- descriptor number.  The passed descriptors are numbered
        -- consecutively starting at 3 (SD_LISTEN_FDS_START), so the first
        -- (and only one used here) is always fd 3.
        Just s -> case readMaybe s :: Maybe Int of
            Just count | count >= 1 -> mkSocket 3 AF_UNIX Stream defaultProtocol Bound
            _                       -> CE.throwIO BadFileDescriptor

    -- Create a listening socket at @path@, replacing whatever is there, with
    -- mode 0660, owner uid 0 and the group named by @socketGroup@.
    newSocket :: FilePath -> IO Socket
    newSocket path = do
        whenM (doesPathExist path) $
            removePathForcibly path

        gid <- CE.catch (groupID <$> getGroupEntryForName socketGroup)
                        (\(_ :: CE.IOException) -> CE.throwIO $ BadGroup socketGroup)

        -- Close the descriptor again if any of the setup steps fails, so a
        -- failed start does not leak it.
        CE.bracketOnError (socket AF_UNIX Stream defaultProtocol) close $ \s -> do
            bind s (SockAddrUnix path)
            listen s 1
            setFileMode path 0o660
            setOwnerAndGroup path 0 gid
            return s
-- | Run the compose queue.  Does not return under normal operation.
--
-- Two pieces of shared state are created here:
--
-- * the worklist, a 'TMVar' holding the 'Seq' of composes waiting to run, and
-- * the in-progress map, an 'IORef' holding the composes currently running.
--
-- One background thread answers the messages arriving on 'cfgChan', and
-- 'maxComposes' worker threads take composes off the worklist and run them.
composeServer :: ServerConfig -> IO ()
composeServer ServerConfig{..} = do
    inProgressRef <- newIORef Map.empty
    worklist <- atomically $ newTMVar empty
    void $ async $ messagesThread inProgressRef worklist
    replicateConcurrently_ maxComposes (workerThread inProgressRef worklist)
  where
    -- Record a running compose together with the thread running it.
    addCompose :: IORef InProgressMap -> ComposeInfo -> Async () -> IO ()
    addCompose ref ci thread =
        atomicModifyIORef' ref (\m -> (Map.insert (ciId ci) (thread, ci) m, ()))

    -- Forget a compose that is no longer running.
    removeCompose :: IORef InProgressMap -> T.Text -> IO ()
    removeCompose ref uuid =
        atomicModifyIORef' ref (\m -> (Map.delete uuid m, ()))

    -- Send a reply to whoever is waiting on the response variable.
    reply r resp = atomically (putTMVar r resp)

    -- Repeatedly take the oldest waiting compose and run it to completion.
    workerThread :: IORef InProgressMap -> TMVar (Seq ComposeInfo) -> IO ()
    workerThread inProgressRef worklist = forever $ do
        -- Block (via 'retry') until the worklist is non-empty.
        ci <- atomically $ takeTMVar worklist >>= \case
            (x :<| xs) -> putTMVar worklist xs >> return x
            _          -> retry

        -- NOTE(review): the compose is started before it is registered, so a
        -- cancel request arriving in between will not find it.
        thread <- async $ runFileLoggingT (ciResultsDir ci </> "compose.log")
                                          (compose cfgBdcs cfgPool ci)
        addCompose inProgressRef ci thread

        -- Whether the compose succeeded, failed or was cancelled, it is done.
        void $ waitCatch thread
        removeCompose inProgressRef (ciId ci)

    -- Answer requests arriving on the message channel, one at a time.
    messagesThread :: IORef InProgressMap -> TMVar (Seq ComposeInfo) -> IO ()
    messagesThread inProgressRef worklist = forever $ atomically (readTChan cfgChan) >>= \case
        (AskBuildsWaiting, Just r) -> do
            lst <- atomically $ readTMVar worklist
            reply r (RespBuildsWaiting $ map ciId (toList lst))

        (AskBuildsInProgress, Just r) -> do
            inProgress <- map snd . Map.elems <$> readIORef inProgressRef
            reply r (RespBuildsInProgress $ map ciId inProgress)

        (AskCancelBuild buildId, Just r) -> do
            inProgress <- readIORef inProgressRef
            case Map.lookup buildId inProgress of
                Just (thread, ci) -> do
                    cancel thread
                    removeCompose inProgressRef buildId
                    -- Reply even if removing the results fails; otherwise the
                    -- requester would wait for an answer forever.
                    removePathForcibly (ciResultsDir ci)
                        `CE.finally` reply r (RespBuildCancelled True)
                Nothing -> reply r (RespBuildCancelled False)

        (AskCompose ci, _) -> atomically $ do
            lst <- takeTMVar worklist
            putTMVar worklist (lst |> ci)

        (AskDequeueBuild buildId, Just r) -> do
            removed <- atomically $ do
                lst <- takeTMVar worklist
                case findIndexL (\e -> ciId e == buildId) lst of
                    -- The list must be put back even when nothing is removed.
                    -- Leaving the TMVar empty would block every later
                    -- take/read of the worklist, i.e. all workers and this
                    -- thread.
                    Nothing  -> putTMVar worklist lst >> return Nothing
                    Just ndx -> do
                        putTMVar worklist (deleteAt ndx lst)
                        return $ Just (index lst ndx)

            case removed of
                Just ci ->
                    -- As for cancellation: always answer the requester.
                    removePathForcibly (ciResultsDir ci)
                        `CE.finally` reply r (RespBuildDequeued True)
                Nothing -> reply r (RespBuildDequeued False)

        -- Anything else (e.g. a query without a response variable) is ignored.
        _ -> return ()