-
-
Notifications
You must be signed in to change notification settings - Fork 419
Expand file tree
/
Copy pathpihole_api_scan.py
More file actions
335 lines (265 loc) · 11.9 KB
/
Copy pathpihole_api_scan.py
File metadata and controls
335 lines (265 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python
"""
NetAlertX plugin: PIHOLEAPI
Imports devices from Pi-hole v6 API (Network endpoints) into NetAlertX plugin results.
"""
import os
import sys
import requests
import json
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# --- NetAlertX plugin bootstrap (match example) ---
INSTALL_PATH = os.getenv('NETALERTX_APP', '/app')
sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"])
pluginName = 'PIHOLEAPI'
from plugin_helper import Plugin_Objects, is_mac # noqa: E402 [flake8 lint suppression]
from utils.datetime_utils import timeNowUTC # noqa: E402 [flake8 lint suppression]
from logger import mylog, Logger # noqa: E402 [flake8 lint suppression]
from helper import get_setting_value # noqa: E402 [flake8 lint suppression]
from const import logPath # noqa: E402 [flake8 lint suppression]
import conf # noqa: E402 [flake8 lint suppression]
from pytz import timezone # noqa: E402 [flake8 lint suppression]
from utils.crypto_utils import string_to_fake_mac # noqa: E402 [flake8 lint suppression]
# Setup timezone & logger using standard NAX helpers
conf.tz = timezone(get_setting_value('TIMEZONE'))
Logger(get_setting_value('LOG_LEVEL'))
LOG_PATH = logPath + '/plugins'
RESULT_FILE = os.path.join(LOG_PATH, f'last_result.{pluginName}.log')
plugin_objects = Plugin_Objects(RESULT_FILE)
# --- Global state for session ---
PIHOLEAPI_URL = None
PIHOLEAPI_PASSWORD = None
PIHOLEAPI_SES_VALID = False
PIHOLEAPI_SES_SID = None
PIHOLEAPI_SES_CSRF = None
PIHOLEAPI_API_MAXCLIENTS = None
PIHOLEAPI_VERIFY_SSL = True
PIHOLEAPI_GET_OFFLINE = False
PIHOLEAPI_RUN_TIMEOUT = 10
PIHOLEAPI_FAKE_MAC = get_setting_value('PIHOLEAPI_FAKE_MAC')
VERSION_DATE = "NAX-PIHOLEAPI-1.0"
# ------------------------------------------------------------------
def pihole_api_auth():
"""Authenticate to Pi-hole v6 API and populate session globals."""
global PIHOLEAPI_SES_VALID, PIHOLEAPI_SES_SID, PIHOLEAPI_SES_CSRF
if not PIHOLEAPI_URL:
mylog('none', [f'[{pluginName}] PIHOLEAPI_URL not configured — skipping.'])
return False
# handle SSL verification setting - disable insecure warnings only when PIHOLEAPI_VERIFY_SSL=False
if not PIHOLEAPI_VERIFY_SSL:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
headers = {
"accept": "application/json",
"content-type": "application/json",
"User-Agent": "NetAlertX/" + VERSION_DATE
}
data = {"password": PIHOLEAPI_PASSWORD}
try:
resp = requests.post(PIHOLEAPI_URL + 'api/auth', headers=headers, json=data, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
except requests.exceptions.Timeout:
mylog('none', [f'[{pluginName}] Pi-hole auth request timed out. Try increasing PIHOLEAPI_RUN_TIMEOUT.'])
return False
except requests.exceptions.ConnectionError:
mylog('none', [f'[{pluginName}] Connection error during Pi-hole auth. Check PIHOLEAPI_URL and PIHOLEAPI_PASSWORD'])
return False
except Exception as e:
mylog('none', [f'[{pluginName}] Unexpected auth error: {e}'])
return False
try:
response_json = resp.json()
except Exception:
mylog('none', [f'[{pluginName}] Unable to parse Pi-hole auth response JSON.'])
return False
session_data = response_json.get('session', {})
if session_data.get('valid', False):
PIHOLEAPI_SES_VALID = True
PIHOLEAPI_SES_SID = session_data.get('sid')
# csrf might not be present if no password set
PIHOLEAPI_SES_CSRF = session_data.get('csrf')
mylog('verbose', [f'[{pluginName}] Authenticated to Pi-hole (sid present).'])
return True
else:
mylog('none', [f'[{pluginName}] Pi-hole auth required or failed.'])
return False
# ------------------------------------------------------------------
def pihole_api_deauth():
"""Logout from Pi-hole v6 API (best-effort)."""
global PIHOLEAPI_SES_VALID, PIHOLEAPI_SES_SID, PIHOLEAPI_SES_CSRF
if not PIHOLEAPI_URL:
return
if not PIHOLEAPI_SES_SID:
return
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
try:
requests.delete(PIHOLEAPI_URL + 'api/auth', headers=headers, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
except Exception:
# ignore errors on logout
pass
PIHOLEAPI_SES_VALID = False
PIHOLEAPI_SES_SID = None
PIHOLEAPI_SES_CSRF = None
# ------------------------------------------------------------------
def get_pihole_interface_data():
"""Return dict mapping mac -> [ipv4 addresses] from Pi-hole interfaces endpoint."""
result = {}
if not PIHOLEAPI_SES_VALID:
return result
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
if PIHOLEAPI_SES_CSRF:
headers["X-FTL-CSRF"] = PIHOLEAPI_SES_CSRF
try:
resp = requests.get(PIHOLEAPI_URL + 'api/network/interfaces', headers=headers, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
data = resp.json()
except Exception as e:
mylog('none', [f'[{pluginName}] Failed to fetch Pi-hole interfaces: {e}'])
return result
for interface in data.get('interfaces', []):
mac_address = interface.get('address')
if not mac_address or mac_address == "00:00:00:00:00:00":
continue
addrs = []
for addr in interface.get('addresses', []):
if addr.get('family') == 'inet':
a = addr.get('address')
if a:
addrs.append(a)
if addrs:
result[mac_address] = addrs
return result
# ------------------------------------------------------------------
def get_pihole_network_devices():
"""Return list of devices from Pi-hole v6 API (devices endpoint)."""
devices = []
# return empty list if no session available
if not PIHOLEAPI_SES_VALID:
return devices
# prepare headers
headers = {"X-FTL-SID": PIHOLEAPI_SES_SID}
if PIHOLEAPI_SES_CSRF:
headers["X-FTL-CSRF"] = PIHOLEAPI_SES_CSRF
params = {
'max_devices': str(PIHOLEAPI_API_MAXCLIENTS),
'max_addresses': '2'
}
try:
resp = requests.get(PIHOLEAPI_URL + 'api/network/devices', headers=headers, params=params, verify=PIHOLEAPI_VERIFY_SSL, timeout=PIHOLEAPI_RUN_TIMEOUT)
resp.raise_for_status()
data = resp.json()
mylog('debug', [f'[{pluginName}] Pi-hole API returned data: {json.dumps(data)}'])
except Exception as e:
mylog('none', [f'[{pluginName}] Failed to fetch Pi-hole devices: {e}'])
return devices
# The API returns 'devices' list
return data.get('devices', [])
# ------------------------------------------------------------------
def gather_device_entries():
"""
Build a list of device entries.
Online status is determined by comparing lastSeen (in seconds) vs the current time.
"""
entries = []
devices = get_pihole_network_devices()
now_ts = int(timeNowUTC(as_string=False).timestamp())
for device in devices:
hwaddr = device.get('hwaddr')
# Filter out invalid MACs/interfaces
if not hwaddr or hwaddr in ["00:00:00:00:00:00", "ip-::"]:
continue
device_ips = device.get('ips', [])
if not device_ips:
continue
# 1. Find the freshest timestamp across all IPs for this MAC
# This ensures if the device is active on ANY IP, the MAC is considered online.
max_last_seen = 0
for ip_info in device_ips:
ls = ip_info.get('lastSeen', 0)
if ls > max_last_seen:
max_last_seen = ls
# 2. Determine online status: (Current Time - Last Seen) <= PIHOLEAPI_CONSIDER_ONLINE
# Math is in seconds.
if (now_ts - max_last_seen) <= PIHOLEAPI_CONSIDER_ONLINE:
is_online = True
else:
is_online = False
# 3. Skip if offline (and user doesn't want offline devices)
if not is_online and not PIHOLEAPI_GET_OFFLINE:
mylog('verbose', [f'[{pluginName}] Not online in the last {PIHOLEAPI_CONSIDER_ONLINE}s, import of offline disabled (PIHOLEAPI_GET_OFFLINE) skipping device: {device}.'])
continue
mac_vendor = device.get('macVendor', '')
# 4. Process each valid IP for the device
for ip_info in device_ips:
ip = ip_info.get('ip')
# Skip internal Pi-hole placeholders
if not ip or ip in ["0.0.0.0", "::"]:
mylog('verbose', [f'[{pluginName}] Not a valid ip ({ip}), skipping device: {device}.'])
continue
name = ip_info.get('name') or '(unknown)'
tmp_mac = hwaddr.lower()
# Handle Fake MAC logic for non-standard hardware addresses
if PIHOLEAPI_FAKE_MAC and not is_mac(tmp_mac):
tmp_mac = string_to_fake_mac(ip)
entries.append({
'mac': tmp_mac,
'ip': ip,
'name': name,
'macVendor': mac_vendor,
# Pass the Unix timestamp as a string for NAX tracking
'lastQuery': str(max_last_seen) if max_last_seen > 0 else ""
})
return entries
# ------------------------------------------------------------------
def main():
"""Main plugin entrypoint."""
global PIHOLEAPI_URL, PIHOLEAPI_PASSWORD, PIHOLEAPI_API_MAXCLIENTS, PIHOLEAPI_VERIFY_SSL, PIHOLEAPI_RUN_TIMEOUT, PIHOLEAPI_GET_OFFLINE, PIHOLEAPI_CONSIDER_ONLINE
mylog('verbose', [f'[{pluginName}] start script.'])
# Load settings from NAX config
PIHOLEAPI_URL = get_setting_value('PIHOLEAPI_URL')
# ensure trailing slash
if not PIHOLEAPI_URL.endswith('/'):
PIHOLEAPI_URL += '/'
PIHOLEAPI_PASSWORD = get_setting_value('PIHOLEAPI_PASSWORD')
PIHOLEAPI_API_MAXCLIENTS = get_setting_value('PIHOLEAPI_API_MAXCLIENTS')
# Accept boolean or string "True"/"False"
PIHOLEAPI_VERIFY_SSL = get_setting_value('PIHOLEAPI_SSL_VERIFY')
PIHOLEAPI_RUN_TIMEOUT = get_setting_value('PIHOLEAPI_RUN_TIMEOUT')
PIHOLEAPI_GET_OFFLINE = get_setting_value('PIHOLEAPI_GET_OFFLINE')
PIHOLEAPI_CONSIDER_ONLINE = get_setting_value('PIHOLEAPI_CONSIDER_ONLINE')
# Fallback in case the setting is missing or returned as an empty string
if not isinstance(PIHOLEAPI_CONSIDER_ONLINE, int):
PIHOLEAPI_CONSIDER_ONLINE = 300
# Authenticate
if not pihole_api_auth():
mylog('none', [f'[{pluginName}] Authentication failed — no devices imported.'])
return 1
try:
device_entries = gather_device_entries()
if not device_entries:
mylog('verbose', [f'[{pluginName}] No devices found on Pi-hole.'])
else:
for entry in device_entries:
if is_mac(entry['mac']):
# Map to Plugin_Objects fields
mylog('verbose', [f"[{pluginName}] found: {entry['name']}|{entry['mac']}|{entry['ip']}"])
plugin_objects.add_object(
primaryId=str(entry['mac']),
secondaryId=str(entry['ip']),
watched1=str(entry['name']),
watched2=str(entry['macVendor']),
watched3=str(entry['lastQuery']),
watched4="",
extra=pluginName,
foreignKey=str(entry['mac'])
)
else:
mylog('verbose', [f"[{pluginName}] Skipping invalid MAC (see PIHOLEAPI_FAKE_MAC setting): {entry['name']}|{entry['mac']}|{entry['ip']}"])
# Write result file for NetAlertX to ingest
plugin_objects.write_result_file()
mylog('verbose', [f'[{pluginName}] Script finished. Imported {len(device_entries)} entries.'])
finally:
# Deauth best-effort
pihole_api_deauth()
return 0
if __name__ == '__main__':
main()