File manager - Edit - /home/newsbmcs.com/public_html/static/img/logo/cli4.py.tar
Back
usr/local/CyberCP/lib/python3.10/site-packages/cli4/cli4.py 0000644 00000053211 15027745202 0017115 0 ustar 00 #!/usr/bin/env python """Cloudflare API via command line""" import sys import re import getopt import json import CloudFlare from .dump import dump_commands, dump_commands_from_web from . import converters from . import examples my_yaml = None my_jsonlines = None class CLI4InternalError(Exception): """ errors in cli4 """ def load_and_check_yaml(): """ load_and_check_yaml() """ # only called if user uses --yaml flag from . import myyaml global my_yaml try: my_yaml = myyaml.myyaml() except ImportError: sys.exit('cli4: install yaml support via: pip install pyyaml') def load_and_check_jsonlines(): """ load_and_check_yaml() """ # only called if user uses --ndjson flag from . import myjsonlines global my_jsonlines try: my_jsonlines = myjsonlines.myjsonlines() except ImportError: sys.exit('cli4: install jsonlines support via: pip install jsonlines') def strip_multiline(s): """ remove leading/trailing tabs/spaces on each line""" # This hack is needed in order to use yaml.safe_load() on JSON text - tabs are not allowed return '\n'.join([line.strip() for line in s.splitlines()]) def process_params_content_files(method, binary_file, args): """ process_params_content_files() """ digits_only = re.compile('^-?[0-9]+$') floats_only = re.compile('^-?[0-9.]+$') params = None content = None files = None # next grab the params. These are in the form of tag=value or =value or @filename while len(args) > 0 and ('=' in args[0] or args[0][0] == '@'): arg = args.pop(0) if arg[0] == '@': # a file to be uploaded - used in workers/script etc - only via PUT or POST filename = arg[1:] if method not in ['PUT','POST']: sys.exit('cli4: %s - raw file upload only with PUT or POST' % (filename)) try: if filename == '-': if binary_file: content = sys.stdin.buffer.read() else: content = sys.stdin.read() else: if binary_file: with open(filename, 'rb') as f: content = f.read() else: with open(filename, 'r', encoding="utf-8") as f: content = f.read() except IOError: sys.exit('cli4: %s - file open failure' % (filename)) continue tag_string, value_string = arg.split('=', 1) if value_string.lower() == 'true': value = True elif value_string.lower() == 'false': value = False elif value_string == '' or value_string.lower() == 'none': value = None elif value_string[0] == '=' and value_string[1:] == '': sys.exit('cli4: %s== - no number value passed' % (tag_string)) elif value_string[0] == '=' and digits_only.match(value_string[1:]): value = int(value_string[1:]) elif value_string[0] == '=' and floats_only.match(value_string[1:]): value = float(value_string[1:]) elif value_string[0] == '=': sys.exit('cli4: %s== - invalid number value passed' % (tag_string)) elif value_string[0] in '[{' and value_string[-1] in '}]': # a json structure - used in pagerules try: # value = json.loads(value) - changed to yaml code to remove unicode string issues load_and_check_yaml() # cleanup string before parsing so that yaml.safe.load does not complain about whitespace # >>> found character '\t' that cannot start any token <<< value_string = strip_multiline(value_string) try: value = my_yaml.safe_load(value_string) except my_yaml.parser.ParserError: raise ValueError from None except ValueError: sys.exit('cli4: %s="%s" - can\'t parse json value' % (tag_string, value_string)) elif value_string[0] == '@': # a file to be uploaded - used in dns_records/import etc - only via PUT or POST filename = value_string[1:] if method not in ['PUT', 'POST']: sys.exit('cli4: %s=%s - file upload only with PUT or POST' % (tag_string, filename)) if files is None: files = {} if tag_string in files: sys.exit('cli4: %s=%s - duplicate name' % (tag_string, filename)) try: if filename == '-': files[tag_string] = sys.stdin else: files[tag_string] = open(filename, 'rb') except IOError: sys.exit('cli4: %s=%s - file open failure' % (tag_string, filename)) # no need for param code below continue elif (value_string[0] == '"' and value_string[-1] == '"') or (value_string[0] == '\'' and value_string[-1] == '\''): # remove quotes value = value_string[1:-1] else: value = value_string if tag_string == '': # There's no tag; it's just an unnamed list if params is None: params = value else: sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' % (tag_string, value_string)) else: if params is None: params = {} tag = tag_string try: params[tag] = value except TypeError: sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' % (tag_string, value_string)) if content and params: sys.exit('cli4: content and params not allowed together') if params and files: for k,v in params.items(): files[k] = (None, v) params = None # sys.exit('cli4: params and files not allowed together') if method != 'GET': if params: content = params params = None return (params, content, files) def run_command(cf, method, command, params=None, content=None, files=None): """run the command line""" # remove leading and trailing /'s if command[0] == '/': command = command[1:] if command[-1] == '/': command = command[:-1] # break down command into it's seperate pieces # these are then checked against the Cloudflare class # to confirm there is a method that matches parts = command.split('/') cmd = [] identifier1 = None identifier2 = None identifier3 = None hex_only = re.compile('^[0-9a-fA-F]+$') waf_rules = re.compile('^[0-9]+[A-Z]*$') uuid_value = re.compile('^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') # 8-4-4-4-12 m = cf for element in parts: if element[0] == ':': element = element[1:] if identifier1 is None: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier1 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier1 = element elif element[0] == ':': # raw string - used for workers script_name - use ::script_name identifier1 = element[1:] else: try: if cmd[0] == 'certificates': # identifier1 = convert_certificates_to_identifier(cf, element) identifier1 = converters.convert_zones_to_identifier(cf, element) elif cmd[0] == 'zones': identifier1 = converters.convert_zones_to_identifier(cf, element) elif cmd[0] == 'accounts': identifier1 = converters.convert_accounts_to_identifier(cf, element) elif cmd[0] == 'organizations': identifier1 = converters.convert_organizations_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'organizations'): identifier1 = converters.convert_organizations_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'invites'): identifier1 = converters.convert_invites_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'virtual_dns'): identifier1 = converters.convert_virtual_dns_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'load_balancers') and (cmd[2] == 'pools'): identifier1 = converters.convert_load_balancers_pool_to_identifier(cf, element) else: raise CLI4InternalError("/%s/%s :NOT CODED YET" % ('/'.join(cmd), element)) except CLI4InternalError as e: sys.stderr.write('cli4: /%s - %s\n' % (command, e)) raise e cmd.append(':' + identifier1) elif identifier2 is None: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier2 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier2 = element elif element[0] == ':': # raw string - used for workers script_names identifier2 = element[1:] else: try: if (cmd[0] and cmd[0] == 'zones') and (cmd[2] and cmd[2] == 'dns_records'): identifier2 = converters.convert_dns_record_to_identifier(cf, identifier1, element) elif (cmd[0] and cmd[0] == 'zones') and (cmd[2] and cmd[2] == 'custom_hostnames'): identifier2 = converters.convert_custom_hostnames_to_identifier(cf, identifier1, element) else: raise CLI4InternalError("/%s/:%s :NOT CODED YET" % ('/'.join(cmd), element)) except CLI4InternalError as e: sys.stderr.write('cli4: /%s - %s\n' % (command, e)) raise e # identifier2 may be an array - this needs to be dealt with later if isinstance(identifier2, list): cmd.append(':' + '[' + ','.join(identifier2) + ']') else: cmd.append(':' + identifier2) identifier2 = [identifier2] else: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier3 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier3 = element elif waf_rules.match(element): identifier3 = element elif element[0] == ':': # raw string - used for workers script_names identifier3 = element[1:] else: # /accounts/:id/storage/kv/namespaces/:id/values/:key_name - it's a strange one! if len(cmd) >= 6 and cmd[0] == 'accounts' and cmd[2] == 'storage' and cmd[3] == 'kv' and cmd[4] == 'namespaces' and cmd[6] == 'values': identifier3 = element else: sys.stderr.write('/%s/:%s :NOT CODED YET\n' % ('/'.join(cmd), element)) raise e else: try: m = getattr(m, CloudFlare.CloudFlare.sanitize_verb(element)) cmd.append(element) except AttributeError as e: # the verb/element was not found sys.stderr.write('cli4: /%s - not found\n' % (command)) raise e results = [] if identifier2 is None: identifier2 = [None] for i2 in identifier2: try: if method == 'GET': # no content with a GET call r = m.get(identifier1=identifier1, identifier2=i2, identifier3=identifier3, params=params) elif method == 'PATCH': r = m.patch(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content) elif method == 'POST': r = m.post(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content, files=files) elif method == 'PUT': r = m.put(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content, files=files) elif method == 'DELETE': r = m.delete(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content) else: pass except CloudFlare.exceptions.CloudFlareAPIError as e: if len(e) > 0: # more than one error returned by the API for x in e: sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(x), str(x))) sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(e), str(e))) raise e except CloudFlare.exceptions.CloudFlareInternalError as e: sys.stderr.write('cli4: InternalError: /%s - %d %s\n' % (command, int(e), str(e))) raise e except Exception as e: sys.stderr.write('cli4: /%s - %s - api error\n' % (command, str(e))) raise e results.append(r) return results def write_results(results, output): """dump the results""" if output is None: return if len(results) == 1: results = results[0] if isinstance(results, (str, bytes, bytearray)): # if the results are a simple string, then it should be dumped directly # this is only used for /zones/:id/dns_records/export, workers, and other calls # or # output is image or audio or video or something like that so we dump directly pass else: # anything more complex (dict, list, etc) should be dumped as JSON/YAML if output == 'json': try: results = json.dumps(results, indent=4, sort_keys=True, ensure_ascii=False, encoding='utf8') except TypeError: results = json.dumps(results, indent=4, sort_keys=True, ensure_ascii=False) elif output == 'yaml': results = my_yaml.safe_dump(results) elif output == 'ndjson': # NDJSON support seems like a hack. There has to be a better way try: writer = my_jsonlines.Writer(sys.stdout) writer.write_all(results) writer.close() except (BrokenPipeError, IOError): pass return else: # None of the above, so pass thru results except something in byte form if not isinstance(results, (bytes, bytearray)): results = str(results) if results: try: if isinstance(results, (bytes, bytearray)): sys.stdout.buffer.write(results) else: sys.stdout.write(results) if not results.endswith('\n'): sys.stdout.write('\n') except (BrokenPipeError, IOError): pass def do_it(args): """Cloudflare API via command line""" verbose = False output = 'json' example = False raw = False do_dump = False do_openapi = None openapi_url = None binary_file = False profile = None http_headers = None warnings = True method = 'GET' usage = ('usage: cli4 ' + '[-V|--version] [-h|--help] [-v|--verbose] ' + '[-e|--examples] ' + '[-q|--quiet] ' + '[-j|--json] [-y|--yaml] [-n|--ndjson] [-i|--image] ' + '[-r|--raw] ' + '[-d|--dump] ' + '[-A|--openapi url] ' + '[-b|--binary] ' + '[-p|--profile profile-name] ' + '[-h|--header additional-header] ' + '[-w|--warnings [True|False]] ' + '[--get|--patch|--post|--put|--delete] ' + '[item=value|item=@filename|@filename ...] ' + '/command ...') try: opts, args = getopt.getopt(args, 'VhveqjynirdA:bp:h:w:GPOUD', [ 'version', 'help', 'verbose', 'examples', 'quiet', 'json', 'yaml', 'ndjson', 'image', 'raw', 'dump', 'openapi=', 'binary', 'profile=', 'header=', 'warnings=', 'get', 'patch', 'post', 'put', 'delete' ]) except getopt.GetoptError: sys.exit(usage) for opt, arg in opts: if opt in ('-V', '--version'): sys.exit('Cloudflare library version: %s' % (CloudFlare.__version__)) if opt in ('-h', '--help'): sys.exit(usage) elif opt in ('-v', '--verbose'): verbose = True elif opt in ('-q', '--quiet'): output = None elif opt in ('-e', '--examples'): example = True elif opt in ('-j', '--json'): output = 'json' elif opt in ('-y', '--yaml'): load_and_check_yaml() output = 'yaml' elif opt in ('-n', '--ndjson'): load_and_check_jsonlines() output = 'ndjson' elif opt in ('-i', '--image'): output = 'image' elif opt in ('-r', '--raw'): raw = True elif opt in ('-p', '--profile'): profile = arg elif opt in ('-h', '--header'): if http_headers is None: http_headers = [] http_headers.append(arg) elif opt in ('-w', '--warnings'): if arg is None or arg == '': warnings = None elif arg.lower() in ('yes', 'true', '1'): warnings = True elif arg.lower() in ('no', 'false', '0'): warnings = False else: sys.exit('cli4: --warnings takes boolean True/False argument') elif opt in ('-d', '--dump'): do_dump = True elif opt in ('-A', '--openapi'): do_openapi = True openapi_url = arg if arg != '' else None elif opt in ('-b', '--binary'): binary_file = True elif opt in ('-G', '--get'): method = 'GET' elif opt in ('-P', '--patch'): method = 'PATCH' elif opt in ('-O', '--post'): method = 'POST' elif opt in ('-U', '--put'): method = 'PUT' elif opt in ('-D', '--delete'): method = 'DELETE' if example: try: examples.display() except ModuleNotFoundError as e: sys.exit(e) sys.exit(0) try: cf = CloudFlare.CloudFlare(debug=verbose, raw=raw, profile=profile, http_headers=http_headers, warnings=warnings) except Exception as e: sys.exit(e) if do_dump: a = dump_commands(cf) # success - just dump results and exit sys.stdout.write(a) sys.exit(0) if do_openapi: try: a = dump_commands_from_web(cf, openapi_url) except CloudFlare.exceptions.CloudFlareAPIError as e: sys.exit('cli4: %s - Failed' % (e)) # success - just dump results and exit sys.stdout.write(a) sys.exit(0) # next grab the params. These are in the form of tag=value or =value or @filename (params, content, files) = process_params_content_files(method, binary_file, args) # what's left is the command itself if len(args) < 1: sys.exit(usage) commands = args exit_with_error = False for command in commands: try: results = run_command(cf, method, command, params, content, files) write_results(results, output) except KeyboardInterrupt as e: sys.exit('cli4: %s - Interrupted\n' % (command)) except Exception as e: exit_with_error = True if exit_with_error: sys.exit(1) def cli4(args): """Cloudflare API via command line""" do_it(args) sys.exit(0) usr/local/CyberPanel/lib/python3.10/site-packages/cli4/cli4.py 0000644 00000053211 15027746267 0017666 0 ustar 00 #!/usr/bin/env python """Cloudflare API via command line""" import sys import re import getopt import json import CloudFlare from .dump import dump_commands, dump_commands_from_web from . import converters from . import examples my_yaml = None my_jsonlines = None class CLI4InternalError(Exception): """ errors in cli4 """ def load_and_check_yaml(): """ load_and_check_yaml() """ # only called if user uses --yaml flag from . import myyaml global my_yaml try: my_yaml = myyaml.myyaml() except ImportError: sys.exit('cli4: install yaml support via: pip install pyyaml') def load_and_check_jsonlines(): """ load_and_check_yaml() """ # only called if user uses --ndjson flag from . import myjsonlines global my_jsonlines try: my_jsonlines = myjsonlines.myjsonlines() except ImportError: sys.exit('cli4: install jsonlines support via: pip install jsonlines') def strip_multiline(s): """ remove leading/trailing tabs/spaces on each line""" # This hack is needed in order to use yaml.safe_load() on JSON text - tabs are not allowed return '\n'.join([line.strip() for line in s.splitlines()]) def process_params_content_files(method, binary_file, args): """ process_params_content_files() """ digits_only = re.compile('^-?[0-9]+$') floats_only = re.compile('^-?[0-9.]+$') params = None content = None files = None # next grab the params. These are in the form of tag=value or =value or @filename while len(args) > 0 and ('=' in args[0] or args[0][0] == '@'): arg = args.pop(0) if arg[0] == '@': # a file to be uploaded - used in workers/script etc - only via PUT or POST filename = arg[1:] if method not in ['PUT','POST']: sys.exit('cli4: %s - raw file upload only with PUT or POST' % (filename)) try: if filename == '-': if binary_file: content = sys.stdin.buffer.read() else: content = sys.stdin.read() else: if binary_file: with open(filename, 'rb') as f: content = f.read() else: with open(filename, 'r', encoding="utf-8") as f: content = f.read() except IOError: sys.exit('cli4: %s - file open failure' % (filename)) continue tag_string, value_string = arg.split('=', 1) if value_string.lower() == 'true': value = True elif value_string.lower() == 'false': value = False elif value_string == '' or value_string.lower() == 'none': value = None elif value_string[0] == '=' and value_string[1:] == '': sys.exit('cli4: %s== - no number value passed' % (tag_string)) elif value_string[0] == '=' and digits_only.match(value_string[1:]): value = int(value_string[1:]) elif value_string[0] == '=' and floats_only.match(value_string[1:]): value = float(value_string[1:]) elif value_string[0] == '=': sys.exit('cli4: %s== - invalid number value passed' % (tag_string)) elif value_string[0] in '[{' and value_string[-1] in '}]': # a json structure - used in pagerules try: # value = json.loads(value) - changed to yaml code to remove unicode string issues load_and_check_yaml() # cleanup string before parsing so that yaml.safe.load does not complain about whitespace # >>> found character '\t' that cannot start any token <<< value_string = strip_multiline(value_string) try: value = my_yaml.safe_load(value_string) except my_yaml.parser.ParserError: raise ValueError from None except ValueError: sys.exit('cli4: %s="%s" - can\'t parse json value' % (tag_string, value_string)) elif value_string[0] == '@': # a file to be uploaded - used in dns_records/import etc - only via PUT or POST filename = value_string[1:] if method not in ['PUT', 'POST']: sys.exit('cli4: %s=%s - file upload only with PUT or POST' % (tag_string, filename)) if files is None: files = {} if tag_string in files: sys.exit('cli4: %s=%s - duplicate name' % (tag_string, filename)) try: if filename == '-': files[tag_string] = sys.stdin else: files[tag_string] = open(filename, 'rb') except IOError: sys.exit('cli4: %s=%s - file open failure' % (tag_string, filename)) # no need for param code below continue elif (value_string[0] == '"' and value_string[-1] == '"') or (value_string[0] == '\'' and value_string[-1] == '\''): # remove quotes value = value_string[1:-1] else: value = value_string if tag_string == '': # There's no tag; it's just an unnamed list if params is None: params = value else: sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' % (tag_string, value_string)) else: if params is None: params = {} tag = tag_string try: params[tag] = value except TypeError: sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' % (tag_string, value_string)) if content and params: sys.exit('cli4: content and params not allowed together') if params and files: for k,v in params.items(): files[k] = (None, v) params = None # sys.exit('cli4: params and files not allowed together') if method != 'GET': if params: content = params params = None return (params, content, files) def run_command(cf, method, command, params=None, content=None, files=None): """run the command line""" # remove leading and trailing /'s if command[0] == '/': command = command[1:] if command[-1] == '/': command = command[:-1] # break down command into it's seperate pieces # these are then checked against the Cloudflare class # to confirm there is a method that matches parts = command.split('/') cmd = [] identifier1 = None identifier2 = None identifier3 = None hex_only = re.compile('^[0-9a-fA-F]+$') waf_rules = re.compile('^[0-9]+[A-Z]*$') uuid_value = re.compile('^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') # 8-4-4-4-12 m = cf for element in parts: if element[0] == ':': element = element[1:] if identifier1 is None: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier1 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier1 = element elif element[0] == ':': # raw string - used for workers script_name - use ::script_name identifier1 = element[1:] else: try: if cmd[0] == 'certificates': # identifier1 = convert_certificates_to_identifier(cf, element) identifier1 = converters.convert_zones_to_identifier(cf, element) elif cmd[0] == 'zones': identifier1 = converters.convert_zones_to_identifier(cf, element) elif cmd[0] == 'accounts': identifier1 = converters.convert_accounts_to_identifier(cf, element) elif cmd[0] == 'organizations': identifier1 = converters.convert_organizations_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'organizations'): identifier1 = converters.convert_organizations_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'invites'): identifier1 = converters.convert_invites_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'virtual_dns'): identifier1 = converters.convert_virtual_dns_to_identifier(cf, element) elif (cmd[0] == 'user') and (cmd[1] == 'load_balancers') and (cmd[2] == 'pools'): identifier1 = converters.convert_load_balancers_pool_to_identifier(cf, element) else: raise CLI4InternalError("/%s/%s :NOT CODED YET" % ('/'.join(cmd), element)) except CLI4InternalError as e: sys.stderr.write('cli4: /%s - %s\n' % (command, e)) raise e cmd.append(':' + identifier1) elif identifier2 is None: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier2 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier2 = element elif element[0] == ':': # raw string - used for workers script_names identifier2 = element[1:] else: try: if (cmd[0] and cmd[0] == 'zones') and (cmd[2] and cmd[2] == 'dns_records'): identifier2 = converters.convert_dns_record_to_identifier(cf, identifier1, element) elif (cmd[0] and cmd[0] == 'zones') and (cmd[2] and cmd[2] == 'custom_hostnames'): identifier2 = converters.convert_custom_hostnames_to_identifier(cf, identifier1, element) else: raise CLI4InternalError("/%s/:%s :NOT CODED YET" % ('/'.join(cmd), element)) except CLI4InternalError as e: sys.stderr.write('cli4: /%s - %s\n' % (command, e)) raise e # identifier2 may be an array - this needs to be dealt with later if isinstance(identifier2, list): cmd.append(':' + '[' + ','.join(identifier2) + ']') else: cmd.append(':' + identifier2) identifier2 = [identifier2] else: if len(element) in [32, 40, 48] and hex_only.match(element): # raw identifier - lets just use it as-is identifier3 = element elif len(element) == 36 and uuid_value.match(element): # uuid identifier - lets just use it as-is identifier3 = element elif waf_rules.match(element): identifier3 = element elif element[0] == ':': # raw string - used for workers script_names identifier3 = element[1:] else: # /accounts/:id/storage/kv/namespaces/:id/values/:key_name - it's a strange one! if len(cmd) >= 6 and cmd[0] == 'accounts' and cmd[2] == 'storage' and cmd[3] == 'kv' and cmd[4] == 'namespaces' and cmd[6] == 'values': identifier3 = element else: sys.stderr.write('/%s/:%s :NOT CODED YET\n' % ('/'.join(cmd), element)) raise e else: try: m = getattr(m, CloudFlare.CloudFlare.sanitize_verb(element)) cmd.append(element) except AttributeError as e: # the verb/element was not found sys.stderr.write('cli4: /%s - not found\n' % (command)) raise e results = [] if identifier2 is None: identifier2 = [None] for i2 in identifier2: try: if method == 'GET': # no content with a GET call r = m.get(identifier1=identifier1, identifier2=i2, identifier3=identifier3, params=params) elif method == 'PATCH': r = m.patch(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content) elif method == 'POST': r = m.post(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content, files=files) elif method == 'PUT': r = m.put(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content, files=files) elif method == 'DELETE': r = m.delete(identifier1=identifier1, identifier2=i2, identifier3=identifier3, data=content) else: pass except CloudFlare.exceptions.CloudFlareAPIError as e: if len(e) > 0: # more than one error returned by the API for x in e: sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(x), str(x))) sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(e), str(e))) raise e except CloudFlare.exceptions.CloudFlareInternalError as e: sys.stderr.write('cli4: InternalError: /%s - %d %s\n' % (command, int(e), str(e))) raise e except Exception as e: sys.stderr.write('cli4: /%s - %s - api error\n' % (command, str(e))) raise e results.append(r) return results def write_results(results, output): """dump the results""" if output is None: return if len(results) == 1: results = results[0] if isinstance(results, (str, bytes, bytearray)): # if the results are a simple string, then it should be dumped directly # this is only used for /zones/:id/dns_records/export, workers, and other calls # or # output is image or audio or video or something like that so we dump directly pass else: # anything more complex (dict, list, etc) should be dumped as JSON/YAML if output == 'json': try: results = json.dumps(results, indent=4, sort_keys=True, ensure_ascii=False, encoding='utf8') except TypeError: results = json.dumps(results, indent=4, sort_keys=True, ensure_ascii=False) elif output == 'yaml': results = my_yaml.safe_dump(results) elif output == 'ndjson': # NDJSON support seems like a hack. There has to be a better way try: writer = my_jsonlines.Writer(sys.stdout) writer.write_all(results) writer.close() except (BrokenPipeError, IOError): pass return else: # None of the above, so pass thru results except something in byte form if not isinstance(results, (bytes, bytearray)): results = str(results) if results: try: if isinstance(results, (bytes, bytearray)): sys.stdout.buffer.write(results) else: sys.stdout.write(results) if not results.endswith('\n'): sys.stdout.write('\n') except (BrokenPipeError, IOError): pass def do_it(args): """Cloudflare API via command line""" verbose = False output = 'json' example = False raw = False do_dump = False do_openapi = None openapi_url = None binary_file = False profile = None http_headers = None warnings = True method = 'GET' usage = ('usage: cli4 ' + '[-V|--version] [-h|--help] [-v|--verbose] ' + '[-e|--examples] ' + '[-q|--quiet] ' + '[-j|--json] [-y|--yaml] [-n|--ndjson] [-i|--image] ' + '[-r|--raw] ' + '[-d|--dump] ' + '[-A|--openapi url] ' + '[-b|--binary] ' + '[-p|--profile profile-name] ' + '[-h|--header additional-header] ' + '[-w|--warnings [True|False]] ' + '[--get|--patch|--post|--put|--delete] ' + '[item=value|item=@filename|@filename ...] ' + '/command ...') try: opts, args = getopt.getopt(args, 'VhveqjynirdA:bp:h:w:GPOUD', [ 'version', 'help', 'verbose', 'examples', 'quiet', 'json', 'yaml', 'ndjson', 'image', 'raw', 'dump', 'openapi=', 'binary', 'profile=', 'header=', 'warnings=', 'get', 'patch', 'post', 'put', 'delete' ]) except getopt.GetoptError: sys.exit(usage) for opt, arg in opts: if opt in ('-V', '--version'): sys.exit('Cloudflare library version: %s' % (CloudFlare.__version__)) if opt in ('-h', '--help'): sys.exit(usage) elif opt in ('-v', '--verbose'): verbose = True elif opt in ('-q', '--quiet'): output = None elif opt in ('-e', '--examples'): example = True elif opt in ('-j', '--json'): output = 'json' elif opt in ('-y', '--yaml'): load_and_check_yaml() output = 'yaml' elif opt in ('-n', '--ndjson'): load_and_check_jsonlines() output = 'ndjson' elif opt in ('-i', '--image'): output = 'image' elif opt in ('-r', '--raw'): raw = True elif opt in ('-p', '--profile'): profile = arg elif opt in ('-h', '--header'): if http_headers is None: http_headers = [] http_headers.append(arg) elif opt in ('-w', '--warnings'): if arg is None or arg == '': warnings = None elif arg.lower() in ('yes', 'true', '1'): warnings = True elif arg.lower() in ('no', 'false', '0'): warnings = False else: sys.exit('cli4: --warnings takes boolean True/False argument') elif opt in ('-d', '--dump'): do_dump = True elif opt in ('-A', '--openapi'): do_openapi = True openapi_url = arg if arg != '' else None elif opt in ('-b', '--binary'): binary_file = True elif opt in ('-G', '--get'): method = 'GET' elif opt in ('-P', '--patch'): method = 'PATCH' elif opt in ('-O', '--post'): method = 'POST' elif opt in ('-U', '--put'): method = 'PUT' elif opt in ('-D', '--delete'): method = 'DELETE' if example: try: examples.display() except ModuleNotFoundError as e: sys.exit(e) sys.exit(0) try: cf = CloudFlare.CloudFlare(debug=verbose, raw=raw, profile=profile, http_headers=http_headers, warnings=warnings) except Exception as e: sys.exit(e) if do_dump: a = dump_commands(cf) # success - just dump results and exit sys.stdout.write(a) sys.exit(0) if do_openapi: try: a = dump_commands_from_web(cf, openapi_url) except CloudFlare.exceptions.CloudFlareAPIError as e: sys.exit('cli4: %s - Failed' % (e)) # success - just dump results and exit sys.stdout.write(a) sys.exit(0) # next grab the params. These are in the form of tag=value or =value or @filename (params, content, files) = process_params_content_files(method, binary_file, args) # what's left is the command itself if len(args) < 1: sys.exit(usage) commands = args exit_with_error = False for command in commands: try: results = run_command(cf, method, command, params, content, files) write_results(results, output) except KeyboardInterrupt as e: sys.exit('cli4: %s - Interrupted\n' % (command)) except Exception as e: exit_with_error = True if exit_with_error: sys.exit(1) def cli4(args): """Cloudflare API via command line""" do_it(args) sys.exit(0)
| ver. 1.4 |
Github
|
.
| PHP 8.2.28 | Generation time: 0.02 |
proxy
|
phpinfo
|
Settings