diff --git a/res/ab.py b/res/ab.py new file mode 100644 index 000000000..338bd3c64 --- /dev/null +++ b/res/ab.py @@ -0,0 +1,771 @@ +#!/usr/bin/env python3 + +import requests +import argparse +import json +from datetime import datetime, timedelta + + +def get_personal_ab(url, token): + """Get personal address book GUID""" + headers = {"Authorization": f"Bearer {token}"} + + response = requests.get(f"{url}/api/ab/personal", headers=headers) + + if response.status_code != 200: + return f"Error: {response.status_code} - {response.text}" + + return response.json() + + +def view_shared_abs(url, token, name=None): + """View all shared address books (excluding personal ones)""" + headers = {"Authorization": f"Bearer {token}"} + pageSize = 30 + params = { + "name": name, + } + + filtered_params = { + k: "%" + v + "%" if (v != "-" and "%" not in v and k != "name") else v + for k, v in params.items() + if v is not None + } + filtered_params["pageSize"] = pageSize + + abs = [] + current = 1 + + while True: + filtered_params["current"] = current + response = requests.get(f"{url}/api/ab/shared/profiles", headers=headers, params=filtered_params) + response_json = response.json() + + data = response_json.get("data", []) + abs.extend(data) + + total = response_json.get("total", 0) + current += pageSize + if len(data) < pageSize or current > total: + break + + return abs + + +def get_ab_by_name(url, token, ab_name): + """Get address book by name""" + abs = view_shared_abs(url, token, ab_name) + for ab in abs: + if ab["name"] == ab_name: + return ab + return None + + +def view_ab_peers(url, token, ab_guid, peer_id=None, alias=None): + """View peers in an address book""" + headers = {"Authorization": f"Bearer {token}"} + pageSize = 30 + params = { + "ab": ab_guid, + "id": peer_id, + "alias": alias, + } + + filtered_params = { + k: "%" + v + "%" if (v != "-" and "%" not in v and k not in ["ab"]) else v + for k, v in params.items() + if v is not None + } + filtered_params["pageSize"] = pageSize + + peers = [] + current = 1 + + while True: + filtered_params["current"] = current + response = requests.get(f"{url}/api/ab/peers", headers=headers, params=filtered_params) + response_json = response.json() + + data = response_json.get("data", []) + peers.extend(data) + + total = response_json.get("total", 0) + current += pageSize + if len(data) < pageSize or current > total: + break + + return peers + + +def view_ab_tags(url, token, ab_guid): + """View tags in an address book""" + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(f"{url}/api/ab/tags/{ab_guid}", headers=headers) + response_json = check_response(response) + + # Handle error responses + if isinstance(response_json, tuple) and response_json[0] == "Failed": + print(f"Error: {response_json[1]} - {response_json[2]}") + return [] + + # Format color values as hex + if response_json: + for tag in response_json: + if "color" in tag and tag["color"] is not None: + # Convert color to hex format + color_value = tag["color"] + if isinstance(color_value, int): + tag["color"] = f"0x{color_value:08X}" + + return response_json if response_json else [] + + +def check_response(response): + """Check API response and return result""" + if response.status_code == 200: + try: + response_json = response.json() + return response_json + except ValueError: + return response.text or "Success" + else: + return "Failed", response.status_code, response.text + + +def add_peer(url, token, ab_guid, peer_id, alias=None, note=None, tags=None, password=None): + """Add a peer to address book""" + print(f"Adding peer {peer_id} to address book") + headers = {"Authorization": f"Bearer {token}"} + + payload = { + "id": peer_id, + "note": note, + } + + # Add peer info if provided + info = {} + if alias: + info["alias"] = alias + if tags: + info["tags"] = tags if isinstance(tags, list) else [tags] + if password: + info["password"] = password + + if info: + payload.update(info) + + response = requests.post(f"{url}/api/ab/peer/add/{ab_guid}", headers=headers, json=payload) + return check_response(response) + + +def delete_peer(url, token, ab_guid, peer_ids): + """Delete peers from address book by IDs""" + if isinstance(peer_ids, str): + peer_ids = [peer_ids] + + print(f"Deleting peers {peer_ids} from address book") + headers = {"Authorization": f"Bearer {token}"} + response = requests.delete(f"{url}/api/ab/peer/{ab_guid}", headers=headers, json=peer_ids) + return check_response(response) + +def update_peer(url, token, ab_guid, peer_id, alias=None, note=None, tags=None, password=None): + """Update a peer in address book""" + print(f"Updating peer {peer_id} in address book") + headers = {"Authorization": f"Bearer {token}"} + + # Check if at least one parameter is provided for update + update_params = [alias, note, tags, password] + if all(param is None for param in update_params): + return "Error: At least one parameter must be specified for update" + + payload = { + "id": peer_id, + } + + # Add fields to update + info = {} + if alias is not None: + info["alias"] = alias + if tags is not None: + info["tags"] = tags if isinstance(tags, list) else [tags] + if password is not None: + info["password"] = password + + if info: + payload.update(info) + + if note is not None: + payload["note"] = note + + response = requests.put(f"{url}/api/ab/peer/update/{ab_guid}", headers=headers, json=payload) + return check_response(response) + + +def str2color(tag_name, existing_colors=None): + """Generate color for tag name similar to str2color2 function""" + if existing_colors is None: + existing_colors = [] + + color_map = { + "red": 0xFFFF0000, + "green": 0xFF008000, + "blue": 0xFF0000FF, + "orange": 0xFFFF9800, + "purple": 0xFF9C27B0, + "grey": 0xFF9E9E9E, + "cyan": 0xFF00BCD4, + "lime": 0xFFCDDC39, + "teal": 0xFF009688, + "pink": 0xFFF48FB1, + "indigo": 0xFF3F51B5, + "brown": 0xFF795548, + } + + lower_name = tag_name.lower() + + # Check if tag name matches a predefined color + if lower_name in color_map: + return color_map[lower_name] + + # Special case for yellow + if lower_name == "yellow": + return 0xFFFFFF00 + + # Generate hash-based color + hash_value = 0 + for char in tag_name: + hash_value += ord(char) + + color_list = list(color_map.values()) + hash_value = hash_value % len(color_list) + result = color_list[hash_value] + + # If color is already used, try to find an unused one + if result in existing_colors: + for color in color_list: + if color not in existing_colors: + result = color + break + + return result + + +def add_tag(url, token, ab_guid, tag_name, color=None): + """Add a tag to address book""" + print(f"Adding tag '{tag_name}' to address book") + headers = {"Authorization": f"Bearer {token}"} + + # If no color specified, generate one based on tag name + if color is None: + # Get existing tags to avoid color conflicts + try: + existing_tags = view_ab_tags(url, token, ab_guid) + existing_colors = [tag.get("color", 0) for tag in existing_tags] + color = str2color(tag_name, existing_colors) + except: + # Fallback to default color if we can't get existing tags + color = str2color(tag_name) + + payload = { + "name": tag_name, + "color": color, + } + + response = requests.post(f"{url}/api/ab/tag/add/{ab_guid}", headers=headers, json=payload) + return check_response(response) + + +def update_tag(url, token, ab_guid, tag_name, color): + """Update a tag in address book""" + print(f"Updating tag '{tag_name}' in address book") + headers = {"Authorization": f"Bearer {token}"} + + payload = { + "name": tag_name, + "color": color, + } + + response = requests.put(f"{url}/api/ab/tag/update/{ab_guid}", headers=headers, json=payload) + return check_response(response) + + +def delete_tags(url, token, ab_guid, tag_names): + """Delete tags from address book""" + if isinstance(tag_names, str): + tag_names = [tag_names] + + print(f"Deleting tags {tag_names} from address book") + headers = {"Authorization": f"Bearer {token}"} + response = requests.delete(f"{url}/api/ab/tag/{ab_guid}", headers=headers, json=tag_names) + return check_response(response) + + +def add_shared_ab(url, token, name, note=None, password=None): + """Add a new shared address book""" + print(f"Adding shared address book '{name}'") + headers = {"Authorization": f"Bearer {token}"} + + payload = { + "name": name, + "note": note, + } + + # Add info if password is provided + if password: + payload["info"] = { + "password": password + } + + response = requests.post(f"{url}/api/ab/shared/add", headers=headers, json=payload) + return check_response(response) + + +def update_shared_ab(url, token, ab_guid, name=None, note=None, owner=None, password=None): + """Update a shared address book""" + print(f"Updating shared address book {ab_guid}") + headers = {"Authorization": f"Bearer {token}"} + + # Check if at least one parameter is provided for update + update_params = [name, note, owner, password] + if all(param is None for param in update_params): + return "Error: At least one parameter must be specified for update" + + payload = { + "guid": ab_guid, + } + + if name is not None: + payload["name"] = name + if note is not None: + payload["note"] = note + if owner is not None: + payload["owner"] = owner + if password is not None: + payload["info"] = { + "password": password + } + + response = requests.put(f"{url}/api/ab/shared/update/profile", headers=headers, json=payload) + return check_response(response) + + +def delete_shared_abs(url, token, ab_guids): + """Delete shared address books""" + if isinstance(ab_guids, str): + ab_guids = [ab_guids] + + print(f"Deleting shared address books {ab_guids}") + headers = {"Authorization": f"Bearer {token}"} + response = requests.delete(f"{url}/api/ab/shared", headers=headers, json=ab_guids) + return check_response(response) + + +def permission_to_string(permission): + """Convert numeric permission to string representation""" + permission_map = { + 1: "ro", # Read + 2: "rw", # ReadWrite + 3: "full" # FullControl + } + return permission_map.get(permission, str(permission)) + + +def string_to_permission(permission_str): + """Convert string permission to numeric representation""" + permission_map = { + "ro": 1, # Read + "rw": 2, # ReadWrite + "full": 3 # FullControl + } + return permission_map.get(permission_str.lower(), None) + + +def view_ab_rules(url, token, ab_guid): + """View rules in an address book""" + headers = {"Authorization": f"Bearer {token}"} + pageSize = 30 + params = { + "ab": ab_guid, + "pageSize": pageSize, + } + + rules = [] + current = 1 + + while True: + params["current"] = current + response = requests.get(f"{url}/api/ab/rules", headers=headers, params=params) + response_json = response.json() + + data = response_json.get("data", []) + rules.extend(data) + + total = response_json.get("total", 0) + current += pageSize + if len(data) < pageSize or current > total: + break + + # Convert numeric permissions to string format + for rule in rules: + if "rule" in rule: + rule["rule"] = permission_to_string(rule["rule"]) + + return rules + + +def add_ab_rule(url, token, ab_guid, rule_type, user=None, group=None, rule=1): + """Add a rule to address book""" + print(f"Adding {rule_type} rule to address book") + headers = {"Authorization": f"Bearer {token}"} + + payload = { + "guid": ab_guid, + "rule": rule, + } + + if rule_type == "user" and user: + payload["user"] = user + elif rule_type == "group" and group: + payload["group"] = group + elif rule_type == "everyone": + # For everyone, both user and group are None (not included in payload) + pass + + response = requests.post(f"{url}/api/ab/rule", headers=headers, json=payload) + return check_response(response) + + +def update_ab_rule(url, token, rule_guid, rule): + """Update an address book rule""" + print(f"Updating rule {rule_guid}") + headers = {"Authorization": f"Bearer {token}"} + + payload = { + "guid": rule_guid, + "rule": rule, + } + + response = requests.patch(f"{url}/api/ab/rule", headers=headers, json=payload) + return check_response(response) + + +def delete_ab_rules(url, token, rule_guids): + """Delete address book rules""" + if isinstance(rule_guids, str): + rule_guids = [rule_guids] + + print(f"Deleting rules {rule_guids}") + headers = {"Authorization": f"Bearer {token}"} + response = requests.delete(f"{url}/api/ab/rules", headers=headers, json=rule_guids) + return check_response(response) + + +def main(): + def parse_color(value): + """Parse color value - supports both hex (0xFF00FF00) and decimal""" + if value.startswith('0x') or value.startswith('0X'): + return int(value, 16) + else: + return int(value) + + def parse_permission(value): + """Parse permission value - supports both string (ro/rw/full) and numeric (1/2/3)""" + # Try to parse as string first + permission_num = string_to_permission(value) + if permission_num is not None: + return permission_num + + # Try to parse as integer for backward compatibility + try: + num_value = int(value) + if num_value in [1, 2, 3]: + return num_value + else: + raise argparse.ArgumentTypeError(f"Invalid permission value: {value}. Must be one of: ro, rw, full, 1, 2, 3") + except ValueError: + raise argparse.ArgumentTypeError(f"Invalid permission value: {value}. Must be one of: ro, rw, full, 1, 2, 3") + + parser = argparse.ArgumentParser(description="Address Book manager") + + # Required arguments + parser.add_argument( + "command", + choices=["view-ab", "add-ab", "update-ab", "delete-ab", "get-personal-ab", + "view-peer", "add-peer", "update-peer", "delete-peer", + "view-tag", "add-tag", "update-tag", "delete-tag", + "view-rule", "add-rule", "update-rule", "delete-rule"], + help="Command to execute", + ) + + # Global arguments (used by all commands) + parser.add_argument("--url", required=True, help="URL of the API") + parser.add_argument("--token", required=True, help="Bearer token for authentication") + + # Address book identification (used by most commands except get-personal-ab) + parser.add_argument("--ab-name", help="Address book name (for identification)") + parser.add_argument("--ab-guid", help="Address book GUID (alternative to ab-name)") + + # Address book management arguments + parser.add_argument("--ab-update-name", help="New address book name (for update)") + parser.add_argument("--note", help="Note field") + parser.add_argument("--password", help="Password field") + parser.add_argument("--owner", help="Address book owner (username)") + + # Peer management arguments + parser.add_argument("--peer-id", help="Peer ID") + parser.add_argument("--alias", help="Peer alias") + parser.add_argument("--tags", help="Peer tags (supports both 'tag1,tag2' and '[tag1,tag2]' formats, use '[]' to clear tags)") + + # Tag management arguments + parser.add_argument("--tag-name", help="Tag name") + parser.add_argument("--tag-color", type=parse_color, help="Tag color (hex number like 0xFF00FF00 or decimal, auto-generated if not specified)") + + # Rule management arguments + parser.add_argument("--rule-type", choices=["user", "group", "everyone"], help="Rule type (auto-detected if not specified)") + parser.add_argument("--rule-user", help="Rule target user name (auto-sets rule-type=user)") + parser.add_argument("--rule-group", help="Rule target group name (auto-sets rule-type=group)") + parser.add_argument("--rule-permission", type=parse_permission, help="Rule permission (ro=Read, rw=ReadWrite, full=FullControl, or numeric 1/2/3)") + parser.add_argument("--rule-guid", help="Rule GUID (for update/delete)") + + args = parser.parse_args() + + # Remove trailing slashes from URL + while args.url.endswith("/"): + args.url = args.url[:-1] + + if args.command == "view-ab": + # View all shared address books + abs = view_shared_abs(args.url, args.token, args.ab_name) + print(json.dumps(abs, indent=2)) + + elif args.command == "get-personal-ab": + # Get personal address book GUID + personal_ab = get_personal_ab(args.url, args.token) + print(json.dumps(personal_ab, indent=2)) + + elif args.command in ["add-ab", "update-ab", "delete-ab"]: + # Address book management commands + if args.command == "add-ab": + if not args.ab_name: + print("Error: --ab-name is required for add-ab command") + return + + result = add_shared_ab(args.url, args.token, args.ab_name, args.note, args.password) + print(f"Result: {result}") + + elif args.command in ["update-ab", "delete-ab"]: + # Commands that need ab-name or ab-guid + if not args.ab_name and not args.ab_guid: + print("Error: --ab-name or --ab-guid is required for this command") + return + + if args.ab_name and args.ab_guid: + print("Error: Cannot specify both --ab-name and --ab-guid") + return + + if args.ab_guid: + ab_guid = args.ab_guid + print(f"Working with address book GUID: {ab_guid}") + else: + # Get address book by name + ab = get_ab_by_name(args.url, args.token, args.ab_name) + if not ab: + print(f"Error: Address book '{args.ab_name}' not found") + return + ab_guid = ab["guid"] + print(f"Working with address book: {args.ab_name} (GUID: {ab_guid})") + + if args.command == "update-ab": + result = update_shared_ab(args.url, args.token, ab_guid, args.ab_update_name, args.note, args.owner, args.password) + print(f"Result: {result}") + + elif args.command == "delete-ab": + result = delete_shared_abs(args.url, args.token, ab_guid) + print(f"Result: {result}") + + elif args.command in ["view-peer", "add-peer", "update-peer", "delete-peer", "view-tag", "add-tag", "update-tag", "delete-tag", "view-rule", "add-rule", "update-rule", "delete-rule"]: + if not args.ab_name and not args.ab_guid: + print("Error: --ab-name or --ab-guid is required for this command") + return + + if args.ab_name and args.ab_guid: + print("Error: Cannot specify both --ab-name and --ab-guid") + return + + if args.ab_guid: + ab_guid = args.ab_guid + print(f"Working with address book GUID: {ab_guid}") + else: + # Get address book by name + ab = get_ab_by_name(args.url, args.token, args.ab_name) + if not ab: + print(f"Error: Address book '{args.ab_name}' not found") + return + + ab_guid = ab["guid"] + print(f"Working with address book: {args.ab_name} (GUID: {ab_guid})") + + if args.command == "view-peer": + peers = view_ab_peers(args.url, args.token, ab_guid, args.peer_id, args.alias) + print(json.dumps(peers, indent=2)) + + elif args.command == "add-peer": + if not args.peer_id: + print("Error: --peer-id is required for add-peer command") + return + + # Handle tags parsing - support both [tag1,tag2] and tag1,tag2 formats + tags = None + if args.tags is not None: + if args.tags == "[]": + tags = [] # Empty list to clear tags + else: + # Remove brackets if present and split by comma + tags_str = args.tags.strip() + if tags_str.startswith('[') and tags_str.endswith(']'): + tags_str = tags_str[1:-1] # Remove brackets + tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()] + + result = add_peer( + args.url, + args.token, + ab_guid, + args.peer_id, + args.alias, + args.note, + tags, + args.password + ) + print(f"Result: {result}") + + elif args.command == "update-peer": + if not args.peer_id: + print("Error: --peer-id is required for update-peer command") + return + + # Handle tags parsing - support both [tag1,tag2] and tag1,tag2 formats + tags = None + if args.tags is not None: + if args.tags == "[]": + tags = [] # Empty list to clear tags + else: + # Remove brackets if present and split by comma + tags_str = args.tags.strip() + if tags_str.startswith('[') and tags_str.endswith(']'): + tags_str = tags_str[1:-1] # Remove brackets + tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()] + + result = update_peer( + args.url, + args.token, + ab_guid, + args.peer_id, + args.alias, + args.note, + tags, + args.password + ) + print(f"Result: {result}") + + elif args.command == "delete-peer": + if not args.peer_id: + print("Error: --peer-id is required for delete-peer command") + return + + result = delete_peer(args.url, args.token, ab_guid, args.peer_id) + print(f"Result: {result}") + + elif args.command == "view-tag": + tags = view_ab_tags(args.url, args.token, ab_guid) + print(json.dumps(tags, indent=2)) + + elif args.command == "add-tag": + if not args.tag_name: + print("Error: --tag-name is required for add-tag command") + return + + result = add_tag(args.url, args.token, ab_guid, args.tag_name, args.tag_color) + print(f"Result: {result}") + + elif args.command == "update-tag": + if not args.tag_name: + print("Error: --tag-name is required for update-tag command") + return + + result = update_tag(args.url, args.token, ab_guid, args.tag_name, args.tag_color) + print(f"Result: {result}") + + elif args.command == "delete-tag": + if not args.tag_name: + print("Error: --tag-name is required for delete-tag command") + return + + result = delete_tags(args.url, args.token, ab_guid, args.tag_name) + print(f"Result: {result}") + + elif args.command == "view-rule": + rules = view_ab_rules(args.url, args.token, ab_guid) + print(json.dumps(rules, indent=2)) + + elif args.command == "add-rule": + if not args.rule_permission: + print("Error: --rule-permission is required for add-rule command") + return + + # Auto-detect rule type if not explicitly specified + if not args.rule_type: + if args.rule_user and args.rule_group: + print("Error: Cannot specify both --rule-user and --rule-group") + return + elif args.rule_user: + rule_type = "user" + elif args.rule_group: + rule_type = "group" + else: + print("Error: Must specify --rule-type=everyone, --rule-user, or --rule-group") + return + else: + rule_type = args.rule_type + + # Validate explicit rule type with parameters + if rule_type == "user" and not args.rule_user: + print("Error: --rule-user is required when rule-type=user") + return + elif rule_type == "group" and not args.rule_group: + print("Error: --rule-group is required when rule-type=group") + return + elif rule_type == "user" and args.rule_group: + print("Error: Cannot specify --rule-group when rule-type=user") + return + elif rule_type == "group" and args.rule_user: + print("Error: Cannot specify --rule-user when rule-type=group") + return + elif rule_type == "everyone" and (args.rule_user or args.rule_group): + print("Error: Cannot specify --rule-user or --rule-group when rule-type=everyone") + return + + result = add_ab_rule(args.url, args.token, ab_guid, rule_type, args.rule_user, args.rule_group, args.rule_permission) + print(f"Result: {result}") + + elif args.command == "update-rule": + if not args.rule_guid: + print("Error: --rule-guid is required for update-rule command") + return + if not args.rule_permission: + print("Error: --rule-permission is required for update-rule command") + return + + result = update_ab_rule(args.url, args.token, args.rule_guid, args.rule_permission) + print(f"Result: {result}") + + elif args.command == "delete-rule": + if not args.rule_guid: + print("Error: --rule-guid is required for delete-rule command") + return + + result = delete_ab_rules(args.url, args.token, args.rule_guid) + print(f"Result: {result}") + + +if __name__ == "__main__": + main() diff --git a/res/audits.py b/res/audits.py new file mode 100644 index 000000000..b5cf28504 --- /dev/null +++ b/res/audits.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 + +import requests +import argparse +import json +from datetime import datetime, timedelta, timezone + + +def format_timestamp(timestamp): + """Convert Unix timestamp to readable local datetime""" + if timestamp is None: + return None + try: + # Convert to local time + local_dt = datetime.fromtimestamp(timestamp) + return local_dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, TypeError): + return timestamp + + +def parse_local_time_to_utc_string(time_str): + """Parse local time string to UTC time string for API filtering""" + try: + # Parse the local time string + local_dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f") + # Make the datetime object timezone-aware using system's local timezone + local_dt = local_dt.replace(tzinfo=datetime.now().astimezone().tzinfo) + utc_dt = local_dt.astimezone(timezone.utc) + return utc_dt.strftime("%Y-%m-%d %H:%M:%S.000") + except ValueError: + try: + # Try without microseconds + local_dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") + # Make the datetime object timezone-aware using system's local timezone + local_dt = local_dt.replace(tzinfo=datetime.now().astimezone().tzinfo) + utc_dt = local_dt.astimezone(timezone.utc) + return utc_dt.strftime("%Y-%m-%d %H:%M:%S.000") + except ValueError: + return None + + +def get_connection_type_name(conn_type): + """Convert connection type number to readable name""" + type_map = { + 0: "Remote Desktop", + 1: "File Transfer", + 2: "Port Transfer", + 3: "View Camera", + 4: "Terminal" + } + return type_map.get(conn_type, f"Unknown ({conn_type})") + + +def get_console_type_name(console_type): + """Convert console audit type number to readable name""" + type_map = { + 0: "Group Management", + 1: "User Management", + 2: "Device Management", + 3: "Address Book Management" + } + return type_map.get(console_type, f"Unknown ({console_type})") + + +def get_console_operation_name(operation_code): + """Convert console operation code to readable name""" + operation_map = { + 0: "User Login", + 1: "Add Group", + 2: "Add User", + 3: "Add Device", + 4: "Delete Groups", + 5: "Disconnect Device", + 6: "Enable Users", + 7: "Disable Users", + 8: "Enable Devices", + 9: "Disable Devices", + 10: "Update Group", + 11: "Update User", + 12: "Update Device", + 13: "Delete User", + 14: "Delete Device", + 15: "Add Address Book", + 16: "Delete Address Book", + 17: "Change Address Book Name", + 18: "Delete Devices in the Address Book Recycle Bin", + 19: "Empty Address Book Recycle Bin", + 20: "Add Address Book Permission", + 21: "Delete Address Book Permission", + 22: "Update Address Book Permission" + } + return operation_map.get(operation_code, f"Unknown ({operation_code})") + + +def get_alarm_type_name(alarm_type): + """Convert alarm type number to readable name""" + type_map = { + 0: "Access attempt outside the IP whiltelist", + 1: "Over 30 consecutive access attempts", + 2: "Multiple access attempts within one minute", + 3: "Over 30 consecutive login attempts", + 4: "Multiple login attempts within one minute", + 5: "Multiple login attempts within one hour" + } + return type_map.get(alarm_type, f"Unknown ({alarm_type})") + + +def enhance_audit_data(data, audit_type): + """Enhance audit data with readable formats""" + if not data: + return data + + enhanced_data = [] + for item in data: + enhanced_item = item.copy() + + # Convert timestamps - replace original values + if 'created_at' in enhanced_item: + enhanced_item['created_at'] = format_timestamp(enhanced_item['created_at']) + if 'end_time' in enhanced_item: + enhanced_item['end_time'] = format_timestamp(enhanced_item['end_time']) + + # Add type-specific enhancements - replace original values + if audit_type == 'conn': + if 'conn_type' in enhanced_item: + enhanced_item['conn_type'] = get_connection_type_name(enhanced_item['conn_type']) + else: + enhanced_item['conn_type'] = "Not Logged In" + + elif audit_type == 'console': + if 'typ' in enhanced_item: + # Replace typ field with type and convert to readable name + enhanced_item['type'] = get_console_type_name(enhanced_item['typ']) + del enhanced_item['typ'] + if 'iop' in enhanced_item: + # Replace iop field with operation and convert to readable name + enhanced_item['operation'] = get_console_operation_name(enhanced_item['iop']) + del enhanced_item['iop'] + + elif audit_type == 'alarm' and 'typ' in enhanced_item: + # Replace typ field with type and convert to readable name + enhanced_item['type'] = get_alarm_type_name(enhanced_item['typ']) + del enhanced_item['typ'] + + enhanced_data.append(enhanced_item) + + return enhanced_data + + +def check_response(response): + """Check API response and return result""" + if response.status_code == 200: + try: + response_json = response.json() + return response_json + except ValueError: + return response.text or "Success" + else: + return "Failed", response.status_code, response.text + + +def view_audits_common(url, token, endpoint, filters=None, page_size=None, current=None, + created_at=None, days_ago=None, non_wildcard_fields=None): + """Common function for viewing audits""" + headers = {"Authorization": f"Bearer {token}"} + + # Set default page size and current page + if page_size is None: + page_size = 10 + if current is None: + current = 1 + + params = { + "pageSize": page_size, + "current": current + } + + # Add filter parameters if provided + if filters: + for key, value in filters.items(): + if value is not None: + params[key] = value + + # Handle time filters + if days_ago is not None: + # Calculate datetime from days ago + target_time = datetime.now() - timedelta(days=days_ago) + # Convert to UTC time string using system timezone + utc_timestamp = target_time.timestamp() + utc_dt = datetime.fromtimestamp(utc_timestamp, timezone.utc) + params["created_at"] = utc_dt.strftime("%Y-%m-%d %H:%M:%S.000") + elif created_at: + # Parse local time string and convert to UTC time string + utc_time_str = parse_local_time_to_utc_string(created_at) + if utc_time_str is not None: + params["created_at"] = utc_time_str + else: + # If parsing fails, pass the original value + params["created_at"] = created_at + + # Apply wildcard patterns for string fields (excluding specific fields) + if non_wildcard_fields is None: + non_wildcard_fields = set() + + # Always exclude these fields from wildcard treatment + non_wildcard_fields.update(["created_at", "pageSize", "current"]) + + string_params = {} + for k, v in params.items(): + if isinstance(v, str) and k not in non_wildcard_fields: + if v != "-" and "%" not in v: + string_params[k] = "%" + v + "%" + else: + string_params[k] = v + else: + string_params[k] = v + + response = requests.get(f"{url}/api/audits/{endpoint}", headers=headers, params=string_params) + response_json = response.json() + + # Enhance the data with readable formats + data = enhance_audit_data(response_json.get("data", []), endpoint) + + return { + "data": data, + "total": response_json.get("total", 0), + "current": current, + "pageSize": page_size + } + + +def view_conn_audits(url, token, remote=None, conn_type=None, + page_size=None, current=None, created_at=None, days_ago=None): + """View connection audits""" + filters = { + "remote": remote, + "conn_type": conn_type + } + non_wildcard_fields = {"conn_type"} + + return view_audits_common( + url, token, "conn", filters, page_size, current, created_at, days_ago, non_wildcard_fields + ) + + +def view_file_audits(url, token, remote=None, + page_size=None, current=None, created_at=None, days_ago=None): + """View file audits""" + filters = { + "remote": remote + } + non_wildcard_fields = set() + + return view_audits_common( + url, token, "file", filters, page_size, current, created_at, days_ago, non_wildcard_fields + ) + + +def view_alarm_audits(url, token, device=None, + page_size=None, current=None, created_at=None, days_ago=None): + """View alarm audits""" + filters = { + "device": device + } + non_wildcard_fields = set() + + return view_audits_common( + url, token, "alarm", filters, page_size, current, created_at, days_ago, non_wildcard_fields + ) + + +def view_console_audits(url, token, operator=None, + page_size=None, current=None, created_at=None, days_ago=None): + """View console audits""" + filters = { + "operator": operator + } + non_wildcard_fields = set() + + return view_audits_common( + url, token, "console", filters, page_size, current, created_at, days_ago, non_wildcard_fields + ) + + +def main(): + parser = argparse.ArgumentParser(description="Audits manager") + parser.add_argument( + "command", + choices=["view-conn", "view-file", "view-alarm", "view-console"], + help="Command to execute", + ) + parser.add_argument("--url", required=True, help="URL of the API") + parser.add_argument("--token", required=True, help="Bearer token for authentication") + + # Pagination parameters + parser.add_argument("--page-size", type=int, default=10, help="Number of records per page (default: 10)") + parser.add_argument("--current", type=int, default=1, help="Current page number (default: 1)") + + # Time filtering parameters + parser.add_argument("--created-at", help="Filter by creation time in local time (format: 2025-09-16 14:15:57 or 2025-09-16 14:15:57.000)") + parser.add_argument("--days-ago", type=int, help="Filter by days ago (e.g., 7 for last 7 days)") + + # Audit filters (simplified) + parser.add_argument("--remote", help="Remote peer ID filter (for conn/file audits)") + parser.add_argument("--device", help="Device ID filter (for alarm audits)") + parser.add_argument("--conn-type", type=int, help="Connection type filter (for conn audits only): 0=Remote Desktop, 1=File Transfer, 2=Port Transfer, 3=View Camera, 4=Terminal") + parser.add_argument("--operator", help="Operator filter (for console audits only)") + + args = parser.parse_args() + + # Remove trailing slashes from URL + while args.url.endswith("/"): + args.url = args.url[:-1] + + if args.command == "view-conn": + # View connection audits + result = view_conn_audits( + args.url, + args.token, + args.remote, + args.conn_type, + args.page_size, + args.current, + args.created_at, + args.days_ago + ) + print(json.dumps(result, indent=2)) + + elif args.command == "view-file": + # View file audits + result = view_file_audits( + args.url, + args.token, + args.remote, + args.page_size, + args.current, + args.created_at, + args.days_ago + ) + print(json.dumps(result, indent=2)) + + elif args.command == "view-alarm": + # View alarm audits + result = view_alarm_audits( + args.url, + args.token, + args.device, + args.page_size, + args.current, + args.created_at, + args.days_ago + ) + print(json.dumps(result, indent=2)) + + elif args.command == "view-console": + # View console audits + result = view_console_audits( + args.url, + args.token, + args.operator, + args.page_size, + args.current, + args.created_at, + args.days_ago + ) + print(json.dumps(result, indent=2)) + + +if __name__ == "__main__": + main()