-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathspring_client_example.py
More file actions
101 lines (81 loc) · 3.57 KB
/
spring_client_example.py
File metadata and controls
101 lines (81 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from rsockets2 import RSocket, SocketType
import rx
import time
import rsockets2.frames as frames
SPRING_SERVER_HOSTNAME = 'localhost'
SPRING_SERVER_PORT = 24512
"""
Spring Controller Functions Example

    @MessageMapping("test.controller.mono")
    public Mono<byte[]> monoTest() {
        ByteBuffer buffer = ByteBuffer.allocate(12 * 1000000);
        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put((byte) i);
        }
        return Mono.just(buffer.array());
    }

    @MessageMapping("test.controller.flux")
    public Flux<byte[]> fluxTest() {
        ByteBuffer buffer = ByteBuffer.allocate(16 * 1000000);
        for (int i = 0; i < buffer.capacity(); i++) {
            buffer.put((byte) i);
        }
        return Flux.range(0, 20)
                .map(t -> buffer.array());
    }

    @MessageMapping("test.controller.triggerfnf")
    public Mono<Void> triggerfnf(RSocketRequester requester, String payload) {
        requester.route("test.controller.triggerfnf")
                .data(payload)
                .send()
                .subscribe();
        System.out.println("Send FNF");
        return Mono.empty();
    }
"""
# Number of one-second rounds in which the server is asked to send a
# fire-and-forget frame back to this client (same count as the original
# "while True ... if time_waited > 30: break" loop).
FNF_TRIGGER_ROUNDS = 31


def format_transfer_summary(total_bytes, elapsed_seconds):
    """Build the line printed when the request/response test completes.

    Args:
        total_bytes: Number of payload bytes received.
        elapsed_seconds: Time between issuing the request and completion.

    Returns:
        The summary string containing the size in mb (bytes / 1000000), the
        elapsed seconds and the throughput in mb/s. A non-positive elapsed
        time yields an infinite speed instead of raising ZeroDivisionError.
    """
    megabytes = total_bytes / 1000000
    if elapsed_seconds > 0:
        speed = megabytes / elapsed_seconds
    else:
        # Guard against a clock that did not advance between start and end.
        speed = float("inf")
    return "Request Response Complete. Received --- {}mb Needed: {}s Speed: {}mb/s".format(
        megabytes, elapsed_seconds, speed)


def main():
    """Run the example client against the configured Spring server.

    Registers responder handlers on the socket, then issues one
    request/response call, one request/stream call and a series of calls to
    the 'test.controller.triggerfnf' route. The socket is closed on exit,
    whether or not an error occurred after it was created.
    """
    print("This expects Spring MessageMapping 'test.controller.mono' returning a Mono<byte[]>")
    print("This expects Spring MessageMapping 'test.controller.flux' returning a Flux<byte[]>")
    print("This expects Spring MessageMapping 'test.controller.triggerfnf' returning a Mono<Void> and sending a FNF")

    client = RSocket(socket_type=SocketType.TCP_SOCKET, keepalive=10000, maxlive=10000,
                     hostname=SPRING_SERVER_HOSTNAME, port=SPRING_SERVER_PORT)

    # --- Responder side: handlers invoked for requests arriving on the socket.
    def request_response_handler(frame: frames.RequestResponse):
        """Answer every incoming request/response with the single payload b"5"."""
        return rx.just(b"5")

    def request_stream_handler(frame: frames.RequestStream):
        """Answer every incoming request/stream with three b"3" payloads."""
        return rx.of(b"3", b"3", b"3")

    def fire_and_forget_handler(frame: frames.RequestFNF):
        """Print the ASCII-decoded payload of an incoming fire-and-forget frame."""
        print("Received Fire And Forget! Payload: {}".format(
            frame.request_data.decode("ASCII")))

    client.on_request_response = request_response_handler
    client.on_request_stream = request_stream_handler
    client.on_fire_and_forget = fire_and_forget_handler

    def report_error(err):
        """Shared on_error callback for all subscriptions below."""
        print("Oh my god it failed: {}".format(err))

    try:
        client.open()
        time.sleep(1.0)

        # --- Test Request Response
        total_size = 0

        def adder(x):
            """Accumulate the size of each received payload."""
            nonlocal total_size
            total_size += len(x)

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start = time.perf_counter()

        def on_complete():
            """Print size, duration and throughput of the finished transfer."""
            needed = time.perf_counter() - start
            print(format_transfer_summary(total_size, needed))

        client.request_response(
            meta_data=b'test.controller.mono', data=b"",
        ).subscribe(on_next=adder, on_error=report_error, on_completed=on_complete)

        # --- Test Request Stream
        client.request_stream(
            meta_data=b'test.controller.flux', data=b"",
        ).subscribe(
            on_next=lambda x: print(
                "Received Size: {} mb".format(len(x) / 1000000.0)),
            on_error=report_error,
            on_completed=lambda: print("Request Stream Complete"),
        )

        # --- Test Fire And Forget
        # Each call hits the 'test.controller.triggerfnf' route, which per the
        # sample controller sends a fire-and-forget frame back to this client.
        for _ in range(FNF_TRIGGER_ROUNDS):
            client.request_response(
                meta_data=b'test.controller.triggerfnf', data=b"",
            ).subscribe(on_error=report_error)
            time.sleep(1.0)
    finally:
        client.close()


if __name__ == "__main__":
    main()