Merge branch 'master' of https://github.com/kubernetes-client/haskell into kubeconfig
This commit is contained in:
@@ -15,6 +15,7 @@ dependencies:
|
||||
- x509-validation
|
||||
- http-client >=0.5 && <0.6
|
||||
- http-client-tls
|
||||
- microlens >= 0.4.3 && <0.5
|
||||
- bytestring >=0.10.0 && <0.11
|
||||
- text >=0.11 && <1.3
|
||||
- safe-exceptions <0.2
|
||||
|
||||
@@ -11,21 +11,25 @@ import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Lazy as LazyB
|
||||
import Data.Default.Class (def)
|
||||
import Data.Either (rights)
|
||||
import Data.Monoid ((<>))
|
||||
import Data.PEM (pemContent, pemParseBS)
|
||||
import qualified Data.Text as T
|
||||
import qualified Data.Text.Encoding as T
|
||||
import qualified Data.Text.IO as T
|
||||
import Data.Typeable (Typeable)
|
||||
import Data.X509 (SignedCertificate,
|
||||
decodeSignedCertificate)
|
||||
import qualified Data.X509 as X509
|
||||
import Data.X509.CertificateStore (makeCertificateStore)
|
||||
import Data.X509.CertificateStore (CertificateStore, makeCertificateStore)
|
||||
import qualified Data.X509.Validation as X509
|
||||
import Lens.Micro (Lens', lens, set)
|
||||
import Network.Connection (TLSSettings (..))
|
||||
import qualified Network.HTTP.Client as NH
|
||||
import Network.HTTP.Client.TLS (mkManagerSettings)
|
||||
import Network.TLS (Credential, defaultParamsClient)
|
||||
import qualified Network.TLS as TLS
|
||||
import qualified Network.TLS.Extra as TLS
|
||||
import System.Environment (getEnv)
|
||||
import System.X509 (getSystemCertificateStore)
|
||||
|
||||
-- |Sets the master URI in the 'K.KubernetesConfig'.
|
||||
@@ -46,7 +50,7 @@ setTokenAuth
|
||||
-> K.KubernetesConfig
|
||||
-> K.KubernetesConfig
|
||||
setTokenAuth token kcfg = kcfg
|
||||
{ K.configAuthMethods = [K.AnyAuthMethod (K.AuthApiKeyBearerToken token)]
|
||||
{ K.configAuthMethods = [K.AnyAuthMethod (K.AuthApiKeyBearerToken $ "Bearer " <> token)]
|
||||
}
|
||||
|
||||
-- |Creates a 'NH.Manager' that can handle TLS.
|
||||
@@ -67,25 +71,22 @@ defaultTLSClientParams = do
|
||||
}
|
||||
}
|
||||
|
||||
clientHooksL :: Lens' TLS.ClientParams TLS.ClientHooks
|
||||
clientHooksL = lens TLS.clientHooks (\cp ch -> cp { TLS.clientHooks = ch })
|
||||
|
||||
onServerCertificateL :: Lens' TLS.ClientParams (CertificateStore -> TLS.ValidationCache -> X509.ServiceID -> X509.CertificateChain -> IO [X509.FailedReason])
|
||||
onServerCertificateL =
|
||||
clientHooksL . lens TLS.onServerCertificate (\ch osc -> ch { TLS.onServerCertificate = osc })
|
||||
|
||||
-- |Don't check whether the cert presented by the server matches the name of the server you are connecting to.
|
||||
-- This is necessary if you specify the server host by its IP address.
|
||||
disableServerNameValidation :: TLS.ClientParams -> TLS.ClientParams
|
||||
disableServerNameValidation cp = cp
|
||||
{ TLS.clientHooks = (TLS.clientHooks cp)
|
||||
{ TLS.onServerCertificate = X509.validate
|
||||
X509.HashSHA256
|
||||
def
|
||||
def { X509.checkFQHN = False }
|
||||
}
|
||||
}
|
||||
disableServerNameValidation =
|
||||
set onServerCertificateL (X509.validate X509.HashSHA256 def (def { X509.checkFQHN = False }))
|
||||
|
||||
-- |Insecure mode. The client will not validate the server cert at all.
|
||||
disableServerCertValidation :: TLS.ClientParams -> TLS.ClientParams
|
||||
disableServerCertValidation cp = cp
|
||||
{ TLS.clientHooks = (TLS.clientHooks cp)
|
||||
{ TLS.onServerCertificate = (\_ _ _ _ -> return [])
|
||||
}
|
||||
}
|
||||
disableServerCertValidation = set onServerCertificateL (\_ _ _ _ -> return [])
|
||||
|
||||
-- |Use a custom CA store.
|
||||
setCAStore :: [SignedCertificate] -> TLS.ClientParams -> TLS.ClientParams
|
||||
@@ -95,13 +96,13 @@ setCAStore certs cp = cp
|
||||
}
|
||||
}
|
||||
|
||||
onCertificateRequestL :: Lens' TLS.ClientParams (([TLS.CertificateType], Maybe [TLS.HashAndSignatureAlgorithm], [X509.DistinguishedName]) -> IO (Maybe (X509.CertificateChain, TLS.PrivKey)))
|
||||
onCertificateRequestL =
|
||||
clientHooksL . lens TLS.onCertificateRequest (\ch ocr -> ch { TLS.onCertificateRequest = ocr })
|
||||
|
||||
-- |Use a client cert for authentication.
|
||||
setClientCert :: Credential -> TLS.ClientParams -> TLS.ClientParams
|
||||
setClientCert cred cp = cp
|
||||
{ TLS.clientHooks = (TLS.clientHooks cp)
|
||||
{ TLS.onCertificateRequest = (\_ -> return (Just cred))
|
||||
}
|
||||
}
|
||||
setClientCert cred = set onCertificateRequestL (\_ -> return $ Just cred)
|
||||
|
||||
-- |Parses a PEM-encoded @ByteString@ into a list of certificates.
|
||||
parsePEMCerts :: B.ByteString -> Either String [SignedCertificate]
|
||||
@@ -119,3 +120,17 @@ loadPEMCerts p = do
|
||||
liftIO (B.readFile p)
|
||||
>>= either (throwM . ParsePEMCertsException) return
|
||||
. parsePEMCerts
|
||||
|
||||
serviceAccountDir :: FilePath
|
||||
serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount"
|
||||
|
||||
cluster :: (MonadIO m, MonadThrow m) => m (NH.Manager, K.KubernetesConfig)
|
||||
cluster = do
|
||||
caStore <- loadPEMCerts $ serviceAccountDir ++ "/ca.crt"
|
||||
defTlsParams <- liftIO defaultTLSClientParams
|
||||
mgr <- liftIO . newManager . setCAStore caStore $ disableServerNameValidation defTlsParams
|
||||
tok <- liftIO . T.readFile $ serviceAccountDir ++ "/token"
|
||||
host <- liftIO $ getEnv "KUBERNETES_SERVICE_HOST"
|
||||
port <- liftIO $ getEnv "KUBERNETES_SERVICE_PORT"
|
||||
config <- setTokenAuth tok . setMasterURI (T.pack $ "https://" ++ host ++ ":" ++ port) <$> liftIO K.newConfig
|
||||
return (mgr, config)
|
||||
|
||||
71
kubernetes-watch/README.md
Normal file
71
kubernetes-watch/README.md
Normal file
@@ -0,0 +1,71 @@
|
||||
# kubernetes-watch-client
|
||||
|
||||
Client for streaming events from watch enabled endpoints.
|
||||
|
||||
## Example
|
||||
Following is a simple example which
|
||||
just streams to stdout. First some setup - this assumes kubernetes is accessible
|
||||
at http://localhost:8001, e.g. after running `kubectl proxy`:
|
||||
|
||||
```haskell
|
||||
> import qualified Data.ByteString.Streaming.Char8 as Q
|
||||
|
||||
> manager <- newManager defaultManagerSettings
|
||||
> defaultConfig <- newConfig
|
||||
> config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
|
||||
> request = listEndpointsForAllNamespaces (Accept MimeJSON)
|
||||
```
|
||||
|
||||
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
|
||||
|
||||
```haskell
|
||||
> dispatchWatch manager config request Q.stdout
|
||||
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
|
||||
```
|
||||
|
||||
A more complex example involving some ggprocessing of the stream, the following
|
||||
prints out the event types of each event. First, define functions to allow us apply
|
||||
a parser to a stream:
|
||||
|
||||
|
||||
```haskell
|
||||
import Data.Aeson
|
||||
import qualified Data.ByteString.Streaming.Char8 as Q
|
||||
import Data.JsonStream.Parser
|
||||
import qualified Streaming.Prelude as S
|
||||
|
||||
-- | Parse the stream using the given parser.
|
||||
streamParse ::
|
||||
FromJSON a =>
|
||||
Parser a
|
||||
-> Q.ByteString IO r
|
||||
-> Stream (Of [a]) IO r
|
||||
streamParse parser byteStream = do
|
||||
byteStream & Q.lines & parseEvent parser
|
||||
|
||||
-- | Parse a single event from the stream.
|
||||
parseEvent ::
|
||||
(FromJSON a, Monad m) =>
|
||||
Parser a
|
||||
-> Stream (Q.ByteString m) m r
|
||||
-> Stream (Of [a]) m r
|
||||
parseEvent parser byteStream = S.map (parseByteString parser) (S.mapped Q.toStrict byteStream)
|
||||
```
|
||||
|
||||
Next, define the parser and apply it to the stream:
|
||||
|
||||
```haskell
|
||||
> eventParser = value :: Parser (WatchEvent V1Endpoints)
|
||||
> withResponseBody body = streamParse eventParser body & S.map (map eventType)
|
||||
> dispatchWatch manager config request (S.print . withResponseBody)
|
||||
[\"ADDED\"]
|
||||
[\"ADDED\"]
|
||||
[\"MODIFIED\"]
|
||||
...
|
||||
```
|
||||
|
||||
Packages in this example:
|
||||
* Data.Aeson -- from [aeson](https://hackage.haskell.org/package/aeson)
|
||||
* Data.ByteString.Streaming.Char8 from [streaming-bytestring](https://hackage.haskell.org/package/streaming-bytestring-0.1.5/docs/Data-ByteString-Streaming-Char8.html)
|
||||
* Data.JsonStream.Parser from [json-stream](https://hackage.haskell.org/package/json-stream-0.4.1.5/docs/Data-JsonStream-Parser.html)
|
||||
* Streaming.Prelude from [streaming](https://hackage.haskell.org/package/streaming-0.2.0.0/docs/Streaming-Prelude.html)
|
||||
15
kubernetes-watch/package.yaml
Normal file
15
kubernetes-watch/package.yaml
Normal file
@@ -0,0 +1,15 @@
|
||||
name: kubernetes-watch
|
||||
version: 0.1.0.0
|
||||
library:
|
||||
source-dirs: src
|
||||
dependencies:
|
||||
- base >=4.7 && <5.0
|
||||
- aeson >=1.0 && <2.0
|
||||
- bytestring >=0.10.0 && <0.11
|
||||
- http-client >=0.5 && <0.6
|
||||
- mtl >=2.2.1
|
||||
- streaming-bytestring >= 0.1.5 && < 0.2.0
|
||||
- text >=0.11 && <1.3
|
||||
- kubernetes == 0.1.0.0
|
||||
|
||||
|
||||
95
kubernetes-watch/src/Kubernetes/Watch/Client.hs
Normal file
95
kubernetes-watch/src/Kubernetes/Watch/Client.hs
Normal file
@@ -0,0 +1,95 @@
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
module Kubernetes.Watch.Client
|
||||
( WatchEvent
|
||||
, eventType
|
||||
, eventObject
|
||||
, dispatchWatch
|
||||
) where
|
||||
|
||||
import Control.Monad
|
||||
import Control.Monad.Trans (lift)
|
||||
import Data.Aeson
|
||||
import qualified Data.ByteString as B
|
||||
import qualified Data.ByteString.Streaming.Char8 as Q
|
||||
import qualified Data.Text as T
|
||||
import Kubernetes.Core
|
||||
import Kubernetes.Client
|
||||
import Kubernetes.MimeTypes
|
||||
import Kubernetes.Model (Watch(..))
|
||||
import Network.HTTP.Client
|
||||
|
||||
data WatchEvent a = WatchEvent
|
||||
{ _eventType :: T.Text
|
||||
, _eventObject :: a
|
||||
} deriving (Eq, Show)
|
||||
|
||||
instance FromJSON a => FromJSON (WatchEvent a) where
|
||||
parseJSON (Object x) = WatchEvent <$> x .: "type" <*> x .: "object"
|
||||
parseJSON _ = fail "Expected an object"
|
||||
|
||||
instance ToJSON a => ToJSON (WatchEvent a) where
|
||||
toJSON x = object
|
||||
[ "type" .= _eventType x
|
||||
, "object" .= _eventObject x
|
||||
]
|
||||
|
||||
-- | Type of the 'WatchEvent'.
|
||||
eventType :: WatchEvent a -> T.Text
|
||||
eventType = _eventType
|
||||
|
||||
-- | Object within the 'WatchEvent'.
|
||||
eventObject :: WatchEvent a -> a
|
||||
eventObject = _eventObject
|
||||
|
||||
{-| Dispatch a request setting watch to true. Takes a consumer function
|
||||
which consumes the 'Q.ByteString' stream. Following is a simple example which
|
||||
just streams to stdout. First some setup - this assumes kubernetes is accessible
|
||||
at http://localhost:8001, e.g. after running /kubectl proxy/:
|
||||
|
||||
@
|
||||
import qualified Data.ByteString.Streaming.Char8 as Q
|
||||
|
||||
manager <- newManager defaultManagerSettings
|
||||
defaultConfig <- newConfig
|
||||
config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
|
||||
request = listEndpointsForAllNamespaces (Accept MimeJSON)
|
||||
@
|
||||
|
||||
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
|
||||
|
||||
@
|
||||
> dispatchWatch manager config request Q.stdout
|
||||
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
|
||||
@
|
||||
-}
|
||||
dispatchWatch ::
|
||||
(HasOptionalParam req Watch, MimeType accept, MimeType contentType) =>
|
||||
Manager
|
||||
-> KubernetesConfig
|
||||
-> KubernetesRequest req contentType resp accept
|
||||
-> (Q.ByteString IO () -> IO a)
|
||||
-> IO a
|
||||
dispatchWatch manager config request apply = do
|
||||
let watchRequest = applyOptionalParam request (Watch True)
|
||||
(InitRequest req) <- _toInitRequest config watchRequest
|
||||
withHTTP req manager $ \resp -> apply $ responseBody resp
|
||||
|
||||
withHTTP ::
|
||||
Request
|
||||
-> Manager
|
||||
-> (Response (Q.ByteString IO ()) -> IO a)
|
||||
-> IO a
|
||||
withHTTP request manager f = withResponse request manager f'
|
||||
where
|
||||
f' resp = do
|
||||
let p = (from . brRead . responseBody) resp
|
||||
f (resp {responseBody = p})
|
||||
from :: IO B.ByteString -> Q.ByteString IO ()
|
||||
from io = go
|
||||
where
|
||||
go = do
|
||||
bs <- lift io
|
||||
unless (B.null bs) $ do
|
||||
Q.chunk bs
|
||||
go
|
||||
@@ -3,6 +3,5 @@ extra-deps:
|
||||
packages:
|
||||
- kubernetes
|
||||
- kubernetes-client-helper
|
||||
- kubernetes-watch
|
||||
- kubeconfig
|
||||
nix:
|
||||
enable: true
|
||||
|
||||
Reference in New Issue
Block a user