#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // device pre alloc memory #define DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE 3 #define DEVICE_BUFFER_PRE_ALLOC_SIZE 3200000 // 1s == 1000ms flush #define CUPTI_FLUSH_TIME 1000 // host pre alloc memory 1mb * 10 #define CLIENT_BUFFER_SIZE static_cast(1 * 1024 * 1024) #define CLIENT_BUFFER_NUM 10 #define CLIENT_BUFFER_LIMIT 32 namespace { using buffer_ptr = std::unique_ptr; std::mutex g_buffer_pool_mutex; std::vector g_buffer_pool(CLIENT_BUFFER_NUM); std::size_t g_buffer_pool_cnt = CLIENT_BUFFER_NUM; std::thread g_consume_buffer_from_cupti_task; std::condition_variable g_client_buffer_notify; std::mutex g_process_buffer_mutex; std::vector> g_process_buffer; std::uint8_t g_need_to_stop = 0; // ##################################// // prepare the name of dir and path // // ##################################// const char *_logger_dir = std::getenv("VTIMELINE_LOGGER_DIR"); const char *_hostname = std::getenv("HOSTNAME"); const char *_local_rank = std::getenv("LOCAL_RANK"); const char *_rank = std::getenv("RANK"); const std::string g_logger_dir = _logger_dir == nullptr ? "/var/log" : std::string(_logger_dir); const std::string g_hostname = _hostname == nullptr ? "unkown-hostname" : std::string(_hostname); const std::string g_local_rank = _local_rank == nullptr ? "-1" : std::string(_local_rank); const std::string g_rank = _rank == nullptr ? "-1" : std::string(_rank); void init_spdlog_env() { const size_t max_file_size = static_cast(200 * 1024 * 1024); // 200M const size_t max_files = 5; const size_t q_size = 102400; const size_t thread_count = 3; const uint64_t flush_time = 3; static std::mutex env_mutex; std::lock_guard locker(env_mutex); std::string log_dir_name = g_logger_dir + "/cupti/" + g_hostname; std::string log_file_name = log_dir_name + "/rank_" + g_local_rank + ".log"; auto rotating_sink = std::make_shared( log_file_name, max_file_size, max_files); std::vector sinks{rotating_sink}; spdlog::init_thread_pool(q_size, thread_count); auto logger = std::make_shared( "cupti", sinks.begin(), sinks.end(), spdlog::thread_pool(), spdlog::async_overflow_policy::discard_new); spdlog::set_default_logger(logger); spdlog::set_pattern("%v"); spdlog::flush_every(std::chrono::seconds(flush_time)); } void CUPTIAPI requestBuffer(uint8_t **buffer, size_t *size, size_t *maxNumRecords) { *maxNumRecords = 0; { std::unique_lock locker(g_buffer_pool_mutex); if (!g_buffer_pool.empty()) { *buffer = g_buffer_pool.back().release(); *size = CLIENT_BUFFER_SIZE; g_buffer_pool.pop_back(); return; } } if (g_buffer_pool_cnt >= CLIENT_BUFFER_LIMIT) { std::cerr << "Buffer pool limit reached, cannot allocate new buffer!\n"; *buffer = nullptr; *size = 0; return; } // extend buffer std::cerr << "Allocating new buffer, pool count: " << g_buffer_pool_cnt << "\n"; auto buffer_addr = std::make_unique(CLIENT_BUFFER_SIZE); *buffer = buffer_addr.release(); *size = CLIENT_BUFFER_SIZE; g_buffer_pool_cnt += 1; } // NOTE: the context and stream_id is NULL, be deprecated as of CUDA 6.0 void CUPTIAPI processBuffer(CUcontext context [[maybe_unused]], uint32_t stream_id [[maybe_unused]], uint8_t *buffer, size_t size, size_t valid_size) { if (size == 0 || buffer == nullptr) { return; } { std::unique_lock locker(g_process_buffer_mutex); auto item = std::make_pair(buffer_ptr(buffer), valid_size); g_process_buffer.emplace_back(std::move(item)); } g_client_buffer_notify.notify_one(); } void consume_record_from_buffer(CUpti_Activity *record) { switch (record->kind) { case CUPTI_ACTIVITY_KIND_KERNEL: { CUpti_ActivityKernel9 *kernel = reinterpret_cast(record); // kernel be put in the queue to ready to submit uint64_t start_ts = kernel->start; uint64_t end_ts = kernel->end; uint32_t stream_id = kernel->streamId; const std::string name(kernel->name); // one rank for one device spdlog::info("{},{},{},{},{},{}", start_ts, g_rank, stream_id, "KERNEL", name, "B"); spdlog::info("{},{},{},{},{},{}", end_ts, g_rank, stream_id, "KERNEL", name, "E"); break; } default: break; } } void consume_buffer_from_cupti() { while (true) { buffer_ptr buffer_addr; size_t valid_size = 0; { std::unique_lock locker(g_process_buffer_mutex); if (g_process_buffer.empty()) { return; } auto buffer_item = std::move(g_process_buffer.back()); g_process_buffer.pop_back(); buffer_addr = std::move(buffer_item.first); valid_size = buffer_item.second; } if (valid_size > 0) { CUpti_Activity *record = nullptr; auto *buffer = buffer_addr.get(); do { CUptiResult status = cuptiActivityGetNextRecord(buffer, valid_size, &record); if (status == CUPTI_SUCCESS) { consume_record_from_buffer(record); } else { break; } } while (true); } { std::unique_lock locker(g_buffer_pool_mutex); g_buffer_pool.emplace_back(std::move(buffer_addr)); } } } void init_buffer_process_task() { // init the buffer pool std::generate_n(g_buffer_pool.begin(), CLIENT_BUFFER_NUM, []() { return std::make_unique(CLIENT_BUFFER_SIZE); }); g_consume_buffer_from_cupti_task = std::move(std::thread([]() { g_need_to_stop = 0; while (true) { { std::unique_lock locker(g_process_buffer_mutex); g_client_buffer_notify.wait(locker, []() { return !g_process_buffer.empty() || g_need_to_stop; }); } // process record consume_buffer_from_cupti(); if (g_need_to_stop) { return; } } })); } bool init_cupti_env() { CUpti_SubscriberHandle handler = nullptr; CUptiResult result = cuptiSubscribe(&handler, nullptr, nullptr); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to subscribe CUPTI, error code " << static_cast(result) << "\n"; return false; } size_t attr_value_size = 0; // set device buffer pool limit == pre allocate value // set device buffer size, so peak cuda memory size is buffer size * buffer limit // set flush time // set callbacks to enable the function size_t buffer_pool_limit_size = DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE; attr_value_size = sizeof(buffer_pool_limit_size); result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT, &attr_value_size, (void *)&buffer_pool_limit_size); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to set device buffer pool limit, error code: " << static_cast(result) << "\n"; return false; } result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_PRE_ALLOCATE_VALUE, &attr_value_size, (void *)&buffer_pool_limit_size); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to set device buffer pre-allocate value, error code: " << static_cast(result) << "\n"; return false; } size_t buffer_size = DEVICE_BUFFER_PRE_ALLOC_SIZE; attr_value_size = sizeof(buffer_size); result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE, &attr_value_size, (void *)&buffer_size); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to set device buffer size, error code: " << static_cast(result) << "\n"; return false; } result = cuptiActivityRegisterCallbacks(requestBuffer, processBuffer); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to register activity callbacks, error code: " << static_cast(result) << "\n"; return false; } return true; } void enable_cupti_activity() { std::array activity_kinds = { CUPTI_ACTIVITY_KIND_KERNEL, CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL, }; for (const CUpti_ActivityKind kind : activity_kinds) { CUptiResult result = cuptiActivityEnable(kind); if (result != CUPTI_SUCCESS) { std::cerr << "Failed to enable activity: " << static_cast(kind) << ", error code: " << static_cast(result) << "\n"; } } } } // namespace extern "C" int enable_vtimeline(void) { ::init_spdlog_env(); ::init_buffer_process_task(); if (!::init_cupti_env()) { return 1; } ::enable_cupti_activity(); return 0; } extern "C" int disable_vtimeline(void) { cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED); g_need_to_stop = 1; g_client_buffer_notify.notify_one(); // wait the backgroud task g_consume_buffer_from_cupti_task.join(); return 0; }