2009年9月19日土曜日

MapReduce を C# で 03

IParallelEventCollection 実装サンプル。メモリ上に展開する一番簡単なバージョン。TryNext が複雑かも。 Java の Iterator みたく、 hasNext と next に分けたほうが良さげ。

public class Sample_RAMStrage : IParallelEventCollection<string>
{
private List<string> list;
private object syncObject;

public Sample_RAMStrage()
{
this.list = new List<string>();
this.syncObject = new object();
}

private class Sample_RAMIterator : ParallelIterator<string>
{
private int count;
private Sample_RAMStrage source;
private string current;

public Sample_RAMIterator(Sample_RAMStrage source)
{
this.count = -1;
this.source = source;
}

#region Iterator<string> メンバ

public bool TryNext(out string result)
{
try
{
Monitor.Enter(this.source.syncObject);

// Wait 後に条件を再計算させるため、 this.count + 1 < this.source.Count を
// while 前にローカル変数とかに保存してしまうと悲しい結果になる・・・。
// Java の Iterator みたく hasNext メソッドと next メソッドにわけたほうが良さげ。
while (!this.source.EndIterationPending || this.count + 1 < this.source.Count)
{
if (this.count + 1 < this.source.Count)
{
result = this.source[++this.count];
this.current = result;
return true;
}
else
{
Monitor.Wait(this.source.syncObject);
}
}

result = default(string);
this.current = result;
return false;
}
finally
{
Monitor.Exit(this.source.syncObject);
}
}

public void Reset()
{
throw new NotImplementedException();
}

#endregion

#region IEnumerator<string> メンバ

public string Current
{
get
{
try
{
Monitor.Enter(this.source.syncObject);
return this.current;
}
finally
{
Monitor.Exit(this.source.syncObject);
}
}
}

#endregion

#region IDisposable メンバ

public void Dispose()
{
// NOP
}

#endregion

#region IEnumerator メンバ

object IEnumerator.Current
{
get { return this.Current; }
}

public bool MoveNext()
{
return this.TryNext(out this.current);
}

#endregion
}


#region IIterableList<string> メンバ

public string this[int index]
{
get
{
try
{
Monitor.Enter(this.syncObject);
return this.list[index];
}
finally
{
Monitor.Exit(this.syncObject);
}
}
set
{
try
{
Monitor.Enter(this.syncObject);
this.list[index] = value;
}
finally
{
Monitor.Exit(this.syncObject);
}
}
}

public int IndexOf(string item)
{
throw new NotImplementedException();
}

public void Insert(int index, string item)
{
throw new NotImplementedException();
}

public void RemoveAt(int index)
{
throw new NotImplementedException();
}

#endregion

#region IIterableCollection<string> メンバ

public int Count
{
get
{
try
{
Monitor.Enter(this.syncObject);
return this.list.Count;
}
finally
{
Monitor.Exit(this.syncObject);
}
}
}

public bool IsReadOnly
{
get { throw new NotImplementedException(); }
}

public void Add(string item)
{
try
{
Monitor.Enter(this.syncObject);
this.list.Add(item);
Monitor.Pulse(this.syncObject);
}
finally
{
Monitor.Exit(this.syncObject);
}
}

public void Clear()
{
throw new NotImplementedException();
}

public bool Contains(string item)
{
throw new NotImplementedException();
}

public void CopyTo(string[] array, int arrayIndex)
{
throw new NotImplementedException();
}

public bool Remove(string item)
{
throw new NotImplementedException();
}

#endregion

#region Iterable<string> メンバ

public ParallelIterator<string> GetIterator()
{
return new Sample_RAMIterator(this);
}

#endregion

#region IEnumerable<string> メンバ

public IEnumerator<string> GetEnumerator()
{
return this.GetIterator();
}

#endregion

#region IEnumerable メンバ

IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}

#endregion

#region IEventCollection<string> メンバ

public void EndIterationAsync()
{
try
{
Monitor.Enter(this.syncObject);
this.EndIterationPending = true;
Monitor.PulseAll(this.syncObject);
}
finally
{
Monitor.Exit(this.syncObject);
}
}

public bool EndIterationPending { get; set; }

#endregion
}

3 件のコメント:

  1. わけたところで、今度は MoveNext が複雑になる + 何度もロックすることになる 罠。このままでいいや(-_-;)

    返信削除
  2. シングルスレッドだと、EndIterationAsync を列挙前に呼んでおかないとデッドロックするなー。 EndIterationAsync は列挙後に呼んでと言わんばかりの名前だから、これはいけない気がする。「追加が終わった」「変更が終わった」みたいな意味にしないといけない。commit とか?

    返信削除
  3. > 「追加が終わった」「変更が終わった」みたいな意味…

    別スレッドで追加もありだから、これも紛らわしい。「列挙終了可能にする」みたいな意味のほうがいいかも。

    返信削除