Skip to content

Commit 15084bc

Browse files
authored
Merge branch 'master' into pm_updates
2 parents 4ff72d1 + 3bc52fb commit 15084bc

8 files changed

Lines changed: 426 additions & 44 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ If installation fails collecting dependancies, make sure your python setuptools
3838

3939
## Usage example
4040

41-
For code samples of programmatic use, please review the files in [/examples](/examples).
41+
For code samples of programmatic use, please review the files in [/examples](https://github.com/Venafi/vcert-python/tree/master/examples).
4242
- For Trust Protection Platform, the `zone` format is the DN of a policy with or without the "\VED\Policy\" prefix (e.g. "\VED\Policy\Certificates\VCert" or simply "Certificates\VCert")
4343
- For Venafi as a Service, the `zone` format is the name of an OutagePREDICT Application and the API Alias of an Issuing Template assigned to it delimited by a single backslash character (e.g. "My Application\My CIT")
4444

tests/test_env.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
CLOUD_URL = environ.get('CLOUD_URL')
3131
CLOUD_APIKEY = environ.get('CLOUD_APIKEY')
3232
CLOUD_ZONE = environ.get('CLOUD_ZONE')
33+
CLOUD_TEAM = environ.get('CLOUD_TEAM')
3334

3435
TPP_PM_ROOT = environ.get('TPP_PM_ROOT')
3536
TPP_CA_NAME = environ.get('TPP_CA_NAME')

tests/test_pm.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from pprint import pformat
1919

2020
from test_env import (TPP_TOKEN_URL, CLOUD_APIKEY, CLOUD_URL, TPP_PM_ROOT, CLOUD_ENTRUST_CA_NAME,
21-
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD)
21+
CLOUD_DIGICERT_CA_NAME, TPP_CA_NAME, TPP_USER, TPP_PASSWORD, CLOUD_TEAM)
2222
from test_utils import get_tpp_policy_name, get_vaas_zone
2323
from vcert import TPPTokenConnection, CloudConnection, Authentication, SCOPE_PM, logger, VenafiError, KeyType
2424
from vcert.parser import json_parser, yaml_parser
@@ -94,6 +94,9 @@ def __init__(self, *args, **kwargs):
9494
self.yaml_file = POLICY_SPEC_YAML
9595
super(TestTPPPolicyManagement, self).__init__(*args, **kwargs)
9696

97+
prefixed_universal = None
98+
username = "osstestuser"
99+
97100
def test_create_policy_from_json(self):
98101
# ps = json_parser.parse_file(self.json_file)
99102
# self._create_policy_tpp(policy_spec=ps)
@@ -120,6 +123,31 @@ def test_create_policy_no_defaults(self):
120123
policy.key_pair.rsa_key_sizes = [2048]
121124
self._create_policy_tpp(policy=policy)
122125

126+
def test_browse_identities(self):
127+
connector = self.tpp_conn
128+
identity = connector.get_tpp_identity(self.username)
129+
self.assertEqual(self.username, identity.name)
130+
self.prefixed_universal = identity.prefixed_universal
131+
132+
def test_validate_identity(self):
133+
connector = self.tpp_conn
134+
if not self.prefixed_universal:
135+
self.test_browse_identities()
136+
response = connector.validate_identity(self.prefixed_universal)
137+
self.assertIsNotNone(response)
138+
self.assertEqual(self.username, response.name)
139+
140+
def test_create_and_get_policy_with_contacts(self):
141+
connector = self.tpp_conn
142+
zone = f"{TPP_PM_ROOT}\\{_get_tpp_policy_name()}"
143+
policy_specification = PolicySpecification()
144+
policy_specification.policy = _get_policy_obj(ca_type=CA_TYPE_TPP)
145+
policy_specification.defaults = _get_defaults_obj()
146+
policy_specification.policy.key_pair.rsa_key_sizes = [2048]
147+
connector.set_policy(zone, policy_specification)
148+
result = connector.get_policy(zone)
149+
self.assertEqual(1, len(result.users))
150+
123151
def _create_policy_tpp(self, policy_spec=None, policy=None, defaults=None):
124152
zone = f"{TPP_PM_ROOT}\\{get_tpp_policy_name()}"
125153
create_policy(self.tpp_conn, zone, policy_spec, policy, defaults)
@@ -218,6 +246,90 @@ def test_create_policy_uri_ip_email(self):
218246
self.assertTrue(ps.policy.subject_alt_names.uri_allowed)
219247
self.assertTrue(ps.policy.subject_alt_names.ip_allowed)
220248

249+
def test_create_policy_with_no_users(self):
250+
zone = self._get_random_zone()
251+
connector = self.cloud_conn
252+
policy_specification = PolicySpecification()
253+
policy_specification.policy = _get_policy_obj()
254+
policy_specification.defaults = _get_defaults_obj()
255+
connector.set_policy(zone, policy_specification)
256+
result = connector.get_policy(zone)
257+
self.assertEqual(1, len(result.users))
258+
self.assertEqual("jenkins@opensource.qa.venafi.io", result.users[0])
259+
260+
def test_create_policy_with_users(self):
261+
zone = self._get_random_zone()
262+
connector = self.cloud_conn
263+
policy_specification = PolicySpecification()
264+
policy_specification.policy = _get_policy_obj()
265+
policy_specification.defaults = _get_defaults_obj()
266+
policy_specification.users = ["pki-admin@opensource.qa.venafi.io", "resource-owner@opensource.qa.venafi.io"]
267+
connector.set_policy(zone, policy_specification)
268+
result = connector.get_policy(zone)
269+
self.assertEqual(2, len(result.users))
270+
self.assertIn("pki-admin@opensource.qa.venafi.io", result.users)
271+
self.assertIn("resource-owner@opensource.qa.venafi.io", result.users)
272+
273+
def test_update_policy_with_no_users(self):
274+
zone = self._get_random_zone()
275+
connector = self.cloud_conn
276+
policy_specification = PolicySpecification()
277+
policy_specification.policy = _get_policy_obj()
278+
policy_specification.defaults = _get_defaults_obj()
279+
policy_specification.users = ["pki-admin@opensource.qa.venafi.io",
280+
"resource-owner@opensource.qa.venafi.io"]
281+
connector.set_policy(zone, policy_specification)
282+
result = connector.get_policy(zone)
283+
self.assertEqual(2, len(result.users))
284+
self.assertIn("pki-admin@opensource.qa.venafi.io", result.users)
285+
self.assertIn("resource-owner@opensource.qa.venafi.io", result.users)
286+
287+
# Update Policy Specification with no users
288+
policy_specification2 = PolicySpecification()
289+
policy_specification2.policy = _get_policy_obj()
290+
policy_specification2.defaults = _get_defaults_obj()
291+
connector.set_policy(zone, policy_specification2)
292+
result2 = connector.get_policy(zone)
293+
self.assertEqual(2, len(result2.users))
294+
self.assertIn("pki-admin@opensource.qa.venafi.io", result2.users)
295+
self.assertIn("resource-owner@opensource.qa.venafi.io", result2.users)
296+
297+
def test_update_policy_with_users(self):
298+
zone = self._get_random_zone()
299+
connector = self.cloud_conn
300+
policy_specification = PolicySpecification()
301+
policy_specification.policy = _get_policy_obj()
302+
policy_specification.defaults = _get_defaults_obj()
303+
policy_specification.users = ["jenkins@opensource.qa.venafi.io"]
304+
connector.set_policy(zone, policy_specification)
305+
result = connector.get_policy(zone)
306+
self.assertEqual(1, len(result.users))
307+
self.assertEqual("jenkins@opensource.qa.venafi.io", result.users[0])
308+
309+
# Update Policy Specification with users
310+
policy_specification2 = PolicySpecification()
311+
policy_specification2.policy = _get_policy_obj()
312+
policy_specification2.defaults = _get_defaults_obj()
313+
policy_specification2.users = ["pki-admin@opensource.qa.venafi.io",
314+
"resource-owner@opensource.qa.venafi.io"]
315+
connector.set_policy(zone, policy_specification2)
316+
result2 = connector.get_policy(zone)
317+
self.assertEqual(2, len(result2.users))
318+
self.assertIn("pki-admin@opensource.qa.venafi.io", result2.users)
319+
self.assertIn("resource-owner@opensource.qa.venafi.io", result2.users)
320+
321+
def test_create_policy_with_team(self):
322+
zone = self._get_random_zone()
323+
connector = self.cloud_conn
324+
policy_specification = PolicySpecification()
325+
policy_specification.policy = _get_policy_obj()
326+
policy_specification.defaults = _get_defaults_obj()
327+
policy_specification.users = [CLOUD_TEAM]
328+
connector.set_policy(zone, policy_specification)
329+
result = connector.get_policy(zone)
330+
self.assertEqual(1, len(result.users))
331+
self.assertEqual(CLOUD_TEAM, result.users[0])
332+
221333
def _create_policy_cloud(self, policy_spec=None, policy=None, defaults=None):
222334
zone = get_vaas_zone()
223335
response = create_policy(self.cloud_conn, zone, policy_spec, policy, defaults)

vcert/connection_cloud.py

Lines changed: 124 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
from .policy.pm_cloud import (build_policy_spec, validate_policy_spec, AccountDetails, build_cit_request, build_user,
3535
UserDetails, build_company, build_apikey, build_app_update_request, get_ca_info,
3636
CertificateAuthorityDetails, CertificateAuthorityInfo, build_account_details,
37-
build_app_create_request, supported_rsa_key_sizes, supported_elliptic_curves)
38-
from .vaas_utils import AppDetails, RecommendedSettings, EdgeEncryptionKey, zip_to_pem, value_matches_regex
37+
build_app_create_request, build_team, get_cit_data_from_response, build_owner_json,
38+
supported_rsa_key_sizes, supported_elliptic_curves)
39+
from .vaas_utils import AppDetails, OwnerIdsAndTypes, RecommendedSettings, EdgeEncryptionKey, zip_to_pem, value_matches_regex
3940

4041
TOKEN_HEADER_NAME = "tppl-api-key" # nosec
4142
APPLICATION_SERVER_TYPE_ID = "784938d1-ef0d-11eb-9461-7bb533ba575b"
@@ -57,6 +58,8 @@
5758
CSR_ATTR_KEY_TYPE = 'keyType'
5859
CSR_ATTR_KEY_LENGTH = 'keyLength'
5960
CSR_ATTR_KEY_CURVE = 'keyCurve'
61+
OWNER_TYPE_USER = "USER"
62+
OWNER_TYPE_TEAM = "TEAM"
6063

6164
log = get_child("connection-vaas")
6265

@@ -94,6 +97,9 @@ def __init__(self):
9497
ISSUING_TEMPLATES_UPDATE = ISSUING_TEMPLATES + "/{}"
9598
USER_ACCOUNTS = API_VERSION + "useraccounts"
9699
DEK_PUBLIC_KEY = API_VERSION + "edgeencryptionkeys/{}"
100+
USERS_USERNAME = API_VERSION + "users/username/{}"
101+
USERS_ID = API_VERSION + "users/{}"
102+
TEAMS_ID = API_VERSION + "teams"
97103

98104

99105
class CondorChainOptions:
@@ -126,6 +132,16 @@ def _parse_zone(zone):
126132
return app_name, cit_alias
127133

128134

135+
def create_owner(owner_type, owner_id):
136+
owner = OwnerIdsAndTypes(owner_type, owner_id)
137+
return owner
138+
139+
140+
def resolve_apikey_owner(user_details):
141+
owner = create_owner(OWNER_TYPE_USER, user_details.user.user_id)
142+
return owner
143+
144+
129145
class CloudConnection(CommonConnection):
130146
def __init__(self, token, url=None, http_request_kwargs=None):
131147
super().__init__()
@@ -628,22 +644,118 @@ def set_policy(self, zone, policy_spec):
628644
raise VenafiError('User Details not found')
629645

630646
app_details = self._get_app_details_by_name(app_name)
647+
for_update, owners_list = self.resolve_owners(policy_spec.users, user_details)
631648
if app_details:
632-
# Application exists. Update with cit
633-
if not self._policy_exists(zone):
649+
# Application exists. Update with cit and owners
650+
if for_update and len(policy_spec.users) == 0:
651+
log.debug("No users provided in the policy specification")
652+
else:
653+
owner_list = build_owner_json(owners_list)
654+
app_details.owner_ids_and_types = owner_list
655+
if app_details.cit_alias_id_map:
634656
# Only link cit with Application when cit is not already associated with Application
635-
app_req = build_app_update_request(app_details, resp_cit_data)
636-
status, data = self._put(URLS.APP_BY_ID.format(app_details.app_id), app_req)
637-
if status != HTTPStatus.OK:
638-
raise VenafiError(f"Could not update Application [{app_name}] with cit [{resp_cit_data}]")
657+
cit_map = app_details.cit_alias_id_map
658+
cit_id, cit_name = get_cit_data_from_response(cit_data)
659+
cit_map[cit_name] = cit_id
660+
app_req = build_app_update_request(app_details, cit_map)
661+
status, data = self._put(URLS.APP_BY_ID.format(app_details.app_id), app_req)
662+
if status != HTTPStatus.OK:
663+
raise VenafiError(f"Could not update Application [{app_name}] with cit [{resp_cit_data}]")
639664
else:
640665
# Application does not exist. Create one
641-
app_req = build_app_create_request(app_name, user_details, resp_cit_data)
666+
app_req = build_app_create_request(app_name, owners_list, resp_cit_data)
642667
status, data = self._post(URLS.APPLICATIONS, app_req)
643668
if status != HTTPStatus.CREATED:
644669
raise VenafiError(f"Could not create application [{app_name}]")
645670
return
646671

672+
def resolve_owners(self, users_list, user_details):
673+
owners_list = list()
674+
for_update = False
675+
if not users_list:
676+
# When no users are provided on the list, adds apikey user as owner
677+
api_owner = resolve_apikey_owner(user_details)
678+
owners_list.append(api_owner)
679+
for_update = True
680+
else:
681+
# Resolving the usernames list
682+
# Creating a higher level Teams object to cache the response
683+
teams_list = list()
684+
users_list = list(set(users_list))
685+
for username in users_list:
686+
cloud_owner = self.resolve_user_to_cloud_owner(username)
687+
if cloud_owner:
688+
owners_list.append(cloud_owner)
689+
else:
690+
team_owner = self.resolve_user_to_cloud_team(username)
691+
if not team_owner:
692+
raise VenafiError(f"Unable to find identity [{username}]")
693+
owners_list.append(team_owner)
694+
return for_update, owners_list
695+
696+
def resolve_user_to_cloud_owner(self, username):
697+
user = self.get_vaas_identity(username)
698+
if not user:
699+
return None
700+
owner = create_owner(OWNER_TYPE_USER, user.user_id)
701+
return owner
702+
703+
def resolve_user_to_cloud_team(self, username):
704+
teams = list()
705+
data_teams = None
706+
try:
707+
status, response = self._get(URLS.TEAMS_ID)
708+
data_teams = response['teams']
709+
except VenafiError as err:
710+
log.debug(f"Error while getting team [{username}]: {err.args[0]}")
711+
# date_teams will never be empty, we are always expecting at least one team
712+
for data in data_teams:
713+
team = build_team(data)
714+
teams.append(team)
715+
if len(teams) > 0:
716+
for team in teams:
717+
if team.name == username:
718+
owner = create_owner(OWNER_TYPE_TEAM, team.team_id)
719+
return owner
720+
return None
721+
722+
def get_vaas_identity(self, username):
723+
if not username or username == "":
724+
raise VenafiError("Username cannot be empty")
725+
url = URLS.USERS_USERNAME.format(username)
726+
try:
727+
status, response = self._get(url)
728+
if status == HTTPStatus.NOT_FOUND:
729+
return None
730+
identities = response['users'][0]
731+
identity = build_user(identities)
732+
return identity
733+
except VenafiError as err:
734+
log.debug(f"Unable to find identity [{username}] of type USER: {err.args[0]}")
735+
736+
def resolve_cloud_owners_names(self, zone):
737+
app_name, cit_alias = _parse_zone(zone)
738+
app_details = self._get_app_details_by_name(app_name)
739+
users_list = list()
740+
teams_response = list()
741+
for owner in app_details.owner_ids_and_types:
742+
if owner['ownerType'] == OWNER_TYPE_USER:
743+
status, data = self._get(URLS.USERS_ID.format(owner['ownerId']))
744+
user = build_user(data)
745+
users_list.append(user.username)
746+
elif owner['ownerType'] == OWNER_TYPE_TEAM:
747+
if not teams_response:
748+
status, data = self._get(URLS.TEAMS_ID)
749+
data_team = data['teams']
750+
for t in data_team:
751+
team = build_team(t)
752+
teams_response.append(team)
753+
if teams_response:
754+
for team in teams_response:
755+
if team.team_id == owner['ownerId']:
756+
users_list.append(team.name)
757+
return users_list
758+
647759
def _get_ca_details(self, ca_name):
648760
"""
649761
:param str ca_name:
@@ -893,6 +1005,8 @@ def _get_policy(self, zone, subject_cn_to_str):
8931005
raise VenafiError("Certificate Authority info not found")
8941006

8951007
ps = build_policy_spec(cit, info, subject_cn_to_str)
1008+
users_list = self.resolve_cloud_owners_names(zone)
1009+
ps.users = users_list
8961010
return ps
8971011

8981012
def _get_dek_hash(self, cert_id):
@@ -943,3 +1057,4 @@ def _retrieve_service_generated_cert(self, request, dek_info):
9431057

9441058
cert, chain, private_key = zip_to_pem(data, request.chain_option)
9451059
return Certificate(cert=cert, chain=chain, key=private_key)
1060+

0 commit comments

Comments
 (0)