From bf07c54372c9a091ebb08b2d2f1b97429098175e Mon Sep 17 00:00:00 2001 From: yezhengmao1 Date: Fri, 13 Jun 2025 14:03:52 +0800 Subject: [PATCH] [vtimeline] add tracepoint --- tracepoint.py | 158 ++++++++++++++++++++++++++++++++++++++++++++++++++ vtimeline.cpp | 7 ++- 2 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 tracepoint.py diff --git a/tracepoint.py b/tracepoint.py new file mode 100644 index 0000000..cc532bb --- /dev/null +++ b/tracepoint.py @@ -0,0 +1,158 @@ +import os +import queue +import socket +import atexit +import ctypes +from pathlib import Path +import logging.handlers + +################################ +### Tracepoint for vtimeline ### +################################ + + +_ROTATE_FILE_COUNT = 5 +_ROTATE_FILE_MAX_SIZE = 200 * 1024 * 1024 + + +class TracePointFormatter(logging.Formatter): + def __init__(self): + self.pid = os.getenv("RANK", -1) # global rank + self.tid = 0 + + def format(self, record: logging.LogRecord): + # the chrome trace metirc include + # name : event name + # cat : the event categories + # ph : the event type, B-begin E-end + # ts : tracing clock timestamp of the event, mciro sencond + # pid : pid, or global rank + # tid : tid, or stream id + + # must use like below: + # logger.info("event_name", extra={"cat": "cat", "pid": pid, "tid": tid, ph: "B"}) + + try: + format_str = ",".join( + [ + str(int(record.created * 1000000)), # microsecond + str(self.pid), # global rank + str(self.tid), # stream_id, default is cpu + record.cat, + record.getMessage(), + record.ph, + ] + ) + except Exception as e: + format_str = f"error logger format : {str(e)}" + + return format_str + + +class TracePoint: + def __init__(self, event_name: str, cat_name: str): + self.logger = logging.getLogger("TracePoint") + self.name = event_name + self.cat = cat_name + + def begin(self): + self.record(self.name, self.cat, "B") + + def end(self): + self.record(self.name, self.cat, "E") + + def record(self, event_name: str, cat_name: str, ph: str): + self.logger.info( + event_name, + extra={ + "cat": cat_name, + "ph": ph, + }, + ) + + def __enter__(self): + self.begin() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.end() + return False + + +class CUDAVTimeLine: + def __init__( + self, + lib_path="/usr/local/lib/libvtimeline.so", + ): + self.lib = ctypes.CDLL(lib_path) + + self.lib.enable_vtimeline.argtypes = [] + self.lib.enable_vtimeline.restype = ctypes.c_int + + self.lib.disable_vtimeline.argtypes = [] + self.lib.disable_vtimeline.restype = ctypes.c_int + + def enable(self): + if self.lib.enable_vtimeline() != 0: + print("Failed to enable CUDAVTimeline") + + def disable(self): + self.lib.disable_vtimeline() + + +def __create_async_rotating_file_handler( + log_file_path: Path, formatter: logging.Formatter +): + # set up handler + # rorate to 5 file, and each file 200MB + file_handler = logging.handlers.RotatingFileHandler( + filename=log_file_path, + mode="a", + maxBytes=_ROTATE_FILE_MAX_SIZE, + backupCount=_ROTATE_FILE_COUNT, + encoding="utf-8", + ) + file_handler.setFormatter(formatter) + + queue_buffer = queue.Queue() + + # queue_handler -> listener -> file_hander + queue_handler = logging.handlers.QueueHandler(queue_buffer) + + queue_listener = logging.handlers.QueueListener( + queue_buffer, file_handler, respect_handler_level=True + ) + queue_listener.start() + atexit.register(queue_listener.stop) + + return queue_handler + + +def __create_logger(log_root_dir: str, logger_name: str): + log_dir = Path(log_root_dir) / logger_name / socket.gethostname() + log_dir.mkdir(parents=True, exist_ok=True) + + # set logger + logger = logging.getLogger(logger_name) + logger.setLevel(logging.INFO) + logger.addHandler( + __create_async_rotating_file_handler( + log_dir / f"{logger_name}_{os.getenv('LOCAL_RANK', -1)}.log", + logging.Formatter( + fmt="[%(levelname)s][%(process)d][%(name)s][%(asctime)s] %(message)s" + ), + ) + ) + logger.propagate = False + + +def tracepoint_module_setup(): + log_dir = os.getenv("VTIMELINE_LOGGER_DIR", "/var/log") + + __create_logger(log_dir, "TracePoint") + + +def cudavtimeline_module_setup(): + cupti = CUDAVTimeLine() + cupti.enable() + atexit.register(cupti.disable) diff --git a/vtimeline.cpp b/vtimeline.cpp index 20fd2d7..fc22d64 100644 --- a/vtimeline.cpp +++ b/vtimeline.cpp @@ -68,8 +68,8 @@ void init_spdlog_env() { static std::mutex env_mutex; std::lock_guard locker(env_mutex); - std::string log_dir_name = g_logger_dir + "/megatron_cupti/" + g_hostname; - std::string log_file_name = log_dir_name + "/megatron_" + g_local_rank + ".log"; + std::string log_dir_name = g_logger_dir + "/cupti/" + g_hostname; + std::string log_file_name = log_dir_name + "/rank_" + g_local_rank + ".log"; auto rotating_sink = std::make_shared( log_file_name, max_file_size, max_files); @@ -277,8 +277,9 @@ bool init_cupti_env() { } void enable_cupti_activity() { - std::array activity_kinds = { + std::array activity_kinds = { CUPTI_ACTIVITY_KIND_KERNEL, + CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL, }; for (const CUpti_ActivityKind kind : activity_kinds) {