-
-
Notifications
You must be signed in to change notification settings - Fork 419
Expand file tree
/
Copy pathwebhook.py
More file actions
executable file
·198 lines (157 loc) · 7.27 KB
/
Copy pathwebhook.py
File metadata and controls
executable file
·198 lines (157 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python
import json
import subprocess
import os
import sys
import hashlib
import hmac
# Register NetAlertX directories
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
import conf # noqa: E402 [flake8 lint suppression]
from const import logPath, confFileName # noqa: E402 [flake8 lint suppression]
from plugin_helper import Plugin_Objects, handleEmpty # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value, write_file # noqa: E402 [flake8 lint suppression]
from models.notification_instance import NotificationInstance # noqa: E402 [flake8 lint suppression]
from database import DB # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
# Make sure the TIMEZONE for logging is correct
conf.tz = timezone(get_setting_value('TIMEZONE'))
# Make sure log level is initialized correctly
Logger(get_setting_value('LOG_LEVEL'))
pluginName = 'WEBHOOK'
LOG_PATH = logPath + '/plugins'
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
def main():
mylog('verbose', [f'[{pluginName}](publisher) In script'])
# Check if basic config settings supplied
if check_config() is False:
mylog('none', [f'[{pluginName}] ⚠ ERROR: Publisher notification gateway not set up correctly. Check your {confFileName} {pluginName}_* variables.'])
return
# Create a database connection
db = DB() # instance of class DB
db.open()
# Initialize the Plugin obj output file
plugin_objects = Plugin_Objects(RESULT_FILE)
# Create a NotificationInstance instance
notifications = NotificationInstance(db)
# Retrieve new notifications
new_notifications = notifications.getNew()
# Process the new notifications (see the Notifications DB table for structure or check the /php/server/query_json.php?file=table_notifications.json endpoint)
for notification in new_notifications:
# Send notification
response_stdout, response_stderr = send(
notification["Text"],
notification["HTML"],
notification["JSON"]
)
# Log result
plugin_objects.add_object(
primaryId = pluginName,
secondaryId = timeNowUTC(),
watched1 = notification["GUID"],
watched2 = handleEmpty(response_stdout),
watched3 = handleEmpty(response_stderr),
watched4 = 'null',
extra = 'null',
foreignKey = notification["GUID"]
)
plugin_objects.write_result_file()
# -------------------------------------------------------------------------------
def check_config():
if get_setting_value('WEBHOOK_URL') == '':
return False
else:
return True
# -------------------------------------------------------------------------------
def send(text_data, html_data, json_data):
response_stderr = ''
response_stdout = ''
# limit = 1024 * 1024 # 1MB limit (1024 bytes * 1024 bytes = 1MB)
limit = get_setting_value('WEBHOOK_SIZE')
payloadType = get_setting_value('WEBHOOK_PAYLOAD')
endpointUrl = get_setting_value('WEBHOOK_URL')
secret = get_setting_value('WEBHOOK_SECRET')
requestMethod = get_setting_value('WEBHOOK_REQUEST_METHOD')
# use data type based on specified payload type
if payloadType == 'json':
# In this code, the truncate_json function is used to recursively traverse the JSON object
# and remove nodes that exceed the size limit. It checks the size of each node's JSON representation
# using json.dumps and includes only the nodes that are within the limit.
json_str = json.dumps(json_data)
if len(json_str) <= limit:
payloadData = json_data
else:
def truncate_json(obj):
if isinstance(obj, dict):
return {
key: truncate_json(value)
for key, value in obj.items()
if len(json.dumps(value)) <= limit
}
elif isinstance(obj, list):
return [
truncate_json(item)
for item in obj
if len(json.dumps(item)) <= limit
]
else:
return obj
payloadData = truncate_json(json_data)
if payloadType == 'html':
if len(html_data) > limit:
payloadData = html_data[:limit] + " <h1>(text was truncated)</h1>"
else:
payloadData = html_data
if payloadType == 'text':
if len(text_data) > limit:
payloadData = text_data[:limit] + " (text was truncated)"
else:
payloadData = text_data
# Define slack-compatible payload
if payloadType == 'text':
_json_payload = {"text": payloadData}
else:
_json_payload = {
"username": "NetAlertX",
"text": "There are new notifications",
"attachments": [{
"title": "NetAlertX Notifications",
"title_link": get_setting_value('REPORT_DASHBOARD_URL'),
"text": payloadData
}]
}
# DEBUG - Write the json payload into a log file for debugging
write_file(logPath + '/webhook_payload.json', json.dumps(_json_payload))
# Using the Slack-Compatible Webhook endpoint for Discord so that the same payload can be used for both
# Consider: curl has the ability to load in data to POST from a file + piping
if (endpointUrl.startswith('https://discord.com/api/webhooks/') and not endpointUrl.endswith("/slack")):
_WEBHOOK_URL = f"{endpointUrl}/slack"
curlParams = ["curl", "-i", "-H", "Content-Type:application/json", "-d", json.dumps(_json_payload), _WEBHOOK_URL]
else:
_WEBHOOK_URL = endpointUrl
curlParams = ["curl", "-i", "-X", requestMethod , "-H", "Content-Type:application/json", "-d", json.dumps(_json_payload), _WEBHOOK_URL]
# Add HMAC signature if configured
if (secret != ''):
h = hmac.new(secret.encode("UTF-8"), json.dumps(_json_payload, separators=(',', ':')).encode(), hashlib.sha256).hexdigest()
curlParams.insert(4, "-H")
curlParams.insert(5, f"X-Webhook-Signature: sha256={h}")
try:
# Execute CURL call
mylog('debug', [f'[{pluginName}] curlParams: ', curlParams])
result = subprocess.run(curlParams, capture_output=True, text=True)
response_stderr = result.stderr
response_stdout = result.stdout
# Write stdout and stderr into .log files for debugging if needed
mylog('debug', [f'[{pluginName}] stdout: ', response_stdout])
mylog('debug', [f'[{pluginName}] stderr: ', response_stderr])
except subprocess.CalledProcessError as e:
# An error occurred, handle it
mylog('none', [f'[{pluginName}] ⚠ ERROR: ', e.output])
response_stderr = e.output
return response_stdout, response_stderr
# -------------------------------------------------------
if __name__ == '__main__':
sys.exit(main())