multithreading - Threading in C# to build a distributed DFS


I have been trying to implement a distributed depth-first search in C#. I have been successful up to a point, but I now get a synchronisation error that I am not able to rectify. I am trying to make each node communicate with the others using TPL Dataflow (Task Parallel Library), and thereby attain parallelism in the DFS. Below is my code:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    public class DFS
    {
        static List<string> traversedList = new List<string>();
        static List<string> parentList = new List<string>();
        static Thread[] thread_array;
        static BufferBlock<object> buffer1 = new BufferBlock<object>();

        public static void Main(string[] args)
        {
            int n = 100;
            int m = n * 4;
            int p = n * 16;

            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();

            List<string> global_list = new List<string>();

            // Read the adjacency list: one node per line, followed by its neighbours.
            StreamReader file = new StreamReader(args[args.Length - 2]);
            string text = file.ReadToEnd();
            string[] lines = text.Split('\n');

            string[][] array1 = new string[lines.Length][];

            for (int i = 0; i < lines.Length; i++)
            {
                lines[i] = lines[i].Trim();
                string[] words = lines[i].Split(' ');

                array1[i] = new string[words.Length];

                for (int j = 0; j < words.Length; j++)
                {
                    array1[i][j] = words[j];
                }
            }

            // Write every edge as "node:neighbour".
            StreamWriter sr = new StreamWriter("e:\\newtext1.txt");

            for (int i = 0; i < array1.Length; i++)
            {
                for (int j = 0; j < array1[i].Length; j++)
                {
                    if (j != 0)
                    {
                        sr.Write(array1[i][0] + ":" + array1[i][j]);
                        Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                        sr.Write(sr.NewLine);
                    }
                }
            }

            // Seed the buffer with the first message: parent, graph, start node, id.
            int start_no = Convert.ToInt32(args[args.Length - 1]);
            thread_array = new Thread[lines.Length];
            string first_message = "root";
            buffer1.Post(first_message);
            buffer1.Post(array1);
            buffer1.Post(start_no);
            buffer1.Post(1);

            // One thread per node.
            for (int t = 1; t < lines.Length; t++)
            {
                Console.WriteLine("thread" + t);
                thread_array[t] = new Thread(new ThreadStart(thread_run));
                thread_array[t].Name = t.ToString();
                lock (thread_array[t])
                {
                    Console.WriteLine("working");
                    thread_array[t].Start();
                    thread_array[t].Join();
                }
            }

            stopwatch.Stop();
            Console.WriteLine(stopwatch.Elapsed);
            Console.ReadLine();
        }

        // Sequential recursive DFS.
        private static void dfs(string[][] array, int point)
        {
            for (int z = 1; z < array[point].Length; z++)
            {
                if (!traversedList.Contains(array[point][z]))
                {
                    traversedList.Add(array[point][z]);
                    parentList.Add(point.ToString());
                    dfs(array, int.Parse(array[point][z]));
                }
            }
            return;
        }

        public static void thread_run()
        {
            try
            {
                string parent;
                string[][] array1;
                int point;
                int id;
                parent = (string)buffer1.Receive();
                array1 = (string[][])buffer1.Receive();
                point = (int)buffer1.Receive();
                id = (int)buffer1.Receive();
                object value;
                Console.WriteLine("times");

                if (Thread.CurrentThread.Name.Equals(point.ToString()))
                {
                    if (!traversedList.Contains(point.ToString()))
                    {
                        Console.WriteLine("node:" + point + " parent:" + parent + " id:" + id);
                        traversedList.Add(point.ToString());
                        parent = point.ToString();
                        for (int x = 1; x < array1[point].Length; x++)
                        {
                            Console.WriteLine("times");
                            if (buffer1.TryReceive(out value))
                            {
                                array1 = (string[][])value;
                            }
                            if (buffer1.TryReceive(out value))
                            {
                                id = (int)buffer1.Receive();
                            }
                            id++;
                            buffer1.Post(parent);
                            buffer1.Post(array1);
                            buffer1.Post(x);
                            buffer1.Post(id);
                            Console.WriteLine("times");
                            Monitor.PulseAll(Thread.CurrentThread);
                        }

                        //return;
                    }
                    else
                    {
                        buffer1.Post(parent);
                        buffer1.Post(array1);
                        buffer1.Post(point);
                        buffer1.Post(id);
                        Console.WriteLine("working 1");
                        Monitor.PulseAll(Thread.CurrentThread);
                    }
                }
                else
                {
                    Console.WriteLine("working 2");
                    Monitor.Wait(Thread.CurrentThread);
                }
                //Console.WriteLine(parent);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }


There are various issues with the code.

The incorrect use of locking, and "touching" traversedList from multiple threads, is the obvious problem.
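As a minimal sketch of one way to remove the race on the visited list: the check-then-add pair (Contains followed by Add) can be replaced with a single atomic operation. The VisitedSet class below is a hypothetical helper, not part of the original code; it wraps a ConcurrentDictionary and uses TryAdd, which succeeds for exactly one caller per key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a thread-safe "visited" set.
public sealed class VisitedSet
{
    // ConcurrentDictionary used as a set; the byte value is unused.
    private readonly ConcurrentDictionary<string, byte> _seen =
        new ConcurrentDictionary<string, byte>();

    // Returns true only for the first caller that claims the node.
    public bool TryVisit(string node) => _seen.TryAdd(node, 0);

    public int Count => _seen.Count;
}

public static class VisitedDemo
{
    public static void Main()
    {
        var visited = new VisitedSet();
        int wins = 0;

        // Eight workers race to claim the same node.
        Parallel.For(0, 8, _ =>
        {
            if (visited.TryVisit("42"))
            {
                Interlocked.Increment(ref wins);
            }
        });

        Console.WriteLine(wins); // prints 1
    }
}
```

With a plain List<string>, two threads can both see Contains return false and both add the node; here only one TryVisit call can win.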

More importantly, the code doesn't really use Dataflow; it uses BufferBlock in a manner similar to ConcurrentQueue or any other concurrent collection. The whole point of Dataflow is to use ActionBlocks instead of threads to simplify processing. By default an ActionBlock processes one message at a time, but you can allow as many concurrent workers as you want through the MaxDegreeOfParallelism property of the ExecutionDataflowBlockOptions class.

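Execution blocks have their own buffers (an ActionBlock has an input buffer, a TransformBlock has both input and output buffers), so you don't have to add additional BufferBlocks just for buffering.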

Passing multiple related values to a block one at a time is another problem: it can lead to errors and makes the code confusing. Creating a data structure to hold all the values doesn't cost anything.

Assuming you use this class to hold the processing message:

    public class PointMessage
    {
        public string Message { get; set; }
        public string[][] Lines { get; set; }
        public int Point { get; set; }
        public int ID { get; set; }
    }

You can create an ActionBlock to process these messages like this:

    static ActionBlock<PointMessage> _block;
    ...
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };
    _block = new ActionBlock<PointMessage>(msg => ProcessMessage(msg), options);

And process each message like this:

    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
            ...
            _block.Post(arg);
        }
    }
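To make the pattern concrete, here is a self-contained sketch of an ActionBlock that posts follow-up messages to itself in order to walk a graph. It is an illustration under stated assumptions, not a rewrite of the code in the question: NodeMessage, GraphWalk, Traverse and the int[][] adjacency format are hypothetical names chosen for the example, a pending counter is one possible way to detect when the work is finished, and the System.Threading.Tasks.Dataflow package must be referenced. Because the block's queue is first-in, first-out and several messages run concurrently, the visiting order is a parallel graph traversal rather than a strict depth-first order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: one object instead of four separate posts.
public class NodeMessage
{
    public int Point { get; set; }
    public int Parent { get; set; }
}

public static class GraphWalk
{
    // Visits every node reachable from start.
    // Returns node -> parent, with -1 as the parent of the start node.
    public static IDictionary<int, int> Traverse(int[][] adjacency, int start)
    {
        var parents = new ConcurrentDictionary<int, int>();
        int pending = 0; // messages posted but not yet fully processed
        ActionBlock<NodeMessage> block = null;

        block = new ActionBlock<NodeMessage>(msg =>
        {
            // TryAdd is atomic, so exactly one message claims each node.
            if (parents.TryAdd(msg.Point, msg.Parent))
            {
                foreach (int next in adjacency[msg.Point])
                {
                    Interlocked.Increment(ref pending);
                    block.Post(new NodeMessage { Point = next, Parent = msg.Point });
                }
            }

            // When the last outstanding message finishes, close the block.
            if (Interlocked.Decrement(ref pending) == 0)
            {
                block.Complete();
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        Interlocked.Increment(ref pending);
        block.Post(new NodeMessage { Point = start, Parent = -1 });
        block.Completion.Wait();
        return parents;
    }

    public static void Main()
    {
        // Node 4 points at node 0 but is not reachable from it.
        int[][] adjacency =
        {
            new[] { 1, 2 }, new[] { 3 }, new[] { 3 }, new int[0], new[] { 0 }
        };

        IDictionary<int, int> parents = Traverse(adjacency, 0);
        Console.WriteLine(parents.Count); // prints 4
    }
}
```

No explicit threads, locks or Monitor calls are needed: the block schedules the work, and the counter is incremented before each Post so it cannot reach zero while messages are still queued.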

If the function returns a value, you can use a TransformBlock instead of an ActionBlock.
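A short sketch of the TransformBlock variant follows. It assumes the input format used in the question (each line is a node followed by its neighbours, separated by spaces); CountNeighbours and TransformDemo are hypothetical names used only for this example. The TransformBlock turns each line into a value and a linked ActionBlock consumes the results.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class TransformDemo
{
    // Hypothetical helper: the first token is the node itself,
    // every further token is one neighbour.
    public static int CountNeighbours(string line)
    {
        return line.Trim().Split(' ').Length - 1;
    }

    public static void Main()
    {
        // The TransformBlock produces an output for every input it receives.
        var count = new TransformBlock<string, int>(line => CountNeighbours(line));

        // The ActionBlock consumes the outputs.
        var print = new ActionBlock<int>(n => Console.WriteLine(n));

        // PropagateCompletion lets Complete() flow from count to print.
        count.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        count.Post("0 1 2"); // prints 2
        count.Post("3");     // prints 0

        count.Complete();
        print.Completion.Wait();
    }
}
```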

I don't understand what your code is trying to do, so I won't try to rewrite it using Dataflow. If you clean it up a bit, it will be easier to help.

