import json
import os
from shutil import copyfile, rmtree

from docker.errors import ContextException
from docker.tls import TLSConfig

from .config import (
    get_context_host,
    get_meta_dir,
    get_meta_file,
    get_tls_dir,
)


class Context:
    """A named Docker context: its endpoints, TLS settings and storage paths.

    A context can be built in memory (``Context(name, ...)``), loaded from
    the on-disk store (``Context.load_context(name)``), written back with
    ``save()`` and deleted with ``remove()``.

    Attributes:
        name: Context name.
        context_type: Value of ``Metadata.Type`` from the meta file, or
            ``None`` for contexts created in memory.
        orchestrator: Orchestrator name, or ``None``.
        endpoints: Mapping of endpoint name to its configuration dict.
        tls_cfg: Mapping of endpoint name to its TLS configuration object.
        meta_path: Metadata directory, or ``"IN MEMORY"`` when not stored.
        tls_path: TLS material directory, or ``"IN MEMORY"`` when not stored.
    """

    # Placeholder reported for the storage paths of a context that has not
    # been loaded from, or saved to, disk. It is not a real path.
    _IN_MEMORY = "IN MEMORY"

    def __init__(self, name, orchestrator=None, host=None, endpoints=None,
                 tls=False):
        """Create an in-memory context.

        Args:
            name: Context name; must be truthy.
            orchestrator: Optional orchestrator name. ``None`` and
                ``"swarm"`` both select the ``"docker"`` endpoint.
            host: Host passed to ``get_context_host`` when an endpoint
                does not provide its own ``"Host"``.
            endpoints: Optional mapping of endpoint name to configuration
                dict. When empty or omitted, a single default endpoint is
                created.
            tls: Passed to ``get_context_host``; its negation is the
                default for ``"SkipTLSVerify"``.

        Raises:
            Exception: If ``name`` is empty.
            ContextException: If an endpoint configuration is not a dict.
        """
        if not name:
            raise Exception("Name not provided")
        self.name = name
        self.context_type = None
        self.orchestrator = orchestrator
        self.endpoints = {}
        self.tls_cfg = {}
        self.meta_path = self._IN_MEMORY
        self.tls_path = self._IN_MEMORY

        if not endpoints:
            # set default docker endpoint if no endpoint is set
            default_endpoint = "docker" if (
                not orchestrator or orchestrator == "swarm"
            ) else orchestrator

            self.endpoints = {
                default_endpoint: {
                    "Host": get_context_host(host, tls),
                    "SkipTLSVerify": not tls,
                }
            }
            return

        # check docker endpoints
        for endpoint_name, config in endpoints.items():
            if not isinstance(config, dict):
                # unknown format
                raise ContextException(
                    f"Unknown endpoint format for context {name}: {config}",
                )

            # Work on a shallow copy so the defaults filled in below are not
            # written back into the dict the caller passed in.
            config = dict(config)
            self.endpoints[endpoint_name] = config
            if endpoint_name != "docker":
                continue

            config["Host"] = config.get(
                "Host", get_context_host(host, tls))
            config["SkipTLSVerify"] = bool(config.get(
                "SkipTLSVerify", not tls))

    def set_endpoint(
            self, name="docker", host=None, tls_cfg=None,
            skip_tls_verify=False, def_namespace=None):
        """Create or replace the endpoint ``name``.

        Args:
            name: Endpoint name.
            host: Host passed to ``get_context_host``.
            tls_cfg: Optional TLS configuration stored for this endpoint;
                ignored when falsy.
            skip_tls_verify: Stored as ``"SkipTLSVerify"``; its negation is
                passed to ``get_context_host`` as the TLS flag.
            def_namespace: Optional value stored as ``"DefaultNamespace"``.
        """
        endpoint = {
            "Host": get_context_host(host, not skip_tls_verify),
            "SkipTLSVerify": skip_tls_verify,
        }
        if def_namespace:
            endpoint["DefaultNamespace"] = def_namespace
        self.endpoints[name] = endpoint

        if tls_cfg:
            self.tls_cfg[name] = tls_cfg

    def inspect(self):
        """Return the combined metadata, TLS material and storage dict."""
        return self()

    @classmethod
    def load_context(cls, name):
        """Load the stored context ``name``.

        Returns:
            A new instance populated from the meta file and TLS directory,
            or ``None`` when no meta file exists for ``name``.

        Raises:
            Exception: If the meta file cannot be read or parsed.
        """
        meta = cls._load_meta(name)
        if not meta:
            return None

        # Tolerate a meta file without a "Metadata" object, the same way a
        # missing "Endpoints" object is tolerated.
        extra = meta.get("Metadata") or {}
        instance = cls(
            meta["Name"],
            orchestrator=extra.get("StackOrchestrator", None),
            endpoints=meta.get("Endpoints", None))
        instance.context_type = extra.get("Type", None)
        instance._load_certs()
        instance.meta_path = get_meta_dir(name)
        return instance

    @classmethod
    def _load_meta(cls, name):
        """Read and normalise the meta file of context ``name``.

        Returns:
            The parsed metadata dict, with ``"Host"`` and
            ``"SkipTLSVerify"`` defaults filled in on the ``"docker"``
            endpoint, or ``None`` when the meta file does not exist.

        Raises:
            Exception: If the meta file cannot be read or parsed.
        """
        meta_file = get_meta_file(name)
        if not os.path.isfile(meta_file):
            return None

        try:
            with open(meta_file) as f:
                metadata = json.load(f)
        except (OSError, KeyError, ValueError) as e:
            # unknown format
            raise Exception(
                f"Detected corrupted meta file for context {name} : {e}"
            ) from e

        # for docker endpoints, set defaults for
        # Host and SkipTLSVerify fields
        # A missing "Endpoints" object is allowed here because the caller
        # falls back to a default endpoint; non-dict entries are left
        # untouched so __init__ can report them.
        endpoints = metadata.get("Endpoints") or {}
        docker_endpoint = endpoints.get("docker")
        if isinstance(docker_endpoint, dict):
            docker_endpoint["Host"] = docker_endpoint.get(
                "Host", get_context_host(None, False))
            docker_endpoint["SkipTLSVerify"] = bool(
                docker_endpoint.get("SkipTLSVerify", True))

        return metadata

    def _load_certs(self):
        """Populate ``tls_cfg`` from the context's TLS directory.

        For every endpoint that has a sub-directory in the TLS directory,
        files are classified by name prefix (``ca``, ``cert``, ``key``). A
        TLS configuration is created only when all three are present.
        ``verify=True`` is requested only for the ``"docker"`` endpoint when
        its ``"SkipTLSVerify"`` is falsy; otherwise ``verify`` is ``None``.
        """
        certs = {}
        tls_dir = get_tls_dir(self.name)
        for endpoint in self.endpoints:
            endpoint_dir = os.path.join(tls_dir, endpoint)
            if not os.path.isdir(endpoint_dir):
                continue

            ca_cert = None
            cert = None
            key = None
            for filename in os.listdir(endpoint_dir):
                if filename.startswith("ca"):
                    ca_cert = os.path.join(endpoint_dir, filename)
                elif filename.startswith("cert"):
                    cert = os.path.join(endpoint_dir, filename)
                elif filename.startswith("key"):
                    key = os.path.join(endpoint_dir, filename)

            if ca_cert and cert and key:
                verify = None
                if endpoint == "docker" and not self.endpoints["docker"].get(
                        "SkipTLSVerify", False):
                    verify = True
                certs[endpoint] = TLSConfig(
                    client_cert=(cert, key), ca_cert=ca_cert, verify=verify)
        self.tls_cfg = certs
        self.tls_path = tls_dir

    @staticmethod
    def _copy_tls_file(src, dest_dir):
        """Copy ``src`` into ``dest_dir`` under its base name.

        Does nothing when ``src`` already is that destination file, which
        is the case when a context loaded from disk is saved again;
        ``copyfile`` would raise ``SameFileError`` otherwise.
        """
        dest = os.path.join(dest_dir, os.path.basename(src))
        if os.path.exists(dest) and os.path.samefile(src, dest):
            return
        copyfile(src, dest)

    def save(self):
        """Write the metadata file and copy TLS material into the store.

        Creates the metadata directory and one TLS directory per endpoint
        in ``tls_cfg`` as needed, then updates ``meta_path`` and
        ``tls_path`` to the on-disk locations.
        """
        # Serialise first so a failure does not leave a truncated meta file.
        payload = json.dumps(self.Metadata)

        meta_dir = get_meta_dir(self.name)
        os.makedirs(meta_dir, exist_ok=True)
        with open(get_meta_file(self.name), "w") as f:
            f.write(payload)

        tls_dir = get_tls_dir(self.name)
        for endpoint, tls in self.tls_cfg.items():
            endpoint_dir = os.path.join(tls_dir, endpoint)
            os.makedirs(endpoint_dir, exist_ok=True)

            if tls.ca_cert:
                self._copy_tls_file(tls.ca_cert, endpoint_dir)

            if tls.cert:
                cert_file, key_file = tls.cert
                self._copy_tls_file(cert_file, endpoint_dir)
                self._copy_tls_file(key_file, endpoint_dir)

        self.meta_path = meta_dir
        self.tls_path = tls_dir

    def remove(self):
        """Delete the context's metadata and TLS directories, if present."""
        for path in (self.meta_path, self.tls_path):
            # The in-memory placeholder is not a path; never resolve it
            # against the current working directory.
            if path == self._IN_MEMORY:
                continue
            if os.path.isdir(path):
                rmtree(path)

    def __repr__(self):
        return f"<{self.__class__.__name__}: '{self.name}'>"

    def __str__(self):
        """Return the inspect dict as indented JSON."""
        return json.dumps(self(), indent=2)

    def __call__(self):
        """Return metadata merged with TLS material and storage info."""
        result = self.Metadata
        result.update(self.TLSMaterial)
        result.update(self.Storage)
        return result

    def is_docker_host(self):
        """Return True when the context has no ``context_type`` set."""
        return self.context_type is None

    def _primary_endpoint_name(self):
        """Return the endpoint name selected by the orchestrator.

        No orchestrator and ``"swarm"`` both map to ``"docker"``.
        """
        if not self.orchestrator or self.orchestrator == "swarm":
            return "docker"
        return self.orchestrator

    @property
    def Name(self):
        """Context name."""
        return self.name

    @property
    def Host(self):
        """Host of the orchestrator's endpoint, or ``None`` if unavailable."""
        # Look the endpoint up with .get for every orchestrator, so a
        # missing entry yields None instead of a KeyError.
        endpoint = self.endpoints.get(self._primary_endpoint_name(), None)
        if not endpoint:
            return None
        return endpoint.get("Host", None)

    @property
    def Orchestrator(self):
        """Orchestrator name, or ``None``."""
        return self.orchestrator

    @property
    def Metadata(self):
        """Dict with ``Name``, ``Metadata`` and ``Endpoints``.

        ``Metadata`` holds ``StackOrchestrator`` only when an orchestrator
        is set. ``Endpoints`` is the live ``endpoints`` mapping, not a copy.
        """
        meta = {}
        if self.orchestrator:
            meta = {"StackOrchestrator": self.orchestrator}
        return {
            "Name": self.name,
            "Metadata": meta,
            "Endpoints": self.endpoints,
        }

    @property
    def TLSConfig(self):
        """TLS configuration of the orchestrator's endpoint, or ``None``."""
        return self.tls_cfg.get(self._primary_endpoint_name())

    @property
    def TLSMaterial(self):
        """Dict listing the TLS file base names stored for each endpoint.

        Files are listed in the order CA certificate, client certificate,
        client key; entries a TLS configuration does not have are omitted.
        """
        certs = {}
        for endpoint, tls in self.tls_cfg.items():
            paths = [tls.ca_cert]
            # A CA-only configuration has no client (cert, key) pair.
            if tls.cert:
                paths.extend(tls.cert)
            certs[endpoint] = [os.path.basename(p) for p in paths if p]
        return {
            "TLSMaterial": certs
        }

    @property
    def Storage(self):
        """Dict with the metadata and TLS storage paths."""
        return {
            "Storage": {
                "MetadataPath": self.meta_path,
                "TLSPath": self.tls_path
            }}