One of the inherent challenges of this work is processing and analysing large datasets. To implement the systems introduced in Sections 4 and 5, a custom solution was developed around Apache Spark for processing the data and Apache HBase for storing intermediate results (Section 7.5).
Storage
One of the challenges in working with network data is the need to store the data in an accessible manner while also dealing with the large number of nodes and edges. The Reddit comment network has 861,955 nodes and 2,402,202 edges. Stored as a dense in-memory matrix, this would require roughly 7.4 × 10^11 cells, of which only about 2.4 million are non-zero, which limits performance, especially when additional matrices are needed for edge values such as weight and time.
As stated in Section 5.2, each social network can be represented as an adjacency matrix $A$, an $n \times n$ matrix in which the entry $a_{i,j}$ represents the directed relationship (outward edge) from $i \to j$, with $a_{i,j} \neq a_{j,i}$ indicating a directed relationship. A matrix can also be used to represent additional values between nodes, such as the time at which relationships were created ($\tau_{i,j} \in T$) and the weight of the relationship ($w_{i,j} \in W$). Additionally, the majority of the computed values introduced in Section 7.4 can also be represented in the form of matrices, such as $O_{v2u}$, where the row would be $v$ and the column $u$.
For this reason, Apache HBase is used to store the respective matrices in a permanent and queryable format that can be accessed from a number of machines. HBase is a distributed NoSQL database designed for the storage of large, sparse datasets that can be represented in the form of a table. HBase allows data to be accessed through row keys, with data being stored in columns. Additionally, columns can be grouped into families through the use of a column family key. This allows the same column key to be used for multiple data points in the same row.
HBase can thus be used to store each matrix in a scalable and accessible manner. For the adjacency matrix $A$, the row and column values are stored in the rows and columns within HBase. Each matrix has identical dimensions $n \times n$ and therefore the same rows and columns. Instead of creating multiple HBase tables, each matrix can therefore be stored within a different column family of one table. By storing values across different column families, a single get request returns the values from multiple column families, instead of multiple get requests having to be performed across multiple tables.
However, one limitation of HBase is that cells can only be accessed through row keys. This is ideal for undirected networks, whose adjacency matrix is symmetric ($a_{i,j} = a_{j,i}$). For directed networks, however, $a_{i,j} \neq a_{j,i}$, and the cell represents the direction $i \to j$. The network stored in HBase is directed, with the row values representing the outward edges of the node (the node itself being the row key). The inward edges of a node are then found in the column with the same key. However, reading inward edges in this way introduces excessive overhead: a column cannot simply be accessed directly, so each row has to be iterated over and the column value of interest selected. This is computationally expensive, as it requires a get request on each row. To reduce this cost and speed up queries, one main optimisation is implemented: two copies of the matrix are stored, one being the transpose of the other, within the same table but in different column families. Both inward and outward edges can then be accessed through the row key, speeding up access.

Column Family vin Av2u Au Avoru
Node gif funny pic gif funny pic game gif funny pic game gif funny pic
gif 1 4 3 4 4
funny 1 2 2 4 4
pic 4 4 3 4
Table 7.3: Example HBase table
As before, to limit the number of get requests on the table, the transpose is stored in its own column family. This does mean that each matrix is stored in HBase twice, though the increase in storage cost is negligible compared with the performance gained.
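The storage layout described above can be sketched in plain Python. This is only an illustrative in-memory stand-in, not the HBase client API: the column family names `out` and `in`, the helper functions, and the edge weights are assumptions made for the example, and the node names are borrowed from Table 7.3.

```python
from collections import defaultdict

# Hypothetical column family names: "out" holds the matrix A (outward edges),
# "in" holds its transpose (inward edges). The nesting mirrors the HBase model:
# row key -> column family -> column -> value.
OUT, IN = "out", "in"


def build_table(edges):
    """Store every directed edge twice: once in A and once in its transpose."""
    table = defaultdict(lambda: {OUT: {}, IN: {}})
    for src, dst, weight in edges:
        table[src][OUT][dst] = weight  # a[src][dst], read through row key src
        table[dst][IN][src] = weight   # transposed copy, read through row key dst
    return table


def get(table, row_key):
    """A single lookup by row key returns all column families for that node."""
    return table[row_key] if row_key in table else {OUT: {}, IN: {}}


# Illustrative edges only; the weights are made up.
edges = [("gif", "funny", 1), ("funny", "pic", 2), ("pic", "gif", 4)]
table = build_table(edges)
row = get(table, "gif")
print(row[OUT])  # outward edges of "gif"
print(row[IN])   # inward edges of "gif", found without scanning other rows
```

The point of the sketch is the trade-off made in the text: each edge is written twice, so that both directions are reachable from one row key.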
Processing
As stated earlier, Apache Spark allows computational tasks to be distributed across a number of machines in a scalable and parallel manner. An application is scaled by breaking a process down into tasks that can be performed in parallel on each item of data. As the data being processed is large and cannot easily be stored or processed on one machine, a big data approach is taken, distributing the processing of the data.
The aim of the proposed model (Section 7.4) is to learn the four versions of influence across the three variants of time. Each measure of influence is composed of the same set of values, computed from each innovation diffusion across the given network. Therefore, to compute the raw values and learn the influence between nodes in a scalable manner, the method proposed in the original paper [83] is adapted to utilise Apache Spark and HBase. As in the original paper [83], processing is split into two phases: phase one (algorithm 7.1) learns the raw values $O_{v2u}$, $O_u$ and $O_{o\&u}$, and phase two (algorithm 7.2) computes the respective influence values based on the values from phase one.
The values in phase one can be learnt in a map-reduce format, with each mapper taking as input a complete diffusion of an innovation in the form of a time-ordered list of username and time pairs. The majority of the learnt values can be seen as incremental counts across diffusions (Section 7.4.1). Therefore, to implement the proposed framework in a map-reduce pattern, one can emit a <key, value> pair each time a value should be incremented. Thus, for $O_{v2u}$, when $v$ uses $o$ before $u$, the key-value pair <$O_{v2u}$, 1> is emitted; then, in the reduce phase, the values are summed across the different innovation diffusions. The output of the reduce phase is then loaded into HBase, allowing the values to be accessed in phase two.
The majority of measures are incremental whole numbers; however, measures such as $\tau_{v,u}$ (the average time of adoption between $u$ and $v$) are not. These require averages to be computed, which results in large floating-point numbers that HBase finds challenging to store. To circumvent this, the array of values (time differences) is stored instead, and the average is computed when the cell is accessed.
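A minimal sketch of this workaround, assuming the time differences are whole numbers (for example seconds). The dictionary `cells` and the two helper functions are hypothetical stand-ins for an HBase cell and its read and write paths, not part of any HBase API.

```python
from statistics import mean

# Hypothetical stand-in for one HBase cell per (v, u) pair: the cell keeps the
# raw integer time differences rather than a precomputed floating-point mean.
cells = {}


def record_delay(v, u, delta):
    """Append one observed time difference t_u - t_v to the cell for (v, u)."""
    cells.setdefault((v, u), []).append(int(delta))


def average_delay(v, u):
    """Compute the average only when the cell is read; None if nothing stored."""
    values = cells.get((v, u))
    return mean(values) if values else None


record_delay("v", "u", 30)
record_delay("v", "u", 90)
record_delay("v", "u", 60)
print(average_delay("v", "u"))
```

Keeping the raw list also means that further diffusions can be appended later without losing precision, at the cost of a larger cell.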
We now walk through how the values in phase one are learnt using Apache Spark (algorithm 7.1). Multiple mappers run across multiple nodes in a cluster, each of which has access to the social network stored within HBase (line 2) and takes a complete diffusion as input. Line 3 iterates over the ordered list for a given diffusion, checking whether $v$ performed the action before its neighbour $u$ (line 6). If so, and if the relationship between them already existed when they both used the innovation (line 7), the following lines emit the values to increment $O_{v2u}$ and $\tau_{v,u}$. Line 13 then emits the value by which $O_{v\&u}$ is incremented, which represents the credit that $v$ has in $u$ adopting the innovation ($o$). Line 10 adds node $v$ to a list of parents, which is used in lines 15 to 17 to compute the partial credit associated with $u$ adopting $o$.
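The walk-through above can be approximated in plain Python, without Spark or HBase. The sketch below is an interpretation rather than the implementation used in this work: the network is assumed to be a set of directed `(v, u)` pairs instead of an HBase lookup, the inner loop runs over the users that appear earlier in the diffusion, the metric names are illustrative, and the $O_{v\&u}$ emission is left out.

```python
from collections import defaultdict


def phase_one_mapper(diffusion, network):
    """Yield (key, value) pairs for one time-ordered diffusion.

    diffusion: list of (user, time) pairs sorted by time.
    network:   set of directed edges (v, u); an assumed stand-in for HBase.
    """
    for i, (u, t_u) in enumerate(diffusion):
        parents = []
        for v, t_v in diffusion[:i]:             # users appearing earlier in the list
            if t_v < t_u and (v, u) in network:  # v acted first and the edge exists
                yield ("O_v2u", v, u), 1         # count of v acting before u
                yield ("tau", v, u), t_u - t_v   # time difference between the two
                parents.append(v)
        for p in parents:                        # split the credit equally
            yield ("credit", p, u), 1 / len(parents)


def reducer(pairs):
    """Sum the emitted values per key, as the reduce phase does."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Illustrative data only.
network = {("a", "b"), ("a", "c"), ("b", "c")}
diffusion = [("a", 0), ("b", 5), ("c", 9)]
totals = reducer(phase_one_mapper(diffusion, network))
for key in sorted(totals):
    print(key, totals[key])
```

In a Spark job the generator would play the role of the function passed to a flat-map step and the reducer that of a per-key sum; here both run locally so the logic can be inspected on a small example.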
Phase two (algorithm 7.2) is similar to phase one (algorithm 7.1), but focuses on computing the influence values that depend on the window of average diffusion time $\tau_{v,u}$, such as the partial credit $\mathit{credit}^{\tau_{v,u}}_{v,u}$ and the count of cascades of innovation between two users $A_{v2u}$. As in phase one, aspects of phase two can be thought of as incremental counts, and so can be summed within the reducer. Line 9 shows that $O_{v2u}$ is learnt for a second time: whereas the value learnt in phase one (algorithm 7.1, line 8) counted any successive usage, the value learnt in phase two depends on a window of average diffusion between the two users ($\tau_{u,v}$), which is partially learnt in phase one (algorithm 7.1, line 9) and accessed from HBase. Lines 14 to 21 additionally recompute the partial credit associated with each parent, dependent on the average diffusion time between the parent and the node ($\mathit{credit}^{\tau_{v,u}}_{v,u}$). Again, the reduce class then sums the respective metric values together, outputting the resulting quadruples to be stored in HBase.
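The windowed count can be illustrated with a simplified sketch. It assumes that the average diffusion times from phase one are available as a dictionary `tau` keyed by `(v, u)` (standing in for the HBase lookup), and it collapses the second filtering loop of algorithm 7.2 into the first, since every parent kept here already lies within its window. The metric names are illustrative.

```python
def phase_two_mapper(diffusion, network, tau):
    """Yield windowed counts and credits for one time-ordered diffusion.

    diffusion: list of (user, time) pairs sorted by time.
    network:   set of directed edges (v, u) (assumed representation).
    tau:       dict mapping (v, u) to the average diffusion time from phase one.
    """
    current = []                                  # users who have already adopted
    for u, t_u in diffusion:
        parents = []
        for v, t_v in current:
            window = tau.get((v, u))
            if (v, u) in network and window is not None and 0 < t_u - t_v < window:
                yield ("O_v2u_tau", v, u), 1      # counted only inside the window
                parents.append(v)
        for p in parents:                         # credit shared among in-window parents
            yield ("credit_tau", p, u), 1 / len(parents)
        current.append((u, t_u))


# Illustrative data only.
network = {("a", "b"), ("a", "c"), ("b", "c")}
tau = {("a", "b"): 10, ("a", "c"): 5, ("b", "c"): 10}
diffusion = [("a", 0), ("b", 5), ("c", 9)]
emitted = dict(phase_two_mapper(diffusion, network, tau))
for key in sorted(emitted):
    print(key, emitted[key])
```

In this example the pair (a, c) is dropped because the delay of 9 exceeds its window of 5, so all of the credit for c adopting goes to b.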
The two phases (algorithms 7.1 and 7.2) learn the values needed to compute the influence between users, based on 80% of the dataset. The evaluation stage then takes these raw values for each of the three time forms, static, continuous and discrete (equations 7.4.9, 7.4.9 and 7.4.9), across the four variants of influence between users (equations 7.4.1, 7.4.2, 7.4.5 and 7.4.4).
Algorithm 7.1: Learning Phase 1
Data: Each mapper receives a complete innovation diffusion in the form [userid, time], with the docid being the name of the diffusion
Result: List of quads that represent (row, column family, column, value), which can then be loaded into HBase
 1 Class Mapper(docid a, diffusion d)
 2   n ← network
 3   for row u, t_u ∈ d do
 4     parents ← []
 5     for row v, t_v ∈ d do
 6       if t_v < t_u then
 7         if e_{u,v} ∈ n then
 8           Emit(metric O_{v2u}, 1)
 9           Emit(metric τ_{v,u}, t_u − t_v)
10           parents + v
11         end
12       end
13       Emit(metric O_{v&u}, 1)
14     end
15     for p ∈ parents do
16       Emit(metric credit_{p,u}, 1/len(parents))
17     end
18   end
19 Class Reducer(metric m, counts [c1, c2, ...])
20   Emit(metric m, count sum([c1, c2, ...]))

As with learning the influence values between nodes, the evaluation phase is implemented as a number of Spark jobs. This allows the collective pressure on users to be computed across a cluster of machines, thus speeding up the application. As before, the input to each job is a complete diffusion in the form of a list of tuples, each a <user, time> pair, representing an innovation diffusion. The basic premise of the system is to compute the joint influence on a user who has been exposed to the adoption of an innovation ($o$); thus, as user $u$ adopts an innovation, the joint influence is updated on all the neighbours $S(u)$.
Again, the input of each mapper (algorithm 7.3) is a complete diffusion in the form of an ordered list of tuples containing the username and time of adoption. Line 4 iterates through the diffusion, initially checking whether the user has been exposed to the innovation before; if so, the user is set to have adopted the innovation (line 6), and if not, the user is recorded as an innovator of the innovation (line 8). Then, each neighbour $S_{in}(n)$ is iterated over, and their respective joint influences are set or updated (lines 10 to 18).
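The static evaluation loop can be sketched as follows. The update rule is an assumption, since this section does not state it: the joint influence on a user is combined as $1 - \prod (1 - p_{v,u})$ over the neighbours that have adopted, applied incrementally. The dictionaries `out_neighbours` and `p`, the state labels, and the choice to stop updating a user once they have adopted are likewise illustrative.

```python
def evaluate_static(diffusion, out_neighbours, p):
    """Return {user: [joint_influence, state]} for one time-ordered diffusion.

    diffusion:      list of (user, time) pairs sorted by time.
    out_neighbours: dict mapping u to the users exposed when u adopts (assumed).
    p:              dict mapping (v, u) to the learnt influence of v on u (assumed).
    """
    results = {}
    for u, _t in diffusion:
        if u in results:
            results[u][1] = "performed"        # exposed earlier, now adopted
        else:
            results[u] = [0.0, "innovator"]    # adopted with no prior exposure
        for w in out_neighbours.get(u, ()):
            if w not in results:
                results[w] = [p[(u, w)], "not performed"]
            elif results[w][1] == "not performed":
                joint = results[w][0]
                # assumed incremental form of 1 - prod(1 - p)
                results[w][0] = joint + (1 - joint) * p[(u, w)]
    return results


# Illustrative data only.
out_neighbours = {"a": ["b", "c"], "b": ["c"]}
p = {("a", "b"): 0.5, ("a", "c"): 0.2, ("b", "c"): 0.5}
results = evaluate_static([("a", 0), ("b", 1)], out_neighbours, p)
for user in sorted(results):
    print(user, results[user])
```

Each row of the returned table corresponds to one user and innovation pair, which is the information later written out for the threshold analysis.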
Algorithm 7.2: Learning Phase 2
Data: Each mapper receives a complete innovation diffusion in the form [userid, time], with the docid being the name of the diffusion
Result: List of quads that represent (row, column family, column, value), which can then be loaded into HBase
 1 Class Mapper(docid a, diffusion d)
 2   n ← network
 3   current ← []
 4   for row u, t_u ∈ d do
 5     parents ← []
 6     for v ∈ current do
 7       if e_{u,v} ∈ n then
 8         if 0 < t_u − t_v < τ_{u,v} then
 9           Emit(metric O_{v2u}, count 1)
10           parents + v
11         end
12       end
13     end
14     for p ∈ parents do
15       s ← []
16       for p ∈ parents do
17         if v < u & t_u − t_v < τ_{v,u} then
18           s + p
19         end
20       end
21       Emit(metric credit^{τ_{v,u}}_{v,u}, count 1/len(s))
22     end
23     current + u
24   end
 1 Class Reducer(metric m, counts [c1, c2, ...])
 2   Emit(metric m, count sum([c1, c2, ...]))

The three states that a node can be in are:
Innovator: They used an innovation with no prior exposure to it
Performed: They used an innovation after having been exposed to it
Not Performed: They did not use an innovation even though they had been exposed to it
However, this implementation is limited to the static time model; a number of modifications are therefore needed to compute the continuous (equation 7.4.9) and discrete (equation 7.4.8) time models. These require the joint probability to be recomputed upon each exposure, and to achieve this each node must remember its historic exposures. Upon further exposures, the influence therefore has to be updated with respect to $\tau$ and each of the node's neighbours.
Algorithm 7.3: Evaluating - Static Time
Data: Each mapper receives a complete innovation diffusion in the form [userid, time], with the docid being the name of the diffusion.
Result: List of quads that represent (row, column family, column, value), which can then be loaded into HBase.
 1 Class Mapper(docid a, diffusion d)
 2   n ← network
 3   results_table ← [(user, measure, value, performed)]
 4   for row u, t_u ∈ d do
 5     if u ∈ results_table then
 6       results_table[u] ← performed
 7     else
 8       results_table[u] ← 0, innovator
 9     end
10     for v ∈ S_out(u_t) do
11       p_{v,u} ← set
12       if u ∉ results_table then
13         results_table[u] ← p_{v,u}, notperformed
14       else
15         results_table[u] ← update(results_table[u], p_{v,u})
16       end
17     end
18   end

The output of each time model is a CSV file containing whether the user has adopted an innovation, the largest joint influence exerted on the user, the name of the node, and the innovation itself. These are not stored in HBase, as they are no longer needed in a queryable form and are of a size at which they can be loaded into a Python script that computes the ROC to determine the threshold of adoption.
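A ROC computation over such a file might look like the following sketch, which uses only the standard library. The column names `node`, `innovation`, `joint_influence` and `adopted` are hypothetical, the sample rows are made up, and the code assumes that both adopters and non-adopters are present in the data.

```python
import csv
import io


def roc_points(rows):
    """rows: iterable of (score, adopted) with adopted in {0, 1}.

    Returns the ROC curve as a list of (false positive rate, true positive rate).
    """
    rows = sorted(rows, key=lambda r: r[0], reverse=True)
    positives = sum(label for _, label in rows)
    negatives = len(rows) - positives
    points, tp, fp = [(0.0, 0.0)], 0, 0
    for i, (score, label) in enumerate(rows):
        tp += label
        fp += 1 - label
        # emit a point only once all rows sharing this score have been counted
        if i + 1 == len(rows) or rows[i + 1][0] != score:
            points.append((fp / negatives, tp / positives))
    return points


def auc(points):
    """Area under the ROC curve using the trapezoidal rule."""
    return sum((x1 - x0) * (y0 + y1) / 2
               for (x0, y0), (x1, y1) in zip(points, points[1:]))


# Made-up sample in the assumed CSV layout.
sample = io.StringIO(
    "node,innovation,joint_influence,adopted\n"
    "a,gif,0.9,1\n"
    "b,gif,0.7,0\n"
    "c,gif,0.6,1\n"
    "d,gif,0.2,0\n"
)
rows = [(float(r["joint_influence"]), int(r["adopted"]))
        for r in csv.DictReader(sample)]
points = roc_points(rows)
area = auc(points)
print(points)
print(area)
```

Each point on the curve corresponds to one candidate threshold on the joint influence, so a threshold of adoption can be chosen by comparing the true and false positive rates at each point.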