当前位置: 首页 > news >正文

一般到哪个网站找数据库福州seo

一般到哪个网站找数据库,福州seo,如何制作一个平台软件,河南郑州今日头条新闻Max.Bai 2024.10 0. 背景 Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。 1. WebSocket test Client “”“ Max.Bai W…

Max.Bai

2024.10

0. 背景

Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。

1. WebSocket test Client
“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optionalimport websocket
from locust import eventslogger = logging.getLogger(__name__)class WebSocketClient:def __init__(self, host: str, log_messages: bool = False):self._host: str = hostself._id: str = secrets.token_hex(8)self._alias: Optional[str] = Noneself._ws: Optional[websocket.WebSocketApp] = Noneself.log_messages = log_messagesself.count_recv_type = Falseself.heartbeat_auto_respond = Falseself._recv_messages: list = []self.messages: list = []self._sent_messages: list = []def __enter__(self):self.connect()return selfdef __exit__(self, type, value, traceback):self.disconnect()@propertydef tag(self) -> str:tag = f"{self._host} <{self._id}>"if self._alias:tag += f"({self._alias})"return tagdef connect(self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None):if not self._ws:self._alias = aliasself._ws = websocket.WebSocketApp(url=self._host,header=headers,on_open=self._on_open,on_message=on_message if on_message else self._on_message,on_close=self._on_close,)thread = threading.Thread(target=self._ws.run_forever)thread.daemon = Truethread.start()time.sleep(3)else:logger.warning("An active WebSocket connection is already established.")def is_connected(self) -> bool:return self._ws is not Nonedef disconnect(self):if self._ws:self._ws.close()self._alias = Noneelse:logger.warning("No active WebSocket connection established.")def _on_open(self, ws):logger.debug(f"[WebSocket] {self.tag} connected.")events.request.fire(request_type="ws_client",name="connect",response_time=0,response_length=0,)def _on_message(self, ws, message):recv_time = time.time()recv_time_ms = int(recv_time * 1000)recv_time_ns = int(recv_time * 1000000)logger.debug(f"[WebSocket] {self.tag} message received: {message}")if self.log_messages:self._recv_messages.append(message)self.messages.append(message)# public/respond-heartbeatif self.heartbeat_auto_respond:if "public/heartbeat" in message:self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))if self.count_recv_type:try:msg = json.loads(message)id = str(msg.get("id", 0))if len(id) == 13:resp_time = recv_time_ms - int(id)elif len(id) == 16:resp_time = (recv_time_ns - int(id)) / 1000elif len(id) > 13:resp_time = recv_time_ms - int(id[:13])else:resp_time = 0method = msg.get("method", "unknown")code = msg.get("code", "unknown")error = msg.get("message", "unknown")# send_time = int(msg.get("nonce", 0))if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:events.request.fire(request_type="ws_client",name=f"recv {method}",response_time=0,response_length=len(msg),)elif code == 0:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",# response_time=recv_time - send_time,response_time=resp_time,response_length=len(msg),)else:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",response_time=resp_time,response_length=len(msg),exception=error,)except Exception as e:events.request.fire(request_type="ws_client",name="recv error",response_time=0,response_length=len(msg),exception=str(e),)def _on_close(self, ws, close_status_code, close_msg):logger.debug(f"[WebSocket] {self.tag} closed.")self._ws = Noneevents.request.fire(request_type="ws_client",name="close",response_time=0,response_length=0,)def set_on_message(self, on_message: Callable):self._ws.on_message = on_messagedef send(self, message: str):if self._ws:self._ws.send(data=message)if self.log_messages:self._sent_messages.append(message)logger.debug(f"[WebSocket] {self.tag} message sent: {message}")else:logger.warning(f"No active [WebSocket] {self.tag} connection established.")raise ConnectionError("No active [WebSocket] connection established.")def clear(self):self._recv_messages = []self._sent_messages = []self.messages = []def expect_messages(self,matcher: Callable[..., bool],count: int = 1,timeout: int = 10,interval: int = 1,) -> list:"""Expect to receive one or more filtered messages.Args:matcher (Callable): A matcher function used to filter the received messages.count (int, optional): Number of messages to be expected before timeout. Defaults to 1.timeout (int, optional): Timeout in seconds. Defaults to 10.interval (int, optional): Interval in seconds. Defaults to 1.Returns:list: A list of messages filtered by the matcher."""deadline: float = time.time() + timeoutresult: list = []  # messages filtered by the matcherseen: list = []  # messages already seen by the matcher to be excluded from further matchingwhile time.time() < deadline:snapshot: list = [*self._recv_messages]for element in seen:if element in snapshot:snapshot.remove(element)result.extend(filter(matcher, snapshot))if len(result) >= count:breakseen.extend(snapshot)time.sleep(interval)if len(result) < count:logger.warning(f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages.")return result
2. 如何使用
class PrivateWsUser(User):def on_start(self):self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)self.ws_client.connect()@taskdef send_hello()self.ws_client.send("hello world")
3. 扩展

可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理

http://www.zhongyajixie.com/news/9862.html

相关文章:

  • 志愿服务网站开发steam交易链接在哪
  • 上海招聘用的最多的网站最新疫情爆发
  • 什么网站可以接活在家做互联网广告平台代理
  • 图书馆网站建设的建议滁州网站seo
  • ps网站专题怎么做商城推广
  • a公司与企业k签订了建设k企业优化大师的使用方法
  • 学网站建设设计要钱吗如何做网络推广运营
  • wordpress load-scripts.phpseo教程seo教程
  • 大连哪家公司做网站比较好企业百度推广
  • 营销型平台网站建设百度一下你就知道官网网址
  • 网站做301怎么做百度搜索次数统计
  • 网站维护多少钱一个月公司网站设计与制作
  • 成都网站建设优化企业排名网络推广的方法包括
  • 网站title如何写广州seo优化外包服务
  • html5游戏WordPress百度搜索引擎优化相关性评价
  • logo设计竞标网站线上营销活动案例
  • idea怎么做网站国外免费发产品的b2b平台
  • 做网站项目需要多少钱推广普通话文字素材
  • 企业网站信息化建设合肥seo报价
  • 智能网站建设推荐一级域名好还是二级域名好
  • 网站两个域名国内最近发生的重大新闻
  • 中国旅游网站模板广东疫情最新情况
  • 自己做免费的网站合肥网络推广网络运营
  • 网站建设工作内容站长工具名称查网站
  • 做玩网站怎么上传chrome网页版入口
  • 百度做公司网站广东搜索引擎优化
  • 编程除了做网站还能干什么2024年重大政治时事汇总
  • 万网域名注册网站免费发帖平台
  • 1网站建设网站搭建详细教程
  • 阿里巴巴做网站的电话号码建设网站推广