add ab.py and audits.py (#12989)

Signed-off-by: 21pages <sunboeasy@gmail.com>
This commit is contained in:
21pages
2025-09-23 17:13:13 +08:00
committed by GitHub
parent eacb07988d
commit d1159764f6
2 changed files with 1141 additions and 0 deletions

771
res/ab.py Normal file
View File

@@ -0,0 +1,771 @@
#!/usr/bin/env python3
import requests
import argparse
import json
from datetime import datetime, timedelta
def get_personal_ab(url, token):
"""Get personal address book GUID"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{url}/api/ab/personal", headers=headers)
if response.status_code != 200:
return f"Error: {response.status_code} - {response.text}"
return response.json()
def view_shared_abs(url, token, name=None):
"""View all shared address books (excluding personal ones)"""
headers = {"Authorization": f"Bearer {token}"}
pageSize = 30
params = {
"name": name,
}
filtered_params = {
k: "%" + v + "%" if (v != "-" and "%" not in v and k != "name") else v
for k, v in params.items()
if v is not None
}
filtered_params["pageSize"] = pageSize
abs = []
current = 1
while True:
filtered_params["current"] = current
response = requests.get(f"{url}/api/ab/shared/profiles", headers=headers, params=filtered_params)
response_json = response.json()
data = response_json.get("data", [])
abs.extend(data)
total = response_json.get("total", 0)
current += pageSize
if len(data) < pageSize or current > total:
break
return abs
def get_ab_by_name(url, token, ab_name):
"""Get address book by name"""
abs = view_shared_abs(url, token, ab_name)
for ab in abs:
if ab["name"] == ab_name:
return ab
return None
def view_ab_peers(url, token, ab_guid, peer_id=None, alias=None):
"""View peers in an address book"""
headers = {"Authorization": f"Bearer {token}"}
pageSize = 30
params = {
"ab": ab_guid,
"id": peer_id,
"alias": alias,
}
filtered_params = {
k: "%" + v + "%" if (v != "-" and "%" not in v and k not in ["ab"]) else v
for k, v in params.items()
if v is not None
}
filtered_params["pageSize"] = pageSize
peers = []
current = 1
while True:
filtered_params["current"] = current
response = requests.get(f"{url}/api/ab/peers", headers=headers, params=filtered_params)
response_json = response.json()
data = response_json.get("data", [])
peers.extend(data)
total = response_json.get("total", 0)
current += pageSize
if len(data) < pageSize or current > total:
break
return peers
def view_ab_tags(url, token, ab_guid):
"""View tags in an address book"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{url}/api/ab/tags/{ab_guid}", headers=headers)
response_json = check_response(response)
# Handle error responses
if isinstance(response_json, tuple) and response_json[0] == "Failed":
print(f"Error: {response_json[1]} - {response_json[2]}")
return []
# Format color values as hex
if response_json:
for tag in response_json:
if "color" in tag and tag["color"] is not None:
# Convert color to hex format
color_value = tag["color"]
if isinstance(color_value, int):
tag["color"] = f"0x{color_value:08X}"
return response_json if response_json else []
def check_response(response):
"""Check API response and return result"""
if response.status_code == 200:
try:
response_json = response.json()
return response_json
except ValueError:
return response.text or "Success"
else:
return "Failed", response.status_code, response.text
def add_peer(url, token, ab_guid, peer_id, alias=None, note=None, tags=None, password=None):
"""Add a peer to address book"""
print(f"Adding peer {peer_id} to address book")
headers = {"Authorization": f"Bearer {token}"}
payload = {
"id": peer_id,
"note": note,
}
# Add peer info if provided
info = {}
if alias:
info["alias"] = alias
if tags:
info["tags"] = tags if isinstance(tags, list) else [tags]
if password:
info["password"] = password
if info:
payload.update(info)
response = requests.post(f"{url}/api/ab/peer/add/{ab_guid}", headers=headers, json=payload)
return check_response(response)
def delete_peer(url, token, ab_guid, peer_ids):
"""Delete peers from address book by IDs"""
if isinstance(peer_ids, str):
peer_ids = [peer_ids]
print(f"Deleting peers {peer_ids} from address book")
headers = {"Authorization": f"Bearer {token}"}
response = requests.delete(f"{url}/api/ab/peer/{ab_guid}", headers=headers, json=peer_ids)
return check_response(response)
def update_peer(url, token, ab_guid, peer_id, alias=None, note=None, tags=None, password=None):
"""Update a peer in address book"""
print(f"Updating peer {peer_id} in address book")
headers = {"Authorization": f"Bearer {token}"}
# Check if at least one parameter is provided for update
update_params = [alias, note, tags, password]
if all(param is None for param in update_params):
return "Error: At least one parameter must be specified for update"
payload = {
"id": peer_id,
}
# Add fields to update
info = {}
if alias is not None:
info["alias"] = alias
if tags is not None:
info["tags"] = tags if isinstance(tags, list) else [tags]
if password is not None:
info["password"] = password
if info:
payload.update(info)
if note is not None:
payload["note"] = note
response = requests.put(f"{url}/api/ab/peer/update/{ab_guid}", headers=headers, json=payload)
return check_response(response)
def str2color(tag_name, existing_colors=None):
"""Generate color for tag name similar to str2color2 function"""
if existing_colors is None:
existing_colors = []
color_map = {
"red": 0xFFFF0000,
"green": 0xFF008000,
"blue": 0xFF0000FF,
"orange": 0xFFFF9800,
"purple": 0xFF9C27B0,
"grey": 0xFF9E9E9E,
"cyan": 0xFF00BCD4,
"lime": 0xFFCDDC39,
"teal": 0xFF009688,
"pink": 0xFFF48FB1,
"indigo": 0xFF3F51B5,
"brown": 0xFF795548,
}
lower_name = tag_name.lower()
# Check if tag name matches a predefined color
if lower_name in color_map:
return color_map[lower_name]
# Special case for yellow
if lower_name == "yellow":
return 0xFFFFFF00
# Generate hash-based color
hash_value = 0
for char in tag_name:
hash_value += ord(char)
color_list = list(color_map.values())
hash_value = hash_value % len(color_list)
result = color_list[hash_value]
# If color is already used, try to find an unused one
if result in existing_colors:
for color in color_list:
if color not in existing_colors:
result = color
break
return result
def add_tag(url, token, ab_guid, tag_name, color=None):
"""Add a tag to address book"""
print(f"Adding tag '{tag_name}' to address book")
headers = {"Authorization": f"Bearer {token}"}
# If no color specified, generate one based on tag name
if color is None:
# Get existing tags to avoid color conflicts
try:
existing_tags = view_ab_tags(url, token, ab_guid)
existing_colors = [tag.get("color", 0) for tag in existing_tags]
color = str2color(tag_name, existing_colors)
except:
# Fallback to default color if we can't get existing tags
color = str2color(tag_name)
payload = {
"name": tag_name,
"color": color,
}
response = requests.post(f"{url}/api/ab/tag/add/{ab_guid}", headers=headers, json=payload)
return check_response(response)
def update_tag(url, token, ab_guid, tag_name, color):
"""Update a tag in address book"""
print(f"Updating tag '{tag_name}' in address book")
headers = {"Authorization": f"Bearer {token}"}
payload = {
"name": tag_name,
"color": color,
}
response = requests.put(f"{url}/api/ab/tag/update/{ab_guid}", headers=headers, json=payload)
return check_response(response)
def delete_tags(url, token, ab_guid, tag_names):
"""Delete tags from address book"""
if isinstance(tag_names, str):
tag_names = [tag_names]
print(f"Deleting tags {tag_names} from address book")
headers = {"Authorization": f"Bearer {token}"}
response = requests.delete(f"{url}/api/ab/tag/{ab_guid}", headers=headers, json=tag_names)
return check_response(response)
def add_shared_ab(url, token, name, note=None, password=None):
"""Add a new shared address book"""
print(f"Adding shared address book '{name}'")
headers = {"Authorization": f"Bearer {token}"}
payload = {
"name": name,
"note": note,
}
# Add info if password is provided
if password:
payload["info"] = {
"password": password
}
response = requests.post(f"{url}/api/ab/shared/add", headers=headers, json=payload)
return check_response(response)
def update_shared_ab(url, token, ab_guid, name=None, note=None, owner=None, password=None):
"""Update a shared address book"""
print(f"Updating shared address book {ab_guid}")
headers = {"Authorization": f"Bearer {token}"}
# Check if at least one parameter is provided for update
update_params = [name, note, owner, password]
if all(param is None for param in update_params):
return "Error: At least one parameter must be specified for update"
payload = {
"guid": ab_guid,
}
if name is not None:
payload["name"] = name
if note is not None:
payload["note"] = note
if owner is not None:
payload["owner"] = owner
if password is not None:
payload["info"] = {
"password": password
}
response = requests.put(f"{url}/api/ab/shared/update/profile", headers=headers, json=payload)
return check_response(response)
def delete_shared_abs(url, token, ab_guids):
"""Delete shared address books"""
if isinstance(ab_guids, str):
ab_guids = [ab_guids]
print(f"Deleting shared address books {ab_guids}")
headers = {"Authorization": f"Bearer {token}"}
response = requests.delete(f"{url}/api/ab/shared", headers=headers, json=ab_guids)
return check_response(response)
def permission_to_string(permission):
"""Convert numeric permission to string representation"""
permission_map = {
1: "ro", # Read
2: "rw", # ReadWrite
3: "full" # FullControl
}
return permission_map.get(permission, str(permission))
def string_to_permission(permission_str):
"""Convert string permission to numeric representation"""
permission_map = {
"ro": 1, # Read
"rw": 2, # ReadWrite
"full": 3 # FullControl
}
return permission_map.get(permission_str.lower(), None)
def view_ab_rules(url, token, ab_guid):
"""View rules in an address book"""
headers = {"Authorization": f"Bearer {token}"}
pageSize = 30
params = {
"ab": ab_guid,
"pageSize": pageSize,
}
rules = []
current = 1
while True:
params["current"] = current
response = requests.get(f"{url}/api/ab/rules", headers=headers, params=params)
response_json = response.json()
data = response_json.get("data", [])
rules.extend(data)
total = response_json.get("total", 0)
current += pageSize
if len(data) < pageSize or current > total:
break
# Convert numeric permissions to string format
for rule in rules:
if "rule" in rule:
rule["rule"] = permission_to_string(rule["rule"])
return rules
def add_ab_rule(url, token, ab_guid, rule_type, user=None, group=None, rule=1):
"""Add a rule to address book"""
print(f"Adding {rule_type} rule to address book")
headers = {"Authorization": f"Bearer {token}"}
payload = {
"guid": ab_guid,
"rule": rule,
}
if rule_type == "user" and user:
payload["user"] = user
elif rule_type == "group" and group:
payload["group"] = group
elif rule_type == "everyone":
# For everyone, both user and group are None (not included in payload)
pass
response = requests.post(f"{url}/api/ab/rule", headers=headers, json=payload)
return check_response(response)
def update_ab_rule(url, token, rule_guid, rule):
"""Update an address book rule"""
print(f"Updating rule {rule_guid}")
headers = {"Authorization": f"Bearer {token}"}
payload = {
"guid": rule_guid,
"rule": rule,
}
response = requests.patch(f"{url}/api/ab/rule", headers=headers, json=payload)
return check_response(response)
def delete_ab_rules(url, token, rule_guids):
"""Delete address book rules"""
if isinstance(rule_guids, str):
rule_guids = [rule_guids]
print(f"Deleting rules {rule_guids}")
headers = {"Authorization": f"Bearer {token}"}
response = requests.delete(f"{url}/api/ab/rules", headers=headers, json=rule_guids)
return check_response(response)
def main():
def parse_color(value):
"""Parse color value - supports both hex (0xFF00FF00) and decimal"""
if value.startswith('0x') or value.startswith('0X'):
return int(value, 16)
else:
return int(value)
def parse_permission(value):
"""Parse permission value - supports both string (ro/rw/full) and numeric (1/2/3)"""
# Try to parse as string first
permission_num = string_to_permission(value)
if permission_num is not None:
return permission_num
# Try to parse as integer for backward compatibility
try:
num_value = int(value)
if num_value in [1, 2, 3]:
return num_value
else:
raise argparse.ArgumentTypeError(f"Invalid permission value: {value}. Must be one of: ro, rw, full, 1, 2, 3")
except ValueError:
raise argparse.ArgumentTypeError(f"Invalid permission value: {value}. Must be one of: ro, rw, full, 1, 2, 3")
parser = argparse.ArgumentParser(description="Address Book manager")
# Required arguments
parser.add_argument(
"command",
choices=["view-ab", "add-ab", "update-ab", "delete-ab", "get-personal-ab",
"view-peer", "add-peer", "update-peer", "delete-peer",
"view-tag", "add-tag", "update-tag", "delete-tag",
"view-rule", "add-rule", "update-rule", "delete-rule"],
help="Command to execute",
)
# Global arguments (used by all commands)
parser.add_argument("--url", required=True, help="URL of the API")
parser.add_argument("--token", required=True, help="Bearer token for authentication")
# Address book identification (used by most commands except get-personal-ab)
parser.add_argument("--ab-name", help="Address book name (for identification)")
parser.add_argument("--ab-guid", help="Address book GUID (alternative to ab-name)")
# Address book management arguments
parser.add_argument("--ab-update-name", help="New address book name (for update)")
parser.add_argument("--note", help="Note field")
parser.add_argument("--password", help="Password field")
parser.add_argument("--owner", help="Address book owner (username)")
# Peer management arguments
parser.add_argument("--peer-id", help="Peer ID")
parser.add_argument("--alias", help="Peer alias")
parser.add_argument("--tags", help="Peer tags (supports both 'tag1,tag2' and '[tag1,tag2]' formats, use '[]' to clear tags)")
# Tag management arguments
parser.add_argument("--tag-name", help="Tag name")
parser.add_argument("--tag-color", type=parse_color, help="Tag color (hex number like 0xFF00FF00 or decimal, auto-generated if not specified)")
# Rule management arguments
parser.add_argument("--rule-type", choices=["user", "group", "everyone"], help="Rule type (auto-detected if not specified)")
parser.add_argument("--rule-user", help="Rule target user name (auto-sets rule-type=user)")
parser.add_argument("--rule-group", help="Rule target group name (auto-sets rule-type=group)")
parser.add_argument("--rule-permission", type=parse_permission, help="Rule permission (ro=Read, rw=ReadWrite, full=FullControl, or numeric 1/2/3)")
parser.add_argument("--rule-guid", help="Rule GUID (for update/delete)")
args = parser.parse_args()
# Remove trailing slashes from URL
while args.url.endswith("/"):
args.url = args.url[:-1]
if args.command == "view-ab":
# View all shared address books
abs = view_shared_abs(args.url, args.token, args.ab_name)
print(json.dumps(abs, indent=2))
elif args.command == "get-personal-ab":
# Get personal address book GUID
personal_ab = get_personal_ab(args.url, args.token)
print(json.dumps(personal_ab, indent=2))
elif args.command in ["add-ab", "update-ab", "delete-ab"]:
# Address book management commands
if args.command == "add-ab":
if not args.ab_name:
print("Error: --ab-name is required for add-ab command")
return
result = add_shared_ab(args.url, args.token, args.ab_name, args.note, args.password)
print(f"Result: {result}")
elif args.command in ["update-ab", "delete-ab"]:
# Commands that need ab-name or ab-guid
if not args.ab_name and not args.ab_guid:
print("Error: --ab-name or --ab-guid is required for this command")
return
if args.ab_name and args.ab_guid:
print("Error: Cannot specify both --ab-name and --ab-guid")
return
if args.ab_guid:
ab_guid = args.ab_guid
print(f"Working with address book GUID: {ab_guid}")
else:
# Get address book by name
ab = get_ab_by_name(args.url, args.token, args.ab_name)
if not ab:
print(f"Error: Address book '{args.ab_name}' not found")
return
ab_guid = ab["guid"]
print(f"Working with address book: {args.ab_name} (GUID: {ab_guid})")
if args.command == "update-ab":
result = update_shared_ab(args.url, args.token, ab_guid, args.ab_update_name, args.note, args.owner, args.password)
print(f"Result: {result}")
elif args.command == "delete-ab":
result = delete_shared_abs(args.url, args.token, ab_guid)
print(f"Result: {result}")
elif args.command in ["view-peer", "add-peer", "update-peer", "delete-peer", "view-tag", "add-tag", "update-tag", "delete-tag", "view-rule", "add-rule", "update-rule", "delete-rule"]:
if not args.ab_name and not args.ab_guid:
print("Error: --ab-name or --ab-guid is required for this command")
return
if args.ab_name and args.ab_guid:
print("Error: Cannot specify both --ab-name and --ab-guid")
return
if args.ab_guid:
ab_guid = args.ab_guid
print(f"Working with address book GUID: {ab_guid}")
else:
# Get address book by name
ab = get_ab_by_name(args.url, args.token, args.ab_name)
if not ab:
print(f"Error: Address book '{args.ab_name}' not found")
return
ab_guid = ab["guid"]
print(f"Working with address book: {args.ab_name} (GUID: {ab_guid})")
if args.command == "view-peer":
peers = view_ab_peers(args.url, args.token, ab_guid, args.peer_id, args.alias)
print(json.dumps(peers, indent=2))
elif args.command == "add-peer":
if not args.peer_id:
print("Error: --peer-id is required for add-peer command")
return
# Handle tags parsing - support both [tag1,tag2] and tag1,tag2 formats
tags = None
if args.tags is not None:
if args.tags == "[]":
tags = [] # Empty list to clear tags
else:
# Remove brackets if present and split by comma
tags_str = args.tags.strip()
if tags_str.startswith('[') and tags_str.endswith(']'):
tags_str = tags_str[1:-1] # Remove brackets
tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()]
result = add_peer(
args.url,
args.token,
ab_guid,
args.peer_id,
args.alias,
args.note,
tags,
args.password
)
print(f"Result: {result}")
elif args.command == "update-peer":
if not args.peer_id:
print("Error: --peer-id is required for update-peer command")
return
# Handle tags parsing - support both [tag1,tag2] and tag1,tag2 formats
tags = None
if args.tags is not None:
if args.tags == "[]":
tags = [] # Empty list to clear tags
else:
# Remove brackets if present and split by comma
tags_str = args.tags.strip()
if tags_str.startswith('[') and tags_str.endswith(']'):
tags_str = tags_str[1:-1] # Remove brackets
tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()]
result = update_peer(
args.url,
args.token,
ab_guid,
args.peer_id,
args.alias,
args.note,
tags,
args.password
)
print(f"Result: {result}")
elif args.command == "delete-peer":
if not args.peer_id:
print("Error: --peer-id is required for delete-peer command")
return
result = delete_peer(args.url, args.token, ab_guid, args.peer_id)
print(f"Result: {result}")
elif args.command == "view-tag":
tags = view_ab_tags(args.url, args.token, ab_guid)
print(json.dumps(tags, indent=2))
elif args.command == "add-tag":
if not args.tag_name:
print("Error: --tag-name is required for add-tag command")
return
result = add_tag(args.url, args.token, ab_guid, args.tag_name, args.tag_color)
print(f"Result: {result}")
elif args.command == "update-tag":
if not args.tag_name:
print("Error: --tag-name is required for update-tag command")
return
result = update_tag(args.url, args.token, ab_guid, args.tag_name, args.tag_color)
print(f"Result: {result}")
elif args.command == "delete-tag":
if not args.tag_name:
print("Error: --tag-name is required for delete-tag command")
return
result = delete_tags(args.url, args.token, ab_guid, args.tag_name)
print(f"Result: {result}")
elif args.command == "view-rule":
rules = view_ab_rules(args.url, args.token, ab_guid)
print(json.dumps(rules, indent=2))
elif args.command == "add-rule":
if not args.rule_permission:
print("Error: --rule-permission is required for add-rule command")
return
# Auto-detect rule type if not explicitly specified
if not args.rule_type:
if args.rule_user and args.rule_group:
print("Error: Cannot specify both --rule-user and --rule-group")
return
elif args.rule_user:
rule_type = "user"
elif args.rule_group:
rule_type = "group"
else:
print("Error: Must specify --rule-type=everyone, --rule-user, or --rule-group")
return
else:
rule_type = args.rule_type
# Validate explicit rule type with parameters
if rule_type == "user" and not args.rule_user:
print("Error: --rule-user is required when rule-type=user")
return
elif rule_type == "group" and not args.rule_group:
print("Error: --rule-group is required when rule-type=group")
return
elif rule_type == "user" and args.rule_group:
print("Error: Cannot specify --rule-group when rule-type=user")
return
elif rule_type == "group" and args.rule_user:
print("Error: Cannot specify --rule-user when rule-type=group")
return
elif rule_type == "everyone" and (args.rule_user or args.rule_group):
print("Error: Cannot specify --rule-user or --rule-group when rule-type=everyone")
return
result = add_ab_rule(args.url, args.token, ab_guid, rule_type, args.rule_user, args.rule_group, args.rule_permission)
print(f"Result: {result}")
elif args.command == "update-rule":
if not args.rule_guid:
print("Error: --rule-guid is required for update-rule command")
return
if not args.rule_permission:
print("Error: --rule-permission is required for update-rule command")
return
result = update_ab_rule(args.url, args.token, args.rule_guid, args.rule_permission)
print(f"Result: {result}")
elif args.command == "delete-rule":
if not args.rule_guid:
print("Error: --rule-guid is required for delete-rule command")
return
result = delete_ab_rules(args.url, args.token, args.rule_guid)
print(f"Result: {result}")
if __name__ == "__main__":
main()

370
res/audits.py Normal file
View File

@@ -0,0 +1,370 @@
#!/usr/bin/env python3
import requests
import argparse
import json
from datetime import datetime, timedelta, timezone
def format_timestamp(timestamp):
"""Convert Unix timestamp to readable local datetime"""
if timestamp is None:
return None
try:
# Convert to local time
local_dt = datetime.fromtimestamp(timestamp)
return local_dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
return timestamp
def parse_local_time_to_utc_string(time_str):
"""Parse local time string to UTC time string for API filtering"""
try:
# Parse the local time string
local_dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
# Make the datetime object timezone-aware using system's local timezone
local_dt = local_dt.replace(tzinfo=datetime.now().astimezone().tzinfo)
utc_dt = local_dt.astimezone(timezone.utc)
return utc_dt.strftime("%Y-%m-%d %H:%M:%S.000")
except ValueError:
try:
# Try without microseconds
local_dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# Make the datetime object timezone-aware using system's local timezone
local_dt = local_dt.replace(tzinfo=datetime.now().astimezone().tzinfo)
utc_dt = local_dt.astimezone(timezone.utc)
return utc_dt.strftime("%Y-%m-%d %H:%M:%S.000")
except ValueError:
return None
def get_connection_type_name(conn_type):
"""Convert connection type number to readable name"""
type_map = {
0: "Remote Desktop",
1: "File Transfer",
2: "Port Transfer",
3: "View Camera",
4: "Terminal"
}
return type_map.get(conn_type, f"Unknown ({conn_type})")
def get_console_type_name(console_type):
"""Convert console audit type number to readable name"""
type_map = {
0: "Group Management",
1: "User Management",
2: "Device Management",
3: "Address Book Management"
}
return type_map.get(console_type, f"Unknown ({console_type})")
def get_console_operation_name(operation_code):
"""Convert console operation code to readable name"""
operation_map = {
0: "User Login",
1: "Add Group",
2: "Add User",
3: "Add Device",
4: "Delete Groups",
5: "Disconnect Device",
6: "Enable Users",
7: "Disable Users",
8: "Enable Devices",
9: "Disable Devices",
10: "Update Group",
11: "Update User",
12: "Update Device",
13: "Delete User",
14: "Delete Device",
15: "Add Address Book",
16: "Delete Address Book",
17: "Change Address Book Name",
18: "Delete Devices in the Address Book Recycle Bin",
19: "Empty Address Book Recycle Bin",
20: "Add Address Book Permission",
21: "Delete Address Book Permission",
22: "Update Address Book Permission"
}
return operation_map.get(operation_code, f"Unknown ({operation_code})")
def get_alarm_type_name(alarm_type):
"""Convert alarm type number to readable name"""
type_map = {
0: "Access attempt outside the IP whiltelist",
1: "Over 30 consecutive access attempts",
2: "Multiple access attempts within one minute",
3: "Over 30 consecutive login attempts",
4: "Multiple login attempts within one minute",
5: "Multiple login attempts within one hour"
}
return type_map.get(alarm_type, f"Unknown ({alarm_type})")
def enhance_audit_data(data, audit_type):
"""Enhance audit data with readable formats"""
if not data:
return data
enhanced_data = []
for item in data:
enhanced_item = item.copy()
# Convert timestamps - replace original values
if 'created_at' in enhanced_item:
enhanced_item['created_at'] = format_timestamp(enhanced_item['created_at'])
if 'end_time' in enhanced_item:
enhanced_item['end_time'] = format_timestamp(enhanced_item['end_time'])
# Add type-specific enhancements - replace original values
if audit_type == 'conn':
if 'conn_type' in enhanced_item:
enhanced_item['conn_type'] = get_connection_type_name(enhanced_item['conn_type'])
else:
enhanced_item['conn_type'] = "Not Logged In"
elif audit_type == 'console':
if 'typ' in enhanced_item:
# Replace typ field with type and convert to readable name
enhanced_item['type'] = get_console_type_name(enhanced_item['typ'])
del enhanced_item['typ']
if 'iop' in enhanced_item:
# Replace iop field with operation and convert to readable name
enhanced_item['operation'] = get_console_operation_name(enhanced_item['iop'])
del enhanced_item['iop']
elif audit_type == 'alarm' and 'typ' in enhanced_item:
# Replace typ field with type and convert to readable name
enhanced_item['type'] = get_alarm_type_name(enhanced_item['typ'])
del enhanced_item['typ']
enhanced_data.append(enhanced_item)
return enhanced_data
def check_response(response):
"""Check API response and return result"""
if response.status_code == 200:
try:
response_json = response.json()
return response_json
except ValueError:
return response.text or "Success"
else:
return "Failed", response.status_code, response.text
def view_audits_common(url, token, endpoint, filters=None, page_size=None, current=None,
created_at=None, days_ago=None, non_wildcard_fields=None):
"""Common function for viewing audits"""
headers = {"Authorization": f"Bearer {token}"}
# Set default page size and current page
if page_size is None:
page_size = 10
if current is None:
current = 1
params = {
"pageSize": page_size,
"current": current
}
# Add filter parameters if provided
if filters:
for key, value in filters.items():
if value is not None:
params[key] = value
# Handle time filters
if days_ago is not None:
# Calculate datetime from days ago
target_time = datetime.now() - timedelta(days=days_ago)
# Convert to UTC time string using system timezone
utc_timestamp = target_time.timestamp()
utc_dt = datetime.fromtimestamp(utc_timestamp, timezone.utc)
params["created_at"] = utc_dt.strftime("%Y-%m-%d %H:%M:%S.000")
elif created_at:
# Parse local time string and convert to UTC time string
utc_time_str = parse_local_time_to_utc_string(created_at)
if utc_time_str is not None:
params["created_at"] = utc_time_str
else:
# If parsing fails, pass the original value
params["created_at"] = created_at
# Apply wildcard patterns for string fields (excluding specific fields)
if non_wildcard_fields is None:
non_wildcard_fields = set()
# Always exclude these fields from wildcard treatment
non_wildcard_fields.update(["created_at", "pageSize", "current"])
string_params = {}
for k, v in params.items():
if isinstance(v, str) and k not in non_wildcard_fields:
if v != "-" and "%" not in v:
string_params[k] = "%" + v + "%"
else:
string_params[k] = v
else:
string_params[k] = v
response = requests.get(f"{url}/api/audits/{endpoint}", headers=headers, params=string_params)
response_json = response.json()
# Enhance the data with readable formats
data = enhance_audit_data(response_json.get("data", []), endpoint)
return {
"data": data,
"total": response_json.get("total", 0),
"current": current,
"pageSize": page_size
}
def view_conn_audits(url, token, remote=None, conn_type=None,
page_size=None, current=None, created_at=None, days_ago=None):
"""View connection audits"""
filters = {
"remote": remote,
"conn_type": conn_type
}
non_wildcard_fields = {"conn_type"}
return view_audits_common(
url, token, "conn", filters, page_size, current, created_at, days_ago, non_wildcard_fields
)
def view_file_audits(url, token, remote=None,
page_size=None, current=None, created_at=None, days_ago=None):
"""View file audits"""
filters = {
"remote": remote
}
non_wildcard_fields = set()
return view_audits_common(
url, token, "file", filters, page_size, current, created_at, days_ago, non_wildcard_fields
)
def view_alarm_audits(url, token, device=None,
page_size=None, current=None, created_at=None, days_ago=None):
"""View alarm audits"""
filters = {
"device": device
}
non_wildcard_fields = set()
return view_audits_common(
url, token, "alarm", filters, page_size, current, created_at, days_ago, non_wildcard_fields
)
def view_console_audits(url, token, operator=None,
page_size=None, current=None, created_at=None, days_ago=None):
"""View console audits"""
filters = {
"operator": operator
}
non_wildcard_fields = set()
return view_audits_common(
url, token, "console", filters, page_size, current, created_at, days_ago, non_wildcard_fields
)
def main():
parser = argparse.ArgumentParser(description="Audits manager")
parser.add_argument(
"command",
choices=["view-conn", "view-file", "view-alarm", "view-console"],
help="Command to execute",
)
parser.add_argument("--url", required=True, help="URL of the API")
parser.add_argument("--token", required=True, help="Bearer token for authentication")
# Pagination parameters
parser.add_argument("--page-size", type=int, default=10, help="Number of records per page (default: 10)")
parser.add_argument("--current", type=int, default=1, help="Current page number (default: 1)")
# Time filtering parameters
parser.add_argument("--created-at", help="Filter by creation time in local time (format: 2025-09-16 14:15:57 or 2025-09-16 14:15:57.000)")
parser.add_argument("--days-ago", type=int, help="Filter by days ago (e.g., 7 for last 7 days)")
# Audit filters (simplified)
parser.add_argument("--remote", help="Remote peer ID filter (for conn/file audits)")
parser.add_argument("--device", help="Device ID filter (for alarm audits)")
parser.add_argument("--conn-type", type=int, help="Connection type filter (for conn audits only): 0=Remote Desktop, 1=File Transfer, 2=Port Transfer, 3=View Camera, 4=Terminal")
parser.add_argument("--operator", help="Operator filter (for console audits only)")
args = parser.parse_args()
# Remove trailing slashes from URL
while args.url.endswith("/"):
args.url = args.url[:-1]
if args.command == "view-conn":
# View connection audits
result = view_conn_audits(
args.url,
args.token,
args.remote,
args.conn_type,
args.page_size,
args.current,
args.created_at,
args.days_ago
)
print(json.dumps(result, indent=2))
elif args.command == "view-file":
# View file audits
result = view_file_audits(
args.url,
args.token,
args.remote,
args.page_size,
args.current,
args.created_at,
args.days_ago
)
print(json.dumps(result, indent=2))
elif args.command == "view-alarm":
# View alarm audits
result = view_alarm_audits(
args.url,
args.token,
args.device,
args.page_size,
args.current,
args.created_at,
args.days_ago
)
print(json.dumps(result, indent=2))
elif args.command == "view-console":
# View console audits
result = view_console_audits(
args.url,
args.token,
args.operator,
args.page_size,
args.current,
args.created_at,
args.days_ago
)
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()