commit 66c8ba7c271f38416dc7456b9f05f9678ab61942 Author: yezhengmao Date: Thu Jun 12 14:20:47 2025 +0800 [vtimeline] diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..72e4920 --- /dev/null +++ b/.clang-format @@ -0,0 +1,15 @@ +# Copyright (c) 2025 yezhengmao + +--- +# need clang-format version >= 15 +Language: Cpp +IndentWidth: 4 +AccessModifierOffset: -4 +EmptyLineBeforeAccessModifier: LogicalBlock +MaxEmptyLinesToKeep: 1 +ColumnLimit: 90 +AllowShortBlocksOnASingleLine: Never +AllowShortFunctionsOnASingleLine: None +AllowShortLambdasOnASingleLine: None +PointerAlignment: Right +SortIncludes: false \ No newline at end of file diff --git a/.clang-tidy b/.clang-tidy new file mode 100644 index 0000000..5292655 --- /dev/null +++ b/.clang-tidy @@ -0,0 +1,44 @@ +# Copyright (c) 2025 yezhengmao + +--- +# need clang-tidy version >= 15 +Checks: 'misc-*, + hicpp-*, + bugprone-*, + modernize-*, + performance-*, + portability-*, + readability-*, + concurrency-*, + clang-analyzer-*, + cppcoreguidelines-*, + -modernize-avoid-c-arrays, + -modernize-use-trailing-return-type, + -modernize-use-auto, + -cppcoreguidelines-avoid-non-const-global-variables, + -cppcoreguidelines-pro-bounds-pointer-arithmetic, + -cppcoreguidelines-pro-type-vararg, + -cppcoreguidelines-pro-bounds-array-to-pointer-decay, + -cppcoreguidelines-non-private-member-variables-in-classes, + -cppcoreguidelines-pro-type-reinterpret-cast, + -cppcoreguidelines-avoid-c-arrays, + -cppcoreguidelines-avoid-do-while, + -misc-non-private-member-variables-in-classe, + -misc-non-private-member-variables-in-classes, + -hicpp-vararg, + -hicpp-avoid-c-arrays, + -hicpp-no-array-decay, + -hicpp-use-auto, + -hicpp-signed-bitwise, + -readability-magic-numbers' +WarningsAsErrors: true +HeaderFilterRegex: '.*' +CheckOptions: + - key: cppcoreguidelines-macro-usage.AllowedRegexp + value: 'LOG_*|STRINGIFY*' + - key: cppcoreguidelines-avoid-magic-numbers.IgnoredIntegerValues + value: '0;1;2;8;256;1024;1900' + - key: readability-function-cognitive-complexity.IgnoreMacros + value: true + - key: bugprone-easily-swappable-parameters.MinimumLength + value: 3 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9785597 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +build +.cache diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..e054150 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "libs/spdlog"] + path = libs/spdlog + url = https://github.com/gabime/spdlog.git diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..d648607 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,41 @@ +cmake_minimum_required(VERSION 3.22) + +project(vtimeline LANGUAGES CXX) +set(LIBNAME vtimeline) + +# specify the c++ standard +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +# generate compile_commands for code analyze +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +find_package(CUDA REQUIRED) +find_package(spdlog REQUIRED) + +set(SOURCES vtimeline.cpp) + +add_library(${LIBNAME} SHARED ${SOURCES}) + +set_target_properties( + ${LIBNAME} PROPERTIES + VERSION 0.0.1 +) + +target_include_directories( + ${LIBNAME} + PUBLIC + PRIVATE + ${CUDA_TOOLKIT_ROOT_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR}/ +) + +message(STATUS "${CMAKE_CURRENT_BINARY_DIR}") + +target_link_libraries( + ${LIBNAME} + PUBLIC + ${CUDA_TOOLKIT_ROOT_DIR}/lib64/libcupti.so + PRIVATE + spdlog::spdlog_header_only +) \ No newline at end of file diff --git a/libs/spdlog b/libs/spdlog new file mode 160000 index 0000000..6fa3601 --- /dev/null +++ b/libs/spdlog @@ -0,0 +1 @@ +Subproject commit 6fa36017cfd5731d617e1a934f0e5ea9c4445b13 diff --git a/vtimeline.cpp b/vtimeline.cpp new file mode 100644 index 0000000..20fd2d7 --- /dev/null +++ b/vtimeline.cpp @@ -0,0 +1,319 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// device pre alloc memory +#define DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE 3 +#define DEVICE_BUFFER_PRE_ALLOC_SIZE 3200000 + +// 1s == 1000ms flush +#define CUPTI_FLUSH_TIME 1000 + +// host pre alloc memory 1mb * 10 +#define CLIENT_BUFFER_SIZE static_cast(1 * 1024 * 1024) +#define CLIENT_BUFFER_NUM 10 +#define CLIENT_BUFFER_LIMIT 32 + +namespace { + +using buffer_ptr = std::unique_ptr; + +std::mutex g_buffer_pool_mutex; +std::vector g_buffer_pool(CLIENT_BUFFER_NUM); +std::size_t g_buffer_pool_cnt = CLIENT_BUFFER_NUM; + +std::thread g_consume_buffer_from_cupti_task; +std::condition_variable g_client_buffer_notify; +std::mutex g_process_buffer_mutex; +std::vector> g_process_buffer; + +std::uint8_t g_need_to_stop = 0; + +// ##################################// +// prepare the name of dir and path // +// ##################################// +const char *_logger_dir = std::getenv("VTIMELINE_LOGGER_DIR"); +const char *_hostname = std::getenv("HOSTNAME"); +const char *_local_rank = std::getenv("LOCAL_RANK"); +const char *_rank = std::getenv("RANK"); + +const std::string g_logger_dir = + _logger_dir == nullptr ? "/var/log" : std::string(_logger_dir); +const std::string g_hostname = + _hostname == nullptr ? "unkown-hostname" : std::string(_hostname); +const std::string g_local_rank = _local_rank == nullptr ? "-1" : std::string(_local_rank); +const std::string g_rank = _rank == nullptr ? "-1" : std::string(_rank); + +void init_spdlog_env() { + const size_t max_file_size = static_cast(200 * 1024 * 1024); // 200M + const size_t max_files = 5; + + const size_t q_size = 102400; + const size_t thread_count = 3; + + const uint64_t flush_time = 3; + + static std::mutex env_mutex; + std::lock_guard locker(env_mutex); + + std::string log_dir_name = g_logger_dir + "/megatron_cupti/" + g_hostname; + std::string log_file_name = log_dir_name + "/megatron_" + g_local_rank + ".log"; + + auto rotating_sink = std::make_shared( + log_file_name, max_file_size, max_files); + + std::vector sinks{rotating_sink}; + + spdlog::init_thread_pool(q_size, thread_count); + + auto logger = std::make_shared( + "cupti", sinks.begin(), sinks.end(), spdlog::thread_pool(), + spdlog::async_overflow_policy::discard_new); + + spdlog::set_default_logger(logger); + spdlog::set_pattern("%v"); + spdlog::flush_every(std::chrono::seconds(flush_time)); +} + +void CUPTIAPI requestBuffer(uint8_t **buffer, size_t *size, size_t *maxNumRecords) { + *maxNumRecords = 0; + + { + std::unique_lock locker(g_buffer_pool_mutex); + if (!g_buffer_pool.empty()) { + *buffer = g_buffer_pool.back().release(); + *size = CLIENT_BUFFER_SIZE; + g_buffer_pool.pop_back(); + return; + } + } + + if (g_buffer_pool_cnt >= CLIENT_BUFFER_LIMIT) { + std::cerr << "Buffer pool limit reached, cannot allocate new buffer!\n"; + *buffer = nullptr; + *size = 0; + return; + } + + // extend buffer + std::cerr << "Allocating new buffer, pool count: " << g_buffer_pool_cnt << "\n"; + auto buffer_addr = std::make_unique(CLIENT_BUFFER_SIZE); + + *buffer = buffer_addr.release(); + *size = CLIENT_BUFFER_SIZE; + g_buffer_pool_cnt += 1; +} + +// NOTE: the context and stream_id is NULL, be deprecated as of CUDA 6.0 +void CUPTIAPI processBuffer(CUcontext context [[maybe_unused]], + uint32_t stream_id [[maybe_unused]], uint8_t *buffer, + size_t size, size_t valid_size) { + if (size == 0 || buffer == nullptr) { + return; + } + + { + std::unique_lock locker(g_process_buffer_mutex); + auto item = std::make_pair(buffer_ptr(buffer), valid_size); + g_process_buffer.emplace_back(std::move(item)); + } + + g_client_buffer_notify.notify_one(); +} + +void consume_record_from_buffer(CUpti_Activity *record) { + switch (record->kind) { + case CUPTI_ACTIVITY_KIND_KERNEL: { + CUpti_ActivityKernel9 *kernel = reinterpret_cast(record); + // kernel be put in the queue to ready to submit + uint64_t start_ts = kernel->start; + uint64_t end_ts = kernel->end; + + uint32_t stream_id = kernel->streamId; + const std::string name(kernel->name); + + // one rank for one device + spdlog::info("{},{},{},{},{},{}", start_ts, g_rank, stream_id, "KERNEL", name, + "B"); + spdlog::info("{},{},{},{},{},{}", end_ts, g_rank, stream_id, "KERNEL", name, "E"); + break; + } + + default: + break; + } +} + +void consume_buffer_from_cupti() { + while (true) { + buffer_ptr buffer_addr; + size_t valid_size = 0; + + { + std::unique_lock locker(g_process_buffer_mutex); + if (g_process_buffer.empty()) { + return; + } + auto buffer_item = std::move(g_process_buffer.back()); + g_process_buffer.pop_back(); + + buffer_addr = std::move(buffer_item.first); + valid_size = buffer_item.second; + } + + if (valid_size > 0) { + CUpti_Activity *record = nullptr; + auto *buffer = buffer_addr.get(); + do { + CUptiResult status = + cuptiActivityGetNextRecord(buffer, valid_size, &record); + + if (status == CUPTI_SUCCESS) { + consume_record_from_buffer(record); + } else { + break; + } + } while (true); + } + + { + std::unique_lock locker(g_buffer_pool_mutex); + g_buffer_pool.emplace_back(std::move(buffer_addr)); + } + } +} + +void init_buffer_process_task() { + // init the buffer pool + std::generate_n(g_buffer_pool.begin(), CLIENT_BUFFER_NUM, []() { + return std::make_unique(CLIENT_BUFFER_SIZE); + }); + + g_consume_buffer_from_cupti_task = std::move(std::thread([]() { + g_need_to_stop = 0; + + while (true) { + { + std::unique_lock locker(g_process_buffer_mutex); + g_client_buffer_notify.wait(locker, []() { + return !g_process_buffer.empty() || g_need_to_stop; + }); + } + + // process record + consume_buffer_from_cupti(); + + if (g_need_to_stop) { + return; + } + } + })); +} + +bool init_cupti_env() { + CUpti_SubscriberHandle handler = nullptr; + CUptiResult result = cuptiSubscribe(&handler, nullptr, nullptr); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to subscribe CUPTI, error code " << static_cast(result) + << "\n"; + return false; + } + + size_t attr_value_size = 0; + + // set device buffer pool limit == pre allocate value + // set device buffer size, so peak cuda memory size is buffer size * buffer limit + // set flush time + // set callbacks to enable the function + size_t buffer_pool_limit_size = DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE; + attr_value_size = sizeof(buffer_pool_limit_size); + result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT, + &attr_value_size, (void *)&buffer_pool_limit_size); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to set device buffer pool limit, error code: " + << static_cast(result) << "\n"; + return false; + } + + result = + cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_PRE_ALLOCATE_VALUE, + &attr_value_size, (void *)&buffer_pool_limit_size); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to set device buffer pre-allocate value, error code: " + << static_cast(result) << "\n"; + return false; + } + + size_t buffer_size = DEVICE_BUFFER_PRE_ALLOC_SIZE; + attr_value_size = sizeof(buffer_size); + result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE, + &attr_value_size, (void *)&buffer_size); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to set device buffer size, error code: " + << static_cast(result) << "\n"; + return false; + } + + result = cuptiActivityRegisterCallbacks(requestBuffer, processBuffer); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to register activity callbacks, error code: " + << static_cast(result) << "\n"; + return false; + } + + return true; +} + +void enable_cupti_activity() { + std::array activity_kinds = { + CUPTI_ACTIVITY_KIND_KERNEL, + }; + + for (const CUpti_ActivityKind kind : activity_kinds) { + CUptiResult result = cuptiActivityEnable(kind); + if (result != CUPTI_SUCCESS) { + std::cerr << "Failed to enable activity: " << static_cast(kind) + << ", error code: " << static_cast(result) << "\n"; + } + } +} + +} // namespace + +extern "C" int enable_vtimeline(void) { + ::init_spdlog_env(); + + ::init_buffer_process_task(); + + if (!::init_cupti_env()) { + return 1; + } + + ::enable_cupti_activity(); + + return 0; +} + +extern "C" int disable_vtimeline(void) { + cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED); + + g_need_to_stop = 1; + g_client_buffer_notify.notify_one(); + + // wait the backgroud task + g_consume_buffer_from_cupti_task.join(); + + return 0; +} \ No newline at end of file