DeploySharedSchemas.py 5.92 KB
Newer Older
1
2
3
4
5
6
import json
import os
import time
import requests
import argparse
from Utility import Utility, RunEnv
7
from typing import Tuple
8
9
10


class DeploySharedSchemas:
11
12
13
14
15
16
    SCHEMA_AUTHORITY_TO_REPLACE = '{{schema-authority}}'
    SCHEMA_EXISTS = 'Schema Id is already present'
    PUBLISHED_SCHEMA_ERROR = ['Only schema in development stage can be updated',
                              'Only schema in developement stage can be updated']
    ALREADY_PUBLISHED = 'AlreadyPublished'
    SUCCESS = 'Success'
17
18
19
20
21
22
23
24
25
26
27
28
29
30

    def __init__(self):
        parser = argparse.ArgumentParser(
            description="Given a path to an load sequence file, load/update the schemas "
                        "listed in the load sequence file.")
        parser.add_argument('-u', help='The complete URL to the Schema Service.',
                            default=None)
        arguments = parser.parse_args()
        if arguments.u is not None:
            RunEnv.SCHEMA_SERVICE_URL = arguments.u
        if RunEnv.SCHEMA_SERVICE_URL is None:
            exit('The schema service URL is not specified')
        self.url = RunEnv.SCHEMA_SERVICE_URL
        self.schema_registered = None
31
32
33
34
35
36
37
38
        self.schema_info_registered = None
        self.headers = {
            'data-partition-id': RunEnv.DATA_PARTITION,
            'Content-Type': 'application/json',
            'AppKey': RunEnv.APP_KEY,
            'Authorization': RunEnv.BEARER_TOKEN
        }
        print('Current data-partition-id: {}'.format(RunEnv.DATA_PARTITION))
39
40
41
42
43
44
45
46
47
        ok, error_mess = RunEnv().is_ok()
        if not ok:
            exit('Error: environment setting incomplete: {}'.format(error_mess))

    def create_schema(self):
        messages = list()
        deployments = Utility.path_to_deployments()
        start = time.time()

48
49
50
51
52
53
54
55
56
57
        bootstrap_options = json.loads(RunEnv.BOOTSTRAP_OPTIONS)
        for option in bootstrap_options:
            try:
                schema_path = option['folder']
                schema_authority = option['authority']
                load_sequence = option['load-sequence']
            except KeyError as e:
                exit('Key missing in bootstrap-options::{}'.format(str(e)))

        sequence = Utility.load_json(os.path.join(deployments, RunEnv.SCHEMAS_FOLDER, schema_path, load_sequence))
58
59
60
61
        for item in sequence:
            self.schema_registered = None
            schema_file = os.path.join(deployments, item['relativePath'])
            schema = open(schema_file, 'r').read()
62
            schema = schema.replace(self.SCHEMA_AUTHORITY_TO_REPLACE, schema_authority)
63
64
            kind = self.__kind_from_schema_info(schema)
            self.__register_one(kind, schema, messages)
65
66
67
68
69
70
71
72

        elapsed = time.time() - start
        print('This update took {:.2f} seconds.'.format(elapsed))
        if len(messages) != 0:
            print('Following schemas failed:')
            print('\n'.join(messages))
            exit(1)
        else:
73
74
            print('All {} schemas registered, updated or '
                  'left unchanged because of status PUBLISHED.'.format(str(len(sequence))))
75

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
    @staticmethod
    def __kind_from_schema_info(schema_as_str: str) -> str:
        kind = 'Error'
        try:
            schema = json.loads(schema_as_str)
            si = schema.get('schemaInfo', dict()).get('schemaIdentity', dict())
            authority = si.get('authority', '')
            source = si.get('source', '')
            entity = si.get('entityType', '')
            major = str(si.get('schemaVersionMajor', 0))
            minor = str(si.get('schemaVersionMinor', 0))
            patch = str(si.get('schemaVersionPatch', 0))
            kind = '{}:{}:{}:{}.{}.{}'.format(authority, source, entity, major, minor, patch)
        except Exception as e:
            exit('Invalid JSON in payload {}'.format(str(e)))
        return kind

    def __register_one(self, kind, schema, messages):
        method = 'POST'
        try_it = 'Try {} for id: {}'
        print(try_it.format(method, kind))
        response = requests.request(method, self.url, headers=self.headers, data=schema)
        is_error, message, method = self.__evaluate_response(response)
        if method == 'PUT':  # try again
            print(try_it.format(method, kind))
            response = requests.request(method, self.url, headers=self.headers, data=schema)
            is_error, message, method = self.__evaluate_response(response)

        if is_error:
            message = 'Error with kind {}: Message: {}'.format(kind, message)
            print(message)
            messages.append(message)
        elif method == self.SUCCESS:
            print('The kind {} was registered successfully.'.format(kind))
        elif method == self.ALREADY_PUBLISHED:
            print('The kind {} was already registered with status PUBLISHED '
                  'and was not updated.'.format(kind))

    def __evaluate_response(self, response: requests.Response) -> Tuple[bool, str, str]:
        code = response.status_code
        message = ''
        method = 'Give up'
        error = code not in range(200, 300)
        if error:
            # further test:
            try:
                js_err = json.loads(response.text)
                message = js_err.get('error', dict()).get('message', '')
                if message == self.SCHEMA_EXISTS:
                    method = 'PUT'  # try PUT, it might have been DEVELOPMENT, than we can overwrite
                elif message in self.PUBLISHED_SCHEMA_ERROR:  # already PUBLISHED, no bootstrap required
                    method = self.ALREADY_PUBLISHED
                    error = False  # this is not considered an error
                # everything else is an error
            except Exception as e:
                message = str(e)
        else:
            method = self.SUCCESS
        return error, message, method
135
136
137
138


if __name__ == '__main__':
    DeploySharedSchemas().create_schema()