]> git.ipfire.org Git - thirdparty/gcc.git/blob - liboffloadmic/runtime/offload_engine.cpp
00b673aebcb1777b4e632b920229dece862518bc
[thirdparty/gcc.git] / liboffloadmic / runtime / offload_engine.cpp
1 /*
2 Copyright (c) 2014-2015 Intel Corporation. All Rights Reserved.
3
4 Redistribution and use in source and binary forms, with or without
5 modification, are permitted provided that the following conditions
6 are met:
7
8 * Redistributions of source code must retain the above copyright
9 notice, this list of conditions and the following disclaimer.
10 * Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 * Neither the name of Intel Corporation nor the names of its
14 contributors may be used to endorse or promote products derived
15 from this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29
30
31 #include "offload_engine.h"
32 #include <signal.h>
33 #include <errno.h>
34
35 #include <algorithm>
36 #include <vector>
37
38 #include "offload_host.h"
39 #include "offload_table.h"
40 #include "offload_iterator.h"
41
42 // Static members of Stream class must be described somewhere.
43 // This members describe the list of all streams defined in programm
44 // via call to _Offload_stream_create.
45 uint64_t Stream::m_streams_count = 0;
46 StreamMap Stream::all_streams;
47 mutex_t Stream::m_stream_lock;
48
49 const char* Engine::m_func_names[Engine::c_funcs_total] =
50 {
51 "server_compute",
52 #ifdef MYO_SUPPORT
53 "server_myoinit",
54 "server_myofini",
55 #endif // MYO_SUPPORT
56 "server_init",
57 "server_var_table_size",
58 "server_var_table_copy",
59 "server_set_stream_affinity"
60 };
61
62 // Symbolic representation of system signals. Fix for CQ233593
63 const char* Engine::c_signal_names[Engine::c_signal_max] =
64 {
65 "Unknown SIGNAL",
66 "SIGHUP", /* 1, Hangup (POSIX). */
67 "SIGINT", /* 2, Interrupt (ANSI). */
68 "SIGQUIT", /* 3, Quit (POSIX). */
69 "SIGILL", /* 4, Illegal instruction (ANSI). */
70 "SIGTRAP", /* 5, Trace trap (POSIX). */
71 "SIGABRT", /* 6, Abort (ANSI). */
72 "SIGBUS", /* 7, BUS error (4.2 BSD). */
73 "SIGFPE", /* 8, Floating-point exception (ANSI). */
74 "SIGKILL", /* 9, Kill, unblockable (POSIX). */
75 "SIGUSR1", /* 10, User-defined signal 1 (POSIX). */
76 "SIGSEGV", /* 11, Segmentation violation (ANSI). */
77 "SIGUSR2", /* 12, User-defined signal 2 (POSIX). */
78 "SIGPIPE", /* 13, Broken pipe (POSIX). */
79 "SIGALRM", /* 14, Alarm clock (POSIX). */
80 "SIGTERM", /* 15, Termination (ANSI). */
81 "SIGSTKFLT", /* 16, Stack fault. */
82 "SIGCHLD", /* 17, Child status has changed (POSIX). */
83 "SIGCONT", /* 18, Continue (POSIX). */
84 "SIGSTOP", /* 19, Stop, unblockable (POSIX). */
85 "SIGTSTP", /* 20, Keyboard stop (POSIX). */
86 "SIGTTIN", /* 21, Background read from tty (POSIX). */
87 "SIGTTOU", /* 22, Background write to tty (POSIX). */
88 "SIGURG", /* 23, Urgent condition on socket (4.2 BSD). */
89 "SIGXCPU", /* 24, CPU limit exceeded (4.2 BSD). */
90 "SIGXFSZ", /* 25, File size limit exceeded (4.2 BSD). */
91 "SIGVTALRM", /* 26, Virtual alarm clock (4.2 BSD). */
92 "SIGPROF", /* 27, Profiling alarm clock (4.2 BSD). */
93 "SIGWINCH", /* 28, Window size change (4.3 BSD, Sun). */
94 "SIGIO", /* 29, I/O now possible (4.2 BSD). */
95 "SIGPWR", /* 30, Power failure restart (System V). */
96 "SIGSYS" /* 31, Bad system call. */
97 };
98
99 void Engine::init(void)
100 {
101 if (!m_ready) {
102 mutex_locker_t locker(m_lock);
103
104 if (!m_ready) {
105 // start process if not done yet
106 if (m_process == 0) {
107 init_process();
108 }
109
110 // load penging images
111 load_libraries();
112
113 // and (re)build pointer table
114 init_ptr_data();
115
116 // it is ready now
117 m_ready = true;
118 }
119 }
120 }
121
122 void Engine::init_process(void)
123 {
124 COIENGINE engine;
125 COIRESULT res;
126 const char **environ;
127 char buf[4096]; // For exe path name
128
129 // create environment for the target process
130 environ = (const char**) mic_env_vars.create_environ_for_card(m_index);
131 if (environ != 0) {
132 for (const char **p = environ; *p != 0; p++) {
133 OFFLOAD_DEBUG_TRACE(3, "Env Var for card %d: %s\n", m_index, *p);
134 }
135 }
136
137 // Create execution context in the specified device
138 OFFLOAD_DEBUG_TRACE(2, "Getting device %d (engine %d) handle\n", m_index,
139 m_physical_index);
140 res = COI::EngineGetHandle(COI_ISA_MIC, m_physical_index, &engine);
141 check_result(res, c_get_engine_handle, m_index, res);
142
143 // Get engine info on threads and cores.
144 // The values of core number and thread number will be used later at stream
145 // creation by call to _Offload_stream_create(device,number_of_cpus).
146
147 COI_ENGINE_INFO engine_info;
148
149 res = COI::EngineGetInfo(engine, sizeof(COI_ENGINE_INFO), &engine_info);
150 check_result(res, c_get_engine_info, m_index, res);
151
152 // m_cpus bitset has 1 for available thread. At the begining all threads
153 // are available and m_cpus(i) is set to
154 // 1 for i = [0...engine_info.NumThreads].
155 m_cpus.reset();
156 for (int i = 0; i < engine_info.NumThreads; i++) {
157 m_cpus.set(i);
158 }
159
160 // The following values will be used at pipeline creation for streams
161 m_num_cores = engine_info.NumCores;
162 m_num_threads = engine_info.NumThreads;
163
164 // Check if OFFLOAD_DMA_CHANNEL_COUNT is set to 2
165 // Only the value 2 is supported in 16.0
166 if (mic_dma_channel_count == 2) {
167 if (COI::ProcessConfigureDMA) {
168 // Set DMA channels using COI API
169 COI::ProcessConfigureDMA(2, COI::DMA_MODE_READ_WRITE);
170 }
171 else {
172 // Set environment variable COI_DMA_CHANNEL_COUNT
173 // use putenv instead of setenv as Windows has no setenv.
174 // Note: putenv requires its argument can't be freed or modified.
175 // So no free after call to putenv or elsewhere.
176 char * env_var = (char*) malloc(sizeof("COI_DMA_CHANNEL_COUNT=2"));
177 sprintf(env_var, "COI_DMA_CHANNEL_COUNT=2");
178 putenv(env_var);
179 }
180 }
181
182 // Target executable is not available then use compiler provided offload_main
183 if (__target_exe == 0) {
184 if (mic_device_main == 0)
185 LIBOFFLOAD_ERROR(c_report_no_host_exe);
186
187 OFFLOAD_DEBUG_TRACE(2,
188 "Loading target executable %s\n",mic_device_main);
189
190 res = COI::ProcessCreateFromFile(
191 engine, // in_Engine
192 mic_device_main, // in_pBinaryName
193 0, // in_Argc
194 0, // in_ppArgv
195 environ == 0, // in_DupEnv
196 environ, // in_ppAdditionalEnv
197 mic_proxy_io, // in_ProxyActive
198 mic_proxy_fs_root, // in_ProxyfsRoot
199 mic_buffer_size, // in_BufferSpace
200 mic_library_path, // in_LibrarySearchPath
201 &m_process // out_pProcess
202 );
203 }
204 else {
205 // Target executable should be available by the time when we
206 // attempt to initialize the device
207
208 // Need the full path of the FAT exe for VTUNE
209 {
210 #ifndef TARGET_WINNT
211 ssize_t len = readlink("/proc/self/exe", buf,1000);
212 #else
213 int len = GetModuleFileName(NULL, buf,1000);
214 #endif // TARGET_WINNT
215 if (len == -1) {
216 LIBOFFLOAD_ERROR(c_report_no_host_exe);
217 exit(1);
218 }
219 else if (len > 999) {
220 LIBOFFLOAD_ERROR(c_report_path_buff_overflow);
221 exit(1);
222 }
223 buf[len] = '\0';
224 }
225
226 OFFLOAD_DEBUG_TRACE(2,
227 "Loading target executable \"%s\" from %p, size %lld, host file %s\n",
228 __target_exe->name, __target_exe->data, __target_exe->size,
229 buf);
230
231 res = COI::ProcessCreateFromMemory(
232 engine, // in_Engine
233 __target_exe->name, // in_pBinaryName
234 __target_exe->data, // in_pBinaryBuffer
235 __target_exe->size, // in_BinaryBufferLength,
236 0, // in_Argc
237 0, // in_ppArgv
238 environ == 0, // in_DupEnv
239 environ, // in_ppAdditionalEnv
240 mic_proxy_io, // in_ProxyActive
241 mic_proxy_fs_root, // in_ProxyfsRoot
242 mic_buffer_size, // in_BufferSpace
243 mic_library_path, // in_LibrarySearchPath
244 buf, // in_FileOfOrigin
245 -1, // in_FileOfOriginOffset use -1 to indicate to
246 // COI that is is a FAT binary
247 &m_process // out_pProcess
248 );
249 }
250 check_result(res, c_process_create, m_index, res);
251
252 if ((mic_4k_buffer_size != 0) || (mic_2m_buffer_size !=0)) {
253 // available only in MPSS 4.2 and greater
254 if (COI::ProcessSetCacheSize != 0 ) {
255 int flags;
256 // Need compiler to use MPSS 3.2 or greater to get these
257 // definition so currently hardcoding it
258 // COI_CACHE_ACTION_GROW_NOW && COI_CACHE_MODE_ONDEMAND_SYNC;
259 flags = 0x00020002;
260 res = COI::ProcessSetCacheSize(
261 m_process, // in_Process
262 mic_2m_buffer_size, // in_HugePagePoolSize
263 flags, // inHugeFlags
264 mic_4k_buffer_size, // in_SmallPagePoolSize
265 flags, // inSmallFlags
266 0, // in_NumDependencies
267 0, // in_pDependencies
268 0 // out_PCompletion
269 );
270 OFFLOAD_DEBUG_TRACE(2,
271 "Reserve target buffers 4K pages = %d 2M pages = %d\n",
272 mic_4k_buffer_size, mic_2m_buffer_size);
273 check_result(res, c_process_set_cache_size, m_index, res);
274 }
275 else {
276 OFFLOAD_DEBUG_TRACE(2,
277 "Reserve target buffers not supported in current MPSS\n");
278 }
279 }
280
281 // get function handles
282 res = COI::ProcessGetFunctionHandles(m_process, c_funcs_total,
283 m_func_names, m_funcs);
284 check_result(res, c_process_get_func_handles, m_index, res);
285
286 // initialize device side
287 pid_t pid = init_device();
288
289 // For IDB
290 if (__dbg_is_attached) {
291 // TODO: we have in-memory executable now.
292 // Check with IDB team what should we provide them now?
293 if (strlen(__target_exe->name) < MAX_TARGET_NAME) {
294 strcpy(__dbg_target_exe_name, __target_exe->name);
295 }
296 __dbg_target_so_pid = pid;
297 __dbg_target_id = m_physical_index;
298 __dbg_target_so_loaded();
299 }
300 }
301
302 void Engine::fini_process(bool verbose)
303 {
304 if (m_process != 0) {
305 uint32_t sig;
306 int8_t ret;
307
308 // destroy target process
309 OFFLOAD_DEBUG_TRACE(2, "Destroying process on the device %d\n",
310 m_index);
311
312 COIRESULT res = COI::ProcessDestroy(m_process, -1, 0, &ret, &sig);
313 m_process = 0;
314
315 if (res == COI_SUCCESS) {
316 OFFLOAD_DEBUG_TRACE(3, "Device process: signal %d, exit code %d\n",
317 sig, ret);
318 if (verbose) {
319 if (sig != 0) {
320 LIBOFFLOAD_ERROR(
321 c_mic_process_exit_sig, m_index, sig,
322 c_signal_names[sig >= c_signal_max ? 0 : sig]);
323 }
324 else {
325 LIBOFFLOAD_ERROR(c_mic_process_exit_ret, m_index, ret);
326 }
327 }
328
329 // for idb
330 if (__dbg_is_attached) {
331 __dbg_target_so_unloaded();
332 }
333 }
334 else {
335 if (verbose) {
336 LIBOFFLOAD_ERROR(c_mic_process_exit, m_index);
337 }
338 }
339 }
340 }
341
342 void Engine::load_libraries()
343 {
344 // load libraries collected so far
345 for (TargetImageList::iterator it = m_images.begin();
346 it != m_images.end(); it++) {
347 OFFLOAD_DEBUG_TRACE(2,
348 "Loading library \"%s\" from %p, size %llu, host file %s\n",
349 it->name, it->data, it->size, it->origin);
350
351 // load library to the device
352 COILIBRARY lib;
353 COIRESULT res;
354 res = COI::ProcessLoadLibraryFromMemory(m_process,
355 it->data,
356 it->size,
357 it->name,
358 mic_library_path,
359 it->origin,
360 (it->origin) ? -1 : 0,
361 COI_LOADLIBRARY_V1_FLAGS,
362 &lib);
363 m_dyn_libs.push_front(DynLib(it->name, it->data, lib));
364
365 if (res != COI_SUCCESS && res != COI_ALREADY_EXISTS) {
366 check_result(res, c_load_library, m_index, res);
367 }
368 }
369 m_images.clear();
370 }
371
372 void Engine::unload_library(const void *data, const char *name)
373 {
374 if (m_process == 0) {
375 return;
376 }
377 for (DynLibList::iterator it = m_dyn_libs.begin();
378 it != m_dyn_libs.end(); it++) {
379 if (it->data == data) {
380 COIRESULT res;
381 OFFLOAD_DEBUG_TRACE(2,
382 "Unloading library \"%s\"\n",name);
383 res = COI::ProcessUnloadLibrary(m_process,it->lib);
384 m_dyn_libs.erase(it);
385 if (res != COI_SUCCESS) {
386 check_result(res, c_unload_library, m_index, res);
387 }
388 return;
389 }
390 }
391 }
392
393 static bool target_entry_cmp(
394 const VarList::BufEntry &l,
395 const VarList::BufEntry &r
396 )
397 {
398 const char *l_name = reinterpret_cast<const char*>(l.name);
399 const char *r_name = reinterpret_cast<const char*>(r.name);
400 return strcmp(l_name, r_name) < 0;
401 }
402
403 static bool host_entry_cmp(
404 const VarTable::Entry *l,
405 const VarTable::Entry *r
406 )
407 {
408 return strcmp(l->name, r->name) < 0;
409 }
410
411 void Engine::init_ptr_data(void)
412 {
413 COIRESULT res;
414 COIEVENT event;
415
416 // Prepare table of host entries
417 std::vector<const VarTable::Entry*> host_table(
418 Iterator(__offload_vars.get_head()),
419 Iterator());
420
421 // no need to do anything further is host table is empty
422 if (host_table.size() <= 0) {
423 return;
424 }
425
426 // Get var table entries from the target.
427 // First we need to get size for the buffer to copy data
428 struct {
429 int64_t nelems;
430 int64_t length;
431 } params;
432
433 res = COI::PipelineRunFunction(get_pipeline(),
434 m_funcs[c_func_var_table_size],
435 0, 0, 0,
436 0, 0,
437 0, 0,
438 &params, sizeof(params),
439 &event);
440 check_result(res, c_pipeline_run_func, m_index, res);
441
442 res = COI::EventWait(1, &event, -1, 1, 0, 0);
443 check_result(res, c_event_wait, res);
444
445 if (params.length == 0) {
446 return;
447 }
448
449 // create buffer for target entries and copy data to host
450 COIBUFFER buffer;
451 res = COI::BufferCreate(params.length, COI_BUFFER_NORMAL, 0, 0, 1,
452 &m_process, &buffer);
453 check_result(res, c_buf_create, m_index, res);
454
455 COI_ACCESS_FLAGS flags = COI_SINK_WRITE;
456 res = COI::PipelineRunFunction(get_pipeline(),
457 m_funcs[c_func_var_table_copy],
458 1, &buffer, &flags,
459 0, 0,
460 &params.nelems, sizeof(params.nelems),
461 0, 0,
462 &event);
463 check_result(res, c_pipeline_run_func, m_index, res);
464
465 res = COI::EventWait(1, &event, -1, 1, 0, 0);
466 check_result(res, c_event_wait, res);
467
468 // patch names in target data
469 VarList::BufEntry *target_table;
470 COIMAPINSTANCE map_inst;
471 res = COI::BufferMap(buffer, 0, params.length, COI_MAP_READ_ONLY, 0, 0,
472 0, &map_inst,
473 reinterpret_cast<void**>(&target_table));
474 check_result(res, c_buf_map, res);
475
476 VarList::table_patch_names(target_table, params.nelems);
477
478 // and sort entries
479 std::sort(target_table, target_table + params.nelems, target_entry_cmp);
480 std::sort(host_table.begin(), host_table.end(), host_entry_cmp);
481
482 // merge host and target entries and enter matching vars map
483 std::vector<const VarTable::Entry*>::const_iterator hi =
484 host_table.begin();
485 std::vector<const VarTable::Entry*>::const_iterator he =
486 host_table.end();
487 const VarList::BufEntry *ti = target_table;
488 const VarList::BufEntry *te = target_table + params.nelems;
489
490 while (hi != he && ti != te) {
491 int res = strcmp((*hi)->name, reinterpret_cast<const char*>(ti->name));
492 if (res == 0) {
493 bool is_new;
494 // add matching entry to var map
495 PtrData *ptr = insert_ptr_data((*hi)->addr, (*hi)->size, is_new);
496
497 // store address for new entries
498 if (is_new) {
499 ptr->mic_addr = ti->addr;
500 ptr->is_static = true;
501 }
502 ptr->alloc_ptr_data_lock.unlock();
503 hi++;
504 ti++;
505 }
506 else if (res < 0) {
507 hi++;
508 }
509 else {
510 ti++;
511 }
512 }
513
514 // cleanup
515 res = COI::BufferUnmap(map_inst, 0, 0, 0);
516 check_result(res, c_buf_unmap, res);
517
518 res = COI::BufferDestroy(buffer);
519 check_result(res, c_buf_destroy, res);
520 }
521
522 COIRESULT Engine::compute(
523 _Offload_stream stream,
524 const std::list<COIBUFFER> &buffers,
525 const void* data,
526 uint16_t data_size,
527 void* ret,
528 uint16_t ret_size,
529 uint32_t num_deps,
530 const COIEVENT* deps,
531 COIEVENT* event
532 ) /* const */
533 {
534 COIBUFFER *bufs;
535 COI_ACCESS_FLAGS *flags;
536 COIRESULT res;
537
538 // convert buffers list to array
539 int num_bufs = buffers.size();
540 if (num_bufs > 0) {
541 bufs = (COIBUFFER*) alloca(num_bufs * sizeof(COIBUFFER));
542 flags = (COI_ACCESS_FLAGS*) alloca(num_bufs *
543 sizeof(COI_ACCESS_FLAGS));
544
545 int i = 0;
546 for (std::list<COIBUFFER>::const_iterator it = buffers.begin();
547 it != buffers.end(); it++) {
548 bufs[i] = *it;
549
550 // TODO: this should be fixed
551 flags[i++] = COI_SINK_WRITE;
552 }
553 }
554 else {
555 bufs = 0;
556 flags = 0;
557 }
558 COIPIPELINE pipeline = (stream == no_stream) ?
559 get_pipeline() :
560 get_pipeline(stream);
561 // start computation
562 res = COI::PipelineRunFunction(pipeline,
563 m_funcs[c_func_compute],
564 num_bufs, bufs, flags,
565 num_deps, deps,
566 data, data_size,
567 ret, ret_size,
568 event);
569 return res;
570 }
571
572 pid_t Engine::init_device(void)
573 {
574 struct init_data {
575 int device_index;
576 int devices_total;
577 int console_level;
578 int offload_report_level;
579 } data;
580 COIRESULT res;
581 COIEVENT event;
582 pid_t pid;
583
584 OFFLOAD_DEBUG_TRACE_1(2, 0, c_offload_init,
585 "Initializing device with logical index %d "
586 "and physical index %d\n",
587 m_index, m_physical_index);
588
589 // setup misc data
590 data.device_index = m_index;
591 data.devices_total = mic_engines_total;
592 data.console_level = console_enabled;
593 data.offload_report_level = offload_report_level;
594
595 res = COI::PipelineRunFunction(get_pipeline(),
596 m_funcs[c_func_init],
597 0, 0, 0, 0, 0,
598 &data, sizeof(data),
599 &pid, sizeof(pid),
600 &event);
601 check_result(res, c_pipeline_run_func, m_index, res);
602
603 res = COI::EventWait(1, &event, -1, 1, 0, 0);
604 check_result(res, c_event_wait, res);
605
606 OFFLOAD_DEBUG_TRACE(2, "Device process pid is %d\n", pid);
607
608 return pid;
609 }
610
611 // data associated with each thread
612 struct Thread {
613 Thread(long* addr_coipipe_counter) {
614 m_addr_coipipe_counter = addr_coipipe_counter;
615 memset(m_pipelines, 0, sizeof(m_pipelines));
616 }
617
618 ~Thread() {
619 #ifndef TARGET_WINNT
620 __sync_sub_and_fetch(m_addr_coipipe_counter, 1);
621 #else // TARGET_WINNT
622 _InterlockedDecrement(m_addr_coipipe_counter);
623 #endif // TARGET_WINNT
624 for (int i = 0; i < mic_engines_total; i++) {
625 if (m_pipelines[i] != 0) {
626 COI::PipelineDestroy(m_pipelines[i]);
627 }
628 }
629 }
630
631 COIPIPELINE get_pipeline(int index) const {
632 return m_pipelines[index];
633 }
634
635 void set_pipeline(int index, COIPIPELINE pipeline) {
636 m_pipelines[index] = pipeline;
637 }
638
639 AutoSet& get_auto_vars() {
640 return m_auto_vars;
641 }
642
643 private:
644 long* m_addr_coipipe_counter;
645 AutoSet m_auto_vars;
646 COIPIPELINE m_pipelines[MIC_ENGINES_MAX];
647 };
648
649 COIPIPELINE Engine::get_pipeline(void)
650 {
651 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
652 if (thread == 0) {
653 thread = new Thread(&m_proc_number);
654 thread_setspecific(mic_thread_key, thread);
655 }
656
657 COIPIPELINE pipeline = thread->get_pipeline(m_index);
658 if (pipeline == 0) {
659 COIRESULT res;
660 int proc_num;
661
662 #ifndef TARGET_WINNT
663 proc_num = __sync_fetch_and_add(&m_proc_number, 1);
664 #else // TARGET_WINNT
665 proc_num = _InterlockedIncrement(&m_proc_number);
666 #endif // TARGET_WINNT
667
668 if (proc_num > COI_PIPELINE_MAX_PIPELINES) {
669 LIBOFFLOAD_ERROR(c_coipipe_max_number, COI_PIPELINE_MAX_PIPELINES);
670 LIBOFFLOAD_ABORT;
671 }
672 // create pipeline for this thread
673 res = COI::PipelineCreate(m_process, 0, mic_stack_size, &pipeline);
674 check_result(res, c_pipeline_create, m_index, res);
675 thread->set_pipeline(m_index, pipeline);
676 }
677 return pipeline;
678 }
679
680 Stream* Stream::find_stream(uint64_t handle, bool remove)
681 {
682 Stream *stream = 0;
683
684 m_stream_lock.lock();
685 {
686 StreamMap::iterator it = all_streams.find(handle);
687 if (it != all_streams.end()) {
688 stream = it->second;
689 if (remove) {
690 all_streams.erase(it);
691 }
692 }
693 }
694 m_stream_lock.unlock();
695 return stream;
696 }
697
698 COIPIPELINE Engine::get_pipeline(_Offload_stream handle)
699 {
700 Stream * stream = Stream::find_stream(handle, false);
701
702 if (!stream) {
703 LIBOFFLOAD_ERROR(c_offload_no_stream, m_index);
704 LIBOFFLOAD_ABORT;
705 }
706
707 COIPIPELINE pipeline = stream->get_pipeline();
708
709 if (pipeline == 0) {
710 COIRESULT res;
711 int proc_num;
712 COI_CPU_MASK in_Mask ;
713
714 #ifndef TARGET_WINNT
715 proc_num = __sync_fetch_and_add(&m_proc_number, 1);
716 #else // TARGET_WINNT
717 proc_num = _InterlockedIncrement(&m_proc_number);
718 #endif // TARGET_WINNT
719
720 if (proc_num > COI_PIPELINE_MAX_PIPELINES) {
721 LIBOFFLOAD_ERROR(c_coipipe_max_number, COI_PIPELINE_MAX_PIPELINES);
722 LIBOFFLOAD_ABORT;
723 }
724
725 m_stream_lock.lock();
726
727 // start process if not done yet
728 if (m_process == 0) {
729 init_process();
730 }
731
732 // create CPUmask
733 res = COI::PipelineClearCPUMask(in_Mask);
734 check_result(res, c_clear_cpu_mask, m_index, res);
735
736 int stream_cpu_num = stream->get_cpu_number();
737
738 stream->m_stream_cpus.reset();
739
740 int threads_per_core = m_num_threads / m_num_cores;
741
742 // The "stream_cpu_num" available threads is set in mask.
743 // Available threads are defined by examining of m_cpus bitset.
744 // We skip thread 0 .
745 for (int i = 1; i < m_num_threads; i++) {
746 // for available thread i m_cpus[i] is equal to 1
747 if (m_cpus[i]) {
748 res = COI::PipelineSetCPUMask(m_process,
749 i / threads_per_core,
750 i % threads_per_core,
751 in_Mask);
752
753 check_result(res, c_set_cpu_mask, res);
754 // mark thread i as nonavailable
755 m_cpus.set(i,0);
756 // Mark thread i as given for the stream.
757 // In case of stream destroying by call to
758 // _Offload_stream_destroy we can mark the thread i as
759 // available.
760 stream->m_stream_cpus.set(i);
761 if (--stream_cpu_num <= 0) {
762 break;
763 }
764 }
765 }
766
767 // if stream_cpu_num is greater than 0 there are not enough
768 // available threads
769 if (stream_cpu_num > 0) {
770 LIBOFFLOAD_ERROR(c_create_pipeline_for_stream, m_num_threads);
771 LIBOFFLOAD_ABORT;
772 }
773 // create pipeline for this thread
774 OFFLOAD_DEBUG_TRACE(2, "COIPipelineCreate Mask\n"
775 "%016lx %016lx %016lx %016lx\n%016lx %016lx %016lx %016lx\n"
776 "%016lx %016lx %016lx %016lx\n%016lx %016lx %016lx %016lx\n",
777 in_Mask[0], in_Mask[1], in_Mask[2], in_Mask[3],
778 in_Mask[4], in_Mask[5], in_Mask[6], in_Mask[7],
779 in_Mask[8], in_Mask[9], in_Mask[10], in_Mask[11],
780 in_Mask[12], in_Mask[13], in_Mask[14], in_Mask[15]);
781 res = COI::PipelineCreate(m_process, in_Mask,
782 mic_stack_size, &pipeline);
783 check_result(res, c_pipeline_create, m_index, res);
784
785 // Set stream's affinities
786 {
787 struct affinity_spec affinity_spec;
788 char* affinity_type;
789 int i;
790
791 // "compact" by default
792 affinity_spec.affinity_type = affinity_compact;
793
794 // Check if user has specified type of affinity
795 if ((affinity_type = getenv("OFFLOAD_STREAM_AFFINITY")) !=
796 NULL)
797 {
798 char affinity_str[16];
799 int affinity_str_len;
800
801 OFFLOAD_DEBUG_TRACE(2,
802 "User has specified OFFLOAD_STREAM_AFFINITY=%s\n",
803 affinity_type);
804
805 // Set type of affinity requested
806 affinity_str_len = strlen(affinity_type);
807 for (i=0; i<affinity_str_len && i<15; i++)
808 {
809 affinity_str[i] = tolower(affinity_type[i]);
810 }
811 affinity_str[i] = '\0';
812 if (strcmp(affinity_str, "compact") == 0) {
813 affinity_spec.affinity_type = affinity_compact;
814 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=compact\n");
815 } else if (strcmp(affinity_str, "scatter") == 0) {
816 affinity_spec.affinity_type = affinity_scatter;
817 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=scatter\n");
818 } else {
819 LIBOFFLOAD_ERROR(c_incorrect_affinity, affinity_str);
820 affinity_spec.affinity_type = affinity_compact;
821 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=compact\n");
822 }
823 }
824 // Make flat copy of sink mask because COI's mask is opaque
825 for (i=0; i<16; i++) {
826 affinity_spec.sink_mask[i] = in_Mask[i];
827 }
828 // Set number of cores and threads
829 affinity_spec.num_cores = m_num_cores;
830 affinity_spec.num_threads = m_num_threads;
831
832 COIEVENT event;
833 res = COI::PipelineRunFunction(pipeline,
834 m_funcs[c_func_set_stream_affinity],
835 0, 0, 0,
836 0, 0,
837 &affinity_spec, sizeof(affinity_spec),
838 0, 0,
839 &event);
840 check_result(res, c_pipeline_run_func, m_index, res);
841
842 res = COI::EventWait(1, &event, -1, 1, 0, 0);
843 check_result(res, c_event_wait, res);
844 }
845
846 m_stream_lock.unlock();
847 stream->set_pipeline(pipeline);
848 }
849 return pipeline;
850 }
851
852 void Engine::stream_destroy(_Offload_stream handle)
853 {
854 // get stream
855 Stream * stream = Stream::find_stream(handle, true);
856
857 if (stream) {
858 // return cpus for future use
859 for (int i = 0; i < m_num_threads; i++) {
860 if (stream->m_stream_cpus.test(i)) {
861 m_cpus.set(i);
862 }
863 }
864 delete stream;
865 }
866 else {
867 LIBOFFLOAD_ERROR(c_offload_no_stream, m_index);
868 LIBOFFLOAD_ABORT;
869 }
870 }
871
872 uint64_t Engine::get_thread_id(void)
873 {
874 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
875 if (thread == 0) {
876 thread = new Thread(&m_proc_number);
877 thread_setspecific(mic_thread_key, thread);
878 }
879
880 return reinterpret_cast<uint64_t>(thread);
881 }
882
883 AutoSet& Engine::get_auto_vars(void)
884 {
885 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
886 if (thread == 0) {
887 thread = new Thread(&m_proc_number);
888 thread_setspecific(mic_thread_key, thread);
889 }
890
891 return thread->get_auto_vars();
892 }
893
894 void Engine::destroy_thread_data(void *data)
895 {
896 delete static_cast<Thread*>(data);
897 }