File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/api.tar
Back
secret.py 0000644 00000005465 15027743616 0006430 0 ustar 00 import base64 from .. import errors, utils class SecretApiMixin: @utils.minimum_version('1.25') def create_secret(self, name, data, labels=None, driver=None): """ Create a secret Args: name (string): Name of the secret data (bytes): Secret data to be stored labels (dict): A mapping of labels to assign to the secret driver (DriverConfig): A custom driver configuration. If unspecified, the default ``internal`` driver will be used Returns (dict): ID of the newly created secret """ if not isinstance(data, bytes): data = data.encode('utf-8') data = base64.b64encode(data) data = data.decode('ascii') body = { 'Data': data, 'Name': name, 'Labels': labels } if driver is not None: if utils.version_lt(self._version, '1.31'): raise errors.InvalidVersion( 'Secret driver is only available for API version > 1.31' ) body['Driver'] = driver url = self._url('/secrets/create') return self._result( self._post_json(url, data=body), True ) @utils.minimum_version('1.25') @utils.check_resource('id') def inspect_secret(self, id): """ Retrieve secret metadata Args: id (string): Full ID of the secret to inspect Returns (dict): A dictionary of metadata Raises: :py:class:`docker.errors.NotFound` if no secret with that ID exists """ url = self._url('/secrets/{0}', id) return self._result(self._get(url), True) @utils.minimum_version('1.25') @utils.check_resource('id') def remove_secret(self, id): """ Remove a secret Args: id (string): Full ID of the secret to remove Returns (boolean): True if successful Raises: :py:class:`docker.errors.NotFound` if no secret with that ID exists """ url = self._url('/secrets/{0}', id) res = self._delete(url) self._raise_for_status(res) return True @utils.minimum_version('1.25') def secrets(self, filters=None): """ List secrets Args: filters (dict): A map of filters to process on the secrets list. Available filters: ``names`` Returns (list): A list of secrets """ url = self._url('/secrets') params = {} if filters: params['filters'] = utils.convert_filters(filters) return self._result(self._get(url, params=params), True) service.py 0000644 00000045417 15027743616 0006604 0 ustar 00 from .. import auth, errors, utils from ..types import ServiceMode def _check_api_features(version, task_template, update_config, endpoint_spec, rollback_config): def raise_version_error(param, min_version): raise errors.InvalidVersion( f'{param} is not supported in API version < {min_version}' ) if update_config is not None: if utils.version_lt(version, '1.25'): if 'MaxFailureRatio' in update_config: raise_version_error('UpdateConfig.max_failure_ratio', '1.25') if 'Monitor' in update_config: raise_version_error('UpdateConfig.monitor', '1.25') if utils.version_lt(version, '1.28'): if update_config.get('FailureAction') == 'rollback': raise_version_error( 'UpdateConfig.failure_action rollback', '1.28' ) if utils.version_lt(version, '1.29'): if 'Order' in update_config: raise_version_error('UpdateConfig.order', '1.29') if rollback_config is not None: if utils.version_lt(version, '1.28'): raise_version_error('rollback_config', '1.28') if utils.version_lt(version, '1.29'): if 'Order' in update_config: raise_version_error('RollbackConfig.order', '1.29') if endpoint_spec is not None: if utils.version_lt(version, '1.32') and 'Ports' in endpoint_spec: if any(p.get('PublishMode') for p in endpoint_spec['Ports']): raise_version_error('EndpointSpec.Ports[].mode', '1.32') if task_template is not None: if 'ForceUpdate' in task_template and utils.version_lt( version, '1.25'): raise_version_error('force_update', '1.25') if task_template.get('Placement'): if utils.version_lt(version, '1.30'): if task_template['Placement'].get('Platforms'): raise_version_error('Placement.platforms', '1.30') if utils.version_lt(version, '1.27'): if task_template['Placement'].get('Preferences'): raise_version_error('Placement.preferences', '1.27') if task_template.get('ContainerSpec'): container_spec = task_template.get('ContainerSpec') if utils.version_lt(version, '1.25'): if container_spec.get('TTY'): raise_version_error('ContainerSpec.tty', '1.25') if container_spec.get('Hostname') is not None: raise_version_error('ContainerSpec.hostname', '1.25') if container_spec.get('Hosts') is not None: raise_version_error('ContainerSpec.hosts', '1.25') if container_spec.get('Groups') is not None: raise_version_error('ContainerSpec.groups', '1.25') if container_spec.get('DNSConfig') is not None: raise_version_error('ContainerSpec.dns_config', '1.25') if container_spec.get('Healthcheck') is not None: raise_version_error('ContainerSpec.healthcheck', '1.25') if utils.version_lt(version, '1.28'): if container_spec.get('ReadOnly') is not None: raise_version_error('ContainerSpec.dns_config', '1.28') if container_spec.get('StopSignal') is not None: raise_version_error('ContainerSpec.stop_signal', '1.28') if utils.version_lt(version, '1.30'): if container_spec.get('Configs') is not None: raise_version_error('ContainerSpec.configs', '1.30') if container_spec.get('Privileges') is not None: raise_version_error('ContainerSpec.privileges', '1.30') if utils.version_lt(version, '1.35'): if container_spec.get('Isolation') is not None: raise_version_error('ContainerSpec.isolation', '1.35') if utils.version_lt(version, '1.38'): if container_spec.get('Init') is not None: raise_version_error('ContainerSpec.init', '1.38') if task_template.get('Resources'): if utils.version_lt(version, '1.32'): if task_template['Resources'].get('GenericResources'): raise_version_error('Resources.generic_resources', '1.32') def _merge_task_template(current, override): merged = current.copy() if override is not None: for ts_key, ts_value in override.items(): if ts_key == 'ContainerSpec': if 'ContainerSpec' not in merged: merged['ContainerSpec'] = {} for cs_key, cs_value in override['ContainerSpec'].items(): if cs_value is not None: merged['ContainerSpec'][cs_key] = cs_value elif ts_value is not None: merged[ts_key] = ts_value return merged class ServiceApiMixin: @utils.minimum_version('1.24') def create_service( self, task_template, name=None, labels=None, mode=None, update_config=None, networks=None, endpoint_config=None, endpoint_spec=None, rollback_config=None ): """ Create a service. Args: task_template (TaskTemplate): Specification of the task to start as part of the new service. name (string): User-defined name for the service. Optional. labels (dict): A map of labels to associate with the service. Optional. mode (ServiceMode): Scheduling mode for the service (replicated or global). Defaults to replicated. update_config (UpdateConfig): Specification for the update strategy of the service. Default: ``None`` rollback_config (RollbackConfig): Specification for the rollback strategy of the service. Default: ``None`` networks (:py:class:`list`): List of network names or IDs or :py:class:`~docker.types.NetworkAttachmentConfig` to attach the service to. Default: ``None``. endpoint_spec (EndpointSpec): Properties that can be configured to access and load balance a service. Default: ``None``. Returns: A dictionary containing an ``ID`` key for the newly created service. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ _check_api_features( self._version, task_template, update_config, endpoint_spec, rollback_config ) url = self._url('/services/create') headers = {} image = task_template.get('ContainerSpec', {}).get('Image', None) if image is None: raise errors.DockerException( 'Missing mandatory Image key in ContainerSpec' ) if mode and not isinstance(mode, dict): mode = ServiceMode(mode) registry, repo_name = auth.resolve_repository_name(image) auth_header = auth.get_config_header(self, registry) if auth_header: headers['X-Registry-Auth'] = auth_header if utils.version_lt(self._version, '1.25'): networks = networks or task_template.pop('Networks', None) data = { 'Name': name, 'Labels': labels, 'TaskTemplate': task_template, 'Mode': mode, 'Networks': utils.convert_service_networks(networks), 'EndpointSpec': endpoint_spec } if update_config is not None: data['UpdateConfig'] = update_config if rollback_config is not None: data['RollbackConfig'] = rollback_config return self._result( self._post_json(url, data=data, headers=headers), True ) @utils.minimum_version('1.24') @utils.check_resource('service') def inspect_service(self, service, insert_defaults=None): """ Return information about a service. Args: service (str): Service name or ID. insert_defaults (boolean): If true, default values will be merged into the service inspect output. Returns: (dict): A dictionary of the server-side representation of the service, including all relevant properties. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/services/{0}', service) params = {} if insert_defaults is not None: if utils.version_lt(self._version, '1.29'): raise errors.InvalidVersion( 'insert_defaults is not supported in API version < 1.29' ) params['insertDefaults'] = insert_defaults return self._result(self._get(url, params=params), True) @utils.minimum_version('1.24') @utils.check_resource('task') def inspect_task(self, task): """ Retrieve information about a task. Args: task (str): Task ID Returns: (dict): Information about the task. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/tasks/{0}', task) return self._result(self._get(url), True) @utils.minimum_version('1.24') @utils.check_resource('service') def remove_service(self, service): """ Stop and remove a service. Args: service (str): Service name or ID Returns: ``True`` if successful. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/services/{0}', service) resp = self._delete(url) self._raise_for_status(resp) return True @utils.minimum_version('1.24') def services(self, filters=None, status=None): """ List services. Args: filters (dict): Filters to process on the nodes list. Valid filters: ``id``, ``name`` , ``label`` and ``mode``. Default: ``None``. status (bool): Include the service task count of running and desired tasks. Default: ``None``. Returns: A list of dictionaries containing data about each service. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ params = { 'filters': utils.convert_filters(filters) if filters else None } if status is not None: if utils.version_lt(self._version, '1.41'): raise errors.InvalidVersion( 'status is not supported in API version < 1.41' ) params['status'] = status url = self._url('/services') return self._result(self._get(url, params=params), True) @utils.minimum_version('1.25') @utils.check_resource('service') def service_logs(self, service, details=False, follow=False, stdout=False, stderr=False, since=0, timestamps=False, tail='all', is_tty=None): """ Get log stream for a service. Note: This endpoint works only for services with the ``json-file`` or ``journald`` logging drivers. Args: service (str): ID or name of the service details (bool): Show extra details provided to logs. Default: ``False`` follow (bool): Keep connection open to read logs as they are sent by the Engine. Default: ``False`` stdout (bool): Return logs from ``stdout``. Default: ``False`` stderr (bool): Return logs from ``stderr``. Default: ``False`` since (int): UNIX timestamp for the logs staring point. Default: 0 timestamps (bool): Add timestamps to every log line. tail (string or int): Number of log lines to be returned, counting from the current end of the logs. Specify an integer or ``'all'`` to output all log lines. Default: ``all`` is_tty (bool): Whether the service's :py:class:`ContainerSpec` enables the TTY option. If omitted, the method will query the Engine for the information, causing an additional roundtrip. Returns (generator): Logs for the service. """ params = { 'details': details, 'follow': follow, 'stdout': stdout, 'stderr': stderr, 'since': since, 'timestamps': timestamps, 'tail': tail } url = self._url('/services/{0}/logs', service) res = self._get(url, params=params, stream=True) if is_tty is None: is_tty = self.inspect_service( service )['Spec']['TaskTemplate']['ContainerSpec'].get('TTY', False) return self._get_result_tty(True, res, is_tty) @utils.minimum_version('1.24') def tasks(self, filters=None): """ Retrieve a list of tasks. Args: filters (dict): A map of filters to process on the tasks list. Valid filters: ``id``, ``name``, ``service``, ``node``, ``label`` and ``desired-state``. Returns: (:py:class:`list`): List of task dictionaries. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ params = { 'filters': utils.convert_filters(filters) if filters else None } url = self._url('/tasks') return self._result(self._get(url, params=params), True) @utils.minimum_version('1.24') @utils.check_resource('service') def update_service(self, service, version, task_template=None, name=None, labels=None, mode=None, update_config=None, networks=None, endpoint_config=None, endpoint_spec=None, fetch_current_spec=False, rollback_config=None): """ Update a service. Args: service (string): A service identifier (either its name or service ID). version (int): The version number of the service object being updated. This is required to avoid conflicting writes. task_template (TaskTemplate): Specification of the updated task to start as part of the service. name (string): New name for the service. Optional. labels (dict): A map of labels to associate with the service. Optional. mode (ServiceMode): Scheduling mode for the service (replicated or global). Defaults to replicated. update_config (UpdateConfig): Specification for the update strategy of the service. Default: ``None``. rollback_config (RollbackConfig): Specification for the rollback strategy of the service. Default: ``None`` networks (:py:class:`list`): List of network names or IDs or :py:class:`~docker.types.NetworkAttachmentConfig` to attach the service to. Default: ``None``. endpoint_spec (EndpointSpec): Properties that can be configured to access and load balance a service. Default: ``None``. fetch_current_spec (boolean): Use the undefined settings from the current specification of the service. Default: ``False`` Returns: A dictionary containing a ``Warnings`` key. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ _check_api_features( self._version, task_template, update_config, endpoint_spec, rollback_config ) if fetch_current_spec: inspect_defaults = True if utils.version_lt(self._version, '1.29'): inspect_defaults = None current = self.inspect_service( service, insert_defaults=inspect_defaults )['Spec'] else: current = {} url = self._url('/services/{0}/update', service) data = {} headers = {} data['Name'] = current.get('Name') if name is None else name data['Labels'] = current.get('Labels') if labels is None else labels if mode is not None: if not isinstance(mode, dict): mode = ServiceMode(mode) data['Mode'] = mode else: data['Mode'] = current.get('Mode') data['TaskTemplate'] = _merge_task_template( current.get('TaskTemplate', {}), task_template ) container_spec = data['TaskTemplate'].get('ContainerSpec', {}) image = container_spec.get('Image', None) if image is not None: registry, repo_name = auth.resolve_repository_name(image) auth_header = auth.get_config_header(self, registry) if auth_header: headers['X-Registry-Auth'] = auth_header if update_config is not None: data['UpdateConfig'] = update_config else: data['UpdateConfig'] = current.get('UpdateConfig') if rollback_config is not None: data['RollbackConfig'] = rollback_config else: data['RollbackConfig'] = current.get('RollbackConfig') if networks is not None: converted_networks = utils.convert_service_networks(networks) if utils.version_lt(self._version, '1.25'): data['Networks'] = converted_networks else: data['TaskTemplate']['Networks'] = converted_networks elif utils.version_lt(self._version, '1.25'): data['Networks'] = current.get('Networks') elif data['TaskTemplate'].get('Networks') is None: current_task_template = current.get('TaskTemplate', {}) current_networks = current_task_template.get('Networks') if current_networks is None: current_networks = current.get('Networks') if current_networks is not None: data['TaskTemplate']['Networks'] = current_networks if endpoint_spec is not None: data['EndpointSpec'] = endpoint_spec else: data['EndpointSpec'] = current.get('EndpointSpec') resp = self._post_json( url, data=data, params={'version': version}, headers=headers ) return self._result(resp, json=True) swarm.py 0000644 00000043251 15027743616 0006267 0 ustar 00 import http.client as http_client import logging from .. import errors, types, utils from ..constants import DEFAULT_SWARM_ADDR_POOL, DEFAULT_SWARM_SUBNET_SIZE log = logging.getLogger(__name__) class SwarmApiMixin: def create_swarm_spec(self, *args, **kwargs): """ Create a :py:class:`docker.types.SwarmSpec` instance that can be used as the ``swarm_spec`` argument in :py:meth:`~docker.api.swarm.SwarmApiMixin.init_swarm`. Args: task_history_retention_limit (int): Maximum number of tasks history stored. snapshot_interval (int): Number of logs entries between snapshot. keep_old_snapshots (int): Number of snapshots to keep beyond the current snapshot. log_entries_for_slow_followers (int): Number of log entries to keep around to sync up slow followers after a snapshot is created. heartbeat_tick (int): Amount of ticks (in seconds) between each heartbeat. election_tick (int): Amount of ticks (in seconds) needed without a leader to trigger a new election. dispatcher_heartbeat_period (int): The delay for an agent to send a heartbeat to the dispatcher. node_cert_expiry (int): Automatic expiry for nodes certificates. external_cas (:py:class:`list`): Configuration for forwarding signing requests to an external certificate authority. Use a list of :py:class:`docker.types.SwarmExternalCA`. name (string): Swarm's name labels (dict): User-defined key/value metadata. signing_ca_cert (str): The desired signing CA certificate for all swarm node TLS leaf certificates, in PEM format. signing_ca_key (str): The desired signing CA key for all swarm node TLS leaf certificates, in PEM format. ca_force_rotate (int): An integer whose purpose is to force swarm to generate a new signing CA certificate and key, if none have been specified. autolock_managers (boolean): If set, generate a key and use it to lock data stored on the managers. log_driver (DriverConfig): The default log driver to use for tasks created in the orchestrator. Returns: :py:class:`docker.types.SwarmSpec` Raises: :py:class:`docker.errors.APIError` If the server returns an error. Example: >>> spec = client.api.create_swarm_spec( snapshot_interval=5000, log_entries_for_slow_followers=1200 ) >>> client.api.init_swarm( advertise_addr='eth0', listen_addr='0.0.0.0:5000', force_new_cluster=False, swarm_spec=spec ) """ ext_ca = kwargs.pop('external_ca', None) if ext_ca: kwargs['external_cas'] = [ext_ca] return types.SwarmSpec(self._version, *args, **kwargs) @utils.minimum_version('1.24') def get_unlock_key(self): """ Get the unlock key for this Swarm manager. Returns: A ``dict`` containing an ``UnlockKey`` member """ return self._result(self._get(self._url('/swarm/unlockkey')), True) @utils.minimum_version('1.24') def init_swarm(self, advertise_addr=None, listen_addr='0.0.0.0:2377', force_new_cluster=False, swarm_spec=None, default_addr_pool=None, subnet_size=None, data_path_addr=None, data_path_port=None): """ Initialize a new Swarm using the current connected engine as the first node. Args: advertise_addr (string): Externally reachable address advertised to other nodes. This can either be an address/port combination in the form ``192.168.1.1:4567``, or an interface followed by a port number, like ``eth0:4567``. If the port number is omitted, the port number from the listen address is used. If ``advertise_addr`` is not specified, it will be automatically detected when possible. Default: None listen_addr (string): Listen address used for inter-manager communication, as well as determining the networking interface used for the VXLAN Tunnel Endpoint (VTEP). This can either be an address/port combination in the form ``192.168.1.1:4567``, or an interface followed by a port number, like ``eth0:4567``. If the port number is omitted, the default swarm listening port is used. Default: '0.0.0.0:2377' force_new_cluster (bool): Force creating a new Swarm, even if already part of one. Default: False swarm_spec (dict): Configuration settings of the new Swarm. Use ``APIClient.create_swarm_spec`` to generate a valid configuration. Default: None default_addr_pool (list of strings): Default Address Pool specifies default subnet pools for global scope networks. Each pool should be specified as a CIDR block, like '10.0.0.0/8'. Default: None subnet_size (int): SubnetSize specifies the subnet size of the networks created from the default subnet pool. Default: None data_path_addr (string): Address or interface to use for data path traffic. For example, 192.168.1.1, or an interface, like eth0. data_path_port (int): Port number to use for data path traffic. Acceptable port range is 1024 to 49151. If set to ``None`` or 0, the default port 4789 will be used. Default: None Returns: (str): The ID of the created node. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/swarm/init') if swarm_spec is not None and not isinstance(swarm_spec, dict): raise TypeError('swarm_spec must be a dictionary') if default_addr_pool is not None: if utils.version_lt(self._version, '1.39'): raise errors.InvalidVersion( 'Address pool is only available for API version >= 1.39' ) # subnet_size becomes 0 if not set with default_addr_pool if subnet_size is None: subnet_size = DEFAULT_SWARM_SUBNET_SIZE if subnet_size is not None: if utils.version_lt(self._version, '1.39'): raise errors.InvalidVersion( 'Subnet size is only available for API version >= 1.39' ) # subnet_size is ignored if set without default_addr_pool if default_addr_pool is None: default_addr_pool = DEFAULT_SWARM_ADDR_POOL data = { 'AdvertiseAddr': advertise_addr, 'ListenAddr': listen_addr, 'DefaultAddrPool': default_addr_pool, 'SubnetSize': subnet_size, 'ForceNewCluster': force_new_cluster, 'Spec': swarm_spec, } if data_path_addr is not None: if utils.version_lt(self._version, '1.30'): raise errors.InvalidVersion( 'Data address path is only available for ' 'API version >= 1.30' ) data['DataPathAddr'] = data_path_addr if data_path_port is not None: if utils.version_lt(self._version, '1.40'): raise errors.InvalidVersion( 'Data path port is only available for ' 'API version >= 1.40' ) data['DataPathPort'] = data_path_port response = self._post_json(url, data=data) return self._result(response, json=True) @utils.minimum_version('1.24') def inspect_swarm(self): """ Retrieve low-level information about the current swarm. Returns: A dictionary containing data about the swarm. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/swarm') return self._result(self._get(url), True) @utils.check_resource('node_id') @utils.minimum_version('1.24') def inspect_node(self, node_id): """ Retrieve low-level information about a swarm node Args: node_id (string): ID of the node to be inspected. Returns: A dictionary containing data about this node. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/nodes/{0}', node_id) return self._result(self._get(url), True) @utils.minimum_version('1.24') def join_swarm(self, remote_addrs, join_token, listen_addr='0.0.0.0:2377', advertise_addr=None, data_path_addr=None): """ Make this Engine join a swarm that has already been created. Args: remote_addrs (:py:class:`list`): Addresses of one or more manager nodes already participating in the Swarm to join. join_token (string): Secret token for joining this Swarm. listen_addr (string): Listen address used for inter-manager communication if the node gets promoted to manager, as well as determining the networking interface used for the VXLAN Tunnel Endpoint (VTEP). Default: ``'0.0.0.0:2377`` advertise_addr (string): Externally reachable address advertised to other nodes. This can either be an address/port combination in the form ``192.168.1.1:4567``, or an interface followed by a port number, like ``eth0:4567``. If the port number is omitted, the port number from the listen address is used. If AdvertiseAddr is not specified, it will be automatically detected when possible. Default: ``None`` data_path_addr (string): Address or interface to use for data path traffic. For example, 192.168.1.1, or an interface, like eth0. Returns: ``True`` if the request went through. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ data = { 'RemoteAddrs': remote_addrs, 'ListenAddr': listen_addr, 'JoinToken': join_token, 'AdvertiseAddr': advertise_addr, } if data_path_addr is not None: if utils.version_lt(self._version, '1.30'): raise errors.InvalidVersion( 'Data address path is only available for ' 'API version >= 1.30' ) data['DataPathAddr'] = data_path_addr url = self._url('/swarm/join') response = self._post_json(url, data=data) self._raise_for_status(response) return True @utils.minimum_version('1.24') def leave_swarm(self, force=False): """ Leave a swarm. Args: force (bool): Leave the swarm even if this node is a manager. Default: ``False`` Returns: ``True`` if the request went through. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/swarm/leave') response = self._post(url, params={'force': force}) # Ignore "this node is not part of a swarm" error if force and response.status_code == http_client.NOT_ACCEPTABLE: return True # FIXME: Temporary workaround for 1.13.0-rc bug # https://github.com/docker/docker/issues/29192 if force and response.status_code == http_client.SERVICE_UNAVAILABLE: return True self._raise_for_status(response) return True @utils.minimum_version('1.24') def nodes(self, filters=None): """ List swarm nodes. Args: filters (dict): Filters to process on the nodes list. Valid filters: ``id``, ``name``, ``membership`` and ``role``. Default: ``None`` Returns: A list of dictionaries containing data about each swarm node. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/nodes') params = {} if filters: params['filters'] = utils.convert_filters(filters) return self._result(self._get(url, params=params), True) @utils.check_resource('node_id') @utils.minimum_version('1.24') def remove_node(self, node_id, force=False): """ Remove a node from the swarm. Args: node_id (string): ID of the node to be removed. force (bool): Force remove an active node. Default: `False` Raises: :py:class:`docker.errors.NotFound` If the node referenced doesn't exist in the swarm. :py:class:`docker.errors.APIError` If the server returns an error. Returns: `True` if the request was successful. """ url = self._url('/nodes/{0}', node_id) params = { 'force': force } res = self._delete(url, params=params) self._raise_for_status(res) return True @utils.minimum_version('1.24') def unlock_swarm(self, key): """ Unlock a locked swarm. Args: key (string): The unlock key as provided by :py:meth:`get_unlock_key` Raises: :py:class:`docker.errors.InvalidArgument` If the key argument is in an incompatible format :py:class:`docker.errors.APIError` If the server returns an error. Returns: `True` if the request was successful. Example: >>> key = client.api.get_unlock_key() >>> client.unlock_swarm(key) """ if isinstance(key, dict): if 'UnlockKey' not in key: raise errors.InvalidArgument('Invalid unlock key format') else: key = {'UnlockKey': key} url = self._url('/swarm/unlock') res = self._post_json(url, data=key) self._raise_for_status(res) return True @utils.minimum_version('1.24') def update_node(self, node_id, version, node_spec=None): """ Update the node's configuration Args: node_id (string): ID of the node to be updated. version (int): The version number of the node object being updated. This is required to avoid conflicting writes. node_spec (dict): Configuration settings to update. Any values not provided will be removed. Default: ``None`` Returns: `True` if the request went through. Raises: :py:class:`docker.errors.APIError` If the server returns an error. Example: >>> node_spec = {'Availability': 'active', 'Name': 'node-name', 'Role': 'manager', 'Labels': {'foo': 'bar'} } >>> client.api.update_node(node_id='24ifsmvkjbyhk', version=8, node_spec=node_spec) """ url = self._url('/nodes/{0}/update?version={1}', node_id, str(version)) res = self._post_json(url, data=node_spec) self._raise_for_status(res) return True @utils.minimum_version('1.24') def update_swarm(self, version, swarm_spec=None, rotate_worker_token=False, rotate_manager_token=False, rotate_manager_unlock_key=False): """ Update the Swarm's configuration Args: version (int): The version number of the swarm object being updated. This is required to avoid conflicting writes. swarm_spec (dict): Configuration settings to update. Use :py:meth:`~docker.api.swarm.SwarmApiMixin.create_swarm_spec` to generate a valid configuration. Default: ``None``. rotate_worker_token (bool): Rotate the worker join token. Default: ``False``. rotate_manager_token (bool): Rotate the manager join token. Default: ``False``. rotate_manager_unlock_key (bool): Rotate the manager unlock key. Default: ``False``. Returns: ``True`` if the request went through. Raises: :py:class:`docker.errors.APIError` If the server returns an error. """ url = self._url('/swarm/update') params = { 'rotateWorkerToken': rotate_worker_token, 'rotateManagerToken': rotate_manager_token, 'version': version } if rotate_manager_unlock_key: if utils.version_lt(self._version, '1.25'): raise errors.InvalidVersion( 'Rotate manager unlock key ' 'is only available for API version >= 1.25' ) params['rotateManagerUnlockKey'] = rotate_manager_unlock_key response = self._post_json(url, data=swarm_spec, params=params) self._raise_for_status(response) return True __pycache__/network.cpython-310.pyc 0000644 00000022157 15027743616 0013170 0 ustar 00 o �h�) � @ s>