Protocol Support
WebSocket, gRPC, MQTT, AMQP, and SSE protocols
WebSocket
ws.connect(url, [options])Connect to WebSocket server, returns socket object
socket.send(message)Send text message
socket.sendBinary(data)Send binary data
socket.recv()Receive next message - text as string, binary as {type, data}
socket.isConnected()Check if WebSocket is connected
socket.close()Close the connection
export default function() {
const socket = ws.connect('wss://echo.websocket.org');
socket.send('Hello, WebSocket!');
const response = socket.recv();
// Text frames return as string, binary as { type: "binary", data: "<base64>" }
if (typeof response === 'string') {
check(response, {
'echo matches': (r) => r === 'Hello, WebSocket!'
});
}
socket.close();
}WebSocket Options
headers: Object - Custom headers for handshake
timeout: String - Connection timeout
reconnect: Boolean - Enable auto-reconnect on disconnection (default: false)
maxRetries: Number - Maximum reconnect attempts (default: 3)
Auto-Reconnect
// Enable auto-reconnect with exponential backoff
const socket = ws.connect('wss://api.example.com/stream', {
reconnect: true,
maxRetries: 5
});
// If connection drops, send/recv auto-reconnect
socket.send('Hello');
const msg = socket.recv();
print('Connected: ' + socket.isConnected());
socket.close();gRPC
new GrpcClient()Create a new gRPC client
client.load(protos, importPaths)Load proto files
client.connect(url)Connect to gRPC server
client.invoke(method, request, [metadata])Call unary RPC method
client.close()Close connection
const client = new GrpcClient();
// Load proto definitions
client.load(['./proto/greeter.proto'], ['./proto/']);
export default function() {
client.connect('http://localhost:50051');
const response = client.invoke('greeter.Greeter/SayHello', {
name: 'World'
});
check(response, {
'greeting received': (r) => r.message.includes('Hello')
});
client.close();
}Streaming RPCs
// Server streaming - recv() returns { value, reason }
const stream = client.serverStream('service/StreamData', { id: 1 });
let result;
while ((result = stream.recv(5000)) && result.value !== null) {
print('Received: ' + JSON.stringify(result.value));
}
if (result.reason === 'timeout') print('Timed out');
// Client streaming
const stream = client.clientStream('service/UploadData');
stream.send({ chunk: 'data1' });
stream.send({ chunk: 'data2' });
const response = stream.closeAndRecv();
// Bidirectional streaming
const stream = client.bidiStream('service/Chat');
stream.send({ message: 'Hello' });
const result = stream.recv();
if (result.value) print(result.value.response);
stream.close();RecvResult Type
value: Message object or null
reason: null on success, or "timeout" | "closed" | "not_connected"
Server-Sent Events (SSE)
sse.connect(url)Connect to SSE endpoint
client.recv()Receive next event (blocking). Returns {event, data, id} or null
client.close()Close the connection
client.urlProperty containing the connection URL
export default function() {
const client = sse.connect('https://api.example.com/events');
// Receive events
for (let i = 0; i < 5; i++) {
const event = client.recv();
if (event) {
print(`Event: ${event.event}, Data: ${event.data}`);
} else {
break; // Connection closed
}
}
client.close();
}Event Object
event: String - Event type name
data: String - Event data
id: String - Event ID (for reconnection)
MQTT
new JsMqttClient()Create MQTT client
client.connect(host, port, clientId)Connect to broker
client.publish(topic, message, [qos], [retain])Publish message (QoS 0, 1, or 2; optional retain)
client.subscribe(topic, [qos])Subscribe to topic
client.unsubscribe(topic)Unsubscribe from topic
client.recv([timeoutMs])Receive next message, returns RecvResult
client.close()Disconnect
export default function() {
const client = new JsMqttClient();
client.connect('test.mosquitto.org', 1883, 'fusillade-' + utils.uuid());
// Publish
client.publish('test/topic', 'Hello MQTT!', 1); // QoS 1
// Subscribe and receive - recv() returns { value, reason }
client.subscribe('test/responses', 1);
const result = client.recv(5000);
if (result.value) {
print(`Received on ${result.value.topic}: ${result.value.payload}`);
} else if (result.reason === 'timeout') {
print('No message received within timeout');
}
client.close();
}AMQP (RabbitMQ)
new JsAmqpClient()Create AMQP client
client.connect(url)Connect to broker (amqp://host:port)
client.publish(exchange, routingKey, message)Publish message
client.subscribe(queue)Subscribe to queue
client.recv([timeoutMs])Receive next message, returns RecvResult
client.ack(deliveryTag)Acknowledge message
client.nack(deliveryTag, requeue)Negative acknowledge message
client.declareExchange(name, type, [opts])Declare exchange (type: direct/fanout/topic/headers)
client.declareQueue(name, [opts])Declare queue (opts: durable, autoDelete, exclusive)
client.bindQueue(queue, exchange, routingKey)Bind queue to exchange
client.close()Disconnect
export default function() {
const client = new JsAmqpClient();
client.connect('amqp://127.0.0.1:5672');
// Publish to exchange
client.publish('', 'test_queue', JSON.stringify({
action: 'test',
timestamp: Date.now()
}));
// Consume from queue - recv() returns { value, reason }
client.subscribe('response_queue');
const result = client.recv(5000);
if (result.value) {
print(`Received: ${result.value.body}`);
client.ack(result.value.deliveryTag);
} else if (result.reason === 'timeout') {
print('No message received within timeout');
}
client.close();
}Exchange & Queue Management
const client = new JsAmqpClient();
client.connect('amqp://127.0.0.1:5672');
// Declare exchange and queue
client.declareExchange('events', 'topic', { durable: true });
client.declareQueue('order-events', { durable: true, autoDelete: false });
client.bindQueue('order-events', 'events', 'order.*');
// Publish to the exchange
client.publish('events', 'order.created', JSON.stringify({ orderId: 123 }));
client.close();