/**
 *
 * CooccurrenceDriver.scala
 *
 * @author Pat Ferrel
 *
 * Copyright: (c) 2015 Finderbots
 *
 */

package com.finderbots.drivers

import com.google.common.collect.{BiMap, HashBiMap}
import org.apache.log4j.Logger
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset._
import org.apache.mahout.sparkbindings._

/**
 * Performs cooccurrence analysis on the primary action (purchase) and all secondary actions.
 * indicator 1 = [A'A], where A is all purchase actions - cooccurrence
 * indicator 2 = [A'B], where B is all view actions (view a product detail page) - cross-cooccurrence
 * indicator 3 = [A'C], where C is all category preference actions (clicks on a category, or searches for a
 * category) - cross-cooccurrence
 * Uses hard-coded paths to input and output, for the example data only. Reads in three types of user
 * interaction data, calculates the cooccurrence indicators, and writes them out as text files. All IO and
 * calculations use Spark and are distributed, but run on a "local" Spark master for ease of debugging.
 */
object CooccurrenceDriver extends App {
  val logger = Logger.getLogger(this.getClass)

  // The primary action, against which the rest are compared in cross-cooccurrence, is listed first: purchase
  val ActionInput = Array(
    ("purchase", "data/purchase.csv"),
    ("view", "data/view.csv"),
    ("category", "data/category.csv"))

  val OutputPath = "data/indicators/"

  // may need to change the master to use a cluster, or increase executor memory or other Spark context
  // attributes here
  implicit val mc = mahoutSparkContext(masterUrl = "local", appName = "3-input-cooc")

  // gets an array of Scala tuples: Array[(actionName, IndexedDataset)]
  val actions = readActions(ActionInput)

  // strip off the names; cooccurrencesIDSs takes only an Array of IndexedDatasets
  val indicatorMatrices = SimilarityAnalysis.cooccurrencesIDSs(actions.map(a => a._2))
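  // The results come back in input order: indicatorMatrices(0) is the purchase
  // self-cooccurrence [A'A], followed by the cross-cooccurrences [A'B] and [A'C],
  // each downsampled with the log-likelihood ratio (LLR) test. For intuition with a
  // hypothetical tiny example: if users u1 and u2 both purchased items i1 and i2,
  // A has rows [1, 1] for each user, so [A'A] accumulates a count of 2 at (i1, i2)
  // before LLR filtering.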

  // zip the pair of arrays into an array of pairs, reattaching the action names
  val indicatorDescriptions = actions.map(a => a._1).zip(indicatorMatrices)
  writeIndicators(indicatorDescriptions)

  /**
   * Write indicatorMatrices to the output dir in the default format
   */
  def writeIndicators(indicators: Array[(String, IndexedDataset)]) = {
    for (indicator <- indicators) {
      val indicatorDir = OutputPath + indicator._1
      indicator._2.dfsWrite(
        indicatorDir, // do we have to remove the last $ char?
        IndexedDatasetWriteBooleanSchema) // omit LLR strengths and format for search engine indexing
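      // with the boolean schema each output line should look roughly like
      // "itemID<tab>similarItem1 similarItem2 ..." (an assumption based on
      // Mahout's default text-delimited write schemas; LLR strengths omitted)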
    }
  }

  /**
   * Read files of element tuples and create IndexedDatasets, one per action. These share a userID BiMap but have
   * their own itemID BiMaps
   */
  def readActions(actionInput: Array[(String, String)]): Array[(String, IndexedDataset)] = {
    var actions = Array[(String, IndexedDataset)]()

    val userDictionary: BiMap[String, Int] = HashBiMap.create()

    // The first action named in the sequence is the "primary" action and begins to fill up the user dictionary
    for (actionDescription <- actionInput) { // grab the path to this action's data
      val action: IndexedDataset = SparkEngine.indexedDatasetDFSReadElements(
        actionDescription._2,
        schema = DefaultIndexedDatasetElementReadSchema,
        existingRowIDs = userDictionary)
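      // merge any users first seen in this action back into the shared dictionary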
      userDictionary.putAll(action.rowIDs)
      actions = actions :+ (actionDescription._1, action) // put the name in the tuple with the IndexedDataset
      logger.info("\n\n\nRead in action " + actionDescription._1 + ", which has " + action.matrix.nrow.toString + " rows")
      logger.info("actions has " + actions.length + " elements in it.\n\n\n")
    }

    // After all actions are read in, the userDictionary will contain every user seen, even if they have not
    // taken every action. Now we adjust the row rank of all IndexedDatasets to have this number of rows.
    // Note: this is very important or the cooccurrence calc may fail
    val numUsers = userDictionary.size() // the number of distinct users, one more than the largest zero-based user ID
    logger.info("\n\nTotal number of users for all actions = " + numUsers + "\n\n")
    val resizedNameActionPairs = actions.map { a =>
      logger.info(a._1 + " indicator matrix:")
      logger.info("Number of rows for matrix = " + a._2.matrix.nrow)
      logger.info("Number of columns for matrix = " + a._2.matrix.ncol)
      // rebuild the IndexedDataset with the shared user dictionary, then pad the row count up to numUsers
      val resizedMatrix = a._2.create(a._2.matrix, userDictionary, a._2.columnIDs).newRowCardinality(numUsers)
      logger.info(a._1 + " indicator matrix:")
      logger.info("Number of rows after resize = " + resizedMatrix.matrix.nrow)
      logger.info("Number of columns after resize = " + resizedMatrix.matrix.ncol)
      (a._1, resizedMatrix)
    }
    logger.info("\n\n")
    resizedNameActionPairs
  }

}
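
// A minimal sketch of how this driver might be launched (assumes an sbt project with
// the Mahout Spark bindings and their dependencies on the classpath):
//   sbt "runMain com.finderbots.drivers.CooccurrenceDriver"
// Each indicator should land under data/indicators/<actionName>/ as Spark part files.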