| import copy |
| import docker |
| import pytest |
| |
| from ..helpers import force_leave_swarm, requires_api_version |
| from .base import BaseAPIIntegrationTest |
| |
| |
| class SwarmTest(BaseAPIIntegrationTest): |
| def setUp(self): |
| super(SwarmTest, self).setUp() |
| force_leave_swarm(self.client) |
| self._unlock_key = None |
| |
| def tearDown(self): |
| try: |
| if self._unlock_key: |
| self.client.unlock_swarm(self._unlock_key) |
| except docker.errors.APIError: |
| pass |
| force_leave_swarm(self.client) |
| super(SwarmTest, self).tearDown() |
| |
| @requires_api_version('1.24') |
| def test_init_swarm_simple(self): |
| assert self.init_swarm() |
| |
| @requires_api_version('1.24') |
| def test_init_swarm_force_new_cluster(self): |
| pytest.skip('Test stalls the engine on 1.12.0') |
| |
| assert self.init_swarm() |
| version_1 = self.client.inspect_swarm()['Version']['Index'] |
| assert self.client.init_swarm(force_new_cluster=True) |
| version_2 = self.client.inspect_swarm()['Version']['Index'] |
| assert version_2 != version_1 |
| |
| @requires_api_version('1.39') |
| def test_init_swarm_custom_addr_pool_defaults(self): |
| assert self.init_swarm() |
| results = self.client.inspect_swarm() |
| assert set(results['DefaultAddrPool']) == {'10.0.0.0/8'} |
| assert results['SubnetSize'] == 24 |
| |
| @requires_api_version('1.39') |
| def test_init_swarm_custom_addr_pool_only_pool(self): |
| assert self.init_swarm(default_addr_pool=['2.0.0.0/16']) |
| results = self.client.inspect_swarm() |
| assert set(results['DefaultAddrPool']) == {'2.0.0.0/16'} |
| assert results['SubnetSize'] == 24 |
| |
| @requires_api_version('1.39') |
| def test_init_swarm_custom_addr_pool_only_subnet_size(self): |
| assert self.init_swarm(subnet_size=26) |
| results = self.client.inspect_swarm() |
| assert set(results['DefaultAddrPool']) == {'10.0.0.0/8'} |
| assert results['SubnetSize'] == 26 |
| |
| @requires_api_version('1.39') |
| def test_init_swarm_custom_addr_pool_both_args(self): |
| assert self.init_swarm(default_addr_pool=['2.0.0.0/16', '3.0.0.0/16'], |
| subnet_size=28) |
| results = self.client.inspect_swarm() |
| assert set(results['DefaultAddrPool']) == {'2.0.0.0/16', '3.0.0.0/16'} |
| assert results['SubnetSize'] == 28 |
| |
| @requires_api_version('1.24') |
| def test_init_already_in_cluster(self): |
| assert self.init_swarm() |
| with pytest.raises(docker.errors.APIError): |
| self.init_swarm() |
| |
| @requires_api_version('1.24') |
| def test_init_swarm_custom_raft_spec(self): |
| spec = self.client.create_swarm_spec( |
| snapshot_interval=5000, log_entries_for_slow_followers=1200 |
| ) |
| assert self.init_swarm(swarm_spec=spec) |
| swarm_info = self.client.inspect_swarm() |
| assert swarm_info['Spec']['Raft']['SnapshotInterval'] == 5000 |
| assert swarm_info['Spec']['Raft']['LogEntriesForSlowFollowers'] == 1200 |
| |
| @requires_api_version('1.30') |
| def test_init_swarm_with_ca_config(self): |
| spec = self.client.create_swarm_spec( |
| node_cert_expiry=7776000000000000, ca_force_rotate=6000000000000 |
| ) |
| |
| assert self.init_swarm(swarm_spec=spec) |
| swarm_info = self.client.inspect_swarm() |
| assert swarm_info['Spec']['CAConfig']['NodeCertExpiry'] == ( |
| spec['CAConfig']['NodeCertExpiry'] |
| ) |
| assert swarm_info['Spec']['CAConfig']['ForceRotate'] == ( |
| spec['CAConfig']['ForceRotate'] |
| ) |
| |
| @requires_api_version('1.25') |
| def test_init_swarm_with_autolock_managers(self): |
| spec = self.client.create_swarm_spec(autolock_managers=True) |
| assert self.init_swarm(swarm_spec=spec) |
| # save unlock key for tearDown |
| self._unlock_key = self.client.get_unlock_key() |
| swarm_info = self.client.inspect_swarm() |
| |
| assert ( |
| swarm_info['Spec']['EncryptionConfig']['AutoLockManagers'] is True |
| ) |
| |
| assert self._unlock_key.get('UnlockKey') |
| |
| @requires_api_version('1.25') |
| @pytest.mark.xfail( |
| reason="This doesn't seem to be taken into account by the engine" |
| ) |
| def test_init_swarm_with_log_driver(self): |
| spec = {'TaskDefaults': {'LogDriver': {'Name': 'syslog'}}} |
| assert self.init_swarm(swarm_spec=spec) |
| swarm_info = self.client.inspect_swarm() |
| |
| assert swarm_info['Spec']['TaskDefaults']['LogDriver']['Name'] == ( |
| 'syslog' |
| ) |
| |
| @requires_api_version('1.24') |
| def test_leave_swarm(self): |
| assert self.init_swarm() |
| with pytest.raises(docker.errors.APIError) as exc_info: |
| self.client.leave_swarm() |
| exc_info.value.response.status_code == 500 |
| assert self.client.leave_swarm(force=True) |
| with pytest.raises(docker.errors.APIError) as exc_info: |
| self.client.inspect_swarm() |
| exc_info.value.response.status_code == 406 |
| assert self.client.leave_swarm(force=True) |
| |
| @requires_api_version('1.24') |
| def test_update_swarm(self): |
| assert self.init_swarm() |
| swarm_info_1 = self.client.inspect_swarm() |
| spec = self.client.create_swarm_spec( |
| snapshot_interval=5000, log_entries_for_slow_followers=1200, |
| node_cert_expiry=7776000000000000 |
| ) |
| assert self.client.update_swarm( |
| version=swarm_info_1['Version']['Index'], |
| swarm_spec=spec, rotate_worker_token=True |
| ) |
| swarm_info_2 = self.client.inspect_swarm() |
| |
| assert ( |
| swarm_info_1['Version']['Index'] != |
| swarm_info_2['Version']['Index'] |
| ) |
| assert swarm_info_2['Spec']['Raft']['SnapshotInterval'] == 5000 |
| assert ( |
| swarm_info_2['Spec']['Raft']['LogEntriesForSlowFollowers'] == 1200 |
| ) |
| assert ( |
| swarm_info_1['JoinTokens']['Manager'] == |
| swarm_info_2['JoinTokens']['Manager'] |
| ) |
| assert ( |
| swarm_info_1['JoinTokens']['Worker'] != |
| swarm_info_2['JoinTokens']['Worker'] |
| ) |
| |
| @requires_api_version('1.24') |
| def test_list_nodes(self): |
| assert self.init_swarm() |
| nodes_list = self.client.nodes() |
| assert len(nodes_list) == 1 |
| node = nodes_list[0] |
| assert 'ID' in node |
| assert 'Spec' in node |
| assert node['Spec']['Role'] == 'manager' |
| |
| filtered_list = self.client.nodes(filters={ |
| 'id': node['ID'] |
| }) |
| assert len(filtered_list) == 1 |
| filtered_list = self.client.nodes(filters={ |
| 'role': 'worker' |
| }) |
| assert len(filtered_list) == 0 |
| |
| @requires_api_version('1.24') |
| def test_inspect_node(self): |
| node_id = self.init_swarm() |
| assert node_id |
| nodes_list = self.client.nodes() |
| assert len(nodes_list) == 1 |
| node = nodes_list[0] |
| node_data = self.client.inspect_node(node['ID']) |
| assert node['ID'] == node_data['ID'] |
| assert node_id == node['ID'] |
| assert node['Version'] == node_data['Version'] |
| |
| @requires_api_version('1.24') |
| def test_update_node(self): |
| assert self.init_swarm() |
| nodes_list = self.client.nodes() |
| node = nodes_list[0] |
| orig_spec = node['Spec'] |
| |
| # add a new label |
| new_spec = copy.deepcopy(orig_spec) |
| new_spec['Labels'] = {'new.label': 'new value'} |
| self.client.update_node(node_id=node['ID'], |
| version=node['Version']['Index'], |
| node_spec=new_spec) |
| updated_node = self.client.inspect_node(node['ID']) |
| assert new_spec == updated_node['Spec'] |
| |
| # Revert the changes |
| self.client.update_node(node_id=node['ID'], |
| version=updated_node['Version']['Index'], |
| node_spec=orig_spec) |
| reverted_node = self.client.inspect_node(node['ID']) |
| assert orig_spec == reverted_node['Spec'] |
| |
| @requires_api_version('1.24') |
| def test_remove_main_node(self): |
| assert self.init_swarm() |
| nodes_list = self.client.nodes() |
| node_id = nodes_list[0]['ID'] |
| with pytest.raises(docker.errors.NotFound): |
| self.client.remove_node('foobar01') |
| with pytest.raises(docker.errors.APIError) as e: |
| self.client.remove_node(node_id) |
| |
| assert e.value.response.status_code >= 400 |
| |
| with pytest.raises(docker.errors.APIError) as e: |
| self.client.remove_node(node_id, True) |
| |
| assert e.value.response.status_code >= 400 |
| |
| @requires_api_version('1.25') |
| def test_rotate_manager_unlock_key(self): |
| spec = self.client.create_swarm_spec(autolock_managers=True) |
| assert self.init_swarm(swarm_spec=spec) |
| swarm_info = self.client.inspect_swarm() |
| key_1 = self.client.get_unlock_key() |
| assert self.client.update_swarm( |
| version=swarm_info['Version']['Index'], |
| rotate_manager_unlock_key=True |
| ) |
| key_2 = self.client.get_unlock_key() |
| assert key_1['UnlockKey'] != key_2['UnlockKey'] |
| |
| @requires_api_version('1.30') |
| def test_init_swarm_data_path_addr(self): |
| assert self.init_swarm(data_path_addr='eth0') |