Skip to content

Commit f2d6175

Browse files
committed
feat(stats): accurate Xray node traffic via outbound stats
1 parent ff9357e commit f2d6175

6 files changed

Lines changed: 101 additions & 91 deletions

File tree

cc-agent/api.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
type API struct {
1616
cfg *Config
1717
userStore *UserStore
18-
statsStore *StatsStore
1918
xrayClient *XrayClient
2019
}
2120

@@ -192,23 +191,38 @@ func (a *API) handleRemoveUser(w http.ResponseWriter, r *http.Request) {
192191
jsonOK(w, map[string]string{"status": "ok"})
193192
}
194193

195-
// GET /stats — collect fresh stats from Xray, return accumulated totals, then reset
194+
// GET /stats — take one atomic snapshot from Xray (reset=true) and return
195+
// { users: { <userId>: {tx, rx} }, node: {tx, rx} }.
196+
// Users carry per-user uplink/downlink; node is the sum of outbound uplink/downlink
197+
// across all non-API outbounds (i.e. real traffic that traversed Xray).
196198
func (a *API) handleStats(w http.ResponseWriter, r *http.Request) {
197199
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
198200
defer cancel()
199201

200-
if err := a.statsStore.CollectFromXray(ctx, a.xrayClient); err != nil {
201-
log.Printf("[stats] CollectFromXray: %v", err)
202+
rawStats, err := a.xrayClient.QueryStats(ctx, "", true)
203+
if err != nil {
204+
log.Printf("[stats] QueryStats: %v", err)
205+
jsonOK(w, map[string]any{
206+
"users": map[string]any{},
207+
"node": map[string]int64{"tx": 0, "rx": 0},
208+
})
209+
return
202210
}
203211

204-
raw := a.statsStore.GetAndReset()
212+
snap := ParseSnapshot(rawStats)
205213

206-
result := make(map[string]map[string]int64, len(raw))
207-
for email, t := range raw {
208-
result[email] = map[string]int64{"tx": t.Tx, "rx": t.Rx}
214+
users := make(map[string]map[string]int64, len(snap.Users))
215+
for email, t := range snap.Users {
216+
if t.Tx == 0 && t.Rx == 0 {
217+
continue
218+
}
219+
users[email] = map[string]int64{"tx": t.Tx, "rx": t.Rx}
209220
}
210221

211-
jsonOK(w, result)
222+
jsonOK(w, map[string]any{
223+
"users": users,
224+
"node": map[string]int64{"tx": snap.Node.Tx, "rx": snap.Node.Rx},
225+
})
212226
}
213227

214228
// POST /restart — restart Xray service, then restore all users (Xray loses state on restart)

cc-agent/main.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"time"
1313
)
1414

15-
const Version = "1.0.0"
15+
const Version = "2.0.0"
1616

1717
var startTime = time.Now()
1818

@@ -36,14 +36,15 @@ func main() {
3636
log.Printf("[main] Warning: could not load users from disk: %v", err)
3737
}
3838

39-
statsStore := NewStatsStore()
40-
4139
xrayClient, err := NewXrayClient(cfg)
4240
if err != nil {
4341
log.Printf("[main] Warning: could not connect to Xray gRPC: %v (will retry on use)", err)
4442
}
4543

46-
// Restore users to Xray after brief startup delay (Xray might still be starting)
44+
// Restore users to Xray after a brief startup delay (Xray might still be starting),
45+
// then discard the first Xray stats snapshot. Xray accumulates counters from boot,
46+
// and without this discard the panel's first /stats poll would attribute all of
47+
// that since-boot traffic to the current interval, producing a cold-start spike.
4748
go func() {
4849
time.Sleep(3 * time.Second)
4950
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
@@ -54,17 +55,10 @@ func main() {
5455
} else {
5556
log.Printf("[main] Restored %d users to Xray", count)
5657
}
57-
}()
58-
59-
// Periodic stats collection from Xray every 60s
60-
go func() {
61-
for {
62-
time.Sleep(60 * time.Second)
63-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
64-
if err := statsStore.CollectFromXray(ctx, xrayClient); err != nil {
65-
log.Printf("[stats] Collection error: %v", err)
66-
}
67-
cancel()
58+
if _, err := xrayClient.QueryStats(ctx, "", true); err != nil {
59+
log.Printf("[main] Startup stats discard failed: %v", err)
60+
} else {
61+
log.Printf("[main] Startup stats discarded (cold-start reset)")
6862
}
6963
}()
7064

@@ -81,7 +75,6 @@ func main() {
8175
api := &API{
8276
cfg: cfg,
8377
userStore: userStore,
84-
statsStore: statsStore,
8578
xrayClient: xrayClient,
8679
}
8780

cc-agent/stats.go

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,70 @@
11
package main
22

3-
import (
4-
"context"
5-
"log"
6-
"strings"
7-
"sync"
8-
)
3+
import "strings"
94

10-
// UserTraffic holds accumulated uplink/downlink bytes for a user
5+
// UserTraffic holds uplink/downlink bytes.
6+
// For users: Tx = client uplink (client -> server), Rx = client downlink (server -> client).
7+
// For the node: Tx/Rx are the sum of outbound uplink/downlink across all non-API outbounds,
8+
// which is the real traffic that actually traversed Xray.
119
type UserTraffic struct {
1210
Tx int64 `json:"tx"` // uplink bytes
1311
Rx int64 `json:"rx"` // downlink bytes
1412
}
1513

16-
// StatsStore accumulates traffic stats between panel polls
17-
type StatsStore struct {
18-
mu sync.Mutex
19-
traffic map[string]*UserTraffic // keyed by email
14+
// Snapshot is the parsed result of a single Xray QueryStats call with an empty
15+
// pattern. Users is keyed by email (which equals the panel userId). Node is the
16+
// aggregated node-level traffic derived from outbound stats.
17+
type Snapshot struct {
18+
Users map[string]*UserTraffic
19+
Node UserTraffic
2020
}
2121

22-
func NewStatsStore() *StatsStore {
23-
return &StatsStore{
24-
traffic: make(map[string]*UserTraffic),
25-
}
26-
}
22+
// apiOutboundTag is the tag of the internal Xray API outbound; its counters
23+
// represent gRPC control-plane traffic and must not be attributed to the node.
24+
const apiOutboundTag = "API"
2725

28-
// CollectFromXray fetches stats from Xray (with reset) and adds to local accumulator.
29-
// Xray resets its own counters after each query with reset=true.
30-
func (s *StatsStore) CollectFromXray(ctx context.Context, xc *XrayClient) error {
31-
rawStats, err := xc.QueryStats(ctx, true)
32-
if err != nil {
33-
return err
26+
// ParseSnapshot converts a flat map of Xray stat names to a structured Snapshot.
27+
// Expected stat name formats:
28+
// - user>>>{email}>>>traffic>>>{uplink|downlink}
29+
// - outbound>>>{tag}>>>traffic>>>{uplink|downlink}
30+
// - inbound>>>{tag}>>>traffic>>>{uplink|downlink} (ignored here; reserved for future metrics)
31+
func ParseSnapshot(rawStats map[string]int64) Snapshot {
32+
snap := Snapshot{
33+
Users: make(map[string]*UserTraffic, len(rawStats)/2),
3434
}
3535

36-
s.mu.Lock()
37-
defer s.mu.Unlock()
38-
3936
for name, value := range rawStats {
40-
// Name format: user>>>email>>>traffic>>>uplink or downlink
4137
parts := strings.Split(name, ">>>")
42-
if len(parts) != 4 || parts[0] != "user" || parts[2] != "traffic" {
38+
if len(parts) != 4 || parts[2] != "traffic" {
4339
continue
4440
}
45-
email := parts[1]
46-
direction := parts[3]
41+
kind, id, direction := parts[0], parts[1], parts[3]
4742

48-
if s.traffic[email] == nil {
49-
s.traffic[email] = &UserTraffic{}
50-
}
51-
switch direction {
52-
case "uplink":
53-
s.traffic[email].Tx += value
54-
case "downlink":
55-
s.traffic[email].Rx += value
43+
switch kind {
44+
case "user":
45+
ut := snap.Users[id]
46+
if ut == nil {
47+
ut = &UserTraffic{}
48+
snap.Users[id] = ut
49+
}
50+
switch direction {
51+
case "uplink":
52+
ut.Tx += value
53+
case "downlink":
54+
ut.Rx += value
55+
}
56+
case "outbound":
57+
if id == apiOutboundTag {
58+
continue
59+
}
60+
switch direction {
61+
case "uplink":
62+
snap.Node.Tx += value
63+
case "downlink":
64+
snap.Node.Rx += value
65+
}
5666
}
5767
}
5868

59-
if len(rawStats) > 0 {
60-
log.Printf("[stats] Collected %d stat entries from Xray", len(rawStats))
61-
}
62-
return nil
63-
}
64-
65-
// GetAndReset returns all accumulated stats and resets the local store.
66-
// Called when the panel polls /stats.
67-
func (s *StatsStore) GetAndReset() map[string]*UserTraffic {
68-
s.mu.Lock()
69-
defer s.mu.Unlock()
70-
71-
result := s.traffic
72-
s.traffic = make(map[string]*UserTraffic)
73-
return result
69+
return snap
7470
}

cc-agent/xray.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,12 @@ func (c *XrayClient) RemoveUser(ctx context.Context, email string) error {
7272
return nil
7373
}
7474

75-
// QueryStats fetches traffic stats from Xray; reset=true clears counters in Xray
76-
func (c *XrayClient) QueryStats(ctx context.Context, reset bool) (map[string]int64, error) {
75+
// QueryStats fetches traffic stats from Xray matching the given pattern.
76+
// An empty pattern returns all stats (users, inbounds, outbounds) in one call.
77+
// reset=true atomically clears the matched counters in Xray.
78+
func (c *XrayClient) QueryStats(ctx context.Context, pattern string, reset bool) (map[string]int64, error) {
7779
resp, err := c.stats.QueryStats(ctx, &stats_command.QueryStatsRequest{
78-
Pattern: "user>>>",
80+
Pattern: pattern,
7981
Reset_: reset,
8082
})
8183
if err != nil {

src/services/configGenerator.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,8 @@ function generateXrayConfig(node, users) {
507507
system: {
508508
statsInboundUplink: true,
509509
statsInboundDownlink: true,
510+
statsOutboundUplink: true,
511+
statsOutboundDownlink: true,
510512
},
511513
},
512514
inbounds: [

src/services/syncService.js

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,10 @@ class SyncService {
377377

378378
/**
379379
* Collect traffic stats from Xray node via Agent GET /stats.
380-
* Agent accumulates stats between polls (Xray counters are reset on each agent collection).
380+
* Response shape (agent v2.0.0+):
381+
* { users: { <userId>: { tx, rx } }, node: { tx, rx } }
382+
* Node tx/rx come from Xray outbound stats (real traffic that traversed
383+
* Xray), not from summing per-user counters.
381384
*/
382385
async collectXrayTrafficStats(node) {
383386
if (!(node.xray?.agentToken)) {
@@ -387,23 +390,23 @@ class SyncService {
387390

388391
try {
389392
const response = await this._agentRequest(node, 'GET', '/stats');
390-
const stats = response.data || {};
393+
const data = response.data || {};
394+
const users = data.users || {};
395+
const nodeTraffic = data.node || { tx: 0, rx: 0 };
396+
const nodeTx = nodeTraffic.tx || 0;
397+
const nodeRx = nodeTraffic.rx || 0;
391398

392-
if (Object.keys(stats).length === 0) return;
399+
const userEntries = Object.entries(users);
400+
if (userEntries.length === 0 && nodeTx === 0 && nodeRx === 0) return;
393401

394-
let nodeTx = 0;
395-
let nodeRx = 0;
396402
const bulkOps = [];
397403
const now = new Date();
398404

399-
for (const [email, traffic] of Object.entries(stats)) {
405+
for (const [email, traffic] of userEntries) {
400406
const tx = traffic.tx || 0;
401407
const rx = traffic.rx || 0;
402408
if (tx === 0 && rx === 0) continue;
403409

404-
nodeTx += tx;
405-
nodeRx += rx;
406-
407410
// email == userId (as set in configGenerator and agent)
408411
bulkOps.push({
409412
updateOne: {
@@ -419,7 +422,7 @@ class SyncService {
419422
if (bulkOps.length > 0) {
420423
const result = await HyUser.bulkWrite(bulkOps, { ordered: false });
421424
logger.debug(`[Agent Stats] ${node.name}: updated ${result.modifiedCount}/${bulkOps.length} users`);
422-
this._checkUserLimits(Object.keys(stats)).catch(() => {});
425+
this._checkUserLimits(userEntries.map(([email]) => email)).catch(() => {});
423426
}
424427

425428
if (nodeTx > 0 || nodeRx > 0) {
@@ -430,7 +433,7 @@ class SyncService {
430433
$set: { 'traffic.lastUpdate': now },
431434
}
432435
);
433-
logger.info(`[Agent Stats] ${node.name}: ${bulkOps.length} users, ↑${(nodeTx / 1024 / 1024).toFixed(1)}MB ↓${(nodeRx / 1024 / 1024).toFixed(1)}MB`);
436+
logger.info(`[Agent Stats] ${node.name}: ${bulkOps.length} users, node ${(nodeTx / 1024 / 1024).toFixed(1)}MB ↓${(nodeRx / 1024 / 1024).toFixed(1)}MB`);
434437
}
435438
} catch (error) {
436439
logger.error(`[Agent Stats] ${node.name} error: ${error.message}`);

0 commit comments

Comments
 (0)